S-JIS[2010-03-16/2010-07-03] 変更履歴

Hadoop Writable

HadoopのWritableクラスについて。


Writableの概要

Hadoopでは、ノード(サーバー)間のデータ転送や中間データ(メモリー上のバッファ内や一時ファイル)の保持をバイナリデータ(バイト列byte[])で行う。
その為、データを転送する前(出力前)にバイト配列に変換(シリアライズ)し、転送後(読込時)に復元(デシリアライズ)する必要がある。

Javaには標準でシリアライズ(java.io.Serializable)の仕組みがあるが、Hadoopではそれを使っていない。
(JavaのSerializableは重いので、Hadoopではもっと軽量な仕組みを採用したらしい)
そこで、Writableというインターフェースが用意されている。
Hadoopが扱うデータ(クラス)はたぶん全てWritableを実装している。

Writableにはwrite()readFields()という2つのメソッドがあり、つまりバイト列との変換をそれぞれの具象クラスでコーディングせよということだ^^;
したがって、書き込んだデータと読み込んだデータの整合性(ちゃんと同じフィールドに同じ型で読み込めるようにする)はプログラマーが気をつける必要がある。
(ちょっと面倒ではあるが、シンプルにコーディングすれば最も高速になるしデータ量も最小限で済む。
 Serializableを使うとコーディングの手間も省けるし整合性チェックも行ってくれるが、汎用的なので冗長な分遅いし余計なデータも付加されてサイズも大きくなる。
 Hadoopでは初回のコーディングの手間よりも実行時のパフォーマンス(速度・容量)を優先したのだろう)

もうひとつ、Writableから派生したインターフェースにWritableComparableというものがある。
これはWritableと(Java標準の)Comparableを継承しただけのインターフェース。
Hadoopで扱うデータのうち、キーとして使用するものは(Map/Reduceの間で)ソートされるので、大小比較を行える必要がある。この為にComparableを実装する。


Writableの種類

Writableの具象クラス(Hadoop0.20.1)には以下のようなクラスがある。

  クラス名 説明
Hadoop org.apache.hadoop.io.
Writable
Hadoopでシリアライズできる事を示すインターフェース。
write()とreadFields()が宣言されている。
org.apache.hadoop.io.
MapWritable
シリアライズできるHashMap<Writable, Writable>。
org.apache.hadoop.io.
SortedMapWritable
シリアライズできるTreeMap<WritableComparable, Writable>。
org.apache.hadoop.io.
ArrayWritable
Writableオブジェクトの配列をシリアライズする。
org.apache.hadoop.io.
ObjectWritable
プリミティブ型」・「列挙型」・「Writableの配列」をシリアライズする。
(シリアライズ時の先頭にクラス名を出力する)
org.apache.hadoop.io.
GenericWritable
使用したいクラスに番号を付けておき、シリアライズ時の先頭に(クラス名でなく)その番号を出力する。
org.apache.hadoop.mapreduce.
Counter
カウンター
org.apache.hadoop.conf.
Configuration
Configurationもノード間で受け渡す必要があるからWritableなのだろう。
HBase org.apache.hadoop.hbase.client.
Get
HBaseでデータを取得する列名を指定する。
org.apache.hadoop.hbase.client.
Scan
org.apache.hadoop.hbase.filter.
Filter
HBaseでデータを取得する条件を指定する。
org.apache.hadoop.hbase.client.
Put
Delete
HBaseのデータの更新内容を保持する。
org.apache.hadoop.hbase.
KeyValue
HBaseのキー(列名)と値(データ)を保持する。
  クラス名 説明
Hadoop org.apache.hadoop.io.
WritableComparable<T>
ソート可能なWritable
型引数TはComparable<T>のものなので、自分自身のクラスを指す。
org.apache.hadoop.io.
BooleanWritable
ByteWritable
IntWritable
LongWritable
FloatWritable
DoubleWritable
プリミティブ型に該当するWritable。(shortは無いみたいだ…)
シリアライズ時には固定長のバイト列になる。
org.apache.hadoop.io.
BytesWritable
バイト配列(byte[])をシリアライズする。
org.apache.hadoop.io.
VIntWritable
VLongWritable
シリアライズ時に可変長のバイト列になる。
例えば-112〜127の範囲なら1バイトしか出力されない為、データサイズが縮小できる。
シリアライズ方法(実際に呼ばれるメソッド)はVIntでもVLongと同じ。
org.apache.hadoop.io.
Text
文字列を保持するWritable。
シリアライズ時はUTF-8で扱われる。
org.apache.hadoop.io.
NullWritable
何も無い事を示すWritable。
NullWritable同士を比較すると常に等しくなる。ソート順を気にしない場合のキーに使える。
このクラスはシングルトンで扱えるので、NullWritable.get()でインスタンスを取得する。
HBase org.apache.hadoop.hbase.io.
ImmutableBytesWritable
HBaseのキーや値は全てバイト配列で扱うので、その際に使用する。
BytesWritableはsetSize()によって長さを変えられるが、ImmutableBytesWritableはサイズ変更できない。
org.apache.hadoop.hbase.
HTableDescriptor
HBaseのテーブル情報
org.apache.hadoop.hbase.
HColumnDescriptor
HBaseの列情報

Writableの自作方法
WritableComparableの自作方法


独自Writable

独自のWritableも簡単に作れる。
例えば点数と人数を保持するクラスは以下のようになる。

フィールドにWritableを使う例 フィールドにプリミティブ型やPOJOを使う例
import org.apache.hadoop.io.Writable;
public class Sum implements Writable {

	private IntWritable ten = new IntWritable();
	private IntWritable cnt = new IntWritable();

	public void set(int ten, int count) {
		this.ten.set(ten);
		this.cnt.set(count);
	}

	public int getTen() {
		return ten.get();
	}

	public int getCount() {
		return cnt.get();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		ten.write(out);
		cnt.write(out);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		ten.readFields(in);
		cnt.readFields(in);
	}
}
import org.apache.hadoop.io.Writable;
public class Sum implements Writable {

	private int ten = 0;
	private int cnt = 0;

	public void set(int ten, int count) {
		this.ten = ten;
		this.cnt = count;
	}

	public int getTen() {
		return ten;
	}

	public int getCount() {
		return cnt;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(ten);
		out.writeInt(cnt);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		ten = in.readInt();
		cnt = in.readInt();
	}
}
readFields()やwirte()の実装は、
フィールドのWritableの同名メソッドを呼び出す形となる。
各フィールドの具象クラスが異なっても同じ形式で済む。
readFields()やwirte()の実装は、
フィールドの型(クラス)に応じて変える必要がある。
セッターやゲッターメソッドは
各フィールドのset()やget()を呼ぶ必要があるのでちょっとだけ面倒。
セッターやゲッターメソッドは
通常のクラス(JavaBean)と同じくシンプル。

このSumクラスを使って集計する例


シリアライズ用ユーティリティー

自分でバイト列の変換をコーディングする場合、DataInput/DataOutputとの入出力を行うユーティリティーが用意されているので、それを利用することが出来る。
(なお、DataInputDataOutputはJava標準のデータ入出力クラスなので、プリミティブ型用のメソッドは揃っている)

値の型 出力(書込) 入力(読込) 備考
boolean out.writeBoolean(値) in.readBoolen()  
byte out.writeByte(値) in.readByte()
in.readUnsignedByte()
 
short out.writeShort(値) in.readShort()
in.readUnsignedShort()
 
int out.writeInt(値) in.readInt()  
long out.writeLong(値) in.readLong()  
float out.writeFloat(値) in.readFloat()  
double out.writeDouble(値) in.readDouble()  
String Text.writeString(out, 値) Text.readString(in) WritableUtils.write/readString()とは出力内容が異なる。
String[] WritableUtils.writeStringArray(out, 値) WritableUtils.readStringArray(in) WritableUtils.write/readString()を使用している。
列挙型 WritableUtils.writeEnum(out, 値) WritableUtils.readEnum(in, 列挙クラス)  
int
long
WritableUtils.writeVInt(out, 値)
WritableUtils.writeVLong(out, 値)
WritableUtils.readVInt(in)
WritableUtils.readVLong(in)
可変長のバイト列として出力する。
VInt()の内部ではVLong()を呼んでいるので、出力内容は同一。
byte[]
String
String[]
WritableUtils.writeCompressedByteArray(out, 値)
WritableUtils.writeCompressedString(out, 値)
WritableUtils.writeCompressedStringArray(out, 値)
WritableUtils.readCompressedByteArray(in)
WritableUtils.readCompressedString(in)
WritableUtils.readCompressedStringArray(in)
GZIP圧縮して出力(解凍して復元)する。
StringArray()はString()を呼び、String()はByteArray()を呼んでいる。

