{ "cells": [ { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# MapReduce with Hadoop Streaming\n", "\n", "## Feng Li\n", "\n", "### Central University of Finance and Economics\n", "\n", "### [feng.li@cufe.edu.cn](feng.li@cufe.edu.cn)\n", "### Course home page: [https://feng.li/distcomp](https://feng.li/distcomp)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Hadoop Streaming, `stdin` and `stdout`\n", "\n", "- Hadoop provides an API to MapReduce that allows you to write your map and reduce functions in languages other than Java\n", "\n", "- Hadoop Streaming uses Unix standard streams as the interface between Hadoop and your program, so you can use any combination of languages that can read standard input and write to standard output to write your MapReduce program.\n", "\n", " - You could use different language in mapper and reduce functions.\n", "\n", " - It suits for text processing (e.g. read every line from a big CSV file).\n", "\n", " - It can also handle binary streams (e.g. read image as input)." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Your first Hadoop MapReduce program" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "scrolled": true, "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "/bin/cat\n" ] } ], "source": [ "which cat " ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "/bin/wc\n" ] } ], "source": [ "which wc" ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "scrolled": false, "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "rm: `/user/lifeng/output': No such file or directory\n", "packageJobJar: [/tmp/hadoop-unjar7580110097437608263/] [] /tmp/streamjob1753312041067415791.jar tmpDir=null\n", "20/02/26 21:41:52 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-46968:8188/ws/v1/timeline/\n", "20/02/26 21:41:52 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-46968/192.168.0.222:8032\n", "20/02/26 21:41:52 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-46968:8188/ws/v1/timeline/\n", "20/02/26 21:41:52 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-46968/192.168.0.222:8032\n", "20/02/26 21:41:53 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries\n", "20/02/26 21:41:53 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 97184efe294f64a51a4c5c172cbc22146103da53]\n", "20/02/26 21:41:53 INFO mapred.FileInputFormat: Total input paths to process : 1\n", "20/02/26 21:41:53 INFO mapreduce.JobSubmitter: number of splits:16\n", "20/02/26 21:41:53 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1579055927679_0005\n", "20/02/26 21:41:53 INFO impl.YarnClientImpl: Submitted application application_1579055927679_0005\n", "20/02/26 21:41:53 INFO mapreduce.Job: The url to track the job: http://emr-header-1.cluster-46968:20888/proxy/application_1579055927679_0005/\n", "20/02/26 21:41:53 INFO mapreduce.Job: Running job: job_1579055927679_0005\n", "20/02/26 21:41:58 INFO mapreduce.Job: Job job_1579055927679_0005 running in uber mode : false\n", "20/02/26 
21:41:58 INFO mapreduce.Job: map 0% reduce 0%\n", "20/02/26 21:42:06 INFO mapreduce.Job: map 100% reduce 0%\n", "20/02/26 21:42:11 INFO mapreduce.Job: map 100% reduce 29%\n", "20/02/26 21:42:12 INFO mapreduce.Job: map 100% reduce 57%\n", "20/02/26 21:42:13 INFO mapreduce.Job: map 100% reduce 86%\n", "20/02/26 21:42:14 INFO mapreduce.Job: map 100% reduce 100%\n", "20/02/26 21:42:14 INFO mapreduce.Job: Job job_1579055927679_0005 completed successfully\n", "20/02/26 21:42:14 INFO mapreduce.Job: Counters: 49\n", "\tFile System Counters\n", "\t\tFILE: Number of bytes read=1367\n", "\t\tFILE: Number of bytes written=3035077\n", "\t\tFILE: Number of read operations=0\n", "\t\tFILE: Number of large read operations=0\n", "\t\tFILE: Number of write operations=0\n", "\t\tHDFS: Number of bytes read=14720\n", "\t\tHDFS: Number of bytes written=175\n", "\t\tHDFS: Number of read operations=69\n", "\t\tHDFS: Number of large read operations=0\n", "\t\tHDFS: Number of write operations=14\n", "\tJob Counters \n", "\t\tLaunched map tasks=16\n", "\t\tLaunched reduce tasks=7\n", "\t\tData-local map tasks=16\n", "\t\tTotal time spent by all maps in occupied slots (ms)=4947858\n", "\t\tTotal time spent by all reduces in occupied slots (ms)=1907919\n", "\t\tTotal time spent by all map tasks (ms)=83862\n", "\t\tTotal time spent by all reduce tasks (ms)=16307\n", "\t\tTotal vcore-milliseconds taken by all map tasks=83862\n", "\t\tTotal vcore-milliseconds taken by all reduce tasks=16307\n", "\t\tTotal megabyte-milliseconds taken by all map tasks=156989664\n", "\t\tTotal megabyte-milliseconds taken by all reduce tasks=61053408\n", "\tMap-Reduce Framework\n", "\t\tMap input records=29\n", "\t\tMap output records=29\n", "\t\tMap output bytes=1540\n", "\t\tMap output materialized bytes=2961\n", "\t\tInput split bytes=1824\n", "\t\tCombine input records=0\n", "\t\tCombine output records=0\n", "\t\tReduce input groups=25\n", "\t\tReduce shuffle bytes=2961\n", "\t\tReduce input records=29\n", "\t\tReduce output records=7\n", "\t\tSpilled Records=58\n", "\t\tShuffled Maps =112\n", "\t\tFailed Shuffles=0\n", "\t\tMerged Map outputs=112\n", "\t\tGC time elapsed (ms)=3117\n", "\t\tCPU time spent (ms)=33410\n", "\t\tPhysical memory (bytes) snapshot=10298241024\n", "\t\tVirtual memory (bytes) snapshot=92690731008\n", "\t\tTotal committed heap usage (bytes)=16765157376\n", "\tShuffle Errors\n", "\t\tBAD_ID=0\n", "\t\tCONNECTION=0\n", "\t\tIO_ERROR=0\n", "\t\tWRONG_LENGTH=0\n", "\t\tWRONG_MAP=0\n", "\t\tWRONG_REDUCE=0\n", "\tFile Input Format Counters \n", "\t\tBytes Read=12896\n", "\tFile Output Format Counters \n", "\t\tBytes Written=175\n", "20/02/26 21:42:14 INFO streaming.StreamJob: Output directory: /user/lifeng/output\n" ] } ], "source": [ "hadoop fs -rm -r /user/lifeng/output\n", "\n", "hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \\\n", " -input /user/lifeng/asm-LICENSE \\\n", " -output /user/lifeng/output \\\n", " -mapper \"/usr/bin/cat\" \\\n", " -reducer \"/usr/bin/wc\" " ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Found 8 items\n", "-rw-r----- 2 lifeng hadoop 0 2020-02-26 21:42 /user/lifeng/output/_SUCCESS\n", "-rw-r----- 2 lifeng hadoop 25 2020-02-26 21:42 /user/lifeng/output/part-00000\n", "-rw-r----- 2 lifeng hadoop 25 2020-02-26 21:42 /user/lifeng/output/part-00001\n", "-rw-r----- 2 lifeng hadoop 25 2020-02-26 21:42 /user/lifeng/output/part-00002\n", 
"-rw-r----- 2 lifeng hadoop 25 2020-02-26 21:42 /user/lifeng/output/part-00003\n", "-rw-r----- 2 lifeng hadoop 25 2020-02-26 21:42 /user/lifeng/output/part-00004\n", "-rw-r----- 2 lifeng hadoop 25 2020-02-26 21:42 /user/lifeng/output/part-00005\n", "-rw-r----- 2 lifeng hadoop 25 2020-02-26 21:42 /user/lifeng/output/part-00006\n" ] } ], "source": [ "hadoop fs -ls /user/lifeng/output" ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " 6 63 375\t\n", " 7 24 159\t\n", " 5 39 290\t\n", " 4 39 279\t\n", " 3 21 150\t\n", " 2 20 147\t\n", " 2 19 140\t\n" ] } ], "source": [ "hadoop fs -cat /user/lifeng/output/*" ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "20/02/26 21:43:21 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 30 minutes.\n", "Moved: 'hdfs://emr-header-1.cluster-46968:9000/user/lifeng/output' to trash at: hdfs://emr-header-1.cluster-46968:9000/user/lifeng/.Trash/Current\n", "packageJobJar: [/tmp/hadoop-unjar6344502777978736495/] [] /tmp/streamjob5520248859546438609.jar tmpDir=null\n", "20/02/26 21:43:24 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-46968:8188/ws/v1/timeline/\n", "20/02/26 21:43:24 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-46968/192.168.0.222:8032\n", "20/02/26 21:43:24 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-46968:8188/ws/v1/timeline/\n", "20/02/26 21:43:24 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-46968/192.168.0.222:8032\n", "20/02/26 21:43:24 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries\n", "20/02/26 21:43:24 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 97184efe294f64a51a4c5c172cbc22146103da53]\n", "20/02/26 21:43:24 INFO mapred.FileInputFormat: Total input paths to process : 1\n", "20/02/26 21:43:24 INFO mapreduce.JobSubmitter: number of splits:16\n", "20/02/26 21:43:25 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1579055927679_0006\n", "20/02/26 21:43:25 INFO impl.YarnClientImpl: Submitted application application_1579055927679_0006\n", "20/02/26 21:43:25 INFO mapreduce.Job: The url to track the job: http://emr-header-1.cluster-46968:20888/proxy/application_1579055927679_0006/\n", "20/02/26 21:43:25 INFO mapreduce.Job: Running job: job_1579055927679_0006\n", "20/02/26 21:43:31 INFO mapreduce.Job: Job job_1579055927679_0006 running in uber mode : false\n", "20/02/26 21:43:31 INFO mapreduce.Job: map 0% reduce 0%\n", "20/02/26 21:43:37 INFO mapreduce.Job: map 13% reduce 0%\n", "20/02/26 21:43:38 INFO mapreduce.Job: map 100% reduce 0%\n", "20/02/26 21:43:42 INFO mapreduce.Job: map 100% reduce 100%\n", "20/02/26 21:43:42 INFO mapreduce.Job: Job job_1579055927679_0006 completed successfully\n", "20/02/26 21:43:42 INFO mapreduce.Job: Counters: 50\n", "\tFile System Counters\n", "\t\tFILE: Number of bytes read=895\n", "\t\tFILE: Number of bytes written=2241281\n", "\t\tFILE: Number of read operations=0\n", "\t\tFILE: Number of large read operations=0\n", "\t\tFILE: Number of write operations=0\n", "\t\tHDFS: Number of bytes read=14720\n", "\t\tHDFS: Number of bytes written=25\n", "\t\tHDFS: Number of read 
operations=51\n", "\t\tHDFS: Number of large read operations=0\n", "\t\tHDFS: Number of write operations=2\n", "\tJob Counters \n", "\t\tKilled map tasks=1\n", "\t\tLaunched map tasks=16\n", "\t\tLaunched reduce tasks=1\n", "\t\tData-local map tasks=16\n", "\t\tTotal time spent by all maps in occupied slots (ms)=4388774\n", "\t\tTotal time spent by all reduces in occupied slots (ms)=295191\n", "\t\tTotal time spent by all map tasks (ms)=74386\n", "\t\tTotal time spent by all reduce tasks (ms)=2523\n", "\t\tTotal vcore-milliseconds taken by all map tasks=74386\n", "\t\tTotal vcore-milliseconds taken by all reduce tasks=2523\n", "\t\tTotal megabyte-milliseconds taken by all map tasks=139250592\n", "\t\tTotal megabyte-milliseconds taken by all reduce tasks=9446112\n", "\tMap-Reduce Framework\n", "\t\tMap input records=29\n", "\t\tMap output records=29\n", "\t\tMap output bytes=1540\n", "\t\tMap output materialized bytes=1535\n", "\t\tInput split bytes=1824\n", "\t\tCombine input records=0\n", "\t\tCombine output records=0\n", "\t\tReduce input groups=25\n", "\t\tReduce shuffle bytes=1535\n", "\t\tReduce input records=29\n", "\t\tReduce output records=1\n", "\t\tSpilled Records=58\n", "\t\tShuffled Maps =16\n", "\t\tFailed Shuffles=0\n", "\t\tMerged Map outputs=16\n", "\t\tGC time elapsed (ms)=2576\n", "\t\tCPU time spent (ms)=27920\n", "\t\tPhysical memory (bytes) snapshot=8505241600\n", "\t\tVirtual memory (bytes) snapshot=61655015424\n", "\t\tTotal committed heap usage (bytes)=12341739520\n", "\tShuffle Errors\n", "\t\tBAD_ID=0\n", "\t\tCONNECTION=0\n", "\t\tIO_ERROR=0\n", "\t\tWRONG_LENGTH=0\n", "\t\tWRONG_MAP=0\n", "\t\tWRONG_REDUCE=0\n", "\tFile Input Format Counters \n", "\t\tBytes Read=12896\n", "\tFile Output Format Counters \n", "\t\tBytes Written=25\n", "20/02/26 21:43:42 INFO streaming.StreamJob: Output directory: /user/lifeng/output\n", " 29 225 1540\t\n" ] } ], "source": [ "hadoop fs -rm -r /user/lifeng/output\n", "\n", "hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \\\n", " -input /user/lifeng/asm-LICENSE \\\n", " -output /user/lifeng/output \\\n", " -mapper \"/usr/bin/cat\" \\\n", " -reducer \"/usr/bin/wc\" \\\n", " -numReduceTasks 1" ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " 29 225 1540\t\n" ] } ], "source": [ "hadoop fs -cat /user/lifeng/output/*" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Best Practice with Hadoop Streaming\n", "\n", "- Write your Hadoop command in a Bash script instead of running it directly in the Linux shell.\n", "\n", "- Use the following command to track Hadoop MapReduce errors:\n", "\n", " yarn logs -applicationId JOB_ID" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "#!/bin/bash\n", "\n", "PWD=$(cd $(dirname $0); pwd)\n", "cd $PWD 1> /dev/null 2>&1\n", "\n", "TASKNAME=task1_fli\n", "# python location on hadoop\n", "PY27='/fli/tools/python2.7.tar.gz'\n", "# hadoop client\n", "HADOOP_HOME=/home/users/fli/hadoop/bin/hadoop\n", "HADOOP_INPUT_DIR1=/fli/data1/part-*\n", "HADOOP_INPUT_DIR2=/fli/data2/part-*\n", "HADOOP_OUTPUT_DIR=/fli/results/task1\n", "\n", "echo $HADOOP_HOME\n", "echo $HADOOP_INPUT_DIR1\n", "echo $HADOOP_INPUT_DIR2\n", "echo $HADOOP_OUTPUT_DIR\n", "\n", "$HADOOP_HOME fs -rmr $HADOOP_OUTPUT_DIR\n", "\n", "$HADOOP_HOME streaming \\\n", " -jobconf mapred.job.name=$TASKNAME \\\n", " -jobconf 
mapred.job.priority=NORMAL \\\n", " -jobconf mapred.map.tasks=500 \\\n", " -jobconf mapred.reduce.tasks=500 \\\n", " -jobconf mapred.job.map.capacity=500 \\\n", " -jobconf mapred.job.reduce.capacity=500 \\\n", " -jobconf stream.num.map.output.key.fields=2 \\\n", " -jobconf mapred.text.key.partitioner.options=-k1,1 \\\n", " -jobconf stream.memory.limit=1000 \\\n", " -file $PWD/mapper.sh \\\n", " -output ${HADOOP_OUTPUT_DIR} \\\n", " -input ${HADOOP_INPUT_DIR1} \\\n", " -input ${HADOOP_INPUT_DIR2} \\\n", " -mapper \"sh mapper.sh\" \\\n", " -reducer \"cat\" \\\n", " -cacheArchive ${PY27}#py27 \\\n", " -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner\n", "\n", "\n", "if [ $? -ne 0 ]; then\n", " echo 'error'\n", " exit 1\n", "fi\n", "$HADOOP_HOME fs -touchz ${HADOOP_OUTPUT_DIR}/done\n", "\n", "exit 0\n", "\n", "\n" ] } ], "source": [ "cat examples/print-colums/main.sh" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## MapReduce with examples\n", "\n", "- Mapper: Bash, Reducer: Python\n", "\n", "- Write MapReduce with Python\n", "\n", "- Write MapReduce with R" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Other Hadoop APIs\n", "\n", "- Hadoop with Java MapReduce\n", " - Hadoop is written in Java, and a rich collection of Java MapReduce classes is ready to use.\n", " - You need javac (shipped with the JDK) and hadoop-mapreduce-client-core-xxx.jar to compile your classes and package them into a jar file.\n", " \n", " ```sh\n", " # YourMapReduce.java stands for your own MapReduce source file\n", " javac -classpath $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.2.jar \\\n", " -d FirstJar YourMapReduce.java\n", "\n", " jar -cvf FirstJar.jar -C FirstJar/ .\n", " ```\n", "\n", "- The syntax for running the packaged jar is\n", "\n", "```sh\n", "hadoop jar FirstJar.jar [mainClass] input output\n", "```" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "- Hadoop Pipes\n", " - Hadoop Pipes is the name of the C++ interface to Hadoop MapReduce.\n", " - Pipes uses sockets as the channel over which the tasktracker communicates with the process running the C++ map or reduce function.\n", " - The application links against the Hadoop C++ library, which is a thin wrapper for communicating with the tasktracker child process.\n", "\n", "- You have to compile and link your C++ program before sending it to Hadoop.\n", "\n", "- The Hadoop Pipes syntax is\n", "```sh\n", "hadoop pipes \\\n", " -D hadoop.pipes.java.recordreader=true \\\n", " -D hadoop.pipes.java.recordwriter=true \\\n", " -input sample.txt \\\n", " -output output \\\n", " -program myCPProgram\n", "```" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Lab\n", "\n", "- Use `airline_small.csv` as input. The data description is available at http://stat-computing.org/dataexpo/2009/the-data.html\n", "\n", "- Extract useful information from the data\n", "\n", " - List all airport codes, with frequency\n", " - Make a new binary variable (Y) to indicate if a trip is delayed or not.\n", " \n", "- Create dummy variables for categorical fields such as `DayofWeek`, `Month`...\n", "\n", "- Finally, save your output in a file.\n", "\n", " - Each row contains the binary variable (Y), `CarrierDelay`, and your constructed dummy variables as predictors.\n", " \n", "\n", "- **Hint**\n", "\n", " - You may use any language, but Python 3 is preferred.\n", " - Test your code with a Linux pipe first and then run it on Hadoop (see the word-count sketch below).\n", " - Record the computing time."
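] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## A minimal word-count sketch\n", "\n", "The two cells below are a minimal sketch of the `stdin`/`stdout` contract described at the start: the file names `mapper.py` and `reducer.py` and all input/output paths are illustrative assumptions, not part of the recorded runs above. The first cell writes a Python word-count mapper and reducer and tests them through a Linux pipe, as the Lab hints suggest; the second shows how the same pair could be submitted with Hadoop Streaming. Note that `sort -k1,1` plays the role of Hadoop's shuffle-and-sort, so a pipeline that works locally is likely to work under streaming." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "# Sketch: write a minimal word-count mapper that emits 'word<TAB>1' per word\n", "cat > mapper.py <<'EOF'\n", "#!/usr/bin/env python3\n", "import sys\n", "for line in sys.stdin:\n", "    for word in line.split():\n", "        print(word + '\\t1')\n", "EOF\n", "\n", "# The matching reducer: sum the counts of each key in the sorted stream\n", "cat > reducer.py <<'EOF'\n", "#!/usr/bin/env python3\n", "import sys\n", "current, count = None, 0\n", "for line in sys.stdin:\n", "    word, n = line.rsplit('\\t', 1)\n", "    if word != current:\n", "        if current is not None:\n", "            print(current + '\\t' + str(count))\n", "        current, count = word, 0\n", "    count += int(n)\n", "if current is not None:\n", "    print(current + '\\t' + str(count))\n", "EOF\n", "\n", "# Local dry run: the pipe mimics Hadoop's map -> shuffle/sort -> reduce stages\n", "echo 'a b a c b a' | python3 mapper.py | sort -k1,1 | python3 reducer.py" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [], "source": [ "# Sketch: submit the same pair to Hadoop Streaming; the paths are illustrative\n", "hadoop fs -rm -r /user/lifeng/wordcount\n", "\n", "hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \\\n", " -files mapper.py,reducer.py \\\n", " -input /user/lifeng/asm-LICENSE \\\n", " -output /user/lifeng/wordcount \\\n", " -mapper 'python3 mapper.py' \\\n", " -reducer 'python3 reducer.py'"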
] } ], "metadata": { "celltoolbar": "Slideshow", "kernelspec": { "display_name": "Bash", "language": "bash", "name": "bash" }, "language_info": { "codemirror_mode": "shell", "file_extension": ".sh", "mimetype": "text/x-sh", "name": "bash" } }, "nbformat": 4, "nbformat_minor": 4 }