Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of both the data and the computation being performed.
Spark SQL uses this extra information to perform additional optimizations.
One use of Spark SQL is to execute SQL queries.
Spark SQL can also be used to read data from an existing Hive installation.
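For example, a session that can both execute SQL and read Hive tables might be created as follows. This is only a minimal sketch: it assumes a Hive-enabled Spark build with a reachable metastore, and the table name src is purely illustrative.

from pyspark.sql import SparkSession

# Minimal sketch: enable Hive support so Spark SQL can read from an existing Hive installation
# (assumes a Hive-enabled Spark build and a reachable Hive metastore)
spark_hive = (SparkSession.builder
              .appName("Python Spark with Hive")
              .enableHiveSupport()
              .getOrCreate())

# Run a SQL query; "src" is an assumed Hive table name used only for illustration
spark_hive.sql("SELECT * FROM src LIMIT 10").show()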
A Dataset is a distributed collection of data.
A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).
The Dataset API is available in Scala and Java.
Python does not have support for the Dataset API. However, due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can naturally access the fields of a row by name, as in row.columnName).
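As a small illustration of this point (not part of the original example), a Row's fields can be accessed by name in Python:

from pyspark.sql import Row

row = Row(name="Michael", age=29)
row.name    # attribute-style access, returns 'Michael'
row["age"]  # dictionary-style access, returns 29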
A DataFrame is a Dataset organized into named columns.
It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs.
The DataFrame API is available in Scala, Java, Python, and R.
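As a quick sketch (using the SparkSession named spark that is created in the next code cell), a DataFrame with named columns can also be built directly from a local list of tuples; the data here is purely illustrative.

# Illustrative only: a DataFrame with named columns from a local list of tuples
df = spark.createDataFrame([("Michael", 29), ("Andy", 30)], ["name", "age"])
df.select("name").show()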
## Only needed when you run Spark within a Jupyter notebook
import findspark
findspark.init('/usr/lib/spark-current')
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Spark").getOrCreate()
spark # check that the Spark session has been created
sc = spark.sparkContext # get the Spark context for RDD operations
# Load a text file and convert each line to a Row.
from pyspark.sql import Row
lines = sc.textFile("data/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
# Infer the schema from the Row objects and create a DataFrame.
schemaPeople = spark.createDataFrame(people)
schemaPeople
DataFrame[age: bigint, name: string]
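The inferred schema can also be inspected in tree form; a short optional check:

schemaPeople.printSchema() # prints the inferred column names and types in a tree format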
# SQL can be run over DataFrames that have been registered as a table.
schemaPeople.createOrReplaceTempView("people")
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.toPandas() # We can export the Spark DataFrame to a regular pandas DataFrame
sdf = spark.read.csv("data/people.txt")
sdf.show() # Displays the content of the DataFrame to stdout
+-------+---+
|    _c0|_c1|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
sdf2 = spark.read.json("data/people.json")
# Displays the content of the DataFrame to stdout
sdf2.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
# Import data types
from pyspark.sql.types import *
# The schema is encoded as a list of field names.
schemaString = ["name", "age"]
# Create a schema
fields = [StructField(field_name, StringType(), True) for field_name in schemaString]
schema = StructType(fields)
sdf_withschema = spark.createDataFrame(people, schema)
sdf_withschema.show()
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+
rdd1 = sdf_withschema.rdd # Convert the DataFrame back to an RDD of Rows
rdd1
MapPartitionsRDD at javaToPython at NativeMethodAccessorImpl.java:0
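Since each element of the recovered RDD is a Row, its fields remain accessible by name; a minimal illustrative usage:

rdd1.map(lambda row: row.name).collect() # e.g. ['Michael', 'Andy', 'Justin']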
sdf.write.mode('overwrite').csv("myspark/") # Save the Spark DataFrame to a folder on the local disk.
import os
os.listdir("myspark") # Let's check if everything is there on the local disk
['_SUCCESS', '.part-00000-23e89afa-747c-46dd-9a9b-f2ee8d79b94b-c000.csv.crc', '._SUCCESS.crc', 'part-00000-23e89afa-747c-46dd-9a9b-f2ee8d79b94b-c000.csv']
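As a hedged follow-up, the saved folder can be read back into a Spark DataFrame in the same way as any other CSV source:

sdf_back = spark.read.csv("myspark/") # read the saved CSV part files back as a DataFrame
sdf_back.show()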