Cloud Intelligence™Cloud Intelligence™

Cloud Intelligence™

AWSのデータストリーミング:選択肢が多すぎる問題

By Matthew PorterSep 21, 202117 min read

このページはEnglishDeutschEspañolFrançaisItalianoPortuguêsでもご覧いただけます。

クラウドのデータストリーミング案件に取り組んだ経験がある方なら、AWSの数あるサービスとその長大なFAQを調べているうちに、人には言えないほどブラウザのタブを開きすぎてしまった、という覚えがあるのではないでしょうか。

どのサービスを選ぶべきかを判断するためだけに、50ページものドキュメントを読み込む時間が誰にあるでしょうか?

以下は、ストリーミングデータの取り込み・変換・保存・分析を担うAWSの代表的なサービスです。本記事を読まれている頃には、すでに網羅的とは言えなくなっているかもしれません。

  • Kinesis Data Streams
  • Kinesis Firehose(オプションでLambda連携)
  • Kinesis Data Analytics
  • Managed Streaming for Apache Kafka (MSK)
  • Spark Streaming with Elastic MapReduce (EMR)
  • Glue Streaming ETL
  • IoT Core内のIoT Analytics

どれを選べばよいか迷っていたり、選定に時間を取られすぎていると感じている方は、ぜひこのまま読み進めてください。各サービスの基本と最適なユースケースを整理し、混乱を解きほぐしていきます。

記事の最後には、状況別にどれを選ぶべきかを簡潔にまとめています。

Kinesis:まず検討すべき本命

Amazonが推すデータストリーミングの本命であり、ほとんどのユースケースでまず候補に挙げるべきなのがKinesisです。ただしKinesisの中にも分かりにくい名前のサブサービスがいくつかあり、特に押さえておきたいのは次の3つです。

  • Kinesis Data Streams(以下「Streams」)
  • Kinesis Firehose(以下「Firehose」)
  • Kinesis Data Analytics(以下「Analytics」)

それぞれを順に詳しく見ていきましょう。

Kinesis Data Streams vs. Kinesis Firehose

StreamsとFirehose:機能の概要

Data StreamsもFirehoseも、データを取り込んでシンクに流し込むという点は共通しています。では、なぜ似た役割のサービスが2つ存在するのでしょうか。

StreamsとFirehoseの最大の違いは、Streamsがカスタムコンシューマー(EC2、EMR、Lambda上で動くアプリケーションなど)を備えたコンピュートサービスへデータを送り、最短で約70ミリ秒の遅延でほぼリアルタイムにデータの変換・処理を行うことを想定している点です。そのためStreamsは、リアルタイムダッシュボードや異常検知など、即時性が求められる用途に最適です。Apache Sparkとの相性もよく、より複雑な分析が必要な場合でも、ストリーミングDataFrameを通じてリアルタイム処理をシンプルに記述できます。

一方のFirehoseは、ほぼリアルタイムの配信を目的としていません。受信メッセージをバッチ化し、必要に応じてAWS Lambdaで圧縮や変換を加え、その後シンク(多くの場合はAWSサービス)へ流し込みます。シンク先には通常、S3、Redshift、Elasticsearchが選ばれます。

Streamsのメッセージは通常カスタムアプリケーションで処理されますが、StreamsからFirehoseストリームへデータを流すように構成することもでき、リアルタイム分析と長期保管用のバッチ保存を両立させることが可能です。

ユースケースがほぼリアルタイム処理を要求しないのであれば、Firehoseの方が適しており、扱いも簡単です。

Firehose:多くの場合はこちらが正解

では、なぜFirehoseの方が望ましい選択肢になりやすいのでしょうか。

第一に、Kinesis StreamsはJava中心のKinesis Producer Library(KPL)とKinesis Consumer Library(KCL)を用いた、それなりに本格的なコーディングを必要とします。一方Firehoseは、特定のAWSサービスへのデータシンクを主目的に設計されているため、シンク部分のコーディングはまったく不要です。Firehoseへのメッセージ送信もシンプルです。

