{ "cells": [ { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# Structured Data Processing with Spark\n", "\n", "\n", "## Feng Li\n", "\n", "### Central University of Finance and Economics\n", "\n", "### [feng.li@cufe.edu.cn](mailto:feng.li@cufe.edu.cn)\n", "### Course home page: [https://feng.li/distcomp](https://feng.li/distcomp)" ] },
{ "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Spark SQL\n", "\n", "- Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of both the data and the computation being performed.\n", "\n", "- Spark SQL uses this extra information to perform extra optimizations.\n", "\n", "- One use of Spark SQL is to execute SQL queries.\n", "\n", "- Spark SQL can also be used to read data from an existing Hive installation." ] },
{ "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Datasets and DataFrames\n", "\n", "\n", "### Spark Datasets\n", "\n", "- A Dataset is a distributed collection of data.\n", "\n", "- A Dataset can be constructed from JVM objects and then manipulated using functional transformations (`map`, `flatMap`, `filter`, etc.).\n", "\n", "- The Dataset API is available in **Scala** and **Java**.\n", "\n", "- Python **does not support the Dataset API**. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (e.g., you can naturally access a field of a row by name as `row.columnName`)." ] },
{ "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### Spark DataFrame\n", "\n", "- A DataFrame is a Dataset organized into named columns.\n", "\n", "- It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.\n", "\n", "- DataFrames can be constructed from a wide array of sources such as:\n", "\n", "    - structured data files,\n", "    - tables in Hive,\n", "    - external databases, or\n", "    - existing RDDs.\n", "\n", "- The DataFrame API is available in Scala, Java, Python, and R." ] },
{ "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Start a Spark session\n" ] },
{ "cell_type": "code", "execution_count": 6, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "# Only needed when you run Spark within a Jupyter notebook:\n", "# findspark locates the Spark installation and makes pyspark importable\n", "import findspark\n", "findspark.init()" ] },
{ "cell_type": "code", "execution_count": 7, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "spark = SparkSession.builder.appName(\"Python Spark\").getOrCreate()" ] },
{ "cell_type": "code", "execution_count": 8, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "data": { "text/html": [ "<div>\n", "    <p><b>SparkSession - in-memory</b></p>\n", "    <div>\n", "        <p><b>SparkContext</b></p>\n", "        <p>Spark UI</p>\n", "        <dl>\n", "            <dt>Version</dt>\n", "            <dd><code>v2.4.4</code></dd>\n", "            <dt>Master</dt>\n", "            <dd><code>local[*]</code></dd>\n", "            <dt>AppName</dt>\n", "            <dd><code>Python Spark</code></dd>\n", "        </dl>\n", "    </div>\n", "</div>" ], "text/plain": [ "" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark  # check that the Spark session has been created" ] },
{ "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Creating DataFrames" ] },
{ "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "fragment" } }, "source": [ "### Convert an RDD to a DataFrame" ] },
{ "cell_type": "code", "execution_count": 9, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "sc = spark.sparkContext  # get the SparkContext, which the RDD API needs" ] },
{ "cell_type": "code", "execution_count": 10, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "data": { "text/html": [ "<div>\n", "    <p><b>SparkContext</b></p>\n", "    <p>Spark UI</p>\n", "    <dl>\n", "        <dt>Version</dt>\n", "        <dd><code>v2.4.4</code></dd>\n", "        <dt>Master</dt>\n", "        <dd><code>local[*]</code></dd>\n", "        <dt>AppName</dt>\n", "        <dd><code>Python Spark</code></dd>\n", "    </dl>\n", "</div>" ], "text/plain": [ "" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sc" ] },
{ "cell_type": "code", "execution_count": 20, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "data": { "text/plain": [ "DataFrame[age: bigint, name: string]" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Load a text file and convert each line to a Row.\n", "from pyspark.sql import Row\n", "lines = sc.textFile(\"data/people.txt\")\n", "parts = lines.map(lambda l: l.split(\",\"))\n", "people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))\n", "\n", "# Infer the schema from the Row objects.\n", "schemaPeople = spark.createDataFrame(people)\n", "schemaPeople" ] },
{ "cell_type": "code", "execution_count": 24, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "data": { "text/plain": [ "DataFrame[name: string]" ] }, "execution_count": 24, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# SQL can be run over DataFrames that have been registered as a table.\n", "schemaPeople.createOrReplaceTempView(\"people\")\n", "teenagers = spark.sql(\"SELECT name FROM people WHERE age >= 13 AND age <= 19\")\n", "teenagers" ] },
{ "cell_type": "code", "execution_count": 29, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "data": { "text/plain": [ "     name\n", "0  Justin" ] }, "execution_count": 29, "metadata": {}, "output_type": "execute_result" } ], "source": [ "teenagers.toPandas()  # Collect the Spark DataFrame into an ordinary pandas DataFrame (only for small results)" ] },
{ "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### Create a Spark DataFrame directly from a file" ] },
{ "cell_type": "code", "execution_count": 39, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+---+\n", "|    _c0|_c1|\n", "+-------+---+\n", "|Michael| 29|\n", "|   Andy| 30|\n", "| Justin| 19|\n", "+-------+---+\n", "\n" ] } ], "source": [ "sdf = spark.read.csv(\"data/people.txt\")\n", "sdf.show()  # Displays the content of the DataFrame to stdout" ] },
{ "cell_type": "code", "execution_count": 1, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+-------+\n", "| age|   name|\n", "+----+-------+\n", "|null|Michael|\n", "|  30|   Andy|\n", "|  19| Justin|\n", "+----+-------+\n", "\n" ] } ], "source": [ "sdf2 = spark.read.json(\"data/people.json\")\n", "# Displays the content of the DataFrame to stdout\n", "sdf2.show()" ] },
{ "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "- The CSV file does not have a header row, but we can describe its columns ourselves with a **schema**." ] },
{ "cell_type": "code", "execution_count": 35, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "# Import data types\n", "from pyspark.sql.types import *\n", "\n", "# Create a schema: one nullable string column for each field name\n", "fieldNames = [\"name\", \"age\"]\n", "fields = [StructField(field_name, StringType(), True) for field_name in fieldNames]\n", "schema = StructType(fields)\n" ] },
{ "cell_type": "code", "execution_count": 36, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "data": { "text/plain": [ "StructType(List(StructField(name,StringType,true),StructField(age,StringType,true)))" ] }, "execution_count": 36, "metadata": {}, "output_type": "execute_result" } ], "source": [ "schema" ] },
{ "cell_type": "code", "execution_count": 38, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+---+\n", "|   name|age|\n", "+-------+---+\n", "|Michael| 29|\n", "|   Andy| 30|\n", "| Justin| 19|\n", "+-------+---+\n", "\n" ] } ], "source": [ "# Build (name, age) tuples of strings to match the all-StringType schema\n", "people_str = parts.map(lambda p: (p[0], p[1].strip()))\n", "sdf_withschema = spark.createDataFrame(people_str, schema)\n", "sdf_withschema.show()" ] },
{ "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### Convert a DataFrame to an RDD" ] },
{ "cell_type": "code", "execution_count": 43, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "data": { "text/plain": [ "MapPartitionsRDD[132] at javaToPython at NativeMethodAccessorImpl.java:0" ] }, "execution_count": 43, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd1 = sdf_withschema.rdd  # an RDD of Row objects\n", "rdd1" ] },
{ "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### Export a DataFrame to the local disk" ] },
{ "cell_type": "code", "execution_count": 53, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "sdf.write.mode('overwrite').csv(\"myspark/\")  # Save the Spark DataFrame as CSV part files in a folder on the local disk."
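] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "fragment" } }, "source": [ "- A minimal sketch for checking the export, assuming the `myspark/` folder written above. `spark.read.csv()` accepts the folder path and reads every `part-*` file in it. It skips the `_SUCCESS` marker and the hidden `.crc` checksum files, because Spark ignores files whose names start with `_` or `.`. The name `sdf_back` is only illustrative." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "# Sketch: read the exported folder back to confirm the round trip\n", "sdf_back = spark.read.csv(\"myspark/\")  # columns come back as _c0, _c1 (no header was written)\n", "sdf_back.show()  # should list the same three rows as sdf"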
] }, { "cell_type": "code", "execution_count": 49, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "data": { "text/plain": [ "['_SUCCESS',\n", " '.part-00000-23e89afa-747c-46dd-9a9b-f2ee8d79b94b-c000.csv.crc',\n", " '._SUCCESS.crc',\n", " 'part-00000-23e89afa-747c-46dd-9a9b-f2ee8d79b94b-c000.csv']" ] }, "execution_count": 49, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import os \n", "os.listdir(\"myspark\") # Let's check if everything is there on the local disk" ] } ], "metadata": { "celltoolbar": "Slideshow", "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.17" }, "rise": { "auto_select": "first", "autolaunch": false, "chalkboard": { "chalkEffect": 1, "chalkWidth": 4, "theme": "whiteboard", "transition": 800 }, "enable_chalkboard": true, "reveal_shortcuts": { "chalkboard": { "clear": "ctrl-k" } }, "start_slideshow_at": "selected", "theme": "black" } }, "nbformat": 4, "nbformat_minor": 4 }