S-JIS[2018-09-24/2018-12-02] 変更履歴

AsakusaFW SQLの例

Asakusa FrameworkにSQLを移植する例。


概要

Asakusa Frameworkは(分散)並列処理技術を使ってバッチを(開発・)実行するフレームワークである。
宣伝風に短く言えば(笑)、バッチを高速化するフレームワークである。

RDBを使っているバッチでは、AsakusaFWに移植することで高速化できる可能性がある。
個人的に経験した大抵のシステムでは、DBサーバーは1台(ないし少数)でバッチサーバーが複数台(多数)ある構成になっており、DBサーバーに負荷が集中してバッチの実行が遅くなっていた。
しかしAsakusaFWだと最初のデータ取得と最後のデータ反映以外ではDBサーバーにアクセスしないように作るので、DBサーバーの負荷が減って高速化する。
というのが個人的な見解(期待)。

(RDBを使うバッチで、SQLの使い方がN+1問題になっているようなものだと、DBサーバーの負荷に関係なく遅い。AsakusaFW化するにはN+1問題のような取得方法をやめる必要があるので、早くなる可能性が高い。
 この場合、RDBを使う方式のままN+1問題を解決すれば早くなる可能性は当然あるので、「AsakusaFWが早い」という言い方は語弊があるかもしれない)


データ型

RDBのデータ型をAsakusaFWのデータモデルデータ型にマッピングする必要がある。

RDBのデータ型 AsakusaFWのデータ型 備考
NUMERIC 整数は長さに応じてINTやLONG。
それ以外の数値はDECIMAL。
9桁以内ならINT、18桁以内ならLONGで可。
Javaのint, longの値の範囲
CHAR
VARCHAR
文字列はTEXT。(AsakusaFWでは長さは可変)  
DATE 日付(yyyyMMdd)ならDATE、
日時(yyyyMMddHHmmss)(秒まで)ならDATETIME。
RDBMSによっては0年0月0日を扱えるようだが、
AsakusaFWは対応していない。
TIMESTAMP AsakusaFWにはミリ秒以下を保持できるデータ型は無い。
必要ならTEXTやLONGで扱うしか無さそう。
 

カラムの演算

SQLでは、基本的に、NULLとの演算結果はNULLになる。
(Oracleの文字列はNULLと空文字列が同一なので、NULLと結合するときは空文字列扱いだが)

AsakusaFWではnullの扱いは普通のJavaと同じ。

SQLの例 AsakusaFWの例 備考
a.int1 + 99 a.getInt1Option().isNull() ? null : a.getInt1() + 99  
a.str1 || 'Z' a.getStr1Option().or("") + "Z" /* Oracle */ Oracleの文字列結合の場合、NULLは空文字列として扱われる。

WHERE条件

SQLのWHERE条件やJOINのON条件では、テーブルのカラム同士や値で比較する。

SQLでは、NULLとの比較結果はunknownになる。(unknownはfalse扱い)
つまり「NULL = NULL」はunknown(false)になるが、AsakusaFWのOption系クラスでは、nullの入ったOption同士をequalsで比較するとtrueになる。
また、「NOT unknown」もunknown、つまりfalseになる。AsakusaFW(Java)では「!false」はtrueなので、これも異なる。

SQLの例 AsakusaFWの例 備考
a.column1 is null a.getColumn1Option().isNull()  
a.column1 is not null a.getColumn1Option().isPresent()  
a.column1 = 'A' a.getColumn1Option().has("A")  
a.column1 = b.column1 (a.getColumn1Option().isNull() || b.getColumn1Option().isNull()) ? false : a.getColumn1Option().equals(b.getColumn1Option()) どちらかがnullでない(NOT NULL項目である)と分かっている場合は、nullチェックは不要。
a.column1 <> 'A' a.getColumn1Option().isNull() ? false : !a.getColumn1Option().has("A")  
a.column1 <> b.column1 (a.getColumn1Option().isNull() || b.getColumn1Option().isNull()) ? false : !a.getColumn1Option().equals(b.getColumn1Option())  
not (a.column1 = 'A') a.getColumn1Option().isNull() ? false : !a.getColumn1Option().has("A")  

このため、SQLのような比較を行うユーティリティーを用意した方がいいと思う。(こういうユーティリティーはScalaだと綺麗に書けるんだけどなぁ)


WHERE条件は、AsakusaFWではBranch演算子で実現する。

  Branch
SQLの例
select *
from a
where a.column1 = 'Z';
Operatorの例 public enum Filter {
  OUT, MISSED
}