import boto3firehose_client = boto3.client('firehose')
response = firehose_client.put_record(
    DeliveryStreamName='string',
    Record={'Data': b'bytes'} # base64-encoded
)

Kinesis Streamsについて、プロデューサー側・コンシューマー側双方でのパフォーマンス低下やその他のメリットを失うことを許容できるなら、KPLを使った本格開発の代わりに、AWS SDKでよりシンプルにメッセージを送ることもできます。

import boto3kinesis_client = boto3.client('kinesis')
response = kinesis_client.put_record(
    StreamName='string',
    Data=b'bytes', # base64-encoded
    PartitionKey='string'
)

第二に、StreamsもFirehoseもフルマネージドかつ自動スケーリングですが、Streamsは「自動スケールの粒度が粗く」、完全なサーバーレスとは言えません。

Firehoseは自動スケーリングにより、Firehose側のスループット上限に達しない限り、開発時のテストから秒間数GB規模まで、つまずくことなくシームレスに処理量を引き上げられます。

これに対しStreamsのスケーリングは少々手間がかかります。基盤インフラを直接管理する必要はないものの、Kinesisストリームには「シャード」の数を定義する必要があり、これがそのストリームの対応スループットに直結します。1シャードあたりの上限は書き込みで1MB/秒または1,000レコード/秒、読み込みで2MB/秒です。したがって、必要なスループットを確保するには一定数のシャードを事前にプロビジョニングしておかなければなりません。シャード数は手動で変更することも、自動スケーリングを設定することもできますが、後者の仕組みは本来あるべき姿よりかなり煩雑です。

つまり、スループットを上げるにはシャード数を増やすことになりますが、ここに大きな注意点があります。シャードの追加・削除には平均でおよそ30秒かかり、しかも一度に追加・削除できるのは1シャードずつです。

これが現実のシナリオでどのような影響を及ぼすか考えてみましょう。1,000シャード(書き込みスループット約1GiB/秒)のストリームを運用していて、近い将来にスループットを倍にする必要があると見込んでいる場合、追加の1,000シャードを投入してフルスケールに到達するまで8時間以上かかってしまいます。

突発的かつ予期しないデータ量の急増があり得るのなら、Kinesis Data Streamsでは十分な速さでスケールできません。

こうした事情から、ストリーミングのスループット変動が大きいユースケースでは、過剰プロビジョニングと過剰支払いを許容するか、ピーク時のスケールイベントを的確に予測して計画できる自信がない限り、Streamsはおすすめできません。Firehoseの方がスケールも開発もしやすく、後述するように料金見積もりも容易です。Kinesis Streamsは、リアルタイム分析が必須の場合に限って選ぶべきです。

StreamsとFirehose:料金体系の複雑さ比較

Firehoseはサーバーレスかつ自動スケーリング設計のため、データストリーミングの従量課金もシンプルで、以下に基づいて課金されます。

  • 月間で取り込まれたデータのGB数。さらに該当する場合は次の項目も加算されます。
  • データフォーマット変換のGB数
  • VPCに配信されたデータのGB数

一方Data Streamsの料金は予測がより難しく、次の要素に基づきます。

  • プロビジョニングされたシャードアワー数。自動スケーリングで変動するため、安定稼働を確保するには通常オーバープロビジョニングが必要になります。
  • 送信された25KB単位のPUTペイロード数
  • オプションの長期データ保持に関する変更
  • オプションの拡張ファンアウト機能。多数のコンシューマーが同じシャードから読み込む際にスループットを向上させます。

StreamsとFirehose:まとめ

基本的なデータ変換と、ストリーミングデータをデータストアにバッチでロードすることが目的で、リアルタイム処理が不要なら、データは直接Firehoseへ送るのが最適です。アプリ開発の工数を抑えたい場合や、急なインフラスケーラビリティの不安を避けたい場合も、Firehoseが向いています。

