HadoopのWritableクラスについて。
|
|
Hadoopでは、ノード(サーバー)間のデータ転送や中間データ(メモリー上のバッファ内や一時ファイル)の保持をバイナリデータ(バイト列byte[]
)で行う。
その為、データを転送する前(出力前)にバイト配列に変換(シリアライズ)し、転送後(読込時)に復元(デシリアライズ)する必要がある。
Javaには標準でシリアライズ(java.io.Serializable)の仕組みがあるが、Hadoopではそれを使っていない。
(JavaのSerializableは重いので、Hadoopではもっと軽量な仕組みを採用したらしい)
そこで、Writableというインターフェースが用意されている。
Hadoopが扱うデータ(クラス)はたぶん全てWritableを実装している。
Writableにはwrite()とreadFields()という2つのメソッドがあり、つまりバイト列との変換をそれぞれの具象クラスでコーディングせよということだ^^;
したがって、書き込んだデータと読み込んだデータの整合性(ちゃんと同じフィールドに同じ型で読み込めるようにする)はプログラマーが気をつける必要がある。
(ちょっと面倒ではあるが、シンプルにコーディングすれば最も高速になるしデータ量も最小限で済む。
Serializableを使うとコーディングの手間も省けるし整合性チェックも行ってくれるが、汎用的なので冗長な分遅いし余計なデータも付加されてサイズも大きくなる。
Hadoopでは初回のコーディングの手間よりも実行時のパフォーマンス(速度・容量)を優先したのだろう)
もうひとつ、Writableから派生したインターフェースにWritableComparableというものがある。
これはWritableと(Java標準の)Comparableを継承しただけのインターフェース。
Hadoopで扱うデータのうち、キーとして使用するものは(Map/Reduceの間で)ソートされるので、大小比較を行える必要がある。この為にComparableを実装する。
Writableの具象クラス(Hadoop0.20.1)には以下のようなクラスがある。
クラス名 | 説明 | |
---|---|---|
Hadoop | org.apache.hadoop.io. Writable |
Hadoopでシリアライズできる事を示すインターフェース。 write()とreadFields()が宣言されている。 |
org.apache.hadoop.io. MapWritable |
シリアライズできるHashMap<Writable, Writable>。 | |
org.apache.hadoop.io. SortedMapWritable |
シリアライズできるTreeMap<WritableComparable, Writable>。 | |
org.apache.hadoop.io. ArrayWritable |
Writableオブジェクトの配列をシリアライズする。 | |
org.apache.hadoop.io. ObjectWritable |
「プリミティブ型」・「列挙型」・「Writableの配列」をシリアライズする。 (シリアライズ時の先頭にクラス名を出力する) |
|
org.apache.hadoop.io. GenericWritable |
使用したいクラスに番号を付けておき、シリアライズ時の先頭に(クラス名でなく)その番号を出力する。 | |
org.apache.hadoop.mapreduce. Counter |
カウンター。 | |
org.apache.hadoop.conf. Configuration |
Configurationもノード間で受け渡す必要があるからWritableなのだろう。 | |
HBase | org.apache.hadoop.hbase.client. Get |
HBaseでデータを取得する列名を指定する。 |
org.apache.hadoop.hbase.client. Scan org.apache.hadoop.hbase.filter. Filter |
HBaseでデータを取得する条件を指定する。 | |
org.apache.hadoop.hbase.client. Put Delete |
HBaseのデータの更新内容を保持する。 | |
org.apache.hadoop.hbase. KeyValue |
HBaseのキー(列名)と値(データ)を保持する。 | |
クラス名 | 説明 | |
Hadoop | org.apache.hadoop.io. WritableComparable<T> |
ソート可能なWritable。 型引数TはComparable<T>のものなので、自分自身のクラスを指す。 |
org.apache.hadoop.io. BooleanWritable ByteWritable IntWritable LongWritable FloatWritable DoubleWritable |
プリミティブ型に該当するWritable。(shortは無いみたいだ…) シリアライズ時には固定長のバイト列になる。 |
|
org.apache.hadoop.io. BytesWritable |
バイト配列(byte[] )をシリアライズする。 |
|
org.apache.hadoop.io. VIntWritable VLongWritable |
シリアライズ時に可変長のバイト列になる。 例えば-112〜127の範囲なら1バイトしか出力されない為、データサイズが縮小できる。 シリアライズ方法(実際に呼ばれるメソッド)はVIntでもVLongと同じ。 |
|
org.apache.hadoop.io. Text |
文字列を保持するWritable。 シリアライズ時はUTF-8で扱われる。 |
|
org.apache.hadoop.io. NullWritable |
何も無い事を示すWritable。 NullWritable同士を比較すると常に等しくなる。ソート順を気にしない場合のキーに使える。 このクラスはシングルトンで扱えるので、 NullWritable. get() でインスタンスを取得する。 |
|
HBase | org.apache.hadoop.hbase.io. ImmutableBytesWritable |
HBaseのキーや値は全てバイト配列で扱うので、その際に使用する。 BytesWritableはsetSize()によって長さを変えられるが、ImmutableBytesWritableはサイズ変更できない。 |
org.apache.hadoop.hbase. HTableDescriptor |
HBaseのテーブル情報。 | |
org.apache.hadoop.hbase. HColumnDescriptor |
HBaseの列情報。 |
→Writableの自作方法
→WritableComparableの自作方法
独自のWritableも簡単に作れる。
例えば点数と人数を保持するクラスは以下のようになる。
フィールドにWritableを使う例 | フィールドにプリミティブ型やPOJOを使う例 |
---|---|
import org.apache.hadoop.io.Writable; public class Sum implements Writable { private IntWritable ten = new IntWritable(); private IntWritable cnt = new IntWritable(); public void set(int ten, int count) { this.ten.set(ten); this.cnt.set(count); } public int getTen() { return ten.get(); } public int getCount() { return cnt.get(); } @Override public void write(DataOutput out) throws IOException { ten.write(out); cnt.write(out); } @Override public void readFields(DataInput in) throws IOException { ten.readFields(in); cnt.readFields(in); } } |
import org.apache.hadoop.io.Writable; public class Sum implements Writable { private int ten = 0; private int cnt = 0; public void set(int ten, int count) { this.ten = ten; this.cnt = count; } public int getTen() { return ten; } public int getCount() { return cnt; } @Override public void write(DataOutput out) throws IOException { out.writeInt(ten); out.writeInt(cnt); } @Override public void readFields(DataInput in) throws IOException { ten = in.readInt(); cnt = in.readInt(); } } |
readFields()やwirte()の実装は、 フィールドのWritableの同名メソッドを呼び出す形となる。 各フィールドの具象クラスが異なっても同じ形式で済む。 |
readFields()やwirte()の実装は、 フィールドの型(クラス)に応じて変える必要がある。 |
セッターやゲッターメソッドは 各フィールドのset()やget()を呼ぶ必要があるのでちょっとだけ面倒。 |
セッターやゲッターメソッドは 通常のクラス(JavaBean)と同じくシンプル。 |
自分でバイト列の変換をコーディングする場合、DataInput/DataOutputとの入出力を行うユーティリティーが用意されているので、それを利用することが出来る。
(なお、DataInputやDataOutputはJava標準のデータ入出力クラスなので、プリミティブ型用のメソッドは揃っている)
値の型 | 出力(書込) | 入力(読込) | 備考 |
---|---|---|---|
boolean | out.writeBoolean(値) |
in.readBoolen() |
|
byte | out.writeByte(値) |
in.readByte() |
|
short | out.writeShort(値) |
in.readShort() |
|
int | out.writeInt(値) |
in.readInt() |
|
long | out.writeLong(値) |
in.readLong() |
|
float | out.writeFloat(値) |
in.readFloat() |
|
double | out.writeDouble(値) |
in.readDouble() |
|
String | Text.writeString(out,
値) |
Text.readString(in) |
WritableUtils.write/readString()とは出力内容が異なる。 |
String[] | WritableUtils.writeStringArray(out,
値) |
WritableUtils.readStringArray(in) |
WritableUtils.write/readString()を使用している。 |
列挙型 | WritableUtils.writeEnum(out,
値) |
WritableUtils.readEnum(in,
列挙クラス) |
|
int long |
WritableUtils.writeVInt(out,
値) |
WritableUtils.readVInt(in) |
可変長のバイト列として出力する。 VInt()の内部ではVLong()を呼んでいるので、出力内容は同一。 |
byte[] String String[] |
WritableUtils.writeCompressedByteArray(out,
値) |
WritableUtils.readCompressedByteArray(in) |
GZIP圧縮して出力(解凍して復元)する。 StringArray()はString()を呼び、String()はByteArray()を呼んでいる。 |
バイト配列からの変換メソッドはWritableComparatorに用意されている。[2010-03-21]
import org.apache.hadoop.io.WritableComparator;
値の型 | 入力(読込) | 備考 |
---|---|---|
byte | bytes[start] |
|
byte[] | 比較はWritableComparator.compareBytes(b1,s1,l1,
b2,s2,l2) 。バイト配列を1バイトずつ0〜255の値として比較する。 |
|
short | WritableComparator.readUnsignedShort(bytes, start) |
2バイト(Short.SIZE/Byte.SIZE) |
int | WritableComparator.readInt(bytes, start) |
4バイト(Integer.SIZE/Byte.SIZE) |
long | WritableComparator.readLong(bytes, start) |
8バイト(Long.SIZE/Byte.SIZE) |
float | WritableComparator.readFloat(bytes, start) |
4バイト(Float.SIZE/Byte.SIZE) |
double | WritableComparator.readDouble(bytes, start) |
8バイト(Double.SIZE/Byte.SIZE) |
String | Text.decode(bytes,
start, length) |
|
int long |
WritableComparator.readVInt(bytes,
start) |
可変長整数のバイト列の復元。 |
数値に関しては長さが決まっている(決まる)ので、オフセット(開始位置)だけ渡せばよい。(長さは不要)
HBaseを使う場合、Bytesクラスにもバイト配列を扱うメソッドが用意されている。[2010-03-23]
import org.apache.hadoop.hbase.util.Bytes;
値の型 | サイズ | 出力(書込) | 変換(出力) | 入力(読込) | 備考 | |
---|---|---|---|---|---|---|
boolean | SIZEOF_BOOLEAN |
1 | toBytes(値) |
toBoolean(bytes) |
1バイト使用する。 | |
char | SIZEOF_CHAR |
2 | putChar(targetBytes, targetStart, 値) |
toBytes(値) |
toChar(bytes) |
※1 |
char[] | putChars(targetBytes, targetStart, 値) |
toBytes(値) |
toChars(bytes) |
※1 | ||
byte | SIZEOF_BYTE |
1 | putByte(targetBytes, targetStart, 値) |
bytes[start] |
||
byte[] | writeByteArray(out, bytes) |
readByteArray(in) |
先頭に可変長intでバイト数を出力している。 readByteArray()では、例外発生時はIOExceptionをスローする。 |
|||
putBytes(targetBytes, targetStart, bytes, start, len) |
readByteArray(in, len) |
指定された長さのバイト配列。 | ||||
readByteArrayThrowsRuntime(in) |
readByteArrayThrowsRuntime()の内容はreadByteArray()と同じだが、throwsは宣言されていない。 (例外発生時はRuntimeExceptionをスローする) |
|||||
short | SIZEOF_SHORT |
2 | putShort(targetBytes, targetStart, 値) |
toBytes(値) |
toShort(bytes) |
※1 |
int | SIZEOF_INT |
4 | putInt(targetBytes, targetStart, 値) |
toBytes(値) |
toInt(bytes) |
※1 |
long | SIZEOF_LONG |
8 | putLong(targetBytes, targetStart, 値) |
toBytes(値) |
toLong(bytes) |
※1 |
vintToBytes(値) |
bytesToVint(bytes) |
可変長整数は、intというメソッド名でもlongで扱われている。 | ||||
float | SIZEOF_FLOAT |
4 | putFloat(targetBytes, targetStart, 値) |
toBytes(値) |
toFloat(bytes) |
toFloat()では、toInt()のビット列をfloatに変換している。 |
double | SIZEOF_DOUBLE |
8 | putDouble(targetBytes, targetStart, 値) |
toBytes(値) |
toDouble(bytes) |
toDouble()では、toLong()のビット列をdoubleに変換している。 |
BigDecimal | putBigDecimal(targetBytes, targetStart, 値) |
toBytes(値) |
toBigDecimal(bytes) |
※1 | ||
String | toBytes(値) |
toString(bytes) |
文字列をUTF-8のバイト列で扱う。 toString()では、引数がnullの場合はnullを返す。 |
|||
toBytesBinary(値) |
toStringBinary(bytes) |
英数字や一部の記号(コロンやピリオドなど)はそのまま、 それ以外は「\xhh」の形式の文字列。 (HBase0.20.4→0.20.5で、そのまま出る記号の種類が変わった[2010-07-03]) |
※1: toInt()やtoLong()では、引数bytesがnullの場合やバイト列の長さが短い場合は-1を返す。 (toBytes()はnull不可)
内容 | メソッド | 備考 | |
---|---|---|---|
結合 | byte[] | add(bytes1, bytes2) |
バイト配列を結合した新しいバイト配列を返す。 HBaseではfamilyとqualifierをコロン区切りで結合してcolumnを作るので、 引数を3つとるメソッドもあるのだろう。 |
分割 | byte[] | head(bytes, len) |
バイト列の先頭あるいは末尾の指定された長さを返す。 pad無しメソッドは、バイト列が短い場合はnullを返す。 pad付きメソッドは、バイト列が短い場合は0で埋めて指定された長さを返す。 |
比較 | int | compareTo(bytes1, bytes2) |
比較ロジックはWritableComparator.compareBytes()と全く同じ。 |
boolean | equals(bytes1, bytes2) |
compareTo()では引数がnullの場合はNullPointerExceptionが発生するが、 equals()ではnullも可(双方がnullだとtrueを返す)。 |
|
ハッシュ | int | hashCode(bytes) |
WritableComparator.hashBytes()を呼び出している。 |
文字列(String)の出力(と入力)は、DataOutput#writeUTF()
・WritableUtils.writeString()
・Text.writeString()
といった、いくつかの方法がある。
WritableUtilsとTextはHadoopで用意されているクラスなので、DataOutputのwriteUTF()よりはこれらを使う方がいいと思う。
TextとWritableUtilsでは、文字列の出力方法(バイト列になった時の内容)が異なる。
どちらも最初に文字数(UTF-8のバイト数)を出力するのだが、WritableUtilsは固定長int(常に4バイト)でTextは可変長int(ASCIIで127文字までは1バイト)。文字列の長さ(バイト数)は普通は4バイト分もいかないだろうから、可変長intの方がサイズが少なくて済む。
また、WritableUtilsは単純にString#getBytes("UTF-8")で文字列をUTF-8に変換しているので、CharsetEncoderのインスタンスを毎回生成している。
TextはCharsetEncoderインスタンスを使い回すようになっている。
Hadoopで文字列を扱う際にはTextクラスを使っている事も考えると、WritableUtils.writeString()よりText.writeString()を使う方がいいだろう。
Javaではintは4バイト、longは8バイトであり、DataOutput#writeInt()は4バイトのバイト列、writeLong()は8バイトのバイト列を生成する。
しかし、例えば文字数のような数値を出力したい場合、せいぜい65535文字でも2バイトしか使わないので、4バイトも出力するのは(容量の観点からは)効率が悪い。
そこでHadoopでは、数値の桁数に応じて出力するバイト数を変える方法が用意されている。
それがWritableUtilsのwriteVInt()・readVInt()やwriteVLong()・readVLong()。
可変長整数なので、変換方法はintもlongも同じ。
という訳で、VInt()はVLong()を呼んでいる。(readVInt()は、readVLong()の戻り値をintにキャストしているだけ)
出力内容は、値が-112〜127の場合は1バイトでその値をそのまま出力する。
それ以外は先頭1バイトが整数値の長さ(バイト数)を示しており、後続のバイト列が整数値の実体となる。
範囲 | 例 | 備考 | ||||
---|---|---|---|---|---|---|
値 | バイト列 | 値 | バイト列 | サイズ | ||
-112〜127 | -112 | 90 |
0 1 127 |
00 |
1 | |
128〜255 -113〜-256 |
-113 -256 |
87 70 |
128 255 |
8f 80 |
2 | 2バイト目以降は 整数値を素直にバイト列 (ビッグエンディアン) で表したものとなる。 (負の数はビット反転) |
256〜65535 -257〜-65536 |
-257 -65536 |
86 01 00 |
256 65535 |
8e 01 00 |
3 | |
65536〜0xffffff -65537〜-0x1000000 |
-65537 | 85 01 00 00 |
65536 | 8d 01 00 00 |
4 | |
… | ||||||
〜Long.MAX_VALUE 〜Long.MIN_VALUE |
MIN_VALUE | 80 7f ff ff ff ff ff ff ff |
MAX_VALUE | 88 7f ff ff ff ff ff ff ff |
9 |
WritableUtilsには、可変長整数のバイト列のサイズを求めるメソッドも用意されている。
メソッド | 説明 | 例 | |
---|---|---|---|
getVIntSize(整数値) | その整数が何バイトになるかを返す。 | int len = WritableUtils.getVIntSize(255); |
2 3 |
decodeVIntSize(バイト) | バイト列の先頭1バイトを渡すと、可変整数値のバイト列全体の長さを返す。 | byte[] bytes = { 0x8f, 0x80, … }; |
2 |
もちろん単純に出力するより可変長で変換する方が多少コスト(変換に要する時間)がかかるが、
変換せずにネットワーク上をそのまま転送するのと比較すれば、メモリー上で圧縮してから転送する方が全体の効率(実行時間)は良い。
というのがHadoopの思想なんだと思う。
もし文字列の文字数を1バイトで出力できたとすると、intをそのまま出力した場合(4バイト)より3バイト節約できる。
文字列が1000個あったら3kバイトの節約となる。(ネットワーク上の転送量が3kバイト減ることになる)
Hadoopは大量のデータ(文字列の個数なんて1000個どころじゃないだろう)を処理する想定だから、意外と地味に効いてくるのかも。
独自のWritableComparableも独自WritableにcompareTo()を実装するだけで作れる。[2010-03-21]
import org.apache.hadoop.io.WritableComparable;
public class KeyWritable implements WritableComparable<KeyWritable> { protected int no; protected String name; public void set(int no, String name) { this.no = no; this.name = name; } public int getNo() { return no; } public String getName() { return name; } @Override public void write(DataOutput out) throws IOException { out.writeInt(no); Text.writeString(out, name); } @Override public void readFields(DataInput in) throws IOException { no = in.readInt(); name = Text.readString(in); } @Override public int compareTo(KeyWritable that) { int c = this.no - that.no; if (c != 0) { return c; } return name.compareTo(that.name); } @Override public boolean equals(Object obj) { if (obj instanceof KeyWritable) { KeyWritable that = (KeyWritable) obj; if (this.no != that.no) { return false; } return this.name.equals(that.name); } return false; } @Override public int hashCode() { return no * 163 + name.hashCode(); } }
これをMapperやReducerのキーに使う(Job#setMapOutputKeyClass()やsetOutputKeyClass()で指定する)と、ソート時にcompareTo()が呼ばれて比較される。
MapperやReducerの中間ファイル(メモリー上のバッファー)では値はバイト配列として(シリアライズされて)保持されている為、ソート時にはそこからreadFields()を使って復元し、それからcompareTo()を呼ぶようになっている。
これをWritableComparatorというクラスが行っている。
Class keyClass = job.getMapOutputKeyClass(); WritableComparator comparator = new WritableComparator(keyClass); int compare = comparator.compare(bytes1, offset1, length1, bytes2, offset2, length2);
private final WritableComparable key1; private final WritableComparable key2; private final DataInputBuffer buffer;
/** * コンストラクター. * * @param keyClass 対象となるWritableComparable */ protected WritableComparator(Class<? extends WritableComparable> keyClass) { // keyClassのインスタンスを作っておく this.key1 = keyClass.newInstance(); this.key2 = keyClass.newInstance(); // バイト配列をDataInputに変換する為のバッファー this.buffer = new DataInputBuffer(); }
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { // バイト配列からkey1へ値をセット buffer.reset(b1, s1, l1); key1.readFields(buffer); // バイト配列からkey2へ値をセット buffer.reset(b2, s2, l2); key2.readFields(buffer); // 比較 return key1.compareTo(key2); }
WritableComparableでなくWritableを実装したクラスであっても、Job#setSortComparatorClass()やsetGroupingComparatorClass()でソート用のクラス(RawComparator)を指定することにより、ソートさせることが出来る。[2010-03-21]
(WritableComparableであっても、Job#setSortComparatorClass()やsetGroupingComparatorClass()で指定したソート用クラスの方が優先される)
job.setOutputKeyClass (KeyWritable.class); job.setOutputValueClass(Sum.class); job.setSortComparatorClass(KeyWritable.NoComparator.class); // job.setSortComparatorClass(KeyWritable.NameComparator.class);
import org.apache.hadoop.io.RawComparator;
public class KeyWritable implements Writable { // implements WritableComparable<KeyWritable> { 〜 @Override public void write(DataOutput out) throws IOException { out.writeInt(no); Text.writeString(out, name); } @Override public void readFields(DataInput in) throws IOException { no = in.readInt(); name = Text.readString(in); }
public static class NoComparator implements RawComparator<KeyWritable> { protected static final WritableComparator INT_COMPARATOR = new IntWritable.Comparator(); @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int SIZE = Integer.SIZE / Byte.SIZE; return INT_COMPARATOR.compare(b1, s1, SIZE, b2, s2, SIZE); // return INT_COMPARATOR.compare(b1, s1, l1, b2, s2, l2); } @Override public int compare(KeyWritable o1, KeyWritable o2) { //たぶんこのメソッドは呼ばれない return o1.no - o2.no; } } }
public static class NameComparator implements RawComparator<KeyWritable> { protected static final WritableComparator TEXT_COMPARATOR = new Text.Comparator(); @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { // this.no(固定長int)の分をスキップ int SIZE = Integer.SIZE / Byte.SIZE; s1 += SIZE; l1 -= SIZE; s2 += SIZE; l2 -= SIZE; return TEXT_COMPARATOR.compare(b1, s1, l1, b2, s2, l2); } @Override public int compare(KeyWritable o1, KeyWritable o2) { //たぶんこのメソッドは呼ばれない return o1.name.compareTo(o2.name); } }
このKeyWritableでは、noはバイト列の先頭だがintは4バイトなので、長さは4を指定している。(Integer.SIZE
はintのビット数を表している(JDK1.5以降))
nameで比較したい場合はバイト配列のnoの次の位置にデータが入っているので、オフセットやレングスはnoの分を除いて調整してやる必要がある。
ただし長さ(バイト数)に関しては、intだと固定なのでIntWritable.Comparator#compare()では引数の長さは使っていないから、特に気にしなくても大丈夫かもしれないが。
シリアライズされたバイト配列では、元のオブジェクトのフィールドの値がwrite()で書き込んだ順番で入っている。[2010-03-21]
引数のバイト配列(b1, b2)やオフセット(s1, s2)・長さ(l1, l2)は“元のオブジェクト全体のバイト列”の値(開始位置・バイト数)なので、
比較したいフィールド用のバイト配列内でのオフセットや長さ(バイト数)をきちんと算出しなければならない。
つまり自分のフィールドの型のサイズを事前に算出する必要があるし、次のフィールドの為の開始位置も事後に計算する必要がある。
したがって、compare()の引数l1,
l2を使っていいのは、write()で一番最後に出力したフィールドの比較のみ。
(l1,l2を全く変更せずに使っていいのは、write()でフィールドをひとつしか出力していない場合のみ)
特に可変長のデータ(可変長整数や文字列)を使う場合、オフセットや長さの計算は注意を要する。
長さ(バイト数)自体をデータ(バイト配列内)として持っているので、長さを計算する為にその部分のデータを復元しなければならない。
write()でout.writeInt()を使った場合。
protected static final WritableComparator INT_COMPARATOR = new IntWritable.Comparator(); @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int SIZE = Integer.SIZE / Byte.SIZE; int c = INT_COMPARATOR.compare(b1, s1, SIZE, b2, s2, SIZE); if (c != 0) { return c; } // 次のデータの開始位置・長さを算出 s1 += SIZE; l1 -= SIZE; s2 += SIZE; l2 -= SIZE; 〜 }
@Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { // 固定長intの値を取得 int n1 = WritableComparator.readInt(b1, s1); int n2 = WritableComparator.readInt(b2, s2); int c = n1 - n2; if (c != 0) { return c; } // 次のデータの開始位置・長さを算出 int SIZE = Integer.SIZE / Byte.SIZE; s1 += SIZE; l1 -= SIZE; s2 += SIZE; l2 -= SIZE; 〜 }
@Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int SIZE = Integer.SIZE / Byte.SIZE; int c = WritableComparator.compareBytes(b1, s1, SIZE, b2, s2, SIZE); if (c != 0) { return c; } // 次のデータの開始位置・長さを算出 s1 += SIZE; l1 -= SIZE; s2 += SIZE; l2 -= SIZE; 〜 }
compareBytes()はバイト配列を1バイトずつ0〜255の値として比較する為、intの符号は考慮されない。
したがって、readInt()を使う方法と結果が一致するのは正の数の場合のみ。
write()でWritableUtils.writeVInt()を使った場合。
@Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { // 可変長intの値を取得 int n1 = WritableComparator.readVInt(b1, s1); int n2 = WritableComparator.readVInt(b2, s2); int c = n1 - n2; if (c != 0) { return c; } // 次のデータの開始位置・長さを算出 int len1 = WritableUtils.decodeVIntSize(b1[s1]); int len2 = WritableUtils.decodeVIntSize(b2[s2]); s1 += len1; l1 -= len1; s2 += len2; l2 -= len2; } catch (IOException e) { throw new RuntimeException(e); } 〜 }
write()でText.writeString()を使った場合。
protected static final WritableComparator TEXT_COMPARATOR = new Text.Comparator(); @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { //「文字列の長さ部(可変長int)のバイト数」+「文字列部のバイト数」 int len1 = WritableUtils.decodeVIntSize(b1[s1]) + WritableComparator.readVInt(b1, s1); int len2 = WritableUtils.decodeVIntSize(b2[s2]) + WritableComparator.readVInt(b2, s2); int c = TEXT_COMPARATOR.compare(b1, s1, len1, b2, s2, len2); if (c != 0) { return c; } // 次のデータの開始位置・長さを算出 s1 += len1; l1 -= len1; s2 += len2; l2 -= len2; } catch (IOException e) { throw new RuntimeException(e); } 〜 }
@Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { // 文字列の長さ部(可変長int)のバイト数 int len1 = WritableUtils.decodeVIntSize(b1[s1]); int len2 = WritableUtils.decodeVIntSize(b2[s2]); // 文字列部のバイト数 int strLen1 = WritableComparator.readVInt(b1, s1); int strLen2 = WritableComparator.readVInt(b2, s2); s1 += len1; s2 += len2; int c = WritableComparator.compareBytes(b1, s1, strLen1, b2, s2, strLen2); if (c != 0) { return c; } // 次のデータの開始位置・長さを算出 s1 += strLen1; l1 -= len1 + strLen1; s2 += strLen2; l2 -= len2 + strLen2; } catch (IOException e) { throw new RuntimeException(e); } 〜 }
フィールドが多くなってコーディングが面倒(で実行速度をあまり気にしない)なら、オブジェクトを復元する方が手っ取り早い(かつ確実)かも。
public static class NameComparator implements RawComparator<KeyWritable> { private KeyWritable key1 = new KeyWritable(); private KeyWritable key2 = new KeyWritable(); private DataInputBuffer buffer = new DataInputBuffer(); @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { buffer.reset(b1, s1, l1); key1.readFields(buffer); buffer.reset(b2, s2, l2); key2.readFields(buffer); } catch (IOException e) { throw new RuntimeException(e); } return compare(key1, key2); } @Override public int compare(KeyWritable o1, KeyWritable o2) { return o1.name.compareTo(o2.name); } }
↑これはWritableComparatorでやってる事と同じなので、RawComparatorを実装する代わりにWritableComparatorを継承してもいい(WritableComparatorはRawComparatorを実装している)のだが、その場合は対象クラスはWritableComparableを実装していなければならない(Writableのみは不可)。
public static class NameComparator extends WritableComparator {
/** コンストラクター */
public NameComparator() {
super(KeyWritable.class, true); // KeyWritableはWritableComparableを実装している必要がある
}
@SuppressWarnings("unchecked")
@Override
public int compare(WritableComparable a, WritableComparable b) {
KeyWritable o1 = (KeyWritable) a;
KeyWritable o2 = (KeyWritable) b;
return o1.name.compareTo(o2.name);
}
}
Hadoopでは、WritableComparableに対してデフォルトのソートクラス(WritableComparator)を登録する機能を持っている。[2010-03-21]
登録しておくと、Job#setSortComparatorClass()やsetGroupingComparatorClass()でソート用のRawComparatorを指定しなくても、登録されたWritableComparatorが自動的に使われるようになる。
登録用のdefine()メソッドはstaticであり一度だけ呼び出せばいいので、静的初期化子で登録することが出来る。
参考: IntWritableやTextでは、WritableComparatorを継承して独自のComparatorを作り、compare()をオーバーライドしている。そしてその独自Comparatorをそれぞれのソートクラスとして登録している。
import org.apache.hadoop.io.WritableComparator;
public class KeyWritable implements WritableComparable<KeyWritable> { 〜
/** * KeyWritable用の比較クラス */ public static class Comparator extends WritableComparator { protected static final WritableComparator INT_COMPARATOR = new IntWritable.Comparator(); protected static final WritableComparator TEXT_COMPARATOR = new Text.Comparator(); /** コンストラクター */ public Comparator() { super(KeyWritable.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int SIZE = Integer.SIZE / Byte.SIZE; int c = INT_COMPARATOR.compare(b1, s1, SIZE, b2, s2, SIZE); if (c != 0) { return c; } s1 += SIZE; l1 -= SIZE; s2 += SIZE; l2 -= SIZE; return TEXT_COMPARATOR.compare(b1, s1, l1, b2, s2, l2); } }
// WritableComparatorに比較クラスを登録する static { WritableComparator.define(KeyWritable.class, new Comparator()); } }
※複数フィールドの比較(開始位置や長さの算出)については、RawComparatorと同様の注意が必要。
※compare(WritableComparable,
WritableComparable)は、compare(byte[]〜)がオーバーライドされている場合は呼ばれないので、実装不要。
呼ばれたとしても、コンストラクターで指定されているWriteComparableクラスのcompareTo()が呼ばれるだけなので、問題ない。