バイト配列からの変換メソッドはWritableComparatorに用意されている。[2010-03-21]

import org.apache.hadoop.io.WritableComparator;
値の型 入力(読込) 備考
byte bytes[start]  
byte[]   比較はWritableComparator.compareBytes(b1,s1,l1, b2,s2,l2)
バイト配列を1バイトずつ0〜255の値として比較する。
short WritableComparator.readUnsignedShort(bytes, start) 2バイト(Short.SIZE/Byte.SIZE)
int WritableComparator.readInt(bytes, start) 4バイト(Integer.SIZE/Byte.SIZE)
long WritableComparator.readLong(bytes, start) 8バイト(Long.SIZE/Byte.SIZE)
float WritableComparator.readFloat(bytes, start) 4バイト(Float.SIZE/Byte.SIZE)
double WritableComparator.readDouble(bytes, start) 8バイト(Double.SIZE/Byte.SIZE)
String Text.decode(bytes, start, length)  
int
long
WritableComparator.readVInt(bytes, start)
WritableComparator.readVLong(bytes, start)
可変長整数のバイト列の復元。

数値に関しては長さが決まっている(決まる)ので、オフセット(開始位置)だけ渡せばよい。(長さは不要)


HBaseを使う場合、Bytesクラスにもバイト配列を扱うメソッドが用意されている。[2010-03-23]

import org.apache.hadoop.hbase.util.Bytes;
値の型 サイズ 出力(書込) 変換(出力) 入力(読込) 備考
boolean SIZEOF_BOOLEAN 1   toBytes(値) toBoolean(bytes) 1バイト使用する。
char SIZEOF_CHAR 2 putChar(targetBytes, targetStart, 値) toBytes(値) toChar(bytes)
toChar(bytes, start)
※1
char[]     putChars(targetBytes, targetStart, 値) toBytes(値) toChars(bytes)
toChars(bytes, start)
※1
byte SIZEOF_BYTE 1 putByte(targetBytes, targetStart, 値)   bytes[start]  
byte[]     writeByteArray(out, bytes)
writeByteArray(out, bytes,start,len)
writeByteArray(targetBytes, targetStart, bytes, start, len)
  readByteArray(in) 先頭に可変長intでバイト数を出力している。
readByteArray()では、例外発生時はIOExceptionをスローする。
putBytes(targetBytes, targetStart, bytes, start, len)   readByteArray(in, len) 指定された長さのバイト配列。
    readByteArrayThrowsRuntime(in) readByteArrayThrowsRuntime()の内容はreadByteArray()と同じだが、throwsは宣言されていない。
(例外発生時はRuntimeExceptionをスローする)
short SIZEOF_SHORT 2 putShort(targetBytes, targetStart, 値) toBytes(値) toShort(bytes)
toShort(bytes, start)
※1
int SIZEOF_INT 4 putInt(targetBytes, targetStart, 値) toBytes(値) toInt(bytes)
toInt(bytes, start)
※1
long SIZEOF_LONG 8 putLong(targetBytes, targetStart, 値) toBytes(値) toLong(bytes)
toLong(bytes, start)
※1
  vintToBytes(値) bytesToVint(bytes)
readVLong(bytes, start)
可変長整数は、intというメソッド名でもlongで扱われている。
float SIZEOF_FLOAT 4 putFloat(targetBytes, targetStart, 値) toBytes(値) toFloat(bytes)
toFloat(bytes, start)
toFloat()では、toInt()のビット列をfloatに変換している。
double SIZEOF_DOUBLE 8 putDouble(targetBytes, targetStart, 値) toBytes(値) toDouble(bytes)
toDouble(bytes, start)
toDouble()では、toLong()のビット列をdoubleに変換している。
BigDecimal     putBigDecimal(targetBytes, targetStart, 値) toBytes(値) toBigDecimal(bytes)
toBigDecimal(bytes, start)
※1
String       toBytes(値) toString(bytes)
toString(bytes, start, len)
文字列をUTF-8のバイト列で扱う。
toString()では、引数がnullの場合はnullを返す。
  toBytesBinary(値) toStringBinary(bytes)
