广告

Hadoop streaming详解

  • 浏览(1,689)
  • 评论(1)
  • 译者:k8s

Hadoop流

Hadoop的为MapReduce的提供了不同的API,可以方便我们使用不同的编程语言来使用的MapReduce框架,而不是只局限于Java的。这里要介绍的就是Hadoop的流 API。Hadoop的流媒体使用的Unix的标准流作为我们的MapReduce程序和MapReduce框架之间的接口。所有你可以使用任何语言来编写MapReduce程序,只要该语言可以通过标准输入/输出上进行读写
。streamming是天然适用于文字处理(text processing),当然,也只适用纯文本的处理,对于需要对象和序列化的场景,hadoop streaming无能为力。它们使我们能够快捷的通过各种脚本语言,快速的处理大量的文本文件。

  • 映射函数的输入是通过立即输入一行一接收数据的。(不像Java API,通过InputFormat类做预处理,使得Map函数的输入是有Key和value的
  • 地图函数的输出则必须限定为键值对,键和值之间用\ t分开。(MapReduce的框架在处理中间的地图输出时,必须做排序和分区,即洗牌
  • 减少函数的输入是Map函数的输出也是键值对,键和值之间用\ t分开。

常用的Streaming编程语言:

  • bash shell
  • 红宝石
  • 蟒蛇

红宝石

下面是一个Ruby编写的MapReduce程序的示例:

地图

max_temperature_map.rb:
ruby
#!/usr/bin/env ruby
STDIN.each_line do |line|
val = line
year, temp, q = val[15,4], val[87,5], val[92,1]
puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end

  • 从标准输入读入一行数据。
  • 处理数据之后,生成一个键值对,用\ t分隔,输出到标准输出

减少

max_temperature_reduce.rb:
ruby
#!/usr/bin/env ruby
last_key, max_val = nil, -1000000
STDIN.each_line do |line|
key, val = line.split("\t")
if last_key && last_key != key
puts "#{last_key}\t#{max_val}"
last_key, max_val = key, val.to_i
else
last_key, max_val = key, [max_val, val.to_i].max
end
end
puts "#{last_key}\t#{max_val}" if last_key

  • 从标准输入读入一行数据
  • 数据是用\ t分隔的键值对
  • 数据是被MapReduce根据键排序之后顺序一行一行读入
  • 减少对数据进行处理,并输出,输出仍是用\ t分隔的键值对

运行

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02/src/main/ruby/max_temperature_map.rb \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb
  • 1
  • 2
  • 3
  • 4
  • 5
  • 1
  • 2
  • 3
  • 4
  • 5
  • hadoop jar $ HADOOP_INSTALL / contrib / streaming / hadoop – * – streaming.jar指明了使用hadoop stream
  • hadoop – * – streaming.jar会将输入里的文件,一行一行的输出到标准输出。
  • 用-mapper指定映射函数。类似于通过管道将数据传给rb文件: data|ch02/src/main/ruby/max_temperature_map.rb
  • -reducer指定减少函数。

蟒蛇

地图

#!/usr/bin/env python
import re
import sys
for line in sys.stdin:
val = line.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if (temp != "+9999" and re.match("[01459]", q)):
print "%s\t%s" % (year, temp)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

减少

#!/usr/bin/env python
import sys
(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
(key, val) = line.strip().split("\t")
if last_key and last_key != key:
print "%s\t%s" % (last_key, max_val)
(last_key, max_val) = (key, int(val))
else:
(last_key, max_val) = (key, max(max_val, int(val)))
if last_key:
print "%s\t%s" % (last_key, max_val)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

运行

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02/src/main/ruby/max_temperature_map.py\
-reducer ch02/src/main/ruby/max_temperature_reduce.py
  • 1
  • 2
  • 3
  • 4
  • 5
  • 1
  • 2
  • 3
  • 4
  • 5

Bash shell

地图

#!/usr/bin/env bash
# NLineInputFormat gives a single line: key is offset, value is S3 URI
read offset s3file
# Retrieve file from S3 to local disk
echo "reporter:status:Retrieving $s3file" >&2
$HADOOP_INSTALL/bin/hadoop fs -get $s3file .
# Un-bzip and un-tar the local file
target=`basename $s3file .tar.bz2`
mkdir -p $target
echo "reporter:status:Un-tarring $s3file to $target" >&2
tar jxf `basename $s3file` -C $target
# Un-gzip each station file and concat into one file
echo "reporter:status:Un-gzipping $target" >&2
for file in $target/*/*
do
gunzip -c $file >> $target.all
echo "reporter:status:Processed $file" >&2
done
# Put gzipped version into HDFS
echo "reporter:status:Gzipping $target and putting in HDFS" >&2
gzip -c $target.all | $HADOOP_INSTALL/bin/hadoop fs -put - gz/$target.gz
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

运行

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-D mapred.reduce.tasks=0 \
-D mapred.map.tasks.speculative.execution=false \
-D mapred.task.timeout=12000000 \
-input ncdc_files.txt \
-inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
-output output \
-mapper load_ncdc_map.sh \
-file load_ncdc_map.sh
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 这里的-D mapred.reduce.tasks=0将减任务观掉,因此也不需要设置-reducer
  • 只使用Mapper,可以通过MapReduce帮助我们并行的完成一些平时只能串行的shell脚本
  • 注意这里的-file,在集群模式下,需要并行运行时,需要-file把文件传输到其他节点

组合器

在流模式下,仍然可以运行Combiner,两种方法:

  1. 通过Java编写一个组合器的函数,并使用-combiner选项
  2. 以命令行的管道模式完成combiner的任务

这里具体解释

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/all \
-output output \
-mapper "ch02/src/main/ruby/max_temperature_map.rb | sort |
ch02/src/main/ruby/max_temperature_reduce.rb" \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb \
-file ch02/src/main/ruby/max_temperature_map.rb \
-file ch02/src/main/ruby/max_temperature_reduce.rb

注意看-mapper这一行,通关管道的方式,把映射器的临时输出文件(中间文件,Map完成后的临时文件)作为输入,然后送到排序进行排序,然后送到减少脚本,来完成类似于combiner的工作。这时候的输出才真正的作为shuffle的输入,被分组并在网络上发送到

  • 分享到:
  • icon
  • icon
  • icon
  • icon
箭头