後で調べるリスト

  • sulrm
    • Heronを調べる中で引っかかった話
  • Apache Aurora
    • Heronを調べる中で引っかかった話
共有

ライティングに関する記事

参照

メモ

スピードライトを使ったテクニック

スピードライトだけで海外のような写真 に記載あり。作例も載っているのでわかりやすい。

共有

重大事故の時にすべきこと

参考

メモ

重大事故の時にどうするか? は、ソフトバンクのZコーポレーションのミヤサカさんが書かれた記事。

重大事故の時に何をすべきか?が書かれたブログ。 リーダ目線での具体的な行動指針が書かれている。

共有

Twitter Heronの論文

参考

論文の要点メモ

昔のメモをコピペ。

なお、 Storm vs. Heron のブログに記載の通り、 本論文当時のStormは古く、現在のStormでは解消されている(可能性のある)課題が取り扱われていることに注意が必要。

1. Introduction

Twitterでのリアルタイム・ストリーミングのユースケースの例。

  • RTAC(リアルタイム・アクティブユーザ・カウント)
    • 図1にRTACのトポロジーイメージが載っている
  • リアルタイム・エンゲージメント(ツイートや広告)

大きな課題のひとつは、デバッガビリティ。性能劣化の原因を素早く探りたい。 論理単位と物理的なプロセスを結びつけてシンプルにしたかった。

クラスタ管理の仕組み(TwitterではAuroraを使用)と組み合わせたかった。

計算リソースの効率も上げたかった。

一方で既存のアプリを書き換えたくないので、Storm API、Summingbird APIに対応させたい。

2. Related Work

挙げられていたのは以下のような技術。

  • Apache Samza
  • MillWheel(2013)
  • Photon(2013)
  • DataTorrent
  • Stormy(2012)
  • S4
  • Spark Streaming

またトラディショナルなデータベースに組み込まれたストリーム処理エンジンも挙げられていた。

  • Microsoft StreamInsight
  • IBM Inforsphere Streams
  • Oracle continuous query

要件

  • OSS
  • 性能高い
  • スケーラブル
  • Storm APIとの互換性あり

3. Motivation for Heron

3.1 Storm Background

特筆なし。Stormの論理構造の説明があるだけ。

3.2 Storm Worker Architecture: Limitations

Stormはプロセス内で複数のタスクを実行するし、それらのタスクは複数のトポロジーに属している。 これにより性能劣化などが発生しても、その原因を見極めるのが難しく、結果としてトポロジー「再起動」という 手段を取らざるを得なくなる。

ログが混ざって調査しづらい。

マシンリソースのリザーブが一律である点が辛い。 余分なリソースを確保してしまう。特に複雑なトポロジーを用いることになるハイレベルAPIで利用する場合。

大きなヒープを用いることもデバッグしづらさを助長する。

ワーカ内では、グローバルな送信スレッド、受信スレッド、コンピューティングスレッドがある。 その間で内外の通信を行う。これは無駄が多い。

3.3 Issues with the Storm Nimbus

Stormのスケジューラは、ワーカにおいてリソース使用分割をきちんと実施しない。 Storm on YARNを導入しても十分ではなかった。

ZooKeeperがワーカ数の制約になる。

NimbusはSPOF。 Nimbusがとまると新しいトポロジーを投入できないし、走っているトポロジーを止められない。 エラーが起きても気づけないし、自動的にリカバーできない。

3.4 Lack of Backpressure

Stormにはバックプレッシャ機能がなかった。

3.5 Efficiency

効率の悪さ(マシンリソースを使い切らない)が課題。 イメージで言えば…100コアのクラスタがあるとき、Utilization 90%以上のコアが30個で動いていほしいところ、Utilizaiton 30%のコアが90個になる、という具合。

4. Design Alternatives

Heronの他に選択肢がなかったのか、というと、なかった、ということが書かれている。 ポイントはStorm互換APIを持ち、Stormの欠点を克服するプロダクトがなかった、ということ。

5. Heron

5.1 Data Model and API

データモデルとAPIはStormと同様。 トポロジーは論理プラン、実際に実行されるパーティション化されたスパウトやボルトが 物理プランに相当すると考えて良い。

5.2 Architecture overview

ジェネラルなスケジューラであるAuroraを使用し、トポロジーをデプロイする。 Nimbusからの離脱。

以下のようなコンポーネントで成り立つ。

  • トポロジマスタ(TM)(スタンバイTMを起動することも可能)
  • ストリームマネージャ(SM)
  • メトリクスマネージャ(MM)
  • ヘロンインスタンス(HI)(要はスパウトとボルト)

コンテナは単独の物理ノードとして起動する。 なお、Twitterではcgroupsでコンテナを起動する。

