第 11 章 Spark DataFrame

本章将继续介绍针对Spark DataFrame进行的操作,包括使用Spark DataFrame进行描述性统计、数据清洗、统计分析等。

11.1 读取文件并从表头推断架构

在读取数据时,如果数据有表头,Spark可自动根据表头和现有数据推断数据类型schema描述。 此功能通过在读取数据时加入选项options函数实现,以从本地路径读取airdelay_small.csv文件为例:

 ## Load a local file becasue we are using spark's local mode
 air0 = spark.read.options(header='true', inferSchema='true').csv("/home/lectures/data/airdelay_small.csv") 

其中,options函数参量header为true表明读取该表是有表头,inferschema为true表明要求从表头和数据推断schema。

上述方法给出的schema推断并不是完全准确的,使用时应该注意,检查推断schema是否准确。

为防止schema推断的方式产生错误,可以在数据使用过程中手动规定DataFrame的schema,示例如下:

 # We specify the correct schema by hand
 from pyspark.sql.types import *
 schema_sdf = StructType([
         StructField('Year', IntegerType(), True),
         StructField('Month', IntegerType(), True),
         StructField('DayofMonth', IntegerType(), True),
         StructField('DayOfWeek', IntegerType(), True),
         StructField('DepTime', DoubleType(), True),
         StructField('CRSDepTime', DoubleType(), True),
         StructField('ArrTime', DoubleType(), True),
         StructField('CRSArrTime', DoubleType(), True),
         StructField('UniqueCarrier', StringType(), True),
         StructField('FlightNum', StringType(), True),
         StructField('TailNum', StringType(), True),
         StructField('ActualElapsedTime', DoubleType(), True),
         StructField('CRSElapsedTime',  DoubleType(), True),
         StructField('AirTime',  DoubleType(), True),
         StructField('ArrDelay',  DoubleType(), True),
         StructField('DepDelay',  DoubleType(), True),
         StructField('Origin', StringType(), True),
         StructField('Dest',  StringType(), True),
         StructField('Distance',  DoubleType(), True),
         StructField('TaxiIn',  DoubleType(), True),
         StructField('TaxiOut',  DoubleType(), True),
         StructField('Cancelled',  IntegerType(), True),
         StructField('CancellationCode',  StringType(), True),
         StructField('Diverted',  IntegerType(), True),
         StructField('CarrierDelay', DoubleType(), True),
         StructField('WeatherDelay',  DoubleType(), True),
         StructField('NASDelay',  DoubleType(), True),
         StructField('SecurityDelay',  DoubleType(), True),
         StructField('LateAircraftDelay',  DoubleType(), True)
     ])

由于用户已手动标记好数据的schema,导入数据时无需使用inferschema进行schema推断,这时需要.schema来规定本数据的schema,示例如下:

 air = spark.read.options(header='true').schema(schema_sdf).csv("/home/lectures/data/airdelay_small.csv")

由于Spark在计算过程中对数据类型是有要求的,在导入过程中一定要注意数据的schema的正确性。如果数据的schema不正确,将来在计算过程中会出现错误。

11.2 数据内容查看

11.2.1 查看数据架构

想要查看数据的schema,可直接调用命令:

 # spark, air are the example of airdelay
 # Print the schema in a tree format
 air.printSchema()

11.2.2 提取某几列数据

直接提取某列或某几列数据可使用select函数。select函数投影一组表达式并返回一个新的DataFrame,其一般形式为

 select(*cols)

其中,cols是列名(字符串)或表达式(列)的列表。如果列名之一是“ *”,则该列将扩展为包括当前DataFrame中的所有列。

在Python中,可以通过属性(air.AirTime)或通过索引(air [‘AirTime’])访问DataFrame的列。 前者虽然便于进行交互式数据探索,但强烈建议用户使用后者的形式,后者不会过时,并且不会破坏列名,因为列名也是DataFrame类的属性。

直接提取某列数据可使用命令:

 # Select only the "AirTime" column
 air.select("AirTime").show()

提取多列数据可使用命令:

 air.select(["ArrDelay","AirTime","Distance"]).show()

提取某几列数据并进行相应计算使其达到要求可使用命令:

 air.select(air['UniqueCarrier'], air['ArrDelay']>0).show()

11.2.3 数据分类汇总

使用指定的列对DataFrame进行分组,方便以后对它们进行聚合,可使用groupBygroupby函数。进行分类汇总后的数据可以进行计数。 groupBy函数的一般形式是

 groupBy(*cols)

其中,参量cols是分组依据的列或的列列表。 每个元素应该是列名(字符串)或表达式(列)。

以单一列条件进行分类汇总并计数的命令如下:

 # group data with respect to one column 
 air.groupBy(["DayOfWeek"]).count().show() 

对多列进行分类汇总并计数的命令如下:

 # group data with respect to some columns 
 air.groupBy(["UniqueCarrier","DayOfWeek"]).count().show() 

11.2.4 数据排序

对数据进行分类汇总之后还可使用sort函数对其进行排序,sort函数实现排序功能并返回按指定列排序的新DataFrame,其一般形式为:

 sort(*cols, **kwargs)

其中,参量分别为:

  • cols,要排序的列或列名称的列表。
  • ascending,布尔值或布尔值列表(默认为True),判断升序与降序排序。 也可指定多个排序顺序的列表,如果指定了列表,则列表的长度必须等于cols的长度。

对分类汇总后的数据进行降序排序的命令如下:

 ## Group and sort
 aircount=air.groupBy("UniqueCarrier").count()
 aircount.sort("count",ascending=False).show()

11.2.5 数据筛选

可以对数据进行特定条件的筛选,spark内的筛选可以直接通过filter函数来实现,filter函数的一般形式为

 filter(condition)

其中,参量condition是type.BooleanType的列或SQL表达式的字符串。

如筛选飞机延误时间超过60分钟的数据可以使用命令:

 # filter with certain conditions 
 air.filter(air.ArrDelay > 60).show() 

11.3 数据清洗

用户获得的待处理数据可能有存在重复值、空值、错误标记等,在数据处理前应该进行数据的清理。

数据可能存在大量无意义重复值,这时需要对其进行去重。distinct函数可进行去重,应用distinct函数后返回一个新的DataFrame,其中包含原DataFrame中的不同行。调用distinct函数是非常占用计算机资源的,因为对每一条数据,distinct函数都会在整个DataFrame中寻找重复值。由于distinct函数运行效率较低,花费时间较长,除非非常必要最好不要使用此函数。 使用distinct函数去掉数据air中的重复值示例如下:

 ## Returns a new DataFrame containing the distinct rows in this DataFrame.
 ## Takes a while to compute
 air.distinct()

如果数据中存在空值,处理空值的一种方式为删掉空值所在的行,可以使用dropna函数。调用dropna函数后可返回一个新的DataFrame,省略具有空值的行。dropna函数是na.drop函数的别名 dropna函数的一般形式为:

 dropna(how='any', thresh=None, subset=None)

其中,参量分别为:

  • how,取值“any”或“all”,如果为“any”,则删除任何包含null的行;如果为“all”,则仅当其所有值均为null时才删除该行。

  • thresh,取值为整数,如果指定,则删除存在小于thresh指定数量的非空值的行。此参数将覆盖how参数。

  • subset,要考虑的可选列名称列表。

删除air数据内的空值命令示例如下:

 ## Returns a new DataFrame omitting rows with null values
 air_without_na = air.na.drop()
 air_without_na.show()

也可通过将空值替换为其他值来处理空值,可使用fillna函数进行处理。fillna函数是na.fill函数的别名。 fillna函数的一般形式为:

 fillna(value, subset=None)

