Distributed data stream processing and edge computing A survey on resource elasticity and future directions

参考

メモ

Twitter Heronの論文を引用しているということでピックアップしてみた。

故障耐性を持ち、分散処理を実現するストリーム処理エンジンでは、処理の表現としてDAG構造を用いるものが多い印象。 一方で、宣言的なハイレベルのAPI(SQLなど)を提供するのは少数か? (かつて、分散処理ではなかったり、故障耐性を持たなかったりした時代では、CQLが台頭していたが…)

全体的に、リソースアロケーションに関する研究では、Apache Stormのカスタムスケジューラとして実装して見せる論文が多く感じられた。 ただしエッジコンピューティングの前に、そもそもストリーム処理における弾力性(柔軟で安定したスケールアウト・イン)の実現自体がまだ枯れていない印象。

以下、一応、主観的に重要そうな個所に★マークを付与。

1. Introduction

大量のデータが生まれ始めた。 以下のような項目が要因の例として挙げられる。

  • 計器化されたビジネス・プロセス
  • ユーザアクティビティ
  • ウェアラブルアシスタンス
  • ウェブサイトトラッキング
  • センサー
  • 金融
  • 会計
  • 科学計算

ユースケースとしては、IoTなどが挙げられていた。 興味深い例としては以下。

1
2
3
Rettig, L., Khayati, M., Cudré-Mauroux, P., Piórkowski, M., 2015. Online anomaly
detection over big data streams. In: IEEE International Conference on Big Data (Big
Data 2015), IEEE, Santa Clara, USA, pp. 11131122

多くのツールがデータフローアプローチを取る。 つまりオペレータの有向グラフに変換して処理する。

一方でストリームデータを一定量ためてマイクロバッチするアプローチを取るツールもある。 スケーラビリティと故障耐性を狙った工夫である。

補足:これはSpark Streamingのことを意識した記述だと理解

さらにクラウド上での実行を想定したアプローチが登場。 ★ これにより、弾力性実現を狙う。

Google Dataflow (2015)の論文が引用されていた。 異なる実行モデルが混在しているケースを扱う技術として。

Apache Edgent, https://edgent.apache.org 2017. はインターネットのエッジで動作することを想定したフレームワークである。

過去の調査として、弾力性に注力していないものもあった。

1
2
3
Zhao, X., Garg, S., Queiroz, C., Buyya, R., 2017. Software Architecture for Big Data and
the Cloud, Elsevier Morgan Kaufmann. Ch. A Taxonomy and Survey of Stream
Processing Systems.

本論文は以下の構成。

  • 2章
    • ビッグデータエコシステム。オンラインデータ処理のアーキテクチャ。
  • 3章
    • 既存のエンジン。ストリームデータ処理の他のソリューション
  • 4章
    • マネージドクラウドソリューション
  • 5章
    • 既存の技術がどのようにリソースの弾力性を実現しようとしているのか
  • 6章
    • 複数のインフラストラクチャを組み合わせる
  • 7章
    • 将来の話

2. Background and architecture

2.1. Online data processing architecture

ここでは「オンライン」という単語を用いているが、以下のように定義して用いている。

1
2
Similar to Boykin et al., hereafter use the term online to mean that “data are
processed as they are being generated”.

Fig. 1にストリーム処理アーキテクチャの典型的なコンポーネント関係図がある。(補足:当たり前の内容ではあるが参考になると思う)

コンポーネント関係図

補足:ストレージとの関係性はもう少し深堀りしてもよさそう。最近だとDatabricks DeltaやApache Hudiのようなストレージレイヤのミドルウェアが生まれているため。 ★

まず「データソース」ではデータを送り出す(ローダ)、データフォーマット、MQTTのようなプロトコルに関する議論がある。 たいてい、送り出す機能はネットワークのエッジ側に置かれる。

コンポーネントそれぞれが独立して成長できるよう、MQ(ActiveMQ、RabbitMQ、Kestrel)、Pub/Subベースのソリューション(Kafka、DistributedLog)、 あるいはマネージドサービス(Amazon Kinesis Firehose、Azure IoT Hub)が挟まれることが多い。

補足:http://bookkeeper.apache.org/distributedlog/

補足:DistributedLogは2017年にBookKeeperのサブプロジェクト化された。 https://bookkeeper.apache.org/docs/latest/api/distributedlog-api/

大規模データを扱うための技術の代表例として、MapReduceが挙げられていた。

1
2
Dean, J., Ghemawat, S. MapReduce: Simplified data processing on large clusters.
Communications of the ACM 51 (1).

本論文で扱う技術は、ストリーム管理システムだったり、CEPと呼ばれる。

補足:ここではストリーム処理とCEPは分けて言及していないようだ。

ストリーム処理のアーキテクチャはデータストレージ機能を内包することがある。 目的は、さらなる処理のためだったり、結果を分析者等に渡すためだったりする。 ★ このとき用いられるストレージは、RDBMS、KVS、インメモリデータベース、NoSQLなどなど。パブリッククラウドにもいろいろな選択肢がある。

補足:ステート管理用のストレージを持つ場合もあるだろう。 ★

2.2. Data streams and models

「データストリーム」の定義は様々。 不連続の信号、イベントログ、モニタリング情報、時系列データ、ビデオ、などなど。

2005年くらいの調査では、時系列データ、キャッシュレジスタ、回転扉あたりのデータモデルが対象だった。 一方最近ではSNS等から生じる非構造データ、準構造データが見受けられる。 データ構造としては、キー・バリュー形式を挙げており、キーがタイムスタンプである。

多くのケースで、ストリーム処理はオペレータのDAG構造を抽象化として用いる。 ★ 論理プランと物理プラン。 物理プラン側で並列度などを含め、クラスタのリソースにオペレータを割り当てることになる。

オペレータは選択性、ステートの軸でカテゴライズできる。

並列度に関するカテゴリもある。

■並列度のカテゴリ

  • パイプライン並列
    • アップストリームのオペレータは並列でタプルを処理できる
  • タスク並列
    • グラフの中には同じタプルを並列で処理できる箇所がある
  • データ並列
    • スプリッタでデータを分割し、マージャで処理されたデータを結合する

気になった論文

1
2
3
Gedik, B., Özsema, H., Öztürk, O., 2016. Pipelined fission for stream programs with
dynamic selectivity and partitioned state. J. Parallel Distrib. Comput. 96, 106120.
http://dx.doi.org/10.1016/j.jpdc.2016.05.003.

2.3. Distributed data stream processing

DBMSと対比し、DSMS(Data Stream Management System)と呼ぶことがある。 ★ DSMSは以下のようなオペレータを持つ。

  • join
  • aggregation
  • filter
  • その他分析用の機能

初期のDSMSはSQLライクなインターフェースを提供していた。

CEPはイベント間の関係を明らかにする、などの用途に用いられた。

補足:ここからストリーム処理エンジンの世代説明。一般的な説明をするときに使えそう。 ★

第1世代は、DBMSの拡張として用いられ、単一マシンで実行することを想定していた。(分散処理を考慮していなかった)

第2世代は、分散処理対応 ここでIBM System Sの登場。ゴールはスケーラビリティと効率の改善。

最近(第3世代)では、スケーラブルで故障耐性のある実行を目指している。 ただし、この手の技術は宣言的なインターフェースを持たず、ユーザがアプリケーションを書く必要がある。 UDFを用いることができる。

入力データを離散化(小さなバッチに区切り)し、マイクロバッチ処理を実行するものもあった。

ある程度まとめこまれたデータに対し、繰り返し処理を実行するものもある。 目的はスループット向上。ただし、低レイテンシの要求が厳しくない場合を対象とする。

第4世代は、エッジコンピューティングを含む。 この手の技術として挙げられていたのは以下の通り。

1
2
3
Sajjad, H.P., Danniswara, K., Al-Shishtawy, A., Vlassov, V., 2016. SpanEdge: Towards
unifying stream processing over central and near-the-edge data centers. In: IEEE/
ACM Symposium on Edge Computing (SEC), pp. 168178.

SPEとして。

1
2
3
Chan, S., 2016. Apache quarks, watson, and streaming analytics: Saving the world, one
smart sprinkler at a time. Bluemix Blog (June).URL 〈https://www.ibm.com/blogs/
bluemix/2016/06/better-analytics-with-apache-quarks/〉
1
2
3
4
Pisani, F., Brunetta, J.R., do Rosario, V.M., Borin, E., 2017. Beyond the fog: Bringing
cross-platform code execution to constrained iot devices. In: Proceedings of the 29th
International Symposium on Computer Architecture and High Performance
Computing (SBAC-PAD 2017), Campinas, Brazil, pp. 1724.

クラウドとエッジ。

1
2
3
Hirzel, M., Schneider, S., Gedik, B., An, S.P.L., 2017. extensible language for distributed
stream processing. ACM Trans. Program. Lang. Syst. 39 (1), 5:15:39. http://
dx.doi.org/10.1145/3039207, (URL 〈http://doi.acm.org/10.1145/3039207〉).

2.4. Resource elasticity

クラウドの特徴の一つは、支払えばリソースを使えること。

もうひとつはリソース弾力性である。★ オートスケーリングも可能とする。

オートスケーリングに対応するには、スケールイン・アウトしたリソースに対応する仕組みも必要だが、 オートスケーリングのポリシーも必要。

オートスケーリングの仕組みは、主にバッチ処理についてビッグデータのワークロードに関し、 いくつか提案手法が存在する。★

補足:クラウドベースのオートスケーリングについて以下の論文で触れられているらしい。

1
2
3
Tolosana-Calasanz, R., çngel Ba-ares, J., Pham, C., Rana, O.F., 2016. Resource
management for bursty streams on multi-tenancy cloud environments. Future
Gener. Comput. Syst. 55, 444459. http://dx.doi.org/10.1016/j.future.2015.03.012.

ここでのポイントは、「シビアな遅延があること」。 特にSPEでは、バッチ処理と比べてスケールイン・アウトの要件が厳しい。 例えば、処理を動かし続けるから、データロストしないでスケールイン・アウトさせるのが難しいし、 スケールイン・アウト後のオペレータの再配置が難しい。

補足:つまりバッチ処理におけるスケールアウト・インはある程度実現性があるが、ストリーム処理のスケールアウト・インはまだまだ研究段階?であると言えるのか ★

3. Stream processing engines and tools

SPEの歴史を振り返る。

3.1. Early stream processing solutions

DBMSの拡張として登場。 2000年代。

ただし近年の技術と比べて、大規模データを対象としていない。

NiagaraCQ

2000年。 XMLデータに対し、定期的に繰り返し処理を実行する。

STREAM

2004年。 CQL。Continuous Query Lang.

CQLをクエリプランにコンパイルする。 DAG構造で処理を表す。

実行時には、スケジューラがオペレータをリソースに割り当てる。

チェーンスケジューリングを用いる。

補足:このあたりではUDFについて言及されていない。

Aurora

モニタリングアプリケーション向け。

STREAMと同様に、CQSを利用可能。

トレインスケジューリングを採用。 非直線性。 入力タプルをボックスに集め、それに対し処理を実行する。これによりI/Oオペレーションを減らす。

Medusa

MedusaはAuroraをクエリ処理エンジンとして使いながら、分散処理対応させた。

UDFには対応していない。

3.2. Current stream processing solutions

2種類にわけられる。

  • オペレータグラフに基づきタプル単位で処理する(補足:こっちがいわゆるコンティニュアス・ストリーム処理)
  • 入力データをある程度の塊に区切り、マイクロバッチ処理として処理する(補足:こっちがいわゆるマイクロバッチ処理によるストリーム処理)

3.2.1. Apache Storm

アプリケーションはTopologyと呼ばれる。 データはチャンクとして取り込まれ、タスクを割り当てられたノードで処理される。 データはタプル群として扱われる。

Nimbusがマスタ機能であり、ZooKeeperと連携しながら、 タスクをスケジューリングする。

WorkerノードにはSupervisorがいて、(複数の)Workerプロセスを管理する。 Workerプロセス内では(複数の)Executorスレッドが起動され、SpoutかBoltの機能を実行する。

Spoutがデータソースからデータを取得し、Boltがタプルを処理する。

フィルタ、アグリゲーション、ジョイン、データベースクエリ、UDFなどを実行する。

並列度を制御するには、Topologyごとに各コンポーネントの並列度、 Executor数を指定する。

Workerノードを追加した際、新しいTopologyをローンチ可能。 また、Workerプロセス数やExecutorスレッド数を変更可能。

一方で、タスク数などを変更するときには、一度Topologyを止める必要がある。 ★ 特にステートを持つオペレータを用いているときは問題が複雑。

性能チューニングは、Executorの入出力キューの長さ、Workerプロセスのキューの長さを変更することで可能。

またTridentを用いたマイクロバッチ処理が可能になっているし、Summingbirdのような 抽象化層が生まれている。

3.2.2. Twitter heron

2015年の論文。

Stormの当時の欠点をいくつも改善しつつAPI互換性を実現した。

補足:なお、現在のApache Storm実装では、Heron論文で挙げられていた課題が多数解決されている。

Topologyはプロセスベースであり、デバッグしやすいさなどを重視。 ★

バックプレッシャ機能も有する。

Stormと同様にアプリケーションがDAGのTopologyで表現され、SpoutとBoltも存在。

各種コンテナサービスと連携する。 Aurora on Mesos、YARN、ECS。

Topologyがローンチされると、Topologyマスタ(TM)と複数のコンテナが立ち上がる。 ZooKeeperを用いてTMが単一であることを保証し、ほかのコンポーネントからTMを発見できるようにする。 TMはゲートウェイ機能にもなるし、スタンバイTMを用いることも可能。

各コンテナは、Heronインスタンス(HI)群、ストリームマネージャ(SM)、メトリクスマネージャ(MM)を起動する。 コンテナ間でSM同士がフルメッシュのNWを構成する。

Spout、BoltはHIが担うことになるが、HI自体はJVMである。これはStormと異なる点である。★ またHIは2スレッドで構成される。 1個目スレッドはユーザ定義の処理内容を実行し、2個目スレッドはほかのコンポーネントととの連絡を担う。

Heronトラッカ(HT)はTopology群の情報を得るための、クラスタ単位のゲートウェイである。

補足:詳しくは、 Twitter Heron論文 を参照。

3.2.3. Apache S4

2010年。

並列処理の管理にアクターモデルを用いる。 ★

プロセッシングエレメント(PE)がイベント交換と処理を担う。

分散型で、シンメトリックなアーキテクチャを採用。 プロセッシングノード(PN)は機能性に関し同一。 ZooKeeperがPNのコーディネーションのために用いられる。

アプリ開発者は、PEの処理内容を定義しないとならないが、 その際キーに基づく処理を作りこむことが多い。(キーのハッシュに基づく処理の割り当て) ほかにキーを用いないPEは、例えば入力データを扱うPEである。

3.2.4. Apache samza

2017年。

Apache Kafkaをメッセージング、YARNをデプロイメント、リソース管理、セキュリティ機能のために用いる。

データフローを定義して用いるが、コンシューマを定義する。 ネイティブではDAG構造による処理の定義に対応していない。 ★

Heronと同じくシングルスレッドモデル。

各コンテナはローカルのステート管理用のKVSを持つ。 KVSはほかのマシンに転送される。これにより故障耐性を有する。

3.2.5. Apache flink

2015年。

DAG型のデータフローを定義する。 「ストリーム」は中間結果で、「トランスフォーメーション」が複数のストリームを入力とし、複数のストリームを出力とする。

並列度はストリームとオペレータの並列度に依存する。 ストリームはパーテイションに分割され、オペレータはサブタスクに分割される。 サブタスクは異なるスレッドに割り当てられる。

ジョブマスタ(JM)とタスクマネージャ(TM)。 JMがタスクのスケジューリング、チェックポイントなどに責任を持つ。 TMがサブタスクを実行する。

ワーカはJVMであり、サブタスクごとの独立したスレッドを実行する。 ★

スロットの概念があり、Stormと異なりメモリ管理機能を有する。 ★

3.2.6. Spark streaming

2012年。

イミュータブルなデータセットの抽象表現であるResilient Distributed Datasets (RDDs) のDAGでアプリケーションを表現。 故障耐性向上のため、RDDのリネージを利用。

トラディショナルなストリーム処理エンジンでは、タプルの継続的な処理を担うオペレータのグラフによるモデルを採用しており、 故障耐性を担保するのが難しい。 そのようなエンジンでは、セクションのレプリケーション、もしくはアップストリームでのバックアップで実現する。

Spark Streamingでは故障耐性を実現するのに際し、RDDを使ったマイクロバッチ処理をベースとし、 RDDの故障耐性を利用することとした。

3.2.7. Other solutions

System S(IBM Streams)はオペレータのDAGをアプリケーション構造に採用。 ★ 構造化、非構造化データの両方に対応。 SPL(ストリーム処理言語)に対応。 ストリーム処理コア(SPC)を有効活用するようSPLのコンパイルと最適化も実施する。

SPLについては以下の論文が気になる。

1
2
3
Hirzel, M., Schneider, S., Gedik, B., An, S.P.L., 2017. extensible language for distributed
stream processing. ACM Trans. Program. Lang. Syst. 39 (1), 5:1–5:39. http://
dx.doi.org/10.1145/3039207, (URL: http://doi.acm.org/10.1145/3039207 ).

ESC(以下の論文参照)もオペレータを中心としたDAGによるアプリケーション表現を採用。 並列処理の管理にはアクタモデルを採用。 ★

1
2
3
Satzger, B., Hummer, W., Leitner, P., Dustdar, S., 2011. Esc: Towards an elastic stream
computing platform for the cloud. In: IEEE International Conference on Cloud
Computing (CLOUD), pp. 348–355.

TimeStream(2013)もDAGによるアプリケーション表現を採用。

GoogleのMillWheel。2013年。 こちらもデータ変換等のグラフを抽象化してアプリケーションを作成する。 ホストの動的なセット上で動作することを想定しており、 リソース使用状況に応じて処理単位を分割する、などの挙動をとる。

Efficient Lightweight Flexible (ELF) ストリーム処理システム(2014年)は 非中央集権的アーキテクチャを採用。 各マシンがそれぞれウェブサーバからデータを取得し、それぞれがバッファツリーにデータを保持する。 各マシンで処理されたデータは、オーバレイ構成のDHTを用いて、Reduce処理される。

4. Managed cloud systems

4.1. Amazon Web Services (AWS) Kinesis

ファイアホースを使うと、RedshiftやS3等に対し、ストリームデータを格納できる。 ★ ファイアホースはバッファリングを行う。

CloudWatchと連携すると、各サービスのメトリクスを集められる。

Amazon Kinesis Streamsはストリーム処理の仕組み。 ストリームデータはシャードに分けられて分散処理される。 シャードには固定のキャパシティが存在する。 単位時間あたりのオペレーション数やデータサイズである。

4.2. Google dataflow

プログラミングモデルであり、マネージドサービスである。 ETLなどを含むバッチ処理とストリーム処理の両方を対象とする。 ★

ステップや変換を有向グラフにしたもので処理を定義する。 「PCollection」が入出力データの抽象表現である。 PCollectionには固定サイズのデータ、もしくはストリームデータをタイムスタンプで区切ったものが与えられる。 トリガを用いていつ出力するかを制御する。

SDKもある。Apache Beamプロジェクト配下でリリースされている。

Dataflowサービスは、ジョブを実行するためにGCE、GCSを管理する。

オートスケールや動的なリバランス機能を使って、オンザフライでリソースアロケーションを調整できる。 (論文当時)バッチ処理のオートスケールはリリースされていたが、ストリーム処理のオートスケールは実験段階だった。

補足:ステートフルなオペレータでも問題なくスケールアウト・インされるのか?要確認。 ★

4.3. Azure stream analytics

Azure Stream Analytics(ASA)のジョブ定義は、 入力、クエリ、出力で定義される。

Azure Event Hub、Azure IoT Hub、Blobサービスなどとの連携も可能。

T-SQLの亜種を用いた分析処理が可能。

書き出し(Sink)は色々と対応している。 Blob、Table、Azure SQL DB、Event Hub、Service Queueなど。

本論文執筆時点で、UDFには対応していない。

Streaming Units(SU)の単位で分散処理される。 なお、SUはCPU、メモリなどを包含した単位。

5. Elasticity in stream processing systems

弾力性は、モニタリング、分析、計画、実行(MAPE)プロセスで実現される。

弾力性の実現には互いに関係する2種類の課題が存在する。

  • アプリケーションワークロードに合致するITリソースを確保したり、解放したりする:elastic resource management
  • 追加・解放されたリソースに合わせ、アプリケーションを調整する

スケールアウト・イン計画に基づき、アプリケーション調整を行うのをelasticity actionsと呼ぶ。

水平、垂直の両方向のスケールの仕方が存在する。

オートスケールに対応するため、アプリケーションには調整が必要になることがある。 例えば、実行グラフの調整、中間クエリの並列度の調整、など。 ★

水平分散はしばしば、処理グラフの適応(補足:つまり、グラフ形状の最適化)、オペレータの持つステートの出力(補足:つまりステートも引き継がないといけない)が必要になる。 ★

ウィンドウ化、もしくはシーケンス化されたオペレータを用いる場合、(ステート管理が必要になるため) 並列処理(のスケールアウト・インが)難しくなる。

ストリーム処理ではロングランなジョブを無停止で動かしたい、ということがあり、 ますますスケールアウト・インするときの扱いが難しい。 ★

弾力性の実現アプローチには、staticとonlineの2種類がある。

staticでは並列度やオペレータの位置を(補足:あらかじめというニュアンスがある?)最適化する。そのためにグラフを最適化する。

onlineではリソースプールの変更、新しいリソースを活用するためのアプリケーションの動的変更を含む。

5.1. Static techniques

2008年あたりの論文によると、初期タスクアサインと中間処理の並列度の最適化アプローチでは、 水平スケールが可能だった。

R-Storm(2015年)ではStormのカスタムスケジューラを提案。 ★ CPU、メモリなどのリソースプールに基づき、スケジューリングする。 これは要は、ナップサック問題に帰着する。 ★ 当該論文では、従来手法が計算量大きすぎるとし、ここでは 「タスクを必要リソースのベクトル、ノードをリソースバジェットのベクトル」と見立て、 ベクトル間のユークリッド距離で最適解を求める手法を提案した。 タスク間の通信にはヒューリスティックスを利用。

SBON(Stream Based Overlay Network)(2006年)では、 レイテンシを考慮しながらネットワークを構成する。 コストスペースという多次元ユークリッド空間を定義し、 適応的な最適化手法を利用する。

Zhouらが提案した手法(2006年)では通信コストを最小化する。 ロードバランスも考慮。 クエリを分割し、極力アサインされるノードを減らす。

ほかにも、 AhmadやÇetintemel (2004年)の論文では ネットワークバンド幅を最小化するアルゴリズムが提案されている。

5.2. Online techniques

ストリーム処理で弾力性を実現するには、以下の2個の要素が必要。★

  • リソース使用状況、サービスレベルのメトリクス(エンドツーエンドのレイテンシなど)などを 観測する仕組み
  • スケーリングポリシー

多くのソリューションはアプリケーション(というより、オペレーション)をブラックボックスとして扱い、 メトリクスから最適化を行う。

SattlerやBeierはこれらの手法は信頼性向上にもつながる、としている(2013) その観点でいえば、タスクがボトルネックになった際に、オペレーショングラフのリライトを起こすべき、となる。 (補足:つまり、入力に対して、処理が追い付かなくなってきたとき、など)

ステートフルなオペレータの最適化は非常に困難。 調整が走っている間、オペレータのステートは適切にパーティション化されていないといけない。 ★ (補足:しかもストリーム処理なのでレイテンシも気にされることになる)

補足:以下、関連論文の紹介が続く。

Fernandezらの論文(2013)では、オペレーションのステートをストリーム処理エンジンと統合するアプローチを提案した。 つまりステートはタプルの形式で定期的にチェックポイントされるようになり、 オペレータのスケールの最中、新旧のオペレータ間でリパーティションされるようになる。 ★ CPU使用状況がメトリクスとして管理され、複数のオペレータがボトルネックになっていることがわかるとスケールアウトを実行するようになっている。

T-Storm (Xu etら。2014年) = Traffic Aware Storm。 通信コストを減らすようなスケジュールをカスタムスケジューラを導入して実現。

Anielloらの論文(2013)でも同様のアプローチを提案。 オフラインで動作するスタティックなスケジューラと動的なスケジューラの両方を提案。

Lohrmannらの論文(2015)では、リソース使用を抑えつつ、与えられたエンドツーエンドの レイテンシを守るためのスケジューリングを実現する方法を提案。 ノードは均一であることを前提としている。 Rebalance(パーティション再アサイン)とResolveBottleneck(スケールアウト)の2種類の方法を使って実現。

ESCストリーム処理エンジンは、タスクスケジューリング、性能モニタ、リソースプール管理が協調して動作する。 またUDFを実行するプロセスは、ゲートウェイ機能(PEマネージャ)と実行機能(PEワーカ)の両方を内包。 マネージャはワーカのロードバランス機能を持っている。 リソース状況がひっ迫すると、PEマネージャごとPEワーカを止め、スケールアウトを実行する。

StreamCloud(2012)は、クエリをサブクエリに分け、独立した複数の計算クラスタに割り当てる。 ステートフルなオペレータに依存して分割度合いが決まる。 リソース管理の仕組みと、ロードバランスの機能を持つ。

Heinzeらの論文(2014)では、リソースの配置(再配置)を行う際に レイテンシのスパイクを考慮に入れる。 オペレータの配置は、「インクリメンタルなビン・パッケージング問題」とみなすことができる。 ここでは主にCPU使用率やキャパシティを対象とする。(メモリやネットワークも考慮はするようだ)

補足:ここからIBM Streams(System S?)の関連論文が数件続く。

Gedikらの論文(2014) では、IBM Streamsを対象として自動並列化を狙った。 ステートフルなオペレータも含むエラスティックな自動並列化を提案。 スプリッタにおけるブロッキング時間やスループットをメトリクスとし、並列度を決める。 データスプリッタでは、ステートレスの場合はラウンドロビンに従い、 そうでなければハッシュベースになる。

Tang and Gedik (2013) らの論文では、IBM Streamsを対象とし、 タスクやパイプラインの並列化を提案。 オペレータのパイプラインのスレッドを管理。スレッドを追加することでスループットを向上させる。

Gedik等の論文(2016) では、IBM Streamsを対象とし、パイプライン並列、データ並列の両方を実現する手法を提案。 チェーンのようにつながったデータフローグラフを分割する。 その際、オペレータがレプリケート可能かどうかを考慮し、並列化する。

Wu and Tanの論文 (2015) では、以下の点に関する検討を実施。

  • 大きなサイズのステートを管理
  • ワークロードの変動
  • マルチテナントの実現

彼らはChronoStreamを提案。 弾力性とオペレータマイグレーションの実現のため、アプリケーションレベルのステートを 計算ノードレベルのステートに分割し、スライス化する。 スライス化されたステートは、チェックポイントされ、ほかのノードに複製される。 スライスごとに計算の進捗は管理される。(このあたりがSparkのDStreamと異なる)

Xuらの論文(2016)では、Stela(Stream Processing Elasticity)を提案。 スケールアウト・インの後のスループットを最適化し、計算がストップするのを極力防ぐ。 Expected Throughput Percentage (ETP)をメトリクスとして使用。 オペレータの処理速度が変わった場合に最終的にどんなスループットになるのか?の値。 本手法はStormのスケジューラの拡張として提供されている。

Hidalgoらの論文では、オペレータの分割で並列化を実現する。 その際、オペレータの状態を管理するため以下の2種の手法を使用 ★。

  • short-termアルゴリズム
    • トラフィックのピークを検知するために短期間のロードを観測
    • 下限上限で管理
  • long-termアルゴリズム
    • トラフィックパターンを把握するため長期間のロードを観測
    • マルコフチェーンモデルで管理

上記に基づき、当該オペレータが過負荷、安定、負荷不足なのかを予測する。 ★

ここ最近、コンテナや軽量な仮想化技術の利用に関する研究も盛んである。

Pahl and Lee らの論文(2015) では、コンテナをエッジ、クラウドコンピューティングの 弾力性を高めるための研究を実施。

Ottenwälderらの論文 (2013) では、オペレータ配置とマイグレーションを支援するため、 システムの特徴と移動体パターンの予測を用いる。 クラウドコンピューティングとフォグコンピューティングの両方を含む。 移動体は、近くのブローカに接続する。 これによりネットワークコストを低減し、エンドツーエンドのレイテンシも改善する。 オペレータのマイグレーション計画は動的に変わる。 その際タイムグラフモデルが用いられる。

以下に、ツール群の比較表を引用する。 ★

ツール比較表

5.3. Change and burst detection

入力データの変化、バーストを検知すること。(補足:というのが重要、ということ) それ自体は弾力性の実現ではないが、トリガとして関係することがある。

Zhu and Shashaの論文 (2003) では、移動ウェーブレットツリー・データ構造を用いて、 それらを検知する。ウィンドウの取り方は3種類。 ランドマーク(固定)、移動式、過去分を減衰。

参考:ウェーブレットと多重解像度処理

そのほか、Krishnamurthyらの論文(2003) では、Sketchを用いる。

参考:乱択データ構造の最新事情 -MinHash と HyperLogLog の最近の進歩-

6. Distributed and hybrid architecture

旧来の多くの分散ストリーム処理の技術は、クラスタを対象としていた。 最近の研究では、エッジを考慮したものが登場している。

インターネットのエッジであるマイクロデータセンタは、しばしば「Cloudlets」と呼ばれる。 データ転送量の低減、エンドツーエンドのレイテンシ改善、クラウドからのオフロードが目的。

ただし、多くの手法はまだ模索段階。

6.1. Lightweight virtualisation and containers

軽量な仮想化、もしくはコンテナを利用する、というのがしばしば挙がる。

Yanguiらの論文(2016)は、クラウド・フォグコンピューティングのPaaSを提案。 Cloud Foundaryを拡張。

Morabito and Beijarの論文 (2016) は、エッジ・コンピューテーション・プラットフォームを 提案した。リソースの弾力性を実現するため、コンテナを利用する。 シングルボードのコンピュータをゲートウェイとして利用する。 データ圧縮などに利用する。

Petroloらの論文(2016)も、同様にゲートウェイデザインを提案。 ワイヤレスセンサーネットワーク向け。

Hochreinerらの論文 (2016) では、エラスティックなストリーム処理の仕組みとして、VIennaエコシステムを提案。 プロセッシンググラフを書くためのグラフィカルなユーザインターフェースも供える。

6.2. Application placement and reconfiguration

モバイルクラウドとヘテロジーニアスメモリに関するGaiらの論文(2016) では、 ハイブリッドシナリオでのタスクスケジューリングについて触れられている。

Benoitらの論文 (2013)、Roh等の論文(2017)では、ヘテロジーニアス・ハードウェアやハイブリッドアーキテクチャにおける コンピュータリソース使用に関するスケジューリングについて取り扱っている。

Cardelliniらの論文 (2016)では、 Optimal Distributed Stream Processing Problem (ODP)向けに ヘテロジーニアスなリソースを考慮した integer programming formulation を提案。 Stormを拡張し、ODPベースのスケジューラを採用。 ネットワークレイテンシを推測。

地理分散されたVMに、ストリーム処理のオペレータを配置する問題は、NP困難(ないし、NP完全)である。(2016、2017) しかし、コストアウェアなヒューリスティックは提案されている。(Gu et al., 2016; Chen et al.,)

SpanEdge(Sajjad et al. (2016) )は、中央とエッジのデータセンタを使うストリーム処理ソリューション。 マスタ・ワーカ構成のアーキテクチャであり、hubワーカ(中央)とspokeワーカ(エッジ)を持つ。 スケジューラはローカルタスクを各エッジ上のワーカで実行しようとする。 ★

Mehdipourらの論文 (2016) は、クラウドとフォグの間の通信を最小化するように動く。 IoTデバイスからのデータを処理する。

Shenらの論文(2015) は、CiscoのConnected Streaming Analytics(CSA)を 利用する。データセンタとエッジの両方を利用する。 ★ CSAが継続的クエリのためのクエリ言語を提供する。

Geelytics(Chengらの論文 2016)は、IoT環境のためのシステム。複数の地理分散されたデータProducer、結果Consumer、計算リソースを活用する。 それらはクラウドもしくはネットワークエッジのいずれかに置かれる。 マスタ・ワーカアーキテクチャであり、Pub/Subモデルのサービスを採用。 アプリはオペレータのDAG。 スコープ限定されたタスクが特徴的。 各タスクのスコープの粒度をユーザが指定可能。 データProducerの地理情報に基づきresultタスクを配置する。

7. Future directions

大きな組織におけるビッグデータソリューションは、 バッチとオンラインの両方を有することになる。 ★ また複数のデータセンタにまたがることもあり、それらの中で弾力的にリソースを使用できることが理想的である。

ここでは将来の展望を述べる。

7.1. SDN and in-transit processing

SDN等を通じて、ネットワーク経路の途中で計算を行う。 このアプローチは、セキュリティとリソース管理の課題をはらむ。 例えば、IoTデバイスからデータセンタの経路の途中で計算させることは、攻撃可能な個所を増やすことにつながる。 また状況に合わせたリソース配置も困難だ。

既存の多くの研究では、複数のオペレータの配置においては、 レイテンシやバンド幅などのネットワーク・メトリクスに着目する。 一方で、ネットワークがプログラム可能であることは考慮されていない。

コグニティブ・アシスタンスのユースケースでは、データセンターで機械学習モデルが学習され、 その後エッジ側で推論される。 ★ このときのひとつの課題は、結果的に生じるコンセプト変動である。

7.2. Programming models for hybrid and highly distributed architecture

Boykinら(2014)、Google Cloud Dataflow(2015)のように、ハイレベルの抽象化を実現したものがある。 これらのソリューションは単一のクラスタやデータセンタでの利用に限られているが、 一部これをエッジコンピューティングまで拡張する取り組みが生じている。 そのような条件下で、リソース使用を効率的に管理する手法に関する研究が多く行われるだろう。

Apache Beam(2016)のプロジェクトでは、ユニファイドなSDKを提供。 Apache SparkやApache Flinkに対応。 一方で、エッジ・フォグ・データセンタコンピューティングを横断するユニファイドなSDKはない。

Apache Edgent (2017) がインターネット・エッジでの計算・分析を可能にする。 ★

参考:Apache Edgent

共有

WSLからVagrantを使う

参考

メモ

基本的には、 (公式)Vagrant and Windows Subsystem for Linux の通り進める。

まずWindowsとWSLの両方に、Vagrantをインストールする。 両方のバージョンが一致するように気を付ける。

共有

ZooKeeperをスーパバイザでセルフヒーリングする

参考

メモ

Heron公式ドキュメントを参照していて改めて気づいたのだが、 ZooKeeperはSupervisor機能で自浄作用を持たせた方がよいだろう。

公式ドキュメントの zookeeperAdmin.html#sc_supervision では、 daemontoolsやSMFをスーパバイザの例として挙げていた。

共有

Hexoでふっとノートを用いる

参考

#メモ

最初はhexo-footnoteを見つけたのだが、開発停止されているようなので、 そこで言及されていた hexo-renderer-markdown-it を利用してみることにした。

院すとーすして、以下のようなコンフィグレーションを _config.yml に追加。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
markdown:
render:
html: true
xhtmlOut: false
breaks: false
linkify: false
typographer: false
#quotes: '“”‘’'
plugins:
- markdown-it-abbr
- markdown-it-footnote
- markdown-it-ins
- markdown-it-sub
- markdown-it-sup
anchors:
level: 2
collisionSuffix: 'v'
permalink: true
permalinkClass: header-anchor
permalinkSymbol: ¶

これでフットノートが使えるようになった。 しかし、アンカーが使えないのだがまだ詳しく調査していない。

共有

Twitter Heronの論文

参考

論文の要点メモ

昔のメモをコピペ。

なお、 Storm vs. Heron のブログに記載の通り、 本論文当時のStormは古く、現在のStormでは解消されている(可能性のある)課題が取り扱われていることに注意が必要。

1. Introduction

Twitterでのリアルタイム・ストリーミングのユースケースの例。

  • RTAC(リアルタイム・アクティブユーザ・カウント)
    • 図1にRTACのトポロジーイメージが載っている
  • リアルタイム・エンゲージメント(ツイートや広告)

大きな課題のひとつは、デバッガビリティ。性能劣化の原因を素早く探りたい。 論理単位と物理的なプロセスを結びつけてシンプルにしたかった。

クラスタ管理の仕組み(TwitterではAuroraを使用)と組み合わせたかった。

計算リソースの効率も上げたかった。

一方で既存のアプリを書き換えたくないので、Storm API、Summingbird APIに対応させたい。

2. Related Work

挙げられていたのは以下のような技術。

  • Apache Samza
  • MillWheel(2013)
  • Photon(2013)
  • DataTorrent
  • Stormy(2012)
  • S4
  • Spark Streaming

またトラディショナルなデータベースに組み込まれたストリーム処理エンジンも挙げられていた。

  • Microsoft StreamInsight
  • IBM Inforsphere Streams
  • Oracle continuous query

要件

  • OSS
  • 性能高い
  • スケーラブル
  • Storm APIとの互換性あり

3. Motivation for Heron

3.1 Storm Background

特筆なし。Stormの論理構造の説明があるだけ。

3.2 Storm Worker Architecture: Limitations

Stormはプロセス内で複数のタスクを実行するし、それらのタスクは複数のトポロジーに属している。 これにより性能劣化などが発生しても、その原因を見極めるのが難しく、結果としてトポロジー「再起動」という 手段を取らざるを得なくなる。

ログが混ざって調査しづらい。

マシンリソースのリザーブが一律である点が辛い。 余分なリソースを確保してしまう。特に複雑なトポロジーを用いることになるハイレベルAPIで利用する場合。

大きなヒープを用いることもデバッグしづらさを助長する。

ワーカ内では、グローバルな送信スレッド、受信スレッド、コンピューティングスレッドがある。 その間で内外の通信を行う。これは無駄が多い。

3.3 Issues with the Storm Nimbus

Stormのスケジューラは、ワーカにおいてリソース使用分割をきちんと実施しない。 Storm on YARNを導入しても十分ではなかった。

ZooKeeperがワーカ数の制約になる。

NimbusはSPOF。 Nimbusがとまると新しいトポロジーを投入できないし、走っているトポロジーを止められない。 エラーが起きても気づけないし、自動的にリカバーできない。

3.4 Lack of Backpressure

Stormにはバックプレッシャ機能がなかった。

3.5 Efficiency

効率の悪さ(マシンリソースを使い切らない)が課題。 イメージで言えば…100コアのクラスタがあるとき、Utilization 90%以上のコアが30個で動いていほしいところ、Utilizaiton 30%のコアが90個になる、という具合。

4. Design Alternatives

Heronの他に選択肢がなかったのか、というと、なかった、ということが書かれている。 ポイントはStorm互換APIを持ち、Stormの欠点を克服するプロダクトがなかった、ということ。

5. Heron

5.1 Data Model and API

データモデルとAPIはStormと同様。 トポロジーは論理プラン、実際に実行されるパーティション化されたスパウトやボルトが 物理プランに相当すると考えて良い。

5.2 Architecture overview

ジェネラルなスケジューラであるAuroraを使用し、トポロジーをデプロイする。 Nimbusからの離脱。

以下のようなコンポーネントで成り立つ。

  • トポロジマスタ(TM)(スタンバイTMを起動することも可能)
  • ストリームマネージャ(SM)
  • メトリクスマネージャ(MM)
  • ヘロンインスタンス(HI)(要はスパウトとボルト)

コンテナは単独の物理ノードとして起動する。 なお、Twitterではcgroupsでコンテナを起動する。

メタデータはZooKeeperに保存する。

なお、HIはJavaで実装されている。

通信はProtocol Buffersで。

5.3 Topology Master

YARNでいうAMに近い。 ZooKeeperのエファメラルノードの機能を使い、複数のTMがマスタになるのを防ぎ、TMを探すのを助ける。

5.4 Stream Manager

HIは、そのローカルのSMを通じて通信する。 通信経路は O(k^2) である。このとき、HIの数 n は、 kよりもずっと大きいことが多いので、効率的に通信できるはずである。

補足:ひとつのマシンにひとつのSM、複数のコンテナ(つまりHI等)があるモデルを仮定。

バックプレッシャの方式には種類がある。

バックプレッシャについて。 複数のステージで構成されるケースにおいて、後段のステージの処理時間が長引いていると、 タプルを送るのが つまって バッファがあふれる可能性が生じる。 そこでそれを調整する機能が必要。

TCPバックプレッシャ。 HIの処理時間が長くなると、HIの受信バッファが埋まり始め、合わせて SMの送信バッファも埋まり始める。 これによりSMは つまり 始めているのを検知し、それを上流に伝搬する。 この仕組みは実装は簡単だが、実際にはワークしない。 HI間の論理通信経路は、SM間の物理通信経路上で、 オーバーレイ構成されるためである。

スパウトバックプレッシャについて。 TCPバックプレッシャと組み合わせて用いられる。 SMがHIの処理遅延を検知すると、 スパウトのデータ読み込みを止める。 続いてスタートバックプレッシャのメッセージを他のSMに送る。これにより読み込みが抑制される。 処理遅延が解消されると、ストップバックプレッシャのメッセージを送る。 欠点は、過剰に読み込みを抑止すること、メッセージングのオーバヘッドがあること。利点は、トポロジによらず素早く対処可能なこと。

その他、ステージ・バイ・ステージバックプレッシャについて。 これはトポロジはステージの連続からなる。 バックプレッシャを伝搬させることで必要分だけ読み込み抑制する。

Heronでは、スパウトバックプレッシャ方式を用いた。

ソケットをアプリケーションレベルのバッファに対応させ、 ハイ・ウォータマークとロー・ウォータマークを定義。 ハイ・ウォータマークを超えるとバックプレッシャが発動し、 ロー・ウォータマークを下回るまでバックプレッシャが続く。

5.5 Heron Instance

デザインにはいくつかの選択肢がある。

5.5.1 Single-threaded approach

HIはJVMであり、単独のタスクを実行する。 これによりデバッグしやすくなる。

しかし、シングルスレッドアプローチは「ユーザコードが様々な理由によりブロックする可能性がある」という欠点がある。 ブロックする理由は様々だが、例えば…

  • スリープのシステムコールを実行
  • ファイルやソケットI/Oのため読み書きのシステムコールを実行
  • スレッド同期を実行

これらのブロックは特にメトリクスの取り扱いにおいて問題になった。 つまり、仮にブロックされてしまうともし問題が起きていたとしてもメトリクスの伝搬が遅くなることがあり、 ユーザは信用を置けなくなってしまうからだ。

5.5.2 Two-threaded approach

ゲートウェイスレオッドとタスク実行スレッドの2種類で構成する。 ゲートウェイスレッドは、HIの通信管理を担う。 TCP接続の管理など。

タスク実行スレッドはユーザコードを実行する。 スパウトか、ボルトかによって実行されるメソッドが異なる。 スパウトであれば nextTuple を実行してデータをフェッチするし、 ボルトであれば execute メソッドを実行してデータを処理する。 処理されたデータはゲートウェイスレッドに渡され、ローカルのSMに渡される。 なお、その他にも送られたタプルの数などのメトリクスが取得される。

★補足:このデザインは汎用的なので、他のプロダクトにも利用できそう。

スレッド間はいくつかのキューで結ばれる。 data-in、data-out、metrics-outである。 重要なのは、data-inとdata-outは長さが決まっており、このキューがいっぱいになるとバックプレッシャ機能が有効になる仕組みになっていること。

問題は、NWがいっぱいなときにdata-outキューが溜まった状態になることだった。 これにより、生存オブジェクトがメモリ内に大量に残り、GCによる回収ができない。 このとき、もしdata-outキューからの送信よりも、先に受信が発動すると、新しいオブジェクトが生成されることになり、 GCを発動するが回収可能なオブジェクトが少ないため、さらなる性能劣化を引き起こす。

これを軽減するため、data-out、data-inキューの長さを状況に応じて増減することにした。

5.6 Metrics Manager

メトリクスマネージャは、コンテナごとに1個。

5.7 Startup Sequence and Failure Scenarios

トポロジがサブミットされてから実際に処理が開始されるまでの流れの説明。

  • スケジューラがリソースをアロケート
  • TMが起動し、ZooKeeperにエファメラルノードを登録
  • SMがZooKeeperからTMを確認し、SMと接続する。ハートビートを送り始める。
  • SMの接続が完了すると、TMはトポロジのコンポーネントをアサインする
  • SMはTMから物理プランを取得し、SM同士がつながる
  • HIが立ち上がり、ローカルSMを見つけ、物理プランをダウンロードし実行開始。
  • 故障時の対応のためTMは物理プランをZooKeeperに保存する。

いくつか故障シナリオが想定されている。

  • TMが故障した場合、ZooKeeper上の情報を使って復旧可能である。復旧後、SMは新しいTMを見つける。
  • SMが故障した場合、復旧したSMは物理プランを取得する。他のSMも新しいSMの物理プランを取得する。
  • HIが故障した場合、再起動してローカルSMにつなぎにいく。その後物理プランを取得し処理を再開する。

5.8 Architecture Features: Summary

まとめが記載されているのみ。

6. Heron in Production

プロダクションで利用するため、いくつかの周辺機能を有している。

6.1 Heron Tracker

ヘロントラッカーはZooKeeperを利用し、トポロジのローンチ、既存トポロジの停止、 物理プランの変更を追従する。 また同様にトポロジマスタを把握し、メトリクス等を取得する。

6.2 Heron UI

ヘロントラッカーAPIを利用し、UIを提供する。 論文上にはUIの例が載っている。 トポロジのDAG、コンテナ、コンポーネント、メトリクスを把握できる。

6.3 Heron Viz

メトリクスの可視化。トラッカーAPIを利用し、新しいトポロジがローンチされたことを検知し、可視化する。

ヘルスメトリクス、リソースメトリクス、コンポーネントメトリクス、ストリームマネージャメトリクス。

ヘルスメトリクスではラグや失敗などを表示する。

リソースメトリクスでは予約されたCPU、実際に使用されたCPU、同様にメモリに関する情報を扱う。またGCなども。

コンポーネントメトリクスはスパウトではエミットされたタプル数などのコンポーネントごとに固有のメトリクスを扱う。 エンドツーエンドのレイテンシも扱う。

ストリームマネージャメトリクスは、インスタンスに送受信されたタプル数やバックプレッシャ機能に関するメトリクスを扱う。

6.4 Heron@Twitter

TwitterではStormではなくHeronがデファクトスタンダードである。

ユースケースは多岐にわたるが、データ加工、フィルタリング、結合、コンテンツのアグリゲーションなどなど。 機械学習も含む。例えば、回帰、アソシエーション、クラスタリングなど。

3倍ほどのリソース使用効率を得られた。

7. Empirical Evaluation

本論文用に組まれた動作確認。 StormとHeronの比較。 Ackあり・なしの両方。

なお、計測はデータ処理が安定してから開始するようにした。 そのため、Stormでは0mq層 1 でほとんどドロップが起きていないときに計測することを意味し、 Heronではバックプレッシャが発動しておらず転送キューが溜まっていない状態での計測を意味する。

Ackありのケースについて、タプルのドロップは、

  • Storm:0mqでのドロップもしくはタイムアウト
  • Heron:タイムアウト

を要因とするものを想定する。

7.3 Word Count Topology

スパウトで高々175k単語のランダムな単語群を生成する。 それをボルトに渡し、メモリマップに保持する。

これは単純な処理なので、オーバーヘッドを計測するのに適している。

結果は、スループットで10倍〜14倍、レイテンシで5〜15倍の改善が見られた。

Heronのエンドツーエンドでのレイテンシにおけるボトルネックは、 SMのバッファでバッチ化されることであり、これは概ね数十ms程度の影響がある。

CPUコアの使用量は、2〜3倍小さくなった。 (補足:無駄にリソース確保せずに、きちんと各コアを使い切っている、というのも影響しているようだ)

Ackが有効、無効で同様の傾向。

7.4 RTAC Topology

Ack有効の場合、Stormで6Mタプル/min出すのに360コア必要だった。レイテンシは70ms。 対してHeronでは、36コアでよく、レイテンシも24msだった。

Ack無効の場合も同様の傾向。必要なCPUコア数に関し、10倍(つまり1/10のコアで良い)の改善が得られた。

8. Conclusions and Future work

Exactly oneceセマンティクスは論文執筆時点では対応されていない。 論文中では、 Trident が引用されていた。

最近のHeronはどうか?のメモ

2019/7/8現在になりどうなったかを軽く確認。

Apache incubation Heron が公式レポジトリである。 Apache Heronのコミット状況 を見る限り、2019/7/8現在も活発に活動されている。

Apache Heronのコントリビュータ を見る限り以下の様子。

  • コアの開発者はApache Pulsarの人でもある Sanjeev Kulkarniobjmagic
  • ただし最近は Ning Wang のように見える。彼はもともと2013年あたりまでGoogleでYouTubeに携わっていたようだ。

READMEによる「Update」

2019/7/8時点のREADMEによると、Mesos in AWS、Mesos/Aurora in AWS、ローカル(ラップトップ)の上でネイティブ動作するようになった。 またApache REEFを用いてApache YARN上で動作するように試みている。 slurm にも対応しようとしているとのこと。

公式ドキュメントを覗いてみる

公式ドキュメント を確認し、最近の様子を探る。

  • Python APIがある
  • UIがかなり進化している
  • スケジューラとしては、k8s、k8s with Helm、
  • メトリクス監視の仕組みには、Prometheus、Graphite、Scribeが挙げられている

Heron's Design Goals

2019/7/8現在、以下のようなゴールを掲げている。

  • 1億件/minをさばける
  • 小さなエンドツーエンド・レイテンシ
  • スケールによらず予測可能な挙動。またトラフィックのスパイクやパイプラインの輻輳が生じても予測可能な挙動。
  • Simple administration, including:
  • シンプルな運用
    • 共有インフラにデプロイ可能
    • 強力なモニタリングの仕組み
    • 細やかに設定可能
  • デバッグしやすい

商用サポートはあるのか?

2019/7/8現在、Heronの商用サポートがあるのか? →なさそうに見える。

Introduction to Apache Heron by Streamlio の通り、StreamlioがよくHeronの説明をしているように見える。 またこのスライドではユースケースとして、

  • Ads
  • Monitoring
  • Product Safety
  • Real Time Trends
  • Real Time Machine Learning
  • Real Time Business Intelligence

あたりを挙げている。参考までに。 また顧客(?)としては、

  • Twitter
  • Google
  • Stanford University
  • Machine Zone
  • Inidiana University
  • Microsoft
  • Industrial.io

を挙げている。

ただし、 Streamlioのサポートサービス を見る限り、Apache Pulsarを対象としているがApache Heronが対象に入っているようには見えない。 また Streamlioのプロセッシングエンジンに関する説明 を見ると、Apache Heronに言及しているが、あくまでStreamlioがApache Heronの 開発に携わっていた経験がStreamlioのストリーム処理エンジンの開発に生かされている件について触れられているだけである。 現在は、 Pulsar functions が彼らのコアか。


  1. 0mqの記述が読み取れることから、0.8系Stormを比較対象としたように見える。↩︎

共有

Apache Kafkaビルド時のエラー

参考

メモ

Re: Kafka Trunk Build Failure with Gradle 5.0 に記載のように Gradle 4系でエラーが出るようなので、いったん5系をインストールして実行し直した。 (現環境のUbuntuではaptインストールでのバージョンが4系だったので、 SDKMANを使って5系をインストールした)

Gradleのバージョンを上げたところ問題なく実行できた。 以下の通り。

1
$ gradle clients:test --tests org.apache.kafka.clients.producer.KafkaProducerTest

ここではGradleを使って、clientsサブプロジェクト以下の org.apache.kafka.clients.producer.KafkaProducerTest を実行する例である。

共有

Kafkaコンソールプロデューサを起点とした確認

参考

メモ

2019/6/23時点でのtrunkで再確認した内容。 ほぼ昔のメモのままコピペ・・・。

ConsoleProducer

エントリポイントとして、Kafkaのコンソールプロデューサを選ぶ。

bin/kafka-console-producer.sh:20

1
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

コンソールプロデューサの実態は、KafkaProducerである。

kafka/tools/ConsoleProducer.scala:45

1
val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))

