AWSのLambdaからDigDagのワークフローを実行する方法。
|
AWSのLambdaは、イベントによって処理を駆動するサービス。
例えばS3のファイルが更新されたことを契機としてPythonプログラムを実行することが出来る。
これを使って、S3のファイルが更新されたときにDigDagワークフローを実行することが出来る。
AWSのLambdaで、更新を監視する対象のS3ファイル(パス)を指定する。
そのS3ファイルが更新(PUT)される度にLambdaが起動する。
Lambdaには処理(プログラム)を登録することが出来る。
プログラミング言語はいくつかサポートされているが、ここではPythonとする。
Pythonの場合、デフォルトではlambda_handlerという関数が呼ばれるので、その中を実装することになる。
def lambda_handler(event, context):
引数のeventには、Lambdaが起動される契機となったイベントの情報(dict)が渡される。
この中から、更新されたS3ファイルの情報(パス)を取得することも出来る。
import from urllib.parse import urlparse def lambda_handler(event, context): event_bucket_name = event['Records'][0]['s3']['bucket']['name'] event_object_key = urlparse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8') event_file = f's3://{event_bucket_name}/{event_object_key}' print(event_file)
Lambdaでは、標準出力に出力した文字列はCloudWatchに出力される。
lambda_handlerの戻り値としてステータスコードやメッセージを返すことが出来る。
def lambda_handler(event, context): return { 'statusCode': 200, 'body': 'JSON String' }
Lambdaではbodyを設定しても無視されるので意味が無いが。
DigDagに対しては、REST APIでアクセスすることが出来る。
DigDagのワークフローを実行するには、実行対象ワークフローの「ワークフローID」を取得する必要がある。
ワークフロー名を渡してワークフローIDを取得できればいいのだが、そういうAPIは無いので、ワークフロー一覧を取得し、その中から対象のワークフローを探し出す。
import json import requests import datetime digdag_address = 'DigDagのIPアドレス:65432'
def __digdag_get(api): url = 'http://' + digdag_address + '/api/' + api res = requests.get(url) print(f'digdag-get {res.status_code} {url}') return res
def __digdag_put(api, params): url = 'http://' + digdag_address + '/api/' + api headers = {'content-type': 'application/json'} res = requests.put(url, headers=headers, data=json.dumps(params)) print(f'digdag-put {res.status_code} {url}') print(res.text) return res
def lambda_handler(event, context): workflow_name = '呼び出すワークフロー名' print(f'workflow_name= {workflow_name}') # ワークフロー一覧取得 res = __digdag_get('workflows') # ワークフローID取得 for wf in res.json()['workflows']: if wf['name'] == workflow_name: workflow_id = wf['id'] break else: print(res.text) raise Exception(f'not found {workflow_name}') print(f'workflow_id= {workflow_id}') # ワークフロー実行 JST = datetime.timezone(datetime.timedelta(hours=+9), 'Asia/Tokyo') session_time = datetime.datetime.now(JST).strftime('%Y-%m-%dT%H:%M:%S+09:00') print(f'session_time= {session_time}') res = __digdag_put('attempts', {'workflowId':workflow_id, 'sessionTime':session_time, 'params':{}}) return { 'statusCode': res.status_code }
なお、同一のワークフローIDに対して同一のsessionTimeで2回以上実行しようとすると、2回目以降はエラーになる(ワークフローが実行されない)。
(たぶんattemptを上手く指定してやる必要があるのだと思う)