メタデータはZooKeeperに保存する。

なお、HIはJavaで実装されている。

通信はProtocol Buffersで。

5.3 Topology Master

YARNでいうAMに近い。 ZooKeeperのエファメラルノードの機能を使い、複数のTMがマスタになるのを防ぎ、TMを探すのを助ける。

5.4 Stream Manager

HIは、そのローカルのSMを通じて通信する。 通信経路は O(k^2) である。このとき、HIの数 n は、 kよりもずっと大きいことが多いので、効率的に通信できるはずである。

補足:ひとつのマシンにひとつのSM、複数のコンテナ(つまりHI等)があるモデルを仮定。

バックプレッシャの方式には種類がある。

バックプレッシャについて。 複数のステージで構成されるケースにおいて、後段のステージの処理時間が長引いていると、 タプルを送るのが つまって バッファがあふれる可能性が生じる。 そこでそれを調整する機能が必要。

TCPバックプレッシャ。 HIの処理時間が長くなると、HIの受信バッファが埋まり始め、合わせて SMの送信バッファも埋まり始める。 これによりSMは つまり 始めているのを検知し、それを上流に伝搬する。 この仕組みは実装は簡単だが、実際にはワークしない。 HI間の論理通信経路は、SM間の物理通信経路上で、 オーバーレイ構成されるためである。

スパウトバックプレッシャについて。 TCPバックプレッシャと組み合わせて用いられる。 SMがHIの処理遅延を検知すると、 スパウトのデータ読み込みを止める。 続いてスタートバックプレッシャのメッセージを他のSMに送る。これにより読み込みが抑制される。 処理遅延が解消されると、ストップバックプレッシャのメッセージを送る。 欠点は、過剰に読み込みを抑止すること、メッセージングのオーバヘッドがあること。利点は、トポロジによらず素早く対処可能なこと。

その他、ステージ・バイ・ステージバックプレッシャについて。 これはトポロジはステージの連続からなる。 バックプレッシャを伝搬させることで必要分だけ読み込み抑制する。

Heronでは、スパウトバックプレッシャ方式を用いた。

ソケットをアプリケーションレベルのバッファに対応させ、 ハイ・ウォータマークとロー・ウォータマークを定義。 ハイ・ウォータマークを超えるとバックプレッシャが発動し、 ロー・ウォータマークを下回るまでバックプレッシャが続く。

5.5 Heron Instance

デザインにはいくつかの選択肢がある。

5.5.1 Single-threaded approach

HIはJVMであり、単独のタスクを実行する。 これによりデバッグしやすくなる。

しかし、シングルスレッドアプローチは「ユーザコードが様々な理由によりブロックする可能性がある」という欠点がある。 ブロックする理由は様々だが、例えば…

  • スリープのシステムコールを実行
  • ファイルやソケットI/Oのため読み書きのシステムコールを実行
  • スレッド同期を実行

これらのブロックは特にメトリクスの取り扱いにおいて問題になった。 つまり、仮にブロックされてしまうともし問題が起きていたとしてもメトリクスの伝搬が遅くなることがあり、 ユーザは信用を置けなくなってしまうからだ。

5.5.2 Two-threaded approach

ゲートウェイスレオッドとタスク実行スレッドの2種類で構成する。 ゲートウェイスレッドは、HIの通信管理を担う。 TCP接続の管理など。

タスク実行スレッドはユーザコードを実行する。 スパウトか、ボルトかによって実行されるメソッドが異なる。 スパウトであれば nextTuple を実行してデータをフェッチするし、 ボルトであれば execute メソッドを実行してデータを処理する。 処理されたデータはゲートウェイスレッドに渡され、ローカルのSMに渡される。 なお、その他にも送られたタプルの数などのメトリクスが取得される。

★補足:このデザインは汎用的なので、他のプロダクトにも利用できそう。

スレッド間はいくつかのキューで結ばれる。 data-in、data-out、metrics-outである。 重要なのは、data-inとdata-outは長さが決まっており、このキューがいっぱいになるとバックプレッシャ機能が有効になる仕組みになっていること。

問題は、NWがいっぱいなときにdata-outキューが溜まった状態になることだった。 これにより、生存オブジェクトがメモリ内に大量に残り、GCによる回収ができない。 このとき、もしdata-outキューからの送信よりも、先に受信が発動すると、新しいオブジェクトが生成されることになり、 GCを発動するが回収可能なオブジェクトが少ないため、さらなる性能劣化を引き起こす。

これを軽減するため、data-out、data-inキューの長さを状況に応じて増減することにした。

5.6 Metrics Manager