なお、 ConsoleProducerにはsendメソッドが定義されており、 以下のように標準入力からデータを読み出してはメッセージを送信する、を繰り返す。

kafka/tools/ConsoleProducer.scala:54

1
2
3
4
5
do {
record = reader.readMessage()
if (record != null)
send(producer, record, config.sync)
} while (record != null)

kafka/tools/ConsoleProducer.scala:70

1
2
3
4
5
6
7
private def send(producer: KafkaProducer[Array[Byte], Array[Byte]],
record: ProducerRecord[Array[Byte], Array[Byte]], sync: Boolean): Unit = {
if (sync)
producer.send(record).get()
else
producer.send(record, new ErrorLoggingCallback(record.topic, record.key, record.value, false))
}

KafkaProducer

それでは実態であるKafkaProducerを確認する。

コンストラクタを確認する

まずクライアントのIDが定義される。

org/apache/kafka/clients/producer/KafkaProducer.java:332

1
2
3
4
String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;

続いてトランザクションIDが定義される。

org/apache/kafka/clients/producer/KafkaProducer.java:337

1
2
String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
(String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;

Kafkaのトランザクションについては、 Confluentのトランザクションに関する説明 を参照。

その後、ログ設定、メトリクス設定、キー・バリューのシリアライザ設定。 その後、各種設定値を定義する。

特徴的なところとしては、メッセージを送信するときにメッセージをいったんまとめこむ accumulator も このときに定義される。

org/apache/kafka/clients/producer/KafkaProducer.java:396

1
2
3
4
5
6
7
8
9
10
11
12
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.compressionType,
lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
transactionManager,
new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));

他には Sendor も挙げられる。 例えば、上記で示したaccumulatorも、Sendorのコンストラクタに渡され、内部で利用される。

org/apache/kafka/clients/producer/KafkaProducer.java:422

1
this.sender = newSender(logContext, kafkaClient, this.metadata);

org/apache/kafka/clients/producer/KafkaProducer.java:463

1
2
3
4
5
6
7
8
9
10
11
12
13
14
return new Sender(logContext,
client,
metadata,
this.accumulator,
maxInflightRequests == 1,
producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
retries,
metricsRegistry.senderMetrics,
time,
requestTimeoutMs,
producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
this.transactionManager,
apiVersions);

その他にはトランザクション・マネージャなども。

共有

GradleプロジェクトをIntellijでインポート

参考

メモ

昔は gradlew idea とやってIdea用のプロジェクトファイルを生成してから インポートしていたけれども、最近のIntellijは「ディレクトリを指定」してインポートすると、 Gradleプロジェクトを検知して、ウィザードを開いてくれる。

例えばApache KafkaなどのGradleプロジェクトをインポートするときには、 メニューからインポートを選択し、ディレクトリをインポートすると良い。

共有

Kafkaコンソールコンシューマを起点とした確認

参考

メモ

2019/6/23時点でのtrunkで再確認した内容。 ほぼ昔のメモのままコピペ・・・。

ConsoleConsumer

コンシューマの実装を確認するにあたっては、コンソール・コンシューマが良いのではないか。

bin/kafka-console-consumer.sh:21

1
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

から、 kafka.tools.ConsoleConsumer クラスあたりを確認すると良いだろう。

mainを確認すると、ConsumerConfigのインスタンスを生成し、runメソッドに渡すことがわかる。

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:54

1
2
3
val conf = new ConsumerConfig(args)
try {
run(conf)

ここで、ConsumerConfigはコンソールからパラメータを受け取るためのクラス。 実態はrunメソッドである。

まず大事な箇所として、 KafkaConsumer クラスのインスタンスを生成している箇所がある。 ★

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:67

1
val consumer = new KafkaConsumer(consumerProps(conf), new ByteArrayDeserializer, new ByteArrayDeserializer)

この KafkaConsumer クラスがコンシューマの実態である。 なお、コンソール・コンシューマでは、これをラップした便利クラス ConsumerWrapper が定義されており、 そちらを通じて主に使う。

またシャットダウンフックを定義している。

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:75

1
addShutdownHook(consumerWrapper, conf)

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:87

1
2
3
4
5
6
7
8
9
10
11
12
13
def addShutdownHook(consumer: ConsumerWrapper, conf: ConsumerConfig) {
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
consumer.wakeup()

shutdownLatch.await()

if (conf.enableSystestEventsLogging) {
System.out.println("shutdown_complete")
}
}
})
}