其中,参量分别为:

  • value,用于替换空值的值,取值类型可为int,long,float,string,bool或dict。 如果该值为dict,则将忽略子集,并且该值必须是从列名(字符串)到替换值的映射。 替换值必须是int,long,float,boolean或string。

  • subset,要考虑的可选列名称列表。子集中指定的不具有匹配数据类型的列将被忽略。 例如,如果value是一个字符串,并且子集包含一个非字符串列,则该非字符串列将被忽略。

将air数据内的空值替换为“unknown”的命令示例如下:

 ## Replace null values
 air.na.fill("unknown").show()

Spark内的NA与null是不同的,NA为缺失值而null为空值,上述操作只针对空值null,而不能对数据中的缺失值NA进行处理。若想将数据中的缺失值NA替换成指定数据,可使用replace函数进行处理。使用replace函数后可返回一个新的DataFrame,新DataFrame中已用另一个值替换原DataFrame某个值。 replace函数的一般形式为:

 replace(to_replace, value=<no value>, subset=None)

其中,参量分别为:

  • to_replace,要替换的值,取值类型可为bool,int,long,float,string,list或dict。如果值是dict,则将忽略值或将其忽略,并且to_replace必须是值和替换之间的映射。

  • value,替换值,取值类型为bool,int,long,float,string或None。如果value是一个列表,则value的长度和类型应与to_replace相同。如果value是一个标量,并且to_replace是一个序列,则使用value替换to_replace中的每个项目。

  • subset,要考虑的可选列名列表。子集中指定的不具有匹配数据类型的列将被忽略,例如,如果value是一个字符串,并且子集包含一个非字符串列,则该非字符串列将被忽略。

值to_replace和value必须具有相同的类型,并且只能是数字,布尔值或字符串。替换值可以没有。替换时,新值将强制转换为现有列的类型。对于数字替换,所有要替换的值应具有唯一的浮点表示形式。如果发生冲突(例如{42: -1, 42.0: 1}),则将使用任意替换。

将air数据内的缺失值NA替换为“unknown”的命令示例如下:

 air.na.replace('NA', "unknown").show()

11.4 数据统计处理

11.4.1 数据描述性统计

由于Spark中有内置的基本函数,在计算数据的基本统计信息时不再需要像使用Hadoop MapReduce一样编写大量代码,这大大简化了使用的难度,提高了工作效率。

在Spark的DataFrame中可使用describe函数计算基本统计信息,包括计数,平均值,标准差,最小值和最大值。若给出特定列,可单独计算某列的相关信息,如果没有给出列,则此函数将计算所有数字或字符串列的统计信息。此函数用于探索性数据分析。 describe函数的一般形式为:

 describe(*cols)

其中,参量cols为需要计算的某一列或某几列,若不指定列,则对整个DataFrame进行计算。

计算air数据的基本统计信息的命令示例如下:

 air.describe().show()

计算air数据ArrDelay列的基本统计信息的命令示例如下:

 air.describe(['ArrDelay']).show()

结果中某些变量如字符变量不能进行平均值、最大最小值等的计算,该类计算结果会用NA显示。

11.4.2 统计计算

在Spark的DataFrame中也可以进行其他的相关统计计算

如计算相关性可使用corr函数。corr函数将DataFrame的两列的相关性计算为双精度值,当前仅支持Pearson相关系数的计算。 corr函数的一般形式为:

 corr(col1, col2, method=None)

其中,参量col1、col2分别为第一列、第二列的名称,method为相关性计算方法,目前仅支持Pearson方法。

计算air数据Distance与ArrDelay列的相关系数的命令示例如下:

 air.corr("Distance","ArrDelay")

此函数的计算结果是在不考虑其他列的影响下得出的某两列的相关系数,因此,此计算结果并不能完全反应此两列的实际相关性。

还可以使用cov函数计算某两列的协方差。cov函数计算给定列(由其名称指定)的样本协方差,结果为双精度值。 cov函数的一般形式为:

 corr(col1, col2)

其中,参量col1、col2分别为第一列、第二列的名称。

计算air数据Distance与ArrDelay列的协方差的命令示例如下:

 air.cov("Distance","ArrDelay")

