S-JIS[2011-11-27] 変更履歴

Hadoop0.23 YARN

Hadoop0.23YARNのメモ第2弾。 コンテナーの起動まで。


円周率算出サンプル(中途半端版)

最初にコンテナー抜きのサンプルを作ったので、今度はコンテナーを起動するサンプル。

ただし、コンテナーの起動は出来たものの、戻り値を受け取る上手い方法を見つけられなかったので、今回はその部分は無し!
つまりコンテナーを起動するだけ(結果はログに出力するだけ)^^;

参考として従来のMapReduceで動く円周率算出(モンテカルロ法)のサンプルを使って作ってみたが、
結果を返さないので、別に円周率算出でなくてもよかったな…(苦笑)

ソース全体


MapReduce版のサンプルでは、Map処理を複数起動して数値の算出を行い、Reduce処理で合算している。

最初に入力ディレクトリーをHDFS上に1つ作り、Map数と同じだけ入力ファイルを作っている。
(従来のMapReduceでは、入力ファイル数(と大きなファイルの場合はブロック数)の個数だけMapタスクが作られる。円周率算出サンプルではそれを利用している)
数値を算出する範囲の指定(初期情報)は、各ファイル内にデータとして持たせている。

Reduceタスクは1つにしてあり、全てのMapタスクでの処理結果がReducerクラスに入ってくる。
なので、合算処理は特に面倒は無い。
指定された出力ディレクトリーに普通に結果を出力する。

Map・Reduceを実行するジョブ(mainメソッド)の中でReduceの出力ディレクトリーを決めているので、出力場所は分かる。
MapReduce終了後にそこから結果ファイルを読み出し、内容をコンソールに出力してプログラム全体を終了する。

つまり、基本的な入出力(データのやりとり)は全てHDFS上のファイルを介して行っている。


YARNではタスク分割とHDFSには何の関係もなく自分で制御できるので、今回は起動するJavaプログラムの引数として初期情報を渡す。

戻り値については従来版と同様にHDFSを経由すれば取得できるだろうけど、ファイル名をどうするか(特にリトライした場合)の制御とかが面倒そうなので、とりあえず保留。

…そもそも少量のデータをやりとりする機構が欲しいんだけど、あるのかなぁ?
progress(進捗状況)の情報には自由なクラスは渡せなさそうだし、
diagnosticsとしてメッセージ(String)は渡せるんだけど、これはエラー発生時に使うものみたいだし。


クライアント

アプリケーションマスター(AppMstr・AM)を呼び出すクライアントは、最初に作った最小限クライアントとほぼ同じ。

ただし、元にした円周率算出サンプルでは第1引数に並列数、第2引数にサンプリング数を渡しているので、それを踏襲する。

public class Sample2Client {
	/** AppMstrのクラス名 */
	protected Class<?> appMasterClass = Sample2AppMstr.class;

	/** 起動するコンテナー数 */
	private int nContainers;	//元にしたサンプルではnMaps、つまり起動するマップ数だった
	/** 演算(サンプリング)する回数 */
	private long nSamples;
	@Override
	public int run(String[] args) throws Exception {
		if (args.length != 2) {
			System.err.println("Usage: " + getClass().getName() + " <nContainers> <nSamples>");
			throw new IllegalArgumentException("args.length=" + args.length);
		}

		nContainers = Integer.parseInt(args[0]);
		nSamples = Long.parseLong(args[1]);
〜
	}
	protected String getCommand() {
		StringBuilder sb = new StringBuilder(128);

		sb.append("${JAVA_HOME}/bin/java ");
		sb.append(appMasterClass.getName());
		sb.append(" ");
		sb.append(nContainers);
		sb.append(" ");
		sb.append(nSamples);
		sb.append(" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout.log");
		sb.append(" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr.log");

		return sb.toString();
	}

