AWS Glue job与bookmark集成
在AWS Glue job中,当利用其提供的glue session 读取其下的catalog中的database和table的时候。提供了一个特殊能力,其可以利用bookmark进行读取进度的保存,从而方便下次读取的时候,只获取上次读取完成后新进入的数据。支持的格式多样,这里主要测试了CSV以及Paruqet的格式。
官方文档为:
https://docs.aws.amazon.com/zh_cn/glue/latest/dg/monitor-continuations.html
那么本文
- 首先准备了Glue job,并且没有利用明文来记录access key和secret key id,利用IAM来进行测试读取本账号的Table
首先测试了直接设置glue job的bookmark 状态为enable。
直接测试发现并没有保存数据的读取状态。在尝试设置了transformation_ctx参数后可以进行保存了
Csv | df = glueContext.create_dynamic_frame.from_options(connection_type=”s3″, connection_options={“paths”: [s3_path]}, format=”parquet”, format_options={“withHeader”:”true”, “escaper”:'”‘,”quoteChar”:”\””},transformation_ctx=args[‘JOB_NAME’]) |
parquet | df = glueContext.create_dynamic_frame.from_options(connection_type=”s3″, connection_options={“paths”: [s3_path]}, format=”parquet”,transformation_ctx=args[‘JOB_NAME’]) |
- 其次测试了跨账号读取
这里准备了其他账号的accessKey进行测试
测试得到,可以跨账号读取
再次测试是否可以支持bookmark
在第一次读取后,再次运行第二次任务,得到结果是进行了事务性记录
并没有重复执行,按理bookmark可行
- 最后我们测试通过EventBridge来监听Glue job的状态,从而配合bookmark在下游保证幂等
需要是在EventBridge中创建了一个规则,选择Glue Job State Change,实践格式基本如下
{
“version”: “0”, “id”: “abcdef00-1234-5678-9abc-def012345678”, “detail-type”: “Glue Job State Change”, “source”: “aws.glue”, “account”: “123456789012”, “time”: “2017-09-07T18:57:21Z”, “region”: “us-east-1”, “resources”: [], “detail”: { “jobName”: “MyJob”, “severity”: “INFO”, “state”: “SUCCEEDED”, “jobRunId”: “jr_abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789”, “message”: “Job run succeeded” } } |
这里我需要监听的是detail中的state,
所以创建了事件模式为
{
“source”: [“aws.glue”], “detail-type”: [“Glue Job State Change”], “detail”: { “state”: [“FAILED”, “TIMEOUT”] } } |
选择目标为SNS,并创建对应的接收SNS
在测试环境,修改代码让Glue job陷入一个固定失败的状态
最后得到邮件
邮件中可以看到错误信息,jobname等关键信息
同理AWS Glue Workflow支持EventBridge进行触发,触发文档可以参考如下
https://docs.aws.amazon.com/zh_cn/glue/latest/dg/starting-workflow-eventbridge.html
当然这是最为标准的,如果希望只针对特定的glue job,要么是持续不断的更新规则
或者就是在监听策略中,利用前缀匹配或者后缀匹配来做,同时确保动态生成的job名称携带对应的前缀