Hadoop0.23のYARNのメモ第2弾。 コンテナーの起動まで。
最初にコンテナー抜きのサンプルを作ったので、今度はコンテナーを起動するサンプル。
ただし、コンテナーの起動は出来たものの、戻り値を受け取る上手い方法を見つけられなかったので、今回はその部分は無し!
つまりコンテナーを起動するだけ(結果はログに出力するだけ)^^;
参考として従来のMapReduceで動く円周率算出(モンテカルロ法)のサンプルを使って作ってみたが、
結果を返さないので、別に円周率算出でなくてもよかったな…(苦笑)
MapReduce版のサンプルでは、Map処理を複数起動して数値の算出を行い、Reduce処理で合算している。
最初に入力ディレクトリーをHDFS上に1つ作り、Map数と同じだけ入力ファイルを作っている。
(従来のMapReduceでは、入力ファイル数(と大きなファイルの場合はブロック数)の個数だけMapタスクが作られる。円周率算出サンプルではそれを利用している)
数値を算出する範囲の指定(初期情報)は、各ファイル内にデータとして持たせている。
Reduceタスクは1つにしてあり、全てのMapタスクでの処理結果がReducerクラスに入ってくる。
なので、合算処理は特に面倒は無い。
指定された出力ディレクトリーに普通に結果を出力する。
Map・Reduceを実行するジョブ(mainメソッド)の中でReduceの出力ディレクトリーを決めているので、出力場所は分かる。
MapReduce終了後にそこから結果ファイルを読み出し、内容をコンソールに出力してプログラム全体を終了する。
つまり、基本的な入出力(データのやりとり)は全てHDFS上のファイルを介して行っている。
YARNではタスク分割とHDFSには何の関係もなく自分で制御できるので、今回は起動するJavaプログラムの引数として初期情報を渡す。
戻り値については従来版と同様にHDFSを経由すれば取得できるだろうけど、ファイル名をどうするか(特にリトライした場合)の制御とかが面倒そうなので、とりあえず保留。
…そもそも少量のデータをやりとりする機構が欲しいんだけど、あるのかなぁ?
progress(進捗状況)の情報には自由なクラスは渡せなさそうだし、
diagnosticsとしてメッセージ(String)は渡せるんだけど、これはエラー発生時に使うものみたいだし。
アプリケーションマスター(AppMstr・AM)を呼び出すクライアントは、最初に作った最小限クライアントとほぼ同じ。
ただし、元にした円周率算出サンプルでは第1引数に並列数、第2引数にサンプリング数を渡しているので、それを踏襲する。
public class Sample2Client {
/** AppMstrのクラス名 */
protected Class<?> appMasterClass = Sample2AppMstr.class;
/** 起動するコンテナー数 */
private int nContainers; //元にしたサンプルではnMaps、つまり起動するマップ数だった
/** 演算(サンプリング)する回数 */
private long nSamples;
@Override public int run(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: " + getClass().getName() + " <nContainers> <nSamples>"); throw new IllegalArgumentException("args.length=" + args.length); } nContainers = Integer.parseInt(args[0]); nSamples = Long.parseLong(args[1]); 〜 }
protected String getCommand() { StringBuilder sb = new StringBuilder(128); sb.append("${JAVA_HOME}/bin/java "); sb.append(appMasterClass.getName()); sb.append(" "); sb.append(nContainers); sb.append(" "); sb.append(nSamples); sb.append(" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout.log"); sb.append(" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr.log"); return sb.toString(); }
protected Map<String, LocalResource> getLocalResources() throws IOException {
Path jarPath = new Path(new File(ClassUtil.findContainingJar(appMasterClass)).toURI());
〜
}
クライアントプログラムでは、アプリケーションマスター用のローカルリソース(ローカルファイル)としてAppMstrが入っているjarファイルを準備している。
最初に作った最小限クライアントではjarファイル(の場所)をmain()の引数として受け取るようにしていたが、
今回は全てのクラスが1つのjarファイルに入っている(ClientとAppMstrが同じjarファイルに入っている)という前提で
クラスローダーからjarファイル名を取得するようにした。
その為のClassUtilというクラスを用意したが、これは従来のHadoopのJobConf#findContainingJar()の処理そのもの。publicなメソッドだったら、そのまま呼び出すのに…。
アプリケーションマスターもクライアントと同様に、円周率算出用の引数を受け取るように改造する。
public class Sample2AppMstr {
/** 起動するコンテナー数 */ private int nContainers; /** 演算(サンプリング)する回数 */ private long nSamples;
void init(String[] args) { if (args.length != 2) { System.err.println("Usage: " + getClass().getName() + " <nContainers> <nSamples>"); throw new IllegalArgumentException("args.length=" + args.length); } nContainers = Integer.parseInt(args[0]); nSamples = Long.parseLong(args[1]); }
最小限AppMstrに対し、コンテナーを起動する処理を追加する。
public int run(String[] args) throws YarnRemoteException { init(args); //追加 appAttemptID = getAppAttemptID(); resourceManager = connectToRM(); RegisterApplicationMasterResponse amResponse = registerToRM(); initResource(amResponse); //追加 loop(); //追加 finish(); return 0; }
追加したのは、コンテナーの必要リソースを初期化する処理と、コンテナーの起動および監視を行う処理。
コンテナーのリソースサイズを決めておく処理。
/** コンテナーを実行するのに必要な(要求する)メモリーサイズ */ protected int containerMemory = 1 * 1024;
void initResource(RegisterApplicationMasterResponse response) { int minMem = response.getMinimumResourceCapability().getMemory(); int maxMem = response.getMaximumResourceCapability().getMemory(); LOG.info("Min mem capabililty of resources in this cluster " + minMem); LOG.info("Max mem capabililty of resources in this cluster " + maxMem); if (containerMemory < minMem) { LOG.info("Container memory specified below min threshold of cluster. Using min value." + ", specified=" + containerMemory + ", min=" + minMem); containerMemory = minMem; } else if (containerMemory > maxMem) { LOG.info("Container memory specified above max threshold of cluster. Using max value." + ", specified=" + containerMemory + ", max=" + maxMem); containerMemory = maxMem; } }
クライアントがアプリマスターを起動する際にも必要リソースを指定していたが、それと全く同じ。
ただしコンテナーは複数起動されるので、このメソッドでは値を初期化するのに留め、リソースオブジェクトの生成は後で個別に行うようにした。
コンテナーの起動と監視を行う。
コンテナーの起動に関しては、元にしたYARNのサンプルではややこしい事をしている。
起動用のスレッドを起こして、そこでコンテナーのプログラムを実行している。
監視にはこのスレッドは使用していない。起動のみ。
監視用に途中結果(完了結果)をまとめて受け取るメソッドがあるので、コンテナーの実行状態はそちらで取得して処理する。
最初に、実行するコンテナーの総数を決める。
/** 起動するコンテナー数 */ private int numTotalContainers;
void loop() throws YarnRemoteException { Queue<ContainerLauncher> queue = getContainerQueue(); numTotalContainers = queue.size();
Queue<ContainerLauncher> getContainerQueue() { Queue<ContainerLauncher> queue = new ArrayBlockingQueue<ContainerLauncher>(nContainers); for (int i = 0; i < nContainers; i++) { Sample2Launcher launcher = new Sample2Launcher(i * nSamples, nSamples); queue.add(launcher); } return queue; }
コンテナー起動用のスレッドを作成する為のクラスを別途作るようにした。
それをキューに入れて返している。
なぜこんな作りにしたかというと、リランする際に「コンテナー起動用スレッド」を作り直せるようにする為。
(JavaのThreadは一度実行が終わったら再実行することは出来ない)
public interface ContainerLauncher { public LaunchThread create(); }
class Sample2Launcher implements ContainerLauncher { private long offset; private long samples; /** コンストラクター */ public Sample2Launcher(long offset, long samples) { this.offset = offset; this.samples = samples; } @Override public LaunchThread create() { return new Sample2LaunchThread(); } class Sample2LaunchThread extends LaunchThread { 〜 } }
コンテナープログラムを実行する為のスレッド。
public abstract class LaunchThread extends Thread { /** アロケートされたコンテナー */ private Container container; /** ContainerManager */ private ContainerManager cm; public void init(Container container) { this.container = container; } }
class Sample2LaunchThread extends LaunchThread { @Override public void run() { try { this.cm = connectToCM(); ContainerLaunchContext ctx = getContainerLaunchContext(); startContainer(ctx); } catch (Exception e) { numCompletedContainers.incrementAndGet(); numFailedContainers.incrementAndGet(); e.printStackTrace(); } }
スレッドの処理本体は至ってシンプルで、コンテナー用のコンテキスト(リソース・実行情報)を生成し、実行するだけ。
例外が発生した場合は失敗件数をカウントしている。
(ただし全例外の処理がこれでいいかについては何とも…)
コンテナーマネージャーへ接続する。
ContainerManager connectToCM() { String cmIpPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort(); InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr); LOG.info("Connecting to ResourceManager at " + cmIpPortStr); return (ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf); }
実行するコンテナーの実行方法を設定する。
ContainerLaunchContext getContainerLaunchContext() throws IOException { ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class); ctx.setContainerId(container.getId()); ctx.setResource(container.getResource()); try { ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName()); } catch (IOException e) { LOG.info("Getting current user info failed when trying to launch the container" + e.getMessage()); } Map<String, LocalResource> localResources = getLocalResources(); ctx.setLocalResources(localResources); Map<String, String> env = getEnvironment(); ctx.setEnvironment(env); String command = getCommand(); List<String> commands = Collections.singletonList(command); ctx.setCommands(commands); return ctx; }
基本的にはクライアントがアプリマスターを実行する際の指定方法と同じ。
リソース(メモリーサイズ)や優先度については起動可能コンテナー数を取得する際に指定するので、ここでは指定しない。
今回実行するのは、以下のようなコンテナープログラム。
String getCommand() { StringBuilder sb = new StringBuilder(128); sb.append("${JAVA_HOME}/bin/java "); sb.append(Sample2Container.getName()); sb.append(" "); sb.append(offset); sb.append(" "); sb.append(samples); sb.append(" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/container.stdout.log"); sb.append(" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/container.stderr.log"); return sb.toString(); }
ログディレクトリーの指定方法はアプリマスターのと全く同じだが、
実際の実行時にはコンテナーIDの付いたディレクトリーになる為、かぶることは無い。
コンテナープログラムを実行(起動)する。(クライアントがアプリマスターを実行する際のsubmitに相当する)
void startContainer(ContainerLaunchContext ctx) throws YarnRemoteException { StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class); startReq.setContainerLaunchContext(ctx); cm.startContainer(startReq); }
loop()でコンテナー起動用のキューを取得した後からの続き。
while (numCompletedContainers.get() < numTotalContainers) {
まず、キューのサイズ(起動したいコンテナーの残りの数)を取得し、それを引数として
リソースマネージャーにコンテナーの状態を問い合わせる。
すると、アロケートされたコンテナー一覧(使用可能なコンテナー)が取得できる。
int askCount = queue.size(); List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>(); if (askCount > 0) { ResourceRequest containerAsk = setupContainerAskForRM(askCount); resourceReq.add(containerAsk); } AMResponse amResp = sendContainerAskToRM(resourceReq); List<Container> allocatedContainers = amResp.getAllocatedContainers(); numAllocatedContainers.addAndGet(allocatedContainers.size());
アロケートされた分、コンテナープログラムを起動する。
ここでスレッドを使用する。
なお、リソースが余っている場合は、自分が欲しい個数より多くのコンテナーがアロケートされる模様。
今回は、不要なコンテナーはリリースせずに放ってある(爆)(リリースすると後続の監視処理でFAIL扱いになる為)
for (Container allocatedContainer : allocatedContainers) { ContainerLauncher launcher = queue.poll(); if (launcher != null) { LaunchThread thread = launcher.create(); thread.init(allocatedContainer); launcherMap.put(allocatedContainer.getId(), launcher); launchThreads.add(thread); thread.start(); } }
クラスターのリソース状態を取得することが出来る。
Resource availableResources = amResp.getAvailableResources(); LOG.info("Current available resources in the cluster " + availableResources);
完了したコンテナーを取得し、成功/失敗に応じてカウンターを増減させる。
List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses(); for (ContainerStatus containerStatus : completedContainers) { ExitStatusAction exitStatus = checkExitStatus(containerStatus); switch (exitStatus) { case SUCCEEDED: numCompletedContainers.incrementAndGet(); LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId()); break; case FAILED: numCompletedContainers.incrementAndGet(); numFailedContainers.incrementAndGet(); break; case RETRY: numAllocatedContainers.decrementAndGet(); ContainerLauncher launcher = launcherMap.get(containerStatus.getContainerId()); if (launcher != null) { queue.add(launcher); } break; default: throw new Error("exitStatus=" + exitStatus); } }
完了したコンテナー数が当初予定の総数に達したら、ループを終了する。
失敗したコンテナーも完了コンテナー数に数えているので、失敗数も考慮に入れた方がいいかも。
if (numCompletedContainers.get() == numTotalContainers) {
break; // ループ終了
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.debug("Thread sleep in monitoring loop interrupted");
}
} //while
// スレッドの終了待ち
for (Thread launchThread : launchThreads) {
try {
launchThread.join(10 * 1000);
} catch (InterruptedException e) {
LOG.info("Exception thrown in thread join: " + e.getMessage());
e.printStackTrace();
}
}
} //loop()
コンテナーについて問い合わせる情報を生成する。
ResourceRequest setupContainerAskForRM(int numContainers) { ResourceRequest request = Records.newRecord(ResourceRequest.class); request.setHostName("*"); request.setNumContainers(numContainers); Priority pri = Records.newRecord(Priority.class); pri.setPriority(requestPriority); request.setPriority(pri); Resource capability = Records.newRecord(Resource.class); capability.setMemory(containerMemory); request.setCapability(capability); return request; }
コンテナーのアロケートを行う。
/** リソースマネージャーへのRPC呼び出し毎に増加させるカウンター */ private AtomicInteger rmRequestID = new AtomicInteger(); /** リリースされるコンテナー */ private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainers) throws YarnRemoteException { AllocateRequest req = Records.newRecord(AllocateRequest.class); req.setResponseId(rmRequestID.incrementAndGet()); req.setApplicationAttemptId(appAttemptID); req.addAllAsks(requestedContainers); req.addAllReleases(releasedContainers); req.setProgress((float) numCompletedContainers.get() / numTotalContainers); LOG.info("Sending request to RM for containers" + ", requestedSet=" + requestedContainers.size() + ", releasedSet=" + releasedContainers.size() + ", progress=" + req.getProgress()); for (ResourceRequest rsrcReq : requestedContainers) { LOG.info("Requested container ask: " + rsrcReq.toString()); } for (ContainerId id : releasedContainers) { LOG.info("Released container, id=" + id.getId()); } AllocateResponse resp = resourceManager.allocate(req); return resp.getAMResponse(); }
アロケートを要求するコンテナーの他に、リリースするコンテナーも指定できる。(今回は特に使っていないが)
完了したコンテナーの成功・失敗・リトライを判定する。
public static enum ExitStatusAction { SUCCEEDED, FAILED, RETRY }
ExitStatusAction checkExitStatus(ContainerStatus containerStatus) { int exitStatus = containerStatus.getExitStatus(); switch (exitStatus) { case 0: return ExitStatusAction.SUCCEEDED; case -100: return ExitStatusAction.RETRY; default: return ExitStatusAction.FAILED; } }
exitStatusは、コンテナープログラムの終了コードだと思われる。
-100は、何らかの事情によりコンテナーがロストした場合に返ってくる模様。
実行したコンテナー数や失敗したコンテナー数を数える為のカウンターをいくつか使っている。
/** 完了したコンテナー数 */ private AtomicInteger numCompletedContainers = new AtomicInteger(); /** アロケートされたコンテナー数 */ private AtomicInteger numAllocatedContainers = new AtomicInteger(); /** 失敗したコンテナー数 */ private AtomicInteger numFailedContainers = new AtomicInteger();
AtomicIntegerを使っているのは、マルチスレッドでの動作を想定している為。
考慮しなければならないスレッドはLaunchThreadだが、LaunchThread内で使っているnumCompletedContainersとnumFailedContainers以外は
マルチスレッドでアクセスされることがないので、本当はAtomicIntegerである必要はない。
アプリケーションマスターから(複数)起動され、演算の本体を行うプログラムがコンテナー。
YARNとしては、別にプログラムでなくてもシェルスクリプトでも構わない。
アプリマスターの場合は環境変数経由でクライアントとの通信(進捗状況の報告および完了通知)用の情報を取得できるが、
コンテナーのプログラムの場合はそういったものは無く、Javaで言えば普通にmain()が呼ばれるバッチアプリ。
円周率算出のコンテナーは、元のMap処理とほぼ同じ。
ただし元々はHDFSへ値を出力していたが、今回はログ出力しているだけ。
public class Sample2Container {
public static void main(String[] args) { Sample2Container container = new Sample2Container(); container.init(args); int r = container.run(); System.exit(r); }
private long offset; private long samples; public void init(String[] args) { offset = Long.parseLong(args[0]); samples = Long.parseLong(args[1]); System.out.println("offset= " + offset); System.out.println("samples=" + samples); }
public int run() { // 元のQmcMapper#map()と同等 final HaltonSequence haltonsequence = new HaltonSequence(offset); long numInside = 0L; long numOutside = 0L; for (long i = 0; i < samples;) { // generate points in a unit square final double[] point = haltonsequence.nextPoint(); // count points inside/outside of the inscribed circle of the square final double x = point[0] - 0.5; final double y = point[1] - 0.5; if (x * x + y * y > 0.25) { numOutside++; } else { numInside++; } // report status i++; if (i % 1000 == 0) { // 元はcontext.setStatus()で渡していた System.err.println("Generated " + i + " samples."); } } // 元はcontext.write()でHDFSに出力していた System.out.println("numInside= " + numInside); System.out.println("numOutside=" + numOutside); return 0; }
private static class HaltonSequence { 〜 } }
上記のサンプルをCentOS上のシングルノードクラスターで実行してみる。
$ $HADOOP_COMMON_HOME/bin/yarn jar ~/Desktop/sample23.jar sample.yarn.sample2.Sample2Client 4 2000
シングルノードなので、ログは全て同一マシン上に出力される。
本当の分散クラスターなら稼動したノード上に稼動した分だけ出力されると思う。
$ cd /tmp/logs $ cd application_1322294777708_0014 ←アプリケーションIDがディレクトリー名になっている $ ls container_1322294777708_0014_01_000001 ←アプリケーションマスターのログ container_1322294777708_0014_01_000002 ←コンテナープログラムのログ container_1322294777708_0014_01_000003 container_1322294777708_0014_01_000004 container_1322294777708_0014_01_000005 $ cd *2 $ ls container.stderr.log container.stdout.log $ cat container.stderr.log Generated 1000 samples. Generated 2000 samples. $ cat container.stdout.log offset= 0 samples=2000 numInside= 1572 numOutside=428
アプリケーションIDがついたディレクトリーの下にコンテナーIDがついたディレクトリーがある。
アプリケーションマスターもコンテナーとして動いているので、IDだけ見ても区別が付かない(一番最初に実行しているから、1のはずだけどw)。
ログファイルには、コンテナープログラムが標準出力・標準エラーに出力したメッセージがちゃんと出ている。