@Branch
public Filter whereA(A a) {
  if (a.getColumn1Option().has("Z")) {
    return Filter.OUT;
  }
  return Filter.MISSED;
}
Flowの例 Source<A> a = 〜;
WhereA where = operator.whereA(a);
core.stop(where.missed);
a = where.out;

GROUP BY

集約関数(sum, min, max, count)は、集約対象データにNULLが無い場合はDMDLの集計モデル(およびSummarize演算子)で実現できる。[2018-12-02]

  データにNULLが含まれない場合
SQLの例
select key, sum(value) as sum_value, min(value) as min_value, count(value) as count_value
from a
group by key
;
DMDLの例 summarized sum_a = a => {
  any key -> key;

  sum value -> sum_value;

  min value -> min_value;

  count value -> count_value;
} % key;
Operatorの例 @Summarize
public abstract SumA summarizeA(A in);

集約対象データにNULLが含まれる場合は、SQLとSummarize演算子では挙動が異なる。

  SQL Summarize演算子
sum
min
max
NULLは集約対象外。
全データがNULLだと結果はNULLになる。
NullPointerExceptionが発生する。
count NULLはカウント対象外。
全データがNULLだと結果は0になる。
全データがカウント対象。
(データがnullかどうかは無関係)

この場合は、Fold演算子を使って集約処理を書く。

	@Convert
	public SumA convertSumA(A in) {
		SumA result = this.sumA;
		result.reset();

		result.setKeyOption(in.getKeyOption());
		result.setSumValueOption(in.getValueOption());
		result.setMinValueOption(in.getValueOption());
		result.setCountValue(in.getValueOption().isPresent() ? 1 : 0);

		return result;
	}

	@Fold
	public void foldA(@Key(group = { "key" }) SumA in, SumA right) {
		if (in.getSumValueOption().isNull()) {
			in.setSumValueOption(right.getSumValueOption());
		} else {
			in.getSumValueOption().add(right.getSumValueOption());
		}

		if (in.getMinValueOption().isNull()) {
			in.setMinValueOption(right.getMinValueOption());
		} else if (right.getMinValueOption().isPresent()) {
			in.getMinValueOption().min(right.getMinValueOption());
		}

		in.getCountValueOption().add(right.getCountValueOption());
	}

sumに関しては、数値のOption系クラスにはaddメソッドがあるので、これで合算できる。
が、nullの場合はNullPointerExceptionが発生するので、nullの場合は単純移送とする。(rightがnullの場合はnullのままだし、rightがnull以外の場合はその値が使われることになる)
(集計前のレコードでnullのときに0を入れておく方法だと、全データがnullのときにnullにならない)

min, maxに関しては、Option系クラスにmin, maxメソッドがあるので、これで実現できる。
が、nullの場合はnullになってしまうので、nullでない場合だけ呼び出すようにする必要がある。

countに関しては、集計前のレコード全てに1(nullのときは0)を入れておき、単純に合算すればいい。


INNER JOIN

INNER JOIN(内部結合)は、双方にレコードが存在するときだけ出力される。

「a inner join b」において、aが複数レコードでbが1レコードの場合は、aのレコード数分出力される。
bが複数レコードある場合、aのレコード数×bのレコード数分出力される。

AsakusaFWの場合、結合に使える演算子はMasterJoin/MasterJoinUpdateCoGroupである。
Master系演算子は、結合されたマスター側(「a inner join b」におけるb)が1件であるという前提がある。マスター側が複数レコードある場合、どれか1件が使われる。(つまりMaster系演算子では、出力のレコード数がトランザクション側(「a inner join b」におけるa)より増えることは無い)
したがって、「a inner join b」のbが1件だと分かっている場合(結合キーがbのプライマリキーとかユニークキーである場合)はMaster系演算子を使い、そうでない場合はCoGroupを使う。

  MasterJoinUpdate CoGroup
SQLの例
select a.column_a, b.column_b
from a inner join b
on b.pk1 = a.fk1;
データモデルの例 a = {
  column_a : TEXT;
  fk1 : TEXT;
};

b = {
  pk1 : TEXT;
  column_b : TEXT;
};

c = a + {
  column_b : TEXT;
};
Operatorの例 @MasterJoinUpdate
public void joinB(
    @Key(group = { "pk1" }) b,
    @Key(group = { "fk1" }) c) {
  c.setColumnBOption(b.getColumnBOption());
}
private final C c = new C();

@CoGroup
public void joinB(
    @Key(group = { "fk1" }) @Once Iterable<C> cList,
    @Key(group = { "pk1" }) List<B> bList,
    Result<C> out) {
  C result = this.c;
  for (C c : cList) {
    for (B b : bList) {
      result.copyFrom(c);
      result.setColumnBOption(b.getColumnBOption());
      out.add(result);
    }
  }
}
Flowの例 Source<C> c = core.extend(a, C.class);
JoinB join = operator.joinB(b, c);
core.stop(join.missed);
c = join.updated;
Source<C> c = core.extend(a, C.class);
c = operator.joinB(c, b).out;

※厳密には、結合キーの値がnull同士の場合、SQLでは結合しない(と思う)がAsakusaFWでは結合する。
 普通は結合キー(特にマスター側)がnullになることは無いと思うが、もしそこを厳密にしたいなら、事前にBranch演算子でnullを除外しておけばよい。


ON句の結合キーが等号で結ばれるものならMaser系演算子やCoGroup演算子のキー項目に指定すればよいが、等号以外の演算を行う場合はキーには指定できない。
この場合は以下のようにする。

  MasterJoinUpdate CoGroup
SQLの例
select a.column_a, b.column_b
from a inner join b
on b.pk1 = a.fk1
and b.start_date <= a.date1 and a.date1 <= b.end_date;
-- start_date, end_dateはNOT NULLとする
データモデルの例 a = {
  column_a : TEXT;
  fk1 : TEXT;
  date1 : DATE;
};

b = {
  pk1 : TEXT;
  column_b : TEXT;
  start_date : DATE;
  end_date : DATE;
};

c = a + {
  column_b : TEXT;
};
Operatorの例 @MasterJoinUpdate(selection = "selectB")
public void joinB(
    @Key(group = { "pk1" }) b,
    @Key(group = { "fk1" }) c) {
  c.setColumnBOption(b.getColumnBOption());
}

@MasterSelection
public B selectB(List<B> bList, C c) {
  for (B b : bList) {
    if (b.getStartDateOption().compareTo(c.getDate1Option()) <= 0
     && c.getDate1Option().compareTo(b.getEndDateOption()) <= 0) {
      return b;
    }
  }
}
private final C c = new C();

@CoGroup
public void joinB(
    @Key(group = { "fk1" }) @Once Iterable<C> cList,
    @Key(group = { "pk1" }) List<B> bList,
    Result<C> out) {
  C result = this.c;
  for (C c : cList) {
    for (B b : bList) {
      if (b.getStartDateOption().compareTo(c.getDate1Option()) <= 0
       && c.getDate1Option().compareTo(b.getEndDateOption()) <= 0) {
        result.copyFrom(c);
        result.setColumnBOption(b.getColumnBOption());
        out.add(result);
      }
    }
  }
}
Flowの例 Source<C> c = core.extend(a, C.class);
JoinB join = operator.joinB(b, c);
core.stop(join.missed);
c = join.updated;
Source<C> c = core.extend(a, C.class);
c = operator.joinB(c, b).out;

ON句で固定値と比較している場合はBranch演算子を使う


INNER JOINによる存在チェック

レコードが存在しているかどうかのチェックの為に結合している場合(結合したカラムを使わない場合)は、MasterCheck演算子が使える。

  MasterCheck CoGroup
SQLの例
select a.* -- bのカラムを使っていない
from a inner join b
on b.pk1 = a.fk1;
データモデルの例 a = {
  column_a : TEXT;
  fk1 : TEXT;
};

b = {
  pk1 : TEXT;
  column_b : TEXT;
};
Operatorの例 @MasterCheck
public abstract boolean checkB(
    @Key(group = { "pk1" }) b,
    @Key(group = { "fk1" }) a);
@CoGroup
public void checkB(
    @Key(group = { "fk1" }) @Once Iterable<A> ) aList,
    @Key(group = { "pk1" }) List<B> bList,
    Result<A> out) {
  if (!bList.isEmpty()) {
    for (A a : aList) {
      out.add(a);
    }
  }
}
Flowの例 CheckB check = operator.checkB(b, a);
core.stop(check.missed);
a = check.found;
a = operator.checkB(a, b).out;

※厳密には、マスター側(上記のb)が複数レコード存在する場合は、aもその件数分増幅することになるはず。しかし実際のところ、それが嬉しいことはあまり無いと思う。


LEFT JOIN

LEFT JOIN(LEFT OUTER JOIN)(左結合)は、左側(「a left join b」におけるa)のレコードは常に出力される。

「a left join b」において、aが複数レコードでbが1レコードの場合は、aのレコード数分出力される。
bが複数レコードある場合、aのレコード数×bのレコード数分出力される。
bが無い場合でもaは出力される。(←これがINNER JOINとの違い)

AsakusaFWでは、LEFT JOIN用の演算子の扱いはINNER JOINで使った演算子とほぼ同様となる。
Master系演算子の場合、(INNER JOINでは結合されなかったデータ(missed)は捨てた(core.stopに渡した)が、)LEFT JOINでは後続処理に渡す。
CoGroupの場合、bが存在しない場合はaを出力する。

  MasterJoinUpdate CoGroup