toStringBinary(bytes, start, len)
英数字や一部の記号(コロンやピリオドなど)はそのまま、
それ以外は「\xhh」の形式の文字列。
(HBase0.20.4→0.20.5で、そのまま出る記号の種類が変わった[2010-07-03]

※1: toInt()やtoLong()では、引数bytesがnullの場合やバイト列の長さが短い場合は-1を返す。 (toBytes()はnull不可)

内容 メソッド 備考
結合 byte[] add(bytes1, bytes2)
add(bytes1, bytes2, bytes3)
バイト配列を結合した新しいバイト配列を返す。
HBaseではfamilyとqualifierをコロン区切りで結合してcolumnを作るので、
引数を3つとるメソッドもあるのだろう。
分割 byte[] head(bytes, len)
padHead(bytes, len)
tail(bytes, len)
padTail(bytes, len)
バイト列の先頭あるいは末尾の指定された長さを返す。
pad無しメソッドは、バイト列が短い場合はnullを返す。
pad付きメソッドは、バイト列が短い場合は0で埋めて指定された長さを返す。
比較 int compareTo(bytes1, bytes2)
compareTo(b1,s1,l1, b2,s2,l2)
比較ロジックはWritableComparator.compareBytes()と全く同じ。
boolean equals(bytes1, bytes2) compareTo()では引数がnullの場合はNullPointerExceptionが発生するが、
equals()ではnullも可(双方がnullだとtrueを返す)。
ハッシュ int hashCode(bytes)
hashCode(bytes, len)
WritableComparator.hashBytes()を呼び出している。

Text.writeString()とWritableUtils.writeString()

文字列(String)の出力(と入力)は、DataOutput#writeUTF()WritableUtils.writeString()Text.writeString()といった、いくつかの方法がある。

WritableUtilsとTextはHadoopで用意されているクラスなので、DataOutputのwriteUTF()よりはこれらを使う方がいいと思う。

TextとWritableUtilsでは、文字列の出力方法(バイト列になった時の内容)が異なる。
どちらも最初に文字数(UTF-8のバイト数)を出力するのだが、WritableUtilsは固定長int(常に4バイト)でTextは可変長int(ASCIIで127文字までは1バイト)。文字列の長さ(バイト数)は普通は4バイト分もいかないだろうから、可変長intの方がサイズが少なくて済む。
また、WritableUtilsは単純にString#getBytes("UTF-8")で文字列をUTF-8に変換しているので、CharsetEncoderのインスタンスを毎回生成している。
TextはCharsetEncoderインスタンスを使い回すようになっている。
Hadoopで文字列を扱う際にはTextクラスを使っている事も考えると、WritableUtils.writeString()よりText.writeString()を使う方がいいだろう。


可変長int・long

Javaではintは4バイトlongは8バイトであり、DataOutput#writeInt()は4バイトのバイト列、writeLong()は8バイトのバイト列を生成する。
しかし、例えば文字数のような数値を出力したい場合、せいぜい65535文字でも2バイトしか使わないので、4バイトも出力するのは(容量の観点からは)効率が悪い。
そこでHadoopでは、数値の桁数に応じて出力するバイト数を変える方法が用意されている。
それがWritableUtilsのwriteVInt()・readVInt()やwriteVLong()・readVLong()

可変長整数なので、変換方法はintもlongも同じ。
という訳で、VInt()はVLong()を呼んでいる。(readVInt()は、readVLong()の戻り値をintにキャストしているだけ)

出力内容は、値が-112〜127の場合は1バイトでその値をそのまま出力する。
それ以外は先頭1バイトが整数値の長さ(バイト数)を示しており、後続のバイト列が整数値の実体となる。

範囲 備考
バイト列 バイト列 サイズ
-112〜127 -112 90 0
1
127
00
01
7f
1  
128〜255
-113〜-256
-113
-256
87 70
87 ff
128
255
8f 80
8f ff
2 2バイト目以降は
整数値を素直にバイト列
ビッグエンディアン
で表したものとなる。
(負の数はビット反転
256〜65535
-257〜-65536
-257
-65536
86 01 00
86 ff ff
256
65535
8e 01 00
8e ff ff
3
65536〜0xffffff
-65537〜-0x1000000
-65537 85 01 00 00 65536 8d 01 00 00 4
         
〜Long.MAX_VALUE
〜Long.MIN_VALUE
MIN_VALUE 80 7f ff ff ff ff ff ff ff MAX_VALUE 88 7f ff ff ff ff ff ff ff 9

WritableUtilsには、可変長整数のバイト列のサイズを求めるメソッドも用意されている。

メソッド 説明
getVIntSize(整数値) その整数が何バイトになるかを返す。 int len = WritableUtils.getVIntSize(255);
int len = WritableUtils.getVIntSize(256);
2
3
decodeVIntSize(バイト) バイト列の先頭1バイトを渡すと、可変整数値のバイト列全体の長さを返す。 byte[] bytes = { 0x8f, 0x80, … };
int len = WritableUtils.decodeVIntSize(bytes[0]);
2

もちろん単純に出力するより可変長で変換する方が多少コスト(変換に要する時間)がかかるが、
変換せずにネットワーク上をそのまま転送するのと比較すれば、メモリー上で圧縮してから転送する方が全体の効率(実行時間)は良い。
というのがHadoopの思想なんだと思う。

もし文字列の文字数を1バイトで出力できたとすると、intをそのまま出力した場合(4バイト)より3バイト節約できる。
文字列が1000個あったら3kバイトの節約となる。(ネットワーク上の転送量が3kバイト減ることになる)
Hadoopは大量のデータ(文字列の個数なんて1000個どころじゃないだろう)を処理する想定だから、意外と地味に効いてくるのかも。


独自WritableComparable

独自のWritableComparableも独自WritableにcompareTo()を実装するだけで作れる。[2010-03-21]

import org.apache.hadoop.io.WritableComparable;
public class KeyWritable implements WritableComparable<KeyWritable> {
	protected int no;
	protected String name;

	public void set(int no, String name) {
		this.no = no;
		this.name = name;
	}

	public int getNo() {
		return no;
	}

	public String getName() {
		return name;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(no);
		Text.writeString(out, name);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		no = in.readInt();
		name = Text.readString(in);
	}

	@Override
	public int compareTo(KeyWritable that) {
		int c = this.no - that.no;
		if (c != 0) {
			return c;
		}
		return name.compareTo(that.name);
	}

	@Override
	public boolean equals(Object obj) {
		if (obj instanceof KeyWritable) {
			KeyWritable that = (KeyWritable) obj;
			if (this.no != that.no) {
				return false;
			}
			return this.name.equals(that.name);
		}
		return false;
	}

	@Override
	public int hashCode() {
		return no * 163 + name.hashCode();
	}
}

これをMapperやReducerのキーに使う(Job#setMapOutputKeyClass()setOutputKeyClass()で指定する)と、ソート時にcompareTo()が呼ばれて比較される。


MapperやReducerの中間ファイル(メモリー上のバッファー)では値はバイト配列として(シリアライズされて)保持されている為、ソート時にはそこからreadFields()を使って復元し、それからcompareTo()を呼ぶようになっている。
これをWritableComparatorというクラスが行っている。

比較の実施イメージ:

	Class keyClass = job.getMapOutputKeyClass();
	WritableComparator comparator = new WritableComparator(keyClass);

	int compare = comparator.compare(bytes1, offset1, length1, bytes2, offset2, length2);

WritableComparatorのイメージ:

	private final WritableComparable key1;
	private final WritableComparable key2;
	private final DataInputBuffer buffer;
	/**
	 * コンストラクター.
	 * 
	 * @param keyClass 対象となるWritableComparable
	 */
	protected WritableComparator(Class<? extends WritableComparable> keyClass) {

		// keyClassのインスタンスを作っておく
		this.key1 = keyClass.newInstance();
		this.key2 = keyClass.newInstance();

		// バイト配列をDataInputに変換する為のバッファー
		this.buffer = new DataInputBuffer();
	}
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

		// バイト配列からkey1へ値をセット
		buffer.reset(b1, s1, l1);
		key1.readFields(buffer);

		// バイト配列からkey2へ値をセット
		buffer.reset(b2, s2, l2);
		key2.readFields(buffer);

		// 比較
		return key1.compareTo(key2);
	}

独自RawComparator

WritableComparableでなくWritableを実装したクラスであっても、Job#setSortComparatorClass()setGroupingComparatorClass()でソート用のクラス(RawComparator)を指定することにより、ソートさせることが出来る。[2010-03-21]
(WritableComparableであっても、Job#setSortComparatorClass()やsetGroupingComparatorClass()で指定したソート用クラスの方が優先される)

		job.setOutputKeyClass  (KeyWritable.class);
		job.setOutputValueClass(Sum.class);
		job.setSortComparatorClass(KeyWritable.NoComparator.class);
//		job.setSortComparatorClass(KeyWritable.NameComparator.class);

import org.apache.hadoop.io.RawComparator;
public class KeyWritable implements Writable {
//                         implements WritableComparable<KeyWritable> {


	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(no);
		Text.writeString(out, name);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		no = in.readInt();
		name = Text.readString(in);
	}

KeyWritableの番号(no)だけでソートするクラスの例

	public static class NoComparator implements RawComparator<KeyWritable> {
		protected static final WritableComparator INT_COMPARATOR = new IntWritable.Comparator();

		@Override
		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
			int SIZE = Integer.SIZE / Byte.SIZE;
			return INT_COMPARATOR.compare(b1, s1, SIZE, b2, s2, SIZE);
//			return INT_COMPARATOR.compare(b1, s1, l1, b2, s2, l2);
		}

		@Override
		public int compare(KeyWritable o1, KeyWritable o2) { //たぶんこのメソッドは呼ばれない
			return o1.no - o2.no;
		}
	}
}

KeyWritableの名前(name)だけでソートするクラスの例

	public static class NameComparator implements RawComparator<KeyWritable> {
		protected static final WritableComparator TEXT_COMPARATOR = new Text.Comparator();

		@Override
		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

			// this.no(固定長int)の分をスキップ
			int SIZE = Integer.SIZE / Byte.SIZE;
			s1 += SIZE; l1 -= SIZE;
			s2 += SIZE; l2 -= SIZE;

			return TEXT_COMPARATOR.compare(b1, s1, l1, b2, s2, l2);
		}

		@Override
		public int compare(KeyWritable o1, KeyWritable o2) { //たぶんこのメソッドは呼ばれない
			return o1.name.compareTo(o2.name);
		}
	}

