S-JIS[2011-08-15] 変更履歴

Asakusa FrameworkのデータをHiveで読み込む

Asakusa Frameworkで出力したデータをApache Hiveで読み込んでみる。


概要

※AsakusaFW 0.7.0でHive連携が正式にサポートされた為、今はもう以下のようなことをする必要は無い。[2014-12-13]

Hiveでは独自Writableのシーケンスファイルが扱えるので、AsakusaFWで出力したシーケンスファイルも読める。

例として作ったWordCountModelを書き込んだシーケンスファイルを扱うことを考えてみる。

  1. afw-wordcountプロジェクト内に新しいソースフォルダーを作成する。(例:src_ex
  2. そのソースフォルダーの出力先を適当な新しいディレクトリーに変更する。(例:classes_ex
  3. 参照ライブラリー(ビルドパス)にHiveのjarファイルを追加しておく。
    jarファイル Eclipseのソースの添付
    HIVE_HOME/lib/hive-exec-0.7.1.jar C:/cygwin/usr/local/hive-0.7.1/src
  4. WordCountModel用のHiveのSerDeクラスを作成する。
  5. SerDeクラスとWordCountModelを含んだjarファイルを作成する。
  6. Hiveからそのjarファイルを読み込む
  7. 後は普通にCREATE EXTERNAL TABLEでAsakusaFWのシーケンスファイルを指定できる。

SerDeクラス

WordCountModel用のHiveのSerDeクラスを作成する。
AsakusaFWのシーケンスファイルはキーがNullWritableでデータがモデル(WordCountModel)になる。
(→AsakusaFW用SerDeクラスを生成するツール

package sample.afwhive;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
import org.apache.hadoop.io.Writable;

import sample.modelgen.dmdl.model.WordCountModel;
public class WordCountModelSerDe implements SerDe {

//初期処理

	private static final String[] COLUMN_NAME = { "word", "count" };
	private static final List<ObjectInspector> COLUMN_TYPE;
	static {
		List<ObjectInspector> list = new ArrayList<ObjectInspector>();
		list.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveCategory.STRING));
		list.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveCategory.INT));
		COLUMN_TYPE = Collections.unmodifiableList(list);
	}
	private ObjectInspector cachedInspector;

	@Override
	public void initialize(Configuration conf, Properties tbl) throws SerDeException {
		String columnProperty = tbl.getProperty("columns");
		List<String> columnList = new ArrayList<String>();
		if (!columnProperty.isEmpty()) {
			for (String s : columnProperty.split(",")) {
				columnList.add(s);
			}
		}
		for (int i = columnList.size(); i < COLUMN_NAME.length; i++) {
			columnList.add(COLUMN_NAME[i]);
		}

		cachedInspector = new StandardStructObjectInspector(columnList, COLUMN_TYPE) {};
	}
	@Override
	public ObjectInspector getObjectInspector() throws SerDeException {
		return cachedInspector;
	}

//デシリアライズ

	private final List<Object> deserializeCache = new ArrayList<Object>(2);

	@Override
	public Object deserialize(Writable blob) throws SerDeException {
		WordCountModel val = (WordCountModel) blob;

		deserializeCache.clear();
		deserializeCache.add(val.getWordAsString());
		deserializeCache.add(val.getCount());

		return deserializeCache;
	}
	@Override
	public Class<? extends Writable> getSerializedClass() {
		return WordCountModel.class;
	}

//シリアライズ

	private final WordCountModel serializeCache = new WordCountModel();

	@Override
	public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
		StructObjectInspector soi = (StructObjectInspector) objInspector;
		List<? extends StructField> fields = soi.getAllStructFieldRefs();

		{
			StructField f = fields.get(0);
			Object v = soi.getStructFieldData(obj, f);
			StringObjectInspector fi = (StringObjectInspector) f.getFieldObjectInspector();
			serializeCache.setWordAsString(fi.getPrimitiveJavaObject(v));
		}
		{
			StructField f = fields.get(1);
			Object v = soi.getStructFieldData(obj, f);
			IntObjectInspector fi = (IntObjectInspector) f.getFieldObjectInspector();
			serializeCache.setCount(fi.get(v));
		}

		return serializeCache;
	}
}

