Forwarding ALB and CloudFront logs from S3 to BigQuery with Embulk on Fargate
I'm Higuchi (@mahiguch1), a developer at LIMIA, a home and lifestyle media service.
At LIMIA we use BigQuery for data analysis.
After my previous article on transferring data from MySQL to BigQuery, I was asked how we forward the ALB and CloudFront logs stored in S3, so I'd like to share that setup here.
ALB log forwarding setup
Since the schema of ALB access logs is fixed, we define it as follows.
files/etc/embulk/schema/alb_access_log.json
[ {"name": "protocol", "type": "string", "mode": "nullable"}, {"name": "timestamp", "type": "timestamp", "mode": "nullable"}, {"name": "elb", "type": "string", "mode": "nullable"}, {"name": "client_port", "type": "string", "mode": "nullable"}, {"name": "backend_port", "type": "string", "mode": "nullable"}, {"name": "request_processing_time", "type": "float", "mode": "nullable"}, {"name": "backend_processing_time", "type": "float", "mode": "nullable"}, {"name": "response_processing_time", "type": "float", "mode": "nullable"}, {"name": "elb_status_code", "type": "integer", "mode": "nullable"}, {"name": "backend_status_code", "type": "string", "mode": "nullable"}, {"name": "received_bytes", "type": "integer", "mode": "nullable"}, {"name": "send_bytes", "type": "integer", "mode": "nullable"}, {"name": "request", "type": "string", "mode": "nullable"}, {"name": "user_agent", "type": "string", "mode": "nullable"}, {"name": "ssl_cipher", "type": "string", "mode": "nullable"}, {"name": "ssl_protocol", "type": "string", "mode": "nullable"}, {"name": "target_group_arn", "type": "string", "mode": "nullable"}, {"name": "trace_id", "type": "string", "mode": "nullable"}, {"name": "domain_name", "type": "string", "mode": "nullable"}, {"name": "chosen_cert_arn", "type": "string", "mode": "nullable"}, {"name": "matched_rule_priority", "type": "integer", "mode": "nullable"} ] |
ALB access logs are stored in S3, so embulk-input-s3 looks like the obvious choice.
That worked fine while we were running on EC2, but on ECS/Fargate it failed with a permission error.
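If you hit the same kind of failure, it can help to first confirm which IAM identity the container actually resolves to. A minimal debugging sketch, assuming the awscli is available in the image and `your-alb-log-bucket` is a placeholder for the real bucket:

```sh
# Print the IAM role/identity the Fargate task is actually running under
aws sts get-caller-identity

# If the task role can read the bucket, plain awscli access works
# even when a plugin cannot pick up the credentials
aws s3 ls s3://your-alb-log-bucket/ | head
```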
So instead, we use the awscli to copy the log files to local disk and load them into BigQuery from there, with the configuration below.
files/etc/embulk/logs/production/access-log-elb-limia-jp-alb.yml.liquid
```yaml
in:
  type: file
  path_prefix: /tmp/access-log-elb-limia-jp-alb
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: " "
    trim_if_not_quoted: false
    skip_header_lines: 0
    allow_extra_columns: true
    allow_optional_columns: true
    null_string: "-"
    columns:
      - {name: protocol, type: string}
      - {name: timestamp, type: timestamp, format: '%Y-%m-%dT%H:%M:%S.%NZ'}
      - {name: elb, type: string}
      - {name: client_port, type: string}
      - {name: backend_port, type: string}
      - {name: request_processing_time, type: double}
      - {name: backend_processing_time, type: double}
      - {name: response_processing_time, type: double}
      - {name: elb_status_code, type: long}
      - {name: backend_status_code, type: string}
      - {name: received_bytes, type: long}
      - {name: send_bytes, type: long}
      - {name: request, type: string}
      - {name: user_agent, type: string}
      - {name: ssl_cipher, type: string}
      - {name: ssl_protocol, type: string}
      - {name: target_group_arn, type: string}
      - {name: trace_id, type: string}
      - {name: domain_name, type: string}
      - {name: chosen_cert_arn, type: string}
      - {name: matched_rule_priority, type: long}
  decoders:
    - {type: gzip}
out:
  type: bigquery
  mode: replace
  prevent_duplicate_insert: false
  auth_method: json_key
  json_keyfile: /etc/bigquery_service_account.json
  project: {{ env.GCP_PROJECT_ID }}
  dataset: alb_logs
  compression: GZIP
  source_format: NEWLINE_DELIMITED_JSON
  table: limia_jp_{{ "now" | date: "%s" | minus: 86400 | date: "%Y%m%d" }}
  schema_file: /etc/embulk/schema/alb_access_log.json
  auto_create_table: true
```
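The `table:` value uses Liquid filters to shard the table by yesterday's date: `now` is converted to epoch seconds, 86400 seconds are subtracted, and the result is reformatted as `%Y%m%d`. A quick way to sanity-check a load, sketched with the `bq` CLI (`your-gcp-project` is a placeholder and the CLI itself is assumed to be installed):

```sh
# Count the rows loaded into yesterday's shard of the ALB log table
bq query --use_legacy_sql=false \
  "SELECT COUNT(*) AS row_count
   FROM \`your-gcp-project.alb_logs.limia_jp_$(date --date '1 day ago' +%Y%m%d)\`"
```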
CloudFront log forwarding setup
As with ALB, the CloudFront log schema is fixed, so we defined it as follows.
files/etc/embulk/schema/cloudfront_access_log.json
[ {"name": "date", "type": "string", "mode": "nullable"}, {"name": "time", "type": "string", "mode": "nullable"}, {"name": "x_edge_location", "type": "string", "mode": "nullable"}, {"name": "sc_bytes", "type": "string", "mode": "nullable"}, {"name": "c_ip", "type": "string", "mode": "nullable"}, {"name": "cs_method", "type": "string", "mode": "nullable"}, {"name": "cs_host", "type": "string", "mode": "nullable"}, {"name": "cs_uri_stem", "type": "string", "mode": "nullable"}, {"name": "sc_status", "type": "string", "mode": "nullable"}, {"name": "cs_referer", "type": "string", "mode": "nullable"}, {"name": "cs_user_agent", "type": "string", "mode": "nullable"}, {"name": "cs_uri_query", "type": "string", "mode": "nullable"}, {"name": "cs_cookie", "type": "string", "mode": "nullable"}, {"name": "x_edge_result_type", "type": "string", "mode": "nullable"}, {"name": "x_edge_request_id", "type": "string", "mode": "nullable"}, {"name": "x_host_header", "type": "string", "mode": "nullable"}, {"name": "cs_protocol", "type": "string", "mode": "nullable"}, {"name": "cs_bytes", "type": "string", "mode": "nullable"}, {"name": "time_taken", "type": "string", "mode": "nullable"}, {"name": "x_forwarded_for", "type": "string", "mode": "nullable"}, {"name": "ssl_protocol", "type": "string", "mode": "nullable"}, {"name": "ssl_cipher", "type": "string", "mode": "nullable"}, {"name": "x_edge_response_result_type", "type": "string", "mode": "nullable"}, {"name": "cs_protocol_version", "type": "string", "mode": "nullable"}, {"name": "fle_status", "type": "string", "mode": "nullable"}, {"name": "fle_encrypted_fields", "type": "string", "mode": "nullable"} ] |
The forwarding configuration, like the ALB one, reads the files from local disk.
files/etc/embulk/logs/production/access-log-cloudfront-d-limia-jp.yml.liquid
```yaml
in:
  type: file
  path_prefix: /tmp/access-log-cloudfront-d-limia-jp
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: "\t"
    trim_if_not_quoted: false
    skip_header_lines: 2
    allow_extra_columns: true
    allow_optional_columns: true
    null_string: "-"
    columns:
      - {name: date, type: string}
      - {name: time, type: string}
      - {name: x_edge_location, type: string}
      - {name: sc_bytes, type: string}
      - {name: c_ip, type: string}
      - {name: cs_method, type: string}
      - {name: cs_host, type: string}
      - {name: cs_uri_stem, type: string}
      - {name: sc_status, type: string}
      - {name: cs_referer, type: string}
      - {name: cs_user_agent, type: string}
      - {name: cs_uri_query, type: string}
      - {name: cs_cookie, type: string}
      - {name: x_edge_result_type, type: string}
      - {name: x_edge_request_id, type: string}
      - {name: x_host_header, type: string}
      - {name: cs_protocol, type: string}
      - {name: cs_bytes, type: string}
      - {name: time_taken, type: string}
      - {name: x_forwarded_for, type: string}
      - {name: ssl_protocol, type: string}
      - {name: ssl_cipher, type: string}
      - {name: x_edge_response_result_type, type: string}
      - {name: cs_protocol_version, type: string}
      - {name: fle_status, type: string}
      - {name: fle_encrypted_fields, type: string}
  decoders:
    - {type: gzip}
out:
  type: bigquery
  mode: replace
  prevent_duplicate_insert: false
  auth_method: json_key
  json_keyfile: /etc/bigquery_service_account.json
  project: {{ env.GCP_PROJECT_ID }}
  dataset: alb_logs
  compression: GZIP
  source_format: NEWLINE_DELIMITED_JSON
  table: d_limia_jp_{{ "now" | date: "%s" | minus: 86400 | date: "%Y%m%d" }}
  schema_file: /etc/embulk/schema/cloudfront_access_log.json
  auto_create_table: true
```
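Since the CloudFront schema above stores `date` and `time` as plain strings, they can be combined into a proper timestamp at query time. A hedged example (`your-gcp-project` and the `20190630` table suffix are placeholders):

```sh
# Combine the string date/time columns into a TIMESTAMP when querying
bq query --use_legacy_sql=false \
  "SELECT TIMESTAMP(CONCAT(date, ' ', time)) AS ts, cs_uri_stem, sc_status
   FROM \`your-gcp-project.alb_logs.d_limia_jp_20190630\`
   LIMIT 10"
```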
Scripts
I added log forwarding to the script from the previous article.
Specifically, when `logs` is passed as an argument, it runs `/etc/embulk/logs/${LIMIA_ENV}.sh`.
files/entrypoint.sh
```sh
#!/bin/sh

# Decrypt the KMS-encrypted BigQuery service account key into place
decrypt() {
  aws kms decrypt --ciphertext-blob fileb:///etc/embulk/blob/${LIMIA_ENV}-bigquery-service-account.blob --output text --query Plaintext | base64 -d > /etc/bigquery_service_account.json
}

# No arguments: run every table transfer config
process_tables() {
  ls /etc/embulk/tables/*.yml.liquid | xargs -n1 java -jar /usr/local/bin/embulk run 2>&1
}

# "logs": run the per-environment log script; otherwise run only the named table configs
process_specified() {
  if [ $# -eq 0 ]; then
    exit
  fi
  if [ "$1" = "logs" ]; then
    if [ -e /etc/embulk/logs/${LIMIA_ENV}.sh ]; then
      /etc/embulk/logs/${LIMIA_ENV}.sh
    fi
  else
    for var in "$@"; do
      ls /etc/embulk/tables/${var}.yml.liquid | xargs -n1 java -jar /usr/local/bin/embulk run 2>&1
    done
  fi
}

init() {
  if [ -z "${GCP_PROJECT_ID}" ]; then
    echo no GCP_PROJECT_ID
    exit
  fi
  if [ -z "${LIMIA_ENV}" ]; then
    echo no LIMIA_ENV
    exit
  fi
  decrypt
  if [ $# -eq 0 ]; then
    process_tables
  else
    process_specified "$@"
  fi
}

init "$@"
```
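For reference, the entrypoint can then be exercised like this (`GCP_PROJECT_ID` and `LIMIA_ENV` must be set; the `users` table name is a hypothetical example):

```sh
./entrypoint.sh        # no args: run every config under /etc/embulk/tables/
./entrypoint.sh logs   # run the per-environment log forwarding script
./entrypoint.sh users  # run only /etc/embulk/tables/users.yml.liquid (hypothetical table)
```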
The shell script called from the entrypoint looks like this.
It uses the awscli to copy the previous day's logs to local disk, then runs Embulk.
files/etc/embulk/logs/production.sh
```sh
#!/bin/sh

# Copy yesterday's CloudFront logs to local disk, load them, then clean up
aws s3 sync s3://bucket_name/cf-logs/ /tmp/access-log-cloudfront-d-limia-jp/ --exclude "*" --include "*.`date --date '1 day ago' +%Y-%m-%d`-*" --quiet
java -jar /usr/local/bin/embulk run /etc/embulk/logs/${LIMIA_ENV}/access-log-cloudfront-d-limia-jp.yml.liquid 2>&1
rm -rf /tmp/access-log-cloudfront-d-limia-jp/

# Same for yesterday's ALB logs, which are keyed by a YYYY/MM/DD prefix in S3
aws s3 sync s3://bucket_name/AWSLogs/aws_account_id/elasticloadbalancing/ap-northeast-1/`date --date '1 day ago' +%Y/%m/%d`/ /tmp/access-log-elb-limia-jp-alb/ --quiet
java -jar /usr/local/bin/embulk run /etc/embulk/logs/${LIMIA_ENV}/access-log-elb-limia-jp-alb.yml.liquid 2>&1
rm -rf /tmp/access-log-elb-limia-jp-alb/
```
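One caveat: `date --date '1 day ago'` is GNU date syntax, so the container image needs GNU coreutils (BusyBox's date handles relative dates differently). The two formats line up with how each service names its objects in S3:

```sh
# GNU date only; these drive the S3 key matching above
date --date '1 day ago' +%Y-%m-%d   # e.g. 2019-06-30, matches CloudFront log file names
date --date '1 day ago' +%Y/%m/%d   # e.g. 2019/06/30, matches the ALB log key prefix
```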
Summary
I've shared how we forward ALB and CloudFront logs to BigQuery.
I hope it helps anyone who runs into the same problem.