このKeyWritableでは、noはバイト列の先頭だがintは4バイトなので、長さは4を指定している。(Integer.SIZEintのビット数を表している(JDK1.5以降))
nameで比較したい場合はバイト配列のnoの次の位置にデータが入っているので、オフセットやレングスはnoの分を除いて調整してやる必要がある。

ただし長さ(バイト数)に関しては、intだと固定なのでIntWritable.Comparator#compare()では引数の長さは使っていないから、特に気にしなくても大丈夫かもしれないが。


バイト列の開始位置・長さの算出

シリアライズされたバイト配列では、元のオブジェクトのフィールドの値がwrite()で書き込んだ順番で入っている。[2010-03-21]
引数のバイト配列(b1, b2)やオフセット(s1, s2)・長さ(l1, l2)は“元のオブジェクト全体のバイト列”の値(開始位置・バイト数)なので、
比較したいフィールド用のバイト配列内でのオフセット長さ(バイト数)をきちんと算出しなければならない。

つまり自分のフィールドの型のサイズを事前に算出する必要があるし、次のフィールドの為の開始位置も事後に計算する必要がある。
したがって、compare()の引数l1, l2を使っていいのは、write()で一番最後に出力したフィールドの比較のみ
(l1,l2を全く変更せずに使っていいのは、write()でフィールドをひとつしか出力していない場合のみ)