	protected Map<String, LocalResource> getLocalResources() throws IOException {
		Path jarPath = new Path(new File(ClassUtil.findContainingJar(appMasterClass)).toURI());
〜
	}

クライアントプログラムでは、アプリケーションマスター用のローカルリソース(ローカルファイル)としてAppMstrが入っているjarファイルを準備している。

最初に作った最小限クライアントではjarファイル(の場所)をmain()の引数として受け取るようにしていたが、
今回は全てのクラスが1つのjarファイルに入っている(ClientとAppMstrが同じjarファイルに入っている)という前提で
クラスローダーからjarファイル名を取得するようにした。

その為のClassUtilというクラスを用意したが、これは従来のHadoopのJobConf#findContainingJar()の処理そのもの。publicなメソッドだったら、そのまま呼び出すのに…。


アプリケーションマスター

アプリケーションマスターもクライアントと同様に、円周率算出用の引数を受け取るように改造する。

public class Sample2AppMstr {
	/** 起動するコンテナー数 */
	private int nContainers;
	/** 演算(サンプリング)する回数 */
	private long nSamples;
	void init(String[] args) {
		if (args.length != 2) {
			System.err.println("Usage: " + getClass().getName() + " <nContainers> <nSamples>");
			throw new IllegalArgumentException("args.length=" + args.length);
		}

		nContainers = Integer.parseInt(args[0]);
		nSamples = Long.parseLong(args[1]);
	}

run()

最小限AppMstrに対し、コンテナーを起動する処理を追加する。

	public int run(String[] args) throws YarnRemoteException {
		init(args); //追加

		appAttemptID = getAppAttemptID();
		resourceManager = connectToRM();
		RegisterApplicationMasterResponse amResponse = registerToRM();
		initResource(amResponse); //追加

		loop(); //追加

		finish();
		return 0;
	}

追加したのは、コンテナーの必要リソースを初期化する処理と、コンテナーの起動および監視を行う処理。


initResource()

コンテナーのリソースサイズを決めておく処理。

	/** コンテナーを実行するのに必要な(要求する)メモリーサイズ */
	protected int containerMemory = 1 * 1024;
	void initResource(RegisterApplicationMasterResponse response) {
		int minMem = response.getMinimumResourceCapability().getMemory();
		int maxMem = response.getMaximumResourceCapability().getMemory();
		LOG.info("Min mem capabililty of resources in this cluster " + minMem);
		LOG.info("Max mem capabililty of resources in this cluster " + maxMem);

		if (containerMemory < minMem) {
			LOG.info("Container memory specified below min threshold of cluster. Using min value." + ", specified=" + containerMemory + ", min=" + minMem);
			containerMemory = minMem;
		} else if (containerMemory > maxMem) {
			LOG.info("Container memory specified above max threshold of cluster. Using max value." + ", specified=" + containerMemory + ", max=" + maxMem);
			containerMemory = maxMem;
		}
	}

クライアントがアプリマスターを起動する際にも必要リソースを指定していたが、それと全く同じ。
ただしコンテナーは複数起動されるので、このメソッドでは値を初期化するのに留め、リソースオブジェクトの生成は後で個別に行うようにした。


loop()

コンテナーの起動と監視を行う。

  1. 実行するコンテナーの総数を決める。
  2. リソースマネージャーから起動可能なコンテナー数を取得する。
  3. コンテナーを起動する。
  4. コンテナーを監視する。(完了・失敗のカウント)
  5. 最初に決めた総数に達するまで、2.から繰り返す。

コンテナーの起動に関しては、元にしたYARNのサンプルではややこしい事をしている。
起動用のスレッドを起こして、そこでコンテナーのプログラムを実行している。
監視にはこのスレッドは使用していない。起動のみ。

監視用に途中結果(完了結果)をまとめて受け取るメソッドがあるので、コンテナーの実行状態はそちらで取得して処理する。


getContainerQueue()

最初に、実行するコンテナーの総数を決める。

