S-JIS[2021-10-30] 変更履歴

AWS LambdaからのDigDag呼び出し

AWSのLambdaからDigDagのワークフローを実行する方法。


概要

AWSのLambdaは、イベントによって処理を駆動するサービス。
例えばS3のファイルが更新されたことを契機としてPythonプログラムを実行することが出来る。

これを使って、S3のファイルが更新されたときにDigDagワークフローを実行することが出来る。


AWS Lambdaの概要

AWSのLambdaで、更新を監視する対象のS3ファイル(パス)を指定する。
そのS3ファイルが更新(PUT)される度にLambdaが起動する。

Lambdaには処理(プログラム)を登録することが出来る。
プログラミング言語はいくつかサポートされているが、ここではPythonとする。
Pythonの場合、デフォルトではlambda_handlerという関数が呼ばれるので、その中を実装することになる。

def lambda_handler(event, context):

引数のeventには、Lambdaが起動される契機となったイベントの情報(dict)が渡される。
この中から、更新されたS3ファイルの情報(パス)を取得することも出来る。

import from urllib.parse import urlparse

def lambda_handler(event, context):
    event_bucket_name = event['Records'][0]['s3']['bucket']['name']
    event_object_key = urlparse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    event_file = f's3://{event_bucket_name}/{event_object_key}'
    print(event_file)

Lambdaでは、標準出力に出力した文字列はCloudWatchに出力される。


lambda_handlerの戻り値としてステータスコードやメッセージを返すことが出来る。

def lambda_handler(event, context):
    return {
        'statusCode': 200,
        'body': 'JSON String'
    }

Lambdaではbodyを設定しても無視されるので意味が無いが。


DigDagワークフロー実行の例

DigDagに対しては、REST APIでアクセスすることが出来る。

DigDagのワークフローを実行するには、実行対象ワークフローの「ワークフローID」を取得する必要がある。
ワークフロー名を渡してワークフローIDを取得できればいいのだが、そういうAPIは無いので、ワークフロー一覧を取得し、その中から対象のワークフローを探し出す。

import json
import requests
import datetime

digdag_address = 'DigDagのIPアドレス:65432'
def __digdag_get(api):
    url = 'http://' + digdag_address + '/api/' + api
    res = requests.get(url)
    print(f'digdag-get {res.status_code} {url}')
    return res
def __digdag_put(api, params):
    url = 'http://' + digdag_address + '/api/' + api
    headers = {'content-type': 'application/json'}
    res = requests.put(url, headers=headers, data=json.dumps(params))
    print(f'digdag-put {res.status_code} {url}')
    print(res.text)
    return res
def lambda_handler(event, context):
    workflow_name = '呼び出すワークフロー名'
    print(f'workflow_name= {workflow_name}')

    # ワークフロー一覧取得
    res = __digdag_get('workflows')

    # ワークフローID取得
    for wf in res.json()['workflows']:
        if wf['name'] == workflow_name:
            workflow_id = wf['id']
            break
    else:
        print(res.text)
        raise Exception(f'not found {workflow_name}')
    print(f'workflow_id= {workflow_id}')

    # ワークフロー実行
    JST = datetime.timezone(datetime.timedelta(hours=+9), 'Asia/Tokyo')
    session_time = datetime.datetime.now(JST).strftime('%Y-%m-%dT%H:%M:%S+09:00')
    print(f'session_time= {session_time}')
    res = __digdag_put('attempts', {'workflowId':workflow_id, 'sessionTime':session_time, 'params':{}})

    return {
        'statusCode': res.status_code
    }

なお、同一のワークフローIDに対して同一のsessionTimeで2回以上実行しようとすると、2回目以降はエラーになる(ワークフローが実行されない)。
(たぶんattemptを上手く指定してやる必要があるのだと思う)


DigDagへ戻る / 技術メモへ戻る
メールの送信先:ひしだま