Fargate環境でembulkを使ってMySQLからBigQueryへのマスタデータ転送
住まい暮らしメディアLIMIAで開発を担当している樋口です。
LIMIAではBigQueryを使ってデータ分析を行なっています。
ログデータについてはFirebase Analyticsに送信したデータをBigQueryへ書き出しています。
マスタデータについてはMySQLに格納されており、それをBigQueryへ送信する必要がありました。
embulkをfargate環境で動かすことで実現したため、それについて共有します。
embulkについて
embulkはデータローダです。
fluentdのバッチ版と言われており、オープンソースで開発されています。
fluentdと同じようにプラグインを導入することで、様々なデータソースに対応しています。
LIMIAでは入力をMySQL、出力をBigQueryとしています。
そのため、プラグインとして embulk-input-mysql と embulk-output-bigquery を利用しています。
今回のデータを転送するには、テーブル毎に2つのファイルを用意する必要があります。
1つは転送先のBigQueryテーブルのschemaを定義するファイルで、次のようになります。
files/etc/embulk/schema/idea.json
1 2 3 4 5 6 7 8 |
[ {"name":"id", "type":"integer", "mode": "required"}, {"name":"status", "type":"integer", "mode": "required"}, {"name":"user_id", "type":"integer", "mode": "required"}, {"name":"title", "type":"string", "mode": "nullable"}, {"name":"created_at", "type":"timestamp", "mode": "required"}, {"name":"updated_at", "type":"timestamp", "mode": "required"} ] |
もう1つは転送設定ファイルです。
ファイル名の最後に.liquidと付けると、二重括弧 {{ }} を展開してくれます。
env. で環境変数を取得できるため、パスワードなどをここに格納しています。
以下の設定では、MySQLの対象テーブル全行をダンプし、それをBigQueryに転送して置き換えます。
files/etc/embulk/tables/idea.yml.liquid
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
in: type: mysql user: {{ env.LIMIA_DB_USER }} password: {{ env.LIMIA_DB_PASS }} host: {{ env.LIMIA_DB_HOST_MAIN }} database: fily_communication table: idea select: "*" options: {useLegacyDatetimeCode: false, serverTimezone: Asia/Tokyo} default_timezone: "Asia/Tokyo" parser: type: json out: type: bigquery mode: replace auth_method: json_key json_keyfile: /etc/bigquery_service_account.json path_prefix: /tmp/ file_ext: .jsonl.gz source_format: NEWLINE_DELIMITED_JSON project: {{ env.GCP_PROJECT_ID }} dataset: fily_communication auto_create_table: true table: idea schema_file: /etc/embulk/schema/idea.json formatter: {type: jsonl} encoders: - {type: gzip} |
コンテナ設定
embulkをインストールして、実行時にshell scriptを叩くようなDockerファイルを作ります。
Dockerfile
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
FROM openjdk:8-jre-alpine RUN apk add --no-cache libc6-compat python py2-pip coreutils tzdata && \ cp /usr/share/zoneinfo/Asia/Tokyo /etc/localtime && \ echo "Asia/Tokyo" > /etc/timezone RUN pip install awscli RUN wget -q https://dl.embulk.org/embulk-latest.jar -O /usr/local/bin/embulk && \ chmod +x /usr/local/bin/embulk RUN /usr/local/bin/embulk gem install embulk-input-mysql && \ /usr/local/bin/embulk gem install embulk-output-bigquery COPY files/etc/embulk /etc/embulk COPY files/entrypoint.sh /entrypoint.sh ENTRYPOINT ["/entrypoint.sh"] |
embulkで転送を実行するためには、コマンドラインで次のように打ちます。
1 |
embulk run 転送設定ファイル.yml.liquid |
LIMIAでは複数のテーブルを転送しているため、特定のディレクトリに転送設定ファイルを格納しておき、process_tables()の処理でそれを一括実行しています。
また、特定のテーブルのみ指定時刻に転送する要望があるため、process_specified()で個別転送可能としています。
BigQueryに転送するためには、service accountと呼ばれる権限を与えられた鍵ファイルが必要です。
それを暗号化して blob/${LIMIA_ENV}-bigquery-service-account.blob に置いておき、暗号化鍵をKMSに登録しておきます。
decypt()の処理でservice accountを復号化して、コンテナ内のファイルに保存しています。
GCP_PROJECT_ID と LIMIA_ENV は、環境変数から取得しています。
環境変数は、ECS Task Definitionsで設定しています。
files/entrypoint.sh
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
#!/bin/sh decrypt() { aws kms decrypt --ciphertext-blob fileb:///etc/embulk/blob/${LIMIA_ENV}-bigquery-service-account.blob --output text --query Plaintext | base64 -d > /etc/bigquery_service_account.json } process_tables() { ls /etc/embulk/tables/*.yml.liquid | xargs -n1 java -jar /usr/local/bin/embulk run 2>&1 } process_specified() { for var in $@; do ls /etc/embulk/tables/${var}.yml.liquid | xargs -n1 java -jar /usr/local/bin/embulk run 2>&1 done } init() { if [ -z "${GCP_PROJECT_ID}" ]; then echo no GCP_PROJECT_ID exit fi if [ -z "${LIMIA_ENV}" ]; then echo no LIMIA_ENV exit fi decrypt if [ $# -eq 0 ]; then process_tables else process_specified "$@" fi } init "$@" |
まとめ
embulkを使ってMySQLのマスタデータをBigQueryに転送する方法を説明しました。
これにより、マスタデータを絡めた分析が可能となりました。
ECS/Fargateについては軽く触れた程度ですので、可能なら次回その辺りについても共有したいと思います。
また、Fargate環境でS3にあるALBやCloudFrontのログをBigQueryに転送する方法も書きました。もしよければ、合わせてこちらもご覧ください。