SQLの例
select a.column_a, b.column_b
from a left join b
on b.pk1 = a.fk1;
データモデルの例 a = {
  column_a : TEXT;
  fk1 : TEXT;
};

b = {
  pk1 : TEXT;
  column_b : TEXT;
};

c = a + {
  column_b : TEXT;
};
Operatorの例 // INNER JOINの場合と全く同じ
@MasterJoinUpdate
public void joinB(
    @Key(group = { "pk1" }) b,
    @Key(group = { "fk1" }) c) {
  c.setColumnBOption(b.getColumnBOption());
}
private final C c = new C();

@CoGroup
public void joinB(
    @Key(group = { "fk1" }) @Once Iterable<C> cList,
    @Key(group = { "pk1" }) List<B> bList,
    Result<C> out) {
  // ここがINNER JOINの場合と異なる
  if (bList.isEmpty()) {
    for (C c : cList) {
      out.add(c);
    }
    return;
  }

  // 以降はINNER JOINの場合と全く同じ
  C result = this.c;
  for (C c : cList) {
    for (B b : bList) {
      result.copyFrom(c);
      result.setColumnBOption(b.getColumnBOption());
      out.add(result);
    }
  }
}
Flowの例 Source<C> c = core.extend(a, C.class);
JoinB join = operator.joinB(b, c);
// ここがINNER JOINの場合と異なる
c = core.confluent(join.updated, join.missed);
Source<C> c = core.extend(a, C.class);
c = operator.joinB(c, b).out;

GroupViewを使用したJOIN

JOINの結合キーの値の種類が少ない場合、Master系演算子やCoGroup演算子では結合キーの値の種類数分しか分散しないので、実行効率が悪くなる可能性がある。
(例えば1種類の値でしか結合しない場合、全データが1箇所で処理されることなる(全く分散しなくなる))

この場合、GroupViewを使う方が良いかもしれない。
なお、GroupViewを使う場合、GroupViewに指定するデータ(マスター側のデータ)の量が少ないことが前提となる。(イメージとしては、データが全てHashMapに詰め込まれる感じなので、メモリーに乗り切るサイズでないとまずい)

LEFT JOINの場合、トランザクション側(「a join b」におけるa)は常に出力されるので、マスター側(「a join b」におけるb)が1件なのであればUpdate演算子を使うのが良い。マスター側が複数件ある場合は複数レコード出力する必要があるので、Extract演算子を使う。
INNER JOINの場合は、マスター側(「a join b」におけるb)の有無によって出力されないことがあるので、Extract演算子を使う。

  LEFT JOIN INNER JOIN
SQLの例
select a.column_a, b.column_b
from a left join b
on b.pk1 = a.fk1;
--「bが存在する場合は常に1件」という前提があるものとする
select a.column_a, b.column_b
from a inner join b
on b.pk1 = a.fk1;
--「bが存在する場合は常に1件」という前提があるものとする
データモデルの例 a = {
  column_a : TEXT;
  fk1 : TEXT;
};

b = {
  pk1 : TEXT;
  column_b : TEXT;
};

c = a + {
  column_b : TEXT;
};
Operatorの例 @Update
public void joinB(C c,
    @Key(group = { "pk1" }) GroupView<B> bView) {

  List<B> bList = bView.find(c.getFk1Option());
  if (!bList.isEmpty()) {
    assert bList.size() == 1; // 常に1件という前提
    B b = bList.get(0);
    c.setColumnBOption(b.getColumnBOption());
  }
}
@Extract
public void joinB(C c,
    @Key(group = { "pk1" }) GroupView<B> bView,
    Result<C> out) {
  List<B> bList = bView.find(c.getFk1Option());
  if (!bList.isEmpty()) {
    assert bList.size() == 1; // 常に1件という前提
    B b = bList.get(0);
    c.setColumnBOption(b.getColumnBOption());
    out.add(c);
  }
}
Flowの例 Source<C> c = core.extend(a, C.class);
c = operator.joinB(c, b).out;
Source<C> c = core.extend(a, C.class);
c = operator.joinB(c, b).out;

ON句での固定値との比較

JOINのON句で固定値と比較している場合は、事前にBranch演算子でフィルタリングする。
もしくは、Master系演算子の場合はMasterSelectionを使う方法も考えられる。
(MasterSelectionはマスター側とトランザクション側(「a join b」におけるbとa)のカラム同士を比較するときに使うものなので、単なる固定値との比較だとメリットはあまり無さそう)

  Branch MasterSelection