これはグレースフルにシャットダウンするには大切。 ★

またデータ処理の実態は、以下の process メソッドである。

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:77

1
try process(conf.maxMessages, conf.formatter, consumerWrapper, System.out, conf.skipMessageOnError)

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:101

1
2
3
4
def process(maxMessages: Integer, formatter: MessageFormatter, consumer: ConsumerWrapper, output: PrintStream,
skipMessageOnError: Boolean) {

(snip)

処理するメッセージ最大数、出力ストリームに渡す形を整形するメッセージフォーマッタ、コンシューマのラッパ、出力ストリーム、 エラーのときに呼び飛ばすかどうかのフラグが渡される。

少し処理の中身を確認する。 まず以下のようにメッセージを取得する。

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:104

1
2
3
4
    val msg: ConsumerRecord[Array[Byte], Array[Byte]] = try {
consumer.receive()

(snip)

上記の receive は、先程の通り、ラッパーのreceiveメソッドである。

receiveメソッドは以下のようにコンシューマのポール機能を使い、 メッセージを取得する。

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:437

1
2
3
4
5
6
7
8
9
def receive(): ConsumerRecord[Array[Byte], Array[Byte]] = {
if (!recordIter.hasNext) {
recordIter = consumer.poll(Duration.ofMillis(timeoutMs)).iterator
if (!recordIter.hasNext)
throw new TimeoutException()
}

recordIter.next
}

メッセージを取得するためのイテレータが得られるので、nextメソッドで取得する。 →イテレータのnextを使って1件ずつ取り出していることがわかる。 ★

続いて、取り出されたメッセージを ConsumerRecord 化してフォーマッタに渡す。   core/src/main/scala/kafka/tools/ConsoleConsumer.scala:118

1
2
formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers), output)

