AWSのLambdaからDigDagのワークフローを実行する方法。
|
AWSのLambdaは、イベントによって処理を駆動するサービス。
例えばS3のファイルが更新されたことを契機としてPythonプログラムを実行することが出来る。
これを使って、S3のファイルが更新されたときにDigDagワークフローを実行することが出来る。
AWSのLambdaで、更新を監視する対象のS3ファイル(パス)を指定する。
そのS3ファイルが更新(PUT)される度にLambdaが起動する。
Lambdaには処理(プログラム)を登録することが出来る。
プログラミング言語はいくつかサポートされているが、ここではPythonとする。
Pythonの場合、デフォルトではlambda_handlerという関数が呼ばれるので、その中を実装することになる。
def lambda_handler(event, context):
引数のeventには、Lambdaが起動される契機となったイベントの情報(dict)が渡される。
この中から、更新されたS3ファイルの情報(パス)を取得することも出来る。
import from urllib.parse import urlparse
def lambda_handler(event, context):
event_bucket_name = event['Records'][0]['s3']['bucket']['name']
event_object_key = urlparse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
event_file = f's3://{event_bucket_name}/{event_object_key}'
print(event_file)
Lambdaでは、標準出力に出力した文字列はCloudWatchに出力される。
lambda_handlerの戻り値としてステータスコードやメッセージを返すことが出来る。
def lambda_handler(event, context):
return {
'statusCode': 200,
'body': 'JSON String'
}
Lambdaではbodyを設定しても無視されるので意味が無いが。
DigDagに対しては、REST APIでアクセスすることが出来る。
DigDagのワークフローを実行するには、実行対象ワークフローの「ワークフローID」を取得する必要がある。
ワークフロー名を渡してワークフローIDを取得できればいいのだが、そういうAPIは無いので、ワークフロー一覧を取得し、その中から対象のワークフローを探し出す。
import json import requests import datetime digdag_address = 'DigDagのIPアドレス:65432'
def __digdag_get(api):
url = 'http://' + digdag_address + '/api/' + api
res = requests.get(url)
print(f'digdag-get {res.status_code} {url}')
return res
def __digdag_put(api, params):
url = 'http://' + digdag_address + '/api/' + api
headers = {'content-type': 'application/json'}
res = requests.put(url, headers=headers, data=json.dumps(params))
print(f'digdag-put {res.status_code} {url}')
print(res.text)
return res
def lambda_handler(event, context):
workflow_name = '呼び出すワークフロー名'
print(f'workflow_name= {workflow_name}')
# ワークフロー一覧取得
res = __digdag_get('workflows')
# ワークフローID取得
for wf in res.json()['workflows']:
if wf['name'] == workflow_name:
workflow_id = wf['id']
break
else:
print(res.text)
raise Exception(f'not found {workflow_name}')
print(f'workflow_id= {workflow_id}')
# ワークフロー実行
JST = datetime.timezone(datetime.timedelta(hours=+9), 'Asia/Tokyo')
session_time = datetime.datetime.now(JST).strftime('%Y-%m-%dT%H:%M:%S+09:00')
print(f'session_time= {session_time}')
res = __digdag_put('attempts', {'workflowId':workflow_id, 'sessionTime':session_time, 'params':{}})
return {
'statusCode': res.status_code
}
なお、同一のワークフローIDに対して同一のsessionTimeで2回以上実行しようとすると、2回目以降はエラーになる(ワークフローが実行されない)。
(たぶんattemptを上手く指定してやる必要があるのだと思う)