メトリクスマネージャは、コンテナごとに1個。

5.7 Startup Sequence and Failure Scenarios

トポロジがサブミットされてから実際に処理が開始されるまでの流れの説明。

  • スケジューラがリソースをアロケート
  • TMが起動し、ZooKeeperにエファメラルノードを登録
  • SMがZooKeeperからTMを確認し、SMと接続する。ハートビートを送り始める。
  • SMの接続が完了すると、TMはトポロジのコンポーネントをアサインする
  • SMはTMから物理プランを取得し、SM同士がつながる
  • HIが立ち上がり、ローカルSMを見つけ、物理プランをダウンロードし実行開始。
  • 故障時の対応のためTMは物理プランをZooKeeperに保存する。

いくつか故障シナリオが想定されている。

  • TMが故障した場合、ZooKeeper上の情報を使って復旧可能である。復旧後、SMは新しいTMを見つける。
  • SMが故障した場合、復旧したSMは物理プランを取得する。他のSMも新しいSMの物理プランを取得する。
  • HIが故障した場合、再起動してローカルSMにつなぎにいく。その後物理プランを取得し処理を再開する。

5.8 Architecture Features: Summary

まとめが記載されているのみ。

6. Heron in Production

プロダクションで利用するため、いくつかの周辺機能を有している。

6.1 Heron Tracker

ヘロントラッカーはZooKeeperを利用し、トポロジのローンチ、既存トポロジの停止、 物理プランの変更を追従する。 また同様にトポロジマスタを把握し、メトリクス等を取得する。

6.2 Heron UI

ヘロントラッカーAPIを利用し、UIを提供する。 論文上にはUIの例が載っている。 トポロジのDAG、コンテナ、コンポーネント、メトリクスを把握できる。

6.3 Heron Viz

メトリクスの可視化。トラッカーAPIを利用し、新しいトポロジがローンチされたことを検知し、可視化する。

ヘルスメトリクス、リソースメトリクス、コンポーネントメトリクス、ストリームマネージャメトリクス。

ヘルスメトリクスではラグや失敗などを表示する。

リソースメトリクスでは予約されたCPU、実際に使用されたCPU、同様にメモリに関する情報を扱う。またGCなども。

コンポーネントメトリクスはスパウトではエミットされたタプル数などのコンポーネントごとに固有のメトリクスを扱う。 エンドツーエンドのレイテンシも扱う。

ストリームマネージャメトリクスは、インスタンスに送受信されたタプル数やバックプレッシャ機能に関するメトリクスを扱う。

6.4 Heron@Twitter

TwitterではStormではなくHeronがデファクトスタンダードである。

ユースケースは多岐にわたるが、データ加工、フィルタリング、結合、コンテンツのアグリゲーションなどなど。 機械学習も含む。例えば、回帰、アソシエーション、クラスタリングなど。

3倍ほどのリソース使用効率を得られた。

7. Empirical Evaluation

本論文用に組まれた動作確認。 StormとHeronの比較。 Ackあり・なしの両方。

なお、計測はデータ処理が安定してから開始するようにした。 そのため、Stormでは0mq層 1 でほとんどドロップが起きていないときに計測することを意味し、 Heronではバックプレッシャが発動しておらず転送キューが溜まっていない状態での計測を意味する。

Ackありのケースについて、タプルのドロップは、

  • Storm:0mqでのドロップもしくはタイムアウト
  • Heron:タイムアウト

を要因とするものを想定する。

7.3 Word Count Topology

スパウトで高々175k単語のランダムな単語群を生成する。 それをボルトに渡し、メモリマップに保持する。

これは単純な処理なので、オーバーヘッドを計測するのに適している。

結果は、スループットで10倍〜14倍、レイテンシで5〜15倍の改善が見られた。

Heronのエンドツーエンドでのレイテンシにおけるボトルネックは、 SMのバッファでバッチ化されることであり、これは概ね数十ms程度の影響がある。

CPUコアの使用量は、2〜3倍小さくなった。 (補足:無駄にリソース確保せずに、きちんと各コアを使い切っている、というのも影響しているようだ)

Ackが有効、無効で同様の傾向。

7.4 RTAC Topology

Ack有効の場合、Stormで6Mタプル/min出すのに360コア必要だった。レイテンシは70ms。 対してHeronでは、36コアでよく、レイテンシも24msだった。

Ack無効の場合も同様の傾向。必要なCPUコア数に関し、10倍(つまり1/10のコアで良い)の改善が得られた。

8. Conclusions and Future work

Exactly oneceセマンティクスは論文執筆時点では対応されていない。 論文中では、 Trident が引用されていた。

最近のHeronはどうか?のメモ

