Statistics with Hadoop Streaming¶

Feng Li¶

Central University of Finance and Economics¶

feng.li@cufe.edu.cn¶

Course home page: https://feng.li/distcomp¶

A simple line count program¶

In [1]:
cat line_count.py
#! /usr/bin/env python3

import sys

count = 0
for line in sys.stdin:  # read input from stdin
    count += 1
print(count)  # print goes to sys.stdout
In [44]:
cat license.txt | python3 line_count.py
29
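Because Hadoop Streaming runs any executable that reads stdin and writes stdout, the whole job can be emulated locally as mapper | sort | reducer. A minimal Python sketch of that three-stage flow (the mapper here is the identity, playing the role of /usr/bin/cat):

```python
# Minimal local emulation of the streaming model: mapper | sort | reducer.
# The mapper is the identity (like /usr/bin/cat) and the reducer counts
# lines, mirroring line_count.py above.

def mapper(lines):
    for line in lines:  # identity mapper: pass each record through
        yield line

def reducer(lines):
    return sum(1 for _ in lines)  # count records, like line_count.py

text = ["line one\n", "line two\n", "line three\n"]
print(reducer(sorted(mapper(text))))  # -> 3
```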

We can put the long Hadoop Streaming command into a shell script, say run_line_count.sh¶

In [45]:
cat run_line_count.sh
#!/bin/sh

TASKNAME=line_count

hadoop fs -rm -r ./output/
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
    -jobconf mapred.job.name=$TASKNAME \
    -input /user/lifeng/data/license.txt \
    -output ./output  \
    -file "line_count.py" \
    -mapper "/usr/bin/cat" \
    -reducer "python3 line_count.py" \
    -numReduceTasks 1 
In [18]:
sh run_line_count.sh
18/12/06 23:46:54 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 30 minutes.
Moved: 'hdfs://emr-header-1.cluster-41697:9000/user/lifeng/output' to trash at: hdfs://emr-header-1.cluster-41697:9000/user/lifeng/.Trash/Current
18/12/06 23:46:55 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
18/12/06 23:46:56 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
18/12/06 23:46:56 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
packageJobJar: [line_count.py, /tmp/hadoop-unjar6165594769339738850/] [] /tmp/streamjob1645783056874142978.jar tmpDir=null
18/12/06 23:46:56 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/
18/12/06 23:46:57 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/192.168.0.219:8032
18/12/06 23:46:57 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/
18/12/06 23:46:57 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/192.168.0.219:8032
18/12/06 23:46:57 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
18/12/06 23:46:57 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 97184efe294f64a51a4c5c172cbc22146103da53]
18/12/06 23:46:57 INFO mapred.FileInputFormat: Total input paths to process : 1
18/12/06 23:46:57 INFO mapreduce.JobSubmitter: number of splits:16
18/12/06 23:46:57 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1542711134746_0075
18/12/06 23:46:57 INFO impl.YarnClientImpl: Submitted application application_1542711134746_0075
18/12/06 23:46:57 INFO mapreduce.Job: The url to track the job: http://emr-header-1.cluster-41697:20888/proxy/application_1542711134746_0075/
18/12/06 23:46:57 INFO mapreduce.Job: Running job: job_1542711134746_0075
18/12/06 23:47:02 INFO mapreduce.Job: Job job_1542711134746_0075 running in uber mode : false
18/12/06 23:47:02 INFO mapreduce.Job:  map 0% reduce 0%
18/12/06 23:47:08 INFO mapreduce.Job:  map 100% reduce 0%
18/12/06 23:47:13 INFO mapreduce.Job:  map 100% reduce 100%
18/12/06 23:47:13 INFO mapreduce.Job: Job job_1542711134746_0075 completed successfully
18/12/06 23:47:14 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=895
		FILE: Number of bytes written=2258315
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=14800
		HDFS: Number of bytes written=4
		HDFS: Number of read operations=51
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Launched map tasks=16
		Launched reduce tasks=1
		Data-local map tasks=16
		Total time spent by all maps in occupied slots (ms)=3969756
		Total time spent by all reduces in occupied slots (ms)=229905
		Total time spent by all map tasks (ms)=67284
		Total time spent by all reduce tasks (ms)=1965
		Total vcore-milliseconds taken by all map tasks=67284
		Total vcore-milliseconds taken by all reduce tasks=1965
		Total megabyte-milliseconds taken by all map tasks=125955648
		Total megabyte-milliseconds taken by all reduce tasks=7356960
	Map-Reduce Framework
		Map input records=29
		Map output records=29
		Map output bytes=1540
		Map output materialized bytes=1535
		Input split bytes=1904
		Combine input records=0
		Combine output records=0
		Reduce input groups=25
		Reduce shuffle bytes=1535
		Reduce input records=29
		Reduce output records=1
		Spilled Records=58
		Shuffled Maps =16
		Failed Shuffles=0
		Merged Map outputs=16
		GC time elapsed (ms)=3148
		CPU time spent (ms)=15270
		Physical memory (bytes) snapshot=8800583680
		Virtual memory (bytes) snapshot=63775653888
		Total committed heap usage (bytes)=12616990720
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=12896
	File Output Format Counters 
		Bytes Written=4
18/12/06 23:47:14 INFO streaming.StreamJob: Output directory: ./output

Simple statistics with MapReduce¶

In [19]:
cat stocks.txt | python3 stocks_mapper.py | python3 stocks_reducer.py
AAPL	10
CSCO	10
GOOG	5
MSFT	10
YHOO	10
In [47]:
cat stocks.txt | Rscript stock_day_avg.R
AAPL	2009-01-02	88.315	
AAPL	2008-01-02	197.055	
AAPL	2007-01-03	85.045	
AAPL	2006-01-03	73.565	
AAPL	2005-01-03	64.035	
AAPL	2004-01-02	21.415	
AAPL	2003-01-02	14.58	
AAPL	2002-01-02	22.675	
AAPL	2001-01-02	14.88	
AAPL	2000-01-03	108.405	
CSCO	2009-01-02	16.685	
CSCO	2008-01-02	26.77	
CSCO	2007-01-03	27.595	
CSCO	2006-01-03	17.33	
CSCO	2005-01-03	19.37	
CSCO	2004-01-02	24.305	
CSCO	2003-01-02	13.375	
CSCO	2002-01-02	18.835	
CSCO	2001-01-02	35.72	
CSCO	2000-01-03	109	
GOOG	2009-01-02	314.96	
GOOG	2008-01-02	689.03	
GOOG	2007-01-03	466.795	
GOOG	2006-01-03	428.875	
GOOG	2005-01-03	200.055	
MSFT	2009-01-02	19.93	
MSFT	2008-01-02	35.505	
MSFT	2007-01-03	29.885	
MSFT	2006-01-03	26.545	
MSFT	2005-01-03	26.77	
MSFT	2004-01-02	27.515	
MSFT	2003-01-02	53.01	
MSFT	2002-01-02	66.845	
MSFT	2001-01-02	43.755	
MSFT	2000-01-03	116.965	
YHOO	2009-01-02	12.51	
YHOO	2008-01-02	23.76	
YHOO	2007-01-03	25.73	
YHOO	2006-01-03	40.3	
YHOO	2005-01-03	38.27	
YHOO	2004-01-02	45.45	
YHOO	2003-01-02	17.095	
YHOO	2002-01-02	18.385	
YHOO	2001-01-02	29.25	
YHOO	2000-01-03	458.96	
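The script stock_day_avg.R itself is not listed in these notes. Judging from the output above, it emits one record per row with the mean of the opening and closing prices; assuming stocks.txt rows follow a symbol,date,open,high,low,close,volume,adj_close layout, the per-line computation can be sketched in Python as:

```python
# Sketch under an assumed CSV layout:
#   symbol,date,open,high,low,close,volume,adj_close
# Emits the mean of the open and close prices for each row.

def day_avg(line):
    parts = line.strip().split(',')
    symbol, date = parts[0], parts[1]
    avg = (float(parts[2]) + float(parts[5])) / 2  # mean of open and close
    return symbol, date, avg

# Illustrative row (made-up prices):
print(day_avg("AAPL,2009-01-02,85.88,91.04,85.16,90.75,26641980,90.75"))
```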

R version¶

In [48]:
sh run_stocks_mean.sh
18/12/07 15:41:48 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 30 minutes.
Moved: 'hdfs://emr-header-1.cluster-41697:9000/user/lifeng/output' to trash at: hdfs://emr-header-1.cluster-41697:9000/user/lifeng/.Trash/Current
18/12/07 15:41:49 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
18/12/07 15:41:50 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
18/12/07 15:41:50 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
packageJobJar: [stock_day_avg.R, /tmp/hadoop-unjar8447436429789349154/] [] /tmp/streamjob5401932799782783867.jar tmpDir=null
18/12/07 15:41:50 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/
18/12/07 15:41:51 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/192.168.0.219:8032
18/12/07 15:41:51 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/
18/12/07 15:41:51 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/192.168.0.219:8032
18/12/07 15:41:51 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
18/12/07 15:41:51 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 97184efe294f64a51a4c5c172cbc22146103da53]
18/12/07 15:41:51 INFO mapred.FileInputFormat: Total input paths to process : 1
18/12/07 15:41:51 INFO mapreduce.JobSubmitter: number of splits:16
18/12/07 15:41:51 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1542711134746_0119
18/12/07 15:41:51 INFO impl.YarnClientImpl: Submitted application application_1542711134746_0119
18/12/07 15:41:51 INFO mapreduce.Job: The url to track the job: http://emr-header-1.cluster-41697:20888/proxy/application_1542711134746_0119/
18/12/07 15:41:51 INFO mapreduce.Job: Running job: job_1542711134746_0119
18/12/07 15:41:56 INFO mapreduce.Job: Job job_1542711134746_0119 running in uber mode : false
18/12/07 15:41:56 INFO mapreduce.Job:  map 0% reduce 0%
18/12/07 15:42:03 INFO mapreduce.Job:  map 100% reduce 0%
18/12/07 15:42:08 INFO mapreduce.Job:  map 100% reduce 100%
18/12/07 15:42:08 INFO mapreduce.Job: Job job_1542711134746_0119 completed successfully
18/12/07 15:42:08 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=1068
		FILE: Number of bytes written=2258875
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=23344
		HDFS: Number of bytes written=1066
		HDFS: Number of read operations=51
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Launched map tasks=16
		Launched reduce tasks=1
		Data-local map tasks=16
		Total time spent by all maps in occupied slots (ms)=3672278
		Total time spent by all reduces in occupied slots (ms)=243009
		Total time spent by all map tasks (ms)=62242
		Total time spent by all reduce tasks (ms)=2077
		Total vcore-milliseconds taken by all map tasks=62242
		Total vcore-milliseconds taken by all reduce tasks=2077
		Total megabyte-milliseconds taken by all map tasks=116517024
		Total megabyte-milliseconds taken by all reduce tasks=7776288
	Map-Reduce Framework
		Map input records=45
		Map output records=45
		Map output bytes=2557
		Map output materialized bytes=1752
		Input split bytes=1888
		Combine input records=0
		Combine output records=0
		Reduce input groups=45
		Reduce shuffle bytes=1752
		Reduce input records=45
		Reduce output records=45
		Spilled Records=90
		Shuffled Maps =16
		Failed Shuffles=0
		Merged Map outputs=16
		GC time elapsed (ms)=2874
		CPU time spent (ms)=14450
		Physical memory (bytes) snapshot=8774807552
		Virtual memory (bytes) snapshot=63769485312
		Total committed heap usage (bytes)=12871794688
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=21456
	File Output Format Counters 
		Bytes Written=1066
18/12/07 15:42:08 INFO streaming.StreamJob: Output directory: ./output
AAPL	2000-01-03	108.405	
AAPL	2001-01-02	14.88	
AAPL	2002-01-02	22.675	
AAPL	2003-01-02	14.58	
AAPL	2004-01-02	21.415	
AAPL	2005-01-03	64.035	
AAPL	2006-01-03	73.565	
AAPL	2007-01-03	85.045	
AAPL	2008-01-02	197.055	
AAPL	2009-01-02	88.315	
CSCO	2000-01-03	109	
CSCO	2001-01-02	35.72	
CSCO	2002-01-02	18.835	
CSCO	2003-01-02	13.375	
CSCO	2004-01-02	24.305	
CSCO	2005-01-03	19.37	
CSCO	2006-01-03	17.33	
CSCO	2007-01-03	27.595	
CSCO	2008-01-02	26.77	
CSCO	2009-01-02	16.685	
GOOG	2005-01-03	200.055	
GOOG	2006-01-03	428.875	
GOOG	2007-01-03	466.795	
GOOG	2008-01-02	689.03	
GOOG	2009-01-02	314.96	
MSFT	2000-01-03	116.965	
MSFT	2001-01-02	43.755	
MSFT	2002-01-02	66.845	
MSFT	2003-01-02	53.01	
MSFT	2004-01-02	27.515	
MSFT	2005-01-03	26.77	
MSFT	2006-01-03	26.545	
MSFT	2007-01-03	29.885	
MSFT	2008-01-02	35.505	
MSFT	2009-01-02	19.93	
YHOO	2000-01-03	458.96	
YHOO	2001-01-02	29.25	
YHOO	2002-01-02	18.385	
YHOO	2003-01-02	17.095	
YHOO	2004-01-02	45.45	
YHOO	2005-01-03	38.27	
YHOO	2006-01-03	40.3	
YHOO	2007-01-03	25.73	
YHOO	2008-01-02	23.76	
YHOO	2009-01-02	12.51	

Python version¶

In [22]:
cat stocks_mapper.py
#! /usr/bin/env python3

import sys

for line in sys.stdin:
    part = line.split(',')  # input rows are CSV: symbol,date,open,...
    print(part[0], 1)       # emit "SYMBOL 1" for each record
In [23]:
cat stocks_reducer.py
#! /usr/bin/env python3

import sys
from operator import itemgetter 
wordcount = {}

for line in sys.stdin:
    word, count = line.split()  # mapper emits "SYMBOL 1"
    wordcount[word] = wordcount.get(word, 0) + int(count)

# sort by symbol so the output order is deterministic
sorted_wordcount = sorted(wordcount.items(), key=itemgetter(0))

for word, count in sorted_wordcount:
    print('%s\t%s' % (word, count))
In [46]:
sh run_stocks.sh
18/12/07 15:22:39 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 30 minutes.
Moved: 'hdfs://emr-header-1.cluster-41697:9000/user/lifeng/output' to trash at: hdfs://emr-header-1.cluster-41697:9000/user/lifeng/.Trash/Current
18/12/07 15:22:40 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
18/12/07 15:22:41 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
18/12/07 15:22:41 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
packageJobJar: [stocks_mapper.py, stocks_reducer.py, /tmp/hadoop-unjar6949997625926419144/] [] /tmp/streamjob8221192035336864415.jar tmpDir=null
18/12/07 15:22:41 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/
18/12/07 15:22:42 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/192.168.0.219:8032
18/12/07 15:22:42 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/
18/12/07 15:22:42 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/192.168.0.219:8032
18/12/07 15:22:42 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
18/12/07 15:22:42 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 97184efe294f64a51a4c5c172cbc22146103da53]
18/12/07 15:22:42 INFO mapred.FileInputFormat: Total input paths to process : 1
18/12/07 15:22:42 INFO mapreduce.JobSubmitter: number of splits:16
18/12/07 15:22:42 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1542711134746_0103
18/12/07 15:22:42 INFO impl.YarnClientImpl: Submitted application application_1542711134746_0103
18/12/07 15:22:42 INFO mapreduce.Job: The url to track the job: http://emr-header-1.cluster-41697:20888/proxy/application_1542711134746_0103/
18/12/07 15:22:42 INFO mapreduce.Job: Running job: job_1542711134746_0103
18/12/07 15:22:48 INFO mapreduce.Job: Job job_1542711134746_0103 running in uber mode : false
18/12/07 15:22:48 INFO mapreduce.Job:  map 0% reduce 0%
18/12/07 15:22:54 INFO mapreduce.Job:  map 100% reduce 0%
18/12/07 15:22:59 INFO mapreduce.Job:  map 100% reduce 67%
18/12/07 15:23:00 INFO mapreduce.Job:  map 100% reduce 100%
18/12/07 15:23:00 INFO mapreduce.Job: Job job_1542711134746_0103 completed successfully
18/12/07 15:23:00 INFO mapreduce.Job: Counters: 50
	File System Counters
		FILE: Number of bytes read=117
		FILE: Number of bytes written=2529253
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=23344
		HDFS: Number of bytes written=39
		HDFS: Number of read operations=57
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=6
	Job Counters 
		Killed reduce tasks=1
		Launched map tasks=16
		Launched reduce tasks=3
		Data-local map tasks=16
		Total time spent by all maps in occupied slots (ms)=3827094
		Total time spent by all reduces in occupied slots (ms)=711711
		Total time spent by all map tasks (ms)=64866
		Total time spent by all reduce tasks (ms)=6083
		Total vcore-milliseconds taken by all map tasks=64866
		Total vcore-milliseconds taken by all reduce tasks=6083
		Total megabyte-milliseconds taken by all map tasks=121429152
		Total megabyte-milliseconds taken by all reduce tasks=22774752
	Map-Reduce Framework
		Map input records=45
		Map output records=45
		Map output bytes=360
		Map output materialized bytes=900
		Input split bytes=1888
		Combine input records=0
		Combine output records=0
		Reduce input groups=5
		Reduce shuffle bytes=900
		Reduce input records=45
		Reduce output records=5
		Spilled Records=90
		Shuffled Maps =48
		Failed Shuffles=0
		Merged Map outputs=48
		GC time elapsed (ms)=3061
		CPU time spent (ms)=15520
		Physical memory (bytes) snapshot=9426890752
		Virtual memory (bytes) snapshot=74381381632
		Total committed heap usage (bytes)=14251720704
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=21456
	File Output Format Counters 
		Bytes Written=39
18/12/07 15:23:00 INFO streaming.StreamJob: Output directory: ./output
AAPL	10
YHOO	10
CSCO	10
MSFT	10
GOOG	5

Linear Regression with Hadoop¶

Let's first generate some data¶

In [37]:
cat simulation.R
#! /usr/bin/Rscript

n = 100000
p = 10
x = matrix(rnorm(n*p), n, p)
e = rnorm(n)
beta = c(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) # true beta coefficients are 1 through 10
y = x%*%beta+0.3*e
mydata = cbind(x, y)
dim(mydata)
write.table(mydata, "linear_random.csv", sep = "," , row.names = FALSE,  col.names = FALSE)
colnames(mydata) = c("x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9", "x10", "y")
mydata = data.frame(mydata)
myfit  =  lm(y~x1+x2+x3+x4+x5+x6+x7+x8+x9+x10, mydata)
myfit$coefficients
In [38]:
Rscript simulation.R
[1] 100000     11
 (Intercept)           x1           x2           x3           x4           x5 
 0.001462286  0.999006118  1.998435334  2.999921614  4.000487524  4.999367984 
          x6           x7           x8           x9          x10 
 5.999635871  7.000364838  7.999219909  8.999339902 10.000583860 
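For comparison, the same simulation can be sketched with NumPy (an illustrative Python translation, not part of the original material):

```python
# NumPy sketch of the same simulation as simulation.R.
import numpy as np

rng = np.random.default_rng(2018)
n, p = 100000, 10
X = rng.standard_normal((n, p))
beta = np.arange(1.0, p + 1)          # true coefficients 1..10
y = X @ beta + 0.3 * rng.standard_normal(n)

# OLS via the normal equations; beta_hat should be close to 1..10
beta_hat = np.linalg.solve(X.T @ X, X.T @ y)
print(beta_hat.round(3))
```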
In [39]:
head linear_random.csv
-1.25783985738839,0.233853066199809,0.959321896002629,-0.927971998903392,-1.9081222471712,-1.15679780312436,-0.98285146450708,-0.833097463331552,0.305515852568209,-1.79625721854489,-47.3338205169692
-0.620172983975508,1.73728982281345,-0.829302997285467,0.354197934959032,0.85902952682553,0.489616646142072,0.0563573499545251,-0.107110454558573,0.189569420283467,-0.500661238817971,5.24542404493916
1.40735729353374,1.25565108871813,0.988131408681291,0.857230185753291,0.63206801604377,2.16416156314474,-0.644790848920365,-0.51230132081169,1.94601333805292,-0.0724748987249774,34.2115745174301
-0.0423011414422383,1.60278591223843,0.670382006567984,0.133106694554098,0.351700062468421,0.129326484274798,-0.464781051242584,0.764922912370213,-0.419752110275864,1.07624550982786,17.5337240754468
0.662088621989811,-0.554459357346132,-0.83952037843494,1.43008790985413,1.40697432960699,-0.17754879747715,0.0269668971687026,-1.10737416485557,-1.14655494403846,-0.825552889554892,-18.7422354539862
-0.605091503904831,1.08722575798844,1.12151767015535,-0.467934606210984,-1.1702020092535,-0.443650557748794,-0.802428089144584,-0.703702959432261,0.765019193846429,0.950967476748649,-0.369219523421802
1.45255921490658,0.596968957864977,-1.38110209866925,-1.22015325783075,1.3321747339368,-0.618288146956072,-0.382441706582264,0.207848059847487,0.889489099391418,-1.38471032943358,-10.751906812882
-0.0794402496054108,-0.0501255662315621,-0.0350709967052265,-1.00325836595628,-1.63010974782501,-0.00656109346693426,-0.710238219439852,1.55369135240367,-0.655536261944941,0.27165497553634,-8.87422553928433
1.82499632174219,-1.37138612590107,0.66600446984248,2.27965171364776,0.577535835916579,3.0670970849713,0.475444508651725,0.252335257217842,0.394343365860688,1.26731818217592,52.5594762829493
0.569523756632114,0.0236995546981009,0.185252906422781,0.955783376629012,0.114066408572127,-0.906101655277262,-0.87515532316799,1.24224011041282,-1.95110207141076,-0.172283404356755,-15.4308746142398

Try with Linux pipes first¶

In [49]:
cat linear_random.csv | python lr_mapper.py | python lr_reducer.py
matrix([[  0.99901524,   1.99843097,   2.99992025,   4.00048095,
           4.99937017,   5.99963222,   7.00036961,   7.99921836,
           8.99933589,  10.00058861]])
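The files lr_mapper.py and lr_reducer.py are not listed above. The standard way to fit a linear model in one MapReduce pass is for each mapper to reduce its split of (X, y) rows to the sufficient statistics X'X and X'y, and for a single reducer to sum them and solve the normal equations. A sketch of that logic (with an in-memory hand-off instead of the stdin/stdout serialization the real scripts would need):

```python
# Hypothetical sketch of the lr_mapper/lr_reducer logic: each map task
# condenses its rows to (X'X, X'y); the reducer sums these and solves
# beta = (X'X)^{-1} X'y.
import numpy as np

def map_partition(rows):
    """rows: iterable of CSV lines 'x1,...,xp,y'. Returns (X'X, X'y)."""
    X, y = [], []
    for line in rows:
        *xs, yi = map(float, line.strip().split(','))
        X.append(xs)
        y.append(yi)
    X, y = np.array(X), np.array(y)
    return X.T @ X, X.T @ y

def reduce_partitions(stats):
    """Sum the per-split sufficient statistics and solve for beta."""
    xtx = sum(s[0] for s in stats)
    xty = sum(s[1] for s in stats)
    return np.linalg.solve(xtx, xty)
```

In the real streaming job each mapper would print its two matrices to stdout and the reducer would parse them back from stdin; since the statistics are only p x p and p x 1, the shuffle stays tiny no matter how large n is.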

Run the regression model within Hadoop¶

In [50]:
sh run_lr.sh
18/12/07 16:39:28 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 30 minutes.
Moved: 'hdfs://emr-header-1.cluster-41697:9000/user/lifeng/output' to trash at: hdfs://emr-header-1.cluster-41697:9000/user/lifeng/.Trash/Current
18/12/07 16:39:29 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [lr_mapper.py, lr_reducer.py, /tmp/hadoop-unjar4917789017892634208/] [] /tmp/streamjob4579007115660462517.jar tmpDir=null
18/12/07 16:39:30 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/
18/12/07 16:39:30 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/192.168.0.219:8032
18/12/07 16:39:31 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/
18/12/07 16:39:31 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/192.168.0.219:8032
18/12/07 16:39:31 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
18/12/07 16:39:31 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 97184efe294f64a51a4c5c172cbc22146103da53]
18/12/07 16:39:31 INFO mapred.FileInputFormat: Total input paths to process : 1
18/12/07 16:39:31 INFO mapreduce.JobSubmitter: number of splits:16
18/12/07 16:39:31 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1542711134746_0136
18/12/07 16:39:31 INFO impl.YarnClientImpl: Submitted application application_1542711134746_0136
18/12/07 16:39:31 INFO mapreduce.Job: The url to track the job: http://emr-header-1.cluster-41697:20888/proxy/application_1542711134746_0136/
18/12/07 16:39:31 INFO mapreduce.Job: Running job: job_1542711134746_0136
18/12/07 16:39:36 INFO mapreduce.Job: Job job_1542711134746_0136 running in uber mode : false
18/12/07 16:39:36 INFO mapreduce.Job:  map 0% reduce 0%
18/12/07 16:39:43 INFO mapreduce.Job:  map 100% reduce 0%
18/12/07 16:39:47 INFO mapreduce.Job:  map 100% reduce 100%
18/12/07 16:39:47 INFO mapreduce.Job: Job job_1542711134746_0136 completed successfully
18/12/07 16:39:48 INFO mapreduce.Job: Counters: 50
	File System Counters
		FILE: Number of bytes read=12815
		FILE: Number of bytes written=2286899
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=19966275
		HDFS: Number of bytes written=172
		HDFS: Number of read operations=51
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Killed map tasks=1
		Launched map tasks=16
		Launched reduce tasks=1
		Data-local map tasks=16
		Total time spent by all maps in occupied slots (ms)=4250124
		Total time spent by all reduces in occupied slots (ms)=227331
		Total time spent by all map tasks (ms)=72036
		Total time spent by all reduce tasks (ms)=1943
		Total vcore-milliseconds taken by all map tasks=72036
		Total vcore-milliseconds taken by all reduce tasks=1943
		Total megabyte-milliseconds taken by all map tasks=134851392
		Total megabyte-milliseconds taken by all reduce tasks=7274592
	Map-Reduce Framework
		Map input records=100000
		Map output records=16
		Map output bytes=35143
		Map output materialized bytes=13082
		Input split bytes=2000
		Combine input records=0
		Combine output records=0
		Reduce input groups=16
		Reduce shuffle bytes=13082
		Reduce input records=16
		Reduce output records=3
		Spilled Records=32
		Shuffled Maps =16
		Failed Shuffles=0
		Merged Map outputs=16
		GC time elapsed (ms)=2891
		CPU time spent (ms)=16690
		Physical memory (bytes) snapshot=8780718080
		Virtual memory (bytes) snapshot=63781085184
		Total committed heap usage (bytes)=12934709248
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=19964275
	File Output Format Counters 
		Bytes Written=172
18/12/07 16:39:48 INFO streaming.StreamJob: Output directory: ./output
matrix([[  0.99901524,   1.99843097,   2.99992025,   4.00048095,	
           4.99937017,   5.99963222,   7.00036961,   7.99921836,	
           8.99933589,  10.00058861]])	

Lab¶

  • Use airline_small.csv as input. The data description is available at http://stat-computing.org/dataexpo/2009/the-data.html

  • Extract useful information from the data:

    • List all airport codes, with their frequencies.
    • Create a binary variable (Y) indicating whether a flight was delayed.
  • Create dummy variables for categorical fields such as DayOfWeek, Month, etc.

  • Finally, save your output to a file.

    • Each row should contain the binary variable (Y), CarrierDelay, and your constructed dummy variables as predictors.
    • If possible, use the libsvm sparse format to save space.
  • Hints

    • You may use any language, but Python 3.7 is preferred.
    • Test your code with Linux pipes first, then run it with Hadoop.
    • Record the computing time.
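For the dummy-transformation step, a mapper can one-hot encode each categorical field; a small sketch (the 1..7 level coding for DayOfWeek is taken from the data description linked above):

```python
# One-hot (dummy) encoding sketch for a categorical field.
def dummy_encode(value, levels):
    """Return a 0/1 indicator list for `value` against ordered `levels`."""
    return [1 if value == lev else 0 for lev in levels]

# DayOfWeek is coded 1 (Monday) .. 7 (Sunday) in the airline data
days = [str(d) for d in range(1, 8)]
print(dummy_encode("3", days))  # -> [0, 0, 1, 0, 0, 0, 0]
```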