HiveのSerDe(シリアライズ/デシリアライズ)の作り方のメモ。
|
|
HiveではSequenceFileでファイルを読み書きできるが、ファイル内の実データは1行が1つのTextとなる。
Textでなく自分で作ったWritableをデータにしたい場合に、HiveのSerDeを定義する。
SerDeはSerialize/Deserializeの略で、データを入出力できる形式に変換するクラスのこと。
SerDeを作るにはorg.apache.hadoop.hive.serde2.SerDeインターフェースを実装する。
そしてCREATE TABLEでSerDeクラスを指定する。
一番最初に、データ保存に使うWritableクラスを作っておく。
今回は例としてWordCountのデータ(単語と単語数)を保持するWritableを用意してみた。
package sample.writable;
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable;
public class WordCountWritable implements Writable {
private Text word = new Text();
private IntWritable count = new IntWritable();
public void setWord(String s) {
word.set(s);
}
public String getWord() {
return word.toString();
}
public void setCount(int n) {
count.set(n);
}
public int getCount() {
return count.get();
}
@Override
public void write(DataOutput out) throws IOException {
word.write(out);
count.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
word.readFields(in);
count.readFields(in);
}
}
Developer Guideに記載のあるMetadataTypedColumnsetSerDeクラスを参考にSerDeクラスを作ってみた。
SerDeでは大きく3種類のメソッドを実装する。初期化・デシリアライズ・シリアライズ。
SerDeクラスはけっこう頻繁に呼ばれる(DESCRIBEでも呼ばれるし、1回のSELECT・INSERTでも複数回呼ばれる)ので、SerDe内でキャッシュできるデータはキャッシュする方針のようだ。
package sample;
import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; import org.apache.hadoop.io.Writable; import sample.writable.WordCountWritable;
public class WordCountSerDe implements SerDe {
〜
}
SerDeがインスタンス化されると、必ずinitialize()メソッドが呼ばれる。
また、ObjectInspector(項目名と属性を保持するクラスらしい)を返すメソッド(getObjectInspector())を実装する必要があるので
ObjectInspectorインスタンスをinitialize()メソッドで作っておく。
private ObjectInspector cachedInspector; |
initialize()の中で作ったObjectInspectorをキャッシュしておく。 | ||||||||||||
@Override public void initialize(Configuration conf, Properties tbl) throws SerDeException { String columnProperty = tbl.getProperty("columns"); List<String> columnList = Arrays.asList(columnProperty.split(",")); cachedInspector = new MyInspector(columnList); } |
初期化メソッドの引数tblには、プロパティーが入ってくる。 SERDEPROPERTIESで指定されたプロパティーもここで取得できる。
|
||||||||||||
@Override public ObjectInspector getObjectInspector() throws SerDeException { return cachedInspector; } |
initialize()で作ったObjectInspectorを返す。 | ||||||||||||
static class MyInspector extends StandardStructObjectInspector { protected MyInspector(List<String> names) { super(names, getFieldObjectInspectors()); } static List<ObjectInspector> getFieldObjectInspectors() { List<ObjectInspector> r = new ArrayList<ObjectInspector>(2); r.add(PrimitiveObjectInspectorFactory .getPrimitiveJavaObjectInspector(PrimitiveCategory.STRING)); r.add(PrimitiveObjectInspectorFactory .getPrimitiveJavaObjectInspector(PrimitiveCategory.INT)); return r; } } |
ObjectInspectorインターフェースを実装したクラス。 直接ObjectInspectorを継承するとよく分からないメソッドを色々書かないといけないので、 MetadataTypedColumnsetSerDeで使っているStandardStructObjectInspectorを継承してみた。 親クラスのコンストラクターで項目名とデータ型を指定するようだ。 |
ファイルから読み込まれたデータをHiveで使える形式に変換する。
private final List<Object> deserializeCache = new ArrayList<Object>(2); |
テーブルの各項目のデータを保持するリストのキャッシュ。 |
@Override
public Object deserialize(Writable blob) throws SerDeException {
WordCountWritable wc = (WordCountWritable) blob;
deserializeCache.clear();
deserializeCache.add(wc.getWord());
deserializeCache.add(wc.getCount());
return deserializeCache;
}
|
テーブルのファイル形式がSequenceFileの場合、Writableクラス名はシーケンスファイルのヘッダーに書かれているので、 deserialize()メソッドにはそのクラスで渡ってきている(はず)。 なのでいきなりWordCountWritableにキャストしている。 |
ファイルへ書き込むために、Hiveの各項目のデータからWritableを作る部分。
@Override
public Class<? extends Writable> getSerializedClass() {
return WordCountWritable.class;
}
|
シリアライズに使用するWritableクラスを返す。 |
private final WordCountWritable serializeCache = new WordCountWritable(); |
シリアライズ用インスタンスのキャッシュ。 |
@Override
public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
StructObjectInspector soi = (StructObjectInspector) objInspector;
List<? extends StructField> fields = soi.getAllStructFieldRefs();
{
StructField f = fields.get(0);
Object v = soi.getStructFieldData(obj, f);
StringObjectInspector fi = (StringObjectInspector) f.getFieldObjectInspector();
serializeCache.setWord(fi.getPrimitiveJavaObject(v));
}
{
StructField f = fields.get(1);
Object v = soi.getStructFieldData(obj, f);
IntObjectInspector fi = (IntObjectInspector) f.getFieldObjectInspector();
serializeCache.setCount(fi.get(v));
}
return serializeCache;
}
|
引数のobjに各カラムのデータが入っている。 fieldsは各カラムのデータの型情報を保持している。 左記の変数vが各カラムのデータになる。(objからfを使ってデータを取り出す) vの実際のクラスは基本的にはTextやIntWritableなのだが、場合によってはHive特有のクラスだったりする。 そこでFieldObjectInspectorを介してStringやintに変換している。 (たぶんStringに関してはv.toString()だけで大丈夫だと思うけど) |
Hiveにはjarファイルを読み込ませるので、作ったクラスをjarファイル化しておく。
<?xml version="1.0" encoding="Shift_JIS"?>
<project name="hive0.7.1" default="jar" basedir=".">
<property name="src" location="../src" />
<property name="classes" location="../classes" />
<target name="jar">
<jar jarfile="C:/cygwin/tmp/hiveserde.jar">
<fileset dir="${classes}" includes="**/*.class" />
<fileset dir="${src}" includes="**/*.java" />
</jar>
</target>
</project>
Hiveでテーブルを作る際にSerDeを指定する。
hive> add jar C:/cygwin/tmp/hiveserde.jar; Added C:/cygwin/tmp/hiveserde.jar to class path Added resource: C:/cygwin/tmp/hiveserde.jar
create table wcserde (word string, cnt int) row format serde 'sample.WordCountSerDe' stored as sequencefile; |
「ROW FORMAT SERDE」でSerDeクラスを指定する。 「STORED AS」でSEQUENCEFILEを指定する。 ここで指定したカラムのデータ型はcolumns.typesプロパティーで取得できるが 今回のプログラムでは無視している。(SerDe内でデータ型を指定しているから) |
create table wcserde (word string, cnt int) row format serde 'sample.WordCountSerDe' with serdeProperties('zzz'='123') stored as sequencefile; |
「WITH SERDEPROPERTIES」でプロパティーを指定すると、initialize()メソッドで取得することが出来る。 |
create table wcserde row format serde 'sample.WordCountSerDe' stored as sequencefile; @Override
public void initialize(Configuration conf, Properties tbl) {
List<String> columnList = Arrays.asList("word", "count");
cachedInspector = new MyInspector(columnList);
}
|
SerDeを指定する場合は、カラム定義を省略することが出来る。 この場合、columnsプロパティーは空文字列になるので上記のinitialize()メソッドでは項目名が設定されない。 (CREATE TABLEは正常終了するがDML実行時に実際の項目数と一致しないので例外が発生する) 左記のinitialize()メソッドのようにSerDe内で項目名を設定してやる必要がある。 |
create table wcserde row format serde 'sample.WordCountSerDe';
|
「STORED AS」を省略するとTEXTFILEになる。 この場合、OutputFormatがTextやByteWritableにしか対応していないのでWordCountWritableからキャストできなくて例外が発生する。 |
テーブル定義を見ると、ちゃんとSerDeが指定したものになっている。
hive> desc formatted wcserde;
OK
# col_name data_type comment
word string from deserializer
cnt int from deserializer
# Detailed Table Information
Database: default
Owner: hishidama
CreateTime: Fri Aug 12 22:37:10 JST 2011
LastAccessTime: UNKNOWN
Protect Mode: None
Retention: 0
Location: file:/user/hive/warehouse/wcserde
Table Type: MANAGED_TABLE
Table Parameters:
numFiles 1
numPartitions 0
numRows 1
totalSize 117
transient_lastDdlTime 1313171312
# Storage Information
SerDe Library: sample.WordCountSerDe
InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
serialization.format 1
zzz 123
hive> insert overwrite table wcserde
> select 'hello', 255 from 1レコードしか入っていないテーブル;
hive> select * from wcserde;
OK
hello 255
$ od -tx1a C:/user/hive/warehouse/wcserde/000000_0
0000000 53 45 51 06 22 6f 72 67 2e 61 70 61 63 68 65 2e
S E Q ack " o r g . a p a c h e .
0000020 68 61 64 6f 6f 70 2e 69 6f 2e 42 79 74 65 73 57
h a d o o p . i o . B y t e s W
0000040 72 69 74 61 62 6c 65 21 73 61 6d 70 6c 65 2e 77
r i t a b l e ! s a m p l e . w
0000060 72 69 74 61 62 6c 65 2e 57 6f 72 64 43 6f 75 6e
r i t a b l e . W o r d C o u n
0000100 74 57 72 69 74 61 62 6c 65 00 00 00 00 00 00 1f
t W r i t a b l e nul nul nul nul nul nul us
0000120 b3 d6 ed 55 a1 1c c2 b5 5a 4a 5a 2d 9d db db 00
3 V m U ! fs B 5 Z J Z - gs [ [ nul
0000140 00 00 0e 00 00 00 04 00 00 00 00 05 68 65 6c 6c
nul nul so nul nul nul eot nul nul nul nul enq h e l l
0000160 6f 00 00 00 ff
o nul nul nul del
0000165
CREATE〜AS SELECTによって別テーブルを作ると、SerDeクラスやファイルフォーマットは引き継がれない。
hive> create table wc2 as
> select * from wcserde;
hive> select * from wc2;
OK
hello 255
hive> desc formatted wc2;
OK
# col_name data_type comment
word string None
cnt int None
# Detailed Table Information
〜
# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
serialization.format 1
CREATE〜LIKEによるテーブル定義コピーはそういった情報もコピーされる。
hive> create table wc3 like wcserde;
DESCRIBEによるテーブル定義参照やDROP
TABLEによるテーブル削除の際もSerDeが使われる。
したがって、それらの操作を行う前にも「add jar」を実行する必要がある。
また、SerDeクラスを修正して以前と整合性のとれない状態になった場合、DROPがエラーになってテーブルを削除できないことがある。
SerDeで扱うSEQUENCEFILEによって出来たファイルは、HadoopのAPIのSequeceFileクラスを使って読み書きできる。[2011-08-13]
つまりHiveのテーブルの実体となっているディレクトリー内のファイルをSequenceFile.Readerで読めるし、
そのディレクトリーにシーケンスファイルを追加すればHiveのテーブルとしてSELECTすることが出来る。
ただしLOAD DATAによってシーケンスファイルを読み込む場合は注意が必要。
load data local inpath'C:/cygwin/tmp/seq/write' into table wcserde;
シーケンスファイル内のヘッダーに書かれているWritableクラスに対しては、「add jar」によって追加されたクラスパスは参照されない。
HIVE_HOME/libにWritableクラスの入ったjarファイルを置いておけば読み込める。