# Statistics with Hadoop Streaming

## Feng Li

### Central University of Finance and Economics

### [feng.li@cufe.edu.cn](mailto:feng.li@cufe.edu.cn)
### Course home page: [https://feng.li/distcomp](https://feng.li/distcomp)

## A simple line count program

In [1]:
cat line_count.py

#! /usr/bin/env python3

import sys

count = 0
# data = []
for line in sys.stdin:  # read input from stdin
    count += 1
    # data.append(line)
print(count)  # print goes to sys.stdout


In [44]:
cat license.txt | python3 line_count.py

29


### We can put the **long** Hadoop command into a `.sh` file, say `run_line_count.sh`

In [45]:
cat run_line_count.sh

#! /usr/bin/sh

TASKNAME=line_count

hadoop fs -rm -r ./output/
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
 -jobconf mapred.job.name=$TASKNAME \
 -input /user/lifeng/data/license.txt \
 -output ./output \
 -file "line_count.py" \
 -mapper "/usr/bin/cat" \
 -reducer "python3 line_count.py" \
 -numReduceTasks 1 


In [18]:
sh run_line_count.sh

18/12/06 23:46:54 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 30 minutes.
Moved: 'hdfs://emr-header-1.cluster-41697:9000/user/lifeng/output' to trash at: hdfs://emr-header-1.cluster-41697:9000/user/lifeng/.Trash/Current
18/12/06 23:46:55 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
18/12/06 23:46:56 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
18/12/06 23:46:56 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
packageJobJar: [line_count.py, /tmp/hadoop-unjar6165594769339738850/] [] /tmp/streamjob1645783056874142978.jar tmpDir=null
18/12/06 23:46:56 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/
18/12/06 23:46:57 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/192.168.0.219:8032
18/12/06 23:46:57 
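
With `/usr/bin/cat` as the mapper, every input line travels through the shuffle to the single reducer. A possible variation, sketched here as an assumption and not as part of the course scripts, lets each mapper count the lines of its own input split and emit a single number, so the reducer only has to add up a few partial counts. The function names `map_count` and `reduce_sum` are hypothetical.

```python
import io


def map_count(lines):
    """Mapper side: count the lines of one input split, emit one partial count."""
    count = 0
    for _ in lines:
        count += 1
    return str(count)


def reduce_sum(lines):
    """Reducer side: add up the partial counts, one per mapper."""
    total = 0
    for line in lines:
        line = line.strip()  # also removes a trailing tab added by streaming
        if line:  # ignore empty lines
            total += int(line)
    return total


# Simulate two mappers, each seeing a different split of the input.
split_a = io.StringIO("a\nb\nc\n")
split_b = io.StringIO("d\ne\n")
partials = [map_count(split_a), map_count(split_b)]
total_lines = reduce_sum(partials)
print(total_lines)  # 5
```

In a streaming job, each function would live in its own script that reads `sys.stdin` and prints its result.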

### Simple statistics with MapReduce

In [19]:
cat stocks.txt | python3 stocks_mapper.py | python3 stocks_reducer.py

AAPL	10
CSCO	10
GOOG	5
MSFT	10
YHOO	10


In [47]:
cat stocks.txt | Rscript stock_day_avg.R

AAPL	2009-01-02	88.315	
AAPL	2008-01-02	197.055	
AAPL	2007-01-03	85.045	
AAPL	2006-01-03	73.565	
AAPL	2005-01-03	64.035	
AAPL	2004-01-02	21.415	
AAPL	2003-01-02	14.58	
AAPL	2002-01-02	22.675	
AAPL	2001-01-02	14.88	
AAPL	2000-01-03	108.405	
CSCO	2009-01-02	16.685	
CSCO	2008-01-02	26.77	
CSCO	2007-01-03	27.595	
CSCO	2006-01-03	17.33	
CSCO	2005-01-03	19.37	
CSCO	2004-01-02	24.305	
CSCO	2003-01-02	13.375	
CSCO	2002-01-02	18.835	
CSCO	2001-01-02	35.72	
CSCO	2000-01-03	109	
GOOG	2009-01-02	314.96	
GOOG	2008-01-02	689.03	
GOOG	2007-01-03	466.795	
GOOG	2006-01-03	428.875	
GOOG	2005-01-03	200.055	
MSFT	2009-01-02	19.93	
MSFT	2008-01-02	35.505	
MSFT	2007-01-03	29.885	
MSFT	2006-01-03	26.545	
MSFT	2005-01-03	26.77	
MSFT	2004-01-02	27.515	
MSFT	2003-01-02	53.01	
MSFT	2002-01-02	66.845	
MSFT	2001-01-02	43.755	
MSFT	2000-01-03	116.965	
YHOO	2009-01-02	12.51	
YHOO	2008-01-02	23.76	
YHOO	2007-01-03	25.73	
YHOO	2006-01-03	40.3	
YHOO	2005-01-03	38.27	
YHOO	2004-01-02	45.45	
YHOO	2003-01-02	17.095	
YHOO	

### R version

In [48]:
sh run_stocks_mean.sh

18/12/07 15:41:48 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 30 minutes.
Moved: 'hdfs://emr-header-1.cluster-41697:9000/user/lifeng/output' to trash at: hdfs://emr-header-1.cluster-41697:9000/user/lifeng/.Trash/Current
18/12/07 15:41:49 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
18/12/07 15:41:50 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
18/12/07 15:41:50 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
packageJobJar: [stock_day_avg.R, /tmp/hadoop-unjar8447436429789349154/] [] /tmp/streamjob5401932799782783867.jar tmpDir=null
18/12/07 15:41:50 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/
18/12/07 15:41:51 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/192.168.0.219:8032
18/12/07 15:41:5

### Python version

In [22]:
cat stocks_mapper.py

#! /usr/bin/env python3

import sys

for line in sys.stdin:
    part = line.split(',')  # the first CSV field is the stock symbol
    print(part[0], 1)  # emit "symbol 1", separated by a single space
 


In [23]:
cat stocks_reducer.py

#! /usr/bin/env python3

import sys
from operator import itemgetter

wordcount = {}

for line in sys.stdin:
    # each input line is "symbol count"; split() also tolerates tabs and trailing whitespace
    word, count = line.split()
    wordcount[word] = wordcount.get(word, 0) + int(count)

# sort by symbol so the output order is deterministic
sorted_wordcount = sorted(wordcount.items(), key=itemgetter(0))

for word, count in sorted_wordcount:
    print('%s\t%s' % (word, count))
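
Hadoop sorts the mapper output by key before it reaches the reducer, so lines for the same symbol arrive consecutively. A minimal sketch that exploits this ordering with `itertools.groupby`, instead of holding every key in a dictionary, might look as follows. The function names `parse` and `reduce_sorted` are hypothetical, and the input is assumed to be the space-separated `symbol count` lines produced by `stocks_mapper.py`.

```python
from itertools import groupby


def parse(lines):
    """Yield (symbol, count) pairs from 'symbol count' lines."""
    for line in lines:
        fields = line.split()
        if len(fields) == 2:  # skip blank or malformed lines
            yield fields[0], int(fields[1])


def reduce_sorted(lines):
    """Sum counts per symbol, assuming the input is already sorted by symbol."""
    for symbol, group in groupby(parse(lines), key=lambda pair: pair[0]):
        yield symbol, sum(count for _, count in group)


# Locally, sorted() plays the role of Hadoop's shuffle phase.
mapper_output = ["MSFT 1", "AAPL 1", "MSFT 1", "AAPL 1", "AAPL 1"]
result = list(reduce_sorted(sorted(mapper_output)))
for symbol, total in result:
    print("%s\t%s" % (symbol, total))
```

To test such a reducer with Linux pipes, put `sort` between the mapper and the reducer so that the input arrives grouped, as it would under Hadoop.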


In [46]:
sh run_stocks.sh

18/12/07 15:22:39 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 30 minutes.
Moved: 'hdfs://emr-header-1.cluster-41697:9000/user/lifeng/output' to trash at: hdfs://emr-header-1.cluster-41697:9000/user/lifeng/.Trash/Current
18/12/07 15:22:40 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
18/12/07 15:22:41 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
18/12/07 15:22:41 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
packageJobJar: [stocks_mapper.py, stocks_reducer.py, /tmp/hadoop-unjar6949997625926419144/] [] /tmp/streamjob8221192035336864415.jar tmpDir=null
18/12/07 15:22:41 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/
18/12/07 15:22:42 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/192.168.0.219:8

## Linear Regression with Hadoop

### Let's first generate some data

In [37]:
cat simulation.R

#! /usr/bin/Rscript

n = 100000
p = 10
x = matrix(rnorm(n*p), n, p)
e = rnorm(n)
beta = c(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) # the true beta coefficients are 1 through 10
y = x%*%beta+0.3*e
mydata = cbind(x, y)
dim(mydata)
write.table(mydata, "linear_random.csv", sep = "," , row.names = FALSE, col.names = FALSE)
colnames(mydata) = c("x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9", "x10", "y")
mydata = data.frame(mydata)
myfit = lm(y~x1+x2+x3+x4+x5+x6+x7+x8+x9+x10, mydata)
myfit$coefficients


In [38]:
Rscript simulation.R

[1] 100000     11
 (Intercept)           x1           x2           x3           x4           x5
 0.001462286  0.999006118  1.998435334  2.999921614  4.000487524  4.999367984
          x6           x7           x8           x9          x10
 5.999635871  7.000364838  7.999219909  8.999339902 10.000583860


In [39]:
head linear_random.csv

-1.25783985738839,0.233853066199809,0.959321896002629,-0.927971998903392,-1.9081222471712,-1.15679780312436,-0.98285146450708,-0.833097463331552,0.305515852568209,-1.79625721854489,-47.3338205169692
-0.620172983975508,1.73728982281345,-0.829302997285467,0.354197934959032,0.85902952682553,0.489616646142072,0.0563573499545251,-0.107110454558573,0.189569420283467,-0.500661238817971,5.24542404493916
1.40735729353374,1.25565108871813,0.988131408681291,0.857230185753291,0.63206801604377,2.16416156314474,-0.644790848920365,-0.51230132081169,1.94601333805292,-0.0724748987249774,34.2115745174301
-0.0423011414422383,1.60278591223843,0.670382006567984,0.133106694554098,0.351700062468421,0.129326484274798,-0.464781051242584,0.764922912370213,-0.419752110275864,1.07624550982786,17.5337240754468
0.662088621989811,-0.554459357346132,-0.83952037843494,1.43008790985413,1.40697432960699,-0.17754879747715,0.0269668971687026,-1.10737416485557,-1.14655494403846,-0.825552889554892,-18.7422354539862
-0.60509

### Try with Linux pipes first

In [49]:
cat linear_random.csv | python lr_mapper.py | python lr_reducer.py

matrix([[ 0.99901524, 1.99843097, 2.99992025, 4.00048095,
 4.99937017, 5.99963222, 7.00036961, 7.99921836,
 8.99933589, 10.00058861]])
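
The scripts `lr_mapper.py` and `lr_reducer.py` are not listed here. One common way to split least squares across mappers, given as a sketch and not necessarily what those scripts do, uses the fact that X'X and X'y are sums over rows: each mapper accumulates both sums for its chunk, and the reducer adds the partial sums and solves (X'X) beta = X'y. Like the output above, the sketch fits no intercept. It is written in plain Python to stay self-contained (in practice `numpy` would be the natural choice), and all function names are hypothetical.

```python
def map_chunk(rows):
    """Mapper side: accumulate X'X and X'y over one chunk of CSV rows.

    Each row holds the predictors followed by the response in the last column.
    """
    xtx, xty = None, None
    for row in rows:
        values = [float(v) for v in row.split(",")]
        x, y = values[:-1], values[-1]
        if xtx is None:
            p = len(x)
            xtx = [[0.0] * p for _ in range(p)]
            xty = [0.0] * p
        for i in range(len(x)):
            xty[i] += x[i] * y
            for j in range(len(x)):
                xtx[i][j] += x[i] * x[j]
    return xtx, xty


def solve(a, b):
    """Solve a * beta = b by Gaussian elimination with partial pivoting."""
    n = len(b)
    m = [row[:] + [b[i]] for i, row in enumerate(a)]  # augmented matrix
    for col in range(n):
        pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(col + 1, n):
            factor = m[r][col] / m[col][col]
            for c in range(col, n + 1):
                m[r][c] -= factor * m[col][c]
    beta = [0.0] * n
    for i in reversed(range(n)):
        rest = sum(m[i][j] * beta[j] for j in range(i + 1, n))
        beta[i] = (m[i][n] - rest) / m[i][i]
    return beta


def reduce_partials(partials):
    """Reducer side: add the partial sums from all mappers and solve for beta."""
    p = len(partials[0][1])
    xtx = [[sum(part[0][i][j] for part in partials) for j in range(p)]
           for i in range(p)]
    xty = [sum(part[1][i] for part in partials) for i in range(p)]
    return solve(xtx, xty)


# Noise-free toy data with y = 1*x1 + 2*x2, split across two "mappers".
chunk_a = ["1,0,1", "0,1,2"]
chunk_b = ["1,1,3", "2,1,4"]
beta = reduce_partials([map_chunk(chunk_a), map_chunk(chunk_b)])
print([round(b, 6) for b in beta])
```

Only the p-by-p matrix and the length-p vector cross the shuffle, so the amount of data sent to the reducer does not grow with the number of rows.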


### Run the regression model within Hadoop

In [50]:
sh run_lr.sh

18/12/07 16:39:28 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 30 minutes.
Moved: 'hdfs://emr-header-1.cluster-41697:9000/user/lifeng/output' to trash at: hdfs://emr-header-1.cluster-41697:9000/user/lifeng/.Trash/Current
18/12/07 16:39:29 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [lr_mapper.py, lr_reducer.py, /tmp/hadoop-unjar4917789017892634208/] [] /tmp/streamjob4579007115660462517.jar tmpDir=null
18/12/07 16:39:30 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/
18/12/07 16:39:30 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-41697/192.168.0.219:8032
18/12/07 16:39:31 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-41697:8188/ws/v1/timeline/
18/12/07 16:39:31 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.c

## Lab

- Use `airline_small.csv` as input. The data description is available at http://stat-computing.org/dataexpo/2009/the-data.html

- Extract useful information from the data

 - List all airport codes, with frequency
 - Create a new binary variable (Y) indicating whether a trip is delayed.

- Create dummy variables for categorical variables such as `DayofWeek` and `Month`.

- Finally, save your output in a file.

 - Each row contains the binary variable (Y), `CarrierDelay`, and your constructed dummy variables as predictors.
 - If possible, save the output in a [`libsvm` sparse format](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.dump_svmlight_file.html#sklearn.datasets.dump_svmlight_file) to save space.
 

- **Hint**

 - You may use any language, but Python 3.7 is preferred.
 - Test your code with Linux pipes first, then run it on Hadoop.
 - Record the computing time.
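
As a starting point for the last steps, the sketch below turns one record into a `libsvm`-style line (`label index:value ...`, with ascending 1-based indices and zero entries omitted). It rests on assumptions that should be checked against the data description: that the columns are named `ArrDelay`, `CarrierDelay`, `DayOfWeek` and `Month`, that missing values are coded as `NA`, and that a trip counts as delayed when `ArrDelay` is positive. The function name `to_libsvm` and the feature layout are hypothetical, and the two sample records are made up only to show the output format.

```python
import csv
import io

# Assumed column names; check them against the header of your file.
DELAY_COL, CARRIER_COL = "ArrDelay", "CarrierDelay"
DUMMIES = [("DayOfWeek", 7), ("Month", 12)]  # (column, number of levels)


def to_libsvm(record):
    """Convert one record (a dict of strings) to a libsvm-style line.

    Feature 1 is CarrierDelay; the dummy variables follow, one index per level.
    Returns None when the delay is missing, so the row can be skipped.
    """
    if record[DELAY_COL] in ("NA", ""):
        return None
    # Assumed definition of "delayed": a positive arrival delay.
    label = 1 if float(record[DELAY_COL]) > 0 else 0
    features = []
    carrier = record[CARRIER_COL]
    if carrier not in ("NA", "") and float(carrier) != 0:
        features.append((1, float(carrier)))
    offset = 1
    for column, levels in DUMMIES:
        level = int(record[column])  # levels are assumed to be coded 1..levels
        features.append((offset + level, 1))
        offset += levels
    return "%d %s" % (label, " ".join("%d:%g" % f for f in features))


# Two made-up records, only to show the output format.
sample = io.StringIO(
    "Month,DayOfWeek,ArrDelay,CarrierDelay\n"
    "1,3,25,10\n"
    "12,7,-4,NA\n"
)
lines = [to_libsvm(r) for r in csv.DictReader(sample)]
print("\n".join(lines))
```

Under Hadoop, only the first input split contains the header line, so a streaming mapper would need to hard-code the column positions or skip the header explicitly.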