Hadoop Streaming

Hadoop provides an API to MapReduce that allows you to write your map and reduce functions in languages other than Java. Hadoop Streaming uses Unix standard streams (stdin and stdout) as the interface between Hadoop and your program, so you can write your MapReduce program in any combination of languages that can read standard input and write to standard output.
You can use different languages for the mapper and the reducer.
It is well suited to text processing (e.g., reading a big CSV file line by line).
It can also handle binary streams (e.g., reading images as input).
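Because the interface is nothing more than stdin and stdout, a streaming pipeline can be prototyped locally before touching Hadoop. A minimal sketch, assuming hypothetical mapper.py and reducer.py scripts (such as the word-count pair later in this section):

cat input.txt | python mapper.py | sort | python reducer.py

Here sort stands in for Hadoop's shuffle-and-sort phase, which groups the mapper's output keys before they reach the reducer.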
First check where cat and wc live, since the streaming job below refers to the mapper and reducer by absolute path:

which cat
/bin/cat
which wc
/bin/wc
Remove the output directory if it already exists (a streaming job fails if its output directory is present), then run a job that uses cat as the mapper and wc as the reducer:

hadoop fs -rm -r /user/lifeng/output
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-input /user/lifeng/asm-LICENSE \
-output /user/lifeng/output \
-mapper "/usr/bin/cat" \
-reducer "/usr/bin/wc"
rm: `/user/lifeng/output': No such file or directory
packageJobJar: [/tmp/hadoop-unjar7580110097437608263/] [] /tmp/streamjob1753312041067415791.jar tmpDir=null
20/02/26 21:41:52 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-46968:8188/ws/v1/timeline/
20/02/26 21:41:52 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-46968/192.168.0.222:8032
20/02/26 21:41:52 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-46968:8188/ws/v1/timeline/
20/02/26 21:41:52 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-46968/192.168.0.222:8032
20/02/26 21:41:53 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
20/02/26 21:41:53 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 97184efe294f64a51a4c5c172cbc22146103da53]
20/02/26 21:41:53 INFO mapred.FileInputFormat: Total input paths to process : 1
20/02/26 21:41:53 INFO mapreduce.JobSubmitter: number of splits:16
20/02/26 21:41:53 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1579055927679_0005
20/02/26 21:41:53 INFO impl.YarnClientImpl: Submitted application application_1579055927679_0005
20/02/26 21:41:53 INFO mapreduce.Job: The url to track the job: http://emr-header-1.cluster-46968:20888/proxy/application_1579055927679_0005/
20/02/26 21:41:53 INFO mapreduce.Job: Running job: job_1579055927679_0005
20/02/26 21:41:58 INFO mapreduce.Job: Job job_1579055927679_0005 running in uber mode : false
20/02/26 21:41:58 INFO mapreduce.Job: map 0% reduce 0%
20/02/26 21:42:06 INFO mapreduce.Job: map 100% reduce 0%
20/02/26 21:42:11 INFO mapreduce.Job: map 100% reduce 29%
20/02/26 21:42:12 INFO mapreduce.Job: map 100% reduce 57%
20/02/26 21:42:13 INFO mapreduce.Job: map 100% reduce 86%
20/02/26 21:42:14 INFO mapreduce.Job: map 100% reduce 100%
20/02/26 21:42:14 INFO mapreduce.Job: Job job_1579055927679_0005 completed successfully
20/02/26 21:42:14 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1367
        FILE: Number of bytes written=3035077
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=14720
        HDFS: Number of bytes written=175
        HDFS: Number of read operations=69
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=14
    Job Counters
        Launched map tasks=16
        Launched reduce tasks=7
        Data-local map tasks=16
        Total time spent by all maps in occupied slots (ms)=4947858
        Total time spent by all reduces in occupied slots (ms)=1907919
        Total time spent by all map tasks (ms)=83862
        Total time spent by all reduce tasks (ms)=16307
        Total vcore-milliseconds taken by all map tasks=83862
        Total vcore-milliseconds taken by all reduce tasks=16307
        Total megabyte-milliseconds taken by all map tasks=156989664
        Total megabyte-milliseconds taken by all reduce tasks=61053408
    Map-Reduce Framework
        Map input records=29
        Map output records=29
        Map output bytes=1540
        Map output materialized bytes=2961
        Input split bytes=1824
        Combine input records=0
        Combine output records=0
        Reduce input groups=25
        Reduce shuffle bytes=2961
        Reduce input records=29
        Reduce output records=7
        Spilled Records=58
        Shuffled Maps =112
        Failed Shuffles=0
        Merged Map outputs=112
        GC time elapsed (ms)=3117
        CPU time spent (ms)=33410
        Physical memory (bytes) snapshot=10298241024
        Virtual memory (bytes) snapshot=92690731008
        Total committed heap usage (bytes)=16765157376
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=12896
    File Output Format Counters
        Bytes Written=175
20/02/26 21:42:14 INFO streaming.StreamJob: Output directory: /user/lifeng/output
hadoop fs -ls /user/lifeng/output
Found 8 items
-rw-r-----   2 lifeng hadoop          0 2020-02-26 21:42 /user/lifeng/output/_SUCCESS
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00000
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00001
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00002
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00003
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00004
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00005
-rw-r-----   2 lifeng hadoop         25 2020-02-26 21:42 /user/lifeng/output/part-00006
hadoop fs -cat /user/lifeng/output/*
      6      63     375
      7      24     159
      5      39     290
      4      39     279
      3      21     150
      2      20     147
      2      19     140

Each of the seven reduce tasks piped its own partition through wc, so there are seven lines of (line, word, byte) counts.
To get a single set of counts for the whole file, remove the old output and rerun the job with one reducer:

hadoop fs -rm -r /user/lifeng/output

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input /user/lifeng/asm-LICENSE \
    -output /user/lifeng/output \
    -mapper "/usr/bin/cat" \
    -reducer "/usr/bin/wc" \
    -numReduceTasks 1
20/02/26 21:43:21 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 1440 minutes, Emptier interval = 30 minutes.
Moved: 'hdfs://emr-header-1.cluster-46968:9000/user/lifeng/output' to trash at: hdfs://emr-header-1.cluster-46968:9000/user/lifeng/.Trash/Current
packageJobJar: [/tmp/hadoop-unjar6344502777978736495/] [] /tmp/streamjob5520248859546438609.jar tmpDir=null
20/02/26 21:43:24 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-46968:8188/ws/v1/timeline/
20/02/26 21:43:24 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-46968/192.168.0.222:8032
20/02/26 21:43:24 INFO impl.TimelineClientImpl: Timeline service address: http://emr-header-1.cluster-46968:8188/ws/v1/timeline/
20/02/26 21:43:24 INFO client.RMProxy: Connecting to ResourceManager at emr-header-1.cluster-46968/192.168.0.222:8032
20/02/26 21:43:24 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
20/02/26 21:43:24 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 97184efe294f64a51a4c5c172cbc22146103da53]
20/02/26 21:43:24 INFO mapred.FileInputFormat: Total input paths to process : 1
20/02/26 21:43:24 INFO mapreduce.JobSubmitter: number of splits:16
20/02/26 21:43:25 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1579055927679_0006
20/02/26 21:43:25 INFO impl.YarnClientImpl: Submitted application application_1579055927679_0006
20/02/26 21:43:25 INFO mapreduce.Job: The url to track the job: http://emr-header-1.cluster-46968:20888/proxy/application_1579055927679_0006/
20/02/26 21:43:25 INFO mapreduce.Job: Running job: job_1579055927679_0006
20/02/26 21:43:31 INFO mapreduce.Job: Job job_1579055927679_0006 running in uber mode : false
20/02/26 21:43:31 INFO mapreduce.Job: map 0% reduce 0%
20/02/26 21:43:37 INFO mapreduce.Job: map 13% reduce 0%
20/02/26 21:43:38 INFO mapreduce.Job: map 100% reduce 0%
20/02/26 21:43:42 INFO mapreduce.Job: map 100% reduce 100%
20/02/26 21:43:42 INFO mapreduce.Job: Job job_1579055927679_0006 completed successfully
20/02/26 21:43:42 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=895
        FILE: Number of bytes written=2241281
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=14720
        HDFS: Number of bytes written=25
        HDFS: Number of read operations=51
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Killed map tasks=1
        Launched map tasks=16
        Launched reduce tasks=1
        Data-local map tasks=16
        Total time spent by all maps in occupied slots (ms)=4388774
        Total time spent by all reduces in occupied slots (ms)=295191
        Total time spent by all map tasks (ms)=74386
        Total time spent by all reduce tasks (ms)=2523
        Total vcore-milliseconds taken by all map tasks=74386
        Total vcore-milliseconds taken by all reduce tasks=2523
        Total megabyte-milliseconds taken by all map tasks=139250592
        Total megabyte-milliseconds taken by all reduce tasks=9446112
    Map-Reduce Framework
        Map input records=29
        Map output records=29
        Map output bytes=1540
        Map output materialized bytes=1535
        Input split bytes=1824
        Combine input records=0
        Combine output records=0
        Reduce input groups=25
        Reduce shuffle bytes=1535
        Reduce input records=29
        Reduce output records=1
        Spilled Records=58
        Shuffled Maps =16
        Failed Shuffles=0
        Merged Map outputs=16
        GC time elapsed (ms)=2576
        CPU time spent (ms)=27920
        Physical memory (bytes) snapshot=8505241600
        Virtual memory (bytes) snapshot=61655015424
        Total committed heap usage (bytes)=12341739520
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=12896
    File Output Format Counters
        Bytes Written=25
20/02/26 21:43:42 INFO streaming.StreamJob: Output directory: /user/lifeng/output
hadoop fs -cat /user/lifeng/output/*
29 225 1540

With -numReduceTasks 1 all map output goes through a single wc, which now reports the totals for the whole file on one line.
Write your Hadoop command in a Bash file instead of running it directly in the Linux shell.
Use the following command to inspect a job's logs and track down MapReduce errors (the application ID is printed when the job is submitted, e.g. application_1579055927679_0005 above):

yarn logs -applicationId APPLICATION_ID
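For example, to fetch the logs of the first streaming job above once it has finished (this assumes log aggregation is enabled on the cluster):

yarn logs -applicationId application_1579055927679_0005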
cat examples/print-colums/main.sh
#!/bin/bash
# Run everything relative to the directory containing this script
PWD=$(cd $(dirname $0); pwd)
cd $PWD 1> /dev/null 2>&1

TASKNAME=task1_fli
# python archive shipped to the cluster via -cacheArchive
PY27='/fli/tools/python2.7.tar.gz'
# path to the hadoop client binary
HADOOP_HOME=/home/users/fli/hadoop/bin/hadoop

HADOOP_INPUT_DIR1=/fli/data1/part-*
HADOOP_INPUT_DIR2=/fli/data2/part-*
HADOOP_OUTPUT_DIR=/fli/results/task1

echo $HADOOP_HOME
echo $HADOOP_INPUT_DIR1 $HADOOP_INPUT_DIR2
echo $HADOOP_OUTPUT_DIR

# Remove any previous output; a streaming job fails if the output dir exists
$HADOOP_HOME fs -rmr $HADOOP_OUTPUT_DIR

$HADOOP_HOME streaming \
    -jobconf mapred.job.name=$TASKNAME \
    -jobconf mapred.job.priority=NORMAL \
    -jobconf mapred.map.tasks=500 \
    -jobconf mapred.reduce.tasks=500 \
    -jobconf mapred.job.map.capacity=500 \
    -jobconf mapred.job.reduce.capacity=500 \
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf mapred.text.key.partitioner.options=-k1,1 \
    -jobconf stream.memory.limit=1000 \
    -file $PWD/mapper.sh \
    -output ${HADOOP_OUTPUT_DIR} \
    -input ${HADOOP_INPUT_DIR1} \
    -input ${HADOOP_INPUT_DIR2} \
    -mapper "sh mapper.sh" \
    -reducer "cat" \
    -cacheArchive ${PY27}#py27 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

# Mark the run as finished only if the streaming job succeeded
if [ $? -ne 0 ]; then
    echo 'error'
    exit 1
fi
$HADOOP_HOME fs -touchz ${HADOOP_OUTPUT_DIR}/done
exit 0
Mapper: Bash, Reducer: Python
Write MapReduce with Python
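A minimal word-count pair under the streaming contract; the file names mapper.py and reducer.py are hypothetical, and the logic follows the standard streaming word-count pattern:

#!/usr/bin/env python
# mapper.py (hypothetical): emit "word<TAB>1" for every word on stdin
import sys

for line in sys.stdin:
    for word in line.strip().split():
        print("%s\t%s" % (word, 1))

#!/usr/bin/env python
# reducer.py (hypothetical): input arrives sorted by key, so equal words
# are adjacent and can be summed in a single pass
import sys

current_word, current_count = None, 0
for line in sys.stdin:
    word, count = line.rstrip("\n").split("\t", 1)
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            print("%s\t%d" % (current_word, current_count))
        current_word, current_count = word, int(count)
if current_word is not None:
    print("%s\t%d" % (current_word, current_count))

Both scripts can then be shipped with -file and used exactly like cat and wc in the earlier job:

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input /user/lifeng/asm-LICENSE \
    -output /user/lifeng/output \
    -file mapper.py -mapper "python mapper.py" \
    -file reducer.py -reducer "python reducer.py"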
Write MapReduce with R
Hadoop with Java MapReduce
Compile your Java source against the MapReduce client jar, then package the class files into a jar (YourMapReduce.java is a hypothetical placeholder for your own source file):

javac -classpath $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.2.jar \
    -d FirstJar YourMapReduce.java
jar -cvf FirstJar.jar -C FirstJar/ .
The syntax for running the Java version is:
hadoop jar FirstJar.jar [mainClass] input output
Hadoop Pipes

Hadoop Pipes is the C++ interface to Hadoop MapReduce. You have to compile and link your C++ program before sending it to Hadoop (a sketch follows below).
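A typical compile-and-link line uses the Pipes headers and libraries that ship with Hadoop; the paths below vary across Hadoop versions and distributions, so treat this as a sketch:

g++ myCPProgram.cpp -o myCPProgram \
    -I$HADOOP_HOME/c++/Linux-amd64-64/include \
    -L$HADOOP_HOME/c++/Linux-amd64-64/lib \
    -lhadooppipes -lhadooputils -lpthread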
The Hadoop Pipes syntax is:
hadoop pipes \
-D hadoop.pipes.java.recordreader=true \
-D hadoop.pipes.java.recordwriter=true \
-input sample.txt \
-output output \
-program myCPProgram
Use airline_small.csv as input. The data description is available at http://stat-computing.org/dataexpo/2009/the-data.html.
Extract useful information from the data
Make dummy transformations for variables such as DayOfWeek, Month, ... (a mapper sketch follows at the end of this section).
Finally, save your output in a file.
Build a model using variables such as CarrierDelay and your constructed dummy variables as predictors. Hint: ...
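A minimal sketch of a streaming mapper for the dummy-transformation step, assuming the column order given in the data description (Month is column 2, DayOfWeek column 4, and CarrierDelay column 25, 1-based); the file name dummy_mapper.py is hypothetical:

#!/usr/bin/env python
# dummy_mapper.py (hypothetical): emit CarrierDelay together with 0/1
# dummy variables for DayOfWeek (1-7) and Month (1-12)
import csv
import sys

MONTH, DAY_OF_WEEK, CARRIER_DELAY = 1, 3, 24  # 0-based column indices

for row in csv.reader(sys.stdin):
    if not row or row[0] == "Year":  # skip blank lines and the header row
        continue
    day_dummies = ["1" if row[DAY_OF_WEEK] == str(k) else "0" for k in range(1, 8)]
    month_dummies = ["1" if row[MONTH] == str(k) else "0" for k in range(1, 13)]
    print(",".join([row[CARRIER_DELAY]] + day_dummies + month_dummies))

Running it with -reducer "cat", as in the script example above, writes the transformed rows to the output directory; drop one dummy level per variable before fitting a regression to avoid collinearity.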