SQLの例
select a.*
from a inner join b
on b.pk1 = a.fk1
and b.column_b = 'B' -- 固定値と比較
;
データモデルの例 a = {
  column_a : TEXT;
  fk1 : TEXT;
};

b = {
  pk1 : TEXT;
  column_b : TEXT;
};
Operatorの例 public enum Filter {
  OUT, MISSED
}

@Branch
public Filter filterB(B b) {
  if (b.getColumnBOption().has("B")) {
    return Filter.OUT;
  }
  return Filter.MISSED;
}

@MasterCheck
public abstract boolean checkB(
    @Key(group = { "pk1" }) b,
    @Key(group = { "fk1" }) a);
@MasterCheck(selection = "selectB")
public abstract boolean checkB(
    @Key(group = { "pk1" }) b,
    @Key(group = { "fk1" }) a);

@MasterSelection
public B selectB(List<B> bList, A a) {
  for (B b : bList) {
    if (b.getColumnBOption().has("B")) {
      return b;
    }
  }
  return null;
}
Flowの例 FilterB filter = operator.filterB(b);
core.stop(filter.missed);

CheckB check = operator.checkB(filter.out, a);
core.stop(check.missed);
a = check.found;
CheckB check = operator.checkB(b, a);
core.stop(check.missed);
a = check.found;

重複排除(distinct)

DISTINCT(重複排除)する場合はGroupSort演算子あるいはFold演算子を使う。

  GroupSort Fold
SQLの例
select distinct key1, column1
from a;
Operatorの例 @GroupSort
public void distinct(@Key(group = { "key1", "column1" }, order = {}) @Once Iterable<A> aList,
    Result<A> out) {
  for (A a : aList) {
    out.add(a);
    break;
  }
}
@Fold(partialAggregation = PartialAggregation.PARTIAL)
public void distinct(@Key(group = { "key1", "column1" }) A a, A right) {
  // distinctの場合、Fold演算子の処理本体は不要
}

Fold演算子の方がコーディング量は少ない。
ただし、グルーピングされたデータが複数レコードある場合、どのレコードが残るかは不定。
GroupSort演算子ならorderでソート順を指定できるので、出力されるレコードを制御できる。
(例えばM3BPSparkでは内部の(シャッフルの)処理方式が異なるので、同じ演算子でも出力されるレコードが異なることがある。これを比較すると結果が違う!ということになってしまうので、同じ結果を得られるようにするにはソートした方が確実)


最大値・最小値の存在するレコードの取得

あるカラムの値が最大(あるいは最小)になるレコードを取得するのは、標準的なSQLだとけっこう面倒だし実行効率も悪そう。

-- create table table1 (key1 CHAR(10), key2 CHAR(10), column1 CHAR(10));

select b.*
from (select key1, max(column1) as max_column1 from table1 group by key1) a, table1 b
where b.key1 = a.key1 and b.column1 = a.max_column1; 

(Oracleだと分析関数+overを使って書くことも出来るが、やはりあまり分かりやすいとは思えない^^;)

AsakusaFWだとGroupSort演算子を使って素直に書ける。

	@GroupSort
	public void max(@Key(group = { "key1" }, order = { "column1 DESC" }) @Once Iterable<Table1> table1List, Result<Table1> out) {
		for (Table1 b : table1List) {
			// 先頭1件を出力(table1Listはcolumn1の降順になるので、先頭レコードはcolumn1が最大)
			out.add(b);
			break;
		}
	}

MERGE

MERGE文は、更新対象テーブルに対し、更新データが存在していればUPDATE、無ければINSERTするもの。[2018-11-23]

AsakusaFWでは、MasterJoinUpdate演算子で更新、無かった場合は(Convert演算子等でデータを変換して)追加すればよい。
ただしこれは、キーに対して更新データが1つしか無い場合の話。
AsakusaFWのMasterJoinUpdateはキーに対して更新データが複数ある場合はどれか1レコードのみが使われる。
SQLのMERGE文の場合、更新データが複数あったらたぶん先頭1件がINSERTで残りはUPDATEとして全部処理されると思う。
これを実現しようと思ったら、AsakusaFWではCoGroup演算子を使うのが一番コーディングしやすそう。

  MasterJoinUpdate CoGroup
SQLの例
merge into a
using b
on (a.key1 = b.key1)
when matched then -- 既存データの更新
  update set column1 = b.column1
when not matched then -- 追加
  insert (key1,   column1)
  values (b.key1, b.column1)
;
Operatorの例 @MasterJoinUpdate
public void updateWhenMatched(
  @Key(group = { "key1" }) B b,
  @Key(group = { "key1" }) A a
) {
  a.setColumn1Option(b.getColumn1Option());
}