リアルタイム処理が必要ならStreamsを使うべきですが、自動スケーリングを有効にしてもピーク時のスループット急増には十分対応しきれない可能性があることは押さえておきましょう。

リアルタイム処理に加えて、後の分析用にデータを保存もしたい場合は、まずStreamsへ送り、Kinesisウェブコンソールから簡単な設定を行うだけで(Lambdaによる任意の変換ステップを挟む場合を除き)コーディング不要でそのデータをFirehoseへ転送できます。

Streamsの動作はApache Kafkaに近いとイメージすると分かりやすいでしょう。違いは、メッセージを一時的に永続ストレージに保持し、複数のコンシューマーへ転送できる点です。コンシューマーはカスタムアプリケーション(EC2やEMR)でも、AWSマネージド(Firehose)でも構いません。Firehoseは多くの場合、特定のサービス(主にAWS系のS3、Redshift、Elasticsearch)向けのバッチローダーとして使われ、必要に応じてLambdaによるサーバーレスのデータ変換を組み合わせます。

Kinesis Data Analytics:サーバーレスなウィンドウ分析

StreamsとFirehoseで、ストリーミングデータの取り込み・変換と、リアルタイム分析アプリケーション(Streams)や長期保管シンク(Firehose)への振り分けはほぼカバーできます。では、Analyticsはどんな役割を担うのでしょうか。

Data Analyticsは、Amazonがフルマネージドかつサーバーレスで提供するApache Flinkで、次のような機能を備えています。

  • Data StreamsまたはFirehoseと連携
  • そのストリーミングデータに対してSQLクエリを実行
  • 結果を別のData Stream、Firehoseストリーム、Lambdaなどへストリーミング送信
  • S3に置かれた静的なCSVやJSONファイルをSQLテーブルとして扱うこともでき、参照データとストリーミングデータのJOINが可能です

Data Analyticsの主な用途は、ストリーミングデータを時間ウィンドウごとに継続的に集計し、静的な参照データで補完することです。多くの場合はリアルタイムアラートが目的で、コードもインフラのプロビジョニングも不要、シンプルなSQLだけで実現できます。SQLの代わりにFlinkコードを書いてデプロイすることもできますが、ScalaやJavaはデータサイエンスの世界ではあまり主流でないため、保守性の観点からSQLにとどめておくのがおすすめです。

AWS公式ドキュメントには株価ティッカーデータを使ったスライディングウィンドウ分析の例がありますが、より興味深い実例として、ベルギー全土の交通速度をFirehoseに取り込み、Data AnalyticsとS3上の参照データで現在と過去の速度を比較し、SQLで渋滞の有無を判定して、Lambdaでリアルタイムアラートを送るこちらのケースを読んでみることをおすすめします。

Managed Streaming for Apache Kafka (MSK)

Kinesis Data StreamsとMSK(AWSが提供するApache Kafkaのマネージドサービス)はいずれも、高スループット・低レイテンシ・高可用性・耐障害性を備えた優れた「pub-sub」システムであり、メッセージの発行と消費を可能にします。データの取り込み・配信プラットフォームとしてのスケーラビリティと信頼性という観点では、表面上は両者に大きな差はありません。

しかし詳細を見ていくと重要な違いがあり、それらは概してKinesisに有利に働きます。

  • メッセージブローカーは増やすことしかできません。つまりMSKデプロイをスケールダウンすることは不可能です。
  • MSKはフルマネージドではあるものの、サーバーレスKafkaではありません。そのため、ある程度のクラスター設計が必要です。ブローカーを起動するゾーンとサブネット、ゾーンあたりのブローカー数、ブローカーを支えるインスタンスタイプなどを定義しなければなりません。
  • MSKはサーバーレスではないため、インスタンスを支えるEBSストレージにも料金が発生します。これもスケールダウンできません。
  • 初期構築後はインスタンスタイプを変更できません。スケールアップの選択肢はインスタンス数を増やすことだけです。
  • KinesisはAWS純正のフルマネージドかつサーバーレスなストリーミングサービスであり、当然ながらAWSサービスとの統合性に優れています。Kinesisのコンシューマーの一部はノーコードで接続できますが、MSKではすべてのコンシューマーアプリケーションを独自に構築する必要があります(EC2、EKS、EMR上、あるいはKinesis Data AnalyticsにデプロイするFlinkコードなど)。
  • Kafkaでは、特定のコンシューマー「グループ」内でパーティションから読み取れるコンシューマーは1つだけです。一方Kinesisはシャードあたり複数のコンシューマーをサポートします。

初期および継続的なクラスター設定が必要な点と、デプロイをスケールダウンできない点を踏まえると、MSKを利用した場合はKinesisと比べて短期・長期ともにDevOpsの運用負荷が大きくなります。

料金体系も異なり、これもまた概ねKinesisが有利です。

前述のとおり、Kinesis Data Streamsの料金は基本的にオンデマンドで、送信される25KB単位のPUTペイロード数と、目的のPUT/GETスループットを実現するためにプロビジョニングされたシャードアワー数にほぼ基づきます。シャード数を自動スケーリングに設定すればオンデマンドに近い課金感覚に近づけられますが、その実装はやや煩雑です。

一方MSKは、選択したインスタンスサイズと台数、それを支えるEBSボリュームのサイズに基づいて課金されます。インスタンス数もEBSボリュームサイズも縮小できないため、過剰にプロビジョニングしてしまうとクラスターを終了させない限り料金から逃れられません。ストレージは自動スケール可能ですが、インスタンスサイズと台数はそうではありません。代わりに、CloudWatchのインスタンスメトリクスや、ブローカーごとに使われているパーティション数、その他のパフォーマンス指標を継続的に監視し、状況に応じて手動でスケールしていく必要があります。さらに、クラスターのCPU使用率は60%未満に保つことが「強く推奨」されているため、コンピュートリソースに対してどうしても多めに支払うことになります。

MSKを使う大きな利点が1つあります。Kinesisが「at-least-once(少なくとも1回)」の配信であるのに対し、Kafkaは「exactly-once(ちょうど1回)」の配信を保証します。とはいえ、メッセージの重複排除に対処することは、コスト効率の良いインフラスケーラビリティを確保することよりずっと簡単です。

仮にMSKクラスターを非常に高スループットの規模で、コスト最適化しながら運用し、Kinesisより安く抑えられたとしても、ビジネスクリティカルなpub-subサービスを稼働させ続けるために費やすDevOps(高給な)人月コストを考えると、ほぼ手間なしで同等のことを実現できるフルマネージド代替を使った方が、結局はKinesisで節約できる金額の方が大きいだろうと私は確信しています。

個人的にAWS MSKをおすすめできるのは、時間・リファクタリングコスト・人的リソースの制約から、アーキテクチャを変えずにリフト&シフトせざるを得ない既存のKafkaベースアプリケーションを抱える企業に限られます。

さらに詳しく知りたい方には、こちらの「an honest AWS MSK review」をおすすめします。コメント欄も示唆に富んでおり、MSKとKinesisの料金比較に焦点が当てられています。

EMRでのSpark Streaming

AWS EMRはAmazonが提供するフルマネージド・自動スケーリング(ただしサーバーレスではない)のサービスで、オープンソースのビッグデータ処理ツール向けに書かれたスクリプトをクラスター上で実行できます。対象ツールにはApache Sparkも含まれます。

SparkはDataFrameベースの分析を可能にし、静的データセットだけでなくストリーミングデータセットにも適用できます。DataFrameに対しては一般的なプログラム的関数呼び出しで分析を行うことも、ANSI SQL準拠のSpark SQLを実行することもできます。Spark SQLでストリーミングデータを扱う感覚は、Apache FlinkやKinesis Data AnalyticsでSQLを使うのに近いものです。Sparkは、監視対象のストリーミング取り込みソースとしてApache Kafka、Apache Flume、AWS S3、AWS Kinesis Data Streamsを利用できます。

