Cloud Intelligence™Cloud Intelligence™

Cloud Intelligence™

ClickHouseでBigQuery・Lookerのコストを賢く削減 第2回

By Sayle MatthewsJun 30, 202416 min read

このページはEnglishDeutschEspañolFrançaisItalianoPortuguêsでもご覧いただけます。

前回の続き

前回の記事では、本構成の全体像と、AivenまたはClickHouseのDBaaSを使った基本的なClickHouseサービスの立ち上げ方を取り上げました。今回は、データをClickHouseに取り込み、BigQueryとの間でレプリケーションを構成する手順に進みます。

GitHubリポジトリ

本記事では、Cloud FunctionとBigQueryジョブを取り上げます。これら2つのソースコードは、こちらのGitHubリポジトリに置いています。

なお、このコードは学習用途を想定してあえて非常にシンプルに作っています。そのため、実環境で利用する場合はカスタマイズがほぼ必須になる点をご承知おきください。

BigQueryからClickHouseへデータを取り出す

現時点では、BigQueryから任意の宛先へCDCやストリーミングでデータを自動的に出力する手段はありません。つまり、BigQueryに挿入する前にデータを取得しておくか、挿入後に抽出するかのいずれかが必要です。公式の方法が存在しないため、このテーマは非常に奥深いものになります。そこで本記事では、バッチロードに絞って解説します。

レプリケーションの話は次のセクションで扱いますが、その前にまずは初期データセットをBigQueryからClickHouseへ取り出し、ベースラインを作る必要があります。

BigQueryには、データを外部に取り出す最も手軽な手段としてエクスポート機能があります。ただし大きな欠点が2つあり、一度にエクスポートできるのは1テーブルだけ、かつ保存先がGCSに限られます。

1テーブルずつしか扱えないという制約があるからこそ、すべてのテーブルをClickHouseへレプリケーションする必要があるのか、あるいはLookerで使うためにレプリケーションすべき候補はどれか、を見極める良い機会になります。

すべてのテーブルを一覧化する手早い方法として、対象データセットに対して以下のクエリを実行すれば、全テーブルがリスト化されます。さらに、よく使われているテーブルが分からない場合は、こちらのクエリでデータセット内の各テーブルにヒットしたクエリ件数を確認できます。なお、このクエリは相応のコストがかかる可能性があるため、まずUIで料金見積りを確認し、スキャン対象日数を調整してから実行してください。

実際にエクスポートを行う際は、bq CLIコマンドでテーブル全体を取得するのが最善です。理由は、「EXPORT DATA…」SQLコマンドを使うとエクスポートしたデータ量に応じた処理/スキャン料金、Editions利用時はスロット料金が発生するのに対し、CLIコマンドやAPI呼び出しなら追加料金なしでテーブル全体をダンプできるためです。

テーブルの一部、たとえば特定のパーティションだけが必要な場合は、bq cpコマンドでパーティションを新しいテーブルにコピーし、それをClickHouseに直接ロードする方法があります。残念ながらこのコマンドはワイルドカードや複数パーティションの一括指定に対応していないため、パーティションごとに実行する必要があります。スクリプト化は比較的容易ですが、ここでコマンド例を載せておきます:

bq cp –append_table=true `<source_project_id>:<source_dataset>.<source_table>$<source_partition_name>` `<target_project_id>.<target_dataset>.<target_table>`

パーティション分割されておらず(かつ巨大すぎない)テーブルであれば、テーブル全体をClickHouseに読み込んでから、SQLで不要分を削るのがおすすめです。これならBigQuery側で処理料金が発生しません。

初期エクスポートの準備が整ったら、次に進みましょう。

ここではparquet形式でデータをダンプします。BigQueryが内部で使うファイルシステムに最も近い形式であり、列の型を保ったままClickHouseへ容易にロードできるためです。

BigQueryからテーブルをエクスポートするコマンドは次のとおりです:

