MapReduce with Hadoop Streaming¶

Feng Li¶

Central University of Finance and Economics¶

feng.li@cufe.edu.cn¶

Course home page: https://feng.li/distcomp¶

Hadoop Streaming, stdin and stdout¶

  • Hadoop provides an API to MapReduce that allows you to write your map and reduce functions in languages other than Java.

  • Hadoop Streaming uses Unix standard streams as the interface between Hadoop and your program, so you can write your MapReduce program in any language that can read from standard input and write to standard output (see the sketch after this list).

    • You can even use different languages for the mapper and the reducer.

    • It is well suited to text processing (e.g., reading a big CSV file line by line).

    • It can also handle binary streams (e.g., reading images as input).
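
Because the interface is nothing more than stdin and stdout, a streaming job can be prototyped as an ordinary Unix pipeline, with sort standing in for the shuffle phase. A conceptual sketch (input.txt, mapper.sh, and reducer.sh stand for your own input and executables):

    # a streaming job is conceptually equivalent to this local pipeline:
    # the mapper reads stdin, sort approximates the shuffle, and the
    # reducer reads the sorted stream from stdin
    cat input.txt | ./mapper.sh | sort | ./reducer.sh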

Your first Hadoop MapReduce program¶

In [9]:
which cat
/bin/cat
In [10]:
which wc
/bin/wc
In [12]:
hadoop fs -rm -r /user/lifeng/output

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
       -input /user/lifeng/asm-LICENSE  \
       -output /user/lifeng/output \
       -mapper "/usr/bin/cat"      \
       -reducer "/usr/bin/wc"
rm: `/user/lifeng/output': No such file or directory
packageJobJar: [/tmp/hadoop-unjar7580110097437608263/] [] /tmp/streamjob1753312041067415791.jar tmpDir=null
20/02/26 21:41:52 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-46968:8188/ws/v1/timeline/
20/02/26 21:41:52 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-46968/192.168.0.222:8032
20/02/26 21:41:52 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-46968:8188/ws/v1/timeline/
20/02/26 21:41:52 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-46968/192.168.0.222:8032
20/02/26 21:41:53 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
20/02/26 21:41:53 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 97184efe294f64a51a4c5c172cbc22146103da53]
20/02/26 21:41:53 INFO mapred.FileInputFormat: Total input paths to process : 1
20/02/26 21:41:53 INFO mapreduce.JobSubmitter: number of splits:16
20/02/26 21:41:53 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1579055927679_0005
20/02/26 21:41:53 INFO impl.YarnClientImpl: Submitted application application_1579055927679_0005
20/02/26 21:41:53 INFO mapreduce.Job: The url to track the job: http://emr-header-1.cluster-46968:20888/proxy/application_1579055927679_0005/
20/02/26 21:41:53 INFO mapreduce.Job: Running job: job_1579055927679_0005
20/02/26 21:41:58 INFO mapreduce.Job: Job job_1579055927679_0005 running in uber mode : false
20/02/26 21:41:58 INFO mapreduce.Job:  map 0% reduce 0%
20/02/26 21:42:06 INFO mapreduce.Job:  map 100% reduce 0%
20/02/26 21:42:11 INFO mapreduce.Job:  map 100% reduce 29%
20/02/26 21:42:12 INFO mapreduce.Job:  map 100% reduce 57%
20/02/26 21:42:13 INFO mapreduce.Job:  map 100% reduce 86%
20/02/26 21:42:14 INFO mapreduce.Job:  map 100% reduce 100%
20/02/26 21:42:14 INFO mapreduce.Job: Job job_1579055927679_0005 completed successfully
20/02/26 21:42:14 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=1367
		FILE: Number of bytes written=3035077
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=14720
		HDFS: Number of bytes written=175
		HDFS: Number of read operations=69
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=14
	Job Counters 
		Launched map tasks=16
		Launched reduce tasks=7
		Data-local map tasks=16
		Total time spent by all maps in occupied slots (ms)=4947858
		Total time spent by all reduces in occupied slots (ms)=1907919
		Total time spent by all map tasks (ms)=83862
		Total time spent by all reduce tasks (ms)=16307
		Total vcore-milliseconds taken by all map tasks=83862
		Total vcore-milliseconds taken by all reduce tasks=16307
		Total megabyte-milliseconds taken by all map tasks=156989664
		Total megabyte-milliseconds taken by all reduce tasks=61053408
	Map-Reduce Framework
		Map input records=29
		Map output records=29
		Map output bytes=1540
		Map output materialized bytes=2961
		Input split bytes=1824
		Combine input records=0
		Combine output records=0
		Reduce input groups=25
		Reduce shuffle bytes=2961
		Reduce input records=29
		Reduce output records=7
		Spilled Records=58
		Shuffled Maps =112
		Failed Shuffles=0
		Merged Map outputs=112
		GC time elapsed (ms)=3117
		CPU time spent (ms)=33410
		Physical memory (bytes) snapshot=10298241024
		Virtual memory (bytes) snapshot=92690731008
		Total committed heap usage (bytes)=16765157376
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=12896
	File Output Format Counters 
		Bytes Written=175
20/02/26 21:42:14 INFO streaming.StreamJob: Output directory: /user/lifeng/output
In [13]:
hadoop fs -ls /user/lifeng/output
Found 8 items
-rw-r-----   2 lifeng hadoop          0 2020-02-26 21:42 /user/lifeng/output/_SUCCESS
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00000
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00001
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00002
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00003
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00004
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00005
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00006
In [15]:
hadoop fs -cat /user/lifeng/output/*
      6      63     375	
      7      24     159	
      5      39     290	
      4      39     279	
      3      21     150	
      2      20     147	
      2      19     140	
Each reducer writes its own part file, so with seven reducers wc produced seven partial counts rather than one total. To get a single count, rerun the job with -numReduceTasks 1:

In [16]:
hadoop fs -rm -r /user/lifeng/output

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
       -input /user/lifeng/asm-LICENSE  \
       -output /user/lifeng/output \
       -mapper "/usr/bin/cat"      \
       -reducer "/usr/bin/wc"      \
       -numReduceTasks 1
20/02/26 21:43:21 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 30 minutes.
Moved: 'hdfs://emr-header-1.cluster-46968:9000/user/lifeng/output' to trash at: hdfs://emr-header-1.cluster-46968:9000/user/lifeng/.Trash/Current
packageJobJar: [/tmp/hadoop-unjar6344502777978736495/] [] /tmp/streamjob5520248859546438609.jar tmpDir=null
20/02/26 21:43:24 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-46968:8188/ws/v1/timeline/
20/02/26 21:43:24 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-46968/192.168.0.222:8032
20/02/26 21:43:24 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-46968:8188/ws/v1/timeline/
20/02/26 21:43:24 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-46968/192.168.0.222:8032
20/02/26 21:43:24 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
20/02/26 21:43:24 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 97184efe294f64a51a4c5c172cbc22146103da53]
20/02/26 21:43:24 INFO mapred.FileInputFormat: Total input paths to process : 1
20/02/26 21:43:24 INFO mapreduce.JobSubmitter: number of splits:16
20/02/26 21:43:25 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1579055927679_0006
20/02/26 21:43:25 INFO impl.YarnClientImpl: Submitted application application_1579055927679_0006
20/02/26 21:43:25 INFO mapreduce.Job: The url to track the job: http://emr-header-1.cluster-46968:20888/proxy/application_1579055927679_0006/
20/02/26 21:43:25 INFO mapreduce.Job: Running job: job_1579055927679_0006
20/02/26 21:43:31 INFO mapreduce.Job: Job job_1579055927679_0006 running in uber mode : false
20/02/26 21:43:31 INFO mapreduce.Job:  map 0% reduce 0%
20/02/26 21:43:37 INFO mapreduce.Job:  map 13% reduce 0%
20/02/26 21:43:38 INFO mapreduce.Job:  map 100% reduce 0%
20/02/26 21:43:42 INFO mapreduce.Job:  map 100% reduce 100%
20/02/26 21:43:42 INFO mapreduce.Job: Job job_1579055927679_0006 completed successfully
20/02/26 21:43:42 INFO mapreduce.Job: Counters: 50
	File System Counters
		FILE: Number of bytes read=895
		FILE: Number of bytes written=2241281
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=14720
		HDFS: Number of bytes written=25
		HDFS: Number of read operations=51
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Killed map tasks=1
		Launched map tasks=16
		Launched reduce tasks=1
		Data-local map tasks=16
		Total time spent by all maps in occupied slots (ms)=4388774
		Total time spent by all reduces in occupied slots (ms)=295191
		Total time spent by all map tasks (ms)=74386
		Total time spent by all reduce tasks (ms)=2523
		Total vcore-milliseconds taken by all map tasks=74386
		Total vcore-milliseconds taken by all reduce tasks=2523
		Total megabyte-milliseconds taken by all map tasks=139250592
		Total megabyte-milliseconds taken by all reduce tasks=9446112
	Map-Reduce Framework
		Map input records=29
		Map output records=29
		Map output bytes=1540
		Map output materialized bytes=1535
		Input split bytes=1824
		Combine input records=0
		Combine output records=0
		Reduce input groups=25
		Reduce shuffle bytes=1535
		Reduce input records=29
		Reduce output records=1
		Spilled Records=58
		Shuffled Maps =16
		Failed Shuffles=0
		Merged Map outputs=16
		GC time elapsed (ms)=2576
		CPU time spent (ms)=27920
		Physical memory (bytes) snapshot=8505241600
		Virtual memory (bytes) snapshot=61655015424
		Total committed heap usage (bytes)=12341739520
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=12896
	File Output Format Counters 
		Bytes Written=25
20/02/26 21:43:42 INFO streaming.StreamJob: Output directory: /user/lifeng/output
     29     225    1540	
In [17]:
hadoop fs -cat /user/lifeng/output/*
     29     225    1540	

Best Practices with Hadoop Streaming¶

  • Write your Hadoop command in a Bash script instead of running it directly in the Linux shell.

  • Use the following command to track down Hadoop MapReduce errors

      yarn logs -applicationId APPLICATION_ID
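
For example, to inspect the logs of the first job above (assuming YARN log aggregation is enabled on the cluster):

      yarn logs -applicationId application_1579055927679_0005 | less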
    
In [1]:
cat examples/print-colums/main.sh
#!/bin/bash

PWD=$(cd $(dirname $0); pwd)
cd $PWD 1> /dev/null 2>&1

TASKNAME=task1_fli
# python location on hadoop
PY27='/fli/tools/python2.7.tar.gz'
# hadoop client
HADOOP_HOME=/home/users/fli/hadoop/bin/hadoop
HADOOP_INPUT_DIR1=/fli/data1/part-*
HADOOP_INPUT_DIR2=/fli/data2/part-*
HADOOP_OUTPUT_DIR=/fli/results/task1

echo $HADOOP_HOME
echo $HADOOP_INPUT_DIR1 $HADOOP_INPUT_DIR2
echo $HADOOP_OUTPUT_DIR

$HADOOP_HOME fs -rmr $HADOOP_OUTPUT_DIR

$HADOOP_HOME streaming \
    -jobconf mapred.job.name=$TASKNAME \
    -jobconf mapred.job.priority=NORMAL \
    -jobconf mapred.map.tasks=500 \
    -jobconf mapred.reduce.tasks=500 \
    -jobconf mapred.job.map.capacity=500 \
    -jobconf mapred.job.reduce.capacity=500 \
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf mapred.text.key.partitioner.options=-k1,1 \
    -jobconf stream.memory.limit=1000 \
    -file $PWD/mapper.sh \
    -output ${HADOOP_OUTPUT_DIR} \
    -input ${HADOOP_INPUT_DIR1} \
    -input ${HADOOP_INPUT_DIR2} \
    -mapper "sh mapper.sh" \
    -reducer "cat" \
    -cacheArchive ${PY27}#py27 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner


if [ $? -ne 0 ]; then
    echo 'error'
    exit 1
fi
$HADOOP_HOME fs -touchz ${HADOOP_OUTPUT_DIR}/done

exit 0
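
Note how the script ships its own Python runtime: -cacheArchive unpacks python2.7.tar.gz on every node and exposes it under the symlink py27, so mapper.sh can call an interpreter that is not installed cluster-wide. A hypothetical mapper.sh (the path inside the archive depends on how the tarball was packed, so treat it as an assumption):

    #!/bin/bash
    # call the Python interpreter shipped via -cacheArchive; ./py27 is the
    # symlink declared as ${PY27}#py27 in main.sh above (interpreter path
    # inside the archive is an assumption)
    ./py27/bin/python mapper.py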


MapReduce examples¶

  • Mapper: Bash, Reducer: Python (see the sketch after this list)

  • Write MapReduce with Python

  • Write MapReduce with R
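
To make the pattern concrete, here is a minimal wordcount sketch with both mapper and reducer written in Bash; the file names mapper.sh and reducer.sh and the output path are illustrative, and the same stdin/stdout contract applies if you rewrite either side in Python or R.

    #!/bin/bash
    # mapper.sh: emit one "<word>\t1" pair per input word
    awk '{ for (i = 1; i <= NF; i++) print $i "\t1" }'

    #!/bin/bash
    # reducer.sh: keys arrive sorted, so counts for the same word are
    # adjacent; sum them and print one total per word
    awk -F '\t' '
      $1 != prev { if (NR > 1) print prev "\t" sum; prev = $1; sum = 0 }
      { sum += $2 }
      END { if (NR > 0) print prev "\t" sum }
    '

Test locally with a pipe first, then submit the same pair of scripts; the -files option ships them to every task:

    # local dry run: sort stands in for the shuffle phase
    cat input.txt | bash mapper.sh | sort | bash reducer.sh

    hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
           -files mapper.sh,reducer.sh \
           -input /user/lifeng/asm-LICENSE \
           -output /user/lifeng/wordcount \
           -mapper "bash mapper.sh" \
           -reducer "bash reducer.sh"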

Other Hadoop APIs¶

  • Hadoop with Java MapReduce

    • Hadoop is written in Java, and a rich collection of Java MapReduce classes is ready to use.
    • You need javac (in the JDK) and hadoop-mapreduce-client-core-xxx.jar to compile your jar files; for example, with a source file MyJob.java (the name is a placeholder):
    javac -classpath $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.2.jar \
          -d FirstJar MyJob.java
    
    jar -cvf FirstJar.jar -C FirstJar/ .
    
  • The syntax for running the Java version is

hadoop jar FirstJar.jar [mainClass] input output
  • Hadoop Pipes

    • Hadoop Pipes is the name of the C++ interface to Hadoop MapReduce.
    • Pipes uses sockets as the channel over which the tasktracker communicates with the process running the C++ map or reduce function.
    • The application links against the Hadoop C++ library, which is a thin wrapper for communicating with the tasktracker child process.
  • You have to compile and link your C++ program before sending it to Hadoop.

  • The Hadoop Pipes syntax is

hadoop pipes \
    -D hadoop.pipes.java.recordreader=true \
    -D hadoop.pipes.java.recordwriter=true \
    -input sample.txt \
    -output output \
    -program myCPProgram

Lab¶

  • Use airline_small.csv as input. The data description is available at http://stat-computing.org/dataexpo/2009/the-data.html

  • Extract useful information from the data

    • List all airport codes, with frequency
    • Create a new binary variable (Y) indicating whether a flight is delayed.
  • Create dummy variables for categorical predictors such as DayofWeek, Month...

  • Finally, save your output in a file.

    • Each row contains the binary variable (Y), CarrierDelay, and your constructed dummy variables as predictors.
  • Hint

    • You may use any language, but Python 3 is preferred.
    • Test your code with a Linux pipe first, then run it on Hadoop (see the sketch below).
    • Record the computing time.
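
As a starting point, here is a local dry run of the first task (airport codes with frequency). The column positions are an assumption based on the data description above, where Origin and Dest are the 17th and 18th comma-separated fields; verify them against airline_small.csv before relying on the output.

    # emit every origin and destination airport code (NR > 1 skips the
    # CSV header), then count occurrences of each code;
    # sort stands in for the Hadoop shuffle
    awk -F, 'NR > 1 { print $17; print $18 }' airline_small.csv \
        | sort | uniq -c | sort -rn | head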