DigDagのpyオペレーターのメモ。
|
|
digファイル内にpyオペレーターを書くことで、Pythonプログラムを実行できる。
このPythonプログラムでは、ワークフローの後続処理で読む環境変数を設定することが出来る。
また、ワークフロー自体を変更する(サブタスクを追加する)ことも出来るらしい。
pyファイルはdigファイルと同じ場所に置く。
あるいは、ディレクトリーを作って、その中に置く。
ファイル構成 | pyファイルの例 | digファイル内の記述例 | 備考 |
---|---|---|---|
example.dig |
# tasks.py |
py>: tasks.my_task |
digファイルと同じ場所にpyファイルを置くと、 「ファイル名 . 関数名」で呼び出せる。 |
example.dig |
# tasks/__init__.py |
py>: tasks.my_task |
ディレクトリーの下に__init__.pyを置くと、 「ディレクトリー名 . 関数名」で呼び出せる。 |
example.dig |
# tasks/test.py |
py>: tasks.test.my_task |
ディレクトリーの下にpyファイルを置く場合、 (空でもいいので)__init__.pyが必要。 「ディレクトリー名 . ファイル名. 関数名」で呼び出せる。 |
実行に使われるPythonは、DigDagが動いているユーザーのpythonコマンドだと思われる。
つまり、DigDagがrootユーザーで動いていれば、rootユーザーのpythonが使われる。(古めのOSなら、このPythonは2系だろう)
使用するpythonコマンドをdigファイルの中で指定することが出来る。
_export: py: python: /usr/local/bin/python3.8 +test: py>: tasks.my_task
+test: _export: py: python: /usr/local/bin/python3.8 py>: tasks.my_task
+test: _export: py: python: /usr/local/bin/python3.8 +execute: py>: tasks.my_task
_export: py: python: ['pipenv', 'run', 'python'] +test: py>: tasks.my_task _env: PIPENV_PIPFILE: /path/to/Pipfile
例2は、「pipenv run python tasks.py」という感じの実行方法。
pipenvではカレントディレクトリーにあるPipfileを読むと思われるが、カレントディレクトリーはDigDagのタスクによって異なるらしい。
そこで、環境変数PIPENV_PIPFILEで、Pipfileの場所を指定する。[2021-10-11]
(が、一度はこれで上手くいったが、二度目以降は成功していないorz 他に何か環境周りの設定が関係しているのだろうか)
参考: hiroyuki-satoさんのdigdag.md
Pythonプログラムに渡す引数をdigファイル上に記述できる。
+test: py>: tasks.my_task arg1: "abc" arg2: 123
def my_task(arg1, arg2): print(arg1) print(arg2)
変数名だけ合っていれば、digファイル上に引数を書く順序はどうでもいい。
デフォルト引数も対応している。
+test: py>: tasks.my_task arg1: "abc" arg2: 123
def my_task(arg1, arg2, arg3='zzz'): print(arg1) print(arg2) print(arg3)
Pythonプログラムから環境変数に値を書き込むことが出来る。書き込んだ値は、DigDagのワークフローの後続処理で読み込める。
(シェルからもDigDagの環境変数に書き込めたら便利なんだけどなぁ)
import digdag def my_task(): digdag.env.store({ '環境変数名': '値' })
digdagというモジュールは、特にインストールや設定をしなくても使用できる。
参考:kaki_kさんのDigdagのPython APIを使う
なお、digファイルの${ }
ではJavaScriptの演算を書くことが出来るので、簡単な計算結果を環境変数に入れたいだけなら、そちらを使う方が簡単かも。[2021-08-26]
→ifオペレーターの例
import digdag
def my_task():
digdag.env.store({ 'target_list': ['abc', 'def', 'ghi'] })
+test1: # target_listに書き込む py>: tasks.my_task +test2: for_each>: target: ${target_list} _do: echo>: ${target}
my_taskで環境変数target_listにabc,def,ghiという3つの値を(リストとして)書き込んで、for_eachでその値の個数分ループする。