Apache AirflowとAWS Fargateでバッチ処理

Apache AirflowとAWS Fargateでバッチ処理

美容メディアのARINEでサーバーサイドの開発を行なっているエンジニアのたばたです。
最近ARINEで使っているバッチ処理の一部をApache Airflow + AWS Fargateに移行したので、それについて紹介します。

背景

ARINEではバックエンドのフレームワークとしてRuby on Railsを採用しています。定期的に実行するバッチ処理はバッチサーバーでRakeタスクをcronに設定して処理しています。

バッチ処理はRakeタスクとして実行されることが多かったのですが、機械学習周りの処理(関連する記事の計算や、オススメのユーザーなど)を行う場合は、処理をPythonで記述しコンテナにしてECSのスケジュールタスクとして実行していました。異なる2つ環境でバッチが実行されるため、その2つの間で依存関係がある場合は実行タイミングを調整して処理を動かしてました。

依存を時間で調節してたりすると、上流のタスクが失敗しても下流のタスクが実行されてしまい、再実行する際にこのタスクはどのタスクの後に実行すればいいんだっけ?みたいなことがあります。
そういった問題を解決するために、Airflowと呼ばれるワークフローツールを導入したので今回はその話をしようと思います。

Apache Airflowについて

依存関係や定期実行をサポートしてくれるワークフローツールはたくさんありますが、今回はApache Airflowを使用しました。Airflowは元々はAirBnBがOSSで開発していたプロジェクトで、現在はApache Incubatorプロジェクトとして開発されています。

AirflowではワークフローはPythonのコードとして表現されます。デフォルトで便利なオペレーターが多数ありますので、Slackへの通知や今回利用したFargateでのタスクの実行などは簡単に行うことができます。今回はFargateで処理を実行したかったため、ここが簡単に行えるというのが採用した大きな理由です。

ワークフローの画面

ワークフローの画面

ARINEでのAirflowの使い方

前述した通り、cronで実行されるRakeタスクとECSのスケジュールタスクで実行される機械学習の依存管理や再実行をやりやすくするためにAirflowを導入しました。

例えば、ある記事に関連する記事を計算するという処理は、実際の記事データを取り出して関連する記事を計算、計算された結果を実際の環境に反映というステップが必要です。最初の関連する記事を計算する部分はPythonで書かれていて、後半の実際の環境に反映する部分はRakeタスクです。このように実行環境が異なる依存した処理をAirflowとAWS Fargateを使ってうまく管理します。

ARINEのRailsアプリケーションと機械学習の処理はコンテナになっていますので、Rakeタスクと機械学習の処理はAirflowからFargateのタスクとして動かすことにしました。ECSOperatorを使用することで簡単にFargateのタスクを実行できます。実際の処理自体はコンテナ側に記述されているので、上記の関連する記事の計算のワークフローは以下のように簡単に記述することができます。

また、Railsのアプリケーションは日々更新されているので、Rakeタスクが動くコンテナも古いイメージを使用しないように、実行するたびに最新のアプリケーションのコンテナをECRから取得してそのコンテナを使用してます。
このときに別のタスクからECSのタスク定義をAirflowのECSOperatorに渡す必要があり、これをやりやすくするために簡単なPRを出しました。(ちなみにまだ取り込まれてはいません。)

実際にAirflowを使って見て感想

今回Airflowを使って見ての感想ですが、依存のある処理が一箇所で管理できて再実行がしやすかったり、依存関係がWebページ上で可視化されるというのは、複雑なバッチ処理を運用していく上でとても助けになります。また最初から色々なOperatorが用意されていてAWSやSlackなどの呼び出しをすごい簡単にできるので、手軽にそういった機能を呼びだすワークフローが記述できてとても便利でした。

私たちの使い方だと実際のバッチ処理自体はFargateで実行されるので、Airflowの方には詳しい情報は残りません。なのでFargateのタスクのログはawslogs Log Driverを使ってCloudWatch Logsに送っています。失敗した場合はAirflowのログだけではなくCloudWatch Logsを見ないといけないのが少し難点です。
また、バッチをスケジュールする際に初回実行時刻(start_date)を指定するのですが、実際の初回実行時刻はワークフロー上で記述した時間から繰り返しの間隔だけ過ぎた時に実行されます。例えば、start_dateに12月1日の1時を指定して毎日ワークフローが動くようにスケジュールした場合、初回実行は12月2日の1時となります。この挙動があまり直感的ではなく、久しぶりにワークフローの設定をすると間違えがちです。

最後に

今回はARINEでのAirflowの使い方について話しました。依存があるタスクについては一通りAirflowに移行できましたが、まだバッチサーバーで動いているタスクも残っているので引き続き整理していきます。時間で依存を管理するのは手軽なのでやりがちなのですが、依存が大きくなると失敗した時のリカバリや整理が大変になります。私たちはそこまでになる前にワークフローツールを導入できてよかったです。
この話が少しでも同じようにバッチ処理の依存とかで困っている人の助けになれば幸いです:)