第 10 章 利用Spark处理结构化数据

10.1 spark SQL简介

Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL 提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL 使用这些额外的信息来执行额外的优化。与Spark SQL交互的方法有多种,包括SQL和Dataset API。 计算结果时,将使用相同的执行引擎,而与要用来表达计算的API /语言无关。 这种统一意味着开发人员可以轻松地在不同的API之间来回切换,从而提供最自然的方式来表达给定的转换。 Spark SQL的一种用途是执行SQL查询。 Spark SQL还可以用于从现有的Hive安装中读取数据。 当从另一种编程语言中运行SQL时,结果将作为Dataset / DataFrame返回。还可以使用命令行 或通过JDBC / ODBC与SQL接口进行交互。

10.2 数据集(Datasets)和数据框(DataFrames)

数据集(Datasets)是数据的分布式集合。Datasets是Spark 1.6中添加的新接口, 它具有RDD的优点(强类型输入,使用强大的Lambda函数的能力)以及Spark SQL优化的执行引擎的优点。 可以从JVM对象构造Datasets,然后使用功能转换(map,flatMap,filter等)进行操作。 Dataset API在Scala和Java中可用。 Python不支持Dataset API。但是由于Python的动态特性, Python可以通过其他方式实现Dataset API的优势功能,例如, 可以通过名称row.columnName来访问行的字段。 R的情况类似。

数据框(DataFrame)是组织为命名列的数据集。从概念上讲,它等效于关系数据库中的表或R / Python中 的数据框,但是在后台进行了更丰富的优化。可以从多种来源构造DataFrame,例如:结构化数据文件, Hive中的表,外部数据库或现有RDD。 DataFrame API在Scala,Java,Python和R中可用。 在Scala和Java中,DataFrame由行的数据集表示。 在Scala API中,DataFrame只是Dataset[Row]的类型别名。而在Java API中, 用户需要使用Dataset表示一个DataFrame。

10.3 相关操作命令

10.3.1 创建Spark Session

对于Spark DataFrame的所有操作的功能的入口点都是SparkSession类。SparkSession类似于Sparkcontext, 可以被看作是专门面向Spark DataFrame的Sparkcontext。 在Python中,要创建基本的SparkSession,只需导入pyspark.sql模块的SparkSession函数, 使用SparkSession.builder命令创建:

 from pyspark.sql import SparkSession
 spark = SparkSession.builder.appName("Python Spark").getOrCreate()

类似于Sparkcontext,Spark内也只能有一个SparkSession,所以创建候要使用.getOrCreate(), 防止已存在SparkSession导致创建过程出错。

直接调用创建的SparkSession可以查看SparkSession的相关信息,如使用spark命令调用上例创建的SparkSession, 可获得该SparkSession的相关信息。

10.3.2 创建DataFrames

使用SparkSession,可以从现有RDD,Hive表或Spark数据源创建DataFrame。

10.3.2.1 将RDD转换为DataFrame

Spark SQL支持两种将现有RDD转换为数据集的方法。 第一种方法使用反射来推断包含特定对象类型的RDD的架构。这种基于反射的方法可以使代码更简洁, 当用户在编写Spark应用程序时已经了解架构,这种方法可以很好地工作。 创建数据集的第二种方法是通过编程界面,该界面允许用户构造模式,然后将其应用于现有的RDD。 尽管此方法较为冗长,但可以在运行时才知道列及其类型的情况下构造数据集。

10.3.2.1.1 使用反射推断架构

Spark SQL可以将Row对象的RDD转换为DataFrame,从而推断数据类型。 通过将key/value 对的列表 作为kwargs传递给Row类来构造行。 此列表的键定义表的列名,并且通过对整个数据集进行采样来推断类型。 使用people.txt文件作为示例,实现命令如下:

 from pyspark.sql import Row

 sc = spark.sparkContext

 # Load a text file and convert each line to a Row.
 lines = sc.textFile("/opt/apps/ecm/service/spark/2.4.4/package/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.txt")
 parts = lines.map(lambda l: l.split(","))
 people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

 # Infer the schema, and register the DataFrame as a table.
 schemaPeople = spark.createDataFrame(people)

