へんてこのブログ

日々気づいたことや、最近やっていることを書いています

AWS GlueのJob Bookmark機能を理解する

AWS Glueの再処理防止用機能である、Job Bookmarkを理解していきます。

基本的には以下のドキュメントに書かれているんですが、個人的に非常にわかりづらい機能なので、自分でまとめ直します。 https://docs.aws.amazon.com/ja_jp/glue/latest/dg/monitor-continuations.html

Job Bookmarkとは何か?

Job Bookmarkとは、Glue Job実行時に、既に処理したことがあるデータはスキップする機能です。

具体例

具体的には以下のことを行いたい時に利用する機能です。

  1. AというテーブルとBというテーブルが存在する
  2. X時の時点でJobを実行し、Aテーブルを元にBというテーブルを更新する
  3. Y時にAテーブルにデータが追記される
  4. Z時の時点でJobを実行し、Bというテーブルを更新する

この4ステップが完了後、Bテーブルがどのような状態になっていて欲しいかを考えると、それは当然「重複データを含まない完全な状態」になっていて欲しいですよね。

実現方法

これを実現するには、おそらく以下の2パターン実装方法が考えられます。

  1. Bテーブル更新時に、重複データを削除する
  2. 3で追加されたAテーブル更新データだけ利用する

AWS Glueのアーキテクチャ的に、アウトプット先はS3であり、その実態はただのファイルです。mysqlなどのように気軽にupdate/deleteは行えません。1のパターンの場合、先にファイルを全部削除して、全て上書きする、といった手順を踏む必要があります。

削除せずに、2のパターンをできれば行いたいものです。そして、そこで利用するのがJob Bookmark機能です。

Job Bookmark機能では、実行時に既に処理に利用したデータをスキップすることができます。 既に利用したデータをブックマーク保存しておく、と言った考え方です。これを利用することで、「Aテーブルの更新データ」だけ利用することができます。

Job Bookmarkが利用されるルール

Job Bookmarkが利用される絶対的なルールとして、以下が存在します。

  1. JobのBookmark機能がEnableになっている
  2. transformation_ctxが付与されている
  3. Job.initが実行されている
  4. Job.commitが実行されている

1については言わずもがなですね。設定から変更ができます。 2については、各テーブルを呼び出す時に利用するGlueContextのメソッドに、transformation_ctxを付与することで、Job Bookmarkが利用されます。これはtransformation_ctxごとにJob Bookmarkが保存される仕様になっています。つまりtransformation_ctxはJobごとにユニークである必要があります。

3と4については、セットになっています。Job.initが実行されないと、保存されているJob Bookmarkを呼び出すことができません。またJob.initだけ実行され、何らかの理由によりcommitが実行されなかった場合は、そのJobで処理したデータは未処理状態ということになります。commitでBookmarkを保存する、という構造になっているからです。 これは処理の途中でエラーなどが起きた時にありがちです。

また注意点として、Job Bookmarkは読み込みだけBookmarkされます。つまり書き込みは重複データがあろうと、スキップはされずに、重複データが書き込まれるということです。

そしてどの経由によってテーブルが読み込まれるかによって、いくつか分岐される仕様が存在します。以下の2つに分類されます。

  1. JDBCソース経由での読み込み
  2. その他での読み込み

JDBCソース経由での読み込み

JDBCソース経由とは、RDSなどから直接データを読み込む形式のことです。

そのJDBCソース経由での読み込み時には、ブックマークキーが利用されます。このブックマークキーは、テーブルのカラム名で指定することができます。指定されたカラムは連続して増減、もしくは一定間隔で増減するIDのようなものでなくてはなりません。

JDBCソース経由では、このブックマークキーを利用して、どこまでデータを処理したのかを記憶しています。

実装

具体的な実装としては、以下のようにGlueContext.create_dynamic_frame.from_catalog実行時のadditional_optionsにjobBookmarkKeysを指定することで、利用可能です。

注意点として、ここでいうsourceはJDBCソースである必要があります。

self.glue_context.create_dynamic_frame.from_catalog(
  database=self.source.database,
  table_name=self.source.table_name,
  additional_options={
    "jobBookmarkKeys": ["id"]
  },
  transformation_ctx="datasource0_" + self.target.table_name
)

その他での読み込み

その他での読み込みは、JDBCソース経由ではない読み込み方法のことです。通常のGlue DatabaseとTableと考えれば問題ないです。例えばS3にデータ置いてある場合などがこれに該当します。

この場合は、データの最終更新日時がキーとして利用されます。S3の場合は、S3に最終更新されたファイルの日時を利用して、既にJobで利用したデータかどうかを判断します。なので、JDBCソースとは違い、jobBookmarkKeysの指定は必要ありません。

例えば、最初のJobの実行が13:00で、2回目のJobの実行が14:00だとすると、読み込まれるデータは13:00 ~ 14:00の間に更新されたファイルのみ、となります。

またこの場合でも、transformation_ctxを指定しないとJob Bookmarkは機能しません。

Job Bookmarkのリセット方法

Job Bookmarkのリセットは、AWS CLI、もしくはWebから行えます。

リセットボタン

aws glue reset-job-bookmark --job-name my-job-name

もしもJobを最初から実行したくなった場合に利用します。

さいごに

AWS GlueのJob Bookmarkは非常に便利ですが、使い方を間違えると正しくないデータが蓄積されてしまいます。必ず理解して利用するようにしたいですね。

自分もつい最近、なぜかデータがおかしいテーブルがあったんですが、Job BookmarkがDisableになっており、Job Bookmarkが機能していなかったというポンコツをやらかしました。結果的には、データが実行ごとに重複して保存されていました。反省です。