Flow Engine for ML

参考

総合

Azkaban

メモ

機械学習で利用されるフロー管理ツールを軽くさらってみる。

よく名前の挙がるもの

  • Apache Airflow
  • DigDag
  • Oozie

機械的な検索

Airflowの代替

alternativetoでAirflowを検索した結果 では以下の通り。

  • RunDeck
    • OSSだが商用版もある。自動化ツール。ワークフローも管理できるようだ
  • StackStorm
    • どちらかというとIFTTTみたいなものか?
  • Zenaton
    • ワークフローエンジン。JavaScriptで記述できるようだ
  • Apache Oozie
    • Hadoopエコシステムのワークフローエンジン
  • Azkaban
  • Metaflow ★
    • ワークフローエンジン
    • 機械学習にフォーカス
    • Netflix と AWS が主に開発
  • luigi
    • ワークフローエンジン
    • Pythonモジュール
    • Spotify が主に開発
共有

Kafka Streamsの始め方

参考

メモ

まとまった情報が無いような気がするので、初心者向けのメモをここに書いておくことにする。

はじめに読む文献

レファレンスとして使う文献

環境準備

Apache Kafka、もしくはConfluent Platformで環境構築しておくことを前提とする。 Apache Kafkaであれば、 公式ドキュメント のインストール手順。 Confluent Platformであれば、 Confluent Platformドキュメントのインストール手順。

また、Confluent Platformを用いるときは、 Confluent CLI をインストールしておくと便利である。

1
$ confluent local start

だけでKafka関連のサービスを開発用にローカル環境に起動できる。 具体的には、以下のサービスを立ち上げられる。

1
2
3
4
5
6
7
control-center is [UP]
ksql-server is [UP]
connect is [UP]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]

ちなみに、 org.apache.kafka.connect.cli.ConnectDistributed が意外とメモリを使用するので注意。

また、デフォルトでは /tmp 以下にワーキングディレクトリを作成する。 また実行時には /tmp/confluent.current を作成し、その時に使用しているワーキングディレクトリを識別できるようになっている。 tmpwatch等により、ワーキングディレクトリを乱してしまい、 confluent local start によりKafkaクラスタを起動できなくなったときは、 /tmp/confluent.current を削除してもう一度起動すると良い。

以降の説明では、Confluent Platformをインストールしたものとして説明する。

プロジェクト作成

公式チュートリアル が最初は参考になるはず。

MavenのArchetypeを使い、プロジェクトを生成する。

1
2
3
4
5
6
7
8
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.kafka \
-DarchetypeArtifactId=streams-quickstart-java \
-DarchetypeVersion=2.4.0 \
-DgroupId=net.dobachi.kafka.streams.examples \
-DartifactId=firstapp \
-Dversion=0.1 \
-Dpackage=wordcount

適宜パッケージ名などを変更して用いること。

雛形に基づいたプロジェクトには、簡単なアプリが含まれている。 最初はこれらを修正しながら、アプリの書き方に慣れるとよい。

wordcount/Pipe.java

Kafka Streamsのアプリは通常のJavaアプリと同様に、1プロセスからスタンドアローンで起動する。 ここでは、Pipe.javaの内容を確認しよう。 以下、ポイントとなるソースコードとその説明を並べる。

wordcount/Pipe.java:36

1
2
3
4
5
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

メインの中では最初にストリームを作るための設定が定義される。 上記の例では、ストリーム処理アプリの名前、Kafkaクラスタのブートストラップサーバ(つまり、Broker)、 またキーやバリューのデフォルトのシリアライゼーションの仕組みを指定します。 今回はキー・バリューともにStringであることがわかります。

wordcount/Pipe.java:42

1
2
3
final StreamsBuilder builder = new StreamsBuilder();

builder.stream("streams-plaintext-input").to("streams-pipe-output");

つづいて、ストリームのビルダをインスタンス化。 このとき、入力・出力トピックを指定する。

wordcount/Pipe.java:46

1
2
3
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);

ビルダでストリームをビルドし、トポロジを定義する。

wordcount/Pipe.java:46

1
2
3
4
5
6
7
8
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});

シャットダウンフックを定義。

wordcount/Pipe.java:59

1
2
3
4
5
6
7
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);

ストリーム処理を開始。

上記アプリを実行するには、事前に

  • streams-plaintext-input
  • streams-pipe-output

の2種類のトピックを生成しておく。

1
2
$ kafka-topics --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic streams-plaintext-input
$ kafka-topics --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic streams-pipe-output

トピックが作られたかどうかは、以下のように確認する。

1
$ kafka-topics --list --zookeeper localhost:2181

なお、ユーザが明示的に作るトピックの他にも、Kafkaの動作等のために作られるトピックがあるので、 上記コマンドを実行するとずらーっと出力されるはず。

コンパイル、パッケージングする。

1
$ mvn clean assembly:assembly -DdescriptorId=jar-with-dependencies

入力ファイルを作成し、入ロトピックに書き込み。

1
2
$ echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > /tmp/file-input.txt
$ cat /tmp/file-input.txt | kafka-console-producer --broker-list localhost:9092 --topic streams-plaintext-input

アプリを実行する。

1
$ java -cp target/firstapp-0.1-jar-with-dependencies.jar wordcount.Pipe

別のターミナルを改めて開き、コンソール上に出力トピックの内容を出力する。

1
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning  --property print.key=true --topic streams-pipe-output

以下のような結果が見られるはずである。なお、今回はキーを使用しないアプリだから、左側(キーを表示する場所)には null が並ぶ。

1
2
3
null    all streams lead to kafka
null hello kafka streams
null join kafka summit

さて、ここでキーを使うようにしてみる。 今回使用したアプリをコピーし、 wordcount/PipeWithKey.java を作る。

ここで変更点は以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
--- src/main/java/wordcount/Pipe.java   2020-02-14 15:23:23.808282200 +0900
+++ src/main/java/wordcount/PipeWithKey.java 2020-02-14 16:54:17.623090500 +0900
@@ -17,10 +17,8 @@
package wordcount;

import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@@ -30,7 +28,7 @@
* that reads from a source topic "streams-plaintext-input", where the values of messages represent lines of text,
* and writes the messages as-is into a sink topic "streams-pipe-output".
*/
-public class Pipe {
+public class PipeWithKey {

public static void main(String[] args) throws Exception {
Properties props = new Properties();
@@ -41,7 +39,8 @@

final StreamsBuilder builder = new StreamsBuilder();

- builder.stream("streams-plaintext-input").to("streams-pipe-output");
+ KStream<String, String> raw = builder.stream("streams-plaintext-input");
+ raw.map((key, value ) -> new KeyValue<>(value.split(" ")[0], value)).to("streams-pipe-output");

final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);

主な変更は、ストリームビルダから定義されたストリームをいったん、 raw にバインドし、 mapメソッドを使って変換している箇所である。 ここでは、バリューをスペースで区切り、先頭の単語をキーとすることにした。

このアプリをコンパイル、パッケージ化し実行すると、以下のような結果が得られる。

1
2
3
$ mvn clean assembly:assembly -DdescriptorId=jar-with-dependencies
$ cat /tmp/file-input.txt | kafka-console-producer --broker-list localhost:9092 --topic streams-plaintext-input
$ java -cp target/firstapp-0.1-jar-with-dependencies.jar wordcount.PipeWithKey

実行結果の例

1
2
3
all     all streams lead to kafka
hello hello kafka streams
join join kafka summit

wordcount/LineSplit.java

先程作成したPipeWithKeyとほぼ同じ。 実行すると、 streams-linesplit-output というトピックに結果が出力される。

1
$ java -cp target/firstapp-0.1-jar-with-dependencies.jar wordcount.LineSplit

結果の例

1
2
3
4
5
6
7
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning  --property print.key=true --topic streams-linesplit-output
null all
null streams
null lead
null to
null kafka
(snip)

wordcount/WordCount.java

最後にWordCountを確認する。 ほぼ他のアプリと同じだが、ポイントはストリームを加工する定義の部分である。

wordcount/WordCount.java:53

1
2
3
4
5
6
builder.<String, String>stream("streams-plaintext-input")
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> value)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
.toStream()
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

以下、上記実装を説明する。

1
builder.<String, String>stream("streams-plaintext-input")

ストリームビルダを利用し、入力トピックからストリームを定義

1
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))

バリューに入っている文字列をスペース等で分割し、配列にする。 合わせて配列をflattenする。

1
.groupBy((key, value) -> value)

キーバリューから新しいキーを生成し、新しいキーに基づいてグループ化する。 今回の例では、分割されて生成された単語(バリューに入っている)をキーとしてグループ化する。 詳しくは、 公式API説明(groupBy)

1
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))

groupByにより生成された KGroupedStreamcount メソッドを呼び出し、 キーごとの合計値を求める。 今回はキーはString型であり、合計値はLong型。 また集計結果を保持するストアは counts-store という名前とする。 詳しくは、 公式API説明(count)

1
2
.toStream()
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

count の結果は KTable になるので、これをストリームに変換し、出力先トピックを指定する。

実行してみる。

1
2
$ mvn clean assembly:assembly -DdescriptorId=jar-with-dependencies
$ java -cp target/firstapp-0.1-jar-with-dependencies.jar wordcount.WordCount

別のターミナルを改めて立ち上げ、入力トピックに書き込む。

1
$ cat /tmp/file-input.txt | kafka-console-producer --broker-list localhost:9092 --topic streams-plaintext-input

出力は以下のようになる。

1
2
3
4
5
6
7
8
9
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning  --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --topic streams-wordcount-output
all 19
lead 19
to 19
hello 19
streams 38
join 19
kafka 57
summit 19

なお、ここでは kafka-console-consumer にプロパティ value.deserializer=org.apache.kafka.common.serialization.LongDeserializer を渡した。 アプリケーションでは集計した値はLong型だったためである。 詳しくは、 公式ドキュメント(Step 5: Process some data) 参照。

なお、指定しない場合は入力されたバイト列がそのまま標準出力に渡されるようになっている。 その結果、期待する出力が得られないことになるので注意。

kafka/tools/ConsoleConsumer.scala:512

1
2
3
4
5
6
def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String): Unit = {
val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8))
val convertedBytes = deserializer.map(_.deserialize(topic, nonNullBytes).toString.
getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes)
output.write(convertedBytes)
}