2019/7/8現在になりどうなったかを軽く確認。

Apache incubation Heron が公式レポジトリである。 Apache Heronのコミット状況 を見る限り、2019/7/8現在も活発に活動されている。

Apache Heronのコントリビュータ を見る限り以下の様子。

  • コアの開発者はApache Pulsarの人でもある Sanjeev Kulkarniobjmagic
  • ただし最近は Ning Wang のように見える。彼はもともと2013年あたりまでGoogleでYouTubeに携わっていたようだ。

READMEによる「Update」

2019/7/8時点のREADMEによると、Mesos in AWS、Mesos/Aurora in AWS、ローカル(ラップトップ)の上でネイティブ動作するようになった。 またApache REEFを用いてApache YARN上で動作するように試みている。 slurm にも対応しようとしているとのこと。

公式ドキュメントを覗いてみる

公式ドキュメント を確認し、最近の様子を探る。

  • Python APIがある
  • UIがかなり進化している
  • スケジューラとしては、k8s、k8s with Helm、
  • メトリクス監視の仕組みには、Prometheus、Graphite、Scribeが挙げられている

Heron's Design Goals

2019/7/8現在、以下のようなゴールを掲げている。

  • 1億件/minをさばける
  • 小さなエンドツーエンド・レイテンシ
  • スケールによらず予測可能な挙動。またトラフィックのスパイクやパイプラインの輻輳が生じても予測可能な挙動。
  • Simple administration, including:
  • シンプルな運用
    • 共有インフラにデプロイ可能
    • 強力なモニタリングの仕組み
    • 細やかに設定可能
  • デバッグしやすい

商用サポートはあるのか?

2019/7/8現在、Heronの商用サポートがあるのか? →なさそうに見える。

Introduction to Apache Heron by Streamlio の通り、StreamlioがよくHeronの説明をしているように見える。 またこのスライドではユースケースとして、

  • Ads
  • Monitoring
  • Product Safety
  • Real Time Trends
  • Real Time Machine Learning
  • Real Time Business Intelligence

あたりを挙げている。参考までに。 また顧客(?)としては、

  • Twitter
  • Google
  • Stanford University
  • Machine Zone
  • Inidiana University
  • Microsoft
  • Industrial.io

を挙げている。

ただし、 Streamlioのサポートサービス を見る限り、Apache Pulsarを対象としているがApache Heronが対象に入っているようには見えない。 また Streamlioのプロセッシングエンジンに関する説明 を見ると、Apache Heronに言及しているが、あくまでStreamlioがApache Heronの 開発に携わっていた経験がStreamlioのストリーム処理エンジンの開発に生かされている件について触れられているだけである。 現在は、 Pulsar functions が彼らのコアか。


  1. 0mqの記述が読み取れることから、0.8系Stormを比較対象としたように見える。↩︎

共有

Apache Kafkaビルド時のエラー

参考

メモ

Re: Kafka Trunk Build Failure with Gradle 5.0 に記載のように Gradle 4系でエラーが出るようなので、いったん5系をインストールして実行し直した。 (現環境のUbuntuではaptインストールでのバージョンが4系だったので、 SDKMANを使って5系をインストールした)

Gradleのバージョンを上げたところ問題なく実行できた。 以下の通り。

1
$ gradle clients:test --tests org.apache.kafka.clients.producer.KafkaProducerTest

ここではGradleを使って、clientsサブプロジェクト以下の org.apache.kafka.clients.producer.KafkaProducerTest を実行する例である。

共有

Kafkaコンソールプロデューサを起点とした確認

参考

メモ

2019/6/23時点でのtrunkで再確認した内容。 ほぼ昔のメモのままコピペ・・・。

ConsoleProducer

エントリポイントとして、Kafkaのコンソールプロデューサを選ぶ。

bin/kafka-console-producer.sh:20

1
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

コンソールプロデューサの実態は、KafkaProducerである。

kafka/tools/ConsoleProducer.scala:45

1
val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))

なお、 ConsoleProducerにはsendメソッドが定義されており、 以下のように標準入力からデータを読み出してはメッセージを送信する、を繰り返す。

kafka/tools/ConsoleProducer.scala:54

1
2
3
4
5
do {
record = reader.readMessage()
if (record != null)
send(producer, record, config.sync)
} while (record != null)

kafka/tools/ConsoleProducer.scala:70

1
2
3
4
5
6
7
private def send(producer: KafkaProducer[Array[Byte], Array[Byte]],
record: ProducerRecord[Array[Byte], Array[Byte]], sync: Boolean): Unit = {
if (sync)
producer.send(record).get()
else
producer.send(record, new ErrorLoggingCallback(record.topic, record.key, record.value, false))
}

KafkaProducer

それでは実態であるKafkaProducerを確認する。

コンストラクタを確認する

まずクライアントのIDが定義される。

org/apache/kafka/clients/producer/KafkaProducer.java:332

1
2
3
4
String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;

続いてトランザクションIDが定義される。

org/apache/kafka/clients/producer/KafkaProducer.java:337

1
2
String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
(String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;

Kafkaのトランザクションについては、 Confluentのトランザクションに関する説明 を参照。

その後、ログ設定、メトリクス設定、キー・バリューのシリアライザ設定。 その後、各種設定値を定義する。

特徴的なところとしては、メッセージを送信するときにメッセージをいったんまとめこむ accumulator も このときに定義される。

org/apache/kafka/clients/producer/KafkaProducer.java:396

1
2
3
4
5
6
7
8
9
10
11
12
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.compressionType,
lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
transactionManager,
new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));

他には Sendor も挙げられる。 例えば、上記で示したaccumulatorも、Sendorのコンストラクタに渡され、内部で利用される。

org/apache/kafka/clients/producer/KafkaProducer.java:422

1
this.sender = newSender(logContext, kafkaClient, this.metadata);

org/apache/kafka/clients/producer/KafkaProducer.java:463

1
2
3
4
5
6
7
8
9
10
11
12
13
14
return new Sender(logContext,
client,
metadata,
this.accumulator,
maxInflightRequests == 1,
producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
retries,
metricsRegistry.senderMetrics,
time,
requestTimeoutMs,
producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
this.transactionManager,
apiVersions);

その他にはトランザクション・マネージャなども。

共有

GradleプロジェクトをIntellijでインポート

参考

メモ

昔は gradlew idea とやってIdea用のプロジェクトファイルを生成してから インポートしていたけれども、最近のIntellijは「ディレクトリを指定」してインポートすると、 Gradleプロジェクトを検知して、ウィザードを開いてくれる。

例えばApache KafkaなどのGradleプロジェクトをインポートするときには、 メニューからインポートを選択し、ディレクトリをインポートすると良い。

共有

Kafkaコンソールコンシューマを起点とした確認

参考

メモ

2019/6/23時点でのtrunkで再確認した内容。 ほぼ昔のメモのままコピペ・・・。

ConsoleConsumer

コンシューマの実装を確認するにあたっては、コンソール・コンシューマが良いのではないか。

bin/kafka-console-consumer.sh:21

1
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

から、 kafka.tools.ConsoleConsumer クラスあたりを確認すると良いだろう。

mainを確認すると、ConsumerConfigのインスタンスを生成し、runメソッドに渡すことがわかる。

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:54

1
2
3
val conf = new ConsumerConfig(args)
try {
run(conf)

ここで、ConsumerConfigはコンソールからパラメータを受け取るためのクラス。 実態はrunメソッドである。

まず大事な箇所として、 KafkaConsumer クラスのインスタンスを生成している箇所がある。 ★

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:67

1
val consumer = new KafkaConsumer(consumerProps(conf), new ByteArrayDeserializer, new ByteArrayDeserializer)

この KafkaConsumer クラスがコンシューマの実態である。 なお、コンソール・コンシューマでは、これをラップした便利クラス ConsumerWrapper が定義されており、 そちらを通じて主に使う。

またシャットダウンフックを定義している。

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:75

1
addShutdownHook(consumerWrapper, conf)

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:87

1
2
3
4
5
6
7
8
9
10
11
12
13
def addShutdownHook(consumer: ConsumerWrapper, conf: ConsumerConfig) {
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
consumer.wakeup()

shutdownLatch.await()

if (conf.enableSystestEventsLogging) {
System.out.println("shutdown_complete")
}
}
})
}

これはグレースフルにシャットダウンするには大切。 ★

またデータ処理の実態は、以下の process メソッドである。

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:77

1
try process(conf.maxMessages, conf.formatter, consumerWrapper, System.out, conf.skipMessageOnError)

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:101

