HadoopのStreamingについて。
|
|
Hadoopストリーミングは、Java以外の言語のMapper・Reducerプログラムを実行する方法。
データは標準入出力を介して受け渡す。
つまり、標準入力からデータ(テキスト)を受け取り、標準出力にデータを出力することでHadoop側とやりとりする。
したがって、標準入出力が扱える言語なら何でも使用できる。
Hadooopストリーミングでは、Streaming用のJavaプログラムにMapper・Reducerプログラムを指定する形で実行する。
CDH3の場合、以下のように指定する。
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar \ -mapper Mapperプログラムのファイル名 \ -combiner Combinerプログラムのファイル名 \ -reducer Reducerプログラムのファイル名 \ -input 入力ファイル名 \ -output 出力ディレクトリー名 \ -file ローカルファイルパス1 -file ローカルファイルパス2 …
Combinerが無い場合は-combinerの指定は省略可能。
(Hadoop0.21から-combinerにJavaクラス以外を指定できるようになったらしい。CDH3でも指定できる)
Mapper・Combiner・Reducerのプログラムは、ローカル(hadoopコマンドを実行するマシン)上に置いておき、
-fileでそのパスを指定することで各データノードに(HDFSを経由して)コピーされ、タスクの実行時に参照することが出来る。
プログラムが3ファイルあったら、-fileも3つ指定する。
(単独環境で実行する場合は-fileを指定しなくても動作するが、分散環境で動かすには-fileが必須)
他にも色々なオプションがある。-infoでオプション一覧が表示される。
$ hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar -info
HadooopストリーミングはJava以外の言語で書けるというだけで、基本的な注意点はJava APIを使う場合と同じ。
大量データを扱うことを意識し、コレクション(連想配列・ディクショナリー・マップ等)に全てのキーを保持するようなコーディングは避ける。
また、Combinerが有効になるような作りにした方が速度が出る。(単純な集計なら、2倍近くも差が出る)
JavaのMapperクラスでは、キー(数値)と値(テキスト)が引数として渡される。
Streamingでは、値(テキスト)のみが渡される。
(Javaでも、TextInputFormatを使う場合はキーは無視されることがほとんどだから、キーが無くても構わない)
出力は、標準出力に対してキーと値をタブ区切りで出力する。
StreamingのReducerには、キーでソートされた順で、“タブで区切られたキーと値”という1行ずつのテキストが渡される。
したがって、まずタブで分割してキーと値を分ける。
そして、キーの値が変わる(キーブレイクする)まで一連の処理を行うようなプログラムにする。
(JavaのReducerクラスではキーと値一覧が渡されるので、少し違う)
出力は、Mapperと同じく、標準出力に対してキーと値をタブ区切りで出力する。
存在意義はJavaのCombinerと同様で、入出力方法はReducerと同様。
場合によってはReducerと全く同じプログラムが指定できる。(その場合、-combinerと-reducerには同じプログラムを指定し、-fileは1つでよい)
awkでHadoopストリーミングのサンプルを作ってみた。[2011-12-26]
{ for(i = 1; i <= NF; i++) print $i, 1 }
※HiveのWordCountで使ったスクリプトとほぼ全く同じ。
BEGIN { OFS = "\t"; old = ""; sum = 0; } { if ($1 != old) { output(old, sum); old = $1; sum = 0; } sum += $2; } END { output(old, sum); } function output(key, count) { if (count != 0) { print key, count; } }
SRC=wordcount/input DST=wordcount/output MAP=map.awk RED=reduce.awk hadoop fs -rmr $DST hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar \ -mapper "awk -f $MAP" \ -combiner "awk -f $RED" \ -reducer "awk -f $RED" \ -input $SRC \ -output $DST \ -file $MAP \ -file $RED