11.5 用户自定义函数(UDF)

UDF是用户定义的函数,由Spark使用Arrow来传输数据,并通过Pandas处理数据来执行。 UDF通过关键字pandas_udf进行定义,不需要其他配置。目前,有两种类型的 UDF:Scalar和Grouped Map。

UDF的一般形式为:

 pyspark.sql.functions.pandas_udf(f=None, returnType=None, functionType=None)

其中,系数意义分别为

  • f,用户定义的函数。如果用作独立函数可以是python函数

  • returnType,用户定义函数的返回类型。该值可以是pyspark.sql.types.DataType对象或DDL格式的类型字符串

  • functionType,pyspark.sql.functions.PandasUDFType中的枚举值,默认值为SCALAR

11.5.1 Scalar

Scalar UDF用于向量化标量操作,它定义了一种转换:由一个或多个pandas.Series转换为一个pandas.Series,可以与selectwithColumn之类的功能一起使用。 Python函数应将pandas.Series作为输入并返回相同长度的pandas.Series。 在内部,Spark通过将列拆分为批处理并为每个批处理调用函数作为数据的子集来执行Pandas UDF,然后将结果串联在一起。 以下示例显示了如何创建一个计算两列乘积的Scalar UDF:

 import pandas as pd

 from pyspark.sql.functions import col, pandas_udf
 from pyspark.sql.types import LongType

 # Declare the function and create the UDF
 def multiply_func(a, b):
     return a * b

 multiply = pandas_udf(multiply_func, returnType=LongType())

 # The function for a pandas_udf should be able to execute with local Pandas data
 x = pd.Series([1, 2, 3])
 print(multiply_func(x, x))

 # Create a Spark DataFrame, 'spark' is an existing SparkSession
 df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

 # Execute function as a Spark vectorized UDF
 df.select(multiply(col("x"), col("x"))).show()

11.5.2 Grouped Map

Grouped map UDF定义了一个转换:由一个pandas.DataFrame转换为另一个pandas.DataFrame,与groupBy().apply()结合使用,通过这个命令实现了“ split-apply-combine”模式。该模式包括三个步骤:

  • 通过使用DataFrame.groupBy将数据分成组。

  • 在每个组上应用相关函数。该函数的输入和输出均为pandas.DataFrame形式,输入数据包含每个组的所有行和列。

  • 将结果合并到一个新的DataFrame中。

要使用groupBy().apply(),用户需要定义以下内容:

  • 一个Python函数,用于定义每个组的计算。

  • 一个StructType对象或定义输出DataFrame架构的字符串。

如果指定为字符串,则返回的pandas.DataFrame的列标签必须与定义的输出模式中的字段名称匹配,或者如果不是字符串,则必须按位置匹配字段数据类型,例如整数索引。返回的pandas.DataFrame的长度可以是任意的。 要注意的是,在应用该功能之前,每组的数据都将被加载到内存中,这可能会导致内存不足或异常,尤其是在分组大小偏斜的情况下。 如果返回的新pandas.DataFrame是字典形式,建议按名称显式索引列以确保位置正确,或者使用OrderedDict。例如:pd.DataFrame({‘id’: ids, ‘a’: data}, columns=[‘id’, ‘a’]) 或者 pd.DataFrame(OrderedDict([(‘id’, ids), (‘a’, data)]))。

下面的示例演示如何使用groupBy().apply()计算每组中每个值减去平均值后的值:

 from pyspark.sql.functions import pandas_udf, PandasUDFType

 df = spark.createDataFrame(
     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
     ("id", "v"))

 @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
 def subtract_mean(pdf):
     # pdf is a pandas.DataFrame
     v = pdf.v
     return pdf.assign(v=v - v.mean())

 df.groupby("id").apply(subtract_mean).show()

本部分中对Spark DataFrame的部分操作进行了介绍,若想了解更多操作,可通过查询PySpark documentation查看pyspark.sql module部分的相关内容。