DigDag pyメモ(Hishidama's DigDag py Memo) S-JIS[2021-07-04/2021-10-11] 変更履歴

DigDag py

DigDagのpyオペレーターのメモ。


概要

digファイル内にpyオペレーターを書くことで、Pythonプログラムを実行できる。

このPythonプログラムでは、ワークフローの後続処理で読む環境変数を設定することが出来る。
また、ワークフロー自体を変更する(サブタスクを追加する)ことも出来るらしい。


Pythonファイルを置く場所

pyファイルはdigファイルと同じ場所に置く。
あるいは、ディレクトリーを作って、その中に置く。

ファイル構成 pyファイルの例 digファイル内の記述例 備考
example.dig
tasks.py
# tasks.py
def my_task():
    print("my task")
py>: tasks.my_task digファイルと同じ場所にpyファイルを置くと、
「ファイル名.関数名」で呼び出せる。
example.dig
tasks
└__init__.py
# tasks/__init__.py
def my_task():
    print("my task")
py>: tasks.my_task ディレクトリーの下に__init__.pyを置くと、
「ディレクトリー名.関数名」で呼び出せる。
example.dig
tasks
├__init__.py
└test.py
# tasks/test.py
def my_task():
    print("my task")
py>: tasks.test.my_task ディレクトリーの下にpyファイルを置く場合、
(空でもいいので)__init__.pyが必要。
「ディレクトリー名.ファイル名.関数名」で呼び出せる。

Pythonのバージョン

実行に使われるPythonは、DigDagが動いているユーザーのpythonコマンドだと思われる。
つまり、DigDagがrootユーザーで動いていれば、rootユーザーのpythonが使われる。(古めのOSなら、このPythonは2系だろう)

使用するpythonコマンドをdigファイルの中で指定することが出来る。

例1(フルパスを指定する例):

_export:
  py:
    python: /usr/local/bin/python3.8

+test:
  py>: tasks.my_task
+test:
  _export:
    py:
      python: /usr/local/bin/python3.8
  py>: tasks.my_task
+test:
  _export:
    py:
      python: /usr/local/bin/python3.8

  +execute:
    py>: tasks.my_task

例2(実行コマンド(引数)を指定する例):

_export:
  py:
    python: ['pipenv', 'run', 'python']

+test:
  py>: tasks.my_task
  _env:
    PIPENV_PIPFILE: /path/to/Pipfile

例2は、「pipenv run python tasks.py」という感じの実行方法。

pipenvではカレントディレクトリーにあるPipfileを読むと思われるが、カレントディレクトリーはDigDagのタスクによって異なるらしい。
そこで、環境変数PIPENV_PIPFILEで、Pipfileの場所を指定する。[2021-10-11]
(が、一度はこれで上手くいったが、二度目以降は成功していないorz 他に何か環境周りの設定が関係しているのだろうか)

参考: hiroyuki-satoさんのdigdag.md


Pythonの引数

Pythonプログラムに渡す引数をdigファイル上に記述できる。

dig:

+test:
  py>: tasks.my_task
  arg1: "abc"
  arg2: 123

py:

def my_task(arg1, arg2):
  print(arg1)
  print(arg2)

変数名だけ合っていれば、digファイル上に引数を書く順序はどうでもいい。


デフォルト引数も対応している。

dig:

+test:
  py>: tasks.my_task
  arg1: "abc"
  arg2: 123

py:

def my_task(arg1, arg2, arg3='zzz'):
  print(arg1)
  print(arg2)
  print(arg3)

環境変数の変更

Pythonプログラムから環境変数に値を書き込むことが出来る。書き込んだ値は、DigDagのワークフローの後続処理で読み込める。
(シェルからもDigDagの環境変数に書き込めたら便利なんだけどなぁ)

py:

import digdag

def my_task():
  digdag.env.store({ '環境変数名': '値' })

digdagというモジュールは、特にインストールや設定をしなくても使用できる。

参考:kaki_kさんのDigdagのPython APIを使う


なお、digファイルの${ }ではJavaScriptの演算を書くことが出来るので、簡単な計算結果を環境変数に入れたいだけなら、そちらを使う方が簡単かも。[2021-08-26]
ifオペレーターの例


Pythonで処理対象候補を出力し、ワークフローでループ処理する例

py:

import digdag

def my_task():
  digdag.env.store({ 'target_list': ['abc', 'def', 'ghi'] })

dig:

+test1:
  # target_listに書き込む
  py>: tasks.my_task

+test2:
  for_each>:
    target: ${target_list}
  _do:
    echo>: ${target}  

my_taskで環境変数target_listにabc,def,ghiという3つの値を(リストとして)書き込んで、for_eachでその値の個数分ループする。


オペレーターへ戻る / DigDagへ戻る / 技術メモへ戻る
メールの送信先:ひしだま