Hadoopの分散キャッシュについて。
|
分散キャッシュは、読み込み専用のファイルを効率よく扱えるようにするものらしい。
MapReduceの直接の入力ファイルとは別のファイルを読み込むのに使う。
たぶんローカルにファイルをコピーし、使い回せるようにするのだと思う。
(一度コピーしたら、ファイルが新しくならない限り再コピーしない)
jarやzip等の
アーカイブファイルの場合、解凍した状態でキャッシュするらしい。
Hadoop1系の場合、DistributedCacheクラスを使う。
ジョブ作成前にDistributedCacheのadd系メソッドを呼び出してキャッシュしたいファイル名を登録する。(ファイルの存在チェックが行われる)
MapperやReducerから、DistributedCacheのget系メソッドでキャッシュされているファイル名を取得できる。
import org.apache.hadoop.filecache.DistributedCache;
public class DistCacheExample extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int r = ToolRunner.run(new DistCacheExample(), args);
System.exit(r);
}
@Override
public int run(String[] args) throws Exception {
Job job = new Job(getConf(), "distcatche-example");
job.setJarByClass(getClass());
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
DistributedCache.addCacheFile(new URI("distcache-exmaple/dist.txt"), job.getConfiguration());
boolean succeeded = job.waitForCompletion(true);
return succeeded ? 0 : 1;
}
}
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Map<String, String> cachedData = new HashMap<String, String>();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
// 分散キャッシュファイルのPathを取得
FileStatus stat;
try {
stat = DistributedCache.getFileStatus(context.getConfiguration(), new URI("distcache-exmaple/dist.txt"));
} catch (URISyntaxException e) {
throw new IOException(e);
}
Path path = stat.getPath();
FileSystem fs = path.getFileSystem(context.getConfiguration());
FSDataInputStream is = fs.open(path);
BufferedReader br = new BufferedReader(new InputStreamReader(is));
try {
for (;;) {
String s = br.readLine();
if (s == null) {
break;
}
String[] ss = s.split(",");
String key = ss[0];
String dat = ss[1];
cachedData.put(key, dat);
}
} finally {
br.close();
}
}
private Text dat = new Text();
private IntWritable one = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
dat.set(cachedData.get(value.toString()));
context.write(dat, one);
}
}
この例では、Mapper#setup()でキャッシュファイルを読み込んでフィールドにデータを保持し、map()で使っている。
Hadoop2系の場合、DistributedCacheクラスの大部分のメソッドが非推奨になっている。
メソッドはJobクラスやContextクラスに移動したようだ。
以下、Hadoop1系との差分を示す。
public class DistCacheExample extends Configured implements Tool {
〜
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(getConf(), "distcatche-example");
〜
// DistributedCache.addCacheFile(new URI("distcache-exmaple/dist.txt"), job.getConfiguration());
job.addCacheFile(new URI("distcache-exmaple/dist.txt"));
〜
}
}
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
〜
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
// 分散キャッシュファイルのPathを取得
// URI[] files = context.getCacheFiles();
// Path path = new Path(files[0]);
Path path = new Path(new URI("distcache-exmaple/dist.txt"));
〜
}
〜
}
addに当たるメソッドはJobにあり、getに当たるメソッドはContextにある。
しかしHadoop1系にあったgetFileStatus()メソッドは無い模様。
new Path()でURIを直接指定すれば大丈夫そう。
キャッシュしたいファイルは、事前にHDFS上に置いておく必要がある。
(プログラム内で指定するURIの指している場所に置く)
$ hadoop fs -put src-file.txt distcache-exmaple/dist.txt $ hadoop jar example.jar example.DistCacheExample input output