private final A a = new A();

@Convert
public A convertWhenNotMatched(B b) {
  A result = this.a;
  result.reset();

  result.setKey1Option(b.getKey1Option());
  result.setColumn1Option(b.getColumn11Option());

  return result;
}

@CoGroup
public void merge(
  @Key(group = { "key1" }, order = {}) List<A> aList,
  @Key(group = { "key1" }, order = {}) List<B> bList,
  Result<A> out
) {
  if (bList.isEmpty()) {
    for (A a : aList) {
      out.add(a);
    }
    return;
  }

  List<A> aList1 = aList;
  List<B> bList1 = bList;
  if (aList.isEmpty()) {
    // INSERT
    aList1 = Collections.singletonList(convertWhenNotMatched(bList.get(0)));
    bList1 = bList.subList(1, bList.size());
  }

  // UPDATE
  for (A a : aList1) {
    for (B b : bList1) {
      updateWhenMatched(b, a);
    }

    out.add(a);
  }
}

private void updateWhenMatched(B b, A a) {
  a.setColumn1Option(b.getColumn1Option());
}

private final A a = new A();

private A convertWhenNotMatched(B b) {
  A result = this.a;
  result.reset();

  result.setKey1Option(b.getKey1Option());
  result.setColumn1Option(b.getColumn11Option());

  return result;
}

Flowの例 In<A> a = 〜;
In<B> b = 〜;
Out<A> out = 〜;

UpdateWhenMatched update = operator.updateWhenMatched(b, a);
out.add(update.updated);
out.add(operator.convertWhenNotMatched(update.missed).out));
In<A> a = 〜;
In<B> b = 〜;
Out<A> out = 〜;

out.add(operator.merge(a, b).out);

AsakusaFW化のポイント

テーブルをJOINするSELECT文は、Master系演算子やCoGroup演算子を使用する。
(結合ごとに演算子メソッドを定義することになるので、複数のテーブルを結合している場合は結構な量になってしまうがorz)
結合に必要な全項目を入れたデータモデルを作っておき、Master系演算子やCoGroup演算子でそこに値をセットしていく方式がお勧め。

UPDATE文は、Update演算子を使う。
更新データに他テーブルの値を使う場合はMasterJoinUpdate演算子やCoGroup演算子で更新する。
(他テーブルのデータ量が少ないようなら、Update演算子のままGroupViewを使う方法もある)

DELETE文は、Branch演算子を使う。
削除条件に他テーブルの値を使う場合はMasterCheck演算子やCoGroup演算子で削除する。
(他テーブルのデータ量が少ないようなら、Branch演算子のままGroupViewを使う方法もある)


RDBを使うバッチは、まず処理対象をテーブル(やファイル)から取得し、それに対して必要なデータをテーブルから取得して処理していく方式を採っているものが大半だと思われる。
その「必要なデータをテーブルから取得」する部分は、新たなSELECT文を発行するような方式になっていることが多いと思う。(それが一番コーディングしやすいから。しかし一番N+1問題を誘発しやすいと思う)

AsakusaFW化する場合、こういう方式ではなく、事前に必要なテーブルと結合してデータを取得しておき、後でまとめて処理を行う方式となる。
(ただし、GroupViewを使う方式は、都度SELECTする方式と非常に似ている)

  SQLを使ったバッチの例 AsakusaFWの例
GroupView方式 事前結合方式
テーブル定義
(データモデル定義)
-- 処理対象
create table target (
  key1 CHAR(10),
  fk1 CHAR(10),
  result1 CHAR(10)
);

-- 結合してくるデータ
create table data (
  pk1 CHAR(10),
  data1 CHAR(10)
);
"処理対象"
target = {
  key1 : TEXT;
  fk1 : TEXT;
  result1 : TEXT;
};

"結合してくるデータ"
data = {
  pk1 : TEXT;
  data1 : TEXT;
};
  "処理対象ワーク"
target_work = target + {
  data1 : TEXT;
};
処理本体 // Java

@Autowired
private DataRepository dataRepository;

public void main(List<TargetEntity> targetList) {
  for (TargetEntity target : targetList) {
    DataEntity data = dataRepository.find(target.getFk1());
    if (data != null) {
      // data.data1を使った処理
      target.setResult1(data.getData1());
    }
  }
}
// Operator

@Update
public void main(Target target,
    @Key(group = { "pk1" }) GroupView<Data> dataView) {
  List<Data> dataList = dataView.find(target.getFk1Option());
  if (!dataList.isEmpty()) {
    Data data = dataList.get(0);
    // data.data1を使った処理
    target.setResult1Option(data.getData1Option());
  }
}
// Operator

