S-JIS[2011-08-13/2012-12-08] 変更履歴

Hadoop SequenceFile

HadoopのSequenceFileクラスについて。


SequenceFileの概要

シーケンスファイルは、キーとデータのペアで保存する形式。
キーもデータも任意のWritableを使用可能。
ハッシュテーブルというわけではないので、同一のキーも入れられる。


SequenceFileへの書き込み(Hadoop1系)

SequenceFileへ書き込むにはSequenceFile.Writerを使う。

以下、WordCountWritableというWritableをデータとして出力する例。

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;

import sample.writable.WordCountWritable;
public class SeqWrite {

	public static void main(String[] args) throws IOException {
		String uri = "file:/C:/cygwin/tmp/seq/write/file01.dat";

		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
		Path path = new Path(uri);

		NullWritable key = NullWritable.get();
		WordCountWritable val = new WordCountWritable();

		SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
			key.getClass(), val.getClass(), CompressionType.NONE);
		try {
			val.setWord("Hello");
			val.setCount(23);
			writer.append(key, val);

			val.setWord("World");
			val.setCount(12);
			writer.append(key, val);
		} finally {
			writer.close();
		}
	}
}

WriterはSequenceFile.createWriter()でインスタンスを取得する。
Codec(圧縮の種類)を指定するメソッドもある。

シーケンスファイルはヘッダー部にキー・データのクラス名を保持するので、最初にクラス名を渡す必要がある。


SequenceFileからの読み込み(Hadoop1系)

SequenceFileを読み込むにはSequenceFile.Readerを使う。

以下、WordCountWritableというWritableをデータとして読み込む例。

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;

import sample.writable.WordCountWritable;
public class SeqRead {

	public static void main(String[] args) throws IOException, InstantiationException, IllegalAccessException {
		String uri = "file:/C:/cygwin/tmp/seq/write/file01.dat";

		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(uri), conf);
		Path path = new Path(uri);

		SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
		try {
			Class<?> keyClass = reader.getKeyClass();
			Class<?> valClass = reader.getValueClass();
			System.out.println(keyClass + "\t" + valClass);

			Writable key;
			if (keyClass == NullWritable.class) {
				key = NullWritable.get();
			} else {
				key = (Writable) keyClass.newInstance();
			}
			WordCountWritable val = new WordCountWritable();

			while (reader.next(key, val)) {
				System.out.println(val.getWord() + "\t" + val.getCount());
			}
		} finally {
			reader.close();
		}
	}
}

キー・データのクラス名はシーケンスファイルのヘッダーに保持されているので、Readerをオープンすると取得することが出来る。

reader.next(key, val)」でキーとデータを取得する。
keyとvalにはヘッダーのクラスと一致しているWritableインスタンスを渡す必要がある。
どのクラスを対象とするか決まっていれば決め打ちでそのインスタンスをnewすればいい(上記のnew WordCountWritable())し、
何が入っていてもいいならヘッダーのクラスからインスタンスを生成すればいい(上記のkeyClass.newInstance())。
ただしNullWritableはデフォルトコンストラクターが公開されていないので、別途インスタンスを取得している。
(NullWritableのreadFields()は何も処理を行わないのでこのインスタンスへ書き込んでも(デシリアライズしても)問題ない)


キーの値によってデータをスキップするようなケースでは、valに毎回データをセットする(デシリアライズする)必要は無い。
この場合は「reader.next(key)」と「reader.getCurrentValue(val)」を使う。

			while (reader.next(key)) {
				if (keyによる条件判定とか) {
					reader.getCurrentValue(val);
					System.out.println(val.getWord() + "\t" + val.getCount());
				}
			}

SequenceFileへの書き込み(Hadoop2系)

Hadoop2系でもSequenceFile.createWriter()を使ってシーケンスファイルを作成するのはHadoop1系と変わらないが、従来のメソッドは非推奨になり、新しい引数を使うようになった。[2012-12-08]

以下、WordCountWritableというWritableをデータとして出力する例。
(createWriter()周辺以外はHadoop1系と全く同じ)

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.SequenceFile.Writer.Option;

import sample.writable.WordCountWritable;
public class SeqWrite {

	public static void main(String[] args) throws IOException {
		String uri = "file:/tmp/seq/write/file01.dat";

		Configuration conf = new Configuration();
		Path path = new Path(uri);

		NullWritable key = NullWritable.get();
		WordCountWritable val = new WordCountWritable();

		Option fileOpt = Writer.file(path);
		Option keyOpt = Writer.keyClass(key.getClass());
		Option valOpt = Writer.valueClass(val.getClass());
		Option compOpt = Writer.compression(CompressionType.NONE);
		Writer writer = SequenceFile.createWriter(conf, fileOpt, keyOpt, valOpt, compOpt);
		try {
			val.setWord("Hello");
			val.setCount(23);
			writer.append(key, val);

			val.setWord("World");
			val.setCount(12);
			writer.append(key, val);
		} finally {
			writer.close();
		}
	}
}

createWriter()の引数がOptionの可変長引数になった。
Writerクラスの各種staticメソッドでOptionを生成する。

従来のcreateWriter()で指定していたものがそのままOptionに移ったようだ。
(ただし、FileSystemは指定する必要は無いようだ。FileSystemのOptionを生成するメソッドもあるが、deprecatedになっている)


SequenceFileからの読み込み(Hadoop2系)

SequenceFile.Readerも、Writerと同様に可変長引数のOptionを指定するようになった。[2012-12-08]

以下、WordCountWritableというWritableをデータとして読み込む例。
(new Reader()周辺以外はHadoop1系と全く同じ)

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Reader.Option;
import org.apache.hadoop.io.Writable;

import sample.writable.WordCountWritable;
public class SeqRead {

	public static void main(String[] args) throws IOException, InstantiationException, IllegalAccessException {
		String uri = "file:/tmp/seq/write/file01.dat";

		Configuration conf = new Configuration();
		Path path = new Path(uri);

		Option fileOpt = Reader.file(path);
		Reader reader = new Reader(conf, fileOpt);

		try {
			Class<?> keyClass = reader.getKeyClass();
			Class<?> valClass = reader.getValueClass();
			System.out.println(keyClass + "\t" + valClass);

			Writable key;
			if (keyClass == NullWritable.class) {
				key = NullWritable.get();
			} else {
				key = (Writable) keyClass.newInstance();
			}
			WordCountWritable val = new WordCountWritable();

			while (reader.next(key, val)) {
				System.out.println(val.getWord() + "\t" + val.getCount());
			}
		} finally {
			reader.close();
		}
	}
}

Hadoop APIへ戻る / Hadoop目次へ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま