On-the-fly Reconfiguration of Query Plans for Stateful Stream Processing Engines

参考

メモ

Bartnik, A., Del Monte, B., Rabl, T. & Markl, V., らの論文。2019年。

多くのストリーム処理エンジンが、データ量の変動に対して対応するためにクエリの再起動を必要とし、 ステートの再構成(再度の分散化?)にコストを必要とする点に着目し、新たな手法を提案。 Apache Flinkへのアドオンとして対応し、外部のトリガを用いてオペレータの置換を行えることを示した。

Modificaton Coordinatorを導入し、RPC経由でメッセージ伝搬と同一の方法でModificationに関する情報を 各オペレータインスタンスに伝える。 またステートの保持にはCheckopintの仕組みを導入し、各オペレータインスタンスごとに ステートを永続化する。上流・下流オペレータはその状況を見ながら動作する。

多くの先行研究に対し、以下の点が特徴的であると言える。

  • 並列度の変更だけではなく、データフロー変更、オペレータの追加に対応
  • ステートサイズが小さいケースに加え、大きい場合も考慮し実証
  • ステートサイズが小さい場合で数秒、ステートサイズが大きい倍で数十秒のオーダで オペレータインスタンスのマイグレーション、オペレータの追加などに対応
  • Exactly onceセマンティクスを考慮

1 Introduction

ストリームデータの流量について。 ソーシャルメディアの昼夜での差のように予測可能なものもあるが、 スポーツや天気の影響のように予測不可能なものもある。

最近のSPE(Stream Processing Engine)は、実行中の設定変更に一部対応し始めているが、 基本的には設定変更のためには再実行が必要。 (ここではApache Flink、Apache Storm、Apache Sparkを例にとっている)

そこで、提案手法では、実行中の設定変更を可能にする。

  • ネットワークバッファのサイズ変更
  • ステートフル/レスの両オペレーションのマイグレーション(オペレータをほかのノードに移す)
  • 並列度の変更
  • 新しいオペレーションの追加
  • UDFの変更

Apache Flinkに上記内容をプロトタイプし、動作を検証。

2 Background

2.1 Data Stream Processing

近年のSPEのターゲットは、大量データの並列分散処理。 アプローチは2種類:マイクロバッチ、tuple-at-a-time。

マイクロバッチはスループット重視。

tuple-at-a-timeは入力レコードに対して、より細かな粒度での処理を可能にするが、 実際のところ物理レベルではバッチ処理のメカニズムを採用している。 1

SPEでは、基本的にはデータフローの表現にDAGを利用。 Source、Processing、Sink、それをつなぐEdgeで構成される。 Processingノードはステートフル、ステートレスのどちらもあえりえる。

2.2 Apache Flink

FlinkはParallelization Contract (PACT)プログラミングモデルを採用。 Flinkは投稿されたクエリをコンパイル(最適化含む)したうえで実行する。

Flinkはパイプライニングとバッファリングに依存する。 オペレータがデータを送信するときにはバッファを使用しバッファが一杯になったら送るようになっている。 これによりレイテンシを抑えつつ、スループットを稼ぐ。

Flinkはオペレータのフュージョンにも対応。 フュージョンされたオペレータはプッシュベースで通信する。

バックプレッシャもある。

2.3 Fault Tolerance and Checkpointing in Apache Flink

チェックポイントは、状態情報を保存し、故障の際には復旧させる。 このおかげで、メッセージの送達保証の各セマンティクスを選択できるようになる。 また、セーブポイントのメカニズムを築くことになる。 これにより、計画的に停止、再起動させることができるようになる。

Flinkの場合、セーブポイントから再開する際に、並列度を変えるだけではなく、 オペレータを追加・削除可能。

定期的なチェックポイントにより、状態情報が保存される。 状態情報が小さい場合は性能影響は小さい。(逆もしかり)

Flinkでは、プログラムエラーの際は実行を停止・再開し、最後にチェックポイントした 状態情報から再開する。 このとき、データソースはApache Kafkaのように再取得可能なものを想定している。 そうでないと、クエリの再開時にデータが一部失われることになる。

3 Protocol Description

3.1 System Model

システムの前提

Operatorはそれぞれの並列度を持つ。 論理プランは、Source、Sink、Processing OperatorでDAGを構成する。 実際にSPE上で動作する物理プランをJobと呼ぶ。

Operatorは

  • 単一の下流のOperatorにレコードを送る
  • すべての下流のOperatorにブロードキャストする
  • 何らかのパーティションルールにより送る

のいずれかの動作をする。

CheckpointのためにMarkerも送る。 Markerは定期的か、ユーザ指定により送られる。 i は、i番目のCheckpointであることを示す。 内部通信経路でMarkerを受け取ったオペレータは、スナップショットを作成して下流のオペレータにMarkerを送る。 また、スナップショットは非同期的にバックグラウンドの永続化ストレージに保存される。

ここで、SPEはExactly Onceでの処理を保証するものとする。 SPEはControllerとWorkerで構成される。

3.2 Migration Protocol Description

Modification Markerを送ることで、マイグレーションを開始する。 2

各Operatorは、eventualにMarkerを受けとり、マイグレーションに備えるが、 このとき枚グレート対象となるOperatorの上流・下流のOperatorにも影響することがポイント。 上流のOperatorは、バッファを駆使しつつ、Tupleの順序性を担保する。

3.3 Modification Protocols Description

Operatorをマイグレートするだけでなく、Operatorの追加、更新が可能。 ただし、その場合はUDFの配布が必要となる。

3.3.1 Introduction of New Operators

上流OperatorはMarkerを受け取るとバッファリングし始め、その旨下流に通知する。 すべての上流Operatorのバッファリングが開始されたら、新しいOperatorのインスタンスが起動する。 ただし、実際に起動させる前に、あらかじめUDFを配布しておく。 下流Operatorは、新しく起動されたOperatorに接続を試みる。

4 System Architecture

試したFlinkは1.3.2。 ★

4.1 Vanilla Components Overview

まずクライアントがModification ControlメッセージをCoordinatorに送る。 CoordinatorはWorkerに当該情報をいきわたらせる。 このメッセージは、通常のデータとは異なり、RPCとして送られる。 ここではActorモデルに基づく。

Flinkの場合、2個のOperator間の通信はProducer/Consumer関係の下やり取りされる。

各Operatorのインスタンスは、上流からデータを取得するための通信路に責任を持つ。

システムアーキテクチャのイメージ

4.2 Our Changes on the Coordinator Side

Modification Coordinatorは、Modificatoinに関する一切を取り仕切る。 バリデーションも含む。 例えば、現在走っているジョブに対して適用可能か?の面など。

Modificationの大まかな状態遷移は以下の通り。

Modificationのステート

Modificationに関係し、Taskの大まかな状態遷移は以下の通り。

Taskのステート

4.3 Our Changes on the Worker Side

オペレータ間の通信は、オペレータの関係に依存する。 例えば、同一マシンで動ているProducerインスタンスとConsumerインスタンスの場合はメモリを使ってデータをやり取りする。 一方、異なるマシンで動ている場合はネットワーク通信を挟む。

さて、Modificationが生じた場合、新しいConsumerが動き始めるまで、上流のインスタンスはバッファリングしないといけない。 提案手法では、ディスクへのスピル機能を含むバッファリングの仕組みを提案。それ専用のキューを設けることとした。

4.4 Query Plan Modifications

あるOperatorがModificationを実現するには、上流と下流のOperatorの合わせた対応も必要。 そこでModification CoordinatorがModificationメッセージに、関連情報全体を載せ、RPCを使って各Operatorに伝搬する。

4.4.1 Upstream Operators

Checkpointの間、上流から下流に向けて、Checkpointマーカーを直列に並べることで 故障耐性を実現する。 各オペレータは上流からのマーカーがそろうまでバッファリングを続ける。

もしオペレータをマイグレートしようとすると、このバッファもマイグレートする必要がある。 しかしこのバッファインの仕組みは、内部的な機能ではない(★要確認)ため、一定の手続きが必要。 Modificationメッセージには次のCheckpointのIDが含まれている。 このIDに該当するCheckpointが発動されたときには、マイグレート対象のオペレータの上流オペレータは CheckpointバリアのメッセージをModificationメッセージと一緒に送る。 このイベント情報は、上流オペレータがレコードをストレージにフラッシュしていることを下流に示すものとなる。 また、これを通じて、マイグレート対象となるオペレータと上流オペレータの間には仕掛中のレコードがないことを確認できる。

以上を踏まえると、Modificationを安全に進めるためには、Checkpointを待つことになる。 Checkpointインターバルは様々な要因で決まるが、例えばオペレータ数とステートの大きさに依存する。 ステートが大きく、Checkpointインターバルが大きい場合は、それだけModification開始を待たなくてはならない、ということである。

4.4.2 Target and Downstream Operators

下流のオペレータは、基本的には上流のオペレータの新しい情報を待つのみ。

5 Protocol Implementation

5.1 Operator Migration

Modification Coordinatorがトリガーとなるメッセージをソースオペレータから発出。 対象オペレータに加え、上流オペレータも特定する。(上流オペレータは、レコードをディスクにスピルする)

オペレータはcheckpointのマーカを待つ。 Checkpoint Coordinatorがマーカを発出し、マイグレート対象のオペレータの上流オペレータは 送信を止める。

各オペレータはPausing状態に移行するとともに、現在の状態情報をModification Coordinatorに送る。

さらに、下流のオペレータに新しいロケーションを伝える。

各オペレータがPaused状態に移行。 すべてのオペレータがPaused状態に移行したら、オペレータを再起動する。

その後、Modification Coordinatorが状態ロケーション?をアタッチし、タスクを実行開始する。

提案手法では、FlinkのCheckpointの仕組みを使用し、各オペレータの状態情報を取得し、アサインする。

5.2 Introduction of new Operators

オペレータのModificationと同様の流れで、新しいオペレーションの挿入にも対応する。 上流のオペレータがスピルした後、新しいオペレータが挿入される。 デプロイのペイロードには、コンパイル済のコードが含まれる。

5.3 Changing the Operator Function

Modification CoordinatorがModificationメッセージと一緒に、 新しいUDFを配布する。 Task Managerは非同期的にUFDを取得する。

またcallbackを用いて、グローバルCheckpointが完了したときに、新しいUDFを用いるようにする。

6 Evaluation

ここから先は評価となるが、ここではポイントのみ紹介する。

6.3 Workloads

3種類のワークロード、

  • 小さなステートの場合を確認するため、要素数をカウントするワークロード(SMQ)
  • 大きなステートの場合を確認するため、Nexmarkベンチマーク 3 のクエリ8(NBQ8)。(オンラインオークションのワークロード)
  • 上記SMQ、NBQ8についてオペレータのマイグレートを実施

2個めのワークロードではステートサイズが大きいので、インクリメンタルCheckpointを利用。 Flinkの場合は、埋め込みのRocksDB。

6.4 Migration Protocol Benchmark

6.4.1 Stateful Map Job Performance Drill Down

レイテンシで見るとスパイクが生じるのは、ストリーム処理のジョブがリスタートするタイミング。

オペレータのMigration中、1度3500msecのレイテンシのスパイクが生じた。 またコミュニケーションオーバヘッドもあるようだ。

概ね、秒オーダ。

6.4.2 Nexmark Benchmark Performance Drill Down

概ね、100~200秒オーダ。

ステートサイズは全部で13.5GBで、そのうち2.7GBがステート用のバックエンドに格納され、再現された。

80個のジェネレータのスループットは、Migration発生時も大きくは変わらなかった。

6.5 Introducing new operators at runtime

SMQのワークロードを用いた検証。 概ね、秒オーダ。

6.6 Replacing the operator function at runtime

SMQのワークロードを用いた検証。 概ね、秒オーダ。

スループットへの影響は小さい。

6.7 Discussion

Checkpointの同期は課題になりがち。 ステートのサイズが小さいときは高頻度で同期も可能かもしれないが、大きいときは頻度高くCheckpointすると、処理に影響が出る。 例えばステートサイズが小さい時には6秒以内にModificationを開始できたが、大きい時には60秒程度になった。

NBQ8の場合、従来のシャットダウンを伴うsavepointの仕組みと比べ、性能上の改善が見られた。

データソースが永続的であれば再取得することでExactly Onceを実現する。

実行中のジョブに対し、新しいオペレータを挿入することもできた。概ね10秒程度。

オペレータの関数を変更することもできた。概ね9秒程度。 これを突き詰めていくと、内部・外部状態に応じて挙動を変える、ということもできるようになるはず。

★補足: とはいえ、そのような挙動変更は、最初からUDF内に組み込んでおくべきとも考える。(普通に条件文を内部に入れておけば?と) 条件分岐が問題になるほどシビアなレイテンシ要求があるユースケースで、ここで挙げられているようなストリーム処理エンジンを使うとは思えない。 ★補足終わり:

7 Related work

★補足:特に気になった関連研究を以下に並べる。

  • Schneider, S.; Hirzel, M.; Gedik, B.; Wu, K.: Auto-parallelizing stateful distributed streaming applications. ACM PACT, 2012.
    • 並列度の変更に関する先行研究
    • 本論文の提案手法では、大きなステートサイズの際のexactly onceを対象としている点が異なる
  • Wu, Y.; Tan, K. L.: ChronoStream: Elastic stateful stream computation in the cloud. IEEE ICDE, 2015.
    • 弾力性の実現に関する先行研究
    • ただし別の論文の指摘によると、同期に関する課題がある
    • データフロー変更には対応していない。あくまでオペレータのマイグレーションのみ。
  • Heinze, T.; Pappalardo, V.; Jerzak, Z.; Fetzer, C.: Auto-scaling techniques for elastic data stream processing. In: IEEE ICDE Workshops. 2014. および、 Heinze, T.; Ji, Y.; Roediger, L.; Pappalardo, V.; Meister, A.; Jerzak, Z.; Fetzer, C.: FUGU: Elastic Data Stream Processing with Latency Constraints. IEEE Data Eng. Bull., 2015.
    • オートスケールのタイミングを判断する。オンライン機械学習を利用。
    • 簡単なマイグレーションのシナリオを想定。
  • Nasir, M.; Morales, G.; Kourtellis, N.; Serafini, M.: When Two Choices Are not Enough: Balancing at Scale in Distributed Stream Processing. CoRR, abs/1510.05714, 2015.
    • 並列度の調整に関する先行研究
    • 特にホットキーが存在する場合、そこにオペレータインスタンスを割り当てるように動く。
    • データフロー変更などには対応しない。
  • Mai, L.; Zeng, K.; Potharaju, R.; Xu, L.; Venkataraman, S.; Costa, P.; Kim, T.; Muthukrishnan, S.; Kuppa, V.; Dhulipalla, S.; Rao, S.: Chi: A Scalable and Programmable Control Plane for Distributed Stream Processing Systems. VLDB, 2018.
    • 本論文で扱っているのに近い
    • ただしステートサイズが大きなケースは対象としていない

  1. Carbone, P.; Ewen, S.; Fora, G.; Haridi, S.; Richter, S.; Tzoumas, K.: State Management in Apache Flink: Consistent Stateful Distributed Stream Processing. VLDB, 2017.↩︎

  2. Del Monte, B.: Efficient Migration of Very Large Distributed State for Scalable Stream Processing. VLDB PhD Workshop, 2017.↩︎

  3. Tucker, P.; Tufte, K.; Papadimos, V.; Maier, D.: NEXMark - A Benchmark for Queries over Data Streams. 2018.↩︎

共有

Reliable stream data processing for elastic distributed stream processing systems

参考

メモ

Reliable stream data processing for elastic distributed stream processing systems では、 弾力性を有するストリーム処理エンジン(Distributed Stream Processing System: DSPS)の さらなる課題について触れている。

  • 動的にオペレータがスケールアップ、ダウンする中で、データのバックアップの一貫性を保つ故障耐性が必要がある
  • チェックポイントへのロールバックが、直近のオートスケール調整を元に戻らせる可能性がある
共有

Transactions on Large-Scale Data- and Knowledge-Centered Systems XL

参考

メモ

Transactions on Large-Scale Data- and Knowledge-Centered Systems XL の中に、 DABS-Storm A Data-Aware Approach for Elastic Stream Processing が含まれていて気になった。

共有

vim modeline format in Markrown

参考

メモ

Add vim modeline in markdown document にvimのモードラインをMarkdownファイル内で記載する方法について言及あり。

結論としては、

1
<!-- vim: set ft=markdown: -->

のように記載する。 markdown: の末尾のコロンがポイント。

共有

vim modeline fileencoding

参考

メモ

Vim の modeline で fileencoding を設定すべからず に、モードラインでfencを設定してはいけない理由が記載されている。

共有

Apache Edgent

参考

メモ

所感

JIRA公式GitHub を見る限り、ここ最近活動量が低下しているように見える。 最後のコミットが2019年4月だったり、ここ最近のコミット量が低めだったり、とか。 まじめに使うとしたら注意が必要。

ストリーム処理をエッジデバイスで…という動機は興味深い。 ただし、「エッジで動かすための特別な工夫」が何なのかを把握できておらず、 ほかのシンプルなアーキテクチャと何が違うのか、はよく考えた方がよさそう。

例えばもしエッジで「データロード」を主軸とするならば、fluentbitがコンパクトでよいのではないか、という考えもあるだろう。 一方でストリーム処理の実装という観点では、Javaの内部DSL的であるEdgentのAPIは一定の嬉しさがあるのかもしれない。 思想として、Edgentは分析のような処理までエッジで行おうとしているようだが、fluentbitはあくまで「Log Processor and Forwarder」 1 を担うものとしている点が異なると考えてよいだろう。(できる・できない、ではない)

Edgentはエッジで動かす、という割にはJavaで実装されており、 それなりにリッチなデバイス(RaspberyPiなど)を動作環境として想定しているように感じる。 プアなデバイスがリッチなデバイスの先につながっており、リッチなデバイスを経由して ストリーム処理するイメージか。

ここではエッジデバイスも含めた、ストリーム処理を考える参考として調べる。

公式ドキュメント

Apache Edgent公式ウェブサイト によると以下の通り。

つまり、ゲートウェイやエッジデバイスで実行されるランタイムのようだ。 Raspberry Pisやスマートホンが例として挙げられている。

Apache Edgent is a programming model and micro-kernel style runtime that can be embedded in gateways and small footprint edge devices enabling local, real-time, analytics on the continuous streams of data coming from equipment, vehicles, systems, appliances, devices and sensors of all kinds (for example, Raspberry Pis or smart phones). Working in conjunction with centralized analytic systems, Apache Edgent provides efficient and timely analytics across the whole IoT ecosystem: from the center to the edge.

概要を確認する

Apache Edgent Overview を確認する。 以下、ポイントを記載する。

エッジからデータセンタにすべてのデータを転送するのはコストが高すぎる。 Apache Edgent(以降Edgent)はエッジでのデータ、イベントの分析を可能にする。

Edgentを使うと、エッジ上で異常検知などを実現できる。 例えばエンジンが通常より熱い、など。 これを用いると通常時はデータセンタにデータを送らず、異常時にはデータを送るようにする、などの制御が可能。

ユースケースの例:

  • IoT
    • エッジで分析することでNWトラフィックを減らす
  • アプリケーションサーバに埋め込む
    • エラー検知をエッジで行ってNWトラフィックを減らす
  • サーバマシンあるいはマシンルーム
    • ヘルスチェック。これもNWトラフィックを減らす。

動かせる場所:

  • Java 8, including Raspberry Pi B and Pi2 B
  • Java 7
  • Android

★補足: エッジデバイスとしてどの程度貧弱な環境で動かせるのか?はひとつポイントになるだろう。 ここではAndroidやRaspberry Piを挙げているので、エッジのマシンとしては強力な部類を想定しているように見える。 ★補足おわり:

逆に、エッジでは処理しきれないケースで、データセンタにデータを送ることもできる。 例えば…

  • CPUやメモリを使う複雑な処理
  • 大きなステート管理
  • 複数のデータソースを混ぜあわせる

など。 そのために、通信手段は複数に対応。

  • MQTT
  • IBM Watson IoT Platform
  • Apache Kafka
  • カスタム・メッセージバス

Apache Edgentのケーパビリティ

The Power of Apache Edgent を確認する。

最初に載っていたサンプルは以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ImpressiveEdgentExample {
public static void main(String[] args) {
DirectProvider provider = new DirectProvider();
Topology top = provider.newTopology();

IotDevice iotConnector = IotpDevice.quickstart(top, "edgent-intro-device-2");
// open https://quickstart.internetofthings.ibmcloud.com/#/device/edgent-intro-device-2

// ingest -> transform -> publish
TStream<Double> readings = top.poll(new SimulatedTemperatureSensor(), 1, TimeUnit.SECONDS);
TStream<JsonObject> events = readings.map(JsonFunctions.valueOfNumber("temp"));
iotConnector.events(events, "readingEvents", QoS.FIRE_AND_FORGET);

provider.submit(top);
}
}

IBM Watson IoT Platformに接続する。

1
IotDevice iotConnector = IotpDevice.quickstart(top, "edgent-intro-device-2");

Connectors, Ingest and Sink

イベント処理のアプリケーションは、外部からデータを読み込む(Ingest)のと、外部にデータを書き出す(Sink)が必要。 プリミティブなコネクタはすでに存在する。

もしMQTTを用いたいのだったら、上記サンプルの代わりに以下のようになる。

1
2
MqttStreams iotConnector = new MqttStreams(top, "ssl://myMqttServer:8883", "my-device-client-id");
iotConnector.publish(events, "readingEvents", QoS.FIRE_AND_FORGET, false);

同じように、対Kafkaだったら、KafkaPdocuer、KafkaConsumerというクラスがそれぞれ提供されている。

★補足: しかし、 KafkaProducer のJavadocを見ると、以下のようなコメントが記載されている。

The connector uses and includes components from the Kafka 0.8.2.2 release. It has been successfully tested against kafka_2.11-0.10.1.0 and kafka_2.11-0.9.0.0 server as well. For more information about Kafka see http://kafka.apache.org

これを見る限り、使用しているKafkaのバージョンが古い。念のために、masterブランチを確認したところ、 以下の通り、1.1.0が用いられているように見える。ドキュメントが古いのか…。

connectors/kafka/pom.xml:56

1
2
3
4
5
6
7
8
9
10
  <groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.1.0</version>
<exclusions>
<exclusion> <!-- not transitive -->
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

★補足おわり:

そのほかにも、JdbcStreamsというクラスもあるようだ。

またFileStreamsを使うと、ディレクトリ配下のファイルの監視もできる。

例としては以下のようなものが載っていた。

1
2
3
4
5
String watchedDir = "/some/directory/path";
List<String> csvFieldNames = ...
TStream<String> pathnames = FileStreams.directoryWatcher(top, () -> watchedDir, null);
TStream<String> lines = FileStreams.textFileReader(pathnames);
TStream<JsonObject> parsedLines = lines.map(line -> Csv.toJson(Csv.parseCsv(line), csvFieldNames));

directoryWatcherというメソッドを使い、トポロジを構成するようだ。 これがどの程度のものなのかは要確認。

★補足:

directoryWatcherはDirectoryWatcherのインスタンスを引数にとり、sourceメソッドを使って、 TStreamインスタンスを返す。

org/apache/edgent/connectors/file/FileStreams.java:108

1
2
3
4
public static TStream<String> directoryWatcher(TopologyElement te,
Supplier<String> directory, Comparator<File> comparator) {
return te.topology().source(() -> new DirectoryWatcher(directory, comparator));
}

DirectoryWatcherクラスでは、監視対象のディレクトリに追加されたファイルのファイル名を リストとして管理し、そのリストからイテレータを生成して用いる。 WatcherIteratorクラスは以下のようになっている。 ここで、pendingNamesがファイル名のリスト。

