Asakusa Framework0.2.1(batchapp)でテキストファイルを入出力する方法のメモ。
|
|
Asakusa Framework0.2.1でファイルを扱う(ファイルで入出力する場合)は、基本的にSequenceFileを対象としている。
FileImporterにはFileInputFormatを指定できるようになっているので(デフォルトがSequenceFile)、ここに独自のFileInputFormatを指定してやればテキストファイルを扱うことが出来る。
(AsakusaFW 0.2.6以降ではWindGateやDirect I/Oでテキスト(CSV)ファイルを扱えるので、このような手間は必要ない。[2012-07-09])
ただしAsakusaFWでジョブフロー等のテストを実行する場合、そのファイルへテストデータを書き込むという作業を行うので、読み込みだけでなく、書き込みを行う為のFileOutputFormatも同時に作っておく必要がある。
WordCountのサンプルでテキストファイルから読み込むようにしてみる。
Hadoopには標準でテキストファイルを読み込む為のTextInputFormatが存在する。
ただしこれをそのままFileImporterに指定するのは駄目。
TextInputFormatは値がTextクラスだが、AsakusaFWではDMDLから生成したモデルクラスを返す必要がある為。
そこで、テキストファイルを読み込む為の独自のFileInputFormatを作成する必要がある。
ただし基本的にはTextInputFormatと同じ処理でよく、ただ単にWordCountサンプルの入力用データモデルであるLineModelを返せばいい。
そこで、FileInputFormatを継承しつつ、内部のRecordReaderはTextInputFormatで使っているものをそのまま使用する。
package sample.jobflow.gateway;
import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; import sample.modelgen.dmdl.model.LineModel;
public class LineModelTextInputFormat extends FileInputFormat<NullWritable, LineModel> {
@Override public RecordReader<NullWritable, LineModel> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new LineModelRecordReader(); }
protected static class LineModelRecordReader extends RecordReader<NullWritable, LineModel> {
protected LineRecordReader reader = new LineRecordReader();
protected LineModel model = new LineModel();
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
reader.initialize(split, context);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return reader.nextKeyValue();
}
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
if (reader.getCurrentKey() != null) {
return NullWritable.get();
} else {
return null;
}
}
@Override
public LineModel getCurrentValue() throws IOException, InterruptedException {
Text text = reader.getCurrentValue();
if (text != null) {
model.setText(text);
return model;
} else {
return null;
}
}
@Override
public float getProgress() throws IOException, InterruptedException {
return reader.getProgress();
}
@Override
public void close() throws IOException {
reader.close();
}
}
}
ほとんどLineRecordReaderに処理を委譲している。
getCurrentKey()とgetCurrentValue()だけ、自分のクラスへ変換する処理を行っている。
public class LineFromFile extends FileImporterDescription { 〜 @SuppressWarnings("rawtypes") @Override public Class<? extends FileInputFormat> getInputFormat() { return LineModelTextInputFormat.class; } }
さて、InputFormatとFileImporterが書けたから実行してみると…
java.lang.RuntimeException: java.io.IOException: Failed to create opposite OutputFormat: sample.jobflow.gateway.LineModelTextInputFormat at com.asakusafw.testdriver.JobFlowTester.runTest(JobFlowTester.java:95) at sample.jobflow.WordCountJobTest.testDescribe(WordCountJobTest.java:23) 〜 Caused by: java.lang.ClassNotFoundException: sample.jobflow.gateway.LineModelTextOutputFormat at java.net.URLClassLoader$1.run(URLClassLoader.java:200) 〜 at com.asakusafw.testdriver.file.FileImporterPreparator.getOpposite(FileImporterPreparator.java:136) ... 33 more
OutputFormatが無いというエラー。「opposite」というのは、「反対の」とか「逆の」という意味らしい。
ジョブフローのテストではImporterで指定されたファイルにテストデータを書き込むので、OutputFormatが要るのだろう。
どうやら同パッケージ・ほぼ同名で、「InputFormat」が「OutputFormat」になったクラスが要るらしい。
そこで、InputFormatと対になるFileOutputFormatを作成する。
TextOutputFormatはTextInputFormatと違って値のクラスを型引数で指定できるのだが、Text以外の場合はtoString()を使って文字列に変換してファイルに出力する。
AsakusaFWが生成するモデルクラスのtoString()はファイル保存用の形式ではない(モデル名等も含まれる)ので、今回の目的には使えない。
InputFormatと同様にWriterだけ使おうかと思ったけれども、コンストラクターにDataOutputStreamを渡す必要があったりして面倒(苦笑)
そこで、TextOutputFormat自体に委譲するようにする。
package sample.jobflow.gateway;
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileAlreadyExistsException; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import sample.modelgen.dmdl.model.LineModel;
public class LineModelTextOutputFormat extends FileOutputFormat<NullWritable, LineModel> { protected TextOutputFormat<NullWritable, Text> delegate = new TextOutputFormat<NullWritable, Text>();
@Override public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException { delegate.checkOutputSpecs(job); } @Override public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException { return delegate.getDefaultWorkFile(context, extension); } @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException { return delegate.getOutputCommitter(context); } @Override public RecordWriter<NullWritable, LineModel> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { return new LineModelRecordWriter(delegate.getRecordWriter(job)); }
protected static class LineModelRecordWriter extends RecordWriter<NullWritable, LineModel> { protected RecordWriter<NullWritable, Text> writer; public LineModelRecordWriter(RecordWriter<NullWritable, Text> writer) { this.writer = writer; } @Override public void write(NullWritable key, LineModel value) throws IOException, InterruptedException { if (value != null) { //[2011-09-15] writer.write(null, value.getText()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { writer.close(context); } } }
ジョブフローのテストを実行して、FileImporterで指定したファイルの中を見てみる。
$ cat ~/input/file01 Hello Hadoop World Hello Asakusa
よし、ちゃんとテキストファイルになってる(笑)
上記のInputFormat・OutputFormatをもう少し汎用的に使えるようなクラスを作ってみた。[2011-09-02]
textformat.jar(6.85kB) [2011-09-02]
package sample.jobflow.gateway;
import java.util.HashSet; import java.util.Set; import jp.hishidama.asakusafw.ioformat.AsakusaTextInputFormat; import jp.hishidama.asakusafw.ioformat.AsakusaTextOutputFormat; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import com.asakusafw.vocabulary.external.FileImporterDescription; import sample.modelgen.dmdl.model.LineModel;
// LineModelのファイルインポーター public class LineFromFile extends FileImporterDescription { @Override public Set<String> getPaths() { Set<String> set = new HashSet<String>(); set.add("input/file01"); return set; } @Override public Class<?> getModelType() { return LineModel.class; }
@SuppressWarnings("rawtypes") @Override public Class<? extends FileInputFormat> getInputFormat() { return LineModelTextInputFormat.class; } public static class LineModelTextInputFormat extends AsakusaTextInputFormat<LineModel> { protected LineModel value = new LineModel(); @Override protected LineModel setText(Text text) { value.setText(text); return value; } } public static class LineModelTextOutputFormat extends AsakusaTextOutputFormat<LineModel> { @Override protected Text getText(LineModel value) { if (value != null) { //[2011-09-15] return value.getText(); } else { return null; } } } }
AsakusaTextInputFormatを継承したクラスを作り、setText()を実装する。
メソッド名はセッターだが、モデルのインスタンスを返す^^;(モデルインスタンス生成用のメソッドを用意する手間を省いた)
このInputFormatとペアになる名前で、AsakusaTextOutputFormatを継承したクラスも作っておく。
こちらは素直にgetText()でテキストを返すように実装する。