S-JIS[2011-12-25/2011-12-26] 変更履歴

Hadoop Streaming

HadoopのStreamingについて。


Streamingの概要

Hadoopストリーミングは、Java以外の言語のMapper・Reducerプログラムを実行する方法。

データは標準入出力を介して受け渡す。
つまり、標準入力からデータ(テキスト)を受け取り、標準出力にデータを出力することでHadoop側とやりとりする。

したがって、標準入出力が扱える言語なら何でも使用できる。


Streamingの実行方法

Hadooopストリーミングでは、Streaming用のJavaプログラムにMapper・Reducerプログラムを指定する形で実行する。

CDH3の場合、以下のように指定する。

hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar \
-mapper   Mapperプログラムのファイル名 \
-combiner Combinerプログラムのファイル名 \
-reducer  Reducerプログラムのファイル名 \
-input  入力ファイル名 \
-output 出力ディレクトリー名 \
-file ローカルファイルパス1 -file ローカルファイルパス2 …

Combinerが無い場合は-combinerの指定は省略可能。
(Hadoop0.21から-combinerにJavaクラス以外を指定できるようになったらしい。CDH3でも指定できる)

Mapper・Combiner・Reducerのプログラムは、ローカル(hadoopコマンドを実行するマシン)上に置いておき、
-fileでそのパスを指定することで各データノードに(HDFSを経由して)コピーされ、タスクの実行時に参照することが出来る。
プログラムが3ファイルあったら、-fileも3つ指定する。
(単独環境で実行する場合は-fileを指定しなくても動作するが、分散環境で動かすには-fileが必須)

他にも色々なオプションがある。-infoでオプション一覧が表示される。

$ hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar -info

Streamingのプログラミング

HadooopストリーミングはJava以外の言語で書けるというだけで、基本的な注意点はJava APIを使う場合と同じ。

大量データを扱うことを意識し、コレクション(連想配列・ディクショナリー・マップ等)に全てのキーを保持するようなコーディングは避ける。
また、Combinerが有効になるような作りにした方が速度が出る。(単純な集計なら、2倍近くも差が出る)


Mapper

JavaのMapperクラスでは、キー(数値)値(テキスト)が引数として渡される。
Streamingでは、値(テキスト)のみが渡される。
(Javaでも、TextInputFormatを使う場合はキーは無視されることがほとんどだから、キーが無くても構わない)

出力は、標準出力に対してキーと値をタブ区切りで出力する。


Reducer

StreamingのReducerには、キーでソートされた順で、“タブで区切られたキー”という1行ずつのテキストが渡される。
したがって、まずタブで分割してキーと値を分ける。
そして、キーの値が変わる(キーブレイクする)まで一連の処理を行うようなプログラムにする。
JavaのReducerクラスではキー値一覧が渡されるので、少し違う)

出力は、Mapperと同じく、標準出力に対してキーと値をタブ区切りで出力する。


Combiner

存在意義はJavaのCombinerと同様で、入出力方法はReducerと同様。
場合によってはReducerと全く同じプログラムが指定できる。(その場合、-combinerと-reducerには同じプログラムを指定し、-fileは1つでよい)


Streamingサンプル(awk)

awkでHadoopストリーミングのサンプルを作ってみた。[2011-12-26]

map.awk:

{ for(i = 1; i <= NF; i++) print $i, 1 }

HiveのWordCountで使ったスクリプトとほぼ全く同じ。

reduce.awk:

BEGIN {
	OFS = "\t";
	old = ""; sum = 0;
}
{
	if ($1 != old) {
		output(old, sum);
		old = $1; sum = 0;
	}
	sum += $2;
}
END {
	output(old, sum);
}

function output(key, count) {
	if (count != 0) {
		print key, count;
	}
}

run.sh:

SRC=wordcount/input
DST=wordcount/output
MAP=map.awk
RED=reduce.awk

hadoop fs -rmr $DST

hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar \
-mapper   "awk -f $MAP" \
-combiner "awk -f $RED" \
-reducer  "awk -f $RED" \
-input  $SRC \
-output $DST \
-file $MAP \
-file $RED

Hadoop目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま