Asakusa Frameworkで出力したデータをApache Hiveで読み込んでみる。
※AsakusaFW 0.7.0でHive連携が正式にサポートされた為、今はもう以下のようなことをする必要は無い。[2014-12-13]
Hiveでは独自Writableのシーケンスファイルが扱えるので、AsakusaFWで出力したシーケンスファイルも読める。
例として作ったWordCountModelを書き込んだシーケンスファイルを扱うことを考えてみる。
jarファイル | Eclipseのソースの添付 |
---|---|
HIVE_HOME/lib/hive-exec-0.7.1.jar | C:/cygwin/usr/local/hive-0.7.1/src |
WordCountModel用のHiveのSerDeクラスを作成する。
AsakusaFWのシーケンスファイルはキーがNullWritableでデータがモデル(WordCountModel)になる。
(→AsakusaFW用SerDeクラスを生成するツール)
package sample.afwhive;
import java.util.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.*; import org.apache.hadoop.io.Writable; import sample.modelgen.dmdl.model.WordCountModel;
public class WordCountModelSerDe implements SerDe {
//初期処理
private static final String[] COLUMN_NAME = { "word", "count" }; private static final List<ObjectInspector> COLUMN_TYPE; static { List<ObjectInspector> list = new ArrayList<ObjectInspector>(); list.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveCategory.STRING)); list.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveCategory.INT)); COLUMN_TYPE = Collections.unmodifiableList(list); }
private ObjectInspector cachedInspector; @Override public void initialize(Configuration conf, Properties tbl) throws SerDeException { String columnProperty = tbl.getProperty("columns"); List<String> columnList = new ArrayList<String>(); if (!columnProperty.isEmpty()) { for (String s : columnProperty.split(",")) { columnList.add(s); } } for (int i = columnList.size(); i < COLUMN_NAME.length; i++) { columnList.add(COLUMN_NAME[i]); } cachedInspector = new StandardStructObjectInspector(columnList, COLUMN_TYPE) {}; }
@Override public ObjectInspector getObjectInspector() throws SerDeException { return cachedInspector; }
//デシリアライズ
private final List<Object> deserializeCache = new ArrayList<Object>(2); @Override public Object deserialize(Writable blob) throws SerDeException { WordCountModel val = (WordCountModel) blob; deserializeCache.clear(); deserializeCache.add(val.getWordAsString()); deserializeCache.add(val.getCount()); return deserializeCache; }
@Override public Class<? extends Writable> getSerializedClass() { return WordCountModel.class; }
//シリアライズ
private final WordCountModel serializeCache = new WordCountModel(); @Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { StructObjectInspector soi = (StructObjectInspector) objInspector; List<? extends StructField> fields = soi.getAllStructFieldRefs(); { StructField f = fields.get(0); Object v = soi.getStructFieldData(obj, f); StringObjectInspector fi = (StringObjectInspector) f.getFieldObjectInspector(); serializeCache.setWordAsString(fi.getPrimitiveJavaObject(v)); } { StructField f = fields.get(1); Object v = soi.getStructFieldData(obj, f); IntObjectInspector fi = (IntObjectInspector) f.getFieldObjectInspector(); serializeCache.setCount(fi.get(v)); } return serializeCache; } }
Hiveで読み込む為のjarファイルを作成する。
作成したWordCountModelSerDeの他に、対象となるモデルクラス(WordCountModel)も含んでおく。
(実際にテーブル(ファイル)を読み込むときに必要になる為)
ちなみに、AsakusaFWの流儀に従うならMavenを使うんだろうけど、自分は知らないので旧来通りのAntで…。
<?xml version="1.0" encoding="UTF-8"?> <project name="afw-ex" default="jar" basedir="."> <property name="src" location="../src_ex" /> <property name="classes" location="../classes_ex" /> <property name="model_src" location="../target/generated-sources/modelgen" /> <property name="model_classes" location="../target/classes" /> <target name="jar"> <jar jarfile="C:/cygwin/tmp/afwhive.jar"> <fileset dir="${classes}" includes="**/*.class" /> <fileset dir="${src}" includes="**/*.java" /> <fileset dir="${model_classes}" includes="**/model/*.class" /> <fileset dir="${model_src}" includes="**/model/*.java" /> </jar> </target> </project>
Hiveから独自に作ったクラスを使用する場合、そのクラスの入ったjarファイルをadd jarで読み込む必要がある。
hive> add jar /cygwin/tmp/afwhive.jar;
Pigの場合はDataModelインターフェースが入っているjarファイルも別途読み込む必要があった。
HiveはHadoop上で動いているので、HADOOP_HOMEがAsakusaFW用に構築したCDH3もどきを指している場合はHADOOP_HOME/libの下にAsakusaFWのjarファイル一式がコピーされているので、特に指定する必要は無い。
Linux上で純正のCDH3を使っている場合は、M2_REPO/com/asakusafw/asakusa-runtime/0.2.1/asakusa-runtime-0.2.1.jarもaddしてやる必要がある。Exception in thread "main" java.lang.NoClassDefFoundError: com/asakusafw/runtime/model/DataModelhive> add jar M2_REPO/com/asakusafw/asakusa-runtime/0.2.1/asakusa-runtime-0.2.1.jar > /tmp/afwhive.jar;複数のjarファイルを読み込む場合はaddをファイル毎に書いてもよいが、「add jar」はスペース区切りで複数のjarファイルを指定することも出来る。
いずれにしても、jarファイルを指定する順序にも意味があるので要注意。
クラスの依存関係がある場合、依存するものが先に指定されている必要がある。
今回の例ではWordCountModelがDataModelを使っているので、DataModelの入っているasakusa-runtime-0.2.1.jarを先に指定しなければならない。
jarファイルを読み込んだら、シーケンスファイルにアクセスする為のテーブルを作成する。
しかしHiveでは場所指定はディレクトリーであって、その中の個々のファイルを指定することは出来ない。
AsakusaFWでは出力先のディレクトリー内に中間ファイルも作られるので、そのディレクトリーを直接指定すると余計なファイルまで読み込んでしまう(データの形式が違うとエラーになる)。
(ついでに言うと、このテーブルに対して書き込み(INSERTやLOAD)を行うとディレクトリーが一旦削除される)
仕方が無いので、別のディレクトリーにファイルをコピーして、そのディレクトリーを指定することにする。
$ mkdir /tmp/result/ $ cp -p /home/hishidama/result/wc* /tmp/result/
create external table afwwc row format serde 'sample.afwhive.WordCountModelSerDe' stored as sequencefile location '/cygwin/tmp/result';
hive> select * from afwwc; OK Hello 2 Asakusa 1 World 1
よしよし、読めた。