bq extract — destination_format=PARQUET \
<project_id>:<dataset>.<table_name> \
gs://<bucket_name>/<path_and_filename>

(ファイルをサブフォルダに格納する場合は、そのフォルダがバケット内に存在することを必ず確認してください。存在しないと、位置引数に関する非常に分かりにくいエラーメッセージが出ます)

テーブルをGCSにエクスポートできたら、いよいよClickHouseへのロードに進みます。

初期データをClickHouseにロードする

ClickHouseへロードする際、本記事執筆時点で公式に推奨されているSQLは以下のとおりです:

CREATE TABLE <table_name>
ENGINE = MergeTree
ORDER BY tuple() AS
SELECT *
FROM S3Cluster(default,
  'https://storage.googleapis.com/<bucket_name>/<path>/*.parquet');

注意: 一部のClickHouseバージョンには、上記クエリが失敗するもののテーブルだけは作成されてしまうバグがあります。回避策として、INSERT INTO

に上記のSELECT文部分を続ける形で実行すれば、データを挿入できます。

推奨かつ、よりセキュアな方法はHMACキーを使うことです。利用するサービスアカウントについて、まずバケット側にキーを追加するのを忘れないでください。よくあるミスです。なお、これらのキーは本記事の後半で扱うレプリケーション方法の一部でも使うため、今のうちに生成して適切なバケットに反映しておくのがベストです。

これらのキーを使う場合、ClickHouseでは代わりに以下のSQLを使用してください:

CREATE TABLE <table_name>
ENGINE = MergeTree
ORDER BY tuple() AS
SELECT *
FROM s3Cluster(‘default’,
  ‘https://storage.googleapis.com/<bucket_name>/<path>/*.parquet’,
  <hmac_access_key>,
  <hmac_secret>)

これらのクエリでは、ファイル名にアスタリスク(*)を使うことで、当該ディレクトリ内のparquet拡張子を持つファイルをすべて取得します。また、サービスアカウントキーとHMACキーを取り違えないよう注意してください。両者はまったく別物で、HMACキーはGCS内にのみ存在し、GCPのIAMサービスには含まれません。さらに、ファイルに拡張子を付けない場合は、s3Cluster関数呼び出しに引数として「Parquet」を渡す必要があります。そうしないとエラーになる場合があります(GCSをクエリ対象にしてHMACキーを使うサービス全般でよく見られる問題です)。

ここからが少し奥深い部分で、ClickHouseの真価が発揮されるところです。ENGINEの行はパフォーマンスを左右する重要な要素のひとつで、これは「テーブルエンジン」と呼ばれます。ClickHouseのトレーニングシリーズが少なくとも1本まるごと組まれるほどのテーマなので、本記事では深入りしません。代わりに、公式ドキュメントをこちらにリンクしておきます。

データロード時には、用途に合わせて適切なエンジンを選定する必要があります。MergeTree系のエンジンは汎用的なストレージエンジンとして優秀で、その中にも特化したものがいくつかありますが、ぜひ他のエンジンも一通り確認し、用途に最適なものを選ぶことを強くおすすめします。これこそがClickHouseでパフォーマンスを引き出す最大の鍵と言っても過言ではありません。いくら強調してもしすぎることはありません。

その上で、初期データをClickHouseへロードする工程で最後に重要なステップがあります。一時データの削除です。これを忘れて、後日の監査で指摘されるまでGCSのストレージ料金を払い続けるケースが少なくありません。同じ轍を踏まないよう、その場で対処してコストを抑えてください。少なくとも、忘れないようリマインダーを設定しておきましょう。

コスト削減のため、GCSバケット内のファイルは原則として削除するのがおすすめです。再ロードのために将来必要になるか分からない場合は、忘れないよう30日(あるいはそれ以上)で削除されるライフサイクルルールを設定するのも良い方法です。

レプリケーションのパターン

次は、データのレプリケーションをどう実現するかです。これは現在BigQueryへどのようにデータを取り込んでいるかによって方法が変わります。本記事では、バッチ取り込みでの実現方法を扱います。ストリーミングはより複雑なシナリオになるため、別途今後の記事で取り上げる予定です。

もうひとつの大きな課題は、万能の正解が存在しないということです。最適解は現状のデータパイプラインの構成次第になります。本稿ではDoiTでよく見かける代表的な構成を提示しますので、実装者である皆さんが自社にとって最適な戦略を選んでください。

レプリケーションの前提条件

これらのレプリケーション手法すべてに共通する前提条件がいくつかあります。幸い、ほとんどのデータセットには既に組み込まれているため通常は心配無用ですが、念のためお伝えしておきます。

前提条件1: 取り込み時刻または「インクリメンタル・マーカー」

レプリケーションがどこから始まり、どこで止まったかを判別できることが、データ重複によるデータ品質トラブルを防ぐ鍵になります。これは取り込み時刻でも、増分的な性質を持つ一意の「主キー」でも構いません。BigQueryでは多くの場合、これがパーティション境界と一致します。

前提条件2: データのソースを把握する

要するに、BigQueryに取り込まれる前のデータがどこから来ているのかを突き止めることです。バッチ系であればGCSやS3にあるケースが多く、Pub/Subのサブスクリプション経由でBigQueryに直接ストリーミングされていたり、Dataflow/Beamで変換されてくる場合もあります。データの出どころを把握することが、最適なレプリケーション方法を選ぶ鍵になります。

前提条件3: データの鮮度要件を見極める

これは、ClickHouseに格納したデータをBIツールでクエリする際に、「リアルタイム」が必要なのか、「いずれ反映される」程度で良いのか、という問いです。数分以内の反映が必要であれば「リアルタイム」、30分以上の更新間隔で問題なければ「いずれ反映」と位置づけて、戦略を選びましょう。

バッチレプリケーションを構築する

ClickHouseへのバッチレプリケーションは、一定間隔でソースまたはBigQueryからClickHouseへデータを取り込む方式です。これらの戦略は最も実装が容易で、BigQueryに読み込む前にデータがファイルやその他のバッチ機構として存在する場合に有効です。

もうひとつの方法は、BigQueryからGCSへデータを取り出し、そこからClickHouseへロードするやり方です。

1つ目の方法は、GCSに保存されたデータをCloud Functionでそのままおまま ClickHouseへロードするものです。この方法は、データが既にGCSに保存され、かつClickHouseがロード可能な対応フォーマットになっている場合のみ使えます。

この方法は非常に簡単で、多くの利用者がBigQueryへのロードで行っているのと同じような、シンプルなロードジョブで済みます。

2つ目の方法は、データが既にBigQueryに格納されており、ClickHouseへ取り出して利用したい場合に使います。こちらはあまり簡単ではなく、構成要素もコストも増えるため、その分推奨度はやや下がります。

なお、MySQLやPostgreSQLなど別のデータストアにデータがあり、それがBigQueryへの中継地点になっている、あるいはそこからBigQueryへレプリケーションされている場合は、2つ目の方法が選択肢になります。BigQuery以外のデータストアを使うシナリオの手順についても、後ほど触れます。

BigQueryロード前のGCSデータでバッチレプリケーションを構築する

この方法は群を抜いて簡単で、最小限のセットアップで完結します。

要は、GCSにあるデータファイルをそのままClickHouseへLOADするだけです。実際には常にそう簡単とは限りませんが(いつものことです)、ほぼそれに近い形であり、必要なコードの大半はすでに用意してあります。

これまで見つけた最も簡単な方法は、バケット内でファイルがfinalizedになるイベントをトリガーにし、そのファイルをClickHouseへロードするCloud Functionを呼び出す方式です。手順はこちらに記載されており、デプロイ時にはここのソースコードをそのままお使いいただけます。ただし、まずはドキュメントを読んで前提条件(特に接続情報を保持するシークレットの作成)を満たすようにしてください。