对于创建好的DataFrame,直接调用DataFrame的名称schemaPeople只能得到schemaPeople内的数据的结构, 若想查看DataFrame内容,可使用.show()命令:

 schemaPeople.show()

在Python中可以直接使用SQL语句在刚才创建好的DataFrame上进行一些简单的查询:

 # SQL can be run over DataFrames that have been registered as a table.
 schemaPeople.createOrReplaceTempView("people")
 teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
10.3.2.1.2 以编程方式指定架构

如果无法提前定义kwarg的字典(例如,记录的结构编码为字符串,或者将解析文本数据集,或者为不同的用户对字段进行不同的投影),则可以使用以下方式以编程方式创建DataFrame:

1、从原始RDD创建元组或列表的RDD; 2、在第1步中创建的RDD中,创建一个由StructType表示的模式,该模式与元组或列表的结构匹配; 3、通过SparkSession提供的createDataFrame方法将架构应用于RDD。

使用people.txt文件作为示例,实现命令如下:

 # Import data types
 from pyspark.sql.types import *

 sc = spark.sparkContext

 # Load a text file and convert each line to a Row.
 lines = sc.textFile("/opt/apps/ecm/service/spark/2.4.4/package/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.txt")
 parts = lines.map(lambda l: l.split(","))
 # Each line is converted to a tuple.
 people = parts.map(lambda p: (p[0], p[1].strip()))

 # The schema is encoded in a string.
 schemaString = "name age"

 fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
 schema = StructType(fields)

 # Apply the schema to the RDD.
 schemaPeople = spark.createDataFrame(people, schema)

上述命令利用schema将每一列的具体描述添加至要创建的DataFrame

10.3.2.1.3 将DataFrame转换为RDD

使用以下命令可将创建的DataFrame转化成RDD:

 rdd1 = schemaPeople.rdd

在这个操作过程中并没有对数据进行变换,没有对原始数据做任何操作,只是将数据进行修饰,进行一些特殊描述,使得RDD也能操作这些数据。

10.3.2.2 直接从文件创建Spark DataFrame

10.3.2.2.1 JSON文件

Spark SQL可以自动推断JSON数据集的架构并将其作为DataFrame加载。 在Python中可以使用SparkSession.read.json命令完成此转换。 要注意的是,以json文件形式提供的文件不是典型的JSON文件。 每行必须包含一个单独的,自包含的有效JSON对象。 对于常规的多行JSON文件,应将multiLine参数设置为True。 以people.json文件为例,创建命令如下:

 sdf_json = spark.read.json("/opt/apps/ecm/service/spark/2.4.4/package/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.json")
 # Displays the content of the DataFrame to stdout
 sdf_json.show()
10.3.2.2.2 CSV文件

Spark SQL不能自动推断CSV数据集的架构,直接使用JSON文件的创建方法所创建的DataFrame没有对每列数据的描述。可以使用RDD向DataFrame转化的第二种方法,以编程方式指定架构。 以people.txt文件为例,创建命令如下:

 sdf_csv = spark.read.csv("/opt/apps/ecm/service/spark/2.4.4/package/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.txt")

 # Import data types
 from pyspark.sql.types import *
 # The schema is encoded in a string.

 # Create a schema
 schemaString = ["name", "age"]
 fields = [StructField(field_name, StringType(), True) for field_name in schemaString]
 schema = StructType(fields)

 sdf_withschema = spark.createDataFrame(sdf_csv, schema)
 sdf_withschema.show()

10.3.3 将DataFrame导出到本地磁盘

以DataFrame schemaPeople为例,命令如下:

 # Save Spark DataFrame to a folder on the local disk.
 schemaPeople.write.mode('overwrite').csv("myspark/")

其中,.mode()指写入的形式。 写入文件一般有三种形式,

  • 覆盖写入(overwrite):覆盖模式意味着将DataFrame保存到数据源时,如果已经存在数据/表,则预期现有数据将被DataFrame的内容覆盖。

  • 追加写入(append):将DataFrame保存到数据源时,如果已经存在数据/表,则应该将DataFrame的内容附加到现有数据中。

  • 错误(error):将DataFrame保存到数据源时,如果已经存在数据,则将引发异常。

使用以下命令可查看保存的DataFrame:

 # Let's check if everything is there on the local disk
 import os
 os.listdir("myspark")