なお、別の方法として WordCount の実装を修正する方法がある。以下、参考までに修正方法を紹介する。

想定と異なる表示だが、これは今回バリューの方にLongを用いたため。 kafka-console-consumer で表示させるために以下のように実装を修正する。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@@ -50,12 +44,15 @@ public class WordCount {

final StreamsBuilder builder = new StreamsBuilder();

- builder.<String, String>stream("streams-plaintext-input")
+ KStream<String, Long> wordCount = builder.<String, String>stream("streams-plaintext-input")
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> value)
- .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
- .toStream()
- .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
+ .count(Materialized.as("counts-store"))
+ .toStream();
+
+ wordCount.foreach((key, value) -> System.out.println("key: " + key + ", value: " + value));
+
+ wordCount.map((key, value) -> new KeyValue<>(key, String.valueOf(value))).to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.String()));

つまり、もともと to で終えていたところを、いったん変数にバインドし、 foreach を使ってストリームの内容を標準出力に表示させるようにしている。 また、 map メソッドを利用し、バリューの型をLongからStringに変換してから to で書き出すようにしている。

上記修正を加えた上で、改めてパッケージ化して実行したところ、以下のような表示が得られる。

kafka-console-consumer での表示例

1
2
3
4
5
6
7
8
all     9
lead 9
to 9
hello 9
streams 18
join 9
kafka 27
summit 9

ストリーム処理アプリの標準出力例

1
2
3
4
5
6
7
8
key: all, value: 9
key: lead, value: 9
key: to, value: 9
key: hello, value: 9
key: streams, value: 18
key: join, value: 9
key: kafka, value: 27
key: summit, value: 9

無事に表示できたことが確かめられただろうか。

共有

CDC Kafka and master table cache

過去の古いメモを少し細くしてアップロード。

参考

メモ

やりたいこと

マスタデータを格納しているRDBMSからKafkaにデータを取り込み、 KTableとしてキャッシュしたうえで、ストリームデータとJoinする。

RDBMSからのデータ取り込み

KafkaConnectを試す その2No More Silos: How to Integrate Your Databases with Apache Kafka and CDC を 参考に、Kafka Connectのjdbcコネクタを使用してみようと思った。

しかしこの例で載っているのは、差分取り込みのためにシーケンシャルなIDを持つカラムが必要であることが要件を満たさないかも、と思った。 やりたいのは、Updateも含むChange Data Capture。

test_db.json 内の該当箇所は以下の通り。

1
2
3
4
(snip)
"mode" : "incrementing",
"incrementing.column.name" : "seq",
(snip)

ということで、 JDBC Connector (Source and Sink) for Confluent Platform を確認してみた。 結論としては、更新時間とユニークIDを使うモードを利用すると良さそうだ。

JDBC Kafka Connector

JDBC Connector Prerequisites によると、 Kafka と Schema Registryがあればよさそうだ。

JDBC Connector Incremental Query Modes によると、モードは以下の通り。

  • Incrementing Column
    • ユニークで必ず値が大きくなるカラムを持つことを前提としたモード
    • 更新はキャプチャできない。そのためDWHのfactテーブルなどをキャプチャすることを想定している。
  • Timestamp Column
    • 更新時間のカラムを持つことを前提としたモード
    • 更新時間はユニークではないことを起因とした注意点がある。 同一時刻に2個のレコードが更新された場合、もし1個目のレコードが処理されたあとに障害が生じたとしたら、 復旧時にもう1個のレコードが処理されないことが起こりうる。
    • 疑問点:処理後に「処理済みTimestamp」を更新できないのだろうか
  • Timestamp and Incrementing Columns
    • 更新時刻カラム、ユニークIDカラムの両方を前提としたモード
    • 先のTimestamp Columnモードで問題となった、同一時刻に複数レコードが生成された場合における、 部分的なキャプチャ失敗を防ぐことができる。
    • 確認点:先に更新時刻を見て、ユニークIDで確認するのだろうか。だとしたら、更新もキャプチャできそう。
  • Custom Query
    • クエリを用いてフィルタされた結果をキャプチャするモード
    • Incrementing Column、Timestamp Columnなどと比べ、オフセットをトラックしないので、 クエリ自身がオフセットをトラックするのと同等の処理内容になっていないと行けない。
  • Bulk
    • テーブルをまるごとキャプチャするモード
    • レコードが消える場合になどに対応
    • 補足:他のモードで、レコードの消去に対応するには、実際に消去するのではなく、消去フラグを立てる、などの工夫が必要そう

当然だが、必要に応じて元のRDBMS側でインデックスが貼られていないとならない。

timestamp.delay.interval.ms 設定を使い、更新時刻に対し、実際に取り込むタイミングを遅延させられる。 これはトランザクションを伴うときに、一連のレコードが更新されるのを待つための機能。

なお、 JDBC Connector Message Keys によると、レコードの値や特定のカラムから、Kafkaメッセージのキーを生成できる。

更新時間とユニークIDを利用したキャプチャ

No More Silos: How to Integrate Your Databases with Apache Kafka and CDC のあたりを参考に、 別のモードを使って試してみる。

まずはKafka環境を構築するが、ここでは簡単化のためにConfluent Platformを用いることとした。 Confluent PlatformのUbuntuへのインストール手順 あたりを参考にすすめると良い。 また、 confluent コマンドがあると楽なので、 Confluent CLIのインストール手順 を参考にインストールしておく。

インストールしたら、シングルモードでKafka環境を起動しておく。

1
$ confluent local start

KafkaConnectを試す その2 あたりを参考に、Kafkaと同一マシンにPostgreSQLの環境を構築しておく。

1
2
3
4
5
$ sudo apt install -y postgresql
$ sudo vim /etc/postgresql/10/main/postgresql.conf
$ sudo cp /usr/share/postgresql/10/pg_hba.conf{.sample,}
$ sudo vim /usr/share/postgresql/10/pg_hba.conf
$ sudo systemctl restart postgresql

/etc/postgresql/10/main/postgresql.conf に追加する内容は以下の通り。

1
listen_addresses = '*'

/usr/share/postgresql/10/pg_hba.conf に追加する内容は以下の通り。

1
2
3
4
# PostgreSQL Client Authentication Configuration File
# ===================================================
local all all trust
host all all 127.0.0.1/32 trust

Kakfa用のデータベースとテーブルを作る。

1
$ psql -c "alter user postgres with password 'kafkatest'"
1
2
$ sudo -u postgres psql -U postgres -W -c "CREATE DATABASE testdb";
Password for user postgres:

テーブルを作る際、Timestampとインクリメンタルな値を使ったデータキャプチャを実現するためのカラムを含むようにする。 PostgreSQLで更新時のtimestampをアップデートするにはPostgreSQL で連番の数字のフィールドを作る方法 (sequence について)postgres - シーケンス inser時に自動採番 あたりを参考とする。

以下、テーブルを作り、ユニークID用のシーケンスを作り、タイムスタンプを作る流れ。 タイムスタンプはレコード更新時に合わせて更新されるようにトリガを設定する。

1
$ sudo -u postgres psql -U postgres testdb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
CREATE TABLE test_table (
seq SERIAL PRIMARY KEY,
ts timestamp NOT NULL DEFAULT now(),
item varchar(256),
price integer,
category varchar(256)
);
CREATE FUNCTION set_update_time() RETURNS OPAQUE AS '
begin
new.ts := ''now'';
return new;
end;
' LANGUAGE 'plpgsql';
CREATE TRIGGER update_tri BEFORE UPDATE ON test_table FOR EACH ROW
EXECUTE PROCEDURE set_update_time();
CREATE USER connectuser WITH password 'connectuser';
GRANT ALL ON test_table TO connectuser;
INSERT INTO test_table(item, price, category) VALUES ('apple', 400, 'fruit');
INSERT INTO test_table(item, price, category) VALUES ('banana', 160, 'fruit');
UPDATE test_table SET item='orange', price=100 where seq = 2;
INSERT INTO test_table(item, price, category) VALUES ('banana', 200, 'fruit');
INSERT INTO test_table(item, price, category) VALUES ('pork', 400, 'meet');
INSERT INTO test_table(item, price, category) VALUES ('beef', 800, 'meet');

以下のような結果が得られるはずである。

1
2
3
4
5
6
7
8
9
testdb=# SELECT * FROM test_table;
seq | ts | item | price | category
-----+----------------------------+--------+-------+----------
1 | 2020-02-02 13:31:12.065458 | apple | 400 | fruit
2 | 2020-02-02 13:31:49.220178 | orange | 100 | fruit
3 | 2020-02-02 13:32:32.324241 | banana | 200 | fruit
4 | 2020-02-02 13:33:06.560747 | pork | 400 | meet
5 | 2020-02-02 13:33:06.561966 | beef | 800 | meet
(5 rows)

Kafka Connect

JDBC Connector Incremental Query Modes を参考に、タイムスタンプ+インクリメンティングモードを用いる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
cat << EOF > test_db.json
{
"name": "load-test-table",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url" : "jdbc:postgresql://localhost:5432/testdb",
"connection.user" : "connectuser",
"connection.password" : "connectuser",
"mode" : "timestamp+incrementing",
"incrementing.column.name" : "seq",
"timestamp.column.name" : "ts",
"table.whitelist" : "test_table",
"topic.prefix" : "db_",
"tasks.max" : "1"
}
}
EOF
$ curl -X DELETE http://localhost:8083/connectors/load-test-table
$ curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d @test_db.json
$ curl http://localhost:8083/connectors

上記コネクタでは、KafkaにAvro形式で書き込むので、 kafka-avro-console-consumerで確認する。

1
$ kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic db_test_table --from-beginning

上記を起動した後、PostgreSQL側で適当にレコードを挿入・更新すると、 以下のような内容がコンソールコンシューマの出力に表示される。

変化がキャプチャされて取り込まれることがわかる。 挿入だけではなく、更新したものも取り込まれる。メッセージには、シーケンスとタイムスタンプの療法が含まれている。