特に可変長のデータ(可変長整数や文字列)を使う場合、オフセットや長さの計算は注意を要する。
長さ(バイト数)自体をデータ(バイト配列内)として持っているので、長さを計算する為にその部分のデータを復元しなければならない。

固定長整数の例(IntWritable.Comparatorを使う方法)

write()でout.writeInt()を使った場合。

	protected static final WritableComparator INT_COMPARATOR = new IntWritable.Comparator();

	@Override
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

		int SIZE = Integer.SIZE / Byte.SIZE;

		int c = INT_COMPARATOR.compare(b1, s1, SIZE, b2, s2, SIZE);
		if (c != 0) {
			return c;
		}

		// 次のデータの開始位置・長さを算出
		s1 += SIZE; l1 -= SIZE;
		s2 += SIZE; l2 -= SIZE;

		〜
	}

固定長整数の例(IntWritable.Comparatorを使わない方法)

	@Override
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

		// 固定長intの値を取得
		int n1 = WritableComparator.readInt(b1, s1);
		int n2 = WritableComparator.readInt(b2, s2);

		int c = n1 - n2;
		if (c != 0) {
			return c;
		}

		// 次のデータの開始位置・長さを算出
		int SIZE = Integer.SIZE / Byte.SIZE;
		s1 += SIZE; l1 -= SIZE;
		s2 += SIZE; l2 -= SIZE;

		〜
	}

