S-JIS[2011-08-12/2011-08-13] 変更履歴

Hive SerDe

HiveのSerDe(シリアライズ/デシリアライズ)の作り方のメモ。


概要

HiveではSequenceFileでファイルを読み書きできるが、ファイル内の実データは1行が1つのTextとなる。
Textでなく自分で作ったWritableをデータにしたい場合に、HiveのSerDeを定義する。

SerDeはSerialize/Deserializeの略で、データを入出力できる形式に変換するクラスのこと。

SerDeを作るにはorg.apache.hadoop.hive.serde2.SerDeインターフェースを実装する。
そしてCREATE TABLEでSerDeクラスを指定する。


Writableクラス

一番最初に、データ保存に使うWritableクラスを作っておく。

今回は例としてWordCountのデータ(単語と単語数)を保持するWritableを用意してみた。

package sample.writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
public class WordCountWritable implements Writable {

	private Text word = new Text();
	private IntWritable count = new IntWritable();
	public void setWord(String s) {
		word.set(s);
	}

	public String getWord() {
		return word.toString();
	}

	public void setCount(int n) {
		count.set(n);
	}

	public int getCount() {
		return count.get();
	}
	@Override
	public void write(DataOutput out) throws IOException {
		word.write(out);
		count.write(out);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		word.readFields(in);
		count.readFields(in);
	}
}

SerDeクラス

Developer Guideに記載のあるMetadataTypedColumnsetSerDeクラスを参考にSerDeクラスを作ってみた。

SerDeでは大きく3種類のメソッドを実装する。初期化デシリアライズシリアライズ
SerDeクラスはけっこう頻繁に呼ばれる(DESCRIBEでも呼ばれるし、1回のSELECTINSERTでも複数回呼ばれる)ので、SerDe内でキャッシュできるデータはキャッシュする方針のようだ。

package sample;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.Writable;

import sample.writable.WordCountWritable;
public class WordCountSerDe implements SerDe {
	〜
}

SerDeの初期化部分

SerDeがインスタンス化されると、必ずinitialize()メソッドが呼ばれる。
また、ObjectInspector(項目名と属性を保持するクラスらしい)を返すメソッド(getObjectInspector())を実装する必要があるので
ObjectInspectorインスタンスをinitialize()メソッドで作っておく。

private ObjectInspector cachedInspector;
initialize()の中で作ったObjectInspectorをキャッシュしておく。
@Override
public void initialize(Configuration conf, Properties tbl) throws SerDeException {
	String columnProperty = tbl.getProperty("columns");
	List<String> columnList = Arrays.asList(columnProperty.split(","));
	cachedInspector = new MyInspector(columnList);
}
初期化メソッドの引数tblには、プロパティーが入ってくる。
SERDEPROPERTIESで指定されたプロパティーもここで取得できる。
代表的な
プロパティー
内容 備考
name DB名およびテーブル名 ピリオド区切り
columns CREATE TABLEで指定されたカラム名 カンマ区切り
columns.types CREATE TABLEで指定されたカラムのデータ型 コロン区切り
@Override
public ObjectInspector getObjectInspector() throws SerDeException {
	return cachedInspector;
}
initialize()で作ったObjectInspectorを返す。
static class MyInspector extends StandardStructObjectInspector {

	protected MyInspector(List<String> names) {
		super(names, getFieldObjectInspectors());
	}
	static List<ObjectInspector> getFieldObjectInspectors() {
		List<ObjectInspector> r = new ArrayList<ObjectInspector>(2);
		r.add(PrimitiveObjectInspectorFactory
		  .getPrimitiveJavaObjectInspector(PrimitiveCategory.STRING));
		r.add(PrimitiveObjectInspectorFactory
		  .getPrimitiveJavaObjectInspector(PrimitiveCategory.INT));
		return r;
	}
}
ObjectInspectorインターフェースを実装したクラス。
直接ObjectInspectorを継承するとよく分からないメソッドを色々書かないといけないので、
MetadataTypedColumnsetSerDeで使っているStandardStructObjectInspectorを継承してみた。
親クラスのコンストラクターで項目名とデータ型を指定するようだ。

SerDeのデシリアライズ部分

ファイルから読み込まれたデータをHiveで使える形式に変換する。