1
2
3
4
5
6
{"seq":1,"ts":1580650272065,"item":{"string":"apple"},"price":{"int":400},"category":{"string":"fruit"}}
{"seq":2,"ts":1580650296666,"item":{"string":"banana"},"price":{"int":160},"category":{"string":"fruit"}}
{"seq":2,"ts":1580650309220,"item":{"string":"orange"},"price":{"int":100},"category":{"string":"fruit"}}
{"seq":3,"ts":1580650352324,"item":{"string":"banana"},"price":{"int":200},"category":{"string":"fruit"}}
{"seq":4,"ts":1580650386560,"item":{"string":"pork"},"price":{"int":400},"category":{"string":"meet"}}
{"seq":5,"ts":1580650386561,"item":{"string":"beef"},"price":{"int":800},"category":{"string":"meet"}}

Kafka Stramsでテーブルに変換

上記の通り、RDBMSからデータを取り込んだものに対し、 マスタテーブルとして使うため、KTableに変換してみる。

GlobalKTableへの読み込み

Kafka Streams例 あたりを参考にする。 特に、 GlobalKTablesExample.java あたりが参考になるかと思う。 今回は、上記レポジトリをベースに少しいじって、本例向けのサンプルアプリを作る。

[WIP]

共有

ThirdEye LinkedIn’s business-wide monitoring platform

参考

メモ

セッションメモ

Strataの ThirdEye LinkedIn’s business-wide monitoring platform のメモ。

LinkedInで運用されている異常検知と原因分析のためのプラットフォーム。 オープンソースになっているようだ。 -> thirdeyeの実装

主に以下の内容が記載されていた。

  • MTTD: Mean time to detect
  • MTTR: Mean time to repair

50 チーム以上がThirdEyeを利用。 何千もの時系列データが監視されている?

攻撃を検知したり、AIモデル周りの監視。

アーキ図あり。

異常検知における課題:スケーラビリティ、性能

手動でのコンフィグ、監視は現実的でない。

ルールベースの単純な仕組みは不十分。 多すぎるアラートは邪魔。

ブログメモ

Analyzing anomalies with ThirdEye のメモ。

データキューブについて。 LinkedInではPinotを使って事前にキューブ化されている。

ディメンジョン・ヒートマップの利用。 あるディメンジョンにおける問題の原因分析に有用。

変化の検出の仕方について。

ただし単独のディメンジョンの問題を検出するだけでは不十分。 複数のディメンジョンにまたがって分析してわかる問題を検出したい。

ディメンジョンをツリー構成。 ベースラインと現在の値でツリーを構成。

ノードの親子関係を利用。 各ノードとその親ノードの変化を式に組み入れることで、木構造に基づく傾向(?)を考慮しながら、 変化の重大さを算出する。

上記仕組みを利用することで、データマネージャのバグ、機械学習モデルサービングにおけるバグを見つけた。

共有

Automating ML model training and deployments via metadata-driven data, infrastructure, feature engineering, and model management

参考

メモ

以下の内容が記載されている。

ワークフロー。 よくあるワークフローなので特筆なし。

無数のストリームイベント。 数億のレコードアップデート。 数千万の顧客アカウント。

メタデータ管理、特徴量ストア、モデルサービング、パイプラインオートメーション。

共有

Questioning the Lambda Architecture

参考

メモ

Questioning the Lambda Architecture にてJay Krepsが初めて言及したとされているようだ。 過去に読んだが忘れたので改めて、いかにメモを記載する。

まとめ

主張としては、ストリーム処理はKafkaの登場により、十分に使用に耐えうるものになったため、 バッチ処理と両方使うのではなく、ストリーム処理1本で勝負できるのでは?ということだった。

気になった文言を引用

ラムダアーキテクチャについて:

The Lambda Architecture is an approach to building stream processing applications on top of MapReduce and Storm or similar systems.

Lambdaアーキテクチャのイメージ

バッチ処理とストリーム処理で2回ロジックを書く。:

You implement your transformation logic twice, once in the batch system and once in the stream processing system.

レコメンデーションシステムを例にとる:

A good example would be a news recommendation system that needs to crawl various news sources, process and normalize all the input, and then index, rank, and store it for serving.

データを取り込み、イミュターブルなものとして扱うことはありだと思う:

I’ve written some of my thoughts about capturing and transforming immutable data streams

I have found that many people who attempt to build real-time data processing systems don’t put much thought into this problem and end-up with a system that simply cannot evolve quickly because it has no convenient way to handle reprocessing.

リアルタイム処理が本質的に近似であり、バッチ処理よりも弱く、損失しがち、という意見があるよね、と。:

One is that real-time processing is inherently approximate, less powerful, and more lossy than batch processing.

ラムダアーキテクチャの利点にCAP定理との比較が持ち出されることを引き合いに出し、ラムダアーキテクチャがCAP定例を克服するようなものではない旨を説明。:

Long story short, although there are definitely latency/availability trade-offs in stream processing, this is an architecture for asynchronous processing, so the results being computed are not kept immediately consistent with the incoming data. The CAP theorem, sadly, remains intact.

結局、StormとHadoopの両方で同じ結果を生み出すアプリケーションを 実装するのがしんどいという話。:

Programming in distributed frameworks like Storm and Hadoop is complex.

ひとつの解法は抽象化。:

Summingbird

とはいえ、2重運用はしんどい。デバッグなど。:

the operational burden of running and debugging two systems is going to be very high.

結局のところ、両方を同時に使わないでくれ、という結論:

These days, my advice is to use a batch processing framework like MapReduce if you aren’t latency sensitive, and use a stream processing framework if you are, but not to try to do both at the same time unless you absolutely must.

ストリーム処理はヒストリカルデータの高スループットでの処理に向かない、という話もあるが…:

When I’ve discussed this with people, they sometimes tell me that stream processing feels inappropriate for high-throughput processing of historical data.

バッチ処理もストリーム処理も抽象化の仕方は、DAGをベースにしたものであり、 その点では共通である、と。:

But there is no reason this should be true. The fundamental abstraction in stream processing is data flow DAGs, which are exactly the same underlying abstraction in a traditional data warehouse (a la Volcano) as well as being the fundamental abstraction in the MapReduce successor Tez.

ということでKafka。:

Use Kafka

提案アーキテクチャ

Kafkaに入れた後は、HDFS等に簡単に保存できる。:

Kafka has good integration with Hadoop, so mirroring any Kafka topic into HDFS is easy.

このあと少し、Kafkaの説明が続く。

この時点では、Event Sourcing、CQRSについての言及あり。

Indeed, a lot of people are familiar with similar patterns that go by the name Event Sourcing or CQRS.

LinkedInにて、JayはSamzaを利用。

I know this approach works well using Samza as the stream processing system because we do it at LinkedIn.

提案手法の難点として、一時的に2倍の出力ストレージサイズが必要になる。

However, my proposal requires temporarily having 2x the storage space in the output database and requires a database that supports high-volume writes for the re-load.

単純さを大事にする。:

So, in cases where simplicity is important, consider this approach as an alternative to the Lambda Architecture.

所感

当時よりも、最近のワークロードは複雑なものも含めて期待されるようになっており、 ますます「バッチ処理とストリーム処理で同じ処理を実装する」というのがしんどくなっている印象。

共有

ML Ops: Machine Learning as an Engineering Discipline

参考

メモ

読んでみた感想をまとめる。

感想

結論としては、まとめ表がよくまとまっているのでそれでよい気がする。 この表をベースに、アクティビティから必要なものを追加するか? -> まとめ表

演繹的ではなく、帰納的な手法であるため、もとになったコードとデータの両方が重要。

データサイエンティスト、MLエンジニア、DevOpsエンジニア、データエンジニア。 MLエンジニア。あえてエンジニアと称しているのは、ソフトウェア開発のスキルを有していることを期待するから。

フェアネスの考慮、というか機械学習の倫理考慮をどうやって機械的に実現するのか、というのはかねてより気になっていた。 フェアネスの考慮などを達成するためには、テストデータセットの作り方、メトリクスの作り方に工夫するとよい。つまり、男女でそれぞれ個別にもテストするなど。 まだ一面ではあるが、参考になった。

気になる文言の抜粋

以下の文言が印象に残った。 ほかのレポートでも言われていることに見える。

Deeplearning.ai reports that “only 22 percent of companies using machine learning have successfully deployed a model”.

このレポートがどれか気になった。 Deeplearning.aiのレポート か。

確かに以下のように記載されている。

Although AI budgets are on the rise, only 22 percent of companies using machine learning have successfully deployed a model, the study found.

データが大切論:

ML is not just code, it’s code plus data

training data, which will affect the behavior of the model in production

ML = Code + Dataの図

It never stops changing, and you can’t control how it will change.

確かに、データはコントロールできない。

Data Engineering

ML Opsの概念図

チーム構成について言及あり。:

But the most likely scenario right now is that a successful team would include a Data Scientist or ML Engineer, a DevOps Engineer and a Data Engineer.

データサイエンティストのほかに、明確にMLエンジニア、DevOpsエンジニア、データエンジニアを入れている。 基盤エンジニアはデータエンジニアに含まれるのだろうか。

Even if an organization includes all necessary skills, it won’t be successful if they don’t work closely together.

ノートブックに殴り書かれたコードは不十分の話:

getting a model to work great in a messy notebook is not enough.

ML Engineers

データパイプラインの話:

data pipeline

殴り書きのコードではなく、適切なパイプラインはメリットいくつかあるよね、と。:

Switching to proper data pipelines provides many advantages in code reuse, run time visibility, management and scalability.

トレーニングとサービングの両方でパイプラインがあるけど、 入力データや変換内容は、場合によっては微妙に異なる可能性がある、と。:

Most models will need 2 versions of the pipeline: one for training and one for serving.

ML Pipelineは特定のデータから独立しているためCICDと連携可能

For example, the training pipeline usually runs over batch files that contain all features, while the serving pipeline often runs online and receives only part of the features in the requests, retrieving the rest from a database.

いくつかTensorFlow関係のツールが紹介されている。確認したほうがよさそう。:

TensorFlow Pipeline

TensorFlow Transform

バージョン管理について:

In ML, we also need to track model versions, along with the data used to train it, and some meta-information like training hyperparameters.

Models and metadata can be tracked in a standard version control system like Git, but data is often too large and mutable for that to be efficient and practical.

コードのラインサイクルと、モデルのライフサイクルは異なる:

It’s also important to avoid tying the model lifecycle to the code lifecycle, since model training often happens on a different schedule.

It’s also necessary to version data and tie each trained model to the exact versions of code, data and hyperparameters that were used.

Having comprehensive automated tests can give great confidence to a team, accelerating the pace of production deployments dramatically.