固定長整数の例(compareBytes()を使う例)

	@Override
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

		int SIZE = Integer.SIZE / Byte.SIZE;

		int c = WritableComparator.compareBytes(b1, s1, SIZE, b2, s2, SIZE);
		if (c != 0) {
			return c;
		}

		// 次のデータの開始位置・長さを算出
		s1 += SIZE; l1 -= SIZE;
		s2 += SIZE; l2 -= SIZE;

		〜
	}

compareBytes()はバイト配列を1バイトずつ0〜255の値として比較する為、intの符号は考慮されない。
したがって、readInt()を使う方法と結果が一致するのは正の数の場合のみ。

可変長整数の例

write()でWritableUtils.writeVInt()を使った場合。

	@Override
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
		try {
			// 可変長intの値を取得
			int n1 = WritableComparator.readVInt(b1, s1);
			int n2 = WritableComparator.readVInt(b2, s2);

			int c = n1 - n2;
			if (c != 0) {
				return c;
			}

			// 次のデータの開始位置・長さを算出
			int len1 = WritableUtils.decodeVIntSize(b1[s1]);
			int len2 = WritableUtils.decodeVIntSize(b2[s2]);
			s1 += len1; l1 -= len1;
			s2 += len2; l2 -= len2;

		} catch (IOException e) {
			throw new RuntimeException(e);
		}

		〜
	}

文字列の例(Text.Comparatorを使う方法)

write()でText.writeString()を使った場合。

	protected static final WritableComparator TEXT_COMPARATOR = new Text.Comparator();

	@Override
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
		try {
			//「文字列の長さ部(可変長int)のバイト数」+「文字列部のバイト数」
			int len1 = WritableUtils.decodeVIntSize(b1[s1]) + WritableComparator.readVInt(b1, s1);
			int len2 = WritableUtils.decodeVIntSize(b2[s2]) + WritableComparator.readVInt(b2, s2);

			int c = TEXT_COMPARATOR.compare(b1, s1, len1, b2, s2, len2);
			if (c != 0) {
				return c;
			}

			// 次のデータの開始位置・長さを算出
			s1 += len1; l1 -= len1;
			s2 += len2; l2 -= len2;

		} catch (IOException e) {
			throw new RuntimeException(e);
		}

		〜
	}

文字列の例(Text.Comparatorを使わない方法)

	@Override
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
		try {
			// 文字列の長さ部(可変長int)のバイト数
			int len1 = WritableUtils.decodeVIntSize(b1[s1]);
			int len2 = WritableUtils.decodeVIntSize(b2[s2]);
			// 文字列部のバイト数
			int strLen1 = WritableComparator.readVInt(b1, s1);
			int strLen2 = WritableComparator.readVInt(b2, s2);

			s1 += len1;
			s2 += len2;
			int c = WritableComparator.compareBytes(b1, s1, strLen1, b2, s2, strLen2);
			if (c != 0) {
				return c;
			}

			// 次のデータの開始位置・長さを算出
			s1 += strLen1; l1 -= len1 + strLen1;
			s2 += strLen2; l2 -= len2 + strLen2;

		} catch (IOException e) {
			throw new RuntimeException(e);
		}

		〜
	}

