S-JIS[2011-11-29] 変更履歴

Hadoop0.23 MapReduce

Hadoop0.23YARNでMapReduceを動かしてみる。


概要

従来(Hadoop0.23より前)のMapReduceで作ったプログラムを(何も設定していない状態の)Hadoop0.23で動かすと、ちゃんと動く。

しかし、YARNのプログラムを動かすにはリソースマネージャーやノードマネージャーを起動しておく必要があるので、
逆にMapReduceプログラムはYARN環境で動いたわけではない、ということになる。

実のところ、何も設定していない状態のHadoop0.23でMapReduceプログラムを動かすと、ローカルモードで実行されるようだ。
YARN環境で動かすにはHadoop0.23の設定を変更する必要がある。


MapReduceをYARNで動かす設定

MapReduceプログラムをYARN(リソースマネージャーとノードマネージャーが動いている環境)で動かすには、以下の設定をする必要がある。

設定値 デフォルト値 備考
mapred-site.xml
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
local 「local」だとローカル環境、つまりHadoopのデーモンを使うことなく動作する。
「yarn」だとYARN環境(ResourceManagerとNodeManager上)で動作する。
「classic」だと従来の環境(JobTrackerとTaskTracker上)で動作する模様。
なお、設定値(定数)についてはMRConfigインターフェースのFRAMEWORK_NAMEを参照。
yarn-site.xml
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce.shuffle</value>
  </property>
(空) なんでデフォルトで何も設定されていないのか理解に苦しむが、
「mapreduce.shuffle」を設定しないとMapReduceが動作しない。
設定しない場合の障害

xmlファイルを書き換える場合は、confディレクトリー(HADOOP_CONF_DIR)の下のxmlファイルを編集する。(ファイルが無い場合は作る)

yarn.nodemanager.aux-servicesはデーモン(たぶんノードマネージャー)起動時に反映されるようなので、yarn-site.xmlを修正したらデーモンを再起動する必要がある。

$ cd $HADOOP_COMMON_HOME

$ bin/yarn-daemon.sh stop nodemanager
stopping nodemanager
$ bin/yarn-daemon.sh stop resourcemanager
stopping resourcemanager

$ bin/yarn-daemon.sh start resourcemanager
$ bin/yarn-daemon.sh start nodemanager

mapreduce.framework.nameについては、MapReduceプログラムがToolRunnerを使って実行するようになっている場合、confの下のxmlファイルを編集しなくても、実行時の引数で指定することも出来る。

自作のWordCount
$ cd /tmp/wordcount
$ $HADOOP_COMMON_HOME/bin/yarn jar wordcount23.jar sample.WordCount23 -D mapreduce.framework.name=yarn input output
円周率算出サンプル
$ cd $HADOOP_COMMON_HOME
$ bin/yarn jar hadoop-mapreduce-examples-0.23.0.jar pi -D mapreduce.framework.name=yarn 4 2000

mapreduce.shuffleを設定しない場合の障害

yarn.nodemanager.aux-servicesを設定せずにフレームワークyarnを指定して実行すると、以下のようなエラーになる。

2011-11-29 21:38:16,772 INFO mapreduce.Job (Job.java:printTaskEvents(1315)) - Task Id : attempt_1322569405158_0004_m_000000_0, Status : FAILED
Container launch failed for container_1322569405158_0004_01_000002 : java.lang.IllegalStateException: Invalid shuffle port number -1 returned for attempt_1322569405158_0004_m_000000_0
at org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl$EventProcessor.run(ContainerLauncherImpl.java:313)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

2011-11-29 21:38:16,781 WARN mapreduce.Job (Job.java:getTaskLogs(1389)) - Error reading task output Connection refused

これをずっと繰り返すようなので、Ctrl+Cで止める。
(ただし裏でジョブは生きているようなので、psで確認してkillで止める)


yarn.nodemanager.aux-servicesを設定したがノードマネージャーを再起動せずにMapReduceプログラムを実行すると、Mapタスクが0%のまま待ちに入る。
(タイムアウトもしないっぽい。そんなに長時間試してないけど)

2011-11-29 21:41:18,999 INFO mapreduce.Job (Job.java:monitorAndPrintJob(1227)) - map 0% reduce 0%

YARNで動くMapReduceの仕組み

MapReduceプログラムがYARNで動く場合、以下のようなクラスを経由している模様。

  1. クライアント(MapReduceプログラム)がJob#waitForCompletion()を呼び出す。
    1. Job#submit()が呼ばれる。
    2. Job#submit()からconnect()が呼ばれる。
    3. connect()の中でClusterインスタンスが生成される。(Hadoop0.20ではJobConfが生成されていた)
      1. Clusterのコンストラクターの中からinitialize()メソッドが呼ばれる。
      2. サービスプロバイダーの機能で、各種ClientProtocolProviderの中から実行する環境を選択する。
        mapreduce.framework.nameが「yarn」の場合、YarnClientProtocolProviderが有効になる。
      3. YarnClientProtocolProviderでは、ClientProtocolとして、YARNRunnerインスタンスが作られる。
    4. Job#submit()からJobSubmitter#submitJobInternal()が呼ばれる。
      1. ClientProtocol(YARNRunner)のsubmitJob()が呼ばれる。
      2. YARNRunner#createApplicationSubmissionContext()の中で、MapReduce用のアプリケーションマスターの起動コマンドが生成される。
        実行されるクラスは、MRAppMaster
  2. アプリケーションマスターとして、MRAppMasterが実行される。
    1. この中はイベントハンドラーとかを駆使して色々複雑なことをやっているので、詳細不明(爆)
    2. そのうち、TaskAttemptImpl#createContainerLaunchContext()が呼ばれる。
    3. その中でMapReduceChildJVM#getVMCommand()が呼ばれて、MapReduce用のコンテナープログラムの起動コマンドが生成される。
      実行されるクラスは、YarnChild
  3. コンテナーとして、YarnChildが実行される。
    1. MapタスクやReduceタスクとして、指定されたMapper・Reducerクラスを呼び出す。

ちなみに、YarnChildの実行時引数(main()の引数)で、ホスト名やポート番号を受け取っている。
これを元にRPCのプロキシーオブジェクトを生成してタスク情報を取得していたりする。
したがって、やはり自作のアプリマスターとコンテナープログラムの場合、それらの間の通信は自前で作り込む必要があるようだ。


YARNへ戻る / Hadoop0.23へ戻る / Hadoop目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま