Airflowによるデータパイプラインのスケジュールとモニタリング

こんにちは!WEBマーケティング事業部エンジニアの@hatappiです。

現在私のプロジェクトでデータパイプラインのスケジュールとモニタリングをAirflowというオープンソースのツールを使用して運用しています。 今回は導入や使い勝手の話です。

Airflowとは?

airbnbがオープンソースで開発したワークフロー管理プラットフォームです。

※ 現在はApache Incubatorのオープンソースプロジェクトになっています。

Airflowはバッチ同士の関係を管理し、可視化することが出来ます。
また、実行した際のLogや実行時間の推移など、様々なデータの閲覧をデフォルトの機能として使用することが出来ます。

データパイプラインとは?

プロジェクトを運用していると、複数のテーブルにデータが入っており、それらのサマリーを作成するといった経験をしたことはありませんか?

これらの一連のプロセスのことをデータパイプラインといいます。

例えば、私のプロジェクトではMySQLやEmbulkを用いてRedshiftにバルクインサートを行い、バッチ処理をして最終的なデータにするデータパイプラインがあります。

スケジュールについて

特定のタスクを「何月何日の何時に実行したい」「定期的に実行したい」といった要望がある場合、真っ先に思いつくのは cronではないでしょうか。

cronを使用する場合

  • 複数のスケジュールが追加された際に管理がし辛い。
  • ログの出力設定をしなければログが出力されず、問題が起きた際の原因特定が難しい。
  • 動いているタスクの進捗を確認するためには、ログの確認やタスクの中に進捗を通知するような仕組みを作らなければならない

といった点に注意しなければなりません。

弊社では昔は定期実行処理をJenkinsで行っていました。記事はこちら
しかし今回はスケジューリングとモニタリングをしたいだけで、Jenkinsほど多機能なツールは必要なかったため、スケジュールとモニタリングに特化したAirflowを使うことにしました。

インストールについて

pipを使ってAirflow本体をインストールします。

$ pip install airflow

Airflowは本体とは別にサブパッケージが用意されており、MySQLやHive、S3やSambaなど様々なパッケージが提供されておりそれらを使用することでデータ連携を行うことが出来ます。

今回はAirflow上のデータストアとしてMySQL、ジョブキューとしてCeleryを使用します。

これらのサブパッケージも本体と同様にpipを使ってインストールします。

$ pip install airflow[mysql]
$ pip install airflow[celery]

Airflowでは、データストアをどこに置くか、ジョブキューは何を使うかなどをすべて設定ファイルで管理します。 今回はデータストアとジョブキュー部分を記載しています。

他にも設定できる項目はあるので詳しくは公式ドキュメントを見てください。

executor = CeleryExecutor
sql_alchemy_conn = mysql://[user_name]:[password]@[host]:[port]/[db_name]
broker_url = sqla+mysql://[user_name]:[password]@[host]:[port]/[db_name]
celery_result_backend = db+mysql://[user_name]:[password]@[host]:[port]/[db_name]

これで一通りの設定が完了しました。

下記コマンドを実行するとブラウザから管理画面を確認できるようになります。

# データベースに必要なテーブルを追加する
$ airflow initdb

# port8080でwebサーバーを立ち上げる
$ airflow webserver -p 8080

Airflowの構成

AirflowではDAGとよばれる複数のタスクの集まりを1つのスケジュールとして扱い管理をしています。 DAGの詳細についてはwikipediaをご覧ください

定義されたDAGは下記の3つの役割をもったプロセスを立ち上げることで運用をはじめることが出来ます。 ここではそれぞれの役割を簡単に記載します。

DAGの書き方

次に1つ簡単なDAGを定義してみます。

検証環境は下記を使用しました

Vagrant
  - CentOS 7.1
  - Python v3.5.1
  - Airflow v1.7.1.2

今回は例として毎日16:00になるとdateコマンドを実行するDAGを定義します。

from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'vagrant',
    'start_date': datetime(2016, 6, 12, 15, 00, 00)
}

dag = DAG('hello', default_args=default_args, schedule_interval='00 16 * * *')

task1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

task2 = BashOperator(
    task_id='print_hello-world',
    bash_command='echo "hello world!!"',
    dag=dag)

task1.set_downstream(task2)

プログラム内にstart_dateがありますが、ここではいつからタスクを開始するかを指定します。 ここで気をつけなければならないのが、datetime.now()などと指定しないことです。 指定を間違えてしまうと延々とタスクが走り続けてしまいます。 他のオプションについては公式のこちらを参照してください

default_args = {
    'owner': 'vagrant',
    'start_date': datetime(2016, 6, 13, 15, 00, 00)
}

下記でDAGインスタンスを定義します。

schedule_intervalにはスケジュールを記載します。

例では馴染みのあるcronの記法で指定しています。

dag = DAG('hello', default_args=default_args, schedule_interval='00 16 * * *')

下記のように記載することで task1の後にtask2を実行するというDAGを定義できます。

task1.set_downstream(task2)

Pool

運用していると出てくるのが、複数のタスクを並列して走らせる要望です。

AirflowではPoolという機能があり、こちらを設定することで実現することが出来ます。

まず並列化に使うDAGを定義します。

from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'vagrant',
    'start_date': datetime(2016, 6, 12, 15, 00, 00)
}

dag = DAG('hello', default_args=default_args, schedule_interval='00 16 * * *')

task1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

task2 = BashOperator(
    task_id='print_hello-world',
    bash_command='echo "hello world!!"',
    dag=dag)

task3 = BashOperator(
    task_id='print_hello-world',
    bash_command='echo "hello japan!!"',
    dag=dag)
    
task4 = BashOperator(
    task_id='end_task',
    bash_command='echo "end!!"',
    dag=dag)

task1.set_downstream(task2)
task1.set_downstream(task3)

task2.set_downstream(task4)
task3.set_downstream(task4)

今回はこのprint_hello-worldprint_hello-japanを並列実行させたいと思います。 管理画面上からAdmin -> Poolと遷移します。(uri: /admin/pool/)

こちらでPool名と並列実行数であるslotsを定義します。

先ほど定義したPool名を使用してDAGを編集します。

これでslotsで定義した数だけ並列実行が出来るようになります。

task2 = BashOperator(
    task_id='print_hello-world',
    bash_command='echo "hello world!!"',
    pool='hello_pool',
    dag=dag)

task3 = BashOperator(
    task_id='print_hello-world',
    bash_command='echo "hello japan!!"',
    pool='hello_pool',
    dag=dag)

Slackとの連携について

タスクの実行結果を把握しやすくするために、Slack連携を行います。

airflowにはサブモジュールでairflow[slack]がありますが、これはタスクの一つとして実行されるものです。

今回使用したいのは、それぞれのタスクが完了もしくは失敗した際にslackに通知するものです。
これが実現できると随時DAGの進捗を受け取ることが出来るので、管理画面を見に行く必要がなくなります。

これを実現するのがon_success_callbackon_failure_callbackです。
これらのコールバックにSlack通知を送る関数をセットすることで、通知を行うことができます。

実装にはSlackのInconming webhookを使うため、こちらでhookのurlを取得します。
PythonにはSlackのwebhookを便利に実行ができるslackwebというライブラリがあるので、pip install slackwebでインストールしておきます。

下記が実際に成功時、エラー時にSlackに通知を送るコードです。

import slackweb

WEBHOOK_URL = 'https://hooks.slack.com/services/xxxxxxxxxxxxx/xxxxxxxxxxxxxx'

def notify_error(context):
  slack = slackweb.Slack(url=WEBHOOK_URL)
  attachments = [
      {
        "fallback": "%s error" %(context['task']),
        "color": "#b8312f",
        "title": "%s %s" %(context['dag'], context['task']),
        "title_link": "http://speee.jp",
        "text": '<!channel>'
      }
    ]
  slack.notify(attachments=attachments)

def notify_success(context):
  slack = slackweb.Slack(url=WEBHOOK_URL)
  attachments = [
      {
        "fallback": "%s success" %(context['task']),
        "color": "#36a64f",
        "title": "%s %s" %(context['dag'], context['task']),
        "title_link": "http://speee.jp",
        "text": ''
      }
    ]
  slack.notify(attachments=attachments)

後はコールバックにこの関数を指定しておけば、タスクの実行結果がSlackに通知されるようになります。

default_args = {
    'owner': 'vagrant',
    'start_date': datetime(2016, 4, 21, 13, 05, 00),
    'on_success_callback': notify_success,
    'on_failure_callback': notify_error,
}

成功した場合は緑、失敗した場合は赤になるよう設定したので、通知の結果は下記の画像のようになります。

実際に使ってみて

実際に運用していくと下記のようなグラフやガントチャートが表示されます。
これを見ることで、通常よりも時間がかかっている処理を発見できたり、問題のある箇所のログを確認することができるようになり、原因究明がしやすくなりました。

またどのスケジューラにも言えることですが、ツールにロジックをもたせ過ぎないことが大切です。 今回はPythonでタスクを定義しましたが、実装次第ではDBから取得した値を用いて、タスクに渡す引数を変更することも可能です。

ただこれを行ってしまうと、データとタスクが密結合になってしまい修正をする際にスケジュールも変更しなければならないなど運用コストが上がってしまいます。

最後に

今回Airflowを実装することでLogを画面上から確認することが出来るようにもなり、 デバッグもしやすくなりました。

また大きな利点としては下記のようにエンジニア以外のメンバーもタスクを実行できるようになるので、エンジニアは開発に専念できるようになります。

f:id:technica-speee:20160719151152j:plain

次回予告

今回は導入手順やSlack通知などのTipsを書きましたが、次回実際にプロダクトにいれてみて
どう変わったか、運用中に問題が発生した時どう乗り切ったかなど 運用観点で掘り下げた記事を書きます。