{ "cells": [ { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# Introduction to Spark Streaming\n", "\n", "## Feng Li\n", "\n", "### Central University of Finance and Economics\n", "\n", "### [feng.li@cufe.edu.cn](mailto:feng.li@cufe.edu.cn)\n", "### Course home page: [https://feng.li/distcomp](https://feng.li/distcomp)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Overview of Spark Streaming\n", "\n", "\n", "- Spark Streaming is an extension of the core Spark API that enables **scalable, high-throughput, fault-tolerant** stream processing of **live data streams**.\n", "\n", "- Data can be ingested from many sources like **Kafka, Flume, Kinesis, or TCP sockets**, and can be processed using complex algorithms expressed with high-level functions like `map`, `reduce`, `join` and `window`.\n", "\n", "- You can apply Spark’s machine learning and graph processing algorithms on data streams.\n", "\n", "- Processed data can be pushed out to **filesystems, databases, and live dashboards**. 
" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "![streaming-arch](./figures/streaming-arch.png)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Workflow of Streaming Data Processing\n", "\n", "\n", "- Spark Streaming receives live input data streams.\n", "\n", "- Spark Streaming divides the data into batches, one batch per batch interval.\n", "\n", "- The batches are then processed by the Spark engine to generate the final stream of results, also in batches.\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "![streaming-flow](./figures/streaming-flow.png)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## `DStream`: Spark's representation of a data stream\n", "\n", "- Spark Streaming provides a high-level abstraction called *discretized stream* or `DStream`, which represents a continuous stream of data.\n", "\n", "- DStreams can be created either from input data sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams.\n", "\n", "- Internally, a DStream is represented as a sequence of RDDs, one RDD per batch interval." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "![streaming-dstream](./figures/streaming-dstream.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "import findspark\n", "# Point findspark to the Spark installation; adjust this path to your own setup\n", "findspark.init('/usr/lib/spark-current')\n", "from pyspark.sql import SparkSession\n", "spark = SparkSession.builder.appName(\"Python Spark with Streaming\").getOrCreate()" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "- First, we import `StreamingContext`, which is the main entry point for all streaming functionality. 
\n", "\n", "- We then create a StreamingContext from the existing SparkContext, with a batch interval of 10 seconds." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "from pyspark import SparkContext\n", "from pyspark.streaming import StreamingContext\n", "\n", "# Create a StreamingContext on top of the existing SparkContext, with a batch interval of 10 seconds\n", "sc = spark.sparkContext # the SparkContext behind the SparkSession, used for RDDs\n", "ssc = StreamingContext(sc, 10) # batch interval of 10 seconds" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "fragment" } }, "source": [ "Next, we create a DStream that represents streaming data from a TCP source, specified by a hostname (e.g. `localhost`) and a port (e.g. `9999`)." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "# Each record of `lines` is one line of text received from the socket\n", "lines = ssc.socketTextStream(\"localhost\", 9999)\n", "# Split each line on spaces: one line becomes several words\n", "words = lines.flatMap(lambda line: line.split(\" \"))" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "fragment" } }, "source": [ "- The `lines` DStream represents the stream of data that will be received from the data server. Each record in this DStream is a line of text. Next, we want to split the lines by space into words.\n", "\n", "- `flatMap` is a one-to-many DStream operation that creates a new DStream by generating multiple new records from each record in the source DStream. In this case, each line is split into multiple words, and the stream of words is represented as the `words` DStream. Next, we want to count these words."
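] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "Because every batch of a DStream is an RDD, the difference between `map` and `flatMap` can be checked on a small static RDD before any stream is running. The cell below is a minimal sketch that assumes only the `sc` created above. The name `sample` and its two sentences are hypothetical, made up for illustration." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "# Hypothetical sample data: two lines of text in a small RDD\n", "sample = sc.parallelize([\"hello world\", \"hello spark\"])\n", "\n", "# map is one-to-one: each line becomes a single list of words\n", "print(sample.map(lambda line: line.split(\" \")).collect())\n", "# [['hello', 'world'], ['hello', 'spark']]\n", "\n", "# flatMap is one-to-many: the lists are flattened into individual words\n", "print(sample.flatMap(lambda line: line.split(\" \")).collect())\n", "# ['hello', 'world', 'hello', 'spark']"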
] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "![streaming-dstream-ops](./figures/streaming-dstream-ops.png)" ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "# Count each word in each batch\n", "pairs = words.map(lambda word: (word, 1))\n", "wordCounts = pairs.reduceByKey(lambda x, y: x + y)\n", "\n", "# Print the first ten elements of each RDD generated in this DStream to the console\n", "wordCounts.pprint()" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "- The `words` DStream is further mapped (a one-to-one transformation) to a DStream of `(word, 1)` pairs, which is then reduced to get the frequency of words in each batch of data.\n", "\n", "- Finally, `wordCounts.pprint()` prints the first ten counts of each batch, i.e. once every 10 seconds with the batch interval chosen above.\n", "\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "At this point, Spark Streaming has only set up the computation it will perform once it is started; no real processing has begun yet. To start the processing after all the transformations have been set up, we finally call" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false, "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "ssc.start() # Start the computation\n", "ssc.awaitTermination() # Wait for the computation to terminate (this blocks the cell)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "fragment" } }, "source": [ "Before starting the computation, you first need to run Netcat (a small utility found in most Unix-like systems) as a data server in a separate terminal, using\n", "\n", "    nc -lk 9999\n", " \n", "Then, any lines typed in the terminal running the Netcat server will be counted and printed on screen every 10 seconds (once per batch). 
For each batch, the output shows a `Time: ...` header followed by the `(word, count)` pairs of that batch.\n", "\n" ] }, { "cell_type": "code", "execution_count": 21, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "# awaitTermination() blocks, so interrupt the kernel before running this cell\n", "# By default, stop() also stops the underlying SparkContext\n", "ssc.stop()" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Summary \n", "\n", "- Define the input sources by creating input DStreams.\n", "- Define the streaming computations by applying transformation and output operations to DStreams.\n", "- Start receiving data and processing it using `streamingContext.start()`.\n", "\n", "- Wait for the processing to be stopped (manually or due to any error) using `streamingContext.awaitTermination()`.\n", "\n", "- The processing can be manually stopped using `streamingContext.stop()`." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Points to remember\n", "\n", "- Once a context has been started, no new streaming computations can be set up or added to it.\n", "\n", "- Once a context has been stopped, it cannot be restarted.\n", "- Only one StreamingContext can be active in a JVM at the same time.\n", "- By default, `stop()` on a StreamingContext also stops the SparkContext. To stop only the StreamingContext, call `stop(stopSparkContext=False)`.\n", "\n", "- A SparkContext can be reused to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created."
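] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "The last two points can be combined to reuse one SparkContext for several streaming computations. The cell below is a minimal sketch, separate from the word count above. It stops the active StreamingContext (if any) without stopping its SparkContext. It then creates a new StreamingContext on the same SparkContext with a 5-second batch interval, which is an arbitrary choice. `ssc2` is a hypothetical name. No input or output operations are defined on it, so it is not started here." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "fragment" } }, "outputs": [], "source": [ "from pyspark import SparkContext\n", "from pyspark.streaming import StreamingContext\n", "\n", "# Reuse the running SparkContext, or create a new one if it was already stopped (e.g. by ssc.stop())\n", "sc = SparkContext.getOrCreate()\n", "\n", "# getActive() returns the started-but-not-stopped StreamingContext, or None\n", "active = StreamingContext.getActive()\n", "if active is not None:\n", "    # Stop streaming only; the SparkContext stays alive\n", "    active.stop(stopSparkContext=False)\n", "\n", "# The previous StreamingContext is stopped, so a new one may be created on the same SparkContext\n", "ssc2 = StreamingContext(sc, 5)"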
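] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Word count from HDFS files\n", "\n", "The same word count can be applied to files instead of a socket. `textFileStream` monitors a directory (for example on HDFS) and, in each batch, processes the text files newly created in that directory. Save the script below (for example as `hdfs_wordcount.py`) and run it with `spark-submit`, passing the directory to monitor as its only argument.\n", "\n", "    import sys\n", "\n", "    from pyspark import SparkContext\n", "    from pyspark.streaming import StreamingContext\n", "\n", "    if len(sys.argv) != 2:\n", "        print(\"Usage: hdfs_wordcount.py <directory>\", file=sys.stderr)\n", "        sys.exit(-1)\n", "\n", "    sc = SparkContext(appName=\"PythonStreamingHDFSWordCount\")\n", "    ssc = StreamingContext(sc, 10)  # 10-second batches\n", "\n", "    # Each batch contains the lines of the files that appeared in the directory\n", "    lines = ssc.textFileStream(sys.argv[1])\n", "    counts = lines.flatMap(lambda line: line.split(\" \"))\\\n", "                  .map(lambda x: (x, 1))\\\n", "                  .reduceByKey(lambda a, b: a+b)\n", "    counts.pprint()\n", "\n", "    ssc.start()\n", "    ssc.awaitTermination()" ] } ], "metadata": { "celltoolbar": "Slideshow", "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.17" } }, "nbformat": 4, "nbformat_minor": 4 }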