private final List<Object> deserializeCache = new ArrayList<Object>(2);
テーブルの各項目のデータを保持するリストのキャッシュ。
@Override
public Object deserialize(Writable blob) throws SerDeException {
	WordCountWritable wc = (WordCountWritable) blob;

	deserializeCache.clear();
	deserializeCache.add(wc.getWord());
	deserializeCache.add(wc.getCount());

	return deserializeCache;
}
テーブルのファイル形式SequenceFileの場合、Writableクラス名はシーケンスファイルのヘッダーに書かれているので、
deserialize()メソッドにはそのクラスで渡ってきている(はず)。
なのでいきなりWordCountWritableにキャストしている。

SerDeのシリアライズ部分

ファイルへ書き込むために、Hiveの各項目のデータからWritableを作る部分。

@Override
public Class<? extends Writable> getSerializedClass() {
	return WordCountWritable.class;
}
シリアライズに使用するWritableクラスを返す。
private final WordCountWritable serializeCache = new WordCountWritable();
シリアライズ用インスタンスのキャッシュ。
@Override
public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
	StructObjectInspector soi = (StructObjectInspector) objInspector;
	List<? extends StructField> fields = soi.getAllStructFieldRefs();

	{
	  StructField f = fields.get(0);
	  Object v = soi.getStructFieldData(obj, f);
	  StringObjectInspector fi = (StringObjectInspector) f.getFieldObjectInspector();
	  serializeCache.setWord(fi.getPrimitiveJavaObject(v));
	}
	{
	  StructField f = fields.get(1);
	  Object v = soi.getStructFieldData(obj, f);
	  IntObjectInspector fi = (IntObjectInspector) f.getFieldObjectInspector();
	  serializeCache.setCount(fi.get(v));
	}

	return serializeCache;
}
引数のobjに各カラムのデータが入っている。
fieldsは各カラムのデータの型情報を保持している。
左記の変数vが各カラムのデータになる。(objからfを使ってデータを取り出す)
vの実際のクラスは基本的にはTextやIntWritableなのだが、場合によってはHive特有のクラスだったりする。
そこでFieldObjectInspectorを介してStringやintに変換している。
(たぶんStringに関してはv.toString()だけで大丈夫だと思うけど)

jarファイル化

Hiveにはjarファイルを読み込ませるので、作ったクラスをjarファイル化しておく。

プロジェクト/bin/build.xml:

<?xml version="1.0" encoding="Shift_JIS"?>
<project name="hive0.7.1" default="jar" basedir=".">
	<property name="src" location="../src" />
	<property name="classes" location="../classes" />

	<target name="jar">
		<jar jarfile="C:/cygwin/tmp/hiveserde.jar">
			<fileset dir="${classes}" includes="**/*.class" />
			<fileset dir="${src}" includes="**/*.java" />
		</jar>
	</target>
</project>

テーブル作成

Hiveでテーブルを作る際にSerDeを指定する。

hive> add jar C:/cygwin/tmp/hiveserde.jar;
Added C:/cygwin/tmp/hiveserde.jar to class path
Added resource: C:/cygwin/tmp/hiveserde.jar
create table wcserde
(word string, cnt int)
row format serde 'sample.WordCountSerDe'
stored as sequencefile;
「ROW FORMAT SERDE」でSerDeクラスを指定する。
「STORED AS」でSEQUENCEFILEを指定する。
ここで指定したカラムのデータ型はcolumns.typesプロパティーで取得できるが
今回のプログラムでは無視している。(SerDe内でデータ型を指定しているから)
create table wcserde
(word string, cnt int)
row format serde 'sample.WordCountSerDe' with serdeProperties('zzz'='123')
stored as sequencefile;
「WITH SERDEPROPERTIES」でプロパティーを指定すると、initialize()メソッドで取得することが出来る。
create table wcserde
row format serde 'sample.WordCountSerDe'
stored as sequencefile;
@Override
public void initialize(Configuration conf, Properties tbl) {
	List<String> columnList = Arrays.asList("word", "count");
	cachedInspector = new MyInspector(columnList);
}
SerDeを指定する場合は、カラム定義を省略することが出来る。
この場合、columnsプロパティーは空文字列になるので上記のinitialize()メソッドでは項目名が設定されない。
(CREATE TABLEは正常終了するがDML実行時に実際の項目数と一致しないので例外が発生する)
左記のinitialize()メソッドのようにSerDe内で項目名を設定してやる必要がある。
create table wcserde
row format serde 'sample.WordCountSerDe';

 