フィールドが多くなってコーディングが面倒(で実行速度をあまり気にしない)なら、オブジェクトを復元する方が手っ取り早い(かつ確実)かも。

	public static class NameComparator implements RawComparator<KeyWritable> {
		private KeyWritable key1 = new KeyWritable();
		private KeyWritable key2 = new KeyWritable();
		private DataInputBuffer buffer = new DataInputBuffer();

		@Override
		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
			try {
				buffer.reset(b1, s1, l1); key1.readFields(buffer);
				buffer.reset(b2, s2, l2); key2.readFields(buffer);
			} catch (IOException e) {
				throw new RuntimeException(e);
			}
			return compare(key1, key2);
		}

		@Override
		public int compare(KeyWritable o1, KeyWritable o2) {
			return o1.name.compareTo(o2.name);
		}
	}

↑これはWritableComparatorでやってる事と同じなので、RawComparatorを実装する代わりにWritableComparatorを継承してもいい(WritableComparatorはRawComparatorを実装している)のだが、その場合は対象クラスはWritableComparableを実装していなければならない(Writableのみは不可)。

	public static class NameComparator extends WritableComparator {

		/** コンストラクター */
		public NameComparator() {
			super(KeyWritable.class, true);	// KeyWritableはWritableComparableを実装している必要がある
		}

		@SuppressWarnings("unchecked")
		@Override
		public int compare(WritableComparable a, WritableComparable b) {
			KeyWritable o1 = (KeyWritable) a;
			KeyWritable o2 = (KeyWritable) b;
			return o1.name.compareTo(o2.name);
		}
	}

独自WritableComparator

Hadoopでは、WritableComparableに対してデフォルトのソートクラス(WritableComparator)を登録する機能を持っている。[2010-03-21]
登録しておくと、Job#setSortComparatorClass()setGroupingComparatorClass()でソート用のRawComparatorを指定しなくても、登録されたWritableComparatorが自動的に使われるようになる。

登録用のdefine()メソッドはstaticであり一度だけ呼び出せばいいので、静的初期化子で登録することが出来る。

参考: IntWritableやTextでは、WritableComparatorを継承して独自のComparatorを作り、compare()をオーバーライドしている。そしてその独自Comparatorをそれぞれのソートクラスとして登録している。

WritableComparatorを準備した独自WritableComparable:

import org.apache.hadoop.io.WritableComparator;
public class KeyWritable implements WritableComparable<KeyWritable> {

	/**
	 * KeyWritable用の比較クラス
	 */
	public static class Comparator extends WritableComparator {
		protected static final WritableComparator INT_COMPARATOR  = new IntWritable.Comparator();
		protected static final WritableComparator TEXT_COMPARATOR = new Text.Comparator();

		/** コンストラクター */
		public Comparator() {
			super(KeyWritable.class);
		}

		@Override
		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
			int SIZE = Integer.SIZE / Byte.SIZE;

			int c = INT_COMPARATOR.compare(b1, s1, SIZE, b2, s2, SIZE);
			if (c != 0) {
				return c;
			}

			s1 += SIZE; l1 -= SIZE;
			s2 += SIZE; l2 -= SIZE;

			return TEXT_COMPARATOR.compare(b1, s1, l1, b2, s2, l2);
		}
	}
	// WritableComparatorに比較クラスを登録する
	static {
		WritableComparator.define(KeyWritable.class, new Comparator());
	}
}

※複数フィールドの比較(開始位置や長さの算出)については、RawComparatorと同様の注意が必要。

compare(WritableComparable, WritableComparable)は、compare(byte[]〜)がオーバーライドされている場合は呼ばれないので、実装不要。
 呼ばれたとしても、コンストラクターで指定されているWriteComparableクラスのcompareTo()が呼ばれるだけなので、問題ない。


Hadoop APIへ戻る / Hadoop目次へ戻る / Java目次へ行く / 技術メモへ戻る
メールの送信先:ひしだま