S-JIS[2012-12-15] 変更履歴

Hadoop DistributedCache

Hadoopの分散キャッシュについて。


分散キャッシュの概要

分散キャッシュは、読み込み専用のファイルを効率よく扱えるようにするものらしい。
MapReduceの直接の入力ファイルとは別のファイルを読み込むのに使う。

たぶんローカルにファイルをコピーし、使い回せるようにするのだと思う。
(一度コピーしたら、ファイルが新しくならない限り再コピーしない)
jarやzip等の アーカイブファイルの場合、解凍した状態でキャッシュするらしい。


分散キャッシュのコーディング例(Hadoop1系)

Hadoop1系の場合、DistributedCacheクラスを使う。

ジョブ作成前にDistributedCacheのadd系メソッドを呼び出してキャッシュしたいファイル名を登録する。(ファイルの存在チェックが行われる)
MapperやReducerから、DistributedCacheのget系メソッドでキャッシュされているファイル名を取得できる。

import org.apache.hadoop.filecache.DistributedCache;
public class DistCacheExample extends Configured implements Tool {

	public static void main(String[] args) throws Exception {
		int r = ToolRunner.run(new DistCacheExample(), args);
		System.exit(r);
	}

	@Override
	public int run(String[] args) throws Exception {
		Job job = new Job(getConf(), "distcatche-example");
		job.setJarByClass(getClass());

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		job.setMapperClass(MyMapper.class);
		job.setReducerClass(IntSumReducer.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		DistributedCache.addCacheFile(new URI("distcache-exmaple/dist.txt"), job.getConfiguration());

		boolean succeeded = job.waitForCompletion(true);
		return succeeded ? 0 : 1;
	}
}
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	private Map<String, String> cachedData = new HashMap<String, String>();

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		super.setup(context);

		// 分散キャッシュファイルのPathを取得
		FileStatus stat;
		try {
			stat = DistributedCache.getFileStatus(context.getConfiguration(), new URI("distcache-exmaple/dist.txt"));
		} catch (URISyntaxException e) {
			throw new IOException(e);
		}
		Path path = stat.getPath();

		FileSystem fs = path.getFileSystem(context.getConfiguration());
		FSDataInputStream is = fs.open(path);
		BufferedReader br = new BufferedReader(new InputStreamReader(is));
		try {
			for (;;) {
				String s = br.readLine();
				if (s == null) {
					break;
				}
				String[] ss = s.split(",");
				String key = ss[0];
				String dat = ss[1];
				cachedData.put(key, dat);
			}
		} finally {
			br.close();
		}
	}
	private Text dat = new Text();
	private IntWritable one = new IntWritable(1);

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		dat.set(cachedData.get(value.toString()));

		context.write(dat, one);
	}
}

この例では、Mapper#setup()でキャッシュファイルを読み込んでフィールドにデータを保持し、map()で使っている。


分散キャッシュのコーディング例(Hadoop2系)

Hadoop2系の場合、DistributedCacheクラスの大部分のメソッドが非推奨になっている。
メソッドはJobクラスやContextクラスに移動したようだ。

以下、Hadoop1系との差分を示す。

public class DistCacheExample extends Configured implements Tool {
〜

	@Override
	public int run(String[] args) throws Exception {
		Job job = Job.getInstance(getConf(), "distcatche-example");
〜

//		DistributedCache.addCacheFile(new URI("distcache-exmaple/dist.txt"), job.getConfiguration());
		job.addCacheFile(new URI("distcache-exmaple/dist.txt"));
〜
	}
}
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
〜
	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		super.setup(context);

		// 分散キャッシュファイルのPathを取得
//		URI[] files = context.getCacheFiles();
//		Path path = new Path(files[0]);
		Path path = new Path(new URI("distcache-exmaple/dist.txt"));
〜
	}
〜
}

addに当たるメソッドはJobにあり、getに当たるメソッドはContextにある。

しかしHadoop1系にあったgetFileStatus()メソッドは無い模様。
new Path()でURIを直接指定すれば大丈夫そう。


分散キャッシュの実行例

キャッシュしたいファイルは、事前にHDFS上に置いておく必要がある。
プログラム内で指定するURIの指している場所に置く)

$ hadoop fs -put src-file.txt distcache-exmaple/dist.txt
$ hadoop jar example.jar example.DistCacheExample input output

Hadoop APIへ戻る / Hadoop目次へ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま