stdin and stdout
Hadoop provides an API to MapReduce that allows you to write your map and reduce functions in languages other than Java.
Hadoop Streaming uses Unix standard streams as the interface between Hadoop and your program, so you can use any combination of languages that can read standard input and write to standard output to write your MapReduce program.
You can use different languages for the mapper and the reducer.
It is well suited to text processing (e.g., reading a big CSV file line by line).
It can also handle binary streams (e.g., reading images as input).
which cat
/bin/cat
which wc
/bin/wc
hadoop fs -rm -r /user/lifeng/output
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
       -input /user/lifeng/asm-LICENSE  \
       -output /user/lifeng/output \
       -mapper "/usr/bin/cat"      \
       -reducer "/usr/bin/wc"
rm: `/user/lifeng/output': No such file or directory
packageJobJar: [/tmp/hadoop-unjar7580110097437608263/] [] /tmp/streamjob1753312041067415791.jar tmpDir=null
20/02/26 21:41:52 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-46968:8188/ws/v1/timeline/
20/02/26 21:41:52 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-46968/192.168.0.222:8032
20/02/26 21:41:52 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-46968:8188/ws/v1/timeline/
20/02/26 21:41:52 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-46968/192.168.0.222:8032
20/02/26 21:41:53 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
20/02/26 21:41:53 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 97184efe294f64a51a4c5c172cbc22146103da53]
20/02/26 21:41:53 INFO mapred.FileInputFormat: Total input paths to process : 1
20/02/26 21:41:53 INFO mapreduce.JobSubmitter: number of splits:16
20/02/26 21:41:53 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1579055927679_0005
20/02/26 21:41:53 INFO impl.YarnClientImpl: Submitted application application_1579055927679_0005
20/02/26 21:41:53 INFO mapreduce.Job: The url to track the job: http://emr-header-1.cluster-46968:20888/proxy/application_1579055927679_0005/
20/02/26 21:41:53 INFO mapreduce.Job: Running job: job_1579055927679_0005
20/02/26 21:41:58 INFO mapreduce.Job: Job job_1579055927679_0005 running in uber mode : false
20/02/26 21:41:58 INFO mapreduce.Job:  map 0% reduce 0%
20/02/26 21:42:06 INFO mapreduce.Job:  map 100% reduce 0%
20/02/26 21:42:11 INFO mapreduce.Job:  map 100% reduce 29%
20/02/26 21:42:12 INFO mapreduce.Job:  map 100% reduce 57%
20/02/26 21:42:13 INFO mapreduce.Job:  map 100% reduce 86%
20/02/26 21:42:14 INFO mapreduce.Job:  map 100% reduce 100%
20/02/26 21:42:14 INFO mapreduce.Job: Job job_1579055927679_0005 completed successfully
20/02/26 21:42:14 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=1367
		FILE: Number of bytes written=3035077
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=14720
		HDFS: Number of bytes written=175
		HDFS: Number of read operations=69
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=14
	Job Counters 
		Launched map tasks=16
		Launched reduce tasks=7
		Data-local map tasks=16
		Total time spent by all maps in occupied slots (ms)=4947858
		Total time spent by all reduces in occupied slots (ms)=1907919
		Total time spent by all map tasks (ms)=83862
		Total time spent by all reduce tasks (ms)=16307
		Total vcore-milliseconds taken by all map tasks=83862
		Total vcore-milliseconds taken by all reduce tasks=16307
		Total megabyte-milliseconds taken by all map tasks=156989664
		Total megabyte-milliseconds taken by all reduce tasks=61053408
	Map-Reduce Framework
		Map input records=29
		Map output records=29
		Map output bytes=1540
		Map output materialized bytes=2961
		Input split bytes=1824
		Combine input records=0
		Combine output records=0
		Reduce input groups=25
		Reduce shuffle bytes=2961
		Reduce input records=29
		Reduce output records=7
		Spilled Records=58
		Shuffled Maps =112
		Failed Shuffles=0
		Merged Map outputs=112
		GC time elapsed (ms)=3117
		CPU time spent (ms)=33410
		Physical memory (bytes) snapshot=10298241024
		Virtual memory (bytes) snapshot=92690731008
		Total committed heap usage (bytes)=16765157376
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=12896
	File Output Format Counters 
		Bytes Written=175
20/02/26 21:42:14 INFO streaming.StreamJob: Output directory: /user/lifeng/output
hadoop fs -ls /user/lifeng/output
Found 8 items
-rw-r-----   2 lifeng hadoop          0 2020-02-26 21:42 /user/lifeng/output/_SUCCESS
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00000
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00001
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00002
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00003
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00004
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00005
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00006
hadoop fs -cat /user/lifeng/output/*
      6      63     375	
      7      24     159	
      5      39     290	
      4      39     279	
      3      21     150	
      2      20     147	
      2      19     140	
Each of the seven reducers wrote its own part file. To obtain a single summary, remove the output directory and rerun the job with one reducer:
hadoop fs -rm -r /user/lifeng/output
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
       -input /user/lifeng/asm-LICENSE  \
       -output /user/lifeng/output \
       -mapper "/usr/bin/cat"      \
       -reducer "/usr/bin/wc" \
       -numReduceTasks 1
20/02/26 21:43:21 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 30 minutes.
Moved: 'hdfs://emr-header-1.cluster-46968:9000/user/lifeng/output' to trash at: hdfs://emr-header-1.cluster-46968:9000/user/lifeng/.Trash/Current
packageJobJar: [/tmp/hadoop-unjar6344502777978736495/] [] /tmp/streamjob5520248859546438609.jar tmpDir=null
20/02/26 21:43:24 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-46968:8188/ws/v1/timeline/
20/02/26 21:43:24 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-46968/192.168.0.222:8032
20/02/26 21:43:24 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-46968:8188/ws/v1/timeline/
20/02/26 21:43:24 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-46968/192.168.0.222:8032
20/02/26 21:43:24 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
20/02/26 21:43:24 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 97184efe294f64a51a4c5c172cbc22146103da53]
20/02/26 21:43:24 INFO mapred.FileInputFormat: Total input paths to process : 1
20/02/26 21:43:24 INFO mapreduce.JobSubmitter: number of splits:16
20/02/26 21:43:25 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1579055927679_0006
20/02/26 21:43:25 INFO impl.YarnClientImpl: Submitted application application_1579055927679_0006
20/02/26 21:43:25 INFO mapreduce.Job: The url to track the job: http://emr-header-1.cluster-46968:20888/proxy/application_1579055927679_0006/
20/02/26 21:43:25 INFO mapreduce.Job: Running job: job_1579055927679_0006
20/02/26 21:43:31 INFO mapreduce.Job: Job job_1579055927679_0006 running in uber mode : false
20/02/26 21:43:31 INFO mapreduce.Job:  map 0% reduce 0%
20/02/26 21:43:37 INFO mapreduce.Job:  map 13% reduce 0%
20/02/26 21:43:38 INFO mapreduce.Job:  map 100% reduce 0%
20/02/26 21:43:42 INFO mapreduce.Job:  map 100% reduce 100%
20/02/26 21:43:42 INFO mapreduce.Job: Job job_1579055927679_0006 completed successfully
20/02/26 21:43:42 INFO mapreduce.Job: Counters: 50
	File System Counters
		FILE: Number of bytes read=895
		FILE: Number of bytes written=2241281
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=14720
		HDFS: Number of bytes written=25
		HDFS: Number of read operations=51
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Killed map tasks=1
		Launched map tasks=16
		Launched reduce tasks=1
		Data-local map tasks=16
		Total time spent by all maps in occupied slots (ms)=4388774
		Total time spent by all reduces in occupied slots (ms)=295191
		Total time spent by all map tasks (ms)=74386
		Total time spent by all reduce tasks (ms)=2523
		Total vcore-milliseconds taken by all map tasks=74386
		Total vcore-milliseconds taken by all reduce tasks=2523
		Total megabyte-milliseconds taken by all map tasks=139250592
		Total megabyte-milliseconds taken by all reduce tasks=9446112
	Map-Reduce Framework
		Map input records=29
		Map output records=29
		Map output bytes=1540
		Map output materialized bytes=1535
		Input split bytes=1824
		Combine input records=0
		Combine output records=0
		Reduce input groups=25
		Reduce shuffle bytes=1535
		Reduce input records=29
		Reduce output records=1
		Spilled Records=58
		Shuffled Maps =16
		Failed Shuffles=0
		Merged Map outputs=16
		GC time elapsed (ms)=2576
		CPU time spent (ms)=27920
		Physical memory (bytes) snapshot=8505241600
		Virtual memory (bytes) snapshot=61655015424
		Total committed heap usage (bytes)=12341739520
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=12896
	File Output Format Counters 
		Bytes Written=25
20/02/26 21:43:42 INFO streaming.StreamJob: Output directory: /user/lifeng/output
hadoop fs -cat /user/lifeng/output/*
     29     225    1540
With a single reducer, the entire input is summarized in one line: 29 lines, 225 words and 1540 bytes, matching the 29 map input records reported in the counters above.
Write your Hadoop command in a Bash file instead of running it directly in the Linux shell.
Use the following command to track Hadoop MapReduce errors:
  yarn logs -applicationId JOB_ID
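For example, to inspect the logs of the single-reducer job above:
yarn logs -applicationId application_1579055927679_0006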
cat examples/print-colums/main.sh
#!/bin/bash
# resolve the directory the script lives in (avoid clobbering the shell's PWD)
WORKDIR=$(cd "$(dirname "$0")"; pwd)
cd "$WORKDIR" 1> /dev/null 2>&1
TASKNAME=task1_fli
# python location on hadoop
PY27='/fli/tools/python2.7.tar.gz'
# hadoop client
HADOOP_HOME=/home/users/fli/hadoop/bin/hadoop
HADOOP_INPUT_DIR1=/fli/data1/part-*
HADOOP_INPUT_DIR2=/fli/data2/part-*
HADOOP_OUTPUT_DIR=/fli/results/task1
echo $HADOOP_HOME
echo $HADOOP_INPUT_DIR1 $HADOOP_INPUT_DIR2
echo $HADOOP_OUTPUT_DIR
$HADOOP_HOME fs -rmr $HADOOP_OUTPUT_DIR
$HADOOP_HOME streaming \
    -jobconf mapred.job.name=$TASKNAME \
    -jobconf mapred.job.priority=NORMAL \
    -jobconf mapred.map.tasks=500 \
    -jobconf mapred.reduce.tasks=500 \
    -jobconf mapred.job.map.capacity=500 \
    -jobconf mapred.job.reduce.capacity=500 \
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf mapred.text.key.partitioner.options=-k1,1 \
    -jobconf stream.memory.limit=1000 \
    -file $WORKDIR/mapper.sh \
    -output ${HADOOP_OUTPUT_DIR} \
    -input ${HADOOP_INPUT_DIR1} \
    -input ${HADOOP_INPUT_DIR2} \
    -mapper "sh mapper.sh" \
    -reducer "cat" \
    -cacheArchive ${PY27}#py27 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
if [ $? -ne 0 ]; then
    echo 'error'
    exit 1
fi
$HADOOP_HOME fs -touchz ${HADOOP_OUTPUT_DIR}/done
exit 0
Mapper: Bash, Reducer: Python
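The mapper.sh shipped with -file is not shown above. A minimal sketch of what a column-printing mapper could look like (hypothetical; the real script may differ, and the field numbers are placeholders):
#!/bin/bash
# Hypothetical mapper: read tab-separated records from stdin and
# emit columns 1, 2 and 4 to stdout. With
# stream.num.map.output.key.fields=2 (set in main.sh above), the
# first two output fields are treated as the key for partitioning.
cut -f 1,2,4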
Write MapReduce with Python
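The submission follows the same streaming pattern. A sketch, assuming hypothetical mapper.py and reducer.py scripts that read standard input and write standard output:
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
       -input /user/lifeng/asm-LICENSE \
       -output /user/lifeng/output \
       -file mapper.py  -mapper "python mapper.py" \
       -file reducer.py -reducer "python reducer.py"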
Write MapReduce with R
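The pattern is identical for R: ship the scripts with -file and use, e.g., -mapper "Rscript mapper.R" and -reducer "Rscript reducer.R" (hypothetical file names); any executable that reads standard input and writes standard output will do.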
Hadoop with Java MapReduce
javac -classpath $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.2.jar \
    -d FirstJar *.java
jar -cvf FirstJar.jar -C FirstJar/ .
The syntax for running a Java MapReduce job is
hadoop jar FirstJar.jar [mainClass] input output
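For example, with a hypothetical main class WordCount and hypothetical HDFS paths:
hadoop jar FirstJar.jar WordCount /user/lifeng/input /user/lifeng/output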
Hadoop Pipes
Hadoop Pipes is the C++ interface to Hadoop MapReduce. You have to compile and link your C++ program before sending it to Hadoop.
The Hadoop Pipes syntax is
hadoop pipes \
    -D hadoop.pipes.java.recordreader=true \
    -D hadoop.pipes.java.recordwriter=true \
    -input sample.txt \
    -output output \
    -program myCPProgram
Use airline_small.csv as input. The data description is available at http://stat-computing.org/dataexpo/2009/the-data.html
Extract useful information from the data.
Create dummy variables for categorical variables such as DayOfWeek and Month.
Finally, save your output in a file.
Use CarrierDelay and your constructed dummy variables as predictors.
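As a starting point for the dummy transformation, a sketch of an awk mapper that one-hot encodes DayOfWeek (assumed to be column 4 in the standard column order; verify against the data description):
#!/bin/bash
# Hypothetical mapper: append seven 0/1 indicator fields for
# DayOfWeek (assumed column 4) to every record; skip the CSV header.
awk -F',' 'NR > 1 {
    dummies = ""
    for (d = 1; d <= 7; d++)
        dummies = dummies "," ($4 == d ? 1 : 0)
    print $0 dummies
}'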