Hadoopの分散キャッシュについて。
|
分散キャッシュは、読み込み専用のファイルを効率よく扱えるようにするものらしい。
MapReduceの直接の入力ファイルとは別のファイルを読み込むのに使う。
たぶんローカルにファイルをコピーし、使い回せるようにするのだと思う。
(一度コピーしたら、ファイルが新しくならない限り再コピーしない)
jarやzip等の
アーカイブファイルの場合、解凍した状態でキャッシュするらしい。
Hadoop1系の場合、DistributedCacheクラスを使う。
ジョブ作成前にDistributedCacheのadd系メソッドを呼び出してキャッシュしたいファイル名を登録する。(ファイルの存在チェックが行われる)
MapperやReducerから、DistributedCacheのget系メソッドでキャッシュされているファイル名を取得できる。
import org.apache.hadoop.filecache.DistributedCache;
public class DistCacheExample extends Configured implements Tool { public static void main(String[] args) throws Exception { int r = ToolRunner.run(new DistCacheExample(), args); System.exit(r); } @Override public int run(String[] args) throws Exception { Job job = new Job(getConf(), "distcatche-example"); job.setJarByClass(getClass()); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(MyMapper.class); job.setReducerClass(IntSumReducer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); DistributedCache.addCacheFile(new URI("distcache-exmaple/dist.txt"), job.getConfiguration()); boolean succeeded = job.waitForCompletion(true); return succeeded ? 0 : 1; } }
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private Map<String, String> cachedData = new HashMap<String, String>(); @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); // 分散キャッシュファイルのPathを取得 FileStatus stat; try { stat = DistributedCache.getFileStatus(context.getConfiguration(), new URI("distcache-exmaple/dist.txt")); } catch (URISyntaxException e) { throw new IOException(e); } Path path = stat.getPath(); FileSystem fs = path.getFileSystem(context.getConfiguration()); FSDataInputStream is = fs.open(path); BufferedReader br = new BufferedReader(new InputStreamReader(is)); try { for (;;) { String s = br.readLine(); if (s == null) { break; } String[] ss = s.split(","); String key = ss[0]; String dat = ss[1]; cachedData.put(key, dat); } } finally { br.close(); } }
private Text dat = new Text(); private IntWritable one = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { dat.set(cachedData.get(value.toString())); context.write(dat, one); } }
この例では、Mapper#setup()でキャッシュファイルを読み込んでフィールドにデータを保持し、map()で使っている。
Hadoop2系の場合、DistributedCacheクラスの大部分のメソッドが非推奨になっている。
メソッドはJobクラスやContextクラスに移動したようだ。
以下、Hadoop1系との差分を示す。
public class DistCacheExample extends Configured implements Tool {
〜
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(getConf(), "distcatche-example");
〜
// DistributedCache.addCacheFile(new URI("distcache-exmaple/dist.txt"), job.getConfiguration());
job.addCacheFile(new URI("distcache-exmaple/dist.txt"));
〜
}
}
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> { 〜 @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); // 分散キャッシュファイルのPathを取得 // URI[] files = context.getCacheFiles(); // Path path = new Path(files[0]); Path path = new Path(new URI("distcache-exmaple/dist.txt")); 〜 } 〜 }
addに当たるメソッドはJobにあり、getに当たるメソッドはContextにある。
しかしHadoop1系にあったgetFileStatus()メソッドは無い模様。
new Path()でURIを直接指定すれば大丈夫そう。
キャッシュしたいファイルは、事前にHDFS上に置いておく必要がある。
(プログラム内で指定するURIの指している場所に置く)
$ hadoop fs -put src-file.txt distcache-exmaple/dist.txt $ hadoop jar example.jar example.DistCacheExample input output