CDC Kafka and master table cache

過去の古いメモを少し細くしてアップロード。

参考

メモ

やりたいこと

マスタデータを格納しているRDBMSからKafkaにデータを取り込み、 KTableとしてキャッシュしたうえで、ストリームデータとJoinする。

RDBMSからのデータ取り込み

KafkaConnectを試す その2No More Silos: How to Integrate Your Databases with Apache Kafka and CDC を 参考に、Kafka Connectのjdbcコネクタを使用してみようと思った。

しかしこの例で載っているのは、差分取り込みのためにシーケンシャルなIDを持つカラムが必要であることが要件を満たさないかも、と思った。 やりたいのは、Updateも含むChange Data Capture。

test_db.json 内の該当箇所は以下の通り。

1
2
3
4
(snip)
"mode" : "incrementing",
"incrementing.column.name" : "seq",
(snip)

ということで、 JDBC Connector (Source and Sink) for Confluent Platform を確認してみた。 結論としては、更新時間とユニークIDを使うモードを利用すると良さそうだ。

JDBC Kafka Connector

JDBC Connector Prerequisites によると、 Kafka と Schema Registryがあればよさそうだ。

JDBC Connector Incremental Query Modes によると、モードは以下の通り。

  • Incrementing Column
    • ユニークで必ず値が大きくなるカラムを持つことを前提としたモード
    • 更新はキャプチャできない。そのためDWHのfactテーブルなどをキャプチャすることを想定している。
  • Timestamp Column
    • 更新時間のカラムを持つことを前提としたモード
    • 更新時間はユニークではないことを起因とした注意点がある。 同一時刻に2個のレコードが更新された場合、もし1個目のレコードが処理されたあとに障害が生じたとしたら、 復旧時にもう1個のレコードが処理されないことが起こりうる。
    • 疑問点:処理後に「処理済みTimestamp」を更新できないのだろうか
  • Timestamp and Incrementing Columns
    • 更新時刻カラム、ユニークIDカラムの両方を前提としたモード
    • 先のTimestamp Columnモードで問題となった、同一時刻に複数レコードが生成された場合における、 部分的なキャプチャ失敗を防ぐことができる。
    • 確認点:先に更新時刻を見て、ユニークIDで確認するのだろうか。だとしたら、更新もキャプチャできそう。
  • Custom Query
    • クエリを用いてフィルタされた結果をキャプチャするモード
    • Incrementing Column、Timestamp Columnなどと比べ、オフセットをトラックしないので、 クエリ自身がオフセットをトラックするのと同等の処理内容になっていないと行けない。
  • Bulk
    • テーブルをまるごとキャプチャするモード
    • レコードが消える場合になどに対応
    • 補足:他のモードで、レコードの消去に対応するには、実際に消去するのではなく、消去フラグを立てる、などの工夫が必要そう

当然だが、必要に応じて元のRDBMS側でインデックスが貼られていないとならない。

timestamp.delay.interval.ms 設定を使い、更新時刻に対し、実際に取り込むタイミングを遅延させられる。 これはトランザクションを伴うときに、一連のレコードが更新されるのを待つための機能。

なお、 JDBC Connector Message Keys によると、レコードの値や特定のカラムから、Kafkaメッセージのキーを生成できる。

更新時間とユニークIDを利用したキャプチャ

No More Silos: How to Integrate Your Databases with Apache Kafka and CDC のあたりを参考に、 別のモードを使って試してみる。

まずはKafka環境を構築するが、ここでは簡単化のためにConfluent Platformを用いることとした。 Confluent PlatformのUbuntuへのインストール手順 あたりを参考にすすめると良い。 また、 confluent コマンドがあると楽なので、 Confluent CLIのインストール手順 を参考にインストールしておく。

インストールしたら、シングルモードでKafka環境を起動しておく。

1
$ confluent local start

KafkaConnectを試す その2 あたりを参考に、Kafkaと同一マシンにPostgreSQLの環境を構築しておく。