// 事前に取得
@MasterJoinUpdate
public void join(@Key(group = { "pk1" }) Data data,
    @Key(group = { "fk1" }) TargetWork work) {
  work.setData1Option(data.getData1Option());
}

@Update
public void main(TargetWork work) {
  // data1を使った処理
  work.setResult1Option(work.getData1Option());
}
// Flow

Source<Target> target = 〜;
Source<Data> data = 〜;

Source<Target> output = operator.main(target, data).out;
// Flow

Source<Target> target = 〜;
Source<Data> data = 〜;

Source<TargetWork> work = core.extend(target, TargetWork.class);
Join join = operator.join(data, work);

Source<TargetWork> output = operator.main(join.updated).out;

GroupViewを使う方式は、SQLを使った方式と似ていて分かりやすい。
(Update演算子を使っているので、これだけで(マルチスレッドやマルチサーバーで)並列処理される。SQLで取得した結果を直列処理するより効率よさそうw)
結合キーが異なる複数のテーブルから取得する場合でも簡単に対応できる。
ただしGroupViewはメモリーに乗り切るデータサイズでないといけないという制約がある。
M3BPを使う場合、全データをシリアライズしてメモリーに保存する上に、GruopViewとしてJava用に保持するメモリーが別途必要になる)

MasterJoinUpdate演算子等を使う方式だとメソッドの数は増えてしまうが、AsakusaFWの最適化が効く可能性があるので、なるべくこちらにしたい。

