Cloud Intelligence™Cloud Intelligence™

Cloud Intelligence™

AirflowとsuperQuery

By Eben Du ToitMar 12, 20194 min read

このページはEnglishDeutschEspañolFrançaisItalianoPortuguêsでもご覧いただけます。

1 xmk dcdi2qeckymmzlzkda

SuperQueryOperatorでBigQueryのコストをリアルタイムに把握する

1 xmk dcdi2qeckymmzlzkdaPhoto by Soheb Zaidi on Unsplash

「コストはいくら?」——テック業界では耳にしない日がないほどの問いで、スタートアップで働く人なら誰もが一瞬ひるんでしまいます。そして返ってくる答えはたいてい「よくわからない」。

データエンジニアリングの分野でワークフローをスケジューリングするツールとして、定番のひとつがApache Airflowです。融通の利かないcronスケジュールから多くの企業を解放し、有向非巡回グラフ(DAG)を駆使してビッグデータの大波を乗りこなす世界へと導いてくれました。

当然ながら、それは大量のデータがデータベースを行き来することを意味し、その華々しい動きには避けがたいコストがついて回ります。

そうしたデータベースのひとつ——いわばスーパーコンピューターと呼ぶべき存在がGoogle BigQueryです。Google Cloudの旗艦サービスであり、ペタバイト級のデータ処理を実現します。インフラの性能を気にせず、分析の質や解くべきデータフローの課題そのものに集中できる、極めて優秀なプラットフォームです。

BigQueryで気をつけたい重要なポイントのひとつが、データスキャンによってコストがどこまで膨らむか、という余地の大きさです。経験豊富なデータエンジニアでも、本来不要なデータをうっかりスキャンしてしまい、月次の分析費用が予算を超えてしまった——そんな苦い経験を打ち明けることがあります。

そこで登場するのがsuperQueryです。「必要な情報がすべて手元にあり、判断を支えるセーフガードも揃っている。だからコストに頭を悩ませる必要はない」——これがsuperQueryの提案する考え方です。


Airflowでコストを把握する

1 wnx 9yasyzdavzemvxxveg

Airflow DAGが軽快に処理を進め、お好みの処理システムへデータを送り込んでいる裏側では、膨大なログが記録されています。Airflowのログはアクセスしやすく読みやすいので、DAGの動きを把握するのに役立ちます。そのログにクエリ実行プランの情報、特にコストとスキャンされたデータ総量まで表示されたら——便利だと思いませんか?もちろん便利です。たとえば、こんなイメージです:

--------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------
[2019-03-11 21:12:02,129] {models.py:1593} INFO - Executing <Task(SuperQueryOperator): connect_to_superquery_proxy> on 2019-03-01T00:00:00+00:00
[2019-03-11 21:12:03,836] {superq_operators.py:54} INFO - Executing: #standardSQL
SELECT COUNT(testField) FROM `mydata.PROD.myTable`;
[2019-03-11 21:12:03,844] {logging_mixin.py:95} INFO - [2019-03-11 21:12:03,843] {base_hook.py:83} INFO - Using connection to: id: mysql_default. Host: superproxy.system.io, Port: 3306, Schema: None, Login: XXXXXX, Password: XXXXXXXX, extra: {}
[2019-03-11 21:12:15,172] {superq_operators.py:68} INFO - ((
'{
"startTime":1552331525642,
"endTime":1552331534624,
"executionTime":"8988",
"bigQueryTotalBytesProcessed":26388279066,
"bigQueryTotalCost":"0.12",
"superQueryTotalBytesProcessed":0,
"superQueryTotalCost":"0.00",
"saving":0,
"totalRows":"1",
}', '', '1', 'true'),)
[2019-03-11 21:12:17,121] {logging_mixin.py:95} INFO - [2019-03-11 21:12:17,119] {jobs.py:2527} INFO - Task exited with return code 0

このログから、AirflowのBigQueryオペレーターが24Gbのデータをスキャンし、コストが$0.12だったことが一目でわかります。シンプルですね。さらに、サードパーティ製のツールやbashスクリプトでログをパースすれば、BigQueryからデータをスキャンするDAGごとのコストサマリーを作ることもできます。


1 hyjrfqs1qbbawfsevvqm0q

仕組みは?

SuperQueryはMySqlプロキシを利用することで接続方法を共通化し、SQLインターフェース経由で情報を取得できるようにしています。

次に必要なもの:SuperQueryOperator

同じ機能を実現するための手順は次のとおりです:

  1. SuperQueryOperatorを使うために、superqueryプラグインをAirflowに追加します。
  2. superQueryのトライアルに登録し、superQuery MySqlプロキシのログイン情報を取得します。
  3. 下記のDAGでプロキシへの接続をテストします。
  4. この機能を使いたいDAGで、BigQueryオペレーターをSuperQueryオペレーターに置き換えます。

SuperQueryOperatorの使い方

SuperQueryオペレーターのインターフェースは次のとおりです:

TEST_SQL = """#standardSQL
SELECT COUNT(*) FROM `mydata.PROD.myTable`;"""
SuperQueryOperator(
task_id="connect_to_superquery_proxy",
sql=TEST_SQL,
database="",
explain=True, # False if you don't want information
dag=dag
)

オペレーターのコードは以下です。Airflowのpluginsフォルダにコピーしてお使いください:

https://gist.github.com/super-eben/b1d49538b46fa3f55c5bddfe73405b34

SuperQueryへの接続をテストするコードはこちらです:

https://gist.github.com/super-eben/dcfa4420c419331e5a12a8b23e0a088c

まとめ

本記事では、AirflowタスクをBigQueryと接続・実装する際に、Airflowのコストを可視化する方法を紹介しました。SuperQueryプロキシを活用すれば、より幅広い実行プランの詳細情報を取得し、システムが提供するメリットを存分に引き出せます。

それでは、コスト監視(と節約)を楽しんでください!