
SuperQueryOperatorでBigQueryのコストをリアルタイムに把握する
Photo by Soheb Zaidi on Unsplash
「コストはいくら?」——テック業界では耳にしない日がないほどの問いで、スタートアップで働く人なら誰もが一瞬ひるんでしまいます。そして返ってくる答えはたいてい「よくわからない」。
データエンジニアリングの分野でワークフローをスケジューリングするツールとして、定番のひとつがApache Airflowです。融通の利かないcronスケジュールから多くの企業を解放し、有向非巡回グラフ(DAG)を駆使してビッグデータの大波を乗りこなす世界へと導いてくれました。
当然ながら、それは大量のデータがデータベースを行き来することを意味し、その華々しい動きには避けがたいコストがついて回ります。
そうしたデータベースのひとつ——いわばスーパーコンピューターと呼ぶべき存在がGoogle BigQueryです。Google Cloudの旗艦サービスであり、ペタバイト級のデータ処理を実現します。インフラの性能を気にせず、分析の質や解くべきデータフローの課題そのものに集中できる、極めて優秀なプラットフォームです。
BigQueryで気をつけたい重要なポイントのひとつが、データスキャンによってコストがどこまで膨らむか、という余地の大きさです。経験豊富なデータエンジニアでも、本来不要なデータをうっかりスキャンしてしまい、月次の分析費用が予算を超えてしまった——そんな苦い経験を打ち明けることがあります。
そこで登場するのがsuperQueryです。「必要な情報がすべて手元にあり、判断を支えるセーフガードも揃っている。だからコストに頭を悩ませる必要はない」——これがsuperQueryの提案する考え方です。
Airflowでコストを把握する

Airflow DAGが軽快に処理を進め、お好みの処理システムへデータを送り込んでいる裏側では、膨大なログが記録されています。Airflowのログはアクセスしやすく読みやすいので、DAGの動きを把握するのに役立ちます。そのログにクエリ実行プランの情報、特にコストとスキャンされたデータ総量まで表示されたら——便利だと思いませんか?もちろん便利です。たとえば、こんなイメージです:
--------------------------------------------------------------------Starting attempt 1 of 4--------------------------------------------------------------------
[2019-03-11 21:12:02,129] {models.py:1593} INFO - Executing <Task(SuperQueryOperator): connect_to_superquery_proxy> on 2019-03-01T00:00:00+00:00[2019-03-11 21:12:03,836] {superq_operators.py:54} INFO - Executing: #standardSQLSELECT COUNT(testField) FROM `mydata.PROD.myTable`;[2019-03-11 21:12:03,844] {logging_mixin.py:95} INFO - [2019-03-11 21:12:03,843] {base_hook.py:83} INFO - Using connection to: id: mysql_default. Host: superproxy.system.io, Port: 3306, Schema: None, Login: XXXXXX, Password: XXXXXXXX, extra: {}[2019-03-11 21:12:15,172] {superq_operators.py:68} INFO - (('{ "startTime":1552331525642, "endTime":1552331534624, "executionTime":"8988", "bigQueryTotalBytesProcessed":26388279066, "bigQueryTotalCost":"0.12", "superQueryTotalBytesProcessed":0, "superQueryTotalCost":"0.00", "saving":0, "totalRows":"1",}', '', '1', 'true'),)[2019-03-11 21:12:17,121] {logging_mixin.py:95} INFO - [2019-03-11 21:12:17,119] {jobs.py:2527} INFO - Task exited with return code 0このログから、AirflowのBigQueryオペレーターが24Gbのデータをスキャンし、コストが$0.12だったことが一目でわかります。シンプルですね。さらに、サードパーティ製のツールやbashスクリプトでログをパースすれば、BigQueryからデータをスキャンするDAGごとのコストサマリーを作ることもできます。

仕組みは?
SuperQueryはMySqlプロキシを利用することで接続方法を共通化し、SQLインターフェース経由で情報を取得できるようにしています。
次に必要なもの:SuperQueryOperator
同じ機能を実現するための手順は次のとおりです:
- SuperQueryOperatorを使うために、superqueryプラグインをAirflowに追加します。
- superQueryのトライアルに登録し、superQuery MySqlプロキシのログイン情報を取得します。
- 下記のDAGでプロキシへの接続をテストします。
- この機能を使いたいDAGで、BigQueryオペレーターをSuperQueryオペレーターに置き換えます。
SuperQueryOperatorの使い方
SuperQueryオペレーターのインターフェースは次のとおりです:
TEST_SQL = """#standardSQLSELECT COUNT(*) FROM `mydata.PROD.myTable`;"""SuperQueryOperator( task_id="connect_to_superquery_proxy", sql=TEST_SQL, database="", explain=True, # False if you don't want information dag=dag)オペレーターのコードは以下です。Airflowのpluginsフォルダにコピーしてお使いください:
https://gist.github.com/super-eben/b1d49538b46fa3f55c5bddfe73405b34
SuperQueryへの接続をテストするコードはこちらです:
https://gist.github.com/super-eben/dcfa4420c419331e5a12a8b23e0a088c
まとめ
本記事では、AirflowタスクをBigQueryと接続・実装する際に、Airflowのコストを可視化する方法を紹介しました。SuperQueryプロキシを活用すれば、より幅広い実行プランの詳細情報を取得し、システムが提供するメリットを存分に引き出せます。
それでは、コスト監視(と節約)を楽しんでください!