ただし、処理本体で複数レコードを取得して使用する場合は、事前にデータを取得する方式では単純に出来ない。
こういう場合は(基本的に)処理本体で結合するしかない。
(事前に結合して、1つの文字列項目に何らかの方式で(例えばJSONで)入れてしまうという手も考えられる。ちょっと微妙^^;だけど、背に腹は変えられない


パラメーター指定によるSELECT

処理中に取得する為に実行するSELECT文は、複雑な(複数のテーブルを結合する)SQL文であることがある。
これをAsakusaFW化する際は、そのSELECT文に相当するFlowPartを作って分離しておくと分かりやすい。(テストもしやすい)

  SQLを使ったバッチの例 AsakusaFWの例
テーブル定義
(データモデル定義)
-- 処理対象
create table target (
  key1 CHAR(10),
  fk1 CHAR(10),
  result1 CHAR(10)
);

-- 結合してくるデータ
create table data1 (
  pk1 CHAR(10),
  start_date DATE,
  end_date DATE,
  text1 CHAR(10)
);

create table data2 (
  pk2 CHAR(10),
  date2 DATE,
  text2 CHAR(10)
);
"処理対象"
target = {
  key1 : TEXT;
  fk1 : TEXT;
  result1 : TEXT;
};

"結合してくるデータ"
data1 = {
  pk1 : TEXT;
  start_date : DATE;
  end_date : DATE;
  text1 : TEXT;
};

data2 = {
  pk2 : TEXT;
  date2 : DATE;
  text2 : TEXT;
};

"data1とdata2の結合データ"
data12 = data1 + data2;
処理本体 // Java(Data12Repository)

public Data12Entity find(String key) {
// 以下のSQLを実行
// select * from data1
// inner join data2
//   on  data2.pk2 = data1.pk1
//   and data1.start_date <= data2.date2
//   and data2.date2 <= data1.end_date
// where pk1 = :key
}
// FlowPartのOperator(Data12Operator)

@MaserJoinUpdate(selection  = "select")
public void join(@Key(gropu = { "pk2" }) Data2 data2,
   @Key(group = { "pk1" }) Data12 data12) {
  data12.setText2Option(data2.getText2Option());
}

@MasterSelection
public Data2 select(List<Data2> data2List, Data12 data12) {
  for (Data2 data2 : data2List) {
    if (data12.getStartDateOption().compareTo(date2.getDate2Option()) <= 0
     && date2.getDate2Option().compareTo(data12.getEndDateOption()) <= 0) {
      return data2;
    }
  }
  return null;
}
// FlowPart(Data12FlowPart)

private In<Data1> data1;
private In<Data2> data2;
private Out<Data12> data12;

public void describe() {
  Data12OperatorFactory operator = new Data12OperatorFactory();
  CoreOperatorFactory core = new CoreOperatorFactory();

  Source<Data12> data12 = core.extend(data1, Data12.class);
  Join join = operator.join(data2, data12);
  core.stop(join.missed);
  data12.add(join.updated);
}
// Java

@Autowired
private Data12Repository data12Repository;

public void main(List<TargetEntity> targetList) {
  for (TargetEntity target : targetList) {
    Data12Entity data12 = data12Repository.find(target.getFk1());
    if (data != null) {
      target.setResult1(data12.getText2());
    }
  }
}
// Operator

@MasterJoinUpdate
public void main(@Key(group = { "pk1" }) Data12 data12,
    @Key(group = { "fk1" }) Target target) {
  target.setResult1Option(data12.getText2Option());
}
// Flow

Source<Target> target = 〜;
Source<Data1> data1 = 〜;
Source<Data2> data2 = 〜;

Data12FlowPart flow = new Data12FlowPartFactory().create(data1, data2);
Source<Target> output = operator.main(flow.data12, target).out;

しかし、処理本体の値をパラメーターとして使用するSELECT文の場合、それを考慮しないと膨大なデータ量になってしまう場合がありうる。
そういう場合は、パラメーター(と結果)を保持するデータモデルを作って、それを入力とするFlowPartを作る方法が考えられる。

  SQLを使ったバッチの例 AsakusaFWの例
テーブル定義
(データモデル定義)
-- 処理対象
create table target (
  fk1 CHAR(10),
  fk3 CHAR(10),
  result1 CHAR(10)
);

-- 結合してくるデータ
create table data1 (
  pk1 CHAR(10),
  fk2 CHAR(10)
);

create table data2 (
  pk2 CHAR(10),
  pk3 CHAR(10,
  text2 CHAR(10)
);
"処理対象"
target = {
  fk1 : TEXT;
  fk3 : TEXT;
  result1 : TEXT;
};

"結合してくるデータ"
data1 = {
  pk1 : TEXT;
  fk2 : TEXT;
};

data2 = {
  pk2 : TEXT;
  pk3 : TEXT;
  text2 : TEXT;
};

"data1とdata2の結合データ"
data12 = {
  param_key1 : TEXT;
  param_key3 : TEXT;
} + data1 + data2;
処理本体 // Java(Data12Repository)

public Data12Entity find(String key1, String key3) {
// 以下のSQLを実行
// select * from data1
// inner join data2
//   on  data1.pk1 = :key1
//   and data2.pk2 = data1.fk2
//   and data2.pk3 = :key3
}
// FlowPartのOperator(Data12Operator)

private final Data12 result = new Data12();

@CoGroup
public void join1(@Key(group = { "param_key1" }) @Once Iterable<Data12> data12List,
    @Key(group = { "pk1" }) List<Data1> data1List,
    Result<Data12> out) {
  for (Data12 data12 : data12List) {
    for (Data1 data1 : data1List) {
      result.copyFrom(data12);
      result.setFk2Option(data1.getFk2Option());
      out.add(result);
    }
  }
}

@CoGroup
public void join2(@Key(group = { "fk2", "param_key3" }) @Once Iterable<Data12> data12List,
    @Key(group = { "pk2", "pk3" }) List<Data2> data2List,
    Result<Data12> out) {
  for (Data12 data12 : data12List) {
    for (Data2 data2 : data2List) {
      result.copyFrom(data12);
      result.setText2Option(data2.getText2Option());
      out.add(result);
    }
  }
}
// FlowPart(Data12FlowPart)

private In<Data12> param;
private In<Data1> data1;
private In<Data2> data2;
private Out<Data12> data12;

public void describe() {
  Data12OperatorFactory operator = new Data12OperatorFactory();
  CoreOperatorFactory core = new CoreOperatorFactory();

  Source<Data12> join1 = operator.join1(param, data1).out;
  Source<Data12> join2 = operator.join2(join1, data2).out;
  data12.add(join2);
}
// Java

@Autowired
private Data12Repository data12Repository;

public void main(List<TargetEntity> targetList) {
  for (TargetEntity target : targetList) {
    Data12Entity data12 = data12Repository.find(target.getFk1(), target.getFk3());
    if (data != null) {
      target.setResult1(data12.getText2());
    }
  }
}
// Operator

private final Data12 param = new Data12();

@Convert
public Data12 convertParam(Target target) {
  param.reset();
  param.setParamKey1Option(target.getFk1Option());
  param.setParamKey3Option(target.getFk3Option());
  return param;
}

@MasterJoinUpdate
public void main(@Key(group = { "param_key1", "param_key3" }) Data12 data12,
    @Key(group = { "fk1", "fk3" }) Target target) {
  target.setResult1Option(data12.getText2Option());
}
// Flow

Source<Target> target = 〜;
Source<Data1> data1 = 〜;
Source<Data2> data2 = 〜;

Source<Data12> param = operator.convertParam(target).out;
Data12FlowPart flow = new Data12FlowPartFactory().create(param, data1, data2);
Source<Target> output = operator.main(flow.data12, target).out;

AsakusaFW目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま