Hadoop0.23のYARN(ヤーン)のメモ。
|
YARNは、Hadoop0.23におけるジョブ実行フレームワークの名前。
0.23より前のHadoopはMapReduceというアルゴリズム(に基づくフレームワーク)だったので、次世代MapReduceという意味でMapReduce2.0(MRv2)とも呼ばれているが、実際はもうMapReduceではないので、別の名前が付けられたのだろう。
YARNでは、以下のような手順でアプリケーションを実行する。
(ResourceManager(RM)とかApplicationMaster(App Mstr・AM)とかの関係については、YARN Architectureを参照)
要するに、従来のMapReduceにおいてJobTrackerやTaskTrackerがやっていた事を自分でも出来るようになった(自分がしなければならない)ということ。
ApplicationMasterがJobTracker、ContainerがTaskTracker(MapタスクやReduceタスクを実行)に相当するのかな?
ただし、JobTrackerがNameNode上で動いていたのに対し、YARNでは、ApplicationMasterもスレーブ(DataNode)上で動く。
ApplicationMasterやContainerがどのマシンで動くかについては、マシンのリソースの空き状況によって自動的に決められる(と思う)。
その為、ApplicationMasterやContainerの実行指示を出す際は、そのプログラムがどの程度のリソースを必要とするかを指定するようになっている。
(ただし、Hadoop0.23.0で指定できるリソースは、メモリーサイズのみ)
実行するプログラム(ApplicationMasterやContainer)はJavaに限らず、何でもいい。
ただしResourceManagerとやり取りをする必要があるので、やはりHadoopが書かれているJavaが一番やりやすそうではある。
プログラムの指定方法は、UNIXのシェルに記述するコマンドラインをそのまま渡す形となる。環境変数($HOGE
)や標準出力へのリダイレクト(1>
hoge.txt
)すらそのまま記述できる。
(したがって、Windows上ではなおさら実行しにくい)
最小限のサンプルとして、ClientからApplicationMasterを起動し、ApplicationMasterは(Containerの起動もせず)ただ正常終了するだけのものを作ってみる。
ApplicationMaster(App Mstr・AM)を呼び出すClientの最小限サンプル。
public class Sample1Client extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(Sample1Client.class); public static void main(String[] args) throws Exception { int r = ToolRunner.run(new Sample1Client(), args); System.exit(r); }
ToolやToolRunnerをHadoop0.23でも使っていいのかどうかは分からないが、とりあえずは大丈夫そう。
main()から呼ばれて処理をする本体。
private YarnRPC rpc; private ClientRMProtocol applicationsManager;
@Override public int run(String[] args) throws Exception { Configuration conf = getConf(); rpc = YarnRPC.create(conf); applicationsManager = connectToASM(); GetNewApplicationResponse newApp = getApplication(); ApplicationSubmissionContext appContext = getAppContext(newApp); submit(appContext); boolean succeeded = monitorApplication(newApp.getApplicationId()); return succeeded ? 0 : 1; }
最初にYarnRPCというRPCオブジェクトを生成する。(Remote Procedure Callのことだと思う)
そして、ASM(applicationsManager(Application Master(AM)と紛らわしい(苦笑)))に接続する。applicationsManagerという名前だけど、Resource
Managerの事。
それらを元に新アプリケーション(自分のアプリケーションの実行用オブジェクト)を取得してサブミット(実行)する。
そして実行状況を監視(モニタリング)する。
しかしクラス名が長いなー(苦笑) Scalaなら変数宣言は「val」だけで済むからすっきりするのに〜
Resource Manager(RM)に接続する。
ClientRMProtocol connectToASM() { Configuration conf = getConf(); YarnConfiguration yarnConf = new YarnConfiguration(conf); InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS)); LOG.info("Connecting to ResourceManager at " + rmAddress); return (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, conf); }
設定ファイルからResource Managerのアドレスを取得している。デフォルトは「0.0.0.0:8040
」。
なお、YARNの書き方のページではYarnConfiguration.YARN_SECURITY_INFOを設定しているが、これは廃止になったようだ。
このメソッドの戻り値はapplicationsManager(ASM)という変数に格納される。
Resource ManagerなのかApplications Managerなのか、どっちだ(苦笑)
新しいアプリケーションを取得(というかアプリケーションIDを採番)する。
GetNewApplicationResponse getApplication() throws YarnRemoteException { GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class); GetNewApplicationResponse response = applicationsManager.getNewApplication(request); LOG.info("Got new application id=" + response.getApplicationId()); return response; }
applicationsManager(Resource Manager)に対して何らかの要求を行うときは、引数のオブジェクトはRecords.newRecord()
によって生成する。
getNewApplication()を呼び出すと、Resource Managerに接続する。
Resource Managerが起動していない(通信できない)と、何回かリトライした後、例外が発生する。
アプリケーション(Application Masterとして実行するプログラム)の情報を設定する。
private String appName = "sample1-client"; private int amPriority = 0; private String amQueue = ""; private String amUser = "";
ApplicationSubmissionContext getAppContext(GetNewApplicationResponse newApp) throws IOException { LOG.info("Setting up application submission context for ASM"); ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class); ApplicationId appId = newApp.getApplicationId(); appContext.setApplicationId(appId); appContext.setApplicationName(appName); ContainerLaunchContext amContainer = getContainerLaunchContext(newApp, appId); appContext.setAMContainerSpec(amContainer); Priority pri = Records.newRecord(Priority.class); pri.setPriority(amPriority); appContext.setPriority(pri); appContext.setQueue(amQueue); appContext.setUser(amUser); return appContext; }
プライオリティー(優先度)の数値の範囲とか、キューとは何かとか、ユーザーに何を設定すべきかとかは、とりあえず謎(爆)
アプリケーションIDは、「application_1321815046260_0001」といった感じの文字列。
新しいアプリケーションを生成すると、末尾4桁の数値が増えていく。従来のジョブのIDと同じ感じ。
真ん中の数字はHadoopクラスターが起動した時刻を元にしているっぽい。
実行するアプリケーションの実行方法を設定する。
ここで実行するのはApplication Masterに相当するアプリだが、スレーブ上で動作するので、実装上の扱いとしてはContainerになるっぽい。
ContainerLaunchContext getContainerLaunchContext(GetNewApplicationResponse newApp, ApplicationId appId) throws IOException { ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); Map<String, LocalResource> localResources = getLocalResources(appId); amContainer.setLocalResources(localResources); Map<String, String> env = getEnvironment(); amContainer.setEnvironment(env); String command = getCommand(); List<String> commands = Collections.singletonList(command); amContainer.setCommands(commands); Resource capability = getCapability(newApp); amContainer.setResource(capability); return amContainer; }
実行したいプログラムが入っているjarファイルをローカルリソースとして指定する。
jarファイルの場所は、とりあえずプログラムの第1引数で指定することにした(実験中は、jarファイルを置く場所はコンパイル無しで変えられた方が便利だから)。
private String appMasterJar;
@Override public int run(String[] args) throws Exception { appMasterJar = args[0]; // 実行するAppMstrのjarファイルの場所 〜 }
Map<String, LocalResource> getLocalResources(ApplicationId appId) throws IOException { Path jarPath = new Path(new File(appMasterJar).toURI()); Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); LOG.info("Copy App Master jar from local filesystem and add to local environment"); Configuration conf = getConf(); FileSystem fs = FileSystem.get(conf); FileStatus destStatus = fs.getFileStatus(jarPath); LocalResource amJarRsrc = Records.newRecord(LocalResource.class); amJarRsrc.setType(LocalResourceType.FILE); amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION); amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath)); amJarRsrc.setTimestamp(destStatus.getModificationTime()); amJarRsrc.setSize(destStatus.getLen()); localResources.put("AppMaster.jar", amJarRsrc); return localResources; }
「ローカルリソース」の位置付けをまだよく理解していないんだけど、ここで指定したリソース(ファイル)は
実行対象マシン上にコピーされて、ローカルファイルとして使用できるという意味だと思う。
ここでは、実行したいjarファイルを「AppMaster.jar」という名前で登録しているので、
実行対象マシン上では「AppMaster.jar」というファイル名でアクセスできる。
実行時に使用する環境変数を用意する。
Map<String, String> getEnvironment() { Map<String, String> env = new HashMap<String, String>(); String classPathEnv = "${CLASSPATH}" + ":./*" + ":$HADOOP_CONF_DIR" + ":$HADOOP_COMMON_HOME/share/hadoop/common/*" + ":$HADOOP_COMMON_HOME/share/hadoop/common/lib/*" + ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/*" + ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*" + ":$YARN_HOME/modules/*" + ":$YARN_HOME/lib/*" + ":./log4j.properties:"; env.put("CLASSPATH", classPathEnv); return env; }
ここでは、環境変数CLASSPATHだけ用意している。
UNIXが対象なので、パス区切りが「:
(コロン)」になっている。
クラスパスのひとつとして「./*
」が入っていることに注目。
つまりカレントディレクトリーにあるjarファイルがクラスパスに含まれる。
getLocalResources()においてローカルリソースとして指定したjarファイルは、これによって読み込まれる対象に含まれる。
試しに「./*
」をリソース名と同じ「./AppMaster.jar
」にしてもちゃんと動作したし、「./AppMaster1.jar
」という異なる名前にしたら、実行対象プログラムが見つからなくてエラーになった。
(javaコマンドがClassNotFoundExceptionを発生させるので、指定した標準エラーのログファイルにエラーメッセージが出力される)
実行するコマンド(コマンドライン)を生成する。
String getCommand() { StringBuilder sb = new StringBuilder(128); sb.append("${JAVA_HOME}/bin/java "); sb.append(Sample1AppMstr.class.getName()); sb.append(" arg1 arg2 arg3"); sb.append(" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/SampleAppMstr.stdout.log"); sb.append(" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/SampleAppMstr.stderr.log"); return sb.toString(); }
見て分かる通り、UNIXシェル上で実行するコマンドをそのまま文字列にしているだけ!
だからJava以外でも何でも実行できるだろう。
今回の例では、実行対象クラスをSample1AppMstrとし、引数にarg1〜arg3という文字列を渡している。
「1>」「2>」は標準出力・標準エラーのリダイレクト先の指定。
LOG_DIR_EXPANSION_VARという定数の実体は「<LOG_DIR>
」という文字列。
対象マシン上で実行される際は、この部分がログ出力のディレクトリーの実際のパスに置き換わる。
デフォルトでは「/tmp/logs/アプリケーションID/コンテナーID?」となる。
ちなみに、「sb.append()の中で文字列結合を行ってる!無駄
なオブジェクトを生成してる!」と早合点しないように。
定数同士の+演算子による結合は、コンパイル時にひとつの文字列にしてくれるから、これでいいのだ。
と思ったが、それを言うなら、そもそも全部の文字列を+演算子でつないでおく方がいいな(爆)
でもまぁ、この部分はどうせ全アプリの中で1回しか呼ばれないから、そんなに速度にこだわってもしょうがない、ということで^^;
リソースの必要量を指定する。
Resource getCapability(GetNewApplicationResponse newApp) { { int minMem = newApp.getMinimumResourceCapability().getMemory(); int maxMem = newApp.getMaximumResourceCapability().getMemory(); LOG.info("Min mem capabililty of resources in this cluster " + minMem); LOG.info("Max mem capabililty of resources in this cluster " + maxMem); if (amMemory < minMem) { LOG.info("AM memory specified below min threshold of cluster. Using min value." + ", specified=" + amMemory + ", min=" + minMem); amMemory = minMem; } else if (amMemory > maxMem) { LOG.info("AM memory specified above max threshold of cluster. Using max value." + ", specified=" + amMemory + ", max=" + maxMem); amMemory = maxMem; } } Resource capability = Records.newRecord(Resource.class); capability.setMemory(amMemory); return capability; }
取得したGetNewApplicationResponse(newApp)から、使用可能な最小および最大リソースサイズが取得できる。
自分のアプリが必要とするリソース量を、この範囲で決定する。
ここでの必要リソースとは、たぶんApplication
Masterのみのリソースであり、そこから起動する複数Containerの合計リソースではないと思う。
Containerを起動する際は改めてリソースを指定するし。
たぶん、指定したリソースが確保できるスレーブマシン上でアプリを実行するのだと思う。
であれば、全Containerのリソースの合計はスレーブマシン単体のリソースを上回ることが多いはずで、それを指定するのだと使い物にならないから。
で、リソースを指定できるとは言っても、Hadoop0.23.0で指定できるのはメモリーサイズのみ。
後々、他のリソース(CPUとかディスクとか)も指定できるようにするのだろう。
対象アプリケーション(Application Master)を実行(Resource Managerに対して実行開始依頼を)する。
void submit(ApplicationSubmissionContext appContext) throws YarnRemoteException { SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class); appRequest.setApplicationSubmissionContext(appContext); LOG.info("Submitting application to ASM"); applicationsManager.submitApplication(appRequest); }
対象アプリケーション(Application Master)の実行状況を監視(モニタリング)する。
private final long clientStartTime = System.currentTimeMillis(); private long clientTimeout = TimeUnit.MINUTES.toMillis(10);
boolean monitorApplication(ApplicationId appId) throws YarnRemoteException { while (true) { // 1秒間隔でチェック try { Thread.sleep(1000); } catch (InterruptedException e) { LOG.debug("Thread sleep in monitoring loop interrupted"); } GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class); reportRequest.setApplicationId(appId); GetApplicationReportResponse reportResponse = applicationsManager.getApplicationReport(reportRequest); ApplicationReport report = reportResponse.getApplicationReport(); YarnApplicationState state = report.getYarnApplicationState(); FinalApplicationStatus dsStatus = report.getFinalApplicationStatus(); switch (state) { case FINISHED: if (dsStatus == FinalApplicationStatus.SUCCEEDED) { LOG.info("Application has completed successfully. Breaking monitoring loop"); return true; } else { LOG.info("Application did finished unsuccessfully." + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop"); return false; } case KILLED: case FAILED: LOG.info("Application did not finish." + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop"); return false; } if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) { LOG.info("Reached client specified timeout for application. Killing application"); killApplication(appId); return false; } } }
void killApplication(ApplicationId appId) throws YarnRemoteException { KillApplicationRequest request = Records.newRecord(KillApplicationRequest.class); request.setApplicationId(appId); applicationsManager.forceKillApplication(request); }
while文とThread.sleep()による1秒間隔の無限ループ構造(苦笑)
アプリケーションの実行状況(ステータス)を受け取り、終了していたら終了ステータスを確認する。
ステータスはNEW(生成直後)→SUBMITTED(実行受付した)→RUNNING(実行中)→FINISHED(完了)・FAILED(失敗)(→KILLED)といった流れで変化する。
NodeManagerが立ち上がっていない(通信できない)場合は、SUBMITTEDのままずっと止まる。
ApplicationMasterが失敗を返した場合や、そもそもApplicationMasterの起動に失敗した場合はFAILEDが返る。
上記のプログラムは経過時間も監視しており、指定時間を過ぎたら自分でアプリケーションをkillする。
最小限のClientアプリは、以上。長いよ!(爆)
Clientから呼ばれるApplicationMaster(App Mstr・AM)の最小限サンプル。
ただ単にJavaプログラムとして正常終了するだけでは、Client側は正常終了とは見なさない。
終了したことを(Resource Manager経由で)Client側に伝えないといけない。
最小限とは言っても、呼び出されていることを確認するためにも、メッセージだけは出力してみる。
public class Sample1AppMstr {
private static final Log LOG = LogFactory.getLog(Sample1AppMstr.class);
public static void main(String[] args) throws YarnRemoteException {
System.out.println("sample1-appMstr start");
try {
for (String arg : args) { //標準エラーに引数を出力してみる
System.err.println("arg=" + arg);
}
Sample1AppMstr app = new Sample1AppMstr();
app.run();
} finally {
System.out.println("sample1-appMstr end");
}
}
Clientの方はToolやToolRunnerを使ってみたが、こちらでは使っていない。
Toolを使いたい理由は、「-conf」といったHadoop用オプションの解釈をさせたい為。
AppMstrのプログラムの起動コマンドはClient内で自分で記述しているので、そういったオプションを使わないからToolRunnerは必要ない。
private Configuration conf; private YarnRPC rpc;
public Sample1AppMstr() { conf = new Configuration(); rpc = YarnRPC.create(conf); }
処理本体。
private ApplicationAttemptId appAttemptID; private AMRMProtocol resourceManager;
public void run() throws YarnRemoteException { LOG.info("Starting " + getClass().getSimpleName()); appAttemptID = getAppAttemptID(); resourceManager = connectToRM(); registerToRM(); finish(); LOG.info("Finish " + getClass().getSimpleName()); }
Resource Managerに接続し、正常終了を返している。
本来であればContainerで子プログラムを起動したりするが、今回は最小限サンプルなので、何もしない。
ApplicationAttemptId getAppAttemptID() { Map<String, String> envs = System.getenv(); String containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV); if (containerIdString == null) { throw new IllegalArgumentException("ContainerId not set in the environment"); } ContainerId containerId = ConverterUtils.toContainerId(containerIdString); ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId(); return appAttemptID; }
環境変数AM_CONTAINER_IDに、当ApplicationMaster用のコンテナーIDが入っているらしい。
それを元にattemptIdというものを取得している。
Resource Manager(RM)へ接続する。
AMRMProtocol connectToRM() { YarnConfiguration yarnConf = new YarnConfiguration(conf); InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)); LOG.info("Connecting to ResourceManager at " + rmAddress); return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf); }
メソッド名からして、Resource Managerに自分自身を登録しているのだと思う。
RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException { RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class); appMasterRequest.setApplicationAttemptId(appAttemptID); // appMasterRequest.setHost(appMasterHostname); // appMasterRequest.setRpcPort(appMasterRpcPort); // appMasterRequest.setTrackingUrl(appMasterTrackingUrl); return resourceManager.registerApplicationMaster(appMasterRequest); }
元にしたサンプルではホスト名だのトラッキングURLだのを指定するようになっているが、
何を設定すればいいか分からないので、とりあえず省略!
Resource Managerに対し、正常終了(成功)を通知する。
void finish() throws YarnRemoteException { LOG.info("Application completed. Signalling finish to RM"); FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class); finishReq.setAppAttemptId(appAttemptID); finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED); resourceManager.finishApplicationMaster(finishReq); } }
上記のサンプルをCentOS上のシングルノードクラスターで実行してみる。
$ cd $HADOOP_COMMON_HOME $ bin/yarn-daemon.sh start resourcemanager $ bin/yarn-daemon.sh start nodemanagerちなみに、停止するには「stop」を指定する。
$ cd sample23.jarが置いてあるディレクトリー $ $HADOOP_COMMON_HOME/bin/yarn jar sample23.jar sample1.Sample1Client sample23.jarちなみに、実行すると、カレントディレクトリーにbuildというディレクトリーが作られる模様。
yarnコマンドに対し、実行するjarファイルとクラスをjarオプションで指定する。
今回のサンプルでは第1引数にApplicationMasterの入っているjarファイルのパスを指定するようにしてある。
もしClientとAppMstrのjarファイルが別なら、以下のような指定になるだろう。
$ $HADOOP_COMMON_HOME/bin/yarn jar sample23-client.jar sample1.Sample1Client sample23-appMstr.jar
リソースマネージャーやノードマネージャーと共にヒストリーサーバーを起動しておくと、ジョブ実行完了時の状態やファイル・ディレクトリーの移動削除の状況がログに出力される。[2011-11-29]
$ cd $HADOOP_COMMON_HOME $ bin/yarn-daemon.sh start historyserver
$ cd $HADOOP_COMMON_HOME/logs $ ls yarn-hishidama-historyserver-localhost.localdomain.out yarn-hishidama-nodemanager-localhost.localdomain.out yarn-hishidama-resourcemanager-localhost.localdomain.out