StreamsやS3との優れた組み込み連携と、PySparkによる学習コストの低さを踏まえ、次のような場合にはEMR上でDataFrameを使ったSpark Streamingをおすすめします。

特に重要な点として、Data Analyticsには次のような制約があることを知っておくべきです。

  • 1行のデータは512KB以下でなければなりません。Sparkの上限ははるかに高く、2GBです。
  • 参照データセットのサイズが1GBを超えてはなりません。Sparkには制限がありません。
  • 各アプリケーションが持てるのは、ちょうど1つのストリーミングソースと最大1つの参照データソースだけです。Sparkでは、複数のストリーミングソースと複数の静的参照データソースをJOINできます。
  • ウィンドウクエリは60分以内に収める必要があります。データは揮発性ストレージに保持され、想定外のアプリケーション中断時にはストリームが再構築される可能性があるためです。Sparkには時間ウィンドウの上限がありません。

Glue Streaming ETL

Glue Streamingは、フルマネージド・自動スケーリング・サーバーレスのSpark Streaming DataFramesサービスです。Sparkの経験があり、自前で運用するEMRクラスターやLambda関数の代わりに、このサービスでKinesisから流れてくるデータに対してカスタムの変換と分析を行いたい場合に向いています。Glue Streamingは、S3、Redshift、DynamoDBといった一般的なデータシンクへ書き出せます。

Glueでは、ウェブコンソールで指定した変換のリストを元に、ある程度Sparkコードを自動生成できるため、Sparkの深い経験は必ずしも必須ではありません。とはいえ、基本を押さえておくと役立ちます。

個人的には、このサービスは少々扱いにくいと感じています。サーバーレスのSparkは便利そうに聞こえますが、いくつかの制約があります。

  • スキーマ検出を使用している場合、ストリーミングデータのJOINは行えません。
  • Glue Streaming ETLジョブの実行中にKinesis Streamsのシャード数を変更することはできません。ジョブを停止し、Data Streamsのシャード数を変更してその操作の完了を待ち、その後ジョブを再開する必要があります。

上流側のスケーラビリティだけを考えれば、私個人としてはGlue Streaming ETLよりも、自動スケーリングを有効にした自前管理のSparkクラスターを選びます。ただし実際にテストしてみると、Glue Streaming ETLの自動スケーリングとサーバーレスというメリットがデメリットを上回ると感じる場合もあるかもしれません。

IoT Core内のIoT Analytics

IoT Analyticsの各コンポーネントが何をしてくれるのかは、本来あるべきほど分かりやすくはありません——そこで丸ごと1本の記事を書きました!Production-Scale IoT Best Practices: Implementation with AWS (part 2)。ここでは基本だけを押さえ、詳細は同記事に譲ります。

AWSへとストリーミングするIoTデバイスは、まずIoT Coreに到達します。そこからメッセージを他のサービスへ送り、独自の分析を行えます。たとえばIoT Rulesを使えば、IoT Coreのデータを次のサービスへ簡単に転送できます。

  • DynamoDB
  • Firehose(その先でDynamoDB、S3、Redshift、Elasticsearchへ流し込み)
  • Data Streams(EC2、EMR、Lambda、Firehoseへ送信可能)

このように、構築できるIoTデータフローのバリエーションは非常に幅広いことが分かります。

一方で、IoTデータの取り込みからストレージ、分析までを、完全にサーバーレス・自動スケーリング・フルマネージドな統一されたIoT中心のプラットフォーム内で処理し、SagemakerやQuicksightなど他のAWSサービスとの分析関連の連携も活用したい場合は、IoT Analyticsを選ぶべきです。IoT Coreを使えば、複数のサービスを継ぎ合わせるのではなく、単一のサービス内でIoTデータを処理できます。