1
2
3
4
def process(maxMessages: Integer, formatter: MessageFormatter, consumer: ConsumerWrapper, output: PrintStream,
skipMessageOnError: Boolean) {

(snip)

処理するメッセージ最大数、出力ストリームに渡す形を整形するメッセージフォーマッタ、コンシューマのラッパ、出力ストリーム、 エラーのときに呼び飛ばすかどうかのフラグが渡される。

少し処理の中身を確認する。 まず以下のようにメッセージを取得する。

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:104

1
2
3
4
    val msg: ConsumerRecord[Array[Byte], Array[Byte]] = try {
consumer.receive()

(snip)

上記の receive は、先程の通り、ラッパーのreceiveメソッドである。

receiveメソッドは以下のようにコンシューマのポール機能を使い、 メッセージを取得する。

core/src/main/scala/kafka/tools/ConsoleConsumer.scala:437

1
2
3
4
5
6
7
8
9
def receive(): ConsumerRecord[Array[Byte], Array[Byte]] = {
if (!recordIter.hasNext) {
recordIter = consumer.poll(Duration.ofMillis(timeoutMs)).iterator
if (!recordIter.hasNext)
throw new TimeoutException()
}

recordIter.next
}

メッセージを取得するためのイテレータが得られるので、nextメソッドで取得する。 →イテレータのnextを使って1件ずつ取り出していることがわかる。 ★

続いて、取り出されたメッセージを ConsumerRecord 化してフォーマッタに渡す。   core/src/main/scala/kafka/tools/ConsoleConsumer.scala:118

1
2
formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers), output)

これにより出力が確定する。

KafkaConsumer

つづいて、KafkaConsumerクラスを確認する。

トピックアサイン

ここではトピックへの登録を確認する。 ★

コンシューマグループを使って自動で行うアサインメントを使用する場合。リバランスも行われる。

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:936

