{ "cells": [ { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# Understanding MapReduce\n", "\n", "## Feng Li\n", "\n", "### Central University of Finance and Economics\n", "\n", "### [feng.li@cufe.edu.cn](feng.li@cufe.edu.cn)\n", "### Course home page: [https://feng.li/distcomp](https://feng.li/distcomp)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## (Key, Value) Pairs in MapReduce \n", "\n", "\n", "- When the Map/Reduce framework reads a line from the stdout of the mapper, it splits the line into a key/value pair.\n", "- The prefix of the line up to the first tab character is the key and the rest of the line (excluding the tab character) is the value." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "![Map-Reduce-Key-Value](./figures/mapreduce-key-value.png)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Shuffle and Sort\n", "\n", "- MapReduce makes the guarantee that the input to every reducer is sorted by key. \n", "\n", "- The process by which the system performs the sort, and transfers the map outputs to the reducers as inputs is known as the **shuffle**.\n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "![mapreduce-shuffle](./figures/mapreduce-shuffle.png)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## A trival example\n", "\n", "- Data excerpt\n", "```\n", "0067011990999991950051507004...9999999N9+00001+99999999999...\n", "0043011990999991950051512004...9999999N9+00221+99999999999...\n", "0043011990999991950051518004...9999999N9-00111+99999999999...\n", "0043012650999991949032412004...0500001N9+01111+99999999999...\n", "0043012650999991949032418004...0500001N9+00781+99999999999...\n", "```\n", "\n", "- The keys are the line offsets within the file, which we ignore in our map function.\n", "\n", "- These lines are presented to the map function as the key-value pairs:\n", "```\n", "(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)\n", "(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)\n", "(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)\n", "(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)\n", "(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)\n", "```" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## The complete case: find the maxmum value of temperatures\n", "\n", "- Input Data: Raw data with temperatures records \n", "\n", "- Mapper: pull out the year and the air temperature.\n", "```\n", "(1950, 0)\n", "(1950, 22)\n", "(1950, -11)\n", "(1949, 111)\n", "(1949, 78)\n", "...\n", "```\n", "- Group and Sort: Sorts and groups the key-value pairs by key.\n", "```\n", "(1949, [111, 78])\n", "(1950, [0, 22, −11])\n", "...\n", "```\n", "- Reducer: iterate through the list and pick up the maximum reading\n", "```\n", "(1949, 111)\n", "(1950, 22)\n", "...\n", "```" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "![Map-without-Reduce](./figures/key-value-example.png)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Map-without-Reduce\n", "\n", "![Map-without-Reduce](./figures/map-no-reduce.png)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## MapReduce data flow with multiple reduce tasks\n", "\n", "![Map-with-many-Reduce](./figures/multi-reduce.png)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## MapReduce data flow with a single reduce task\n", "\n", "\n", "![Map-with-many-Reduce](./figures/single-reduce.png)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### The Map Side\n", "\n", "- Each map task has a circular memory buffer that it writes the output to.\n", "- When the contents of the buffer reaches a certain threshold size, a background thread will start to spill the contents to disk.\n", "- Before it writes to disk, the thread first divides the data into partitions corresponding to the reducers that they will ultimately be sent to. \n", "- Within each partition, the background thread performs an in-memory **sort** by key, and if there is a combiner function, it is run on the output of the sort." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### The Reduce Side\n", "\n", "- The map output file is sitting on the local disk of the working machine that ran the map task. (**This makes Hadoop slow!**)\n", "- The reduce task starts copying their outputs as soon as each map task completes. This is known as the **copy phase** of the reduce task.\n", "- As the copies accumulate on disk, a background thread merges them into larger, sorted files.\n", "- When all the map outputs have been copied, the reduce task moves into the **sort phase**.\n", "- During the reduce phase, the reduce function is invoked for each key in the sorted output. The output of this phase is written directly to the output filesystem." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Shuffle and sort in MapReduce\n", "\n", "### Why Hadoop is slow\n", "\n", "![Shuffle-Sort](./figures/shuffle-sort.png)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Simulate the whole MapReduce with a Linux pipeline\n", "\n", "\n", "- You could test your MapReduce locally with Linux\n", "\n", "- It simulates a single-map-and-single-reduce task.\n", "\n", "```sh\n", "cat sample_input.txt | mapper.py | sort | reducer.py\n", "```\n", "\n", "- If your program fails here, it will not work on Hadoop either." ] } ], "metadata": { "celltoolbar": "Slideshow", "kernelspec": { "display_name": "Python3.9-virtualenv", "language": "python", "name": "python3.9-virtualenv" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.17" } }, "nbformat": 4, "nbformat_minor": 4 }