S-JIS[2012-06-19/2012-07-07] 変更履歴

Sqoopソース

Sqoopのソース(内部実装)について。


前提

今回は、CDH3u4のSqoop1.3.0のソースを見てみた。

CDHのSqoopの場合、Sqoop本来のクラスとClouderaが拡張したクラスがある。
パッケージ名だけ違っていてクラス名が同じ、元来のクラスをClouderaのクラスが継承しているなんてものも多い。


全体の流れ

  1. sqoopコマンドの引数(オプション)はSqoopOptionsクラスで保持される。
  2. DefaultManagerFactory#accept()で、DBに接続するためのConnManager(SqlManager)の具象クラスを決定する。
    1. 「--driver」オプションによってドライバークラス名が指定されている場合は、それを使うようなGenericJdbcManagerを生成する。
    2. 「--connection-manager」オプションによってConnManagerクラスが指定されている場合は、そのインスタンスを生成する。
    3. それ以外の場合は、「--connect」による接続URLを解析してConnManagerの具象クラスを決定する。if〜elseの羅列だぞ(爆)
      • MySQLはMySQLManagerだが、ダイレクトモードではDirectMySQLManager
      • PostgreSQLはPostgresqlManagerだが、ダイレクトモードではDirectPostgresqlManager
      • OracleはOracleManager
  3. sqoopコマンドの指示に応じて、SqlManagerのメソッドが呼ばれる。

SqlManager#exportTable()

普通のexportの場合、SqlManager#exportTable()が呼ばれる。
exportTable()では、JdbcExportJobのインスタンスを作り、そのrunExport()を呼び出す。

JdbcExportJobは、HDFS上のファイルを読み込む為のInputFormatや、DBに出力する為のOutputFormat(RecordWriter)等のクラスを決定する。

runExport()ではMapReduceジョブを実行する。
ステージングテーブルを指定している場合は、その前処理や後処理も行う。

OutputFormatクラスは、ExportJobBase#getOutputFormatClass()で決定される。
デフォルトでは、バッチモードであればExportBatchOutputFormat、そうでなければExportOutputFormatとなる。
(ExportBatchOutputFormatはExportOutputFormatのサブクラス、その親はAsyncSqlOutputFormat)

AsyncSqlOutputFormatのサブクラスは、内部クラスとしてAsyncSqlRecordWriterのサブクラスを持っている。
Mapperクラスのmap()メソッドでcontext.write()が呼ばれると、AsyncSqlRecordWriter#write()が呼ばれる。
引数として、HDFS上のファイルから読み込んだ1レコード(をWritableに変換したもの)が渡ってくるので、records(ArrayList)に追加していく。
これが一定数(デフォルトでは100。sqoop.export.records.per.statementで指定)になったらexecUpdate()を呼び出す。

// AsyncSqlRecordWriter#write()の抜粋
	@Override
	public void write(K key, V value) throws InterruptedException, IOException {
		records.add((SqoopRecord) key.clone());
		if (records.size() >= this.rowsPerStmt) {
			execUpdate(false, false);
		}
	}

AsyncSqlRecordWriter#execUpdate()は、PreparedStatement(SQL文)を生成し、recordsの内容をPreparedStatementのパラメーターにセットする。
具体的には、SQL文の生成およびパラメーターのセットはgetPreparedStatement()で行う。
ExportRecordWriterでは複数VALUES INSERT、
ExportBatchRecordWriter(バッチモード)では通常のINSERT文でaddBatch()する。
(PreparedStatementに値をセットしたら、recordsはクリアする)

		if (records.size() > 0) {
			stmt = getPreparedStatement(records);
			this.records.clear();
		}

そして、PreparedStatementを(AsyncDBOperationというオブジェクトに包んで)AsyncSqlExecThreadに渡す。

		AsyncSqlOutputFormat.AsyncDBOperation op = new AsyncSqlOutputFormat.AsyncDBOperation(stmt, isBatchExec(), commit, stopThread);
		execThread.put(op);

execThreadはAsyncSqlExecThreadというスレッドであり、put()は、ただ単にopをキューに追加するだけ。

AsyncSqlExecThread#run()では、キューからopを取得し、SQL(PreparedStatement)を実行する。
(バッチモードであればexecuteBatch()、そうでなければexecute()を実行する)
そして、実行した個数が一定数(デフォルトは100。sqoop.export.statements.per.transactionで指定)になったら、コミットする。


デフォルトでは100レコード毎にPreparedStatementを生成し、さらに100ステートメント毎にコミットしているので、1万レコード毎にコミットしていることになる。

驚いたのは、ファイルから読み込んでPreparedStatementを作るまでと、PreparedStatementを実行してコミットするのが、別スレッドになっていること。
実際のDBアクセスに多少の時間がかかるのであれば、読み込みと別スレッドにして並列で処理することによって、全体の速度が上がるということなんだろう。


Sqoop目次へ戻る / Hadoopへ行く / 技術メモへ戻る
メールの送信先:ひしだま