{ "cells": [ { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# Datasets and Parallelization with Spark\n", "\n", "\n", "## Feng Li\n", "\n", "### Central University of Finance and Economics\n", "\n", "### [feng.li@cufe.edu.cn](feng.li@cufe.edu.cn)\n", "### Course home page: [https://feng.li/distcomp](https://feng.li/distcomp)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Two available APIs in Spark\n", "\n", "- RDDs (core but old API), accumulators, and broadcast variables\n", "- Spark SQL, Datasets, and DataFrames: processing structured data with relational queries (newer API than RDDs)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Resilient distributed dataset RDD\n", "\n", "- At a high level, every Spark application consists of \n", "\n", " - a driver program that runs the user’s main function and \n", " - executes various parallel operations on a cluster. " ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "- The main abstraction Spark provides is a **resilient distributed dataset** (RDD), \n", "\n", " - a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. \n", " - RDDs are created by starting with a file in the Hadoop file system and transforming it. \n", " - Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. \n", " - Finally, RDDs automatically recover from node failures.\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "- A second abstraction in Spark is **shared variables** that can be used in parallel operations. \n", "\n", " - By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. \n", " - Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. \n", " - Spark supports two types of shared variables: \n", " \n", " - **broadcast variables**, which can be used to cache a value in memory on all nodes, \n", " - **accumulators**, which are variables that are only “added” to, such as counters and sums." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Create RDDs\n", "\n", "- Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. \n", "\n", "- There are two ways to create RDDs: \n", "\n", " - parallelizing an existing collection in your driver program, \n", " - referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.\n", "\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Create a SparkContext object\n", "\n", "- The first thing a Spark program must do is to create a `SparkContext` object, which tells Spark how to access a cluster. To create a `SparkContext` you first need to build a `SparkConf` object that contains information about your application." 
] }, { "cell_type": "code", "execution_count": 3, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "import findspark\n", "findspark.init()\n", "import pyspark" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "conf = pyspark.SparkConf().setAppName(\"My First Spark RDD APP\") #.setMaster(\"local\") # “yarn”\n", "sc = pyspark.SparkContext(conf=conf)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "- You are not allowd to create multiple `SparkContext`. You have to stop existing one first with" ] }, { "cell_type": "code", "execution_count": 35, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "sc.stop()" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "fragment" } }, "source": [ "- Or use a fault tolerant function " ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "sc = pyspark.SparkContext.getOrCreate()" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Parallelized collections \n", "\n", "- Created by calling SparkContext’s parallelize method on an existing iterable or collection in your driver program. \n", "\n", "- The elements of the collection are copied to form a distributed dataset that can be operated on in parallel." ] }, { "cell_type": "code", "execution_count": 57, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "data": { "text/plain": [ "ParallelCollectionRDD[12] at parallelize at PythonRDD.scala:195" ] }, "execution_count": 57, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data = [1, 2, 3, 4, 5]\n", "distData = sc.parallelize(data)\n", "distData\n", "\n", "data2 = [11, 21, 31, 41, 51]\n", "distData2 = sc.parallelize(data2)\n", "distData2" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## External Datasets with RDD\n", "\n", "- PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.\n", "\n", "- Text file RDDs can be created using SparkContext’s textFile method. This method takes an URI for the file (either a local path on the machine, or a hdfs://, s3a://, etc URI) and reads it as a collection of lines. " ] }, { "cell_type": "code", "execution_count": 58, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "data": { "text/plain": [ "/opt/apps/ecm/service/spark/2.4.4/package/spark-2.4.4-bin-hadoop2.7/licenses/LICENSE-vis.txt MapPartitionsRDD[14] at textFile at NativeMethodAccessorImpl.java:0" ] }, "execution_count": 58, "metadata": {}, "output_type": "execute_result" } ], "source": [ "licenseFile = sc.textFile(\"/opt/apps/ecm/service/spark/2.4.4/package/spark-2.4.4-bin-hadoop2.7/licenses/LICENSE-vis.txt\")\n", "licenseFile" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## External Datasets with RDD\n", "\n", "- **If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes**. 
Either copy the file to all workers or use a network-mounted shared file system.\n", "\n", "- All of Spark’s file-based input methods, including textFile, **support running on directories**, **compressed files**, and **wildcards** as well. For example, you can use `textFile(\"/my/directory\")`, `textFile(\"/my/directory/*.txt\")`, and `textFile(\"/my/directory/*.gz\")`.\n", "\n", "- The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.\n", "\n", "- `SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as `(filename, content)` pairs. This is in contrast with textFile, which would return one record per line in each file.\n", "\n", "- `RDD.saveAsPickleFile` and `SparkContext.pickleFile` support saving an RDD in a simple format consisting of pickled Python objects. Batching is used on pickle serialization, with default batch size 10.\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## RDD Operations" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### Simple MapReduce" ] }, { "cell_type": "code", "execution_count": 59, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "lineLengths = licenseFile.map(lambda s: len(s))\n", "totalLength = lineLengths.reduce(lambda a, b: a + b)" ] }, { "cell_type": "code", "execution_count": 40, "metadata": { "scrolled": true, "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "data": { "text/plain": [ "PythonRDD[4] at RDD at PythonRDD.scala:53" ] }, "execution_count": 40, "metadata": {}, "output_type": "execute_result" } ], "source": [ "lineLengths" ] }, { "cell_type": "code", "execution_count": 41, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "data": { "text/plain": [ "385" ] }, "execution_count": 41, "metadata": {}, "output_type": "execute_result" } ], "source": [ "totalLength" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## RDD Persistence\n", "\n", "\n", "- One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. \n", "\n", "- When you persist an RDD, **each node stores any partitions** of it that it computes **in memory** and reuses them in other actions on that dataset (or datasets derived from it). \n", "\n", "- This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.\n", "\n", "- You can mark an RDD to be persisted using the `persist()` or `cache()` methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it." 
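] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "fragment" } }, "source": [ "- As a quick illustration (a sketch reusing the `lineLengths` RDD from above; exact timings are machine-dependent), the second action on a cached RDD is typically much faster because its partitions are read back from memory:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "import time\n", "\n", "lineLengths.cache()  # mark for caching; nothing is computed yet (lazy)\n", "\n", "t0 = time.time()\n", "lineLengths.reduce(lambda a, b: a + b)  # first action: computes the RDD and fills the cache\n", "t1 = time.time()\n", "lineLengths.reduce(lambda a, b: a + b)  # second action: reads the cached partitions\n", "t2 = time.time()\n", "\n", "print(\"first action: %.3f s, second action: %.3f s\" % (t1 - t0, t2 - t1))"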
] }, { "cell_type": "code", "execution_count": 46, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "data": { "text/plain": [ "PythonRDD[4] at RDD at PythonRDD.scala:53" ] }, "execution_count": 46, "metadata": {}, "output_type": "execute_result" } ], "source": [ "lineLengths.persist()" ] }, { "cell_type": "code", "execution_count": 47, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "data": { "text/plain": [ "PythonRDD[4] at RDD at PythonRDD.scala:53" ] }, "execution_count": 47, "metadata": {}, "output_type": "execute_result" } ], "source": [ "lineLengths.cache() # same as persist() but use default storage level" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## RDD different storage level\n", "\n", "\n", "- `MEMORY_ONLY`\tStore RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.\n", "\n", "- `MEMORY_AND_DISK`\tStore RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.\n", "\n", "- `MEMORY_ONLY_SER` (Java and Scala)\tStore RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.\n", "\n", "- `MEMORY_AND_DISK_SER` (Java and Scala)\tSimilar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.\n", "\n", "- `DISK_ONLY` Store the RDD partitions only on disk.\n", "\n", "- `MEMORY_ONLY_2`, `MEMORY_AND_DISK_2`, etc.\tSame as the levels above, but replicate each partition on two cluster nodes.\n", "\n", "- `OFF_HEAP` (experimental)\tSimilar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## RDD Unpersist \n", "\n", "- Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. \n", "\n", "- If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method." ] }, { "cell_type": "code", "execution_count": 45, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "data": { "text/plain": [ "PythonRDD[4] at RDD at PythonRDD.scala:53" ] }, "execution_count": 45, "metadata": {}, "output_type": "execute_result" } ], "source": [ "lineLengths.unpersist()" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Broadcast Variables\n", "\n", "- Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. \n", "\n", "- They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. 
] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## RDD Unpersist \n", "\n", "- Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. \n", "\n", "- If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method." ] },
{ "cell_type": "code", "execution_count": 45, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "data": { "text/plain": [ "PythonRDD[4] at RDD at PythonRDD.scala:53" ] }, "execution_count": 45, "metadata": {}, "output_type": "execute_result" } ], "source": [ "lineLengths.unpersist()" ] },
{ "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Broadcast Variables\n", "\n", "- Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. \n", "\n", "- They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.\n" ] },
{ "cell_type": "code", "execution_count": 48, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 48, "metadata": {}, "output_type": "execute_result" } ], "source": [ "broadcastVar = sc.broadcast([1, 2, 3])\n", "broadcastVar" ] },
{ "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "- Broadcast variables are created from a variable `v` by calling `SparkContext.broadcast(v)`. \n", "\n", "- The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `.value` method.\n", "\n", "- After the broadcast variable is created, it should be used instead of the value `v` in any functions run on the cluster so that **`v` is not shipped to the nodes more than once**. \n", "\n", "- In addition, the object `v` **should not be modified after it is broadcast** in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later)." ] },
{ "cell_type": "code", "execution_count": 49, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "data": { "text/plain": [ "[1, 2, 3]" ] }, "execution_count": 49, "metadata": {}, "output_type": "execute_result" } ], "source": [ "broadcastVar.value" ] },
{ "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Accumulators\n", "\n", "- Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel. \n", "\n", "- They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types." ] },
{ "cell_type": "code", "execution_count": 50, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "data": { "text/plain": [ "10" ] }, "execution_count": 50, "metadata": {}, "output_type": "execute_result" } ], "source": [ "accum = sc.accumulator(0)\n", "sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))\n", "accum.value" ] },
{ "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Spark Lazy Evaluation\n", "\n", "- “Lazy” means that evaluation is deferred: **an expression is evaluated only when its result is actually required**. \n", "\n", "- In Spark, transformations are not executed right away; nothing runs until an action is triggered. \n", "\n", "- Once an action is called, all the pending transformations are executed in one go.\n", "\n", "- Spark figures out that the transformations can be combined and executed together in a single pass over the data (a small demo follows below).\n", "\n", "- Benefits\n", "\n", "    - It saves time and memory, since intermediate results need not be materialized.\n", "    - The system works more efficiently with fewer resources, and the number of passes over the data decreases.\n", "    - Lazy evaluation is a key building block of Spark RDD operations."
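] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "fragment" } }, "source": [ "- A small sketch of this behaviour (a made-up RDD; the `time.sleep` is only there to make the work visible, and timings depend on the number of partitions): the `map` call returns immediately, and the work only happens when the action `collect()` is called." ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "import time\n", "\n", "rdd = sc.parallelize(range(8))\n", "\n", "t0 = time.time()\n", "doubled = rdd.map(lambda x: (time.sleep(0.5), x * 2)[1])  # transformation: nothing runs yet\n", "print(\"after map:     %.2f s\" % (time.time() - t0))\n", "\n", "t0 = time.time()\n", "print(doubled.collect())  # action: triggers the actual computation\n", "print(\"after collect: %.2f s\" % (time.time() - t0))"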
\n", "\n", "- Benifits\n", "\n", " - It results in saving the time as well as reducing the space complexity.\n", " - The system works more efficiently with fewer resources and it also decreases the number of queries.\n", " - Lazy Evaluation works as a key building block in operations of Spark RDD.\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Spark Data Structures" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "fragment" } }, "source": [ "- Local vector" ] }, { "cell_type": "code", "execution_count": 52, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "import numpy as np\n", "import scipy.sparse as sps\n", " \n", "# Use a NumPy array as a dense vector.\n", "dv1 = np.array([1.0, 0.0, 3.0])\n", "# Use a Python list as a dense vector.\n", "dv2 = [1.0, 0.0, 3.0]\n", "# Create a SparseVector.\n", "sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])\n", "# Use a single-column SciPy csc_matrix as a sparse vector.\n", "sv2 = sps.csc_matrix((np.array([1.0, 3.0]),\n", " np.array([0, 2]),\n", " np.array([0, 2])), shape=(3, 1))" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "- Local matrix" ] }, { "cell_type": "code", "execution_count": 51, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "DenseMatrix([[1., 4.],\n", " [2., 5.],\n", " [3., 6.]])\n", "3 X 2 CSCMatrix\n", "(0,0) 9.0\n", "(2,1) 6.0\n", "(1,1) 8.0\n" ] } ], "source": [ "from pyspark.mllib.linalg import Matrix, Matrices\n", "# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))\n", "dm2 = Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])\n", "print(dm2)\n", "# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))\n", "sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])\n", "print(sm)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "- Labeled Points" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "from pyspark.mllib.linalg import SparseVector\n", "from pyspark.mllib.regression import LabeledPoint\n", "\n", "# Create a labeled point with a positive label and a dense feature vector.\n", "pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])\n", "\n", "# Create a labeled point with a negative label and a sparse feature vector.\n", "neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "- Sparse data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "from pyspark.mllib.util import MLUtils\n", "examples = MLUtils.loadLibSVMFile(sc,\n", " \"/opt/apps/ecm/service/spark/2.4.4/package/spark-2.4.4-bin-hadoop2.7/data/mllib/sample_libsvm_data.txt\")\n", "print(examples)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### Distributed matrix\n", "\n", "- RowMatrix" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "from pyspark.mllib.linalg.distributed import RowMatrix\n", "# Create an RDD of vectors.\n", "rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])\n", "# Create a RowMatrix from an RDD of vectors.\n", 
"mat = RowMatrix(rows)\n", "# Get its size.\n", "m = mat.numRows() # 4\n", "n = mat.numCols() # 3\n", "# Get the rows as an RDD of vectors again.\n", "rowsRDD = mat.rows\n", "print(m,n,rowsRDD)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "- IndexedRowMatrix" ] }, { "cell_type": "code", "execution_count": 53, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix\n", "# Create an RDD of indexed rows.\n", "# - This can be done explicitly with the IndexedRow class:\n", "indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),\n", " IndexedRow(1, [4, 5, 6]),\n", " IndexedRow(2, [7, 8, 9]),\n", " IndexedRow(3, [10, 11, 12])])\n", "# - or by using (long, vector) tuples:\n", "indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),\n", " (2, [7, 8, 9]), (3, [10, 11, 12])])" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "- BlockMatrix \n", "\n", "A BlockMatrix is a distributed matrix backed by an RDD of MatrixBlocks, where a MatrixBlock is a tuple of ((Int, Int), Matrix), where the (Int, Int) is the index of the block, and Matrix is the sub-matrix at the given index with size rowsPerBlock x colsPerBlock. BlockMatrix supports methods such as add and multiply with another BlockMatrix. BlockMatrix also has a helper function validate which can be used to check whether the BlockMatrix is set up properly.\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "from pyspark.mllib.linalg import Matrices\n", "from pyspark.mllib.linalg.distributed import BlockMatrix\n", "\n", "# Create an RDD of sub-matrix blocks.\n", "blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),\n", " ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])\n", "\n", "# Create a BlockMatrix from an RDD of sub-matrix blocks.\n", "mat = BlockMatrix(blocks, 3, 2)\n", "\n", "# Get its size.\n", "m = mat.numRows() # 6\n", "n = mat.numCols() # 2\n", "\n", "# Get the blocks as an RDD of sub-matrix blocks.\n", "blocksRDD = mat.blocks\n", "\n", "# Convert to a LocalMatrix.\n", "localMat = mat.toLocalMatrix()\n", "\n", "# Convert to an IndexedRowMatrix.\n", "indexedRowMat = mat.toIndexedRowMatrix()\n", "\n", "# Convert to a CoordinateMatrix.\n", "coordinateMat = mat.toCoordinateMatrix()" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Lab\n", "\n", "- Run a simple Spark application。\n", "\n", "- Understand the [basic data structure with RDD interface](https://spark.apache.org/docs/latest/mllib-data-types.html)。\n", "\n", "- External Read\n", "\n", " - https://spark.apache.org/docs/latest/api/python/index.html\n", " - https://spark.apache.org/examples.html" ] } ], "metadata": { "celltoolbar": "Slideshow", "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.17" }, "rise": { "auto_select": "first", "autolaunch": false, "chalkboard": { "chalkEffect": 1, "chalkWidth": 4, "theme": "whiteboard", "transition": 800 }, "enable_chalkboard": true, "reveal_shortcuts": { "chalkboard": { "clear": "ctrl-k" } 
}, "start_slideshow_at": "selected", "theme": "white" } }, "nbformat": 4, "nbformat_minor": 4 }