なお、現実には特定のタイプのファイルだけをロードしたい、あるいはパスやファイル名に応じて別テーブルへロードしたい、といったケースが多いはずです。このロジックはユースケースに応じてご自身で追加していただく必要があります。コード内に修正すべき箇所をコメントで明記してあります。

BigQueryに既にあるデータでバッチレプリケーションを構築する

次のレプリケーション戦略は、BigQueryからClickHouseへ一定間隔でバッチレプリケーションする方法です。これは、別のソースから直接BigQueryにデータがロードされており、ロード前にデータをキャプチャできない場合に有効です。たとえば、ストリーミング挿入、BigQuery Storage APIによるロード、StitchやFivetranなどの外部ソースからBigQueryへデータが流れ込んでいるようなケースが該当します。

コストに関する注意: この方法では一定間隔でデータをクエリする必要があるため、相応のコストが発生します。これを念頭に置き、元テーブルを適切にパーティション/クラスタ化してコストを最小化してください。取り込みデータ量によっては、Editionsではなくオンデマンド課金を選ぶのも一案です。多くの場合、メインプロジェクトに保存されたデータのエクスポートを行う専用プロジェクトを別途作成し、workloadsを分離するのが好まれる方法です。これにより、オンデマンド課金や(Editions利用時には)別予約を使えるため、Standard Editionで低コストに抑えることもできます。

この方法の概要は、BigQuery内でスケジュールクエリを一定間隔で実行し、新しいデータをGCSバケットへエクスポートするというものです。GCSバケットにファイルが書き込み終わると、Cloud Functionがトリガーされ、そのデータをClickHouseへロードします。

この方法はクエリ、ストレージ、Cloud Functionの呼び出し回数のいずれの面でも費用がかさむ可能性がありますが、非常に予測可能な間隔でデータをロードできます。とはいえ、BigQueryに入る前にデータをロードできない状況であれば、これがベストな選択肢になり得ますし、それでもクエリコストを節約できる可能性は十分にあります。

コストを抑えるには、データのエクスポートをパーティション境界に合わせ、パーティション単位でGCSへ直接エクスポートできるようにするのが基本的に望ましいです(コマンド例は前述参照)。たとえば1時間単位でパーティション分割している場合、その時間分のデータを丸ごと取得できるタイミングでスケジュールを実行します。

では実際に、この仕組みがどう動くのかを見ていきましょう!

動作する例

例として、BigQuery内に以下のスキーマを持つごく基本的なテーブルを作成しました:

クエリを高速化するため、transaction_time でパーティション分割しています。

このテーブルには毎時、その直前1時間分のデータが、漏れなくロードされます。つまり、後続のロードに含まれてくるデータが取りこぼされることはありません。これはあくまで簡略化のためであり、現実にはほぼあり得ないシナリオです。

シンプルさのため、そして良いプラクティスとして、ロードが完了したらPub/Subトピックへメッセージを送ります。そのトピックにサブスクリプションを紐づけ、Cloud Functionをトリガーする形にすれば、小さなパイプラインを別途構築せずとも処理できます。代替案として、ファイル書き込み完了時に直接Cloud Functionをトリガーして自動ロードすれば、間隔ロードよりリアルタイムに近づけることもできます。

このロード処理を行うCloud Functionと、その使い方を解説したreadmeファイルへのリンクはこちらです。

なお、コード内ではClickHouseの認証情報をSecret Managerのシークレットに格納し、それらの値を環境変数としてCloud Functionに公開しています。これはCloud Function内で認証情報にアクセスする最も簡単で、かつ安全性の高い方法のひとつです。詳細はPythonコード一式に付属するREADME.mdにまとめてあります。

最初のステップは、Cloud Functionの作成です。GCSバケットでのfinalizedイベントによってトリガーされる点に注意してください。Cloud Functionの設定は以下のようなイメージになります(サービスアカウントに関する警告にも注意し、必要な権限を必ず付与してください):