これにより出力が確定する。

KafkaConsumer

つづいて、KafkaConsumerクラスを確認する。

トピックアサイン

ここではトピックへの登録を確認する。 ★

コンシューマグループを使って自動で行うアサインメントを使用する場合。リバランスも行われる。

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:936

1
2
3
  public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {

(snip)

上記はトピックのコレクションを渡すメソッドだが、パターンを指定する方法もある。

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:1008

1
2
3
  public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {

(snip)

なお、何らかの理由(詳しくはJavadoc参照)により、リバランスが必要になったら、 第2引数に渡されているリスナがまず呼び出される。

コンシューマ・グループを利用せず、手動で行うアサインメントの場合は以下の通り。

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:1083

1
public void assign(Collection<TopicPartition> partitions) {

いずれの場合も内部的には SubscriptionState が管理のために用いられる。 タイプは以下の通り。 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:72

1
2
3
private enum SubscriptionType {
NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
}

またコンシューマの管理を行う ConsumerCoordinator には、 updatePatternSubscription メソッドがある。

org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:192

1
2
3
4
5
6
7
public void updatePatternSubscription(Cluster cluster) {
final Set<String> topicsToSubscribe = cluster.topics().stream()
.filter(subscriptions::matchesSubscribedPattern)
.collect(Collectors.toSet());
if (subscriptions.subscribeFromPattern(topicsToSubscribe))
metadata.requestUpdateForNewTopics();
}

matchesSubscribedPattern を用いて、現在クラスタが抱えているトピックの中から、 サブスクライブ対象のトピックをフィルタして取得し、SubscriptionState#subscribeFromPattern メソッドを呼ぶ。 これにより、当該コンシューマの購読するトピックが更新される。 この更新はいくつかのタイミングで発動するが、例えば KafkaConsumer#poll(java.time.Duration) の 中で呼ばれる updateAssignmentMetadataIfNeeded メソッドを通じて呼び出される。

コンシューマ故障の検知

基本的にはハートビート( heartbeat.interval.ms で設定)が session.timeout.ms を 超えて届かなくなると、故障したとみなされる。 その結果、当該クライアント(この場合はコンシューマ)がグループから外され、 リバランスが起きる。

なお、ハートビートはコーディネータに対して送られる。 コーディネータは以下のように定義されている。 org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:128

1
private Node coordinator = null;

org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:692

1
2
3
4
AbstractCoordinator.this.coordinator = new Node(
coordinatorConnectionId,
findCoordinatorResponse.data().host(),
findCoordinatorResponse.data().port());

AbstractCoordinator#sendFindCoordinatorRequest メソッドが呼ばれる契機は複数あるが、 例えば、コンシューマがポールするときなどにコーディネータが更新される。

なお、コーディネータにクライアントを登録する際、 セッションタイムアウトの値も渡され、対応される。 予め定められた group.min.session.timeout.ms や group.max.session.timeout.ms を満たす セッションタイムアウトが用いられる。

セッションタイムアウトの値は、例えば以下のように設定される。

kafka/coordinator/group/GroupCoordinator.scala:146

1
doJoinGroup(group, memberId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)

この値は、最終的に MemberMetadata に渡されて用いられる。 例えばハートビートのデッドラインの計算などに用いられることになる。

kafka/coordinator/group/MemberMetadata.scala:56

1
2
3
4
5
6
7
8
9
private[group] class MemberMetadata(var memberId: String,
val groupId: String,
val groupInstanceId: Option[String],
val clientId: String,
val clientHost: String,
val rebalanceTimeoutMs: Int,
val sessionTimeoutMs: Int,
val protocolType: String,
var supportedProtocols: List[(String, Array[Byte])]) {

共有

The 8 Requirements of Real-Time Stream Processing

参照

メモ

MillWheel でも触れられているストリーム処理について書かれた論文。 2005年。 時代は古いが当時の考察は現在でも有用と考える。

1. Introduction

ウォール街のデータ処理量は大きい。 2005年時点で122,000msg/sec。年率2倍で増大。 しかも処理レイテンシが秒では遅い。 金融に限らず異常検知、センサーデータ活用などのユースケースでは同様の課題感だろう。

「メインメモリDBMS」、「ルールベースエンジン」などがこの手のユースケースのために再注目されたが、 その後いわゆる「ストリーム処理エンジン」も登場。

2. Eight rules for stream processing

Rule 1: Keep the Data Moving

ストレージに書き込むなどしてレイテンシを悪化させてはならない。

ポーリングもレイテンシを悪化させる。 そこでイベント駆動のアプローチを用いることがよく考えられる。

Rule 2: Query using SQL on Streams (StreamSQL)

ハイレベルAPIを用いることで、開発サイクルを加速し、保守性を上げる。

StreamSQL:ストリーム処理固有の機能を有するSQL リッチなウィンドウ関数、マージなど。

Rule 3: Handle Stream Imperfections (Delayed, Missing and Out-of-Order Data)

例えば複数のストリームからなるデータを処理する際、 あるストリームからのデータが到着しないとき、タイムアウトして結果を出す必要がある。

同じようにout-of-orderデータについてもウィンドウに応じてタイムアウトしないといけない。

Rule 4: Generate Predictable Outcomes

結果がdeterministicであり、再現性があること。

計算そのものだけではなく、対故障性の観点でも重要。

Rule 5: Integrate Stored and Streaming Data

過去データに基づき、何らかのパターンなどを見つける処理はよくある。 そのためステート管理の仕組み(データストア)は重要。

例えば金融のトレーディングで、あるパターンを見つける、など。 他にも異常検知も。

過去データを使いながら、実時間のデータにキャッチアップするケースもあるだろう。

このようにヒストリカルデータとライブデータをシームレスにつなぐ機能は重要。

Rule 6: Guarantee Data Safety and Availability

HAは悩ましい課題。 故障が発生してフェールオーバするとして、バックアップハードウェアを 立ち上げて処理可能な状態にするような待ち時間は許容できない。

そこでタンデム構成を取ることは現実的にありえる。

Rule 7: Partition and Scale Applications Automatically

ローレベルの実装を経ずに、分散処理できること。 またマルチスレッド処理可能なこと。

Rule 8: Process and Respond Instantaneously

数万メッセージ/secの量を処理する。 さらにレイテンシはマイクロ秒オーダから、ミリ秒オーダ。

そのためには極力コンポーネント間をまたぐ処理をしないこと。

3. SOFTWARE TECHNOLOGIES for STREAM PROCESSING

基本的なアーキテクチャは、DBMS、ルールベースエンジン、ストリーム処理エンジン。

DBMSはデータをストアしてから処理する。データを動かしつづけるわけではない。

またSPEはSQLスタイルでのストリームデータの処理を可能とする。

SPEとルールベースエンジンは、ブロックおよびタイムアウトを実現しやすい。 DBMSはそのあたりはアプリケーションに委ねられる。 仮にトリガを用いたとしても…。

いずれのアーキテクチャにおいても、予測可能な結果を得るためには、 deterministicな処理を実現しないといけない(例えばタイムスタンプの並びを利用、など) SPEとルールベースエンジンはそれを実現しやすいが、DBMSのACID特性は あくまでトラディショナルなデータベースのためのもであり、ストリームデータ処理のための ものではない。

状態を保存するデータストアとしてDBMSを見たとき、サーバ・クライアントモデルのDBMSは 主にレイテンシの面で不十分である。 唯一受け入れられるとしたら、アプリケーション内に埋め込めるDBMSの場合である。 一方ルールベースエンジンはストリームデータを扱うのは得意だが、 大規模な状態管理と状態に対する柔軟なクエリが苦手である。

3.3 Tabular results

このあたりにまとめ表が載っている。

共有