S-JIS[2011-09-01/2012-07-09] 変更履歴

Asakusa Framework テキスト入出力

Asakusa Framework0.2.1(batchapp)でテキストファイルを入出力する方法のメモ。


概要

Asakusa Framework0.2.1でファイルを扱う(ファイルで入出力する場合)は、基本的にSequenceFileを対象としている。

FileImporterにはFileInputFormatを指定できるようになっているので(デフォルトがSequenceFile)、ここに独自のFileInputFormatを指定してやればテキストファイルを扱うことが出来る。
(AsakusaFW 0.2.6以降ではWindGateDirect I/Oでテキスト(CSV)ファイルを扱えるので、このような手間は必要ない。[2012-07-09]

ただしAsakusaFWでジョブフロー等のテストを実行する場合、そのファイルへテストデータを書き込むという作業を行うので、読み込みだけでなく、書き込みを行う為のFileOutputFormatも同時に作っておく必要がある。


WordCountのテキストファイル入力の例

WordCountのサンプルでテキストファイルから読み込むようにしてみる。

TextInputFormat

Hadoopには標準でテキストファイルを読み込む為のTextInputFormatが存在する。
ただしこれをそのままFileImporterに指定するのは駄目。

TextInputFormatは値がTextクラスだが、AsakusaFWではDMDLから生成したモデルクラスを返す必要がある為。


FileInputFormat

そこで、テキストファイルを読み込む為の独自のFileInputFormatを作成する必要がある。
ただし基本的にはTextInputFormatと同じ処理でよく、ただ単にWordCountサンプルの入力用データモデルであるLineModelを返せばいい。
そこで、FileInputFormatを継承しつつ、内部のRecordReaderはTextInputFormatで使っているものをそのまま使用する。

LineModelTextInputFormat.java:

package sample.jobflow.gateway;
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

import sample.modelgen.dmdl.model.LineModel;
public class LineModelTextInputFormat extends FileInputFormat<NullWritable, LineModel> {
	@Override
	public RecordReader<NullWritable, LineModel> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
		return new LineModelRecordReader();
	}
	protected static class LineModelRecordReader extends RecordReader<NullWritable, LineModel> {
		protected LineRecordReader reader = new LineRecordReader();
		protected LineModel model = new LineModel();

		@Override
		public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
			reader.initialize(split, context);
		}

		@Override
		public boolean nextKeyValue() throws IOException, InterruptedException {
			return reader.nextKeyValue();
		}

		@Override
		public NullWritable getCurrentKey() throws IOException, InterruptedException {
			if (reader.getCurrentKey() != null) {
				return NullWritable.get();
			} else {
				return null;
			}
		}

		@Override
		public LineModel getCurrentValue() throws IOException, InterruptedException {
			Text text = reader.getCurrentValue();
			if (text != null) {
				model.setText(text);
				return model;
			} else {
				return null;
			}
		}

		@Override
		public float getProgress() throws IOException, InterruptedException {
			return reader.getProgress();
		}

		@Override
		public void close() throws IOException {
			reader.close();
		}
	}
}

ほとんどLineRecordReaderに処理を委譲している。
getCurrentKey()getCurrentValue()だけ、自分のクラスへ変換する処理を行っている。

LineFromFile.java

public class LineFromFile extends FileImporterDescription {
〜
	@SuppressWarnings("rawtypes")
	@Override
	public Class<? extends FileInputFormat> getInputFormat() {
		return LineModelTextInputFormat.class;
	}
}

さて、InputFormatとFileImporterが書けたから実行してみると…

java.lang.RuntimeException: java.io.IOException: Failed to create opposite OutputFormat: sample.jobflow.gateway.LineModelTextInputFormat
	at com.asakusafw.testdriver.JobFlowTester.runTest(JobFlowTester.java:95)
	at sample.jobflow.WordCountJobTest.testDescribe(WordCountJobTest.java:23)
〜
Caused by: java.lang.ClassNotFoundException: sample.jobflow.gateway.LineModelTextOutputFormat
	at java.net.URLClassLoader$1.run(URLClassLoader.java:200)
〜
	at com.asakusafw.testdriver.file.FileImporterPreparator.getOpposite(FileImporterPreparator.java:136)
... 33 more

OutputFormatが無いというエラー。「opposite」というのは、「反対の」とか「逆の」という意味らしい。
ジョブフローのテストではImporterで指定されたファイルにテストデータを書き込むので、OutputFormatが要るのだろう。