	/** 起動するコンテナー数 */
	private int numTotalContainers;
	void loop() throws YarnRemoteException {
		Queue<ContainerLauncher> queue = getContainerQueue();
		numTotalContainers = queue.size();
	Queue<ContainerLauncher> getContainerQueue() {
		Queue<ContainerLauncher> queue = new ArrayBlockingQueue<ContainerLauncher>(nContainers);

		for (int i = 0; i < nContainers; i++) {
			Sample2Launcher launcher = new Sample2Launcher(i * nSamples, nSamples);
			queue.add(launcher);
		}

		return queue;
	}

コンテナー起動用のスレッドを作成する為のクラスを別途作るようにした。
それをキューに入れて返している。

なぜこんな作りにしたかというと、リランする際に「コンテナー起動用スレッド」を作り直せるようにする為。
(JavaのThreadは一度実行が終わったら再実行することは出来ない)

public interface ContainerLauncher {
	public LaunchThread create();
}
class Sample2Launcher implements ContainerLauncher {
	private long offset;
	private long samples;

	/** コンストラクター */
	public Sample2Launcher(long offset, long samples) {
		this.offset = offset;
		this.samples = samples;
	}

	@Override
	public LaunchThread create() {
		return new Sample2LaunchThread();
	}

	class Sample2LaunchThread extends LaunchThread {
		〜
	}
}

Sample2LaunchThread

コンテナープログラムを実行する為のスレッド。

public abstract class LaunchThread extends Thread {
	/** アロケートされたコンテナー */
	private Container container;
	/** ContainerManager */
	private ContainerManager cm;

	public void init(Container container) {
		this.container = container;
	}
}
class Sample2LaunchThread extends LaunchThread {

	@Override
	public void run() {
		try {
			this.cm = connectToCM();

			ContainerLaunchContext ctx = getContainerLaunchContext();
			startContainer(ctx);

		} catch (Exception e) {
			numCompletedContainers.incrementAndGet();
			numFailedContainers.incrementAndGet();
			e.printStackTrace();
		}
	}

スレッドの処理本体は至ってシンプルで、コンテナー用のコンテキスト(リソース・実行情報)を生成し、実行するだけ。

例外が発生した場合は失敗件数をカウントしている。
(ただし全例外の処理がこれでいいかについては何とも…)


connectToCM()

コンテナーマネージャーへ接続する。

	ContainerManager connectToCM() {
		String cmIpPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort();
		InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
		LOG.info("Connecting to ResourceManager at " + cmIpPortStr);

		return (ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf);
	}

getContainerLaunchContext()

実行するコンテナーの実行方法を設定する。

	ContainerLaunchContext getContainerLaunchContext() throws IOException {
		ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);

		ctx.setContainerId(container.getId());
		ctx.setResource(container.getResource());

		try {
			ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
		} catch (IOException e) {
			LOG.info("Getting current user info failed when trying to launch the container" + e.getMessage());
		}

		Map<String, LocalResource> localResources = getLocalResources();
		ctx.setLocalResources(localResources);

		Map<String, String> env = getEnvironment();
		ctx.setEnvironment(env);

		String command = getCommand();
		List<String> commands = Collections.singletonList(command);
		ctx.setCommands(commands);

		return ctx;
	}

基本的にはクライアントがアプリマスターを実行する際の指定方法と同じ。
リソース(メモリーサイズ)や優先度については起動可能コンテナー数を取得する際に指定するので、ここでは指定しない。

今回実行するのは、以下のようなコンテナープログラム。

	String getCommand() {
		StringBuilder sb = new StringBuilder(128);

		sb.append("${JAVA_HOME}/bin/java ");
		sb.append(Sample2Container.getName());
		sb.append(" ");
		sb.append(offset);
		sb.append(" ");
		sb.append(samples);
		sb.append(" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/container.stdout.log");
		sb.append(" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/container.stderr.log");

		return sb.toString();
	}

ログディレクトリーの指定方法はアプリマスターのと全く同じだが、
実際の実行時にはコンテナーIDの付いたディレクトリーになる為、かぶることは無い。


startContainer()

コンテナープログラムを実行(起動)する。(クライアントがアプリマスターを実行する際のsubmitに相当する)

	void startContainer(ContainerLaunchContext ctx) throws YarnRemoteException {
		StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
		startReq.setContainerLaunchContext(ctx);
		cm.startContainer(startReq);
	}

loop()の続き

loop()でコンテナー起動用のキューを取得した後からの続き。

		while (numCompletedContainers.get() < numTotalContainers) {

まず、キューのサイズ(起動したいコンテナーの残りの数)を取得し、それを引数として
リソースマネージャーにコンテナーの状態を問い合わせる。
すると、アロケートされたコンテナー一覧(使用可能なコンテナー)が取得できる。

			int askCount = queue.size();
			List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
			if (askCount > 0) {
				ResourceRequest containerAsk = setupContainerAskForRM(askCount);
				resourceReq.add(containerAsk);
			}
			AMResponse amResp = sendContainerAskToRM(resourceReq);
			List<Container> allocatedContainers = amResp.getAllocatedContainers();

			numAllocatedContainers.addAndGet(allocatedContainers.size());

アロケートされた分、コンテナープログラムを起動する。
ここでスレッドを使用する。
なお、リソースが余っている場合は、自分が欲しい個数より多くのコンテナーがアロケートされる模様。
今回は、不要なコンテナーはリリースせずに放ってある(爆)(リリースすると後続の監視処理でFAIL扱いになる為)

			for (Container allocatedContainer : allocatedContainers) {
				ContainerLauncher launcher = queue.poll();
				if (launcher != null) {
					LaunchThread thread = launcher.create();
					thread.init(allocatedContainer);
					launcherMap.put(allocatedContainer.getId(), launcher);
					launchThreads.add(thread);
					thread.start();
				}
			}

クラスターのリソース状態を取得することが出来る。

			Resource availableResources = amResp.getAvailableResources();
			LOG.info("Current available resources in the cluster " + availableResources);

完了したコンテナーを取得し、成功/失敗に応じてカウンターを増減させる。

			List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
			for (ContainerStatus containerStatus : completedContainers) {
				ExitStatusAction exitStatus = checkExitStatus(containerStatus);
				switch (exitStatus) {
				case SUCCEEDED:
					numCompletedContainers.incrementAndGet();
					LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId());
					break;
				case FAILED:
					numCompletedContainers.incrementAndGet();
					numFailedContainers.incrementAndGet();
					break;
				case RETRY:
					numAllocatedContainers.decrementAndGet();
					ContainerLauncher launcher = launcherMap.get(containerStatus.getContainerId());
					if (launcher != null) {
						queue.add(launcher);
					}
					break;
				default:
					throw new Error("exitStatus=" + exitStatus);
				}
			}

完了したコンテナー数が当初予定の総数に達したら、ループを終了する。
失敗したコンテナーも完了コンテナー数に数えているので、失敗数も考慮に入れた方がいいかも。

			if (numCompletedContainers.get() == numTotalContainers) {
				break; // ループ終了
			}

			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				LOG.debug("Thread sleep in monitoring loop interrupted");
			}
		} //while
		// スレッドの終了待ち
		for (Thread launchThread : launchThreads) {
			try {
				launchThread.join(10 * 1000);
			} catch (InterruptedException e) {
				LOG.info("Exception thrown in thread join: " + e.getMessage());
				e.printStackTrace();
			}
		}
	} //loop()