org/apache/edgent/connectors/file/runtime/DirectoryWatcher.java:201

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private class WatcherIterator implements Iterator<String> {

@Override
public boolean hasNext() {
return true;
}

@Override
public String next() {

for (;;) {

String name = pendingNames.poll();
if (name != null)
return name;

このリストの管理はとても簡易なものである。 一応、比較のためのComparatorは渡せるようになっているが、もしリストに載せた後にファイルが削除されたとしても、 それはこのDirectoryWatcherでは関知しないようだ。

★補足おわり:

そのほか、コマンドで監視したり…。STDOUT、STDINと連携したり、という例も載っている。

More on Ingest

複数のソースをひとつのストリームにする例が載っていた。

また、Edgentにおいてストリームの型は自由度が高く、カスタムの型も使える。 カスタムの方を使う例が載っている。

「Simulated Sensors」を見る限り、データソースのシミュレーションもある。動作確認用か? またEdgent自身ではセンサーライブラリを手掛けない。

Filtering

Topologyでpollした後、以下のようにすることでフィルタを実現できる。

1
readings = readings.filter(tuple -> tuple < 5d || tuple > 30d);

また、Filters#deadbandメソッドを用いると、より細かな(便利な?)フィルタを用いることができるようだ。 このあたり

★補足:

FiltersのJavadoc の deadband メソッドの説明を見ると、図が掲載されている。 まず線が引かれているのが「deadband」であり、基本的にはその中に含まれたポイントはフィルタされる。 ただし、「最初のポイント」、「deadbandに入った『最初のポイント』」はフィルタされない。

これは、deadbandに入った瞬間の時刻をトレースする、などのユースケースにおいて有用と考える。 (deadbandに入った最初のポイントもフィルタしてしまうと、それが分からなくなってしますため)

★補足おわり:

Split

ストリームを分割可能。

Transforms

mapメソッドでデータ変換可能。 特に分散処理でもないので、ふつうに任意の処理を書けばよいだけ。

★補足:Sparkのようにシリアライゼーション周りで悩まされることは少なそうだ

Windowing and aggregation

ウィンドウにも対応。 直近10レコード、直近10秒、などの単位で集計可能。

このウィンドウは、ウィンドウがいっぱいになるまではaggregateされないようだ。

★補足:

以下のようなイメージか。

ウィンドウのイメージ

★補足おわり:

Misc

PlumbingStreams#parallelを使うと、並列処理化できるようだ。 ストリームのタプルを分割して、マルチスレッドで処理する感じだろうか。★要確認

Getting Started

Getting Started を確認する。

Apache Edgent and streaming analytics

Edgentの基本要素は、ストリームである。 ストリームは、ローカルデバイスの制御という形で終わったり、外部出力されたりして終わる。

処理は関数のような形で表現される。Java8のlambdaみたいな感じ。

1
reading -> reading < 50 || reading > 80

基本的なフローは以下の通り。

1
2
3
* プロバイダを定義
* トポロジを定義し、処理グラフを定義
* 処理を実行

複数のトポロジからなることもあり、ほかのアプリケーションから呼び出されて実行するというケースも想定される。

(あとは、Getting Startedにはフィルタの仕方など、基本的なAPIの使い方が紹介されている。)

GitHubで確認

開発言語

公式GitHub によると、開発言語としてはJavaが90.9%だった。(2019/7/28現在)

開発言語の様子

Javaで作られているということは、やはりエッジといえど、それなりにリッチなものを想定していることがわかる。

公式ウェブサイトで紹介されていたスライドシェアを確認

Orientation in the fog: differences between stream processing in edge and cloud を確認する。

Industrial IoTについて。 そもそも産業界はコンサバである。

機器はPLCで制御される。 参考:Apache PLC4X

Industrial IoTの現場で登場する要素:PLC、センサー、アクター

イベントは区切られずに生成される。 時系列のイベントを取り出すためには、閾値、何らかのトリガ等々で区切る必要がある。 そこで、エッジでのストリーム処理。

ここからいくつかの例。

  • サイクル検知の例。シンプルな実装に始まり、後から「あ、実はジッターがあって…」などの 要件が足されていく。そして実装がきたなくなっていく…。
  • エラーロギングの例。2300bitのビットマスクを確認し、適切な例外を上げる。
  • 複雑なアクションの例。あるビットが送られてくるまで待ちアクションを取り、別のビットを待ち…の連続。

クラウドとエッジにおけるストリーム処理の違いにも触れられている。

クラウドとエッジの違い

この図でいう、「Fast」がエッジで「Easy」となっているのはよくわからない。

宣言的な表現へ。 CRUNCHの紹介。 https://github.com/pragmaticminds/crunch

ラムダアーキテクチャをエッジで…?

Apache Edgent。

以下のランタイムを有する。

  • ストリーム処理
  • リアルタイム分析
  • クラウドとの通信

以上をエッジで行う。

エッジtoクラウドのソリューションもある。 しかし、これらは最初の障壁は低いが、ベンダ手動でロックイン。

そこでApache Edgentがある。 オープンソースでベンダーフリー。 ほかのApacheプロジェクトと連携。例えば、Apache PLC4X、IoTDBなど。 将来的には…

  • さらにクラウド接続を充実させる。
  • CRUNCHとの連携
  • エッジインテグレーションパターン(?)
  • ルーティング/ルールエンジンの充実

  1. fluentbit公式ウェブサイト↩︎

共有

AI and Compute

参考

メモ

AI and Compute は、ムーアの法則を超えるペースで、計算量が増大していることを示すブログ。 以下にポイントを示す。

導入

2012年ころからAIの学習に用いられる計算量は3.5か月あたり2倍に増えてきた。 (ムーアの法則では18か月あたりに2倍)

学習に用いる計算量をペタフロップス/dayで表したグラフ に計算量増加のグラフが載っている。 AlexNetから始まり、AlphaGo Zeroまで。 30万倍になった。

概要

AIを進化させた3要素。

  • アルゴリズムの進化
  • データの改善
  • 計算可能量の増大

ここでは計算可能量の増大に着目。 特にひとつのモデルを学習するのに用いられる計算量に着目。 概ね10倍/年ペース。 これにはそれ用のカスタムハードウェア(GPU、TPUなど)の導入に加え、 計算リソースを並列処理で使い切る手法の改善も寄与している。

時代

  • 2012年より前
    • 機械学習向けにGPUを使っていなかった
  • 2012年〜2014年
    • 1〜8個程度のGPUを使用
    • 1〜2TFLOPS
  • 2014〜2016年
    • 10〜100GPUを使用
    • 5〜10TFLOPS
  • 2016〜2017年
    • TPUなどが登場
    • アルゴリズム上も並列度がとても高まった。

将来展望

多くのベンチャーがHWを開発。例えばFLOPSあたりの単価を下げる効果が期待される。 アルゴリズム側も進化を続けている。

一方で費用と物理制約の影響は大きい。 例えば、最も大きなモデルを学習するのに必要な計算リソースは100万ドル=1億円である。 一方で、現在の多くの企業が支払っているのは、学習よりも推論側である。したがって、バランスを考えると 学習にもっと支払えるはず。 世の中のHW購入のバジェットは1兆ドルといわれており、その数字に照らし合わせるとまだまだ拡大の余地が残っている。

付録

計算方法についての補足が載っている。 FLOPSを直接計算可能な場合にはそうしたし、そうでない場合は使用したGPU数などから算出した、とのこと。 多くのケースで著者にも確認したそうだ。

共有

Distributed data stream processing and edge computing A survey on resource elasticity and future directions

参考

メモ

Twitter Heronの論文を引用しているということでピックアップしてみた。

故障耐性を持ち、分散処理を実現するストリーム処理エンジンでは、処理の表現としてDAG構造を用いるものが多い印象。 一方で、宣言的なハイレベルのAPI(SQLなど)を提供するのは少数か? (かつて、分散処理ではなかったり、故障耐性を持たなかったりした時代では、CQLが台頭していたが…)

全体的に、リソースアロケーションに関する研究では、Apache Stormのカスタムスケジューラとして実装して見せる論文が多く感じられた。 ただしエッジコンピューティングの前に、そもそもストリーム処理における弾力性(柔軟で安定したスケールアウト・イン)の実現自体がまだ枯れていない印象。

以下、一応、主観的に重要そうな個所に★マークを付与。

1. Introduction

大量のデータが生まれ始めた。 以下のような項目が要因の例として挙げられる。

  • 計器化されたビジネス・プロセス
  • ユーザアクティビティ
  • ウェアラブルアシスタンス
  • ウェブサイトトラッキング
  • センサー
  • 金融
  • 会計
  • 科学計算

ユースケースとしては、IoTなどが挙げられていた。 興味深い例としては以下。

1
2
3
Rettig, L., Khayati, M., Cudré-Mauroux, P., Piórkowski, M., 2015. Online anomaly
detection over big data streams. In: IEEE International Conference on Big Data (Big
Data 2015), IEEE, Santa Clara, USA, pp. 11131122

多くのツールがデータフローアプローチを取る。 つまりオペレータの有向グラフに変換して処理する。

一方でストリームデータを一定量ためてマイクロバッチするアプローチを取るツールもある。 スケーラビリティと故障耐性を狙った工夫である。

補足:これはSpark Streamingのことを意識した記述だと理解

さらにクラウド上での実行を想定したアプローチが登場。 ★ これにより、弾力性実現を狙う。

Google Dataflow (2015)の論文が引用されていた。 異なる実行モデルが混在しているケースを扱う技術として。

Apache Edgent, https://edgent.apache.org 2017. はインターネットのエッジで動作することを想定したフレームワークである。

過去の調査として、弾力性に注力していないものもあった。

1
2
3
Zhao, X., Garg, S., Queiroz, C., Buyya, R., 2017. Software Architecture for Big Data and
the Cloud, Elsevier Morgan Kaufmann. Ch. A Taxonomy and Survey of Stream
Processing Systems.

本論文は以下の構成。

  • 2章
    • ビッグデータエコシステム。オンラインデータ処理のアーキテクチャ。
  • 3章
    • 既存のエンジン。ストリームデータ処理の他のソリューション
  • 4章
    • マネージドクラウドソリューション
  • 5章
    • 既存の技術がどのようにリソースの弾力性を実現しようとしているのか
  • 6章
    • 複数のインフラストラクチャを組み合わせる
  • 7章
    • 将来の話

2. Background and architecture

2.1. Online data processing architecture

ここでは「オンライン」という単語を用いているが、以下のように定義して用いている。

1
2
Similar to Boykin et al., hereafter use the term online to mean that “data are
processed as they are being generated”.

Fig. 1にストリーム処理アーキテクチャの典型的なコンポーネント関係図がある。(補足:当たり前の内容ではあるが参考になると思う)

コンポーネント関係図

補足:ストレージとの関係性はもう少し深堀りしてもよさそう。最近だとDatabricks DeltaやApache Hudiのようなストレージレイヤのミドルウェアが生まれているため。 ★

まず「データソース」ではデータを送り出す(ローダ)、データフォーマット、MQTTのようなプロトコルに関する議論がある。 たいてい、送り出す機能はネットワークのエッジ側に置かれる。

コンポーネントそれぞれが独立して成長できるよう、MQ(ActiveMQ、RabbitMQ、Kestrel)、Pub/Subベースのソリューション(Kafka、DistributedLog)、 あるいはマネージドサービス(Amazon Kinesis Firehose、Azure IoT Hub)が挟まれることが多い。

補足:http://bookkeeper.apache.org/distributedlog/

補足:DistributedLogは2017年にBookKeeperのサブプロジェクト化された。 https://bookkeeper.apache.org/docs/latest/api/distributedlog-api/

大規模データを扱うための技術の代表例として、MapReduceが挙げられていた。

1
2
Dean, J., Ghemawat, S. MapReduce: Simplified data processing on large clusters.
Communications of the ACM 51 (1).

本論文で扱う技術は、ストリーム管理システムだったり、CEPと呼ばれる。

補足:ここではストリーム処理とCEPは分けて言及していないようだ。

ストリーム処理のアーキテクチャはデータストレージ機能を内包することがある。 目的は、さらなる処理のためだったり、結果を分析者等に渡すためだったりする。 ★ このとき用いられるストレージは、RDBMS、KVS、インメモリデータベース、NoSQLなどなど。パブリッククラウドにもいろいろな選択肢がある。

補足:ステート管理用のストレージを持つ場合もあるだろう。 ★

2.2. Data streams and models

「データストリーム」の定義は様々。 不連続の信号、イベントログ、モニタリング情報、時系列データ、ビデオ、などなど。

2005年くらいの調査では、時系列データ、キャッシュレジスタ、回転扉あたりのデータモデルが対象だった。 一方最近ではSNS等から生じる非構造データ、準構造データが見受けられる。 データ構造としては、キー・バリュー形式を挙げており、キーがタイムスタンプである。

多くのケースで、ストリーム処理はオペレータのDAG構造を抽象化として用いる。 ★ 論理プランと物理プラン。 物理プラン側で並列度などを含め、クラスタのリソースにオペレータを割り当てることになる。

オペレータは選択性、ステートの軸でカテゴライズできる。

並列度に関するカテゴリもある。

■並列度のカテゴリ

  • パイプライン並列
    • アップストリームのオペレータは並列でタプルを処理できる
  • タスク並列
    • グラフの中には同じタプルを並列で処理できる箇所がある
  • データ並列
    • スプリッタでデータを分割し、マージャで処理されたデータを結合する

気になった論文

1
2
3
Gedik, B., Özsema, H., Öztürk, O., 2016. Pipelined fission for stream programs with
dynamic selectivity and partitioned state. J. Parallel Distrib. Comput. 96, 106120.
http://dx.doi.org/10.1016/j.jpdc.2016.05.003.

2.3. Distributed data stream processing

DBMSと対比し、DSMS(Data Stream Management System)と呼ぶことがある。 ★ DSMSは以下のようなオペレータを持つ。

  • join
  • aggregation
  • filter
  • その他分析用の機能

初期のDSMSはSQLライクなインターフェースを提供していた。

CEPはイベント間の関係を明らかにする、などの用途に用いられた。

補足:ここからストリーム処理エンジンの世代説明。一般的な説明をするときに使えそう。 ★

第1世代は、DBMSの拡張として用いられ、単一マシンで実行することを想定していた。(分散処理を考慮していなかった)

第2世代は、分散処理対応 ここでIBM System Sの登場。ゴールはスケーラビリティと効率の改善。

最近(第3世代)では、スケーラブルで故障耐性のある実行を目指している。 ただし、この手の技術は宣言的なインターフェースを持たず、ユーザがアプリケーションを書く必要がある。 UDFを用いることができる。

入力データを離散化(小さなバッチに区切り)し、マイクロバッチ処理を実行するものもあった。

ある程度まとめこまれたデータに対し、繰り返し処理を実行するものもある。 目的はスループット向上。ただし、低レイテンシの要求が厳しくない場合を対象とする。

第4世代は、エッジコンピューティングを含む。 この手の技術として挙げられていたのは以下の通り。

1
2
3
Sajjad, H.P., Danniswara, K., Al-Shishtawy, A., Vlassov, V., 2016. SpanEdge: Towards
unifying stream processing over central and near-the-edge data centers. In: IEEE/
ACM Symposium on Edge Computing (SEC), pp. 168178.

SPEとして。

1
2
3
Chan, S., 2016. Apache quarks, watson, and streaming analytics: Saving the world, one
smart sprinkler at a time. Bluemix Blog (June).URL 〈https://www.ibm.com/blogs/
bluemix/2016/06/better-analytics-with-apache-quarks/〉
1
2
3
4
Pisani, F., Brunetta, J.R., do Rosario, V.M., Borin, E., 2017. Beyond the fog: Bringing
cross-platform code execution to constrained iot devices. In: Proceedings of the 29th
International Symposium on Computer Architecture and High Performance
Computing (SBAC-PAD 2017), Campinas, Brazil, pp. 1724.

クラウドとエッジ。

1
2
3
Hirzel, M., Schneider, S., Gedik, B., An, S.P.L., 2017. extensible language for distributed
stream processing. ACM Trans. Program. Lang. Syst. 39 (1), 5:15:39. http://
dx.doi.org/10.1145/3039207, (URL 〈http://doi.acm.org/10.1145/3039207〉).

2.4. Resource elasticity

クラウドの特徴の一つは、支払えばリソースを使えること。

もうひとつはリソース弾力性である。★ オートスケーリングも可能とする。

オートスケーリングに対応するには、スケールイン・アウトしたリソースに対応する仕組みも必要だが、 オートスケーリングのポリシーも必要。

オートスケーリングの仕組みは、主にバッチ処理についてビッグデータのワークロードに関し、 いくつか提案手法が存在する。★

補足:クラウドベースのオートスケーリングについて以下の論文で触れられているらしい。

1
2
3
Tolosana-Calasanz, R., çngel Ba-ares, J., Pham, C., Rana, O.F., 2016. Resource
management for bursty streams on multi-tenancy cloud environments. Future
Gener. Comput. Syst. 55, 444459. http://dx.doi.org/10.1016/j.future.2015.03.012.

ここでのポイントは、「シビアな遅延があること」。 特にSPEでは、バッチ処理と比べてスケールイン・アウトの要件が厳しい。 例えば、処理を動かし続けるから、データロストしないでスケールイン・アウトさせるのが難しいし、 スケールイン・アウト後のオペレータの再配置が難しい。

補足:つまりバッチ処理におけるスケールアウト・インはある程度実現性があるが、ストリーム処理のスケールアウト・インはまだまだ研究段階?であると言えるのか ★

3. Stream processing engines and tools

SPEの歴史を振り返る。

3.1. Early stream processing solutions

DBMSの拡張として登場。 2000年代。

ただし近年の技術と比べて、大規模データを対象としていない。

NiagaraCQ

2000年。 XMLデータに対し、定期的に繰り返し処理を実行する。

STREAM

2004年。 CQL。Continuous Query Lang.

CQLをクエリプランにコンパイルする。 DAG構造で処理を表す。

実行時には、スケジューラがオペレータをリソースに割り当てる。

チェーンスケジューリングを用いる。

補足:このあたりではUDFについて言及されていない。

Aurora

モニタリングアプリケーション向け。

STREAMと同様に、CQSを利用可能。

トレインスケジューリングを採用。 非直線性。 入力タプルをボックスに集め、それに対し処理を実行する。これによりI/Oオペレーションを減らす。

Medusa

MedusaはAuroraをクエリ処理エンジンとして使いながら、分散処理対応させた。

UDFには対応していない。

3.2. Current stream processing solutions

2種類にわけられる。

  • オペレータグラフに基づきタプル単位で処理する(補足:こっちがいわゆるコンティニュアス・ストリーム処理)
  • 入力データをある程度の塊に区切り、マイクロバッチ処理として処理する(補足:こっちがいわゆるマイクロバッチ処理によるストリーム処理)

3.2.1. Apache Storm

アプリケーションはTopologyと呼ばれる。 データはチャンクとして取り込まれ、タスクを割り当てられたノードで処理される。 データはタプル群として扱われる。

Nimbusがマスタ機能であり、ZooKeeperと連携しながら、 タスクをスケジューリングする。

WorkerノードにはSupervisorがいて、(複数の)Workerプロセスを管理する。 Workerプロセス内では(複数の)Executorスレッドが起動され、SpoutかBoltの機能を実行する。

Spoutがデータソースからデータを取得し、Boltがタプルを処理する。

フィルタ、アグリゲーション、ジョイン、データベースクエリ、UDFなどを実行する。

並列度を制御するには、Topologyごとに各コンポーネントの並列度、 Executor数を指定する。

Workerノードを追加した際、新しいTopologyをローンチ可能。 また、Workerプロセス数やExecutorスレッド数を変更可能。

一方で、タスク数などを変更するときには、一度Topologyを止める必要がある。 ★ 特にステートを持つオペレータを用いているときは問題が複雑。

性能チューニングは、Executorの入出力キューの長さ、Workerプロセスのキューの長さを変更することで可能。

またTridentを用いたマイクロバッチ処理が可能になっているし、Summingbirdのような 抽象化層が生まれている。

3.2.2. Twitter heron

2015年の論文。

Stormの当時の欠点をいくつも改善しつつAPI互換性を実現した。

補足:なお、現在のApache Storm実装では、Heron論文で挙げられていた課題が多数解決されている。

Topologyはプロセスベースであり、デバッグしやすいさなどを重視。 ★

バックプレッシャ機能も有する。

Stormと同様にアプリケーションがDAGのTopologyで表現され、SpoutとBoltも存在。

各種コンテナサービスと連携する。 Aurora on Mesos、YARN、ECS。

Topologyがローンチされると、Topologyマスタ(TM)と複数のコンテナが立ち上がる。 ZooKeeperを用いてTMが単一であることを保証し、ほかのコンポーネントからTMを発見できるようにする。 TMはゲートウェイ機能にもなるし、スタンバイTMを用いることも可能。

各コンテナは、Heronインスタンス(HI)群、ストリームマネージャ(SM)、メトリクスマネージャ(MM)を起動する。 コンテナ間でSM同士がフルメッシュのNWを構成する。

Spout、BoltはHIが担うことになるが、HI自体はJVMである。これはStormと異なる点である。★ またHIは2スレッドで構成される。 1個目スレッドはユーザ定義の処理内容を実行し、2個目スレッドはほかのコンポーネントととの連絡を担う。

Heronトラッカ(HT)はTopology群の情報を得るための、クラスタ単位のゲートウェイである。

補足:詳しくは、 Twitter Heron論文 を参照。

3.2.3. Apache S4

2010年。

並列処理の管理にアクターモデルを用いる。 ★

プロセッシングエレメント(PE)がイベント交換と処理を担う。

分散型で、シンメトリックなアーキテクチャを採用。 プロセッシングノード(PN)は機能性に関し同一。 ZooKeeperがPNのコーディネーションのために用いられる。

アプリ開発者は、PEの処理内容を定義しないとならないが、 その際キーに基づく処理を作りこむことが多い。(キーのハッシュに基づく処理の割り当て) ほかにキーを用いないPEは、例えば入力データを扱うPEである。

3.2.4. Apache samza

2017年。

Apache Kafkaをメッセージング、YARNをデプロイメント、リソース管理、セキュリティ機能のために用いる。

データフローを定義して用いるが、コンシューマを定義する。 ネイティブではDAG構造による処理の定義に対応していない。 ★

Heronと同じくシングルスレッドモデル。

各コンテナはローカルのステート管理用のKVSを持つ。 KVSはほかのマシンに転送される。これにより故障耐性を有する。

3.2.5. Apache flink

2015年。

DAG型のデータフローを定義する。 「ストリーム」は中間結果で、「トランスフォーメーション」が複数のストリームを入力とし、複数のストリームを出力とする。

並列度はストリームとオペレータの並列度に依存する。 ストリームはパーテイションに分割され、オペレータはサブタスクに分割される。 サブタスクは異なるスレッドに割り当てられる。

ジョブマスタ(JM)とタスクマネージャ(TM)。 JMがタスクのスケジューリング、チェックポイントなどに責任を持つ。 TMがサブタスクを実行する。

ワーカはJVMであり、サブタスクごとの独立したスレッドを実行する。 ★

スロットの概念があり、Stormと異なりメモリ管理機能を有する。 ★

3.2.6. Spark streaming

2012年。

イミュータブルなデータセットの抽象表現であるResilient Distributed Datasets (RDDs) のDAGでアプリケーションを表現。 故障耐性向上のため、RDDのリネージを利用。

トラディショナルなストリーム処理エンジンでは、タプルの継続的な処理を担うオペレータのグラフによるモデルを採用しており、 故障耐性を担保するのが難しい。 そのようなエンジンでは、セクションのレプリケーション、もしくはアップストリームでのバックアップで実現する。

Spark Streamingでは故障耐性を実現するのに際し、RDDを使ったマイクロバッチ処理をベースとし、 RDDの故障耐性を利用することとした。

3.2.7. Other solutions

System S(IBM Streams)はオペレータのDAGをアプリケーション構造に採用。 ★ 構造化、非構造化データの両方に対応。 SPL(ストリーム処理言語)に対応。 ストリーム処理コア(SPC)を有効活用するようSPLのコンパイルと最適化も実施する。

SPLについては以下の論文が気になる。

1
2
3
Hirzel, M., Schneider, S., Gedik, B., An, S.P.L., 2017. extensible language for distributed
stream processing. ACM Trans. Program. Lang. Syst. 39 (1), 5:1–5:39. http://
dx.doi.org/10.1145/3039207, (URL: http://doi.acm.org/10.1145/3039207 ).

ESC(以下の論文参照)もオペレータを中心としたDAGによるアプリケーション表現を採用。 並列処理の管理にはアクタモデルを採用。 ★

1
2
3
Satzger, B., Hummer, W., Leitner, P., Dustdar, S., 2011. Esc: Towards an elastic stream
computing platform for the cloud. In: IEEE International Conference on Cloud
Computing (CLOUD), pp. 348–355.

TimeStream(2013)もDAGによるアプリケーション表現を採用。

GoogleのMillWheel。2013年。 こちらもデータ変換等のグラフを抽象化してアプリケーションを作成する。 ホストの動的なセット上で動作することを想定しており、 リソース使用状況に応じて処理単位を分割する、などの挙動をとる。

Efficient Lightweight Flexible (ELF) ストリーム処理システム(2014年)は 非中央集権的アーキテクチャを採用。 各マシンがそれぞれウェブサーバからデータを取得し、それぞれがバッファツリーにデータを保持する。 各マシンで処理されたデータは、オーバレイ構成のDHTを用いて、Reduce処理される。

4. Managed cloud systems

4.1. Amazon Web Services (AWS) Kinesis

ファイアホースを使うと、RedshiftやS3等に対し、ストリームデータを格納できる。 ★ ファイアホースはバッファリングを行う。

CloudWatchと連携すると、各サービスのメトリクスを集められる。

Amazon Kinesis Streamsはストリーム処理の仕組み。 ストリームデータはシャードに分けられて分散処理される。 シャードには固定のキャパシティが存在する。 単位時間あたりのオペレーション数やデータサイズである。

4.2. Google dataflow

プログラミングモデルであり、マネージドサービスである。 ETLなどを含むバッチ処理とストリーム処理の両方を対象とする。 ★

ステップや変換を有向グラフにしたもので処理を定義する。 「PCollection」が入出力データの抽象表現である。 PCollectionには固定サイズのデータ、もしくはストリームデータをタイムスタンプで区切ったものが与えられる。 トリガを用いていつ出力するかを制御する。

SDKもある。Apache Beamプロジェクト配下でリリースされている。

Dataflowサービスは、ジョブを実行するためにGCE、GCSを管理する。

オートスケールや動的なリバランス機能を使って、オンザフライでリソースアロケーションを調整できる。 (論文当時)バッチ処理のオートスケールはリリースされていたが、ストリーム処理のオートスケールは実験段階だった。

補足:ステートフルなオペレータでも問題なくスケールアウト・インされるのか?要確認。 ★

4.3. Azure stream analytics

Azure Stream Analytics(ASA)のジョブ定義は、 入力、クエリ、出力で定義される。

Azure Event Hub、Azure IoT Hub、Blobサービスなどとの連携も可能。

T-SQLの亜種を用いた分析処理が可能。

書き出し(Sink)は色々と対応している。 Blob、Table、Azure SQL DB、Event Hub、Service Queueなど。

本論文執筆時点で、UDFには対応していない。

Streaming Units(SU)の単位で分散処理される。 なお、SUはCPU、メモリなどを包含した単位。

5. Elasticity in stream processing systems

弾力性は、モニタリング、分析、計画、実行(MAPE)プロセスで実現される。

弾力性の実現には互いに関係する2種類の課題が存在する。

  • アプリケーションワークロードに合致するITリソースを確保したり、解放したりする:elastic resource management
  • 追加・解放されたリソースに合わせ、アプリケーションを調整する

スケールアウト・イン計画に基づき、アプリケーション調整を行うのをelasticity actionsと呼ぶ。

水平、垂直の両方向のスケールの仕方が存在する。

オートスケールに対応するため、アプリケーションには調整が必要になることがある。 例えば、実行グラフの調整、中間クエリの並列度の調整、など。 ★

水平分散はしばしば、処理グラフの適応(補足:つまり、グラフ形状の最適化)、オペレータの持つステートの出力(補足:つまりステートも引き継がないといけない)が必要になる。 ★

ウィンドウ化、もしくはシーケンス化されたオペレータを用いる場合、(ステート管理が必要になるため) 並列処理(のスケールアウト・インが)難しくなる。

ストリーム処理ではロングランなジョブを無停止で動かしたい、ということがあり、 ますますスケールアウト・インするときの扱いが難しい。 ★

弾力性の実現アプローチには、staticとonlineの2種類がある。

staticでは並列度やオペレータの位置を(補足:あらかじめというニュアンスがある?)最適化する。そのためにグラフを最適化する。

onlineではリソースプールの変更、新しいリソースを活用するためのアプリケーションの動的変更を含む。

5.1. Static techniques

2008年あたりの論文によると、初期タスクアサインと中間処理の並列度の最適化アプローチでは、 水平スケールが可能だった。

R-Storm(2015年)ではStormのカスタムスケジューラを提案。 ★ CPU、メモリなどのリソースプールに基づき、スケジューリングする。 これは要は、ナップサック問題に帰着する。 ★ 当該論文では、従来手法が計算量大きすぎるとし、ここでは 「タスクを必要リソースのベクトル、ノードをリソースバジェットのベクトル」と見立て、 ベクトル間のユークリッド距離で最適解を求める手法を提案した。 タスク間の通信にはヒューリスティックスを利用。

SBON(Stream Based Overlay Network)(2006年)では、 レイテンシを考慮しながらネットワークを構成する。 コストスペースという多次元ユークリッド空間を定義し、 適応的な最適化手法を利用する。

Zhouらが提案した手法(2006年)では通信コストを最小化する。 ロードバランスも考慮。 クエリを分割し、極力アサインされるノードを減らす。

ほかにも、 AhmadやÇetintemel (2004年)の論文では ネットワークバンド幅を最小化するアルゴリズムが提案されている。

5.2. Online techniques

ストリーム処理で弾力性を実現するには、以下の2個の要素が必要。★

  • リソース使用状況、サービスレベルのメトリクス(エンドツーエンドのレイテンシなど)などを 観測する仕組み
  • スケーリングポリシー

多くのソリューションはアプリケーション(というより、オペレーション)をブラックボックスとして扱い、 メトリクスから最適化を行う。

SattlerやBeierはこれらの手法は信頼性向上にもつながる、としている(2013) その観点でいえば、タスクがボトルネックになった際に、オペレーショングラフのリライトを起こすべき、となる。 (補足:つまり、入力に対して、処理が追い付かなくなってきたとき、など)

ステートフルなオペレータの最適化は非常に困難。 調整が走っている間、オペレータのステートは適切にパーティション化されていないといけない。 ★ (補足:しかもストリーム処理なのでレイテンシも気にされることになる)

補足:以下、関連論文の紹介が続く。

Fernandezらの論文(2013)では、オペレーションのステートをストリーム処理エンジンと統合するアプローチを提案した。 つまりステートはタプルの形式で定期的にチェックポイントされるようになり、 オペレータのスケールの最中、新旧のオペレータ間でリパーティションされるようになる。 ★ CPU使用状況がメトリクスとして管理され、複数のオペレータがボトルネックになっていることがわかるとスケールアウトを実行するようになっている。

T-Storm (Xu etら。2014年) = Traffic Aware Storm。 通信コストを減らすようなスケジュールをカスタムスケジューラを導入して実現。

Anielloらの論文(2013)でも同様のアプローチを提案。 オフラインで動作するスタティックなスケジューラと動的なスケジューラの両方を提案。

Lohrmannらの論文(2015)では、リソース使用を抑えつつ、与えられたエンドツーエンドの レイテンシを守るためのスケジューリングを実現する方法を提案。 ノードは均一であることを前提としている。 Rebalance(パーティション再アサイン)とResolveBottleneck(スケールアウト)の2種類の方法を使って実現。

ESCストリーム処理エンジンは、タスクスケジューリング、性能モニタ、リソースプール管理が協調して動作する。 またUDFを実行するプロセスは、ゲートウェイ機能(PEマネージャ)と実行機能(PEワーカ)の両方を内包。 マネージャはワーカのロードバランス機能を持っている。 リソース状況がひっ迫すると、PEマネージャごとPEワーカを止め、スケールアウトを実行する。

StreamCloud(2012)は、クエリをサブクエリに分け、独立した複数の計算クラスタに割り当てる。 ステートフルなオペレータに依存して分割度合いが決まる。 リソース管理の仕組みと、ロードバランスの機能を持つ。

Heinzeらの論文(2014)では、リソースの配置(再配置)を行う際に レイテンシのスパイクを考慮に入れる。 オペレータの配置は、「インクリメンタルなビン・パッケージング問題」とみなすことができる。 ここでは主にCPU使用率やキャパシティを対象とする。(メモリやネットワークも考慮はするようだ)

補足:ここからIBM Streams(System S?)の関連論文が数件続く。

Gedikらの論文(2014) では、IBM Streamsを対象として自動並列化を狙った。 ステートフルなオペレータも含むエラスティックな自動並列化を提案。 スプリッタにおけるブロッキング時間やスループットをメトリクスとし、並列度を決める。 データスプリッタでは、ステートレスの場合はラウンドロビンに従い、 そうでなければハッシュベースになる。

Tang and Gedik (2013) らの論文では、IBM Streamsを対象とし、 タスクやパイプラインの並列化を提案。 オペレータのパイプラインのスレッドを管理。スレッドを追加することでスループットを向上させる。

Gedik等の論文(2016) では、IBM Streamsを対象とし、パイプライン並列、データ並列の両方を実現する手法を提案。 チェーンのようにつながったデータフローグラフを分割する。 その際、オペレータがレプリケート可能かどうかを考慮し、並列化する。

Wu and Tanの論文 (2015) では、以下の点に関する検討を実施。

  • 大きなサイズのステートを管理
  • ワークロードの変動
  • マルチテナントの実現

彼らはChronoStreamを提案。 弾力性とオペレータマイグレーションの実現のため、アプリケーションレベルのステートを 計算ノードレベルのステートに分割し、スライス化する。 スライス化されたステートは、チェックポイントされ、ほかのノードに複製される。 スライスごとに計算の進捗は管理される。(このあたりがSparkのDStreamと異なる)

Xuらの論文(2016)では、Stela(Stream Processing Elasticity)を提案。 スケールアウト・インの後のスループットを最適化し、計算がストップするのを極力防ぐ。 Expected Throughput Percentage (ETP)をメトリクスとして使用。 オペレータの処理速度が変わった場合に最終的にどんなスループットになるのか?の値。 本手法はStormのスケジューラの拡張として提供されている。

Hidalgoらの論文では、オペレータの分割で並列化を実現する。 その際、オペレータの状態を管理するため以下の2種の手法を使用 ★。

  • short-termアルゴリズム
    • トラフィックのピークを検知するために短期間のロードを観測
    • 下限上限で管理
  • long-termアルゴリズム
    • トラフィックパターンを把握するため長期間のロードを観測
    • マルコフチェーンモデルで管理

上記に基づき、当該オペレータが過負荷、安定、負荷不足なのかを予測する。 ★

ここ最近、コンテナや軽量な仮想化技術の利用に関する研究も盛んである。

Pahl and Lee らの論文(2015) では、コンテナをエッジ、クラウドコンピューティングの 弾力性を高めるための研究を実施。

Ottenwälderらの論文 (2013) では、オペレータ配置とマイグレーションを支援するため、 システムの特徴と移動体パターンの予測を用いる。 クラウドコンピューティングとフォグコンピューティングの両方を含む。 移動体は、近くのブローカに接続する。 これによりネットワークコストを低減し、エンドツーエンドのレイテンシも改善する。 オペレータのマイグレーション計画は動的に変わる。 その際タイムグラフモデルが用いられる。

以下に、ツール群の比較表を引用する。 ★

ツール比較表

5.3. Change and burst detection

入力データの変化、バーストを検知すること。(補足:というのが重要、ということ) それ自体は弾力性の実現ではないが、トリガとして関係することがある。

Zhu and Shashaの論文 (2003) では、移動ウェーブレットツリー・データ構造を用いて、 それらを検知する。ウィンドウの取り方は3種類。 ランドマーク(固定)、移動式、過去分を減衰。

参考:ウェーブレットと多重解像度処理

そのほか、Krishnamurthyらの論文(2003) では、Sketchを用いる。

参考:乱択データ構造の最新事情 -MinHash と HyperLogLog の最近の進歩-

6. Distributed and hybrid architecture

旧来の多くの分散ストリーム処理の技術は、クラスタを対象としていた。 最近の研究では、エッジを考慮したものが登場している。

インターネットのエッジであるマイクロデータセンタは、しばしば「Cloudlets」と呼ばれる。 データ転送量の低減、エンドツーエンドのレイテンシ改善、クラウドからのオフロードが目的。

ただし、多くの手法はまだ模索段階。

6.1. Lightweight virtualisation and containers

軽量な仮想化、もしくはコンテナを利用する、というのがしばしば挙がる。

Yanguiらの論文(2016)は、クラウド・フォグコンピューティングのPaaSを提案。 Cloud Foundaryを拡張。

Morabito and Beijarの論文 (2016) は、エッジ・コンピューテーション・プラットフォームを 提案した。リソースの弾力性を実現するため、コンテナを利用する。 シングルボードのコンピュータをゲートウェイとして利用する。 データ圧縮などに利用する。

Petroloらの論文(2016)も、同様にゲートウェイデザインを提案。 ワイヤレスセンサーネットワーク向け。

Hochreinerらの論文 (2016) では、エラスティックなストリーム処理の仕組みとして、VIennaエコシステムを提案。 プロセッシンググラフを書くためのグラフィカルなユーザインターフェースも供える。

6.2. Application placement and reconfiguration

モバイルクラウドとヘテロジーニアスメモリに関するGaiらの論文(2016) では、 ハイブリッドシナリオでのタスクスケジューリングについて触れられている。

Benoitらの論文 (2013)、Roh等の論文(2017)では、ヘテロジーニアス・ハードウェアやハイブリッドアーキテクチャにおける コンピュータリソース使用に関するスケジューリングについて取り扱っている。

Cardelliniらの論文 (2016)では、 Optimal Distributed Stream Processing Problem (ODP)向けに ヘテロジーニアスなリソースを考慮した integer programming formulation を提案。 Stormを拡張し、ODPベースのスケジューラを採用。 ネットワークレイテンシを推測。

地理分散されたVMに、ストリーム処理のオペレータを配置する問題は、NP困難(ないし、NP完全)である。(2016、2017) しかし、コストアウェアなヒューリスティックは提案されている。(Gu et al., 2016; Chen et al.,)

SpanEdge(Sajjad et al. (2016) )は、中央とエッジのデータセンタを使うストリーム処理ソリューション。 マスタ・ワーカ構成のアーキテクチャであり、hubワーカ(中央)とspokeワーカ(エッジ)を持つ。 スケジューラはローカルタスクを各エッジ上のワーカで実行しようとする。 ★

Mehdipourらの論文 (2016) は、クラウドとフォグの間の通信を最小化するように動く。 IoTデバイスからのデータを処理する。

Shenらの論文(2015) は、CiscoのConnected Streaming Analytics(CSA)を 利用する。データセンタとエッジの両方を利用する。 ★ CSAが継続的クエリのためのクエリ言語を提供する。

Geelytics(Chengらの論文 2016)は、IoT環境のためのシステム。複数の地理分散されたデータProducer、結果Consumer、計算リソースを活用する。 それらはクラウドもしくはネットワークエッジのいずれかに置かれる。 マスタ・ワーカアーキテクチャであり、Pub/Subモデルのサービスを採用。 アプリはオペレータのDAG。 スコープ限定されたタスクが特徴的。 各タスクのスコープの粒度をユーザが指定可能。 データProducerの地理情報に基づきresultタスクを配置する。

7. Future directions

大きな組織におけるビッグデータソリューションは、 バッチとオンラインの両方を有することになる。 ★ また複数のデータセンタにまたがることもあり、それらの中で弾力的にリソースを使用できることが理想的である。

ここでは将来の展望を述べる。

7.1. SDN and in-transit processing

SDN等を通じて、ネットワーク経路の途中で計算を行う。 このアプローチは、セキュリティとリソース管理の課題をはらむ。 例えば、IoTデバイスからデータセンタの経路の途中で計算させることは、攻撃可能な個所を増やすことにつながる。 また状況に合わせたリソース配置も困難だ。

既存の多くの研究では、複数のオペレータの配置においては、 レイテンシやバンド幅などのネットワーク・メトリクスに着目する。 一方で、ネットワークがプログラム可能であることは考慮されていない。

コグニティブ・アシスタンスのユースケースでは、データセンターで機械学習モデルが学習され、 その後エッジ側で推論される。 ★ このときのひとつの課題は、結果的に生じるコンセプト変動である。

7.2. Programming models for hybrid and highly distributed architecture

Boykinら(2014)、Google Cloud Dataflow(2015)のように、ハイレベルの抽象化を実現したものがある。 これらのソリューションは単一のクラスタやデータセンタでの利用に限られているが、 一部これをエッジコンピューティングまで拡張する取り組みが生じている。 そのような条件下で、リソース使用を効率的に管理する手法に関する研究が多く行われるだろう。

Apache Beam(2016)のプロジェクトでは、ユニファイドなSDKを提供。 Apache SparkやApache Flinkに対応。 一方で、エッジ・フォグ・データセンタコンピューティングを横断するユニファイドなSDKはない。

Apache Edgent (2017) がインターネット・エッジでの計算・分析を可能にする。 ★

参考:Apache Edgent

共有

Windows Tools

  • SurfEasy
    • WindowsとAndroidで共通して使えるVPNソリューション
    • 自動でVPN有効化してくれる
    • 公衆無線LANに接続するときに安心
    • Lenovo関係で知った
  • Dashlane
    • パスワード、ID、支払い方法等の管理ソリューション
    • WindowsとAndroidで共通して使える
    • Lenovo関係で知った
  • VMWare Workstation Pro
    • Vagrantのvagrant-vmware-desktopプラグインで用いるのはWorkstation Proであることに注意。
共有

WSLからVagrantを使う

参考

メモ

基本的には、 (公式)Vagrant and Windows Subsystem for Linux の通り進める。

まずWindowsとWSLの両方に、Vagrantをインストールする。 両方のバージョンが一致するように気を付ける。

共有