どうやら同パッケージ・ほぼ同名で、「InputFormat」が「OutputFormat」になったクラスが要るらしい。


FileOutputFormat

そこで、InputFormatと対になるFileOutputFormatを作成する。

TextOutputFormatはTextInputFormatと違って値のクラスを型引数で指定できるのだが、Text以外の場合はtoString()を使って文字列に変換してファイルに出力する。
AsakusaFWが生成するモデルクラスのtoString()はファイル保存用の形式ではない(モデル名等も含まれる)ので、今回の目的には使えない。

InputFormatと同様にWriterだけ使おうかと思ったけれども、コンストラクターにDataOutputStreamを渡す必要があったりして面倒(苦笑)
そこで、TextOutputFormat自体に委譲するようにする。

package sample.jobflow.gateway;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import sample.modelgen.dmdl.model.LineModel;
public class LineModelTextOutputFormat extends FileOutputFormat<NullWritable, LineModel> {
	protected TextOutputFormat<NullWritable, Text> delegate = new TextOutputFormat<NullWritable, Text>();
	@Override
	public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {
		delegate.checkOutputSpecs(job);
	}

	@Override
	public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
		return delegate.getDefaultWorkFile(context, extension);
	}

	@Override
	public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
		return delegate.getOutputCommitter(context);
	}

	@Override
	public RecordWriter<NullWritable, LineModel> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
		return new LineModelRecordWriter(delegate.getRecordWriter(job));
	}
	protected static class LineModelRecordWriter extends RecordWriter<NullWritable, LineModel> {
		protected RecordWriter<NullWritable, Text> writer;

		public LineModelRecordWriter(RecordWriter<NullWritable, Text> writer) {
			this.writer = writer;
		}

		@Override
		public void write(NullWritable key, LineModel value) throws IOException, InterruptedException {
			if (value != null) { //[2011-09-15]
				writer.write(null, value.getText());
			}
		}

		@Override
		public void close(TaskAttemptContext context) throws IOException, InterruptedException {
			writer.close(context);
		}
	}
}

実行結果

ジョブフローのテストを実行して、FileImporterで指定したファイルの中を見てみる。

$ cat ~/input/file01
Hello Hadoop World
Hello Asakusa

よし、ちゃんとテキストファイルになってる(笑)


汎用TextInputFormat

上記のInputFormatOutputFormatをもう少し汎用的に使えるようなクラスを作ってみた。[2011-09-02]

textformat.jar(6.85kB) [2011-09-02]

使用例

package sample.jobflow.gateway;
import java.util.HashSet;
import java.util.Set;

import jp.hishidama.asakusafw.ioformat.AsakusaTextInputFormat;
import jp.hishidama.asakusafw.ioformat.AsakusaTextOutputFormat;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import com.asakusafw.vocabulary.external.FileImporterDescription;

import sample.modelgen.dmdl.model.LineModel;
// LineModelのファイルインポーター
public class LineFromFile extends FileImporterDescription {

	@Override
	public Set<String> getPaths() {
		Set<String> set = new HashSet<String>();
		set.add("input/file01");
		return set;
	}

	@Override
	public Class<?> getModelType() {
		return LineModel.class;
	}
	@SuppressWarnings("rawtypes")
	@Override
	public Class<? extends FileInputFormat> getInputFormat() {
		return LineModelTextInputFormat.class;
	}

	public static class LineModelTextInputFormat extends AsakusaTextInputFormat<LineModel> {
		protected LineModel value = new LineModel();

		@Override
		protected LineModel setText(Text text) {
			value.setText(text);
			return value;
		}
	}

	public static class LineModelTextOutputFormat extends AsakusaTextOutputFormat<LineModel> {

		@Override
		protected Text getText(LineModel value) {
			if (value != null) { //[2011-09-15]
				return value.getText();
			} else {
				return null;
			}
		}
	}
}

AsakusaTextInputFormatを継承したクラスを作り、setText()を実装する。
メソッド名はセッターだが、モデルのインスタンスを返す^^;(モデルインスタンス生成用のメソッドを用意する手間を省いた)

このInputFormatとペアになる名前で、AsakusaTextOutputFormatを継承したクラスも作っておく。
こちらは素直にgetText()でテキストを返すように実装する。


AsakusaFW目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま