Asakusa FrameworkにSQLを移植する例。
|
Asakusa Frameworkは(分散)並列処理技術を使ってバッチを(開発・)実行するフレームワークである。
宣伝風に短く言えば(笑)、バッチを高速化するフレームワークである。
RDBを使っているバッチでは、AsakusaFWに移植することで高速化できる可能性がある。
個人的に経験した大抵のシステムでは、DBサーバーは1台(ないし少数)でバッチサーバーが複数台(多数)ある構成になっており、DBサーバーに負荷が集中してバッチの実行が遅くなっていた。
しかしAsakusaFWだと最初のデータ取得と最後のデータ反映以外ではDBサーバーにアクセスしないように作るので、DBサーバーの負荷が減って高速化する。
というのが個人的な見解(期待)。
(RDBを使うバッチで、SQLの使い方がN+1問題になっているようなものだと、DBサーバーの負荷に関係なく遅い。AsakusaFW化するにはN+1問題のような取得方法をやめる必要があるので、早くなる可能性が高い。
この場合、RDBを使う方式のままN+1問題を解決すれば早くなる可能性は当然あるので、「AsakusaFWが早い」という言い方は語弊があるかもしれない)
RDBのデータ型をAsakusaFWのデータモデルのデータ型にマッピングする必要がある。
RDBのデータ型 | AsakusaFWのデータ型 | 備考 |
---|---|---|
NUMERIC | 整数は長さに応じてINTやLONG。 それ以外の数値はDECIMAL。 |
9桁以内ならINT、18桁以内ならLONGで可。 →Javaのint, longの値の範囲 |
CHAR VARCHAR |
文字列はTEXT。(AsakusaFWでは長さは可変) | |
DATE | 日付(yyyyMMdd)ならDATE、 日時(yyyyMMddHHmmss)(秒まで)ならDATETIME。 |
RDBMSによっては0年0月0日を扱えるようだが、 AsakusaFWは対応していない。 |
TIMESTAMP | AsakusaFWにはミリ秒以下を保持できるデータ型は無い。 必要ならTEXTやLONGで扱うしか無さそう。 |
SQLでは、基本的に、NULLとの演算結果はNULLになる。
(Oracleの文字列はNULLと空文字列が同一なので、NULLと結合するときは空文字列扱いだが)
AsakusaFWではnullの扱いは普通のJavaと同じ。
SQLの例 | AsakusaFWの例 | 備考 |
---|---|---|
a.int1 + 99 |
a.getInt1Option().isNull() ? null : a.getInt1() +
99 |
|
a.str1 || 'Z' |
a.getStr1Option().or("") + "Z" /* Oracle
*/ |
Oracleの文字列結合の場合、NULLは空文字列として扱われる。 |
SQLのWHERE条件やJOINのON条件では、テーブルのカラム同士や値で比較する。
SQLでは、NULLとの比較結果はunknownになる。(unknownはfalse扱い)
つまり「NULL = NULL」はunknown(false)になるが、AsakusaFWのOption系クラスでは、nullの入ったOption同士をequalsで比較するとtrueになる。
また、「NOT unknown」もunknown、つまりfalseになる。AsakusaFW(Java)では「!false」はtrueなので、これも異なる。
SQLの例 | AsakusaFWの例 | 備考 |
---|---|---|
a.column1 is null |
a.getColumn1Option().isNull() |
|
a.column1 is not null |
a.getColumn1Option().isPresent() |
|
a.column1 = 'A' |
a.getColumn1Option().has("A") |
|
a.column1 = b.column1 |
(a.getColumn1Option().isNull() ||
b.getColumn1Option().isNull()) ? false :
a.getColumn1Option().equals(b.getColumn1Option()) |
どちらかがnullでない(NOT NULL項目である)と分かっている場合は、nullチェックは不要。 |
a.column1 <> 'A' |
a.getColumn1Option().isNull() ? false :
!a.getColumn1Option().has("A") |
|
a.column1 <> b.column1 |
(a.getColumn1Option().isNull() ||
b.getColumn1Option().isNull()) ? false :
!a.getColumn1Option().equals(b.getColumn1Option()) |
|
not (a.column1 = 'A') |
a.getColumn1Option().isNull() ? false :
!a.getColumn1Option().has("A") |
このため、SQLのような比較を行うユーティリティーを用意した方がいいと思う。(こういうユーティリティーはScalaだと綺麗に書けるんだけどなぁ)
WHERE条件は、AsakusaFWではBranch演算子で実現する。
Branch | |
---|---|
SQLの例 |
select * from a |
Operatorの例 | public enum Filter { |
Flowの例 | Source<A> a = 〜; |
集約関数(sum, min, max, count)は、集約対象データにNULLが無い場合はDMDLの集計モデル(およびSummarize演算子)で実現できる。[2018-12-02]
データにNULLが含まれない場合 | |
---|---|
SQLの例 |
select key, sum(value) as sum_value, min(value) as min_value, count(value) as count_value from a group by key ; |
DMDLの例 |
summarized sum_a = a => { |
Operatorの例 | @Summarize |
集約対象データにNULLが含まれる場合は、SQLとSummarize演算子では挙動が異なる。
SQL | Summarize演算子 | |
---|---|---|
sum min max |
NULLは集約対象外。 全データがNULLだと結果はNULLになる。 |
NullPointerExceptionが発生する。 |
count | NULLはカウント対象外。 全データがNULLだと結果は0になる。 |
全データがカウント対象。 (データがnullかどうかは無関係) |
この場合は、Fold演算子を使って集約処理を書く。
@Convert public SumA convertSumA(A in) { SumA result = this.sumA; result.reset(); result.setKeyOption(in.getKeyOption()); result.setSumValueOption(in.getValueOption()); result.setMinValueOption(in.getValueOption()); result.setCountValue(in.getValueOption().isPresent() ? 1 : 0); return result; } @Fold public void foldA(@Key(group = { "key" }) SumA in, SumA right) { if (in.getSumValueOption().isNull()) { in.setSumValueOption(right.getSumValueOption()); } else { in.getSumValueOption().add(right.getSumValueOption()); } if (in.getMinValueOption().isNull()) { in.setMinValueOption(right.getMinValueOption()); } else if (right.getMinValueOption().isPresent()) { in.getMinValueOption().min(right.getMinValueOption()); } in.getCountValueOption().add(right.getCountValueOption()); }
sumに関しては、数値のOption系クラスにはaddメソッドがあるので、これで合算できる。
が、nullの場合はNullPointerExceptionが発生するので、nullの場合は単純移送とする。(rightがnullの場合はnullのままだし、rightがnull以外の場合はその値が使われることになる)
(集計前のレコードでnullのときに0を入れておく方法だと、全データがnullのときにnullにならない)
min, maxに関しては、Option系クラスにmin, maxメソッドがあるので、これで実現できる。
が、nullの場合はnullになってしまうので、nullでない場合だけ呼び出すようにする必要がある。
countに関しては、集計前のレコード全てに1(nullのときは0)を入れておき、単純に合算すればいい。
INNER JOIN(内部結合)は、双方にレコードが存在するときだけ出力される。
「a inner join b」において、aが複数レコードでbが1レコードの場合は、aのレコード数分出力される。
bが複数レコードある場合、aのレコード数×bのレコード数分出力される。
AsakusaFWの場合、結合に使える演算子はMasterJoin/MasterJoinUpdateとCoGroupである。
Master系演算子は、結合されたマスター側(「a inner join
b」におけるb)が1件であるという前提がある。マスター側が複数レコードある場合、どれか1件が使われる。(つまりMaster系演算子では、出力のレコード数がトランザクション側(「a
inner join b」におけるa)より増えることは無い)
したがって、「a inner join
b」のbが1件だと分かっている場合(結合キーがbのプライマリキーとかユニークキーである場合)はMaster系演算子を使い、そうでない場合はCoGroupを使う。
MasterJoinUpdate | CoGroup | |
---|---|---|
SQLの例 |
select a.column_a, b.column_b from a inner join b on b.pk1 = a.fk1; |
|
データモデルの例 | a = { |
|
Operatorの例 | @MasterJoinUpdate |
private final C c = new C(); |
Flowの例 | Source<C> c = core.extend(a,
C.class); |
Source<C> c = core.extend(a,
C.class); |
※厳密には、結合キーの値がnull同士の場合、SQLでは結合しない(と思う)がAsakusaFWでは結合する。
普通は結合キー(特にマスター側)がnullになることは無いと思うが、もしそこを厳密にしたいなら、事前にBranch演算子でnullを除外しておけばよい。
ON句の結合キーが等号で結ばれるものならMaser系演算子やCoGroup演算子のキー項目に指定すればよいが、等号以外の演算を行う場合はキーには指定できない。
この場合は以下のようにする。
MasterJoinUpdate | CoGroup | |
---|---|---|
SQLの例 |
select a.column_a, b.column_b from a inner join b on b.pk1 = a.fk1 and b.start_date <= a.date1 and a.date1 <= b.end_date; -- start_date, end_dateはNOT NULLとする |
|
データモデルの例 | a = { |
|
Operatorの例 | @MasterJoinUpdate(selection = "selectB") |
private final C c = new C(); |
Flowの例 | Source<C> c = core.extend(a,
C.class); |
Source<C> c = core.extend(a,
C.class); |
→ON句で固定値と比較している場合はBranch演算子を使う
レコードが存在しているかどうかのチェックの為に結合している場合(結合したカラムを使わない場合)は、MasterCheck演算子が使える。
MasterCheck | CoGroup | |
---|---|---|
SQLの例 |
select a.* -- bのカラムを使っていない
from a inner join b
on b.pk1 = a.fk1;
|
|
データモデルの例 | a = { |
|
Operatorの例 | @MasterCheck |
@CoGroup |
Flowの例 | CheckB check = operator.checkB(b, a); |
a = operator.checkB(a, b).out; |
※厳密には、マスター側(上記のb)が複数レコード存在する場合は、aもその件数分増幅することになるはず。しかし実際のところ、それが嬉しいことはあまり無いと思う。
LEFT JOIN(LEFT OUTER JOIN)(左結合)は、左側(「a left join b」におけるa)のレコードは常に出力される。
「a left join b」において、aが複数レコードでbが1レコードの場合は、aのレコード数分出力される。
bが複数レコードある場合、aのレコード数×bのレコード数分出力される。
bが無い場合でもaは出力される。(←これがINNER JOINとの違い)
AsakusaFWでは、LEFT JOIN用の演算子の扱いはINNER JOINで使った演算子とほぼ同様となる。
Master系演算子の場合、(INNER JOINでは結合されなかったデータ(missed)は捨てた(core.stopに渡した)が、)LEFT
JOINでは後続処理に渡す。
CoGroupの場合、bが存在しない場合はaを出力する。
MasterJoinUpdate | CoGroup | |
---|---|---|
SQLの例 |
select a.column_a, b.column_b from a left join b on b.pk1 = a.fk1; |
|
データモデルの例 | a = { |
|
Operatorの例 | // INNER JOINの場合と全く同じ |
private final C c = new C(); |
Flowの例 | Source<C> c = core.extend(a,
C.class); |
Source<C> c = core.extend(a,
C.class); |
JOINの結合キーの値の種類が少ない場合、Master系演算子やCoGroup演算子では結合キーの値の種類数分しか分散しないので、実行効率が悪くなる可能性がある。
(例えば1種類の値でしか結合しない場合、全データが1箇所で処理されることなる(全く分散しなくなる))
この場合、GroupViewを使う方が良いかもしれない。
なお、GroupViewを使う場合、GroupViewに指定するデータ(マスター側のデータ)の量が少ないことが前提となる。(イメージとしては、データが全てHashMapに詰め込まれる感じなので、メモリーに乗り切るサイズでないとまずい)
LEFT JOINの場合、トランザクション側(「a join b」におけるa)は常に出力されるので、マスター側(「a join
b」におけるb)が1件なのであればUpdate演算子を使うのが良い。マスター側が複数件ある場合は複数レコード出力する必要があるので、Extract演算子を使う。
INNER JOINの場合は、マスター側(「a join b」におけるb)の有無によって出力されないことがあるので、Extract演算子を使う。
LEFT JOIN | INNER JOIN | |
---|---|---|
SQLの例 |
select a.column_a, b.column_b from a left join b on b.pk1 = a.fk1; --「bが存在する場合は常に1件」という前提があるものとする |
select a.column_a, b.column_b from a inner join b on b.pk1 = a.fk1; --「bが存在する場合は常に1件」という前提があるものとする |
データモデルの例 | a = { |
|
Operatorの例 | @Update |
@Extract |
Flowの例 | Source<C> c = core.extend(a,
C.class); |
Source<C> c = core.extend(a,
C.class); |
JOINのON句で固定値と比較している場合は、事前にBranch演算子でフィルタリングする。
もしくは、Master系演算子の場合はMasterSelectionを使う方法も考えられる。
(MasterSelectionはマスター側とトランザクション側(「a join b」におけるbとa)のカラム同士を比較するときに使うものなので、単なる固定値との比較だとメリットはあまり無さそう)
Branch | MasterSelection | |
---|---|---|
SQLの例 |
select a.* from a inner join b on b.pk1 = a.fk1 and b.column_b = 'B' -- 固定値と比較 ; |
|
データモデルの例 | a = { |
|
Operatorの例 | public enum Filter { |
@MasterCheck(selection = "selectB") |
Flowの例 | FilterB filter = operator.filterB(b); |
CheckB check = operator.checkB(b, a); |
DISTINCT(重複排除)する場合はGroupSort演算子あるいはFold演算子を使う。
GroupSort | Fold | |
---|---|---|
SQLの例 |
select distinct key1, column1 from a; |
|
Operatorの例 | @GroupSort |
@Fold(partialAggregation =
PartialAggregation.PARTIAL) |
Fold演算子の方がコーディング量は少ない。
ただし、グルーピングされたデータが複数レコードある場合、どのレコードが残るかは不定。
GroupSort演算子ならorderでソート順を指定できるので、出力されるレコードを制御できる。
(例えばM3BPとSparkでは内部の(シャッフルの)処理方式が異なるので、同じ演算子でも出力されるレコードが異なることがある。これを比較すると結果が違う!ということになってしまうので、同じ結果を得られるようにするにはソートした方が確実)
あるカラムの値が最大(あるいは最小)になるレコードを取得するのは、標準的なSQLだとけっこう面倒だし実行効率も悪そう。
-- create table table1 (key1 CHAR(10), key2 CHAR(10), column1 CHAR(10)); select b.* from (select key1, max(column1) as max_column1 from table1 group by key1) a, table1 b where b.key1 = a.key1 and b.column1 = a.max_column1;
(Oracleだと分析関数+overを使って書くことも出来るが、やはりあまり分かりやすいとは思えない^^;)
AsakusaFWだとGroupSort演算子を使って素直に書ける。
@GroupSort public void max(@Key(group = { "key1" }, order = { "column1 DESC" }) @Once Iterable<Table1> table1List, Result<Table1> out) { for (Table1 b : table1List) { // 先頭1件を出力(table1Listはcolumn1の降順になるので、先頭レコードはcolumn1が最大) out.add(b); break; } }
MERGE文は、更新対象テーブルに対し、更新データが存在していればUPDATE、無ければINSERTするもの。[2018-11-23]
AsakusaFWでは、MasterJoinUpdate演算子で更新、無かった場合は(Convert演算子等でデータを変換して)追加すればよい。
ただしこれは、キーに対して更新データが1つしか無い場合の話。
AsakusaFWのMasterJoinUpdateはキーに対して更新データが複数ある場合はどれか1レコードのみが使われる。
SQLのMERGE文の場合、更新データが複数あったらたぶん先頭1件がINSERTで残りはUPDATEとして全部処理されると思う。
これを実現しようと思ったら、AsakusaFWではCoGroup演算子を使うのが一番コーディングしやすそう。
MasterJoinUpdate | CoGroup | |
---|---|---|
SQLの例 |
merge into a using b on (a.key1 = b.key1) when matched then -- 既存データの更新 update set column1 = b.column1 when not matched then -- 追加 insert (key1, column1) values (b.key1, b.column1) ; |
|
Operatorの例 | @MasterJoinUpdate
|
@CoGroup
|
Flowの例 | In<A> a = 〜; |
In<A> a = 〜; |
テーブルをJOINするSELECT文は、Master系演算子やCoGroup演算子を使用する。
(結合ごとに演算子メソッドを定義することになるので、複数のテーブルを結合している場合は結構な量になってしまうがorz)
結合に必要な全項目を入れたデータモデルを作っておき、Master系演算子やCoGroup演算子でそこに値をセットしていく方式がお勧め。
UPDATE文は、Update演算子を使う。
更新データに他テーブルの値を使う場合はMasterJoinUpdate演算子やCoGroup演算子で更新する。
(他テーブルのデータ量が少ないようなら、Update演算子のままGroupViewを使う方法もある)
DELETE文は、Branch演算子を使う。
削除条件に他テーブルの値を使う場合はMasterCheck演算子やCoGroup演算子で削除する。
(他テーブルのデータ量が少ないようなら、Branch演算子のままGroupViewを使う方法もある)
RDBを使うバッチは、まず処理対象をテーブル(やファイル)から取得し、それに対して必要なデータをテーブルから取得して処理していく方式を採っているものが大半だと思われる。
その「必要なデータをテーブルから取得」する部分は、新たなSELECT文を発行するような方式になっていることが多いと思う。(それが一番コーディングしやすいから。しかし一番N+1問題を誘発しやすいと思う)
AsakusaFW化する場合、こういう方式ではなく、事前に必要なテーブルと結合してデータを取得しておき、後でまとめて処理を行う方式となる。
(ただし、GroupViewを使う方式は、都度SELECTする方式と非常に似ている)
SQLを使ったバッチの例 | AsakusaFWの例 | ||
---|---|---|---|
GroupView方式 | 事前結合方式 | ||
テーブル定義 (データモデル定義) |
-- 処理対象 |
"処理対象" |
|
"処理対象ワーク" |
|||
処理本体 | // Java |
// Operator |
// Operator |
// Flow |
// Flow |
GroupViewを使う方式は、SQLを使った方式と似ていて分かりやすい。
(Update演算子を使っているので、これだけで(マルチスレッドやマルチサーバーで)並列処理される。SQLで取得した結果を直列処理するより効率よさそうw)
結合キーが異なる複数のテーブルから取得する場合でも簡単に対応できる。
ただしGroupViewはメモリーに乗り切るデータサイズでないといけないという制約がある。
(M3BPを使う場合、全データをシリアライズしてメモリーに保存する上に、GruopViewとしてJava用に保持するメモリーが別途必要になる)
MasterJoinUpdate演算子等を使う方式だとメソッドの数は増えてしまうが、AsakusaFWの最適化が効く可能性があるので、なるべくこちらにしたい。
ただし、処理本体で複数レコードを取得して使用する場合は、事前にデータを取得する方式では単純に出来ない。
こういう場合は(基本的に)処理本体で結合するしかない。
(事前に結合して、1つの文字列項目に何らかの方式で(例えばJSONで)入れてしまうという手も考えられる。ちょっと微妙^^;だけど、背に腹は変えられない)
処理中に取得する為に実行するSELECT文は、複雑な(複数のテーブルを結合する)SQL文であることがある。
これをAsakusaFW化する際は、そのSELECT文に相当するFlowPartを作って分離しておくと分かりやすい。(テストもしやすい)
SQLを使ったバッチの例 | AsakusaFWの例 | |
---|---|---|
テーブル定義 (データモデル定義) |
-- 処理対象 |
"処理対象" |
処理本体 | // Java(Data12Repository) |
// FlowPartのOperator(Data12Operator) |
// FlowPart(Data12FlowPart) |
||
// Java |
// Operator |
|
// Flow |
しかし、処理本体の値をパラメーターとして使用するSELECT文の場合、それを考慮しないと膨大なデータ量になってしまう場合がありうる。
そういう場合は、パラメーター(と結果)を保持するデータモデルを作って、それを入力とするFlowPartを作る方法が考えられる。
SQLを使ったバッチの例 | AsakusaFWの例 | |
---|---|---|
テーブル定義 (データモデル定義) |
-- 処理対象 |
"処理対象" |
処理本体 | // Java(Data12Repository) |
// FlowPartのOperator(Data12Operator) |
// FlowPart(Data12FlowPart) |
||
// Java |
// Operator |
|
// Flow |