「STORED AS」を省略するとTEXTFILEになる。
この場合、OutputFormatがTextやByteWritableにしか対応していないのでWordCountWritableからキャストできなくて例外が発生する。

テーブル操作例

テーブル定義を見ると、ちゃんとSerDeが指定したものになっている。

hive> desc formatted wcserde;
OK
# col_name              data_type               comment

word                    string                  from deserializer
cnt                     int                     from deserializer

# Detailed Table Information
Database:               default
Owner:                  hishidama
CreateTime:             Fri Aug 12 22:37:10 JST 2011
LastAccessTime:         UNKNOWN
Protect Mode:           None
Retention:              0
Location:               file:/user/hive/warehouse/wcserde
Table Type:             MANAGED_TABLE
Table Parameters:
        numFiles                1
        numPartitions           0
        numRows                 1
        totalSize               117
        transient_lastDdlTime   1313171312

# Storage Information
SerDe Library:          sample.WordCountSerDe
InputFormat:            org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
        serialization.format    1
        zzz                     123
hive> insert overwrite table wcserde
    > select 'hello', 255 from 1レコードしか入っていないテーブル;

hive> select * from wcserde;
OK
hello   255
$ od -tx1a C:/user/hive/warehouse/wcserde/000000_0
0000000  53  45  51  06  22  6f  72  67  2e  61  70  61  63  68  65  2e
          S   E   Q ack   "   o   r   g   .   a   p   a   c   h   e   .
0000020  68  61  64  6f  6f  70  2e  69  6f  2e  42  79  74  65  73  57
          h   a   d   o   o   p   .   i   o   .   B   y   t   e   s   W
0000040  72  69  74  61  62  6c  65  21  73  61  6d  70  6c  65  2e  77
          r   i   t   a   b   l   e   !   s   a   m   p   l   e   .   w
0000060  72  69  74  61  62  6c  65  2e  57  6f  72  64  43  6f  75  6e
          r   i   t   a   b   l   e   .   W   o   r   d   C   o   u   n
0000100  74  57  72  69  74  61  62  6c  65  00  00  00  00  00  00  1f
          t   W   r   i   t   a   b   l   e nul nul nul nul nul nul  us
0000120  b3  d6  ed  55  a1  1c  c2  b5  5a  4a  5a  2d  9d  db  db  00
          3   V   m   U   !  fs   B   5   Z   J   Z   -  gs   [   [ nul
0000140  00  00  0e  00  00  00  04  00  00  00  00  05  68  65  6c  6c
        nul nul  so nul nul nul eot nul nul nul nul enq   h   e   l   l
0000160  6f  00  00  00  ff
          o nul nul nul del
0000165

CREATE〜AS SELECTによって別テーブルを作ると、SerDeクラスやファイルフォーマットは引き継がれない。

hive> create table wc2 as
    > select * from wcserde;

hive> select * from wc2;
OK
hello   255

hive> desc formatted wc2;
OK
# col_name              data_type               comment

word                    string                  None
cnt                     int                     None

# Detailed Table Information
〜

# Storage Information
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
        serialization.format    1

CREATE〜LIKEによるテーブル定義コピーはそういった情報もコピーされる。

hive> create table wc3 like wcserde;

注意点

DESCRIBEによるテーブル定義参照やDROP TABLEによるテーブル削除の際もSerDeが使われる。
したがって、それらの操作を行う前にも「add jar」を実行する必要がある。

また、SerDeクラスを修正して以前と整合性のとれない状態になった場合、DROPがエラーになってテーブルを削除できないことがある。


直接シーケンスファイルを扱う

SerDeで扱うSEQUENCEFILEによって出来たファイルは、HadoopのAPIのSequeceFileクラスを使って読み書きできる。[2011-08-13]

つまりHiveのテーブルの実体となっているディレクトリー内のファイルをSequenceFile.Readerで読めるし、
そのディレクトリーにシーケンスファイルを追加すればHiveのテーブルとしてSELECTすることが出来る。


ただしLOAD DATAによってシーケンスファイルを読み込む場合は注意が必要。

load data local inpath'C:/cygwin/tmp/seq/write'
into table wcserde;

シーケンスファイル内のヘッダーに書かれているWritableクラスに対しては、「add jar」によって追加されたクラスパスは参照されない。
HIVE_HOME/libにWritableクラスの入ったjarファイルを置いておけば読み込める。


HiveQLへ戻る / Hive目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま