Asakusa Framework0.2.1(batchapp)でテキストファイルを入出力する方法のメモ。
|
|
Asakusa Framework0.2.1でファイルを扱う(ファイルで入出力する場合)は、基本的にSequenceFileを対象としている。
FileImporterにはFileInputFormatを指定できるようになっているので(デフォルトがSequenceFile)、ここに独自のFileInputFormatを指定してやればテキストファイルを扱うことが出来る。
(AsakusaFW 0.2.6以降ではWindGateやDirect I/Oでテキスト(CSV)ファイルを扱えるので、このような手間は必要ない。[2012-07-09])
ただしAsakusaFWでジョブフロー等のテストを実行する場合、そのファイルへテストデータを書き込むという作業を行うので、読み込みだけでなく、書き込みを行う為のFileOutputFormatも同時に作っておく必要がある。
WordCountのサンプルでテキストファイルから読み込むようにしてみる。
Hadoopには標準でテキストファイルを読み込む為のTextInputFormatが存在する。
ただしこれをそのままFileImporterに指定するのは駄目。
TextInputFormatは値がTextクラスだが、AsakusaFWではDMDLから生成したモデルクラスを返す必要がある為。
そこで、テキストファイルを読み込む為の独自のFileInputFormatを作成する必要がある。
ただし基本的にはTextInputFormatと同じ処理でよく、ただ単にWordCountサンプルの入力用データモデルであるLineModelを返せばいい。
そこで、FileInputFormatを継承しつつ、内部のRecordReaderはTextInputFormatで使っているものをそのまま使用する。
package sample.jobflow.gateway;
import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; import sample.modelgen.dmdl.model.LineModel;
public class LineModelTextInputFormat extends FileInputFormat<NullWritable, LineModel> {
@Override
public RecordReader<NullWritable, LineModel> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return new LineModelRecordReader();
}
protected static class LineModelRecordReader extends RecordReader<NullWritable, LineModel> {
protected LineRecordReader reader = new LineRecordReader();
protected LineModel model = new LineModel();
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
reader.initialize(split, context);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return reader.nextKeyValue();
}
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
if (reader.getCurrentKey() != null) {
return NullWritable.get();
} else {
return null;
}
}
@Override
public LineModel getCurrentValue() throws IOException, InterruptedException {
Text text = reader.getCurrentValue();
if (text != null) {
model.setText(text);
return model;
} else {
return null;
}
}
@Override
public float getProgress() throws IOException, InterruptedException {
return reader.getProgress();
}
@Override
public void close() throws IOException {
reader.close();
}
}
}
ほとんどLineRecordReaderに処理を委譲している。
getCurrentKey()とgetCurrentValue()だけ、自分のクラスへ変換する処理を行っている。
public class LineFromFile extends FileImporterDescription {
〜
@SuppressWarnings("rawtypes")
@Override
public Class<? extends FileInputFormat> getInputFormat() {
return LineModelTextInputFormat.class;
}
}
さて、InputFormatとFileImporterが書けたから実行してみると…
java.lang.RuntimeException: java.io.IOException: Failed to create opposite OutputFormat: sample.jobflow.gateway.LineModelTextInputFormat at com.asakusafw.testdriver.JobFlowTester.runTest(JobFlowTester.java:95) at sample.jobflow.WordCountJobTest.testDescribe(WordCountJobTest.java:23) 〜 Caused by: java.lang.ClassNotFoundException: sample.jobflow.gateway.LineModelTextOutputFormat at java.net.URLClassLoader$1.run(URLClassLoader.java:200) 〜 at com.asakusafw.testdriver.file.FileImporterPreparator.getOpposite(FileImporterPreparator.java:136) ... 33 more
OutputFormatが無いというエラー。「opposite」というのは、「反対の」とか「逆の」という意味らしい。
ジョブフローのテストではImporterで指定されたファイルにテストデータを書き込むので、OutputFormatが要るのだろう。
どうやら同パッケージ・ほぼ同名で、「InputFormat」が「OutputFormat」になったクラスが要るらしい。
そこで、InputFormatと対になるFileOutputFormatを作成する。
TextOutputFormatはTextInputFormatと違って値のクラスを型引数で指定できるのだが、Text以外の場合はtoString()を使って文字列に変換してファイルに出力する。
AsakusaFWが生成するモデルクラスのtoString()はファイル保存用の形式ではない(モデル名等も含まれる)ので、今回の目的には使えない。
InputFormatと同様にWriterだけ使おうかと思ったけれども、コンストラクターにDataOutputStreamを渡す必要があったりして面倒(苦笑)
そこで、TextOutputFormat自体に委譲するようにする。
package sample.jobflow.gateway;
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileAlreadyExistsException; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import sample.modelgen.dmdl.model.LineModel;
public class LineModelTextOutputFormat extends FileOutputFormat<NullWritable, LineModel> {
protected TextOutputFormat<NullWritable, Text> delegate = new TextOutputFormat<NullWritable, Text>();
@Override
public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {
delegate.checkOutputSpecs(job);
}
@Override
public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
return delegate.getDefaultWorkFile(context, extension);
}
@Override
public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
return delegate.getOutputCommitter(context);
}
@Override
public RecordWriter<NullWritable, LineModel> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
return new LineModelRecordWriter(delegate.getRecordWriter(job));
}
protected static class LineModelRecordWriter extends RecordWriter<NullWritable, LineModel> {
protected RecordWriter<NullWritable, Text> writer;
public LineModelRecordWriter(RecordWriter<NullWritable, Text> writer) {
this.writer = writer;
}
@Override
public void write(NullWritable key, LineModel value) throws IOException, InterruptedException {
if (value != null) { //[2011-09-15]
writer.write(null, value.getText());
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
writer.close(context);
}
}
}
ジョブフローのテストを実行して、FileImporterで指定したファイルの中を見てみる。
$ cat ~/input/file01 Hello Hadoop World Hello Asakusa
よし、ちゃんとテキストファイルになってる(笑)
上記のInputFormat・OutputFormatをもう少し汎用的に使えるようなクラスを作ってみた。[2011-09-02]
textformat.jar(6.85kB) [2011-09-02]
package sample.jobflow.gateway;
import java.util.HashSet; import java.util.Set; import jp.hishidama.asakusafw.ioformat.AsakusaTextInputFormat; import jp.hishidama.asakusafw.ioformat.AsakusaTextOutputFormat; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import com.asakusafw.vocabulary.external.FileImporterDescription; import sample.modelgen.dmdl.model.LineModel;
// LineModelのファイルインポーター
public class LineFromFile extends FileImporterDescription {
@Override
public Set<String> getPaths() {
Set<String> set = new HashSet<String>();
set.add("input/file01");
return set;
}
@Override
public Class<?> getModelType() {
return LineModel.class;
}
@SuppressWarnings("rawtypes")
@Override
public Class<? extends FileInputFormat> getInputFormat() {
return LineModelTextInputFormat.class;
}
public static class LineModelTextInputFormat extends AsakusaTextInputFormat<LineModel> {
protected LineModel value = new LineModel();
@Override
protected LineModel setText(Text text) {
value.setText(text);
return value;
}
}
public static class LineModelTextOutputFormat extends AsakusaTextOutputFormat<LineModel> {
@Override
protected Text getText(LineModel value) {
if (value != null) { //[2011-09-15]
return value.getText();
} else {
return null;
}
}
}
}
AsakusaTextInputFormatを継承したクラスを作り、setText()を実装する。
メソッド名はセッターだが、モデルのインスタンスを返す^^;(モデルインスタンス生成用のメソッドを用意する手間を省いた)
このInputFormatとペアになる名前で、AsakusaTextOutputFormatを継承したクラスも作っておく。
こちらは素直にgetText()でテキストを返すように実装する。