保存ボタンを押す前に、「ランタイム、ビルド、接続、セキュリティの設定」のドロップダウンを開き、セキュリティとイメージリポジトリまでスクロールします。上部に「シークレット」セクションがあるので、必要な値ごとにシークレットを追加します(下図参照)。Cloud Functionが正しく動作するよう、各シークレットでドロップダウンから「環境変数として公開」を選択してください。

この作業は、Cloud Functionが必要とするすべてのシークレット(host、port、secure、username、password)について行う必要があります。portとsecureにはそれぞれ9019とTrueというデフォルト値があるため、デフォルトを使うならシークレットの設定は不要です。また、サービスアカウントに関する警告も残してありますので、本作業の前に当該サービスアカウントがSecret Managerからの読み取り権限を持っているかを必ず確認してください。

次のステップは、このクエリをBigQueryのスケジュールクエリとして利用することです(やったことのない方向けに、公式ドキュメントはこちらです)。毎時、5分過ぎに実行するようにスケジュールします。

設定が完了したら、指定の時刻まで待ち、GCSバケットに新しいディレクトリとファイルが作成されているか確認してください。存在していれば、Cloud Functionが正常に動作しています。

この例を実運用で活用する

この例は簡略化のため、非常にシンプルかつ理想化された形になっています。実運用では、データが時間境界ぴったりに、遅延なく完璧にロードされることはまずありません。これらの条件はあえて省きましたが、ClickHouseには良い解があります。ReplacingMergeTreeCollapsingMergeTreeです。

ClickHouseチームによる優れた解説記事がこちらにあり、これらを使ったupdateやdeleteの方法が説明されています。データの利用パターンによって、どちらを採用すべきかが変わります。

さらに、通知の送信やETL/ELTジョブのトリガーなど、追加機能を組み込みたくなる場面も多いはずです。Cloud Function実行の最後にPub/Subへメッセージを送るコードを追加し、下流の処理をトリガーする構成にしておくのがおすすめです。

LookerをClickHouseに接続する

最後に、Lookerの接続先をClickHouseへ切り替える作業です。

場所が分かりにくいことがありますが、LookerのAdminモードに入り、左パネルのDatabase->Connectionsから設定できます。

Admin->Database->Connections

次に、追加ボタンを押し、接続ページでClickHouseインスタンスのホスト情報などを入力します。

ConnectionsをクリックしてDialectでClickHouseを選択すると、以下の画面が表示されます:

LookerにおけるClickHouse接続の新規作成画面

表示されたら、ご利用のインスタンスから接続情報をコピー&ペーストしてください。Aivenのコンソールであれば、たとえば以下のような画面です:

AivenのClickHouseコンソールに表示されるインスタンス接続情報

確認すべき情報は通常、JDBCコネクタ向け、またはClickHouseのHTTPSアクセス用のものです。

入力が完了したら、Lookerのテストボタンで動作を確認してください。

最後に、セキュリティ上非常に重要なのが、ClickHouseインスタンス側のIPホワイトリスト機能です。このリストに登録されたIPだけが、インスタンスへの接続を許可されます。Googleが公開しているこちらの手順に従えば、ホワイトリストに登録すべきIPアドレス一覧を取得できます。これにより、ClickHouseインスタンスへ接続できるのはLookerのIPからのみとなります。

Aivenコンソールでの許可IPアドレス表示

このパターンは、BigQueryの代わりにLookerから参照できるClickHouseインスタンスを構築する第一歩となり、クエリにかかるコストを大幅に削減できる可能性を秘めています。

あくまで初歩的な例であるため、すべてのデータロード要件にそのまま当てはまる「すぐに使える解」とはならないでしょう。しかし、BigQueryでクエリ単位の料金を払い続ける代わりに、ClickHouseへデータを「キャッシュ」してコストを削減する道のりにおいて、最初の足がかりとして位置づけていただければと思います。