AWS Glueの再処理防止用機能である、Job Bookmarkを理解していきます。
基本的には以下のドキュメントに書かれているんですが、個人的に非常にわかりづらい機能なので、自分でまとめ直します。 https://docs.aws.amazon.com/ja_jp/glue/latest/dg/monitor-continuations.html
Job Bookmarkとは何か?
Job Bookmarkとは、Glue Job実行時に、既に処理したことがあるデータはスキップする機能です。
具体例
具体的には以下のことを行いたい時に利用する機能です。
- AというテーブルとBというテーブルが存在する
- X時の時点でJobを実行し、Aテーブルを元にBというテーブルを更新する
- Y時にAテーブルにデータが追記される
- Z時の時点でJobを実行し、Bというテーブルを更新する
この4ステップが完了後、Bテーブルがどのような状態になっていて欲しいかを考えると、それは当然「重複データを含まない完全な状態」になっていて欲しいですよね。
実現方法
これを実現するには、おそらく以下の2パターン実装方法が考えられます。
- Bテーブル更新時に、重複データを削除する
- 3で追加されたAテーブル更新データだけ利用する
AWS Glueのアーキテクチャ的に、アウトプット先はS3であり、その実態はただのファイルです。mysqlなどのように気軽にupdate/deleteは行えません。1のパターンの場合、先にファイルを全部削除して、全て上書きする、といった手順を踏む必要があります。
削除せずに、2のパターンをできれば行いたいものです。そして、そこで利用するのがJob Bookmark機能です。
Job Bookmark機能では、実行時に既に処理に利用したデータをスキップすることができます。 既に利用したデータをブックマーク保存しておく、と言った考え方です。これを利用することで、「Aテーブルの更新データ」だけ利用することができます。
Job Bookmarkが利用されるルール
Job Bookmarkが利用される絶対的なルールとして、以下が存在します。
- JobのBookmark機能がEnableになっている
- transformation_ctxが付与されている
- Job.initが実行されている
- Job.commitが実行されている
1については言わずもがなですね。設定から変更ができます。 2については、各テーブルを呼び出す時に利用するGlueContextのメソッドに、transformation_ctxを付与することで、Job Bookmarkが利用されます。これはtransformation_ctxごとにJob Bookmarkが保存される仕様になっています。つまりtransformation_ctxはJobごとにユニークである必要があります。
3と4については、セットになっています。Job.initが実行されないと、保存されているJob Bookmarkを呼び出すことができません。またJob.initだけ実行され、何らかの理由によりcommitが実行されなかった場合は、そのJobで処理したデータは未処理状態ということになります。commitでBookmarkを保存する、という構造になっているからです。 これは処理の途中でエラーなどが起きた時にありがちです。
また注意点として、Job Bookmarkは読み込みだけBookmarkされます。つまり書き込みは重複データがあろうと、スキップはされずに、重複データが書き込まれるということです。
そしてどの経由によってテーブルが読み込まれるかによって、いくつか分岐される仕様が存在します。以下の2つに分類されます。
- JDBCソース経由での読み込み
- その他での読み込み
JDBCソース経由での読み込み
JDBCソース経由とは、RDSなどから直接データを読み込む形式のことです。
そのJDBCソース経由での読み込み時には、ブックマークキーが利用されます。このブックマークキーは、テーブルのカラム名で指定することができます。指定されたカラムは連続して増減、もしくは一定間隔で増減するIDのようなものでなくてはなりません。
JDBCソース経由では、このブックマークキーを利用して、どこまでデータを処理したのかを記憶しています。
実装
具体的な実装としては、以下のようにGlueContext.create_dynamic_frame.from_catalog実行時のadditional_optionsにjobBookmarkKeysを指定することで、利用可能です。
注意点として、ここでいうsourceはJDBCソースである必要があります。
self.glue_context.create_dynamic_frame.from_catalog( database=self.source.database, table_name=self.source.table_name, additional_options={ "jobBookmarkKeys": ["id"] }, transformation_ctx="datasource0_" + self.target.table_name )
その他での読み込み
その他での読み込みは、JDBCソース経由ではない読み込み方法のことです。通常のGlue DatabaseとTableと考えれば問題ないです。例えばS3にデータ置いてある場合などがこれに該当します。
この場合は、データの最終更新日時がキーとして利用されます。S3の場合は、S3に最終更新されたファイルの日時を利用して、既にJobで利用したデータかどうかを判断します。なので、JDBCソースとは違い、jobBookmarkKeysの指定は必要ありません。
例えば、最初のJobの実行が13:00で、2回目のJobの実行が14:00だとすると、読み込まれるデータは13:00 ~ 14:00の間に更新されたファイルのみ、となります。
またこの場合でも、transformation_ctxを指定しないとJob Bookmarkは機能しません。
Job Bookmarkのリセット方法
Job Bookmarkのリセットは、AWS CLI、もしくはWebから行えます。
aws glue reset-job-bookmark --job-name my-job-name
もしもJobを最初から実行したくなった場合に利用します。
さいごに
AWS GlueのJob Bookmarkは非常に便利ですが、使い方を間違えると正しくないデータが蓄積されてしまいます。必ず理解して利用するようにしたいですね。
自分もつい最近、なぜかデータがおかしいテーブルがあったんですが、Job BookmarkがDisableになっており、Job Bookmarkが機能していなかったというポンコツをやらかしました。結果的には、データが実行ごとに重複して保存されていました。反省です。