Asakusa FrameworkのOperator DSLの合流演算子(confluent)のメモ。
|
合流演算子は、(同一データモデルの)複数の入力レコードを単一の出力にまとめる演算子。
性能特性はExtract(旧ドキュメントではMap)。[/2016-02-11]
| 入力 ポート数 |
入力データモデル の制約 |
イメージ | 出力 ポート数 |
出力データモデル の制約 |
入力1レコード に対する 出力レコード数 |
|---|---|---|---|---|---|
| 任意 (※) |
全て同じ データモデル |
![]() |
1 | inと同じ データモデル |
1レコード。 |
例えば1つ目の入力から3レコード、2つ目から5レコード、3つ目から10レコードの入力があったとすると、18レコード出力される。
3つの入力を1つにまとめる例。

| 入力データ例 | 出力データ例 | |||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| in |
|
→ | out |
|
||||||||||||
|
||||||||||||||||
|
||||||||||||||||
hoge = {
〜
};
import com.asakusafw.vocabulary.flow.util.CoreOperatorFactory; import com.asakusafw.vocabulary.flow.util.CoreOperatorFactory.Confluent; import com.example.modelgen.dmdl.model.Hoge;
@Override
public void describe() {
CoreOperatorFactory core = new CoreOperatorFactory();
Confluent<Hoge> c = core.confluent(this.in1, this.in2, this.in3);
this.out.add(c.out);
}
合流演算子はコア演算子なので、Operatorクラスにプログラマーが何かを実装する必要は無い。
なお、出力データの指定方法は、「out.add(c)」でも「out.add(c.out)」でも同じ。見た目が違うだけ。
合流演算子はコア演算子なので、(Operatorクラスにプログラマーが何も実装していないので)Operatorの単体テストを実装する必要は無い。
当サイトの演算子の一覧では、合流演算子は以下のような図にしている(1つの入力ポートに複数の接続がある)。

しかし実際のFlow DSLは以下のように書く。
// 合流演算子(@Branch) Confluent<Hoge> c = core.confluent(this.in1, this.in2, this.in3); // グループ結合演算子(@CoGroup) Example e = operators.example(this.in1, this.in2, this.in3);
合流演算子も引数の取り方は他の演算子と同じ。
他の演算子では引数は別々の入力ポートを意味している。
つまり、上記の合流演算子の図では1つのポートに複数の接続をしているように描いているが、
実際には(接続本数分の)異なる入力ポートがあって、それぞれに接続しているのが正しい。

わざわざ(正確でない)前者の図を使った理由は、複製と合流を対比させるイメージにするため。
※Toad Editorでは、合流演算子は丸の中に∋の入った図形で表される。
演算子の入力ポートでは、1つの入力ポートに1つのデータしか接続することが出来ない。
なので、複数のデータを入力したいと思ったら、合流演算子で1つに合流させる必要がある。
ただし、フローの出力ポートに関しては、(合流演算子を使わなくても)複数の接続をすることが出来る。

private final In<Hoge> in1; private final In<Hoge> in2; private final In<Hoge> in3; private final Out<Hoge> out;
@Override
public void describe() {
this.out.add(this.in1);
this.out.add(this.in2);
this.out.add(this.in3);
}
合流演算子は、SQLのUNIONに似ている。
INSERT INTO out SELECT * FROM in1 UNION SELECT * FROM in2 UNION SELECT * FROM in3;
あるいは、Scalaのコレクション同士を結合するメソッドに似ている。
val in1 : List[Hoge] = 〜 val in2 : List[Hoge] = 〜 val in3 : List[Hoge] = 〜 val out = in1 ++ in2 ++ in3