モデルの検証は本質的に、統計に基づくものになる。というのも、そもそもモデルの出力が確率的だし、入力データも変動する。:

model validation tests need to be necessarily statistical in nature

Just as good unit tests must test several cases, model validation needs to be done individually for relevant segments of the data, known as slices.

Data validation is analogous to unit testing in the code domain.

ML pipelines should also validate higher level statistical properties of the input.

TensorFlow Data Validation

Therefore, in addition to monitoring standard metrics like latency, traffic, errors and saturation, we also need to monitor model prediction performance.

An obvious challenge with monitoring model performance is that we usually don’t have a verified label to compare our model’s predictions to, since the model works on new data.

まとめ表
共有

jvm profiler for Spark at Uber

参考

メモ

概要

Sparkのエグゼキュータ等のJVMプロファイリングを行うためのライブラリ。

JVM起動時に、agentとしてアタッチするようにする。

executorプロセスのUUIDを付与しながらメトリクスを取得できるようだ。 Kafkaに流すことも可能。

公式GitHub のREADMEによると、オフヒープの使用量なども計測できており、チューニングに役立ちそう。

20200107時点では、2か月前ほどに更新されており、まだ生きているプロジェクトのようだ。

動作確認

公式GitHub のREADMEに記載されていた手順で、パッケージをビルドし、ローカルモードのSparkで利用してみた。

ビルド方法:

1
$ mvn clean package

パッケージビルド結果は、 target/jvm-profiler-1.0.0.jar に保存される。

これをjavaagentとして用いる。 渡すオプションは、 --conf spark.executor.driverJavaOptions=-javaagent:${JVMPROFILER_HOME}/target/jvm-profiler-1.0.0.jar である。 環境変数 ${JVMPROFILER_HOME} は先ほどビルドしたレポジトリのPATHとする。

また、今回は com.uber.profiling.reporters.FileOutputReporter を用いて、ファイル出力を試みることとする。

結果的に、Sparkの起動コマンドは、以下のような感じになる。:

1
$ ${SPARK_HOME}/bin/spark-shell --conf spark.driver.extraJavaOptions=-javaagent:/home/ubuntu/Sources/jvm-profiler/target/jvm-profiler-1.0.0.jar=reporter=com.uber.profiling.reporters.FileOutputReporter,outputDir=/tmp/jvm-profile

ここで

  • 環境変数 ${SPARK_HOME} はSparkを配備したPATHである
  • ディレクトリ /tmp/jvm-profile は予め作成しておく

とする。

生成されるレコードは、以下のようなJSONである。CpuAndMemory.jsonの例は以下の通り。:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
{
"heapMemoryMax": 954728448,
"role": "driver",
"nonHeapMemoryTotalUsed": 156167536,
"bufferPools": [
{
"totalCapacity": 20572,
"name": "direct",
"count": 10,
"memoryUsed": 20575
},
{
"totalCapacity": 0,
"name": "mapped",
"count": 0,
"memoryUsed": 0
}
],
"heapMemoryTotalUsed": 400493400,
"vmRSS": 812081152,
"epochMillis": 1578408135107,
"nonHeapMemoryCommitted": 157548544,
"heapMemoryCommitted": 744488960,
"memoryPools": [
{
"peakUsageMax": 251658240,
"usageMax": 251658240,
"peakUsageUsed": 37649152,
"name": "Code Cache",
"peakUsageCommitted": 38010880,
"usageUsed": 37649152,
"type": "Non-heap memory",
"usageCommitted": 38010880
},
{
"peakUsageMax": -1,
"usageMax": -1,
"peakUsageUsed": 104054944,
"name": "Metaspace",
"peakUsageCommitted": 104857600,
"usageUsed": 104054944,
"type": "Non-heap memory",
"usageCommitted": 104857600
},
{
"peakUsageMax": 1073741824,
"usageMax": 1073741824,
"peakUsageUsed": 14463440,
"name": "Compressed Class Space",
"peakUsageCommitted": 14680064,
"usageUsed": 14463440,
"type": "Non-heap memory",
"usageCommitted": 14680064
},
{
"peakUsageMax": 336592896,
"usageMax": 243269632,
"peakUsageUsed": 247788352,
"name": "PS Eden Space",
"peakUsageCommitted": 250085376,
"usageUsed": 218352416,
"type": "Heap memory",
"usageCommitted": 239075328
},
{
"peakUsageMax": 58195968,
"usageMax": 55050240,
"peakUsageUsed": 43791112,
"name": "PS Survivor Space",
"peakUsageCommitted": 58195968,
"usageUsed": 43791112,
"type": "Heap memory",
"usageCommitted": 55050240
},
{
"peakUsageMax": 716177408,
"usageMax": 716177408,
"peakUsageUsed": 138349872,
"name": "PS Old Gen",
"peakUsageCommitted": 450363392,
"usageUsed": 138349872,
"type": "Heap memory",
"usageCommitted": 450363392
}
],
"processCpuLoad": 0.02584087025382403,
"systemCpuLoad": 0.026174300837744344,
"processCpuTime": 49500000000,
"vmHWM": 812081152,
"appId": "local-1578407721611",
"vmPeak": 4925947904,
"name": "24974@ubuec2",
"host": "ubuec2",
"processUuid": "38d5c63f-d70d-4e4d-9d54-a2381b9c37a7",
"nonHeapMemoryMax": -1,
"vmSize": 4925947904,
"gc": [
{
"collectionTime": 277,
"name": "PS Scavenge",
"collectionCount": 16
},
{
"collectionTime": 797,
"name": "PS MarkSweep",
"collectionCount": 4
}
]
}
  • nonヒープのメモリ使用量についても情報あり
  • ヒープについては、RSSに関する情報もある
  • ヒープ内の領域に関する情報もあり、GCに関する情報もある
共有

The evolution of metadata: LinkedIn’s story

参考

メモ

LinkedInが提唱する Generalized Metadata Architecture (GMA) を基盤としたメタデータ管理システム。

コンセプトは、 スライド に記載されているが、ざっとアーキテクチャのイメージをつかむには、 アーキテクチャ がよい。

GMA

メタデータは自動収集。

これは標準化されたメタデータモデルとアクセスレイヤによるものである。

また標準モデルが、モデルファーストのアプローチを促進する。

Metadata Serving

Metadata Serving に記載あり。

RESTサービスは、LinkedInが開発していると思われるREST.liが用いられており、 DAOもその中の「Pegasus」という仕組みを利用している。

Key-Value DAO、Search DAO、Query DAOが定義されている。

上記GMAの通り、この辺りのDAOによるアクセスレイヤの標準化が見て取れる。

Metadata Ingestion Architecture

そもそも、メタデータに対する変更は MAE(Metadata Audit Event) としてキャプチャされる。

それがKafka Streamsのジョブで刈り取られ処理される。なお、シーケンシャルに処理されるための工夫もあるようだ。

共有

Pinot: Enabling Real-time Analytics Applications

参考

メモ

オンライン分析のアーキテクチャ

  • Join on the fly
    • ベーステーブルからクエリ実行のタイミングでデータ生成
  • Pre Join + Pre Aggregate
    • 分析で必要となるテーブルをストリーム処理等で事前作成
    • 事前処理はマスタデータのテーブルとの結合、など
  • Pre Join + Pre Aggregate + Pre Cube
    • さらに分析で求められる結果データを予め作成、インデックス化
    • 例えばAggregationしておく、など

レイテンシと柔軟性のトレードオフの関係:

Latency vs. Flexibility
「Who View」のアーキテクチャ

ユースケース

  • LinkedInのメンバーが見る分析レポート
    • QPSが高い(数千/秒 級)
    • レイテンシは数十ms~sub秒
  • インタラクティブダッシュボード
    • 様々なアグリゲーション
  • 異常検知

LinkedIn以外の企業では、

  • Uber
  • slack
  • MS Teams
  • Weibo
  • factual

あたりが利用しているようだ。 Uberは、自身の技術ブログでも触れていた。

ワークフロー

ワークフロー概要

Pinotは、原則としてPre Aggregation、Pre Cubeを前提とする仕組みなので、 スキーマの定義が非常に重要。

分散処理とIngestion

またバッチとストリーム両方に対応しているので、 それぞれデータ入力(Ingestion)を定義する。

データはセグメントに分けられ、サーバに分配される。

Segment Assignment

分散処理とクエリルーティング

セグメントに基づき、Brokerによりルーティングされる。

リアルタイムサーバとオフラインサーバ

ストリームから取り込んだデータとバッチで取り込んだデータは、 共通のスキーマを用いることになる。

したがって、統一的にクエリできる?

アーキテクチャの特徴

インデックス(Cube)の特徴

Scan、Inverted Index、Sorted Index、Star-Tree Indexを併用可能。

データ処理上の工夫

比較対象として、Druidがよく挙げられているが、Sorted Index、Star-Tree Indexがポイント。

カラムナデータフォーマットを用いるのは前提。 それに加え、Dictionary Encodeing、Bit Compressionを使ってデータ構造に基づいた圧縮を採用。

Inverted Index

転置インデックスを作っておく、という定番手法。

Sorted Index

予め、Dimensionに基づいてソートしておくことで、 フィルタする際にスキャン量を減らしながら、かつデータアクセスを効率化。

Sroted Indexの特徴

Star-Tree Index

すべてをCube化するとデータ保持のスペースが大量に必要になる。 そこで部分的にCube化する。 ★要確認

Star-Tree Indexの特徴

性能特性

Druidとの簡単な比較

Druidと簡易的に比較した結果が載っている。

まずは、レイテンシの小ささが求められるインタラクティブな分析における性能特徴:

Druidとの簡易比較

ミリ秒単位での分析を取り扱うことに関してDruidと共通だが、 各種インデックスのおかげか、Druidよりもパーセンタイルベースの比較で レイテンシが小さいとされている。

つづいて、あらかじめ定義されたクエリを大量にさばくユースケース:

Druidとの簡易比較2

レイテンシを小さく保ったまま、高いQPSを実現していることを 示すグラフが載っている。 この辺りは、工夫として載っていた各種インデックスを予め定義していることが強く効きそうだ。

続いて異常検知ユースケースの例:

Druidとの簡易比較3

データがSkewしていることが強調されているが、その意図はもう少し読み解く必要がありそう。 ★要確認

Star-Tree Indexについて

Star-Tree Index Powering Fast Aggregations on Pinot に記載あり。

共有