Spark 笔记|教程 —— Spark SQL (Java API)

Spark SQL可以处理structured和semi-structured数据。这些数据在spark中通常被表示为Dataset(统称),就像数据库中的table一样。Dataset具有natural schema。Spark和Java可以在编译的时候(compile time)就知道Dataset中数据的类型,这是Python这样的动态语言做不到的,所有Spark Python API下只能使用DataFrame,而不能用Dataset。Dataset有两个不同的API接口,strongly-typed API(e.g.Dataset(非统称))和untyped API(e.g.DataFrame)。本文主要列举一些Spark SQL (Java API)在untyped API(e.g.DataFrame)上的具体的应用。下面代码中的Dataset<Row>等价于DataFrame。


Spark SQL Basics

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.*;
import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.max;

public class Main {

    public static void main(String[] args) {

        //set the spark logging level to Error
        Logger.getLogger("org").setLevel(Level.ERROR);

        //spark session internally has a sparkcontext for actual computation.
        //local[*] is to use all the available cores of CPU.
        //getOrCreate() is to create a new session, or get an exiting session you created previously.
        SparkSession session = SparkSession.builder().appName("appName").master("local[*]").getOrCreate();

        //DataFrameReader is spark interface used to load a dataset from external storage systems
        DataFrameReader dataFrameReader = session.read();

        //Dataset<Row> is equivalent to DataFrame.
        //DataFrameReader can read various file formats, such as Json, csv, JDBC, parquet, txt etc.
        //it will return as a data object of type row. Row is a data abstraction of an order collection of fields. Here, each row maps a csv line
        Dataset<Row> responses = dataFrameReader.option("header", "true").csv("/PATH/test.csv");

        //print out schema
        responses.printSchema();

        //Print top 20 records of responses table in a tabular format
        responses.show(20);

        // group all the same entries in this column, then use count() to do aggregation
        RelationalGroupedDataset groupedDataset = responses.groupBy(col("column_name"));
        groupedDataset.count().show();

        //change data type of a column(replace the old column if .withColumn use the same column name or add a new one if using a different name).
        //All the entries are String at first, we need to cast them first, then we can use them as Integer
        Dataset<Row> castedResponse = responses.withColumn("newColumnName", col("columnName").cast("integer")).withColumn("newColumnName", col("columName").cast("integer"));

        //add a space to the end of each entry of a particular column
        //the first parameter of concat_ws() is the separator between each element after concatenation.
        responses.withColumn("newColumnName", concat_ws("", col("column_name"), lit(" ")));

        //aggregate to get average and max value
        RelationalGroupedDataset datasetGroupBy = castedResponse.groupBy("column_name");
        datasetGroupBy.agg(avg("column1_name"), max("column2_name")).show(); //mean, sum, first can also be used

        //select particular columns
        castedResponse.select(col("column1_name"), col("column2_name")).show();

        //conditional row/record selection
        castedResponse.filter(col("column_name").equalTo("entry in this column")).show();
        castedResponse.filter(col("column_name").$less(20)).show(); // filter the entries that are less than 20

        //order by a column
        castedResponse.orderBy(col("column_name").desc()).show();

        //count the records in each even interval, like 0-20000, 20000-40000, 40000-60000, etc
        Dataset<Row> responseWithBucket = castedResponse.withColumn("BUCKET", col("columnName").divide(20000).cast("integer").multiply(20000)); // cast("integer") is to get the lower boundary
        responseWithBucket.groupBy("BUCKET").count().orderBy(col("BUCKET")).show();

        session.stop();
    }
}


Spark SQL Joins

Spark SQL Join的类型包括:inner, outer, left outer, right outer, left semi

df1.join(df2, df1.col("name").equalTo(df2.col("name")), "inner"); // df1 is the left one
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "left_outer");
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "right_outer");
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "left_semi");

//self join, a and b are alias, as() is for renaming
(df.as("a")).join(df.as("b")).where("a.name = b.name");

登录注册后参与评论