{ "cells": [ { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# Statistics with Hadoop Streaming\n", "\n", "## Feng Li\n", "\n", "### Central University of Finance and Economics\n", "\n", "### [feng.li@cufe.edu.cn](mailto:feng.li@cufe.edu.cn)\n", "### Course home page: [https://feng.li/distcomp](https://feng.li/distcomp)" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## A simple line count program" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "#! /usr/bin/env python3\n", "\n", "import sys\n", "count = 0\n", "# data = []\n", "for line in sys.stdin: # read input from stdin\n", " count += 1\n", " # data.append(line) \n", "print(count) # print goes to sys.stdout\n" ] } ], "source": [ "cat line_count.py" ] }, { "cell_type": "code", "execution_count": 44, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "29\n" ] } ], "source": [ "cat license.txt | python3 line_count.py" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### We can put the **long** Hadoop command in a shell script, say `run_line_count.sh`" ] }, { "cell_type": "code", "execution_count": 45, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "#! 
/usr/bin/sh\n", "\n", "TASKNAME=line_count\n", "\n", "hadoop fs -rm -r ./output/\n", "hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \\\n", " -jobconf mapred.job.name=$TASKNAME \\\n", " -input /user/lifeng/data/license.txt \\\n", " -output ./output \\\n", " -file \"line_count.py\" \\\n", " -mapper \"/usr/bin/cat\" \\\n", " -reducer \"python3 line_count.py\" \\\n", " -numReduceTasks 1 \n" ] } ], "source": [ "cat run_line_count.sh" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "scrolled": false, "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "18/12/06 23:46:54 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 30 minutes.\n", "Moved: 'hdfs://emr-header-1.cluster-41697:9000/user/lifeng/output' to trash at: hdfs://emr-header-1.cluster-41697:9000/user/lifeng/.Trash/Current\n", "18/12/06 23:46:55 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.\n", "18/12/06 23:46:56 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.\n", "18/12/06 23:46:56 INFO Configuration.deprecation: mapred.job.name is deprecated. 
Instead, use mapreduce.job.name\n", "packageJobJar: [line_count.py, /tmp/hadoop-unjar6165594769339738850/] [] /tmp/streamjob1645783056874142978.jar tmpDir=null\n", "18/12/06 23:46:56 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/\n", "18/12/06 23:46:57 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/\n", "18/12/06 23:46:57 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/\n", "18/12/06 23:46:57 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/\n", "18/12/06 23:46:57 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries\n", "18/12/06 23:46:57 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 97184efe294f64a51a4c5c172cbc22146103da53]\n", "18/12/06 23:46:57 INFO mapred.FileInputFormat: Total input paths to process : 1\n", "18/12/06 23:46:57 INFO mapreduce.JobSubmitter: number of splits:16\n", "18/12/06 23:46:57 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1542711134746_0075\n", "18/12/06 23:46:57 INFO impl.YarnClientImpl: Submitted application application_1542711134746_0075\n", "18/12/06 23:46:57 INFO mapreduce.Job: The url to track the job: http://emr-header-1.cluster-41697:20888/proxy/application_1542711134746_0075/\n", "18/12/06 23:46:57 INFO mapreduce.Job: Running job: job_1542711134746_0075\n", "18/12/06 23:47:02 INFO mapreduce.Job: Job job_1542711134746_0075 running in uber mode : false\n", "18/12/06 23:47:02 INFO mapreduce.Job: map 0% reduce 0%\n", "18/12/06 23:47:08 INFO mapreduce.Job: map 100% reduce 0%\n", "18/12/06 23:47:13 INFO mapreduce.Job: map 100% reduce 100%\n", "18/12/06 23:47:13 INFO mapreduce.Job: Job job_1542711134746_0075 completed successfully\n", "18/12/06 23:47:14 INFO mapreduce.Job: Counters: 49\n", "\tFile System Counters\n", "\t\tFILE: Number of bytes read=895\n", 
"\t\tFILE: Number of bytes written=2258315\n", "\t\tFILE: Number of read operations=0\n", "\t\tFILE: Number of large read operations=0\n", "\t\tFILE: Number of write operations=0\n", "\t\tHDFS: Number of bytes read=14800\n", "\t\tHDFS: Number of bytes written=4\n", "\t\tHDFS: Number of read operations=51\n", "\t\tHDFS: Number of large read operations=0\n", "\t\tHDFS: Number of write operations=2\n", "\tJob Counters \n", "\t\tLaunched map tasks=16\n", "\t\tLaunched reduce tasks=1\n", "\t\tData-local map tasks=16\n", "\t\tTotal time spent by all maps in occupied slots (ms)=3969756\n", "\t\tTotal time spent by all reduces in occupied slots (ms)=229905\n", "\t\tTotal time spent by all map tasks (ms)=67284\n", "\t\tTotal time spent by all reduce tasks (ms)=1965\n", "\t\tTotal vcore-milliseconds taken by all map tasks=67284\n", "\t\tTotal vcore-milliseconds taken by all reduce tasks=1965\n", "\t\tTotal megabyte-milliseconds taken by all map tasks=125955648\n", "\t\tTotal megabyte-milliseconds taken by all reduce tasks=7356960\n", "\tMap-Reduce Framework\n", "\t\tMap input records=29\n", "\t\tMap output records=29\n", "\t\tMap output bytes=1540\n", "\t\tMap output materialized bytes=1535\n", "\t\tInput split bytes=1904\n", "\t\tCombine input records=0\n", "\t\tCombine output records=0\n", "\t\tReduce input groups=25\n", "\t\tReduce shuffle bytes=1535\n", "\t\tReduce input records=29\n", "\t\tReduce output records=1\n", "\t\tSpilled Records=58\n", "\t\tShuffled Maps =16\n", "\t\tFailed Shuffles=0\n", "\t\tMerged Map outputs=16\n", "\t\tGC time elapsed (ms)=3148\n", "\t\tCPU time spent (ms)=15270\n", "\t\tPhysical memory (bytes) snapshot=8800583680\n", "\t\tVirtual memory (bytes) snapshot=63775653888\n", "\t\tTotal committed heap usage (bytes)=12616990720\n", "\tShuffle Errors\n", "\t\tBAD_ID=0\n", "\t\tCONNECTION=0\n", "\t\tIO_ERROR=0\n", "\t\tWRONG_LENGTH=0\n", "\t\tWRONG_MAP=0\n", "\t\tWRONG_REDUCE=0\n", "\tFile Input Format Counters \n", "\t\tBytes Read=12896\n", 
"\tFile Output Format Counters \n", "\t\tBytes Written=4\n", "18/12/06 23:47:14 INFO streaming.StreamJob: Output directory: ./output\n" ] } ], "source": [ "sh run_line_count.sh" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### Simple statistics with MapReduce" ] }, { "cell_type": "code", "execution_count": 19, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "AAPL\t10\n", "CSCO\t10\n", "GOOG\t5\n", "MSFT\t10\n", "YHOO\t10\n" ] } ], "source": [ "cat stocks.txt | python3 stocks_mapper.py | python3 stocks_reducer.py" ] }, { "cell_type": "code", "execution_count": 47, "metadata": { "scrolled": false, "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "AAPL\t2009-01-02\t88.315\t\n", "AAPL\t2008-01-02\t197.055\t\n", "AAPL\t2007-01-03\t85.045\t\n", "AAPL\t2006-01-03\t73.565\t\n", "AAPL\t2005-01-03\t64.035\t\n", "AAPL\t2004-01-02\t21.415\t\n", "AAPL\t2003-01-02\t14.58\t\n", "AAPL\t2002-01-02\t22.675\t\n", "AAPL\t2001-01-02\t14.88\t\n", "AAPL\t2000-01-03\t108.405\t\n", "CSCO\t2009-01-02\t16.685\t\n", "CSCO\t2008-01-02\t26.77\t\n", "CSCO\t2007-01-03\t27.595\t\n", "CSCO\t2006-01-03\t17.33\t\n", "CSCO\t2005-01-03\t19.37\t\n", "CSCO\t2004-01-02\t24.305\t\n", "CSCO\t2003-01-02\t13.375\t\n", "CSCO\t2002-01-02\t18.835\t\n", "CSCO\t2001-01-02\t35.72\t\n", "CSCO\t2000-01-03\t109\t\n", "GOOG\t2009-01-02\t314.96\t\n", "GOOG\t2008-01-02\t689.03\t\n", "GOOG\t2007-01-03\t466.795\t\n", "GOOG\t2006-01-03\t428.875\t\n", "GOOG\t2005-01-03\t200.055\t\n", "MSFT\t2009-01-02\t19.93\t\n", "MSFT\t2008-01-02\t35.505\t\n", "MSFT\t2007-01-03\t29.885\t\n", "MSFT\t2006-01-03\t26.545\t\n", "MSFT\t2005-01-03\t26.77\t\n", "MSFT\t2004-01-02\t27.515\t\n", "MSFT\t2003-01-02\t53.01\t\n", "MSFT\t2002-01-02\t66.845\t\n", "MSFT\t2001-01-02\t43.755\t\n", "MSFT\t2000-01-03\t116.965\t\n", "YHOO\t2009-01-02\t12.51\t\n", 
"YHOO\t2008-01-02\t23.76\t\n", "YHOO\t2007-01-03\t25.73\t\n", "YHOO\t2006-01-03\t40.3\t\n", "YHOO\t2005-01-03\t38.27\t\n", "YHOO\t2004-01-02\t45.45\t\n", "YHOO\t2003-01-02\t17.095\t\n", "YHOO\t2002-01-02\t18.385\t\n", "YHOO\t2001-01-02\t29.25\t\n", "YHOO\t2000-01-03\t458.96\t\n" ] } ], "source": [ "cat stocks.txt | Rscript stock_day_avg.R" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### R version" ] }, { "cell_type": "code", "execution_count": 48, "metadata": { "scrolled": false, "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "18/12/07 15:41:48 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 30 minutes.\n", "Moved: 'hdfs://emr-header-1.cluster-41697:9000/user/lifeng/output' to trash at: hdfs://emr-header-1.cluster-41697:9000/user/lifeng/.Trash/Current\n", "18/12/07 15:41:49 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.\n", "18/12/07 15:41:50 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.\n", "18/12/07 15:41:50 INFO Configuration.deprecation: mapred.job.name is deprecated. 
Instead, use mapreduce.job.name\n", "packageJobJar: [stock_day_avg.R, /tmp/hadoop-unjar8447436429789349154/] [] /tmp/streamjob5401932799782783867.jar tmpDir=null\n", "18/12/07 15:41:50 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/\n", "18/12/07 15:41:51 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/\n", "18/12/07 15:41:51 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/\n", "18/12/07 15:41:51 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/\n", "18/12/07 15:41:51 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries\n", "18/12/07 15:41:51 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 97184efe294f64a51a4c5c172cbc22146103da53]\n", "18/12/07 15:41:51 INFO mapred.FileInputFormat: Total input paths to process : 1\n", "18/12/07 15:41:51 INFO mapreduce.JobSubmitter: number of splits:16\n", "18/12/07 15:41:51 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1542711134746_0119\n", "18/12/07 15:41:51 INFO impl.YarnClientImpl: Submitted application application_1542711134746_0119\n", "18/12/07 15:41:51 INFO mapreduce.Job: The url to track the job: http://emr-header-1.cluster-41697:20888/proxy/application_1542711134746_0119/\n", "18/12/07 15:41:51 INFO mapreduce.Job: Running job: job_1542711134746_0119\n", "18/12/07 15:41:56 INFO mapreduce.Job: Job job_1542711134746_0119 running in uber mode : false\n", "18/12/07 15:41:56 INFO mapreduce.Job: map 0% reduce 0%\n", "18/12/07 15:42:03 INFO mapreduce.Job: map 100% reduce 0%\n", "18/12/07 15:42:08 INFO mapreduce.Job: map 100% reduce 100%\n", "18/12/07 15:42:08 INFO mapreduce.Job: Job job_1542711134746_0119 completed successfully\n", "18/12/07 15:42:08 INFO mapreduce.Job: Counters: 49\n", "\tFile System Counters\n", "\t\tFILE: Number of bytes read=1068\n", 
"\t\tFILE: Number of bytes written=2258875\n", "\t\tFILE: Number of read operations=0\n", "\t\tFILE: Number of large read operations=0\n", "\t\tFILE: Number of write operations=0\n", "\t\tHDFS: Number of bytes read=23344\n", "\t\tHDFS: Number of bytes written=1066\n", "\t\tHDFS: Number of read operations=51\n", "\t\tHDFS: Number of large read operations=0\n", "\t\tHDFS: Number of write operations=2\n", "\tJob Counters \n", "\t\tLaunched map tasks=16\n", "\t\tLaunched reduce tasks=1\n", "\t\tData-local map tasks=16\n", "\t\tTotal time spent by all maps in occupied slots (ms)=3672278\n", "\t\tTotal time spent by all reduces in occupied slots (ms)=243009\n", "\t\tTotal time spent by all map tasks (ms)=62242\n", "\t\tTotal time spent by all reduce tasks (ms)=2077\n", "\t\tTotal vcore-milliseconds taken by all map tasks=62242\n", "\t\tTotal vcore-milliseconds taken by all reduce tasks=2077\n", "\t\tTotal megabyte-milliseconds taken by all map tasks=116517024\n", "\t\tTotal megabyte-milliseconds taken by all reduce tasks=7776288\n", "\tMap-Reduce Framework\n", "\t\tMap input records=45\n", "\t\tMap output records=45\n", "\t\tMap output bytes=2557\n", "\t\tMap output materialized bytes=1752\n", "\t\tInput split bytes=1888\n", "\t\tCombine input records=0\n", "\t\tCombine output records=0\n", "\t\tReduce input groups=45\n", "\t\tReduce shuffle bytes=1752\n", "\t\tReduce input records=45\n", "\t\tReduce output records=45\n", "\t\tSpilled Records=90\n", "\t\tShuffled Maps =16\n", "\t\tFailed Shuffles=0\n", "\t\tMerged Map outputs=16\n", "\t\tGC time elapsed (ms)=2874\n", "\t\tCPU time spent (ms)=14450\n", "\t\tPhysical memory (bytes) snapshot=8774807552\n", "\t\tVirtual memory (bytes) snapshot=63769485312\n", "\t\tTotal committed heap usage (bytes)=12871794688\n", "\tShuffle Errors\n", "\t\tBAD_ID=0\n", "\t\tCONNECTION=0\n", "\t\tIO_ERROR=0\n", "\t\tWRONG_LENGTH=0\n", "\t\tWRONG_MAP=0\n", "\t\tWRONG_REDUCE=0\n", "\tFile Input Format Counters \n", "\t\tBytes Read=21456\n", 
"\tFile Output Format Counters \n", "\t\tBytes Written=1066\n", "18/12/07 15:42:08 INFO streaming.StreamJob: Output directory: ./output\n", "AAPL\t2000-01-03\t108.405\t\n", "AAPL\t2001-01-02\t14.88\t\n", "AAPL\t2002-01-02\t22.675\t\n", "AAPL\t2003-01-02\t14.58\t\n", "AAPL\t2004-01-02\t21.415\t\n", "AAPL\t2005-01-03\t64.035\t\n", "AAPL\t2006-01-03\t73.565\t\n", "AAPL\t2007-01-03\t85.045\t\n", "AAPL\t2008-01-02\t197.055\t\n", "AAPL\t2009-01-02\t88.315\t\n", "CSCO\t2000-01-03\t109\t\n", "CSCO\t2001-01-02\t35.72\t\n", "CSCO\t2002-01-02\t18.835\t\n", "CSCO\t2003-01-02\t13.375\t\n", "CSCO\t2004-01-02\t24.305\t\n", "CSCO\t2005-01-03\t19.37\t\n", "CSCO\t2006-01-03\t17.33\t\n", "CSCO\t2007-01-03\t27.595\t\n", "CSCO\t2008-01-02\t26.77\t\n", "CSCO\t2009-01-02\t16.685\t\n", "GOOG\t2005-01-03\t200.055\t\n", "GOOG\t2006-01-03\t428.875\t\n", "GOOG\t2007-01-03\t466.795\t\n", "GOOG\t2008-01-02\t689.03\t\n", "GOOG\t2009-01-02\t314.96\t\n", "MSFT\t2000-01-03\t116.965\t\n", "MSFT\t2001-01-02\t43.755\t\n", "MSFT\t2002-01-02\t66.845\t\n", "MSFT\t2003-01-02\t53.01\t\n", "MSFT\t2004-01-02\t27.515\t\n", "MSFT\t2005-01-03\t26.77\t\n", "MSFT\t2006-01-03\t26.545\t\n", "MSFT\t2007-01-03\t29.885\t\n", "MSFT\t2008-01-02\t35.505\t\n", "MSFT\t2009-01-02\t19.93\t\n", "YHOO\t2000-01-03\t458.96\t\n", "YHOO\t2001-01-02\t29.25\t\n", "YHOO\t2002-01-02\t18.385\t\n", "YHOO\t2003-01-02\t17.095\t\n", "YHOO\t2004-01-02\t45.45\t\n", "YHOO\t2005-01-03\t38.27\t\n", "YHOO\t2006-01-03\t40.3\t\n", "YHOO\t2007-01-03\t25.73\t\n", "YHOO\t2008-01-02\t23.76\t\n", "YHOO\t2009-01-02\t12.51\t\n" ] } ], "source": [ "sh run_stocks_mean.sh" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### Python version" ] }, { "cell_type": "code", "execution_count": 22, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "#! 
/usr/bin/env python3\n", "\n", "import sys\n", "\n", "for line in sys.stdin:\n", " part = line.split(',') \n", " print (part[0], 1)\n", " \n" ] } ], "source": [ "cat stocks_mapper.py" ] }, { "cell_type": "code", "execution_count": 23, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "#! /usr/bin/env python3\n", "\n", "import sys\n", "from operator import itemgetter \n", "wordcount = {}\n", "\n", "for line in sys.stdin:\n", " word,count = line.split(' ')\n", " count = int(count)\n", " wordcount[word] = wordcount.get(word,0) + count\n", "\n", "sorted_wordcount = sorted(wordcount.items(), key=itemgetter(0))\n", "\n", "for word, count in sorted_wordcount:\n", " print ('%s\\t%s'% (word,count))\n" ] } ], "source": [ "cat stocks_reducer.py" ] }, { "cell_type": "code", "execution_count": 46, "metadata": { "scrolled": false, "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "18/12/07 15:22:39 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 30 minutes.\n", "Moved: 'hdfs://emr-header-1.cluster-41697:9000/user/lifeng/output' to trash at: hdfs://emr-header-1.cluster-41697:9000/user/lifeng/.Trash/Current\n", "18/12/07 15:22:40 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.\n", "18/12/07 15:22:41 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.\n", "18/12/07 15:22:41 INFO Configuration.deprecation: mapred.job.name is deprecated. 
Instead, use mapreduce.job.name\n", "packageJobJar: [stocks_mapper.py, stocks_reducer.py, /tmp/hadoop-unjar6949997625926419144/] [] /tmp/streamjob8221192035336864415.jar tmpDir=null\n", "18/12/07 15:22:41 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/\n", "18/12/07 15:22:42 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/\n", "18/12/07 15:22:42 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/\n", "18/12/07 15:22:42 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/\n", "18/12/07 15:22:42 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries\n", "18/12/07 15:22:42 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 97184efe294f64a51a4c5c172cbc22146103da53]\n", "18/12/07 15:22:42 INFO mapred.FileInputFormat: Total input paths to process : 1\n", "18/12/07 15:22:42 INFO mapreduce.JobSubmitter: number of splits:16\n", "18/12/07 15:22:42 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1542711134746_0103\n", "18/12/07 15:22:42 INFO impl.YarnClientImpl: Submitted application application_1542711134746_0103\n", "18/12/07 15:22:42 INFO mapreduce.Job: The url to track the job: http://emr-header-1.cluster-41697:20888/proxy/application_1542711134746_0103/\n", "18/12/07 15:22:42 INFO mapreduce.Job: Running job: job_1542711134746_0103\n", "18/12/07 15:22:48 INFO mapreduce.Job: Job job_1542711134746_0103 running in uber mode : false\n", "18/12/07 15:22:48 INFO mapreduce.Job: map 0% reduce 0%\n", "18/12/07 15:22:54 INFO mapreduce.Job: map 100% reduce 0%\n", "18/12/07 15:22:59 INFO mapreduce.Job: map 100% reduce 67%\n", "18/12/07 15:23:00 INFO mapreduce.Job: map 100% reduce 100%\n", "18/12/07 15:23:00 INFO mapreduce.Job: Job job_1542711134746_0103 completed successfully\n", "18/12/07 15:23:00 INFO mapreduce.Job: Counters: 
50\n", "\tFile System Counters\n", "\t\tFILE: Number of bytes read=117\n", "\t\tFILE: Number of bytes written=2529253\n", "\t\tFILE: Number of read operations=0\n", "\t\tFILE: Number of large read operations=0\n", "\t\tFILE: Number of write operations=0\n", "\t\tHDFS: Number of bytes read=23344\n", "\t\tHDFS: Number of bytes written=39\n", "\t\tHDFS: Number of read operations=57\n", "\t\tHDFS: Number of large read operations=0\n", "\t\tHDFS: Number of write operations=6\n", "\tJob Counters \n", "\t\tKilled reduce tasks=1\n", "\t\tLaunched map tasks=16\n", "\t\tLaunched reduce tasks=3\n", "\t\tData-local map tasks=16\n", "\t\tTotal time spent by all maps in occupied slots (ms)=3827094\n", "\t\tTotal time spent by all reduces in occupied slots (ms)=711711\n", "\t\tTotal time spent by all map tasks (ms)=64866\n", "\t\tTotal time spent by all reduce tasks (ms)=6083\n", "\t\tTotal vcore-milliseconds taken by all map tasks=64866\n", "\t\tTotal vcore-milliseconds taken by all reduce tasks=6083\n", "\t\tTotal megabyte-milliseconds taken by all map tasks=121429152\n", "\t\tTotal megabyte-milliseconds taken by all reduce tasks=22774752\n", "\tMap-Reduce Framework\n", "\t\tMap input records=45\n", "\t\tMap output records=45\n", "\t\tMap output bytes=360\n", "\t\tMap output materialized bytes=900\n", "\t\tInput split bytes=1888\n", "\t\tCombine input records=0\n", "\t\tCombine output records=0\n", "\t\tReduce input groups=5\n", "\t\tReduce shuffle bytes=900\n", "\t\tReduce input records=45\n", "\t\tReduce output records=5\n", "\t\tSpilled Records=90\n", "\t\tShuffled Maps =48\n", "\t\tFailed Shuffles=0\n", "\t\tMerged Map outputs=48\n", "\t\tGC time elapsed (ms)=3061\n", "\t\tCPU time spent (ms)=15520\n", "\t\tPhysical memory (bytes) snapshot=9426890752\n", "\t\tVirtual memory (bytes) snapshot=74381381632\n", "\t\tTotal committed heap usage (bytes)=14251720704\n", "\tShuffle Errors\n", "\t\tBAD_ID=0\n", "\t\tCONNECTION=0\n", "\t\tIO_ERROR=0\n", "\t\tWRONG_LENGTH=0\n", 
"\t\tWRONG_MAP=0\n", "\t\tWRONG_REDUCE=0\n", "\tFile Input Format Counters \n", "\t\tBytes Read=21456\n", "\tFile Output Format Counters \n", "\t\tBytes Written=39\n", "18/12/07 15:23:00 INFO streaming.StreamJob: Output directory: ./output\n", "AAPL\t10\n", "YHOO\t10\n", "CSCO\t10\n", "MSFT\t10\n", "GOOG\t5\n" ] } ], "source": [ "sh run_stocks.sh" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Linear Regression with Hadoop\n", "\n", "### Let's first generate some data" ] }, { "cell_type": "code", "execution_count": 37, "metadata": { "scrolled": true, "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "#! /usr/bin/Rscript\n", "\n", "n = 100000\n", "p = 10\n", "x = matrix(rnorm(n*p), n, p)\n", "e = rnorm(n)\n", "beta = c(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) #生成beta系数分别为1~10\n", "y = x%*%beta+0.3*e\n", "mydata = cbind(x, y)\n", "dim(mydata)\n", "write.table(mydata, \"linear_random.csv\", sep = \",\" , row.names = FALSE, col.names = FALSE)\n", "colnames(mydata) = c(\"x1\", \"x2\", \"x3\", \"x4\", \"x5\", \"x6\", \"x7\", \"x8\", \"x9\", \"x10\", \"y\")\n", "mydata = data.frame(mydata)\n", "myfit = lm(y~x1+x2+x3+x4+x5+x6+x7+x8+x9+x10, mydata)\n", "myfit$coefficients\n" ] } ], "source": [ "cat simulation.R" ] }, { "cell_type": "code", "execution_count": 38, "metadata": { "scrolled": true, "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1] 100000 11\n", " (Intercept) x1 x2 x3 x4 x5 \n", " 0.001462286 0.999006118 1.998435334 2.999921614 4.000487524 4.999367984 \n", " x6 x7 x8 x9 x10 \n", " 5.999635871 7.000364838 7.999219909 8.999339902 10.000583860 \n" ] } ], "source": [ "Rscript simulation.R" ] }, { "cell_type": "code", "execution_count": 39, "metadata": { "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ 
"-1.25783985738839,0.233853066199809,0.959321896002629,-0.927971998903392,-1.9081222471712,-1.15679780312436,-0.98285146450708,-0.833097463331552,0.305515852568209,-1.79625721854489,-47.3338205169692\n", "-0.620172983975508,1.73728982281345,-0.829302997285467,0.354197934959032,0.85902952682553,0.489616646142072,0.0563573499545251,-0.107110454558573,0.189569420283467,-0.500661238817971,5.24542404493916\n", "1.40735729353374,1.25565108871813,0.988131408681291,0.857230185753291,0.63206801604377,2.16416156314474,-0.644790848920365,-0.51230132081169,1.94601333805292,-0.0724748987249774,34.2115745174301\n", "-0.0423011414422383,1.60278591223843,0.670382006567984,0.133106694554098,0.351700062468421,0.129326484274798,-0.464781051242584,0.764922912370213,-0.419752110275864,1.07624550982786,17.5337240754468\n", "0.662088621989811,-0.554459357346132,-0.83952037843494,1.43008790985413,1.40697432960699,-0.17754879747715,0.0269668971687026,-1.10737416485557,-1.14655494403846,-0.825552889554892,-18.7422354539862\n", "-0.605091503904831,1.08722575798844,1.12151767015535,-0.467934606210984,-1.1702020092535,-0.443650557748794,-0.802428089144584,-0.703702959432261,0.765019193846429,0.950967476748649,-0.369219523421802\n", "1.45255921490658,0.596968957864977,-1.38110209866925,-1.22015325783075,1.3321747339368,-0.618288146956072,-0.382441706582264,0.207848059847487,0.889489099391418,-1.38471032943358,-10.751906812882\n", "-0.0794402496054108,-0.0501255662315621,-0.0350709967052265,-1.00325836595628,-1.63010974782501,-0.00656109346693426,-0.710238219439852,1.55369135240367,-0.655536261944941,0.27165497553634,-8.87422553928433\n", "1.82499632174219,-1.37138612590107,0.66600446984248,2.27965171364776,0.577535835916579,3.0670970849713,0.475444508651725,0.252335257217842,0.394343365860688,1.26731818217592,52.5594762829493\n", 
"0.569523756632114,0.0236995546981009,0.185252906422781,0.955783376629012,0.114066408572127,-0.906101655277262,-0.87515532316799,1.24224011041282,-1.95110207141076,-0.172283404356755,-15.4308746142398\n" ] } ], "source": [ "head linear_random.csv" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### Try with Linux pipes first" ] }, { "cell_type": "code", "execution_count": 49, "metadata": { "scrolled": false, "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "matrix([[ 0.99901524, 1.99843097, 2.99992025, 4.00048095,\n", " 4.99937017, 5.99963222, 7.00036961, 7.99921836,\n", " 8.99933589, 10.00058861]])\n" ] } ], "source": [ "cat linear_random.csv | python lr_mapper.py | python lr_reducer.py" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "### Run the regression model within Hadoop" ] }, { "cell_type": "code", "execution_count": 50, "metadata": { "scrolled": false, "slideshow": { "slide_type": "slide" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "18/12/07 16:39:28 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 30 minutes.\n", "Moved: 'hdfs://emr-header-1.cluster-41697:9000/user/lifeng/output' to trash at: hdfs://emr-header-1.cluster-41697:9000/user/lifeng/.Trash/Current\n", "18/12/07 16:39:29 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.\n", "packageJobJar: [lr_mapper.py, lr_reducer.py, /tmp/hadoop-unjar4917789017892634208/] [] /tmp/streamjob4579007115660462517.jar tmpDir=null\n", "18/12/07 16:39:30 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/\n", "18/12/07 16:39:30 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/\n", "18/12/07 16:39:31 INFO impl.TimelineClientImpl: 
Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/\n", "18/12/07 16:39:31 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/\n", "18/12/07 16:39:31 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries\n", "18/12/07 16:39:31 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 97184efe294f64a51a4c5c172cbc22146103da53]\n", "18/12/07 16:39:31 INFO mapred.FileInputFormat: Total input paths to process : 1\n", "18/12/07 16:39:31 INFO mapreduce.JobSubmitter: number of splits:16\n", "18/12/07 16:39:31 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1542711134746_0136\n", "18/12/07 16:39:31 INFO impl.YarnClientImpl: Submitted application application_1542711134746_0136\n", "18/12/07 16:39:31 INFO mapreduce.Job: The url to track the job: http://emr-header-1.cluster-41697:20888/proxy/application_1542711134746_0136/\n", "18/12/07 16:39:31 INFO mapreduce.Job: Running job: job_1542711134746_0136\n", "18/12/07 16:39:36 INFO mapreduce.Job: Job job_1542711134746_0136 running in uber mode : false\n", "18/12/07 16:39:36 INFO mapreduce.Job: map 0% reduce 0%\n", "18/12/07 16:39:43 INFO mapreduce.Job: map 100% reduce 0%\n", "18/12/07 16:39:47 INFO mapreduce.Job: map 100% reduce 100%\n", "18/12/07 16:39:47 INFO mapreduce.Job: Job job_1542711134746_0136 completed successfully\n", "18/12/07 16:39:48 INFO mapreduce.Job: Counters: 50\n", "\tFile System Counters\n", "\t\tFILE: Number of bytes read=12815\n", "\t\tFILE: Number of bytes written=2286899\n", "\t\tFILE: Number of read operations=0\n", "\t\tFILE: Number of large read operations=0\n", "\t\tFILE: Number of write operations=0\n", "\t\tHDFS: Number of bytes read=19966275\n", "\t\tHDFS: Number of bytes written=172\n", "\t\tHDFS: Number of read operations=51\n", "\t\tHDFS: Number of large read operations=0\n", "\t\tHDFS: Number of write operations=2\n", "\tJob Counters \n", "\t\tKilled map tasks=1\n", 
"\t\tLaunched map tasks=16\n", "\t\tLaunched reduce tasks=1\n", "\t\tData-local map tasks=16\n", "\t\tTotal time spent by all maps in occupied slots (ms)=4250124\n", "\t\tTotal time spent by all reduces in occupied slots (ms)=227331\n", "\t\tTotal time spent by all map tasks (ms)=72036\n", "\t\tTotal time spent by all reduce tasks (ms)=1943\n", "\t\tTotal vcore-milliseconds taken by all map tasks=72036\n", "\t\tTotal vcore-milliseconds taken by all reduce tasks=1943\n", "\t\tTotal megabyte-milliseconds taken by all map tasks=134851392\n", "\t\tTotal megabyte-milliseconds taken by all reduce tasks=7274592\n", "\tMap-Reduce Framework\n", "\t\tMap input records=100000\n", "\t\tMap output records=16\n", "\t\tMap output bytes=35143\n", "\t\tMap output materialized bytes=13082\n", "\t\tInput split bytes=2000\n", "\t\tCombine input records=0\n", "\t\tCombine output records=0\n", "\t\tReduce input groups=16\n", "\t\tReduce shuffle bytes=13082\n", "\t\tReduce input records=16\n", "\t\tReduce output records=3\n", "\t\tSpilled Records=32\n", "\t\tShuffled Maps =16\n", "\t\tFailed Shuffles=0\n", "\t\tMerged Map outputs=16\n", "\t\tGC time elapsed (ms)=2891\n", "\t\tCPU time spent (ms)=16690\n", "\t\tPhysical memory (bytes) snapshot=8780718080\n", "\t\tVirtual memory (bytes) snapshot=63781085184\n", "\t\tTotal committed heap usage (bytes)=12934709248\n", "\tShuffle Errors\n", "\t\tBAD_ID=0\n", "\t\tCONNECTION=0\n", "\t\tIO_ERROR=0\n", "\t\tWRONG_LENGTH=0\n", "\t\tWRONG_MAP=0\n", "\t\tWRONG_REDUCE=0\n", "\tFile Input Format Counters \n", "\t\tBytes Read=19964275\n", "\tFile Output Format Counters \n", "\t\tBytes Written=172\n", "18/12/07 16:39:48 INFO streaming.StreamJob: Output directory: ./output\n", "matrix([[ 0.99901524, 1.99843097, 2.99992025, 4.00048095,\t\n", " 4.99937017, 5.99963222, 7.00036961, 7.99921836,\t\n", " 8.99933589, 10.00058861]])\t\n" ] } ], "source": [ "sh run_lr.sh" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, 
"source": [ "## Lab\n", "\n", "- Use `airline_small.csv` as input. The data description is available at http://stat-computing.org/dataexpo/2009/the-data.html\n", "\n", "- Extract useful information from the data\n", "\n", " - List all airport codes, with frequency\n", " - Make a new binary variable (Y) to indicate if a trip is delayed or not.\n", " \n", "- Make dummy transformation for variables such as `DayofWeek`, `Month`...\n", "\n", "- Finally, save your output in a file.\n", "\n", " - Each row contains the binary variable (Y), `CarrierDelay`, and your constructed dummy variables as predictors.\n", " - If possible, save the output in a [`libsvm` sparse format](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.dump_svmlight_file.html#sklearn.datasets.dump_svmlight_file) to save space.\n", " \n", "\n", "- **Hint**\n", "\n", " - You could use any language but Python3.7 is preferable.\n", " - Try your code with pipe first and then Hadoop\n", " - Record the computing time." ] } ], "metadata": { "celltoolbar": "Slideshow", "kernelspec": { "display_name": "Bash", "language": "bash", "name": "bash" }, "language_info": { "codemirror_mode": "shell", "file_extension": ".sh", "mimetype": "text/x-sh", "name": "bash" } }, "nbformat": 4, "nbformat_minor": 2 }