jarファイル作成

Hiveで読み込む為のjarファイルを作成する。

作成したWordCountModelSerDeの他に、対象となるモデルクラス(WordCountModel)も含んでおく。
(実際にテーブル(ファイル)を読み込むときに必要になる為)

ちなみに、AsakusaFWの流儀に従うならMavenを使うんだろうけど、自分は知らないので旧来通りのAntで…。

afw-wordcountプロジェクト/bin/build.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project name="afw-ex" default="jar" basedir=".">
	<property name="src"     location="../src_ex" />
	<property name="classes" location="../classes_ex" />
	<property name="model_src"     location="../target/generated-sources/modelgen" />
	<property name="model_classes" location="../target/classes" />

	<target name="jar">
		<jar jarfile="C:/cygwin/tmp/afwhive.jar">
			<fileset dir="${classes}" includes="**/*.class" />
			<fileset dir="${src}"     includes="**/*.java" />
			<fileset dir="${model_classes}" includes="**/model/*.class" />
			<fileset dir="${model_src}"     includes="**/model/*.java" />
		</jar>
	</target>
</project>

実行例

Hiveから独自に作ったクラスを使用する場合、そのクラスの入ったjarファイルをadd jarで読み込む必要がある。

hive> add jar /cygwin/tmp/afwhive.jar;

Pigの場合はDataModelインターフェースが入っているjarファイルも別途読み込む必要があった。
HiveはHadoop上で動いているので、HADOOP_HOMEがAsakusaFW用に構築したCDH3もどきを指している場合はHADOOP_HOME/libの下にAsakusaFWのjarファイル一式がコピーされているので、特に指定する必要は無い。
Linux上で純正のCDH3を使っている場合は、M2_REPO/com/asakusafw/asakusa-runtime/0.2.1/asakusa-runtime-0.2.1.jarもaddしてやる必要がある。

Exception in thread "main" java.lang.NoClassDefFoundError: com/asakusafw/runtime/model/DataModel
hive> add jar M2_REPO/com/asakusafw/asakusa-runtime/0.2.1/asakusa-runtime-0.2.1.jar
    > /tmp/afwhive.jar;

複数のjarファイルを読み込む場合はaddをファイル毎に書いてもよいが、「add jar」はスペース区切りで複数のjarファイルを指定することも出来る。
いずれにしても、jarファイルを指定する順序にも意味があるので要注意
クラスの依存関係がある場合、依存するものが先に指定されている必要がある。
今回の例ではWordCountModelがDataModelを使っているので、DataModelの入っているasakusa-runtime-0.2.1.jarを先に指定しなければならない。


jarファイルを読み込んだら、シーケンスファイルにアクセスする為のテーブルを作成する。
しかしHiveでは場所指定はディレクトリーであって、その中の個々のファイルを指定することは出来ない。
AsakusaFWでは出力先のディレクトリー内に中間ファイルも作られるので、そのディレクトリーを直接指定すると余計なファイルまで読み込んでしまう(データの形式が違うとエラーになる)。
(ついでに言うと、このテーブルに対して書き込み(INSERTLOAD)を行うとディレクトリーが一旦削除される)
仕方が無いので、別のディレクトリーにファイルをコピーして、そのディレクトリーを指定することにする。

$ mkdir /tmp/result/
$ cp -p /home/hishidama/result/wc* /tmp/result/
create external table afwwc
row format serde 'sample.afwhive.WordCountModelSerDe' stored as sequencefile
location '/cygwin/tmp/result';
hive> select * from afwwc;
OK
Hello   2
Asakusa 1
World   1

よしよし、読めた。


AsakusaFW目次へ戻る / Hiveへ戻る / 技術メモへ戻る
メールの送信先:ひしだま