setupContainerAskForRM()

コンテナーについて問い合わせる情報を生成する。

	ResourceRequest setupContainerAskForRM(int numContainers) {
		ResourceRequest request = Records.newRecord(ResourceRequest.class);

		request.setHostName("*");
		request.setNumContainers(numContainers);

		Priority pri = Records.newRecord(Priority.class);
		pri.setPriority(requestPriority);
		request.setPriority(pri);

		Resource capability = Records.newRecord(Resource.class);
		capability.setMemory(containerMemory);
		request.setCapability(capability);

		return request;
	}

sendContainerAskToRM()

コンテナーのアロケートを行う。

	/** リソースマネージャーへのRPC呼び出し毎に増加させるカウンター */
	private AtomicInteger rmRequestID = new AtomicInteger();

	/** リリースされるコンテナー */
	private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
	AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainers) throws YarnRemoteException {
		AllocateRequest req = Records.newRecord(AllocateRequest.class);
		req.setResponseId(rmRequestID.incrementAndGet());
		req.setApplicationAttemptId(appAttemptID);
		req.addAllAsks(requestedContainers);
		req.addAllReleases(releasedContainers);
		req.setProgress((float) numCompletedContainers.get() / numTotalContainers);

		LOG.info("Sending request to RM for containers" + ", requestedSet=" + requestedContainers.size() + ", releasedSet=" + releasedContainers.size() + ", progress=" + req.getProgress());
		for (ResourceRequest rsrcReq : requestedContainers) {
			LOG.info("Requested container ask: " + rsrcReq.toString());
		}
		for (ContainerId id : releasedContainers) {
			LOG.info("Released container, id=" + id.getId());
		}

		AllocateResponse resp = resourceManager.allocate(req);
		return resp.getAMResponse();
	}

アロケートを要求するコンテナーの他に、リリースするコンテナーも指定できる。(今回は特に使っていないが)


checkExitStatus()

完了したコンテナーの成功・失敗・リトライを判定する。

	public static enum ExitStatusAction {
		SUCCEEDED, FAILED, RETRY
	}
	ExitStatusAction checkExitStatus(ContainerStatus containerStatus) {
		int exitStatus = containerStatus.getExitStatus();
		switch (exitStatus) {
		case 0:
			return ExitStatusAction.SUCCEEDED;
		case -100:
			return ExitStatusAction.RETRY;
		default:
			return ExitStatusAction.FAILED;
		}
	}

exitStatusは、コンテナープログラムの終了コードだと思われる。
-100は、何らかの事情によりコンテナーがロストした場合に返ってくる模様。


カウンター

実行したコンテナー数や失敗したコンテナー数を数える為のカウンターをいくつか使っている。

	/** 完了したコンテナー数 */
	private AtomicInteger numCompletedContainers = new AtomicInteger();

	/** アロケートされたコンテナー数 */
	private AtomicInteger numAllocatedContainers = new AtomicInteger();

	/** 失敗したコンテナー数 */
	private AtomicInteger numFailedContainers = new AtomicInteger();

AtomicIntegerを使っているのは、マルチスレッドでの動作を想定している為。
考慮しなければならないスレッドはLaunchThreadだが、LaunchThread内で使っているnumCompletedContainersとnumFailedContainers以外は
マルチスレッドでアクセスされることがないので、本当はAtomicIntegerである必要はない。


コンテナー

アプリケーションマスターから(複数)起動され、演算の本体を行うプログラムがコンテナー。
YARNとしては、別にプログラムでなくてもシェルスクリプトでも構わない。

アプリマスターの場合は環境変数経由でクライアントとの通信(進捗状況の報告および完了通知)用の情報を取得できるが、
コンテナーのプログラムの場合はそういったものは無く、Javaで言えば普通にmain()が呼ばれるバッチアプリ。

円周率算出のコンテナーは、元のMap処理とほぼ同じ。
ただし元々はHDFSへ値を出力していたが、今回はログ出力しているだけ。

public class Sample2Container {
	public static void main(String[] args) {
		Sample2Container container = new Sample2Container();
		container.init(args);
		int r = container.run();
		System.exit(r);
	}
	private long offset;
	private long samples;

	public void init(String[] args) {
		offset = Long.parseLong(args[0]);
		samples = Long.parseLong(args[1]);

		System.out.println("offset= " + offset);
		System.out.println("samples=" + samples);
	}
	public int run() {	// 元のQmcMapper#map()と同等
		final HaltonSequence haltonsequence = new HaltonSequence(offset);
		long numInside = 0L;
		long numOutside = 0L;

		for (long i = 0; i < samples;) {
			// generate points in a unit square
			final double[] point = haltonsequence.nextPoint();

			// count points inside/outside of the inscribed circle of the square
			final double x = point[0] - 0.5;
			final double y = point[1] - 0.5;
			if (x * x + y * y > 0.25) {
				numOutside++;
			} else {
				numInside++;
			}

			// report status
			i++;
			if (i % 1000 == 0) {
				// 元はcontext.setStatus()で渡していた
				System.err.println("Generated " + i + " samples.");
			}
		}

		// 元はcontext.write()でHDFSに出力していた
		System.out.println("numInside= " + numInside);
		System.out.println("numOutside=" + numOutside);

		return 0;
	}
	private static class HaltonSequence {
		〜
	}
}

実行例

上記のサンプルCentOS上のシングルノードクラスターで実行してみる。

$ $HADOOP_COMMON_HOME/bin/yarn jar ~/Desktop/sample23.jar sample.yarn.sample2.Sample2Client 4 2000

シングルノードなので、ログは全て同一マシン上に出力される。
本当の分散クラスターなら稼動したノード上に稼動した分だけ出力されると思う。

$ cd /tmp/logs
$ cd application_1322294777708_0014	←アプリケーションIDがディレクトリー名になっている
$ ls
container_1322294777708_0014_01_000001	←アプリケーションマスターのログ
container_1322294777708_0014_01_000002	←コンテナープログラムのログ
container_1322294777708_0014_01_000003
container_1322294777708_0014_01_000004
container_1322294777708_0014_01_000005

$ cd *2
$ ls
container.stderr.log  container.stdout.log

$ cat container.stderr.log 
Generated 1000 samples.
Generated 2000 samples.

$ cat container.stdout.log 
offset= 0
samples=2000
numInside= 1572
numOutside=428

アプリケーションIDがついたディレクトリーの下にコンテナーIDがついたディレクトリーがある。
アプリケーションマスターもコンテナーとして動いているので、IDだけ見ても区別が付かない(一番最初に実行しているから、1のはずだけどw)。

ログファイルには、コンテナープログラムが標準出力・標準エラーに出力したメッセージがちゃんと出ている。


YARNへ戻る / Hadoop0.23へ戻る / Hadoop目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま