Speee DEVELOPER BLOG

Speee開発陣による技術情報発信ブログです。 メディア開発・運用、スマートフォンアプリ開発、Webマーケティング、アドテクなどで培った技術ノウハウを発信していきます!

Athenaを使ったデータ処理基盤の設計

こんにちは。UZOUのプロダクト開発をしているエンジニアの@kanga333です。 UZOUでは広告データの集計の一部にAmazon Athenaを採用しています。 この記事ではUZOUにおけるAthenaを使ったデータ処理基盤の設計について紹介したいと思います。

全体構成

データ処理基盤の全体構成は次のようになっています。

f:id:kangaat:20201105155841j:plain

以後はそれぞれのコンポーネントについて順次紹介していきます。

FleuntdによるS3への集約

UZOUでは特にFluentdアグリゲータのような中継サーバは設けていません。広告配信サーバに常駐するFluentdがログを直接S3にプットしています。 以下はFluentdのS3 output部分の設定の一部抜粋です。

<buffer time>
  @type file
  timekey 60m
</buffer>

path example_table/dt=%Y%m%d/hour=%H
hex_random_length 16
s3_object_key_format "%{path}/#{Socket.gethostname}_%{hex_random}.%{file_extension}"
overwrite true
時系列に沿ったデータの格納

Athenaのコスト削減、クエリの実行時間短縮のためにはデータをパーティショニングする必要があります。 パーティショニングをざっくり説明すると、パーティションカラムと呼ばれる特殊なカラムとS3のようなストレージ上の特定のパスを紐付けることで参照するデータの量を絞る機能です。*1 ログのような時系列のデータの場合、日付や時間など時刻を示したパーティションカラムにするのが一般的です。

ログの時刻とパーティションの時刻を一致させるためにはFluentdがログの時刻に沿ったS3のパスにデータを格納する必要があります。 S3 outputのpathで使うtime_slice_formatで表現する時刻はbufferセクションで指定するtimekeyを元に決まります。 timekey60mに設定することでバッファのファイルが時間を跨ぐ時に別ファイルに書き込まれるようになり、ログの時刻のhourとS3のパスのhourの時刻が一致します。

転送するファイルの一意性とリトライ時の冪等性の担保

またデータ転送時にS3に配置するファイル名が衝突して書き込みが失敗したり、あるいはリトライなどによって2重にファイルを書き込んでしまわないように設定する必要があります。 ファイル名を一意にさせる一般的な方法の一つにuuid_flushプレースホルダーを使う方法がありますが、uuid_flushで生成されるuuidはwriteの度に計算されるため、リトライが生じた際にファイルが2重に書き込まれる恐れがあります。 代わりにhex_randomhex_random_length 16に設定してoverwrite trueとあわせて使うことで、バッファされたファイルからS3のファイル名を一意に決めることができ、万一2重書き込みのようなリトライが発生しても、同名ファイルが上書きされるだけで済むようになります。*2

JSONから列指向フォーマットへの変換

Athenaのコスト削減、クエリの実行時間短縮のためにはデータを列指向フォーマットに変換する必要があります。 列指向フォーマットをざっくり説明すると、データを列毎に持つことで圧縮効率を高めたり、特定の列だけを参照する場合において、不要なデータのスキャンをスキップできるような機能を持ったビッグデータの参照処理向けのデータフォーマットです。*3

UZOUでは列指向フォーマットとしてApache Parquetを使用しています。 オリジナルのログはJSON形式であるため、JSONファイルをParquetファイルに変換する必要があります。 データの変換にはAthenaを使っています。 JSONファイルを読み書きする定義をしたテーブルとParquetファイルを読み書きする定義をしたテーブルを同じカラムで用意して、JSONテーブルからParquetテーブルへSELECT INSERTすることでデータを変換しています。

AirflowとAthenaによる変換バッチ

UZOUではワークフロー管理ツールにApache Airflowを採用しています。 AirflowにはAthena OperatorというAthenaにクエリする機能があります。変換バッチではAthena Operatorを使ってAthenaにクエリを発行しています。 変換バッチにおいても2重書き込みを防止、つまり冪等性を担保することが重要です。変換バッチのワークフローを簡易化すると

  • S3のDeleteObjectで書き込み先のパーティションパスにあるファイルをすべて削除する
  • AthenaのSELECT INSERTでJSONデータをParquetに変換する

という2つのステップに分かれます。

データの変換は1時間毎に区切られたパーティション単位で行っています。ワークフローでは変換クエリを発行する前に書き込み先のS3のパスにあるParquetファイルを消す処理を入れています。 これはINSERT INTOでデータを変換する都合上、ワークフローが2回実行された際にParquetファイルが2重に作成されてしまうことを避けるためです。 こうすることでワークフロー全体を再実行した時に冪等性を担保するような設計にしています。

2重書き込みをスキップするINSERTクエリ

しかし、これだけでは冪等性の担保としては不十分です。 一時的なエラーなどに起因してAthena Operatorのタスク単体でリトライが動いた場合、タイミングによっては変換クエリが2重に発行されてしまう可能性があります。 この問題を避けるため変換クエリにも2重書き込みを避ける工夫をしています。 以下に変換クエリのサンプルを挙げます。(dtとhourはパーティションカラムです)

INSERT INTO parquet_table
WITH check_empty AS (
  SELECT
    count(*) = 0
  FROM
   parquet_table
  WHERE
    dt = '20200101'
    AND hour = '01'
)
SELECT
  *
FROM json_table
WHERE
  dt = '20200101'
  AND hour = '01'
  AND (SELECT * FROM check_empty )

このクエリではSELECT INSERTに合わせてcheck_emptyと名の付いてるWITH句の内部で書き込み先のデータが0件であることを確認しており、それをSELECT INSERTのクエリのWHERE句の条件に使っています。 こうすることで書き込み先に何らかデータがあった場合にはINSERTをスキップするようなことを実現しています。

これら2つの方法を組み合わせることで以下のような要件を実現しています。

  • データを作り直したい場合はワークフロー全体を再実行して0からデータの作り直しができる
  • 一時的なネットワークエラーなどはリトライで救える
  • リトライによって変換クエリが2回実行されても2回目のINSERTはスキップされエラーにならない
補足: 他のAWSのサービスでParquetに変換する方法

Athena以外にも各種AWSサービスを使ってデータをParquetに変換できます。 ここでは他の方法と採用しなかった理由について軽く挙げます。

  • Kinesis Data Firehose
    • ストリームデータをリアルタイムにParquetに変換してS3に配置できる
    • 当初一番有力視していた
    • が結論パーティショニングが厳しくて見送り
    • ログの時刻に基づいてS3のパスを決めたいのだがKinesisはKinesisに到着した時刻に基づいてS3のパスを決めてしまう
    • これではログの時刻とパーティションの位置がずれるデータが出てしまうので不採用
  • EMR
    • HadoopなのでHiveでもSparkでもなんでも使って変換できる
    • ある程度クラスタの面倒を見る必要があり管理コスト高いので見送り
  • Glueジョブ
    • フルマネージドSparkとして使えるのでSparkのジョブとして変換ジョブを実行できる
    • 設計当初はGlue 2.0が出る前だった
    • Glue 1.0は1回ジョブを起動すると最低でも10分の時間に対する請求が発生するのでコスト高い
    • Glue2.0になってからはミニマム1分になったので今なら採用の余地はあるかも

各種方法に色々とメリデメはあるのですが、現時点ではAthenaのクエリによる変換が一番お手軽な方法だと思ってます。

S3のストレージクラス設計

コストパフォーマンス良くS3にデータを格納するにはデータの種類に応じて適切なタイミングで適切なストレージクラスに格納されるようにライフサイクルを設計する必要があります。 S3にはライフサイクルポリシーの機能があり、格納したデータに対して任意の日数が経ったタイミングで別のストレージクラスに移すように設定できます。 このセクションではAthenaで使うデータであることを念頭にどのようなストレージクラスを選んだかということを記述します。

メインのデータはIntelligent-Tireingを使う

Athenaで使うデータは基本的にはIntelligent-Tireingのストレージクラスを使っています。 Intelligent-Tireingはアクセスが30日以上無いデータを自動的にコストの安い低頻度のアクセス階層に移してくれるストレージクラスです。 時系列データを保管する場合、日数が経つほど過去データの参照頻度は減っていきます。 そうしたデータが自動でコストの安い階層に移ることで全体の保管コストを減らしています。

Intelligent-Tireingは低頻度のアクセス階層だろうとS3標準のストレージクラスと同等の読み取りパフォーマンスなため*4 *5、ほぼデメリットなし*6でコスト削減の恩恵を受けられます。

基本的にはアクセスしないが残しておきたいデータはGlacierを使う

例えばParquetに変換した後のJSONデータなどは基本的には不要になります。 また、Parquetに変換した後のデータでも年数が経つにつれて過去のデータは参照しなくなり不要になっていきます。

そうした殆ど不要になったデータはGlacierにアーカイブしています。 Glacierにデータを移行した場合、データを再度読めるようにする場合は取り出しリクエストを発行する必要があり、時間やコストがかかるため通常の運用ではほぼ不要だけど万一に備えて残しておきたいと言えるデータなどを対象としています。

ちなみにAthenaのテーブル定義で参照しているS3バケットにGlacierでアーカイブしたデータがあった場合ですが、エラーにならずにスキャンがスキップされます。 ウッカリAthenaがクエリして取り出し料金がかかるとかクエリがエラーにみたいなことは無いので安心です。*7

なお一度GlacierにしたデータをAthenaで再び読めるようにするには

  1. Glacierのデータに取り出しリクエストを出す
  2. 取り出し可能になったGlacierのデータをコピーして別のストレージクラスとして置き直す

というめんどくさいオペレーションが必要になるのでアーカイブする対象には注意が必要です。

Glueデータカタログによるテーブル管理

Athenaのテーブル定義はAWS Glueのデータカタログという機能で管理されています。 UZOUではGlueデータカタログをTerraformを使ってコード化することでAthenaのテーブル定義を管理しています。 これは直接DDLをAthenaに発行する場合に比べて次のような利点があります。

  • テーブルの作成やカラムの変更などの全ての操作を宣言的に行うことができる
    • DDLだとCREATE TABLEした後にカラムを変更しようとした場合でALTER TABLEなどの手続き的な変更が必要となる
    • DROP TABLEした後にCREATE TABLEでテーブル定義を作り直す方法なら宣言的な管理ができるが一時的にテーブルが使えなくなるダウンタイムが発生する
  • TerraformなのでCI/CDに乗せやすい
    • Plan結果のGithubへの通知など他のインフラ管理の方法と同じフローでテーブル定義を管理できる

Terraformのコード例

Terraformではaws_glue_catalog_table*8というリソースを使ってテーブルを定義できます。 しかしGlueのテーブルカタログはお決まりで入れないといけないパラメータが多いため、JSON用、Parquet用などファイルフォーマット別にmodule化して使っています。

JSON用のAthenaテーブルを定義するmoduleのコード例です。

resource "aws_glue_catalog_table" "json" {
  name          = var.name
  database_name = var.database_name

  table_type = "EXTERNAL_TABLE"
  parameters = merge({
    EXTERNAL = "TRUE"
  }, var.projections)

  dynamic "partition_keys" {
    for_each = var.partition_keys
    content {
      name = partition_keys.value.name
      type = partition_keys.value.type
    }
  }

  storage_descriptor {
    location      = var.location
    input_format  = "org.apache.hadoop.mapred.TextInputFormat"
    output_format = "org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"

    ser_de_info {
      name                  = "json"
      serialization_library = "org.openx.data.jsonserde.JsonSerDe"
      parameters = {
        "serialization.format" = 1
      }
    }

    dynamic "columns" {
      for_each = var.columns
      content {
        name = columns.value.name
        type = columns.value.type
      }
    }
  }
}

variable.tfはこんな感じです。

variable "name" {
  type = string
}

variable "database_name" {
  type = string
}

variable "location" {
  type = string
}

variable "partition_keys" {
  type = list(object({
    name = string
    type = string
  }))
}

variable "columns" {
  type = list(object({
    name = string
    type = string
  }))
}

variable "projections" {
  type    = map(string)
  default = {}
}

input_format, output_format, serialization_libraryなどはファイルのフォーマット毎に決まった値です。 module化することで利用者はテーブル名、カラム名やその型、S3での格納場所など必要な情報をのみを記述するだけでテーブルを作れるようにしています。

moduleを利用するテーブル定義本体のコード例です。

locals {
  example_log_columns = [
    {
      name = "url"
      type = "string"
    },
    {
      name = "response_time"
      type = "bigint"
    }
  ]
}

module "example_log_json" {
  source = "../json_table"

  name           = "example_log_json"
  database_name  = "test_db"
  location       = "s3://test-bucket/example_table/json"
  partition_keys = local.hourly_partitions
  columns        = local.example_log_columns
  projections    = local.hourly_projections
}

module "example_log_parquet" {
  source = "../parquet_table"

  name           = "example_log_parquet"
  database_name  = "test_db"
  location       = "s3://test-bucket/example_table/parquet"
  partition_keys = local.hourly_partitions
  columns        = local.example_log_columns
  projections    = local.hourly_projections
}

module化の恩恵のひとつとして同じデータだけどJSONデータ用のテーブルとParquetデータ用のテーブルなど、テーブルを2つ用意しないといけないようなケースでカラムの定義の記述を共通化できています。 これによってコピペミスなどによりJSONテーブルとParquetテーブルでカラムに不一致が出てしまう、といったことを回避できます。 ちなみカラムの変数をカラム名と型のMapにせずに配列にしている理由はカラムの順序を保つためです。配列にしておけばテーブル定義のカラムの順番も保持されます。

パーティションの管理にはパーティション射影を使う

Athenaの比較的新しい機能にパーティション射影があります。 パーティション射影はパーティションカラムからS3上のパスを自動で計算してくれる機能です。 従来パーティションを管理しようとした場合、ADD PARTITONなどのクエリを発行して、都度パーティションカラムとS3のパスを紐付けるバッチを運用する必要があったのですが、パーティション射影を使うことで自動でパスを計算してくれるようになり、パーティション管理の運用を撤廃できます。

UZOUでは時系列のログはパーティション構造を統一しているため、パーティション射影の設定も同様に統一しています。その設定を定義しているのが上述のコード例で出てきたlocal.hourly_partitionslocal.hourly_projectionsの変数です。これらは他のテーブルで使う都合上、別のファイルに分けて定義していて、以下のような設定をしています。

locals {
  hourly_partitions = [
    {
      name = "dt"
      type = "string"
    },
    {
      name = "hour"
      type = "string"
    }
  ]
  hourly_projections = {
    "projection.dt.type"     = "date",
    "projection.dt.range"    = "NOW-3YEARS,NOW",
    "projection.dt.format"   = "yyyyMMdd",
    "projection.hour.type"   = "integer",
    "projection.hour.range"  = "0,23",
    "projection.hour.digits" = "2",
    "projection.enabled"     = "true"
  }
}

この例ではyyyyMMdd形式のdtというパーティションとHH形式のhourというパーティションを設定していて、それぞれの範囲を現時点から3年前と00~23までとしています。これは、たとえば今が2020年の1月1日だとすると

s3://test-bucket/example_table/json/dt=20200101/hour=23/...
s3://test-bucket/example_table/json/dt=20200101/hour=22/...
.
.
.
s3://test-bucket/example_table/json/dt=20170101/hour=00/...

までのパーティションを自動認識してくれる設定になり、認識範囲も現在時刻に沿って柔軟に管理できるのでとても便利です。

Workspaceによるクエリの記録の分離や制限の分離

AthenaにはWorkspaceという機能があり、クエリ実行時のモニタリング設定などを分離できます。 Workspaceを分けるとクエリログを記録するバケットやクエリ実行時に計測されるCloudWatchメトリクスを分けられます。 またWorkspaceには1回のクエリにおけるデータスキャン量の上限を設定できます。

UZOUでは、例えばバッチによるクエリ実行とアドホックによるクエリ実行はWorkspaceを別々にしています。 その上でアドホッククエリで使うWorkspaceにはスキャン上限を設定しており、うっかりパーティション指定の無いクエリが流れて金額がかさむといったことが起こることを防いでいます。

また、CloudWatchMetricsのWorkspace別のクエリのデータスキャン量をDatadog上のダッシュボードに表示して朝会で確認して想定外に高額なクエリが流れていないか日々ウォッチしています。

f:id:kangaat:20201105140014p:plain

まとめ

UZOUでのAthenaを使ったデータ処理基盤とその設計の紹介でした。 分量が多くなったので今回は省いた細かいTipsはまだまだあるので、また機会があればお伝えできればなと思います。

また、このデータ処理基盤で今後やっていきたいことはまだまだあります。

  • Athena Federated QueryがGAになったら導入してAurora MySQLとシームレスにつなぐ
  • AthenaのUDFがGAされたら(ry
  • LakeFormationによるデータ参照権限の統制
  • よりリアルタイムにデータを参照するためにダイレクトにParquet形式でS3にPutするための取り組み

こちらも実現できたらまたブログを書いてお伝えできればと思います。