Sqoopのソース(内部実装)について。
|
今回は、CDH3u4のSqoop1.3.0のソースを見てみた。
CDHのSqoopの場合、Sqoop本来のクラスとClouderaが拡張したクラスがある。
パッケージ名だけ違っていてクラス名が同じ、元来のクラスをClouderaのクラスが継承しているなんてものも多い。
普通のexportの場合、SqlManager#exportTable()が呼ばれる。
exportTable()では、JdbcExportJobのインスタンスを作り、そのrunExport()を呼び出す。
JdbcExportJobは、HDFS上のファイルを読み込む為のInputFormatや、DBに出力する為のOutputFormat(RecordWriter)等のクラスを決定する。
runExport()ではMapReduceジョブを実行する。
ステージングテーブルを指定している場合は、その前処理や後処理も行う。
OutputFormatクラスは、ExportJobBase#getOutputFormatClass()で決定される。
デフォルトでは、バッチモードであればExportBatchOutputFormat、そうでなければExportOutputFormatとなる。
(ExportBatchOutputFormatはExportOutputFormatのサブクラス、その親はAsyncSqlOutputFormat)
AsyncSqlOutputFormatのサブクラスは、内部クラスとしてAsyncSqlRecordWriterのサブクラスを持っている。
Mapperクラスのmap()メソッドでcontext.write()が呼ばれると、AsyncSqlRecordWriter#write()が呼ばれる。
引数として、HDFS上のファイルから読み込んだ1レコード(をWritableに変換したもの)が渡ってくるので、records(ArrayList)に追加していく。
これが一定数(デフォルトでは100。sqoop.export.records.per.statementで指定)になったらexecUpdate()を呼び出す。
// AsyncSqlRecordWriter#write()の抜粋 @Override public void write(K key, V value) throws InterruptedException, IOException { records.add((SqoopRecord) key.clone()); if (records.size() >= this.rowsPerStmt) { execUpdate(false, false); } }
AsyncSqlRecordWriter#execUpdate()は、PreparedStatement(SQL文)を生成し、recordsの内容をPreparedStatementのパラメーターにセットする。
具体的には、SQL文の生成およびパラメーターのセットはgetPreparedStatement()で行う。
ExportRecordWriterでは複数VALUES INSERT、
ExportBatchRecordWriter(バッチモード)では通常のINSERT文でaddBatch()する。
(PreparedStatementに値をセットしたら、recordsはクリアする)
if (records.size() > 0) { stmt = getPreparedStatement(records); this.records.clear(); }
そして、PreparedStatementを(AsyncDBOperationというオブジェクトに包んで)AsyncSqlExecThreadに渡す。
AsyncSqlOutputFormat.AsyncDBOperation op = new AsyncSqlOutputFormat.AsyncDBOperation(stmt, isBatchExec(), commit, stopThread); execThread.put(op);
execThreadはAsyncSqlExecThreadというスレッドであり、put()は、ただ単にopをキューに追加するだけ。
AsyncSqlExecThread#run()では、キューからopを取得し、SQL(PreparedStatement)を実行する。
(バッチモードであればexecuteBatch()、そうでなければexecute()を実行する)
そして、実行した個数が一定数(デフォルトは100。sqoop.export.statements.per.transactionで指定)になったら、コミットする。
デフォルトでは100レコード毎にPreparedStatementを生成し、さらに100ステートメント毎にコミットしているので、1万レコード毎にコミットしていることになる。
驚いたのは、ファイルから読み込んでPreparedStatementを作るまでと、PreparedStatementを実行してコミットするのが、別スレッドになっていること。
実際のDBアクセスに多少の時間がかかるのであれば、読み込みと別スレッドにして並列で処理することによって、全体の速度が上がるということなんだろう。