IoT Analyticsのウィザードを進めると、次のものがセットアップされます。

  • IoT Channel:IoTデータが到着する場所
  • IoT Pipeline:Channelのデータを取り込み、属性に基づいてオプションでメッセージのエンリッチ・変換・フィルタを行えます
  • IoT Data Store:ストリーミングデータが保存される場所で、無期限または指定期間保存できます。裏側ではAWSが管理するS3バケットに保存されます。
  • IoT Data Set:Data Storeから作成できます。これはIoT SQLで作成されたData Storeのサブセットで、独自のデータ保持期間を持ち、オンデマンドでも定期的にも自身を再生成できます。Data Storeと同様、Data Setもマネージドバケットに CSVファイルとして保存されます。Data Setを使えば、たとえば「特定の興味深い時間帯の温度データだけを抽出する」といったカスタムフィルタに基づく静的データセットをオンデマンドで一度生成し、そのフィルタ済みデータセットを下流の分析用に無期限で保持しつつ、元の生データのストアメッセージは、組織が生データ保持とコスト効率のバランス上最も妥当と判断した保持期間に従って失効させる、といった運用が可能になります。
  • IoT Analyticsと連携するAWSサービスのうち、Quicksightなどはデータセットからのみ取得できますが、SageMakerなどはデータストアとデータセットの両方から取得できます。基本的には、すべてのサービスがデータセットから取得できると考えて問題ありません。データストアとの連携が比較的限定的であることや、IoT Analyticsで生データを無期限に保管することのコスト影響を踏まえると、本番ユースケースでは、分析やMLモデル生成に使う離散的でフィルタ済みのデータセットを作成し、生データのストアは時間の経過とともに失効させるという進め方に慣れておくのがおすすめです。ただし、組織がIoTデータの全履歴ストレージへの支払いを許容できる場合はその限りではありません。

結局どのサービスを選べばいい?

AWSのさまざまなストリーミングオプションを簡潔に説明しようとした結果、書きすぎてしまったかもしれません!以下に、どのサービスを選ぶべきかを手早くまとめます。

  • Kinesis Data Streams:EC2、EMR、Lambda上でリアルタイム分析を行う必要があり、ある程度の開発の複雑さや、スループットを素早くスケールできない点を許容できる場合。
  • Kinesis Firehose:ストリーミングデータをバッチ化し、必要に応じて変換・圧縮を行い、S3、Redshift、Elasticsearchへの長期ストレージに格納したい場合。使いやすく、サーバーレスで、即座に自動スケーリングするストリーミングデータ取り込み基盤を求めており、リアルタイムでコンシューマーに送信するのではなく、バッチで書き込む方式でも問題ない場合。
  • Kinesis Data Analytics:Data StreamsまたはFirehoseのデータに対し、SQL基本的なウィンドウ分析を行いたい場合。多くはリアルタイムアラート用途で、シンプル・サーバーレス・自動スケーリングのプラットフォームを求めるとき。
  • Managed Streaming for Apache Kafka (MSK):既存のKafkaベースのアプリケーションがあり、AWSへリフト&シフトしたい場合。時間やリソースの制約から、Kinesisを使うようにアプリを再設計できないとき。
  • Spark Streaming with EMR:複数のストリーミングソースおよび/または静的参照データセットを含むJOIN操作を伴う、Kinesis Data Streamsへの高度なウィンドウ分析が必要な場合。
  • Glue Streaming ETL:Spark Streaming with EMRと同様ですが、サーバーレス・自動スケーリングの環境でSparkのworkloadsを実行できます。接続中のGlueストリーミングジョブを停止して再起動しない限り、上流のData Streamsのシャードスケーリングは行えません
  • IoT Core内のIoT Analytics:IoTデータのストリーミングに対する、フルマネージド・サーバーレス・自動スケーリングのオールインワンな取り込み・ストレージ・分析プラットフォーム。IoT Coreを使えば、同等の機能を実現するために複数のAWSサービスを継ぎ合わせる必要がなくなります

お読みいただきありがとうございました! 最新情報はDoiT Engineering BlogDoiT Linkedin ChannelDoiT Twitter Channelでお届けしていますので、ぜひフォローしてください。キャリアにご興味のある方はhttps://careers.doit-intl.comをご覧ください。