1
2
3
  public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {

(snip)

上記はトピックのコレクションを渡すメソッドだが、パターンを指定する方法もある。

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:1008

1
2
3
  public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {

(snip)

なお、何らかの理由(詳しくはJavadoc参照)により、リバランスが必要になったら、 第2引数に渡されているリスナがまず呼び出される。

コンシューマ・グループを利用せず、手動で行うアサインメントの場合は以下の通り。

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:1083

1
public void assign(Collection<TopicPartition> partitions) {

いずれの場合も内部的には SubscriptionState が管理のために用いられる。 タイプは以下の通り。 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:72

1
2
3
private enum SubscriptionType {
NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
}

またコンシューマの管理を行う ConsumerCoordinator には、 updatePatternSubscription メソッドがある。

org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:192

1
2
3
4
5
6
7
public void updatePatternSubscription(Cluster cluster) {
final Set<String> topicsToSubscribe = cluster.topics().stream()
.filter(subscriptions::matchesSubscribedPattern)
.collect(Collectors.toSet());
if (subscriptions.subscribeFromPattern(topicsToSubscribe))
metadata.requestUpdateForNewTopics();
}

matchesSubscribedPattern を用いて、現在クラスタが抱えているトピックの中から、 サブスクライブ対象のトピックをフィルタして取得し、SubscriptionState#subscribeFromPattern メソッドを呼ぶ。 これにより、当該コンシューマの購読するトピックが更新される。 この更新はいくつかのタイミングで発動するが、例えば KafkaConsumer#poll(java.time.Duration) の 中で呼ばれる updateAssignmentMetadataIfNeeded メソッドを通じて呼び出される。

コンシューマ故障の検知

基本的にはハートビート( heartbeat.interval.ms で設定)が session.timeout.ms を 超えて届かなくなると、故障したとみなされる。 その結果、当該クライアント(この場合はコンシューマ)がグループから外され、 リバランスが起きる。

なお、ハートビートはコーディネータに対して送られる。 コーディネータは以下のように定義されている。 org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:128

1
private Node coordinator = null;

org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:692

1
2
3
4
AbstractCoordinator.this.coordinator = new Node(
coordinatorConnectionId,
findCoordinatorResponse.data().host(),
findCoordinatorResponse.data().port());

AbstractCoordinator#sendFindCoordinatorRequest メソッドが呼ばれる契機は複数あるが、 例えば、コンシューマがポールするときなどにコーディネータが更新される。

なお、コーディネータにクライアントを登録する際、 セッションタイムアウトの値も渡され、対応される。 予め定められた group.min.session.timeout.ms や group.max.session.timeout.ms を満たす セッションタイムアウトが用いられる。

セッションタイムアウトの値は、例えば以下のように設定される。

kafka/coordinator/group/GroupCoordinator.scala:146

1
doJoinGroup(group, memberId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)

この値は、最終的に MemberMetadata に渡されて用いられる。 例えばハートビートのデッドラインの計算などに用いられることになる。

kafka/coordinator/group/MemberMetadata.scala:56

1
2
3
4
5
6
7
8
9
private[group] class MemberMetadata(var memberId: String,
val groupId: String,
val groupInstanceId: Option[String],
val clientId: String,
val clientHost: String,
val rebalanceTimeoutMs: Int,
val sessionTimeoutMs: Int,
val protocolType: String,
var supportedProtocols: List[(String, Array[Byte])]) {

共有

The 8 Requirements of Real-Time Stream Processing

参照

メモ

MillWheel でも触れられているストリーム処理について書かれた論文。 2005年。 時代は古いが当時の考察は現在でも有用と考える。

1. Introduction

ウォール街のデータ処理量は大きい。 2005年時点で122,000msg/sec。年率2倍で増大。 しかも処理レイテンシが秒では遅い。 金融に限らず異常検知、センサーデータ活用などのユースケースでは同様の課題感だろう。

「メインメモリDBMS」、「ルールベースエンジン」などがこの手のユースケースのために再注目されたが、 その後いわゆる「ストリーム処理エンジン」も登場。

2. Eight rules for stream processing

Rule 1: Keep the Data Moving

ストレージに書き込むなどしてレイテンシを悪化させてはならない。

ポーリングもレイテンシを悪化させる。 そこでイベント駆動のアプローチを用いることがよく考えられる。

Rule 2: Query using SQL on Streams (StreamSQL)

ハイレベルAPIを用いることで、開発サイクルを加速し、保守性を上げる。

StreamSQL:ストリーム処理固有の機能を有するSQL リッチなウィンドウ関数、マージなど。

Rule 3: Handle Stream Imperfections (Delayed, Missing and Out-of-Order Data)

例えば複数のストリームからなるデータを処理する際、 あるストリームからのデータが到着しないとき、タイムアウトして結果を出す必要がある。

同じようにout-of-orderデータについてもウィンドウに応じてタイムアウトしないといけない。

Rule 4: Generate Predictable Outcomes

結果がdeterministicであり、再現性があること。

計算そのものだけではなく、対故障性の観点でも重要。

Rule 5: Integrate Stored and Streaming Data

過去データに基づき、何らかのパターンなどを見つける処理はよくある。 そのためステート管理の仕組み(データストア)は重要。

例えば金融のトレーディングで、あるパターンを見つける、など。 他にも異常検知も。

過去データを使いながら、実時間のデータにキャッチアップするケースもあるだろう。

このようにヒストリカルデータとライブデータをシームレスにつなぐ機能は重要。

Rule 6: Guarantee Data Safety and Availability

HAは悩ましい課題。 故障が発生してフェールオーバするとして、バックアップハードウェアを 立ち上げて処理可能な状態にするような待ち時間は許容できない。

そこでタンデム構成を取ることは現実的にありえる。

Rule 7: Partition and Scale Applications Automatically

ローレベルの実装を経ずに、分散処理できること。 またマルチスレッド処理可能なこと。

Rule 8: Process and Respond Instantaneously

数万メッセージ/secの量を処理する。 さらにレイテンシはマイクロ秒オーダから、ミリ秒オーダ。

そのためには極力コンポーネント間をまたぐ処理をしないこと。

3. SOFTWARE TECHNOLOGIES for STREAM PROCESSING

基本的なアーキテクチャは、DBMS、ルールベースエンジン、ストリーム処理エンジン。

DBMSはデータをストアしてから処理する。データを動かしつづけるわけではない。

またSPEはSQLスタイルでのストリームデータの処理を可能とする。

SPEとルールベースエンジンは、ブロックおよびタイムアウトを実現しやすい。 DBMSはそのあたりはアプリケーションに委ねられる。 仮にトリガを用いたとしても…。

いずれのアーキテクチャにおいても、予測可能な結果を得るためには、 deterministicな処理を実現しないといけない(例えばタイムスタンプの並びを利用、など) SPEとルールベースエンジンはそれを実現しやすいが、DBMSのACID特性は あくまでトラディショナルなデータベースのためのもであり、ストリームデータ処理のための ものではない。

状態を保存するデータストアとしてDBMSを見たとき、サーバ・クライアントモデルのDBMSは 主にレイテンシの面で不十分である。 唯一受け入れられるとしたら、アプリケーション内に埋め込めるDBMSの場合である。 一方ルールベースエンジンはストリームデータを扱うのは得意だが、 大規模な状態管理と状態に対する柔軟なクエリが苦手である。

3.3 Tabular results

このあたりにまとめ表が載っている。

共有

MillWheel Paper

参考

メモ

昔のメモをここに転記。

1. Introduction

故障耐性、ステート永続化、スケーラビリティが重要。 Googleでは多数の分散処理システムを運用しているので、何かしらおかしなことが置き続けている。

MillWheelはMapReduceと同様にフレームワーク。ただし、ストリーム処理やローレイテンシでの処理のため。

Spark StreamingとSonoraは、チェックポイント機能持つが、実装のためのオペレータが限られている。 S4は故障耐性が不十分。 Stormはexactly onceで動作するが、Tridentは厳密な順序保証が必要。

MapReduce的な考えをストリーム処理に持ち込んでも、妥協した柔軟性しか得られていない。 ストリーミングSQLは簡潔な手段をもたらしているが、ステートの抽象化、複雑なアプリケーションの実装しやすさという意味では、 オペレーショナルなフローに基づくアプローチのほうが有利。

  • 分散処理の専門家でなくても複雑なストリーム処理を実装できること
  • 実行可能性

2. Motivation and requirements

GoogleのZeitgeistを例にした動機の説明。

  • 永続化のためのストレージ
  • Low Watermark(遅れデータへの対応)
  • 重複への対応(Exactly Onceの実現)

3. System overview

Zeitgeistの例で言えば、検索クエリが入力データであり、データ流量がスパイクしたり、凹んだりしたことが出力データ。

データ形式:key、value、timestamp

4. Core concept

キーごとに計算され、並列処理化される。 また、キーがアグリゲーションや比較の軸になる。 Zeitgeistの例では、例えば検索クエリそのものをキーとすることが考えられる。 また、キー抽出のための関数を定義して用いる。

ストリーム:データの流れ。 コンピューテーションは複数のストリームを出力することもある。

ウィンドウ修正するときなどに用いられるステートを保持する。 そのための抽象化の仕組みがある。 キーごとのステート。

Low Watermarkの仕組みがあり、Wall timeが進むごとにWatermarkが進む。 時間経過とともにWatermarkを越したデータに対し、計算が行われる。

タイマー機能あり。 Low Watermarkもタイマーでキックされる、と考えて良い。

5. API

計算APIは、キーごとのステートをフェッチ、加工し、必要に応じてレコード生成し、タイマーを セットする。

Injector:MillWheelに外部データを入力する。 injector low watermarkも生成可能。

なお、low watermarkを逸脱するデータが生じた場合は、 ユーザアプリでは捨てるか、それとも現在の集計に組み込むか決められる。

6. Fault tolerance

到達保証の考え方としては、ユーザアプリで冪等性を考慮しなくて良いようにする、という点が挙げられる。

基本的にAckがなければデータが再送される設計に基づいているが、 受信者がAckを返す直前に何らかの理由で故障した場合、データが重複して処理される可能性がある。 そこでユニークIDを付与し、重複デーかどうかを確かめられるようにする。 判定にはブルームフィルタも利用する。 ID管理の仕組みにはガベージコレクションの仕組みもある。

チェックポイントの仕組みもある。 バックエンドストレージにはBigtableなどを想定。

なお、性能を重視しチェックポイントを無効化することもできるが、 そうすると何らかの故障が生じて、データ送信元へのAckが遅れることがある。 パイプラインが並列化すると、それだけシステム内のどこかで故障が生じる可能性は上がる。 そこで、滞留するデータについては部分的にチェックポイントすることで、 計算コストとエンドツーエンドレイテンシのバランスを保つ。 ★

永続化されたステートの一貫性を保つため、アトミックなオペレーションでラップする。 ただし、マシンのフェールオーバ時などにゾンビ化したWriterが存在する可能性を考慮し、 シークエンサートークンを用いるようにする。 つまり、新たに現れたWriterは古いシークエンサートークンを無効化してから、 動作を開始するようにしている。

7. System Implementation

MillWheelはステート管理のためにデータストア(BigTableなど)を使う。 故障発生時にはデータストアからメモリ情報を再現する。

low watermarkのジャーナルはデータストアに保存される。

感想:このあたりデータストアの性能は、最終的なパフォーマンスに大きく影響しそうだ。 ★

8. Evaluation

単純なシャッフル処理では、レイテンシの中央値は数ミリ秒。95パーセンタイルで30ミリ秒。 Exactly onceなどを有効化すると中央値は33.7ミリ秒。95パーセンタイルで93.8ミリ秒。

CPU数をスケールアウトしても、レイテンシに著しい劣化は見られない。(逆に言うと、99パーセンタイルではある程度の劣化が見られる)

low watermarkは処理のステージをまたぐと、実時間に対してラグが生じる。 このあたりは活発に改善を進められているところ。

ステート管理などにストレージ(BitTableなど)を使う。 これによりリード・ライトがたくさん発生する。 ワーカにおけるキャッシュは有効。

実際のユースケースは、広告関係。 そのほか、Google Street Viewでのパノラマ画像の生成など。 ★

9. Related work

ストリーム処理システムが必要とするものは、以下の論文に記載されている。 The 8 Requirements of Real-Time Stream Processing

Spark Streamingに対しては、MillWheelの方がgeneralであると主張。 RDDへの依存性がユーザに制約をもたらす、とも。 またチェックポイントの間隔はMillWheelの方が粒度が細かい。

共有