1
2
3
4
5
$ sudo apt install -y postgresql
$ sudo vim /etc/postgresql/10/main/postgresql.conf
$ sudo cp /usr/share/postgresql/10/pg_hba.conf{.sample,}
$ sudo vim /usr/share/postgresql/10/pg_hba.conf
$ sudo systemctl restart postgresql

/etc/postgresql/10/main/postgresql.conf に追加する内容は以下の通り。

1
listen_addresses = '*'

/usr/share/postgresql/10/pg_hba.conf に追加する内容は以下の通り。

1
2
3
4
# PostgreSQL Client Authentication Configuration File
# ===================================================
local all all trust
host all all 127.0.0.1/32 trust

Kakfa用のデータベースとテーブルを作る。

1
$ psql -c "alter user postgres with password 'kafkatest'"
1
2
$ sudo -u postgres psql -U postgres -W -c "CREATE DATABASE testdb";
Password for user postgres:

テーブルを作る際、Timestampとインクリメンタルな値を使ったデータキャプチャを実現するためのカラムを含むようにする。 PostgreSQLで更新時のtimestampをアップデートするにはPostgreSQL で連番の数字のフィールドを作る方法 (sequence について)postgres - シーケンス inser時に自動採番 あたりを参考とする。

以下、テーブルを作り、ユニークID用のシーケンスを作り、タイムスタンプを作る流れ。 タイムスタンプはレコード更新時に合わせて更新されるようにトリガを設定する。

1
$ sudo -u postgres psql -U postgres testdb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
CREATE TABLE test_table (
seq SERIAL PRIMARY KEY,
ts timestamp NOT NULL DEFAULT now(),
item varchar(256),
price integer,
category varchar(256)
);
CREATE FUNCTION set_update_time() RETURNS OPAQUE AS '
begin
new.ts := ''now'';
return new;
end;
' LANGUAGE 'plpgsql';
CREATE TRIGGER update_tri BEFORE UPDATE ON test_table FOR EACH ROW
EXECUTE PROCEDURE set_update_time();
CREATE USER connectuser WITH password 'connectuser';
GRANT ALL ON test_table TO connectuser;
INSERT INTO test_table(item, price, category) VALUES ('apple', 400, 'fruit');
INSERT INTO test_table(item, price, category) VALUES ('banana', 160, 'fruit');
UPDATE test_table SET item='orange', price=100 where seq = 2;
INSERT INTO test_table(item, price, category) VALUES ('banana', 200, 'fruit');
INSERT INTO test_table(item, price, category) VALUES ('pork', 400, 'meet');
INSERT INTO test_table(item, price, category) VALUES ('beef', 800, 'meet');

以下のような結果が得られるはずである。

1
2
3
4
5
6
7
8
9
testdb=# SELECT * FROM test_table;
seq | ts | item | price | category
-----+----------------------------+--------+-------+----------
1 | 2020-02-02 13:31:12.065458 | apple | 400 | fruit
2 | 2020-02-02 13:31:49.220178 | orange | 100 | fruit
3 | 2020-02-02 13:32:32.324241 | banana | 200 | fruit
4 | 2020-02-02 13:33:06.560747 | pork | 400 | meet
5 | 2020-02-02 13:33:06.561966 | beef | 800 | meet
(5 rows)

Kafka Connect

JDBC Connector Incremental Query Modes を参考に、タイムスタンプ+インクリメンティングモードを用いる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
cat << EOF > test_db.json
{
"name": "load-test-table",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url" : "jdbc:postgresql://localhost:5432/testdb",
"connection.user" : "connectuser",
"connection.password" : "connectuser",
"mode" : "timestamp+incrementing",
"incrementing.column.name" : "seq",
"timestamp.column.name" : "ts",
"table.whitelist" : "test_table",
"topic.prefix" : "db_",
"tasks.max" : "1"
}
}
EOF
$ curl -X DELETE http://localhost:8083/connectors/load-test-table
$ curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d @test_db.json
$ curl http://localhost:8083/connectors

上記コネクタでは、KafkaにAvro形式で書き込むので、 kafka-avro-console-consumerで確認する。

1
$ kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic db_test_table --from-beginning

上記を起動した後、PostgreSQL側で適当にレコードを挿入・更新すると、 以下のような内容がコンソールコンシューマの出力に表示される。

変化がキャプチャされて取り込まれることがわかる。 挿入だけではなく、更新したものも取り込まれる。メッセージには、シーケンスとタイムスタンプの療法が含まれている。

1
2
3
4
5
6
{"seq":1,"ts":1580650272065,"item":{"string":"apple"},"price":{"int":400},"category":{"string":"fruit"}}
{"seq":2,"ts":1580650296666,"item":{"string":"banana"},"price":{"int":160},"category":{"string":"fruit"}}
{"seq":2,"ts":1580650309220,"item":{"string":"orange"},"price":{"int":100},"category":{"string":"fruit"}}
{"seq":3,"ts":1580650352324,"item":{"string":"banana"},"price":{"int":200},"category":{"string":"fruit"}}
{"seq":4,"ts":1580650386560,"item":{"string":"pork"},"price":{"int":400},"category":{"string":"meet"}}
{"seq":5,"ts":1580650386561,"item":{"string":"beef"},"price":{"int":800},"category":{"string":"meet"}}

Kafka Stramsでテーブルに変換

上記の通り、RDBMSからデータを取り込んだものに対し、 マスタテーブルとして使うため、KTableに変換してみる。

GlobalKTableへの読み込み

Kafka Streams例 あたりを参考にする。 特に、 GlobalKTablesExample.java あたりが参考になるかと思う。 今回は、上記レポジトリをベースに少しいじって、本例向けのサンプルアプリを作る。

[WIP]

共有

ThirdEye LinkedIn’s business-wide monitoring platform

参考

メモ

セッションメモ

Strataの ThirdEye LinkedIn’s business-wide monitoring platform のメモ。

LinkedInで運用されている異常検知と原因分析のためのプラットフォーム。 オープンソースになっているようだ。 -> thirdeyeの実装

主に以下の内容が記載されていた。

  • MTTD: Mean time to detect
  • MTTR: Mean time to repair

50 チーム以上がThirdEyeを利用。 何千もの時系列データが監視されている?

攻撃を検知したり、AIモデル周りの監視。

アーキ図あり。

異常検知における課題:スケーラビリティ、性能

手動でのコンフィグ、監視は現実的でない。

ルールベースの単純な仕組みは不十分。 多すぎるアラートは邪魔。

ブログメモ

Analyzing anomalies with ThirdEye のメモ。

データキューブについて。 LinkedInではPinotを使って事前にキューブ化されている。

ディメンジョン・ヒートマップの利用。 あるディメンジョンにおける問題の原因分析に有用。

変化の検出の仕方について。

ただし単独のディメンジョンの問題を検出するだけでは不十分。 複数のディメンジョンにまたがって分析してわかる問題を検出したい。

ディメンジョンをツリー構成。 ベースラインと現在の値でツリーを構成。

ノードの親子関係を利用。 各ノードとその親ノードの変化を式に組み入れることで、木構造に基づく傾向(?)を考慮しながら、 変化の重大さを算出する。

上記仕組みを利用することで、データマネージャのバグ、機械学習モデルサービングにおけるバグを見つけた。

共有

Automating ML model training and deployments via metadata-driven data, infrastructure, feature engineering, and model management

参考

メモ

以下の内容が記載されている。

ワークフロー。 よくあるワークフローなので特筆なし。

無数のストリームイベント。 数億のレコードアップデート。 数千万の顧客アカウント。

メタデータ管理、特徴量ストア、モデルサービング、パイプラインオートメーション。

共有

Questioning the Lambda Architecture

参考

メモ

Questioning the Lambda Architecture にてJay Krepsが初めて言及したとされているようだ。 過去に読んだが忘れたので改めて、いかにメモを記載する。

まとめ

主張としては、ストリーム処理はKafkaの登場により、十分に使用に耐えうるものになったため、 バッチ処理と両方使うのではなく、ストリーム処理1本で勝負できるのでは?ということだった。

気になった文言を引用

ラムダアーキテクチャについて:

The Lambda Architecture is an approach to building stream processing applications on top of MapReduce and Storm or similar systems.

Lambdaアーキテクチャのイメージ

バッチ処理とストリーム処理で2回ロジックを書く。:

You implement your transformation logic twice, once in the batch system and once in the stream processing system.

レコメンデーションシステムを例にとる:

A good example would be a news recommendation system that needs to crawl various news sources, process and normalize all the input, and then index, rank, and store it for serving.

データを取り込み、イミュターブルなものとして扱うことはありだと思う:

I’ve written some of my thoughts about capturing and transforming immutable data streams

I have found that many people who attempt to build real-time data processing systems don’t put much thought into this problem and end-up with a system that simply cannot evolve quickly because it has no convenient way to handle reprocessing.

リアルタイム処理が本質的に近似であり、バッチ処理よりも弱く、損失しがち、という意見があるよね、と。:

One is that real-time processing is inherently approximate, less powerful, and more lossy than batch processing.

ラムダアーキテクチャの利点にCAP定理との比較が持ち出されることを引き合いに出し、ラムダアーキテクチャがCAP定例を克服するようなものではない旨を説明。:

Long story short, although there are definitely latency/availability trade-offs in stream processing, this is an architecture for asynchronous processing, so the results being computed are not kept immediately consistent with the incoming data. The CAP theorem, sadly, remains intact.

結局、StormとHadoopの両方で同じ結果を生み出すアプリケーションを 実装するのがしんどいという話。:

Programming in distributed frameworks like Storm and Hadoop is complex.

ひとつの解法は抽象化。:

Summingbird

とはいえ、2重運用はしんどい。デバッグなど。:

the operational burden of running and debugging two systems is going to be very high.

結局のところ、両方を同時に使わないでくれ、という結論:

These days, my advice is to use a batch processing framework like MapReduce if you aren’t latency sensitive, and use a stream processing framework if you are, but not to try to do both at the same time unless you absolutely must.

ストリーム処理はヒストリカルデータの高スループットでの処理に向かない、という話もあるが…:

When I’ve discussed this with people, they sometimes tell me that stream processing feels inappropriate for high-throughput processing of historical data.

バッチ処理もストリーム処理も抽象化の仕方は、DAGをベースにしたものであり、 その点では共通である、と。:

But there is no reason this should be true. The fundamental abstraction in stream processing is data flow DAGs, which are exactly the same underlying abstraction in a traditional data warehouse (a la Volcano) as well as being the fundamental abstraction in the MapReduce successor Tez.

ということでKafka。:

Use Kafka

提案アーキテクチャ

Kafkaに入れた後は、HDFS等に簡単に保存できる。:

Kafka has good integration with Hadoop, so mirroring any Kafka topic into HDFS is easy.

このあと少し、Kafkaの説明が続く。

この時点では、Event Sourcing、CQRSについての言及あり。

Indeed, a lot of people are familiar with similar patterns that go by the name Event Sourcing or CQRS.

LinkedInにて、JayはSamzaを利用。

I know this approach works well using Samza as the stream processing system because we do it at LinkedIn.

提案手法の難点として、一時的に2倍の出力ストレージサイズが必要になる。

However, my proposal requires temporarily having 2x the storage space in the output database and requires a database that supports high-volume writes for the re-load.

単純さを大事にする。:

So, in cases where simplicity is important, consider this approach as an alternative to the Lambda Architecture.

所感

当時よりも、最近のワークロードは複雑なものも含めて期待されるようになっており、 ますます「バッチ処理とストリーム処理で同じ処理を実装する」というのがしんどくなっている印象。

共有

ML Ops: Machine Learning as an Engineering Discipline

参考

メモ

読んでみた感想をまとめる。

感想

結論としては、まとめ表がよくまとまっているのでそれでよい気がする。 この表をベースに、アクティビティから必要なものを追加するか? -> まとめ表

演繹的ではなく、帰納的な手法であるため、もとになったコードとデータの両方が重要。

データサイエンティスト、MLエンジニア、DevOpsエンジニア、データエンジニア。 MLエンジニア。あえてエンジニアと称しているのは、ソフトウェア開発のスキルを有していることを期待するから。

フェアネスの考慮、というか機械学習の倫理考慮をどうやって機械的に実現するのか、というのはかねてより気になっていた。 フェアネスの考慮などを達成するためには、テストデータセットの作り方、メトリクスの作り方に工夫するとよい。つまり、男女でそれぞれ個別にもテストするなど。 まだ一面ではあるが、参考になった。

気になる文言の抜粋

以下の文言が印象に残った。 ほかのレポートでも言われていることに見える。

Deeplearning.ai reports that “only 22 percent of companies using machine learning have successfully deployed a model”.

このレポートがどれか気になった。 Deeplearning.aiのレポート か。

確かに以下のように記載されている。

Although AI budgets are on the rise, only 22 percent of companies using machine learning have successfully deployed a model, the study found.

データが大切論:

ML is not just code, it’s code plus data

training data, which will affect the behavior of the model in production

ML = Code + Dataの図

It never stops changing, and you can’t control how it will change.

確かに、データはコントロールできない。

Data Engineering

ML Opsの概念図

チーム構成について言及あり。:

But the most likely scenario right now is that a successful team would include a Data Scientist or ML Engineer, a DevOps Engineer and a Data Engineer.

データサイエンティストのほかに、明確にMLエンジニア、DevOpsエンジニア、データエンジニアを入れている。 基盤エンジニアはデータエンジニアに含まれるのだろうか。

Even if an organization includes all necessary skills, it won’t be successful if they don’t work closely together.

ノートブックに殴り書かれたコードは不十分の話:

getting a model to work great in a messy notebook is not enough.

ML Engineers

データパイプラインの話:

data pipeline

殴り書きのコードではなく、適切なパイプラインはメリットいくつかあるよね、と。:

Switching to proper data pipelines provides many advantages in code reuse, run time visibility, management and scalability.

トレーニングとサービングの両方でパイプラインがあるけど、 入力データや変換内容は、場合によっては微妙に異なる可能性がある、と。:

Most models will need 2 versions of the pipeline: one for training and one for serving.

ML Pipelineは特定のデータから独立しているためCICDと連携可能

For example, the training pipeline usually runs over batch files that contain all features, while the serving pipeline often runs online and receives only part of the features in the requests, retrieving the rest from a database.

いくつかTensorFlow関係のツールが紹介されている。確認したほうがよさそう。:

TensorFlow Pipeline

TensorFlow Transform

バージョン管理について:

In ML, we also need to track model versions, along with the data used to train it, and some meta-information like training hyperparameters.

Models and metadata can be tracked in a standard version control system like Git, but data is often too large and mutable for that to be efficient and practical.

コードのラインサイクルと、モデルのライフサイクルは異なる:

It’s also important to avoid tying the model lifecycle to the code lifecycle, since model training often happens on a different schedule.

It’s also necessary to version data and tie each trained model to the exact versions of code, data and hyperparameters that were used.

Having comprehensive automated tests can give great confidence to a team, accelerating the pace of production deployments dramatically.

モデルの検証は本質的に、統計に基づくものになる。というのも、そもそもモデルの出力が確率的だし、入力データも変動する。:

model validation tests need to be necessarily statistical in nature

Just as good unit tests must test several cases, model validation needs to be done individually for relevant segments of the data, known as slices.

Data validation is analogous to unit testing in the code domain.

ML pipelines should also validate higher level statistical properties of the input.

TensorFlow Data Validation

Therefore, in addition to monitoring standard metrics like latency, traffic, errors and saturation, we also need to monitor model prediction performance.

An obvious challenge with monitoring model performance is that we usually don’t have a verified label to compare our model’s predictions to, since the model works on new data.

まとめ表
共有

jvm profiler for Spark at Uber

参考

メモ

概要

Sparkのエグゼキュータ等のJVMプロファイリングを行うためのライブラリ。

JVM起動時に、agentとしてアタッチするようにする。

executorプロセスのUUIDを付与しながらメトリクスを取得できるようだ。 Kafkaに流すことも可能。

公式GitHub のREADMEによると、オフヒープの使用量なども計測できており、チューニングに役立ちそう。

20200107時点では、2か月前ほどに更新されており、まだ生きているプロジェクトのようだ。

動作確認

公式GitHub のREADMEに記載されていた手順で、パッケージをビルドし、ローカルモードのSparkで利用してみた。

ビルド方法:

1
$ mvn clean package

パッケージビルド結果は、 target/jvm-profiler-1.0.0.jar に保存される。

これをjavaagentとして用いる。 渡すオプションは、 --conf spark.executor.driverJavaOptions=-javaagent:${JVMPROFILER_HOME}/target/jvm-profiler-1.0.0.jar である。 環境変数 ${JVMPROFILER_HOME} は先ほどビルドしたレポジトリのPATHとする。

また、今回は com.uber.profiling.reporters.FileOutputReporter を用いて、ファイル出力を試みることとする。

結果的に、Sparkの起動コマンドは、以下のような感じになる。:

1
$ ${SPARK_HOME}/bin/spark-shell --conf spark.driver.extraJavaOptions=-javaagent:/home/ubuntu/Sources/jvm-profiler/target/jvm-profiler-1.0.0.jar=reporter=com.uber.profiling.reporters.FileOutputReporter,outputDir=/tmp/jvm-profile

ここで

  • 環境変数 ${SPARK_HOME} はSparkを配備したPATHである
  • ディレクトリ /tmp/jvm-profile は予め作成しておく

とする。

生成されるレコードは、以下のようなJSONである。CpuAndMemory.jsonの例は以下の通り。:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
{
"heapMemoryMax": 954728448,
"role": "driver",
"nonHeapMemoryTotalUsed": 156167536,
"bufferPools": [
{
"totalCapacity": 20572,
"name": "direct",
"count": 10,
"memoryUsed": 20575
},
{
"totalCapacity": 0,
"name": "mapped",
"count": 0,
"memoryUsed": 0
}
],
"heapMemoryTotalUsed": 400493400,
"vmRSS": 812081152,
"epochMillis": 1578408135107,
"nonHeapMemoryCommitted": 157548544,
"heapMemoryCommitted": 744488960,
"memoryPools": [
{
"peakUsageMax": 251658240,
"usageMax": 251658240,
"peakUsageUsed": 37649152,
"name": "Code Cache",
"peakUsageCommitted": 38010880,
"usageUsed": 37649152,
"type": "Non-heap memory",
"usageCommitted": 38010880
},
{
"peakUsageMax": -1,
"usageMax": -1,
"peakUsageUsed": 104054944,
"name": "Metaspace",
"peakUsageCommitted": 104857600,
"usageUsed": 104054944,
"type": "Non-heap memory",
"usageCommitted": 104857600
},
{
"peakUsageMax": 1073741824,
"usageMax": 1073741824,
"peakUsageUsed": 14463440,
"name": "Compressed Class Space",
"peakUsageCommitted": 14680064,
"usageUsed": 14463440,
"type": "Non-heap memory",
"usageCommitted": 14680064
},
{
"peakUsageMax": 336592896,
"usageMax": 243269632,
"peakUsageUsed": 247788352,
"name": "PS Eden Space",
"peakUsageCommitted": 250085376,
"usageUsed": 218352416,
"type": "Heap memory",
"usageCommitted": 239075328
},
{
"peakUsageMax": 58195968,
"usageMax": 55050240,
"peakUsageUsed": 43791112,
"name": "PS Survivor Space",
"peakUsageCommitted": 58195968,
"usageUsed": 43791112,
"type": "Heap memory",
"usageCommitted": 55050240
},
{
"peakUsageMax": 716177408,
"usageMax": 716177408,
"peakUsageUsed": 138349872,
"name": "PS Old Gen",
"peakUsageCommitted": 450363392,
"usageUsed": 138349872,
"type": "Heap memory",
"usageCommitted": 450363392
}
],
"processCpuLoad": 0.02584087025382403,
"systemCpuLoad": 0.026174300837744344,
"processCpuTime": 49500000000,
"vmHWM": 812081152,
"appId": "local-1578407721611",
"vmPeak": 4925947904,
"name": "24974@ubuec2",
"host": "ubuec2",
"processUuid": "38d5c63f-d70d-4e4d-9d54-a2381b9c37a7",
"nonHeapMemoryMax": -1,
"vmSize": 4925947904,
"gc": [
{
"collectionTime": 277,
"name": "PS Scavenge",
"collectionCount": 16
},
{
"collectionTime": 797,
"name": "PS MarkSweep",
"collectionCount": 4
}
]
}
  • nonヒープのメモリ使用量についても情報あり
  • ヒープについては、RSSに関する情報もある
  • ヒープ内の領域に関する情報もあり、GCに関する情報もある
共有

The evolution of metadata: LinkedIn’s story

参考

メモ

LinkedInが提唱する Generalized Metadata Architecture (GMA) を基盤としたメタデータ管理システム。

コンセプトは、 スライド に記載されているが、ざっとアーキテクチャのイメージをつかむには、 アーキテクチャ がよい。

GMA

メタデータは自動収集。

これは標準化されたメタデータモデルとアクセスレイヤによるものである。

また標準モデルが、モデルファーストのアプローチを促進する。

Metadata Serving

Metadata Serving に記載あり。

RESTサービスは、LinkedInが開発していると思われるREST.liが用いられており、 DAOもその中の「Pegasus」という仕組みを利用している。

Key-Value DAO、Search DAO、Query DAOが定義されている。

上記GMAの通り、この辺りのDAOによるアクセスレイヤの標準化が見て取れる。

Metadata Ingestion Architecture

そもそも、メタデータに対する変更は MAE(Metadata Audit Event) としてキャプチャされる。

それがKafka Streamsのジョブで刈り取られ処理される。なお、シーケンシャルに処理されるための工夫もあるようだ。

共有

Pinot: Enabling Real-time Analytics Applications

参考

メモ

オンライン分析のアーキテクチャ

  • Join on the fly
    • ベーステーブルからクエリ実行のタイミングでデータ生成
  • Pre Join + Pre Aggregate
    • 分析で必要となるテーブルをストリーム処理等で事前作成
    • 事前処理はマスタデータのテーブルとの結合、など
  • Pre Join + Pre Aggregate + Pre Cube
    • さらに分析で求められる結果データを予め作成、インデックス化
    • 例えばAggregationしておく、など

レイテンシと柔軟性のトレードオフの関係:

Latency vs. Flexibility
「Who View」のアーキテクチャ

ユースケース

  • LinkedInのメンバーが見る分析レポート
    • QPSが高い(数千/秒 級)
    • レイテンシは数十ms~sub秒
  • インタラクティブダッシュボード
    • 様々なアグリゲーション
  • 異常検知

LinkedIn以外の企業では、

  • Uber
  • slack
  • MS Teams
  • Weibo
  • factual

あたりが利用しているようだ。 Uberは、自身の技術ブログでも触れていた。

ワークフロー

ワークフロー概要

Pinotは、原則としてPre Aggregation、Pre Cubeを前提とする仕組みなので、 スキーマの定義が非常に重要。

分散処理とIngestion

またバッチとストリーム両方に対応しているので、 それぞれデータ入力(Ingestion)を定義する。

データはセグメントに分けられ、サーバに分配される。

Segment Assignment

分散処理とクエリルーティング

セグメントに基づき、Brokerによりルーティングされる。

リアルタイムサーバとオフラインサーバ

ストリームから取り込んだデータとバッチで取り込んだデータは、 共通のスキーマを用いることになる。

したがって、統一的にクエリできる?

アーキテクチャの特徴

インデックス(Cube)の特徴

Scan、Inverted Index、Sorted Index、Star-Tree Indexを併用可能。

データ処理上の工夫

比較対象として、Druidがよく挙げられているが、Sorted Index、Star-Tree Indexがポイント。

カラムナデータフォーマットを用いるのは前提。 それに加え、Dictionary Encodeing、Bit Compressionを使ってデータ構造に基づいた圧縮を採用。

Inverted Index

転置インデックスを作っておく、という定番手法。

Sorted Index

予め、Dimensionに基づいてソートしておくことで、 フィルタする際にスキャン量を減らしながら、かつデータアクセスを効率化。

Sroted Indexの特徴

Star-Tree Index

すべてをCube化するとデータ保持のスペースが大量に必要になる。 そこで部分的にCube化する。 ★要確認

Star-Tree Indexの特徴

性能特性

Druidとの簡単な比較

Druidと簡易的に比較した結果が載っている。

まずは、レイテンシの小ささが求められるインタラクティブな分析における性能特徴:

Druidとの簡易比較

ミリ秒単位での分析を取り扱うことに関してDruidと共通だが、 各種インデックスのおかげか、Druidよりもパーセンタイルベースの比較で レイテンシが小さいとされている。

つづいて、あらかじめ定義されたクエリを大量にさばくユースケース:

Druidとの簡易比較2

レイテンシを小さく保ったまま、高いQPSを実現していることを 示すグラフが載っている。 この辺りは、工夫として載っていた各種インデックスを予め定義していることが強く効きそうだ。

続いて異常検知ユースケースの例:

Druidとの簡易比較3

データがSkewしていることが強調されているが、その意図はもう少し読み解く必要がありそう。 ★要確認

Star-Tree Indexについて

Star-Tree Index Powering Fast Aggregations on Pinot に記載あり。

共有

Uber's 2019 highlights

参考

メモ

Ludwig

モデル開発と比較を単純にするために作られたソフトウェア。

詳しくは、 Introducing Ludwig, a Code-Free Deep Learning Toolbox を参照。

AresDB

GPUを活用した処理エンジン。

QUIC導入

TCPとHTTP/2の置き換え。

Kotlinの性能調査

Javaとの比較など。かなり細かく調査したようだ(まだ読んでいない)

グラフ処理と機械学習を用いたUber Eatsの改善

グラフ構造に基づいた機械学習により、 レコメンデーションの効果を向上させる取り組みについて。

Uber’s Data Platform in 2019: Transforming Information to Intelligence

Uber’s Data Platform in 2019 Transforming Information to Intelligence

UberでもData Platformという言い方をするようだ。

データの用途は、スクーターの位置情報や店舗の最新メニューをトラックするだけではない。 トレンドを把握することなどにも用いられる。

データの鮮度の品質は大事。

リアルタイムの処理のニーズ・仕組み、ヒストリカルなデータの処理のニーズ・仕組み。両方ある。

補足)それぞれがあることを前提とした最適化を施すことが前提となっているようだ。

データがどこから来たのかをトラックする。 Uberの内部プロダクト uLineageにて実現。

Apache HBaseをグローバルインデックスとして利用。 これにより、高いスケーラビリティ、強い一貫性、水平方向へのスケーラビリティを獲得する。

DBEventsというプロダクトで、CDCを実現する。

トラブルシュートとプロアクティブな防止。

もしリアルタイムにデータ分析できれば、例えば利用者にドライバ替わり当たらず一定時間を過ぎてしまったケースを発見し、 オペレーションチームが助けられるかもしれない。

AresDB、Apache Pinotなど。★要確認

共有

Configure Python of PySpark in Zeppelin

参考

メモ

ウェブフロントエンドのSpark Interpreterの設定において、 以下の2項目を設定した。

  • spark.pyspark.python
  • zeppelin.pyspark.python

上記2項目が同じ値で設定されていないと、実行時エラーを生じた。

参考)Pythonバージョンの確かめ方

ドライバのPythonバージョンの確かめ方

1
2
3
4
5
%spark.pyspark

import sys

print('Zeppelin python: {}'.format(sys.version))

ExecutorのPythonバージョンの確かめ方

1
2
3
4
5
6
7
%spark.pyspark

def print_version(x):
import sys
return sys.version

spark.sparkContext.parallelize(range(1, 3)).map(print_version).collect()
共有