第 10 章 利用Spark处理结构化数据
10.1 spark SQL简介
Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL 提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL 使用这些额外的信息来执行额外的优化。与Spark SQL交互的方法有多种,包括SQL和Dataset API。 计算结果时,将使用相同的执行引擎,而与要用来表达计算的API /语言无关。 这种统一意味着开发人员可以轻松地在不同的API之间来回切换,从而提供最自然的方式来表达给定的转换。 Spark SQL的一种用途是执行SQL查询。 Spark SQL还可以用于从现有的Hive安装中读取数据。 当从另一种编程语言中运行SQL时,结果将作为Dataset / DataFrame返回。还可以使用命令行 或通过JDBC / ODBC与SQL接口进行交互。
10.2 数据集(Datasets)和数据框(DataFrames)
数据集(Datasets)是数据的分布式集合。Datasets是Spark 1.6中添加的新接口, 它具有RDD的优点(强类型输入,使用强大的Lambda函数的能力)以及Spark SQL优化的执行引擎的优点。 可以从JVM对象构造Datasets,然后使用功能转换(map,flatMap,filter等)进行操作。 Dataset API在Scala和Java中可用。 Python不支持Dataset API。但是由于Python的动态特性, Python可以通过其他方式实现Dataset API的优势功能,例如, 可以通过名称row.columnName来访问行的字段。 R的情况类似。
数据框(DataFrame)是组织为命名列的数据集。从概念上讲,它等效于关系数据库中的表或R / Python中
的数据框,但是在后台进行了更丰富的优化。可以从多种来源构造DataFrame,例如:结构化数据文件,
Hive中的表,外部数据库或现有RDD。
DataFrame API在Scala,Java,Python和R中可用。
在Scala和Java中,DataFrame由行的数据集表示。
在Scala API中,DataFrame只是Dataset[Row]的类型别名。而在Java API中,
用户需要使用Dataset
10.3 相关操作命令
10.3.1 创建Spark Session
对于Spark DataFrame的所有操作的功能的入口点都是SparkSession类。SparkSession类似于Sparkcontext,
可以被看作是专门面向Spark DataFrame的Sparkcontext。
在Python中,要创建基本的SparkSession,只需导入pyspark.sql
模块的SparkSession
函数,
使用SparkSession.builder
命令创建:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Spark").getOrCreate()
类似于Sparkcontext,Spark内也只能有一个SparkSession,所以创建候要使用.getOrCreate()
,
防止已存在SparkSession导致创建过程出错。
直接调用创建的SparkSession可以查看SparkSession的相关信息,如使用spark
命令调用上例创建的SparkSession,
可获得该SparkSession的相关信息。
10.3.2 创建DataFrames
使用SparkSession,可以从现有RDD,Hive表或Spark数据源创建DataFrame。
10.3.2.1 将RDD转换为DataFrame
Spark SQL支持两种将现有RDD转换为数据集的方法。 第一种方法使用反射来推断包含特定对象类型的RDD的架构。这种基于反射的方法可以使代码更简洁, 当用户在编写Spark应用程序时已经了解架构,这种方法可以很好地工作。 创建数据集的第二种方法是通过编程界面,该界面允许用户构造模式,然后将其应用于现有的RDD。 尽管此方法较为冗长,但可以在运行时才知道列及其类型的情况下构造数据集。
10.3.2.1.1 使用反射推断架构
Spark SQL可以将Row对象的RDD转换为DataFrame,从而推断数据类型。 通过将key/value 对的列表 作为kwargs传递给Row类来构造行。 此列表的键定义表的列名,并且通过对整个数据集进行采样来推断类型。 使用people.txt文件作为示例,实现命令如下:
from pyspark.sql import Row
sc = spark.sparkContext
# Load a text file and convert each line to a Row.
lines = sc.textFile("/opt/apps/ecm/service/spark/2.4.4/package/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
对于创建好的DataFrame,直接调用DataFrame的名称schemaPeople只能得到schemaPeople内的数据的结构,
若想查看DataFrame内容,可使用.show()
命令:
schemaPeople.show()
在Python中可以直接使用SQL语句在刚才创建好的DataFrame上进行一些简单的查询:
# SQL can be run over DataFrames that have been registered as a table.
schemaPeople.createOrReplaceTempView("people")
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
10.3.2.1.2 以编程方式指定架构
如果无法提前定义kwarg的字典(例如,记录的结构编码为字符串,或者将解析文本数据集,或者为不同的用户对字段进行不同的投影),则可以使用以下方式以编程方式创建DataFrame:
1、从原始RDD创建元组或列表的RDD; 2、在第1步中创建的RDD中,创建一个由StructType表示的模式,该模式与元组或列表的结构匹配; 3、通过SparkSession提供的createDataFrame方法将架构应用于RDD。
使用people.txt文件作为示例,实现命令如下:
# Import data types
from pyspark.sql.types import *
sc = spark.sparkContext
# Load a text file and convert each line to a Row.
lines = sc.textFile("/opt/apps/ecm/service/spark/2.4.4/package/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))
# The schema is encoded in a string.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)
上述命令利用schema将每一列的具体描述添加至要创建的DataFrame
10.3.2.1.3 将DataFrame转换为RDD
使用以下命令可将创建的DataFrame转化成RDD:
rdd1 = schemaPeople.rdd
在这个操作过程中并没有对数据进行变换,没有对原始数据做任何操作,只是将数据进行修饰,进行一些特殊描述,使得RDD也能操作这些数据。
10.3.2.2 直接从文件创建Spark DataFrame
10.3.2.2.1 JSON文件
Spark SQL可以自动推断JSON数据集的架构并将其作为DataFrame加载。 在Python中可以使用SparkSession.read.json
命令完成此转换。
要注意的是,以json文件形式提供的文件不是典型的JSON文件。 每行必须包含一个单独的,自包含的有效JSON对象。
对于常规的多行JSON文件,应将multiLine参数设置为True。
以people.json文件为例,创建命令如下:
sdf_json = spark.read.json("/opt/apps/ecm/service/spark/2.4.4/package/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
sdf_json.show()
10.3.2.2.2 CSV文件
Spark SQL不能自动推断CSV数据集的架构,直接使用JSON文件的创建方法所创建的DataFrame没有对每列数据的描述。可以使用RDD向DataFrame转化的第二种方法,以编程方式指定架构。 以people.txt文件为例,创建命令如下:
sdf_csv = spark.read.csv("/opt/apps/ecm/service/spark/2.4.4/package/spark-2.4.4-bin-hadoop2.7/examples/src/main/resources/people.txt")
# Import data types
from pyspark.sql.types import *
# The schema is encoded in a string.
# Create a schema
schemaString = ["name", "age"]
fields = [StructField(field_name, StringType(), True) for field_name in schemaString]
schema = StructType(fields)
sdf_withschema = spark.createDataFrame(sdf_csv, schema)
sdf_withschema.show()
10.3.3 将DataFrame导出到本地磁盘
以DataFrame schemaPeople为例,命令如下:
# Save Spark DataFrame to a folder on the local disk.
schemaPeople.write.mode('overwrite').csv("myspark/")
其中,.mode()
指写入的形式。
写入文件一般有三种形式,
覆盖写入(overwrite):覆盖模式意味着将DataFrame保存到数据源时,如果已经存在数据/表,则预期现有数据将被DataFrame的内容覆盖。
追加写入(append):将DataFrame保存到数据源时,如果已经存在数据/表,则应该将DataFrame的内容附加到现有数据中。
错误(error):将DataFrame保存到数据源时,如果已经存在数据,则将引发异常。
使用以下命令可查看保存的DataFrame:
# Let's check if everything is there on the local disk
import os
os.listdir("myspark")