Reference of connector plugin

参考

メモ

2020/12時点で、Kafka Connectのプラグインの参考になるもの探す。

kafka-connect-syslog

kafka-connect-syslog が最も簡易に動作確認できそうだった。 ただ、 ConfluentのGitHub を見る限り、GitHub上には実装が公開されていないようだった。

動作確認

ここでは、ローカルに簡易実験用の1プロセスのKafkaを起動した前提とする。 起動方法は Kafkaのクイックスタート を参照。

kafka-connect-syslog のパッケージをダウンロードして /opt/connectors 以下に展開。

1
2
3
$ cd /opt/connectors
$ sudo unzip confluentinc-kafka-connect-syslog-1.3.2.zip
$ sudo chown -R kafka:kafka confluentinc-kafka-connect-syslog-1.3.2

というパッケージが展開される。

動作確認に使用するプロパティは以下。

1
2
3
4
5
6
7
8
9
10
11
12
13
$ cat etc/minimal-tcp.properties
#
# Copyright [2016 - 2019] Confluent Inc.
#

name=syslog-tcp
tasks.max=1
connector.class=io.confluent.connect.syslog.SyslogSourceConnector
syslog.port=5515
syslog.listener=TCP
confluent.license
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

また、Connectの設定には以下を追加する。

/opt/kafka_pseudo/default/config/connect-standalone.properties

1
2
3
(snip)

plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,

プラグインを置く場所として、 /opt/connectors を指定した。

/opt/kafka_pseudo/default/bin/connect-standalone.sh を利用して、 スタンドアローンモードでKafka Connectを起動。

1
2
$ sudo -u kafka /opt/kafka_pseudo/default/bin/connect-standalone.sh /opt/kafka_pseudo/default/config/connect-standalone.properties \
/opt/connectors/confluentinc-kafka-connect-syslog-1.3.2/etc/minimal-tcp.properties

起動したのを確認し、別の端末から適当なデータを送信。

1
$ echo "<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - Your refrigerator is running" | nc -v -w 1 localhost 5515

Console Consumerを利用して書き込み状況を確認。

1
2
$ cd ${KAFKA_HOME}
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic syslog --from-beginning

先程書き込んだものが表示されるはずである。

kafka-connect-datagen

kafka-connect-datagen もあった。 kafka-connect-datagenのGitHub に実装も公開されているように見える。 ドキュメントのリンクから、当該レポジトリのREADMEにジャンプしたため、そのように判断。

以降、v0.4.0を対象として確認したものである。

概要

指定されたスキーマで、ダミーデータを生成するコネクタ。 avro-random-generator を内部的に利用している。

スキーマ指定はAvroのスキーマファイルを渡す方法もあるし、 組み込みのスキーマを指定する方法もある。 kafka-connect-datagenのサンプルスキーマ を参照。

また、Kafkaに出力する際のフォーマットは指定可能。 Kafka Connect自体の一般的なパラメータである、 value.converter を指定すれば良い。 例えば以下の通り。

1
"value.converter": "org.apache.kafka.connect.json.JsonConverter",

実装状況

2020/12/21時点では本プロジェクトのバージョンは0.4.0であり、

1
2
3
4
5
<parent>
<groupId>io.confluent</groupId>
<artifactId>common-parent</artifactId>
<version>6.0.0</version>
</parent>

の通り、Confluent Platformのバージョンとしては、6系である。

Confluent Platform and Apache Kafka Compatibility によると、 Confluent Platform 6系のKafkaバージョンは2.6.Xである。

io.confluent.kafka.connect.datagen.DatagenConnector

io.confluent.kafka.connect.datagen.DatagenConnector クラスは、 org.apache.kafka.connect.source.SourceConnector を継承している。 割と素直な実装。

io.confluent.kafka.connect.datagen.DatagenConnector#start メソッドは特別なことはしておらず、 コンフィグをロードするだけ。

io.confluent.kafka.connect.datagen.DatagenConnector#taskConfigs メソッドも 特別なことはしていない。start時に受け取ったプロパティから taskConfigを生成して返す。

io.confluent.kafka.connect.datagen.DatagenConnector#stop メソッド および、 io.confluent.kafka.connect.datagen.DatagenConnector#config もほぼ何もしない。

タスクには io.confluent.kafka.connect.datagen.DatagenTask クラスを利用する。

io.confluent.kafka.connect.datagen.DatagenTask クラス

io.confluent.kafka.connect.datagen.DatagenTask#start メソッドが overrideされている。 以下、ポイントを確認する。

オフセット管理の仕組みあり。

io/confluent/kafka/connect/datagen/DatagenTask.java:133

1
2
3
4
5
6
7
Map<String, Object> offset = context.offsetStorageReader().offset(sourcePartition);
if (offset != null) {
// The offset as it is stored contains our next state, so restore it as-is.
taskGeneration = ((Long) offset.get(TASK_GENERATION)).intValue();
count = ((Long) offset.get(CURRENT_ITERATION));
random.setSeed((Long) offset.get(RANDOM_SEED));
}

io.confluent.avro.random.generator.Generator のジェネレータ(のビルダ)を利用する。

io/confluent/kafka/connect/datagen/DatagenTask.java:141

1
2
3
Generator.Builder generatorBuilder = new Generator.Builder()
.random(random)
.generation(count);

クイックスタートの設定があれば、それに従ってスキーマを読み込む。

io/confluent/kafka/connect/datagen/DatagenTask.java:144

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
String quickstartName = config.getQuickstart();
if (quickstartName != "") {
try {
quickstart = Quickstart.valueOf(quickstartName.toUpperCase());
if (quickstart != null) {
schemaFilename = quickstart.getSchemaFilename();
schemaKeyField = schemaKeyField.equals("")
? quickstart.getSchemaKeyField() : schemaKeyField;
try {
generator = generatorBuilder
.schemaStream(getClass().getClassLoader().getResourceAsStream(schemaFilename))
.build();
} catch (IOException e) {
throw new ConnectException("Unable to read the '"
+ schemaFilename + "' schema file", e);
}
}
} catch (IllegalArgumentException e) {
log.warn("Quickstart '{}' not found: ", quickstartName, e);
}

指定されたクイックスタート名に従い、パッケージに含まれるスキーマファイルを読み込み、 それを適用しながらジェネレータを生成する。

なお、クイックスタートのたぐいはenumで定義されている。

io/confluent/kafka/connect/datagen/DatagenTask.java:75

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected enum Quickstart {
CLICKSTREAM_CODES("clickstream_codes_schema.avro", "code"),
CLICKSTREAM("clickstream_schema.avro", "ip"),
CLICKSTREAM_USERS("clickstream_users_schema.avro", "user_id"),
ORDERS("orders_schema.avro", "orderid"),
RATINGS("ratings_schema.avro", "rating_id"),
USERS("users_schema.avro", "userid"),
USERS_("users_array_map_schema.avro", "userid"),
PAGEVIEWS("pageviews_schema.avro", "viewtime"),
STOCK_TRADES("stock_trades_schema.avro", "symbol"),
INVENTORY("inventory.avro", "id"),
PRODUCT("product.avro", "id");

private final String schemaFilename;
private final String keyName;

Quickstart(String schemaFilename, String keyName) {
this.schemaFilename = schemaFilename;
this.keyName = keyName;
}

public String getSchemaFilename() {
return schemaFilename;
}

public String getSchemaKeyField() {
return keyName;
}
}

クイックスタートが設定されておらず、スキーマの文字列が与えられた場合は、 それを用いてジェネレータが生成される。

io/confluent/kafka/connect/datagen/DatagenTask.java:164

1
2
} else if (schemaString != "") {
generator = generatorBuilder.schemaString(schemaString).build();

それ以外の場合、つまりスキーマ定義の書かれたファイルを指定する場合は、 以下の通り。

io/confluent/kafka/connect/datagen/DatagenTask.java:166

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
String err = "Unable to read the '" + schemaFilename + "' schema file";
try {
generator = generatorBuilder.schemaStream(new FileInputStream(schemaFilename)).build();
} catch (FileNotFoundException e) {
// also look in jars on the classpath
try {
generator = generatorBuilder
.schemaStream(DatagenTask.class.getClassLoader().getResourceAsStream(schemaFilename))
.build();
} catch (IOException inner) {
throw new ConnectException(err, e);
}
} catch (IOException e) {
throw new ConnectException(err, e);
}
}

最後のAvroに関連する情報を生成して終了。

io/confluent/kafka/connect/datagen/DatagenTask.java:184

1
2
3
avroSchema = generator.schema();
avroData = new AvroData(1);
ksqlSchema = avroData.toConnectSchema(avroSchema);

io.confluent.kafka.connect.datagen.DatagenTask#poll メソッドもoverrideされている。 以下、ポイントを記載する。

インターバル機能あり。

io/confluent/kafka/connect/datagen/DatagenTask.java:192

1
2
3
4
5
6
7
8
if (maxInterval > 0) {
try {
Thread.sleep((long) (maxInterval * Math.random()));
} catch (InterruptedException e) {
Thread.interrupted();
return null;
}
}

ジェネレータを利用し、オブジェクトを生成する。

io/confluent/kafka/connect/datagen/DatagenTask.java:201

1
2
3
4
5
6
7
8
final Object generatedObject = generator.generate();
if (!(generatedObject instanceof GenericRecord)) {
throw new RuntimeException(String.format(
"Expected Avro Random Generator to return instance of GenericRecord, found %s instead",
generatedObject.getClass().getName()
));
}
final GenericRecord randomAvroMessage = (GenericRecord) generatedObject;

生成されたオブジェクトから、スキーマ定義に基づいてフィールドの値を取り出し、 バリューのArrayListを生成する。

io/confluent/kafka/connect/datagen/DatagenTask.java:210

1
2
3
4
5
6
7
8
9
10
11
12
final List<Object> genericRowValues = new ArrayList<>();
for (org.apache.avro.Schema.Field field : avroSchema.getFields()) {
final Object value = randomAvroMessage.get(field.name());
if (value instanceof Record) {
final Record record = (Record) value;
final Object ksqlValue = avroData.toConnectData(record.getSchema(), record).value();
Object optionValue = getOptionalValue(ksqlSchema.field(field.name()).schema(), ksqlValue);
genericRowValues.add(optionValue);
} else {
genericRowValues.add(value);
}
}

キーも同様に取り出し、Kafka Connectの形式に変換する。

io/confluent/kafka/connect/datagen/DatagenTask.java:224

1
2
3
4
5
6
7
SchemaAndValue key = new SchemaAndValue(DEFAULT_KEY_SCHEMA, null);
if (!schemaKeyField.isEmpty()) {
key = avroData.toConnectData(
randomAvroMessage.getSchema().getField(schemaKeyField).schema(),
randomAvroMessage.get(schemaKeyField)
);
}

先程ArrayListとして取り出したバリューもKafka Connect形式に変換する。

io/confluent/kafka/connect/datagen/DatagenTask.java:233

1
2
final org.apache.kafka.connect.data.Schema messageSchema = avroData.toConnectSchema(avroSchema);
final Object messageValue = avroData.toConnectData(avroSchema, randomAvroMessage).value();

イテレートのたびに、メタデータを更新する。

io/confluent/kafka/connect/datagen/DatagenTask.java:246

1
2
3
4
5
6
7
8
9
// The source offsets will be the values that the next task lifetime will restore from
// Essentially, the "next" state of the connector after this loop completes
Map<String, Object> sourceOffset = new HashMap<>();
// The next lifetime will be a member of the next generation.
sourceOffset.put(TASK_GENERATION, taskGeneration + 1);
// We will have produced this record
sourceOffset.put(CURRENT_ITERATION, count + 1);
// This is the seed that we just re-seeded for our own next iteration.
sourceOffset.put(RANDOM_SEED, seed);

最後に、SourceRecordのリスト形式に変換し、 レコードとして生成して戻り値として返す。

io/confluent/kafka/connect/datagen/DatagenTask.java:261

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final List<SourceRecord> records = new ArrayList<>();
SourceRecord record = new SourceRecord(
sourcePartition,
sourceOffset,
topic,
null,
key.schema(),
key.value(),
messageSchema,
messageValue,
null,
headers
);
records.add(record);
count += records.size();
return records;

つづいて、 io.confluent.kafka.connect.datagen.DatagenTask#stop メソッドだが、 これは特に何もしない。

io.confluent.kafka.connect.datagen.DatagenTask#getOptionalSchema という オプショナルなフィールドに関するスキーマを取得するためのヘルパーメソッドもある。 io.confluent.kafka.connect.datagen.DatagenTask#getOptionalValue メソッドもある。

動作確認

confluentinc-kafka-connect-datagen-0.4.0.zip をダウンロードし、 /opt/connectors以下に展開したものとする。

今回は以下の設定ファイルを参考に、データ生成してみる。 なお、イテレーション回数は適度に修正して用いることを推奨する。

confluentinc-kafka-connect-datagen-0.4.0/etc/connector_users.config

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"name": "datagen-users",
"config": {
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "users",
"quickstart": "users",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"max.interval": 1000,
"iterations": 10000000,
"tasks.max": "1"
}
}

上記はconfluentコマンド用で利用する際のコンフィグファイルである。 そこで以下のようなKafka Connect用の設定ファイルを生成する。

/opt/connectors/confluentinc-kafka-connect-datagen-0.4.0/etc/connector_users.properties

1
2
3
4
5
6
7
8
9
10
name=users
connector.class=io.confluent.kafka.connect.datagen.DatagenConnector
kafka.topic=users
quickstart=users
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
max.interval=1000
iterations=10
tasks.max=1

スタンドアローンモードでKafka Connectを起動する。

1
2
3
$ sudo -u kafka /opt/kafka_pseudo/default/bin/connect-standalone.sh \
/opt/kafka_pseudo/default/config/connect-standalone.properties \
/opt/connectors/confluentinc-kafka-connect-datagen-0.4.0/etc/connector_users.properties

停止した後、結果を確認する。 トピックが作られたことがわかる。

1
2
3
4
5
$ /opt/kafka_pseudo/default/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
_confluent-command
syslog
users

データを確認する。

1
2
3
4
5
6
7
8
9
10
11
$ /opt/kafka_pseudo/default/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic users --from-beginning
{"registertime":1501850210149,"userid":"User_8","regionid":"Region_3","gender":"FEMALE"}
{"registertime":1516539876299,"userid":"User_2","regionid":"Region_7","gender":"OTHER"}
{"registertime":1505292095234,"userid":"User_4","regionid":"Region_1","gender":"OTHER"}
{"registertime":1502118362741,"userid":"User_3","regionid":"Region_1","gender":"FEMALE"}
{"registertime":1503193759324,"userid":"User_9","regionid":"Region_5","gender":"MALE"}
{"registertime":1507693509191,"userid":"User_1","regionid":"Region_8","gender":"OTHER"}
{"registertime":1497764008309,"userid":"User_1","regionid":"Region_6","gender":"OTHER"}
{"registertime":1514606256206,"userid":"User_1","regionid":"Region_3","gender":"MALE"}
{"registertime":1492595638722,"userid":"User_2","regionid":"Region_6","gender":"MALE"}
{"registertime":1500602208014,"userid":"User_3","regionid":"Region_1","gender":"MALE"}

ダミーデータが生成されていることが確認できた。

(WIP)

共有

Create projects includes sbt launcher

参考

メモ

プロジェクト内に、sbt自体を含めてクローンしただけでビルドできるようにしたい、という動機。 現時点ではCoursierプロジェクトが汎用性※が高く便利であると、個人的には感じた。

※ここでは、多くの環境で実行可能で、様々なツールを一度にセットアップ可能という意味。

Coursierプロジェクト

Coursier を使って最速でScalaの開発環境を整える のブログに記載されているとおり、 Scala開発環境を簡単に整えられる。

ひとまずDocker内で試す。

1
2
3
$ sudo docker pull ubuntu:18.04
$ sudo docker run -rm -it -d --name ubu ubuntu:18.04
$ sudo docker exec -it ubu /bin/bash

Dockerコンテナ内でインストール。

1
2
3
4
5
# apt update
# apt install -y curl
# curl -fLo cs https://git.io/coursier-cli-linux &&
chmod +x cs &&
./cs

結果として以下がインストールされた様子。

1
2
3
4
5
6
7
8
9
Checking if the standard Scala applications are installed
Installed ammonite
Installed cs
Installed coursier
Installed scala
Installed scalac
Installed sbt
Installed sbtn
Installed scalafmt

~/.profileに環境変数等がまとまっているので有効化する。

1
# source ~/.profile

これでインストールされたコマンドが使用できるようになった。

なお、参考までに上記でダウンロードしたcsコマンドは以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
$ ./cs setup --help
Command: setup
Usage: cs setup
--jvm <string?>
--jvm-dir <string?>
--system-jvm <bool?>
--local-only <bool>
--update <bool>
--jvm-index <string?>
--graalvm-home <string?>
--graalvm-option <string*>
--graalvm-default-version <string?>
--install-dir | --dir <string?>
--install-platform <string?>
Platform for prebuilt binaries (e.g. "x86_64-pc-linux", "x86_64-apple-darwin", "x86_64-pc-win32")
--install-prefer-prebuilt <bool>
--only-prebuilt <bool>
Require prebuilt artifacts for native applications, don't try to build native executable ourselves
--repository | -r <maven|sonatype:$repo|ivy2local|bintray:$org/$repo|bintray-ivy:$org/$repo|typesafe:ivy-$repo|typesafe:$repo|sbt-plugin:$repo|ivy:$pattern>
Repository - for multiple repositories, separate with comma and/or add this option multiple times (e.g. -r central,ivy2local -r sonatype:snapshots, or equivalently -r central,ivy2local,sonatype:snapshots)
--default-repositories <bool>
--proguarded <bool?>
--channel <org:name>
Channel for apps
--default-channels <bool>
Add default channels
--contrib <bool>
Add contrib channel
--file-channels <bool>
Add channels read from the configuration directory
--cache <string?>
Cache directory (defaults to environment variable COURSIER_CACHE, or ~/.cache/coursier/v1 on Linux and ~/Library/Caches/Coursier/v1 on Mac)
--mode | -m <offline|update-changing|update|missing|force>
Download mode (default: missing, that is fetch things missing from cache)
--ttl | -l <duration>
TTL duration (e.g. "24 hours")
--parallel | -n <int>
Maximum number of parallel downloads (default: 6)
--checksum <checksum1,checksum2,...>
Checksum types to check - end with none to allow for no checksum validation if no checksum is available, example: SHA-256,SHA-1,none
--retry-count <int>
Retry limit for Checksum error when fetching a file
--cache-file-artifacts | --cfa <bool>
Flag that specifies if a local artifact should be cached.
--follow-http-to-https-redirect <bool>
Whether to follow http to https redirections
--credentials <host(realm) user:pass|host user:pass>
Credentials to be used when fetching metadata or artifacts. Specify multiple times to pass multiple credentials. Alternatively, use the COURSIER_CREDENTIALS environment variable
--credential-file <string*>
Path to credential files to read credentials from
--use-env-credentials <bool>
Whether to read credentials from COURSIER_CREDENTIALS (env) or coursier.credentials (Java property), along those passed with --credentials and --credential-file
--quiet | -q <counter>
Quiet output
--verbose | -v <counter>
Increase verbosity (specify several times to increase more)
--progress | -P <bool>
Force display of progress bars
--env <bool>
--user-home <string?>
--banner <bool?>
--yes | -y <bool?>
--try-revert <bool>
--apps <string*>

(補足)sbt-launcher-packageプロジェクト

sbt-launcher-package をビルドすることでSBTランチャーを提供できそうだった。

(補足)launcherプロジェクト

https://github.com/sbt/launcher/blob/1.x/.java-version にも記載されているとおり、 JDK7系にしか公式に対応していなかった。

共有

Read and write data on Delta Lake streaming manner

参考

メモ

準備(SBT)

今回はSBTを利用してSalaを用いたSparkアプリケーションで試すことにする。 sbt を参考にSBTをセットアップする。(基本的には対象バージョンをダウンロードして置くだけ)

1
2
3
$ mkdir -p ~/Sources
$ cd ~/Sources
$ sbt new dobachi/spark-sbt.g8

ここでは、 [dobachi's spark-sbt.g8] を利用してSpark3.0.1の雛形を作った。 対話的に色々聞かれるので、ほぼデフォルトで生成。

ここでは、 StructuredNetworkWordCount を参考に、Word Countした結果をDelta Lakeに書き込むことにする。

なお、以降の例で示しているアプリケーションは、すべて dobachi's StructuredStreamingDeltaLakeExample に含まれている。

簡単なビルドと実行の例

以下のように予めncコマンドを起動しておく。

1
$ nc -lk 9999

続いて、別のターミナルでアプリケーションをビルドして実行。(ncコマンドを起動したターミナルは一旦保留)

1
2
3
$ cd structured_streaming_deltalake
$ sbt assembly
$ /opt/spark/default/bin/spark-submit --class com.example.StructuredStreamingDeltaLakeExample target/scala-2.12/structured_streaming_deltalake-assembly-0.0.1.jar localhost 9999

先程起動したncコマンドの引数に合わせ、9999ポートに接続する。 Structured Streamingが起動したら、ncコマンド側で、適当な文字列(単語)を入力する(以下は例)

1
2
3
hoge hoge fuga
fuga fuga
fuga fuga hoge

もうひとつターミナルを開き、spark-shellを起動する。

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.12:0.7.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

アプリケーションで出力先としたディレクトリ内のDelta Lakeテーブルを参照すると、以下のようにテーブルが更新されていることがわかる。 ひとまずncコマンドでいろいろな単語を入力して挙動を試すと良い。

1
2
3
4
5
6
7
8
9
10
scala> val df = spark.read.format("delta").load("/tmp/delta/wordcount")
df: org.apache.spark.sql.DataFrame = [value: string, count: bigint]

scala> df.show
+-----+-----+
|value|count|
+-----+-----+
| fuga| 5|
| hoge| 3|
+-----+-----+

(wip)

書き込み

Delta table as a sink の通り、Delta LakeテーブルをSink(つまり書き込み先)として利用する際には、 AppendモードとCompleteモードのそれぞれが使える。

Appendモード

例のごとく、ncコマンドを起動し、アプリケーションを実行。(予めsbt assemblyしておく) もうひとつターミナルを開き、spark-shellでDelta Lakeテーブルの中身を確認する。

ターミナル1

1
$ nc -lk 9999

ターミナル2

1
2
$ /opt/spark/default/bin/spark-submit --class com.example.StructuredStreamingDeltaLakeAppendExample
target/scala-2.12/structured_streaming_deltalake-assembly-0.0.1.jar localhost 9999

ターミナル3

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.12:0.7.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

ターミナル1のncで適当な単語列を入力する。

1
hoge hoge

ターミナル3のspark-shellでDelta Lakeテーブルの中身を確認する。

1
2
3
4
5
6
7
8
9
scala> val df = spark.read.format("delta").load("/tmp/delta/wordcount_per_line")
df: org.apache.spark.sql.DataFrame = [unixtime: bigint, count: bigint]

scala> df.show
+-------------+-----+
| unixtime|count|
+-------------+-----+
|1605968491821| 2|
+-------------+-----+

何度かncコマンド経由でテキストを流し込むと、以下のように行が加わるkとがわかる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scala> df.show
+-------------+-----+
| unixtime|count|
+-------------+-----+
|1605968491821| 2|
+-------------+-----+


scala> df.show
+-------------+-----+
| unixtime|count|
+-------------+-----+
|1605968518584| 2|
|1605968522461| 3|
|1605968491821| 2|
+-------------+-----+

Completeモード

Completeモードは、上記のWordCountの例がそのまま例になっている。 com.example.StructuredStreamingDeltaLakeExample 参照。

読み出し

Delta table as a stream source の通り、既存のDelta Lakeテーブルからストリームデータを取り出す。

準備

ひとまずDelta Lakeのテーブルを作る。 ここではspark-shellで適当に作成する。

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.12:0.7.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
1
2
3
4
5
6
7
8
9
10
scala> val df = spark.read.format("parquet").load("/opt/spark/default/examples/src/main/resources/users.parquet")
scala> df.show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+

scala> df.write.format("delta").save("/tmp/delta/users")

上記のDataFrameのスキーマは以下の通り。

1
2
3
4
5
6
scala> df.printSchema
root
|-- name: string (nullable = true)
|-- favorite_color: string (nullable = true)
|-- favorite_numbers: array (nullable = true)
| |-- element: integer (containsNull = true)

ストリームで読み込んでみる

もうひとつターミナルを立ち上げる。

1
$ /opt/spark/default/bin/spark-submit --class com.example.StructuredStreamingDeltaLakeReadExample target/scala-2.12/structured_streaming_deltalake-assembly-0.0.1.jar /tmp/delta/users

先程作成しておいたテーブルの中身が読み込まれる。

1
2
3
4
5
6
7
8
9
10
11
-------------------------------------------
Batch: 0
-------------------------------------------
20/11/22 00:29:41 INFO CodeGenerator: Code generated in 3.212 ms
20/11/22 00:29:41 INFO CodeGenerator: Code generated in 4.1238 ms
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+

先程の、Delta Lakeテーブルを作成したターミナルのspark-shellで、 追加データを作り、Delta Lakeテーブルに挿入する。

まず、追加データ用のスキーマを持つcase classを作成する。

1
scala> case class User(name: String, favorite_color: String, favorite_numbers: Array[Int])

つづいて、作られたcase classを利用して、追加用のDataFrameを作る。 (SeqからDataFrameを生成する)

1
scala> val addRecord = Seq(User("Bob", "yellow", Array(1,2))).toDF

appendモードで既存のDelta Lakeテーブルに追加する。

1
scala> addRecord.write.format("delta").mode("append").save("/tmp/delta/users")

ここで、ストリーム処理を動かしている方のターミナルを見ると、 以下のように追記された内容が表示されていることがわかる。

1
2
3
4
5
6
7
8
-------------------------------------------
Batch: 1
-------------------------------------------
+----+--------------+----------------+
|name|favorite_color|favorite_numbers|
+----+--------------+----------------+
| Bob| yellow| [1, 2]|
+----+--------------+----------------+

読み込み時の maxFilesPerTrigger オプション

ストリーム読み込み時には、 maxFilesPerTrigger オプションを指定できる。 このオプションは、以下のパラメータとして利用される。

org/apache/spark/sql/delta/DeltaOptions.scala:98

1
2
3
4
5
6
val maxFilesPerTrigger = options.get(MAX_FILES_PER_TRIGGER_OPTION).map { str =>
Try(str.toInt).toOption.filter(_ > 0).getOrElse {
throw DeltaErrors.illegalDeltaOptionException(
MAX_FILES_PER_TRIGGER_OPTION, str, "must be a positive integer")
}
}

このパラメータは org.apache.spark.sql.delta.sources.DeltaSource.AdmissionLimits#toReadLimit メソッド内で用いられる。

org/apache/spark/sql/delta/sources/DeltaSource.scala:350

1
2
3
4
5
6
7
8
9
10
11
12
13
  def toReadLimit: ReadLimit = {
if (options.maxFilesPerTrigger.isDefined && options.maxBytesPerTrigger.isDefined) {
CompositeLimit(
ReadMaxBytes(options.maxBytesPerTrigger.get),
ReadLimit.maxFiles(options.maxFilesPerTrigger.get).asInstanceOf[ReadMaxFiles])
} else if (options.maxBytesPerTrigger.isDefined) {
ReadMaxBytes(options.maxBytesPerTrigger.get)
} else {
ReadLimit.maxFiles(
options.maxFilesPerTrigger.getOrElse(DeltaOptions.MAX_FILES_PER_TRIGGER_OPTION_DEFAULT))
}
}
}

このメソッドの呼び出し階層は以下の通り。

1
2
3
AdmissionLimits in DeltaSource.toReadLimit()  (org.apache.spark.sql.delta.sources)
DeltaSource.getDefaultReadLimit() (org.apache.spark.sql.delta.sources)
MicroBatchExecution

MicroBatchExecutionは、 org.apache.spark.sql.execution.streaming.MicroBatchExecution クラスであり、 この仕組みを通じてSparkのStructured Streamingの持つレートコントロールの仕組みに設定値をベースにした値が渡される。

読み込み時の maxBytesPerTrigger オプション

原則的には、 maxFilesPerTrigger と同じようなもの。 指定された値を用いてcase classを定義し、最大バイトサイズの目安を保持する。

追記ではなく更新したらどうなるか?

Delta Lakeでoverwriteモードで書き込む際に、パーティションカラムに対して条件を指定できることを利用し、 部分更新することにする。

まず name をパーティションカラムとしたDataFrameを作ることにする。

1
2
3
4
5
6
7
8
9
10
scala> val df = spark.read.format("parquet").load("/opt/spark/default/examples/src/main/resources/users.parquet")
scala> df.show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+

scala> df.write.format("delta").partitionBy("name").save("/tmp/delta/partitioned_users")

上記例と同様に、ストリームでデータを読み込むため、もうひとつターミナルを起動し、以下を実行。

1
$ /opt/spark/default/bin/spark-submit --class com.example.StructuredStreamingDeltaLakeReadExample target/scala-2.12/structured_streaming_deltalake-assembly-0.0.1.jar /tmp/delta/partitioned_users

spark-shellを起動したターミナルで、更新用のデータを準備。

1
scala> val updateRecord = Seq(User("Ben", "green", Array(1,2))).toDF

条件付きのoverwriteモードで既存のDelta Lakeテーブルの一部レコードを更新する。

1
2
3
4
5
6
scala> updateRecord
.write
.format("delta")
.mode("overwrite")
.option("replaceWhere", "name == 'Ben'")
.save("/tmp/delta/partitioned_users")

上記を実行したときに、ストリーム処理を実行しているターミナル側で、 以下のエラーが生じ、プロセスが停止した。

1
2
3
4
5
6
20/11/23 22:10:55 ERROR MicroBatchExecution: Query [id = 13cf0aa0-116c-4c95-aea0-3e6f779e02c8, runId = 6f71a4bb-c067-4f6d-aa17-6bf04eea3520] terminated
with error
java.lang.UnsupportedOperationException: Detected a data update in the source table. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory. at org.apache.spark.sql.delta.sources.DeltaSource.verifyStreamHygieneAndFilterAddFiles(DeltaSource.scala:273)
at org.apache.spark.sql.delta.sources.DeltaSource.$anonfun$getChanges$1(DeltaSource.scala:117)

(snip)

後ほど検証するが、Delta Lakeのストリーム処理では、既存レコードのupdate、deleteなどの既存レコードの更新となる処理は基本的にサポートされていない。 オプションを指定することで、更新を無視することができる。

読み込み時の ignoreDeletes オプション

基本的には、上記の通り、元テーブルに変化があった場合はエラーを生じるようになっている。 Ignore updates and deletes に記載の通り、update、merge into、delete、overwriteがエラーの対象である。

ひとまずdeleteでエラーになるのを避けるため、 ignoreDeletes で無視するオプションを指定できる。

動作確認する。

spark-shellを立ち上げ、検証用のテーブルを作成する。

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.12:0.7.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
1
2
3
4
5
6
7
8
9
10
scala> val df = spark.read.format("parquet").load("/opt/spark/default/examples/src/main/resources/users.parquet")
scala> df.show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+

scala> df.write.format("delta").save("/tmp/delta/users_for_delete")

別のターミナルを立ち上げ、当該テーブルをストリームで読み込む。

1
$ /opt/spark/default/bin/spark-submit --class com.example.StructuredStreamingDeltaLakeReadExample target/scala-2.12/structured_streaming_deltalake-assembly-0.0.1.jar /tmp/delta/users_for_delete

spark-shellでテーブルとして読み込む。

1
2
scala> import io.delta.tables._
scala> val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/users_for_delete")
1
scala> deltaTable.delete("name == 'Ben'")

上記の通り、テーブルを更新(削除)した結果、ストリーム処理が以下のエラーを出力して終了した。

1
2
3
4
5
6
20/11/23 22:26:21 ERROR MicroBatchExecution: Query [id = 660b82a9-ca40-4b91-8032-d75807b11c18, runId = 7e46e9a7-1ba2-48b2-b264-53c254cfa6fc] terminated
with error
java.lang.UnsupportedOperationException: Detected a data update in the source table. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory. at org.apache.spark.sql.delta.sources.DeltaSource.verifyStreamHygieneAndFilterAddFiles(DeltaSource.scala:273)
at org.apache.spark.sql.delta.sources.DeltaSource.$anonfun$getChanges$1(DeltaSource.scala:117)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
(snip)

そこで、同じことをもう一度 ignoreDelte オプションを利用して実行してみる。

1
scala> df.write.format("delta").partitionBy("name").save("/tmp/delta/users_for_delete2")

なお、 ignoreDelete オプションはパーティションカラムに指定されたカラムに対し、 where句を利用して削除するものに対して有効である。 (パーティションカラムに指定されていないカラムをwhere句に使用してdeleteしたところ、エラーが生じた)

別のターミナルを立ち上げ、当該テーブルをストリームで読み込む。

1
$ /opt/spark/default/bin/spark-submit --class com.example.StructuredStreamingDeltaLakeDeleteExample target/scala-2.12/structured_streaming_deltalake-assembly-0.0.1.jar /tmp/delta/users_for_delete2

spark-shellでテーブルとして読み込む。

1
2
scala> import io.delta.tables._
scala> val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/users_for_delete2")
1
scala> deltaTable.delete("name == 'Ben'")

このとき、特にエラーなくストリーム処理が続いた。

またテーブルを見たところ、

1
2
3
4
5
6
scala> deltaTable.toDF.show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
+------+--------------+----------------+

削除対象のレコードが消えていることが確かめられる。

読み込み時の ignoreChanges オプション

ignoreChanges オプションは、いったん概ね ignoreDelete オプションと同様だと思えば良い。 ただしdeleteに限らない。(deleteも含む) 細かな点は後で調査する。

org.apache.spark.sql.delta.sources.DeltaSource クラスについて

Delta Lake用のストリームData Sourceである。 親クラスは以下の通り。

1
2
3
DeltaSource (org.apache.spark.sql.delta.sources)
Source (org.apache.spark.sql.execution.streaming)
SparkDataStream (org.apache.spark.sql.connector.read.streaming)

気になったメンバ変数は以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/** A check on the source table that disallows deletes on the source data. */
private val ignoreChanges = options.ignoreChanges || ignoreFileDeletion
--> レコードの変更(ファイル変更)を無視するかどうか

/** A check on the source table that disallows commits that only include deletes to the data. */
private val ignoreDeletes = options.ignoreDeletes || ignoreFileDeletion || ignoreChanges
--> レコードの削除(ファイル削除)を無視するかどうか

private val excludeRegex: Option[Regex] = options.excludeRegex
--> ADDファイルのリストする際、無視するファイルを指定する

override val schema: StructType = deltaLog.snapshot.metadata.schema
--> 対象テーブルのスキーマ

(snip)

private val tableId = deltaLog.snapshot.metadata.id
--> テーブルのID

private var previousOffset: DeltaSourceOffset = null
--> バッチの取得時のオフセットを保持する

// A metadata snapshot when starting the query.
private var initialState: DeltaSourceSnapshot = null
private var initialStateVersion: Long = -1L
--> org.apache.spark.sql.delta.sources.DeltaSource#getBatch などから呼ばれる、スナップショットを保持するための変数

org.apache.spark.sql.delta.sources.DeltaSource#getChanges メソッド

ストリーム処理のバッチ取得メソッド org.apache.spark.sql.delta.sources.DeltaSource#getBatch などから呼ばれるメソッド。 スナップショットなどの情報から、開始時点のバージョンから現在までの変化を返す。

org.apache.spark.sql.delta.sources.DeltaSource#getSnapshotAt メソッド

上記の org.apache.spark.sql.delta.sources.DeltaSource#getChanges メソッドなどで利用される、 指定されたバージョンでのスナップショットを返す。

org.apache.spark.sql.delta.SnapshotManagement#getSnapshotAt を内部的に利用する。

org.apache.spark.sql.delta.sources.DeltaSource#getChangesWithRateLimit メソッド

org.apache.spark.sql.delta.sources.DeltaSource#getChanges メソッドに比べ、 レート制限を考慮した上での変化を返す。

org.apache.spark.sql.delta.sources.DeltaSource#getStartingOffset メソッド

org.apache.spark.sql.delta.sources.DeltaSource#latestOffset メソッド内で呼ばれる。 ストリーム処理でマイクロバッチを構成する際、対象としているデータのオフセットを返すのだが、 それらのうち、初回の場合に呼ばれる。★要確認

当該メソッド内で、「開始時点か、変化をキャプチャして処理しているのか」、「コミット内のオフセット位置」などの確認が行われ、 org.apache.spark.sql.delta.sources.DeltaSourceOffset にラップした上でメタデータが返される。

org.apache.spark.sql.delta.sources.DeltaSource#latestOffset メソッド

org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl#latestOffset メソッドをオーバーライドしている。 今回のマイクロバッチで対象となるデータのうち、最後のデータを示すオフセットを返す。

org.apache.spark.sql.delta.sources.DeltaSource#verifyStreamHygieneAndFilterAddFiles メソッド

org.apache.spark.sql.delta.sources.DeltaSource#getChanges メソッドで呼ばれる。 getChanges メソッドは指定されたバージョン以降の「変更」を取得するものである。 その中において、verifyStreamHygieneAndFilterAddFiles メソッドは関係あるアクションだけ取り出すために用いられる。

org.apache.spark.sql.delta.sources.DeltaSource#getBatch メソッド

org.apache.spark.sql.execution.streaming.Source#getBatch メソッドをオーバライドしたもの。 与えられたオフセット(開始、終了)をもとに、そのタイミングで処理すべきバッチとなるDataFrameを返す。

org.apache.spark.sql.delta.sources.DeltaSource.AdmissionLimits クラスについて

レート管理のためのクラス。

org.apache.spark.sql.delta.sources.DeltaSource.AdmissionLimits#toReadLimit メソッド

org.apache.spark.sql.delta.sources.DeltaSource#getDefaultReadLimit メソッド内で呼ばれる。 オプションで与えられたレート制限のヒント値を、 org.apache.spark.sql.connector.read.streaming.ReadLimit に変換する。

org.apache.spark.sql.delta.sources.DeltaSource.AdmissionLimits#admit メソッド

org.apache.spark.sql.delta.sources.DeltaSource#getChangesWithRateLimit メソッド内で呼ばれる。 上記メソッドは、レート制限を考慮して変化に関する情報のイテレータを返す。 admit メソッドはその中において、レートリミットに達しているかどうかを判断するために用いられる。

共有

BigDL memory usage

参考

メモ

前提

Download によると、Sparkは2.4系まで対応しているようだ。 Issue-3070 によると、Spark3対応も進んでいるようだ。

Spark起動

Run で記載の通り、以下のように起動した。

1
2
3
$ export SPARK_HOME=/usr/local/spark/default
$ export BIGDL_HOME=/usr/local/bigdl/default
$ ${BIGDL_HOME}/bin/spark-shell-with-bigdl.sh --master local[*]
1
2
scala> import com.intel.analytics.bigdl.utils.Engine
scala> Engine.init

とりあえず初期化までは動いた。

サンプルを動かす

Examples に記載のサンプルを動かす。 LeNet Train にあるLeNetの例が良さそう。

MNISTの画像をダウンロードして、展開した。

1
2
3
4
5
6
7
$ mkdir ~/tmp
$ cd ~/tmp
$ wget http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
$ wget http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
$ wget http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
$ wget http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
$ gunzip *.gz

展開したディレクトリを指定しながら、LeNetの学習を実行。

1
2
3
4
5
6
7
8
$ spark-submit \
--master local[6] \
--driver-class-path ${BIGDL_HOME}/lib/bigdl-SPARK_2.4-0.11.1-jar-with-dependencies.jar \
--class com.intel.analytics.bigdl.models.lenet.Train \
${BIGDL_HOME}/lib/bigdl-SPARK_2.4-0.11.1-jar-with-dependencies.jar \
-f $HOME/tmp \
-b 12 \
--checkpoint ./model

実行中の様子は以下の通り。

ヒストリのキャプチャ

今回はローカルモードで実行したが、入力された学習データと同様のサイズのキャッシュがメモリ上に展開されていることがわかる。

サンプルの中身

2020/11/15時点のmasterブランチを確認する。

Trainクラス

上記サンプルで実行されているTrainを見る。 モデルを定義している箇所は以下の通り。

com/intel/analytics/bigdl/models/lenet/Train.scala:48

1
2
3
4
5
6
7
8
9
10
11
12
val model = if (param.modelSnapshot.isDefined) {
Module.load[Float](param.modelSnapshot.get)
} else {
if (param.graphModel) {
LeNet5.graph(classNum = 10)
} else {
Engine.getEngineType() match {
case MklBlas => LeNet5(10)
case MklDnn => LeNet5.dnnGraph(param.batchSize / Engine.nodeNumber(), 10)
}
}
}

Modelインスタンスは以下の通り、Optimizerオブジェクトに渡され、 データセットに合わせたOptimizerが返される。 (例:データセットが分散データセットかどうか、など)

com/intel/analytics/bigdl/models/lenet/Train.scala:83

1
2
3
4
val optimizer = Optimizer(
model = model,
dataset = trainSet,
criterion = criterion)

参考までに、Optimizerの種類と判定の処理は以下の通り。

com/intel/analytics/bigdl/optim/Optimizer.scala:688

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
dataset match {
case d: DistributedDataSet[_] =>
Engine.getOptimizerVersion() match {
case OptimizerV1 =>
new DistriOptimizer[T](
_model = model,
_dataset = d.toDistributed().asInstanceOf[DistributedDataSet[MiniBatch[T]]],
_criterion = criterion
).asInstanceOf[Optimizer[T, D]]
case OptimizerV2 =>
new DistriOptimizerV2[T](
_model = model,
_dataset = d.toDistributed().asInstanceOf[DistributedDataSet[MiniBatch[T]]],
_criterion = criterion
).asInstanceOf[Optimizer[T, D]]
}
case d: LocalDataSet[_] =>
new LocalOptimizer[T](
model = model,
dataset = d.toLocal().asInstanceOf[LocalDataSet[MiniBatch[T]]],
criterion = criterion
).asInstanceOf[Optimizer[T, D]]
case _ =>
throw new UnsupportedOperationException
}

返されたOptimizerの、Optimizer#optimizeメソッドを利用し学習が実行される。

com/intel/analytics/bigdl/models/lenet/Train.scala:98

1
2
3
4
5
6
7
8
optimizer
.setValidation(
trigger = Trigger.everyEpoch,
dataset = validationSet,
vMethods = Array(new Top1Accuracy, new Top5Accuracy[Float], new Loss[Float]))
.setOptimMethod(optimMethod)
.setEndWhen(Trigger.maxEpoch(param.maxEpoch))
.optimize()

Optimizerの種類

上記の内容を見るに、Optimizerにはいくつか種類がありそうだ。

  • DistriOptimizer
  • DistriOptimizerV2
  • LocalOptimizer

DistriOptimizer

ひとまずメモリ使用に関連する箇所ということで、入力データの準備の処理を確認する。 com.intel.analytics.bigdl.optim.DistriOptimizer#optimize メソッドには 以下のような箇所がある。

com/intel/analytics/bigdl/optim/DistriOptimizer.scala:870

1
prepareInput()

これは com.intel.analytics.bigdl.optim.DistriOptimizer#prepareInput メソッドであり、 内部的に com.intel.analytics.bigdl.optim.AbstractOptimizer#prepareInput メソッドを呼び出し、 入力データをSparkのキャッシュに載せるように処理する。

com/intel/analytics/bigdl/optim/DistriOptimizer.scala:808

1
2
3
4
if (!dataset.toDistributed().isCached) {
DistriOptimizer.logger.info("caching training rdd ...")
DistriOptimizer.prepareInput(this.dataset, this.validationDataSet)
}

キャシュに載せると箇所は以下の通り。

com/intel/analytics/bigdl/optim/AbstractOptimizer.scala:279

1
2
3
4
5
6
7
private[bigdl] def prepareInput[T: ClassTag](dataset: DataSet[MiniBatch[T]],
validationDataSet: Option[DataSet[MiniBatch[T]]]): Unit = {
dataset.asInstanceOf[DistributedDataSet[MiniBatch[T]]].cache()
if (validationDataSet.isDefined) {
validationDataSet.get.toDistributed().cache()
}
}

上記の DistributedDataSetchache メソッドは以下の通り。

com/intel/analytics/bigdl/dataset/DataSet.scala:216

1
2
3
4
5
6
def cache(): Unit = {
if (originRDD() != null) {
originRDD().count()
}
isCached = true
}

originRDD の戻り値に対して、count を読んでいる。 ここで count を呼ぶのは、入力データである originRDD の戻り値に入っているRDDをメモリ上にマテリアライズするためである。

count を呼ぶだけでマテリアライズできるのは、予め入力データを定義したときに Spark RDDの cache を利用してキャッシュ化することを指定されているからである。 今回の例では、Optimizerオブジェクトのapplyを利用する際に渡されるデータセット trainSet を 定義する際に予め cache が呼ばれる。

com/intel/analytics/bigdl/models/lenet/Train.scala:79

1
2
3
val trainSet = DataSet.array(load(trainData, trainLabel), sc) ->
BytesToGreyImg(28, 28) -> GreyImgNormalizer(trainMean, trainStd) -> GreyImgToBatch(
param.batchSize)

trainSet を定義する際、 com.intel.analytics.bigdl.dataset.DataSet$#array(T[], org.apache.spark.SparkContext) メソッドが 呼ばれるのだが、その中で以下のように Sparkの RDD#cache が呼ばれていることがわかる。

com/intel/analytics/bigdl/dataset/DataSet.scala:343

1
2
3
4
5
6
7
8
9
10
11
12
def array[T: ClassTag](localData: Array[T], sc: SparkContext): DistributedDataSet[T] = {
val nodeNumber = Engine.nodeNumber()
new CachedDistriDataSet[T](
sc.parallelize(localData, nodeNumber)
// Keep this line, or the array will be send to worker every time
.coalesce(nodeNumber, true)
.mapPartitions(iter => {
Iterator.single(iter.toArray)
}).setName("cached dataset")
.cache()
)
}

以下、一例。

具体的には、Optimizerのインスタンスを生成するための apply メソッドはいくつかあるが、 以下のように引数にデータセットを指定する箇所がある。(再掲)

com/intel/analytics/bigdl/optim/Optimizer.scala:619

1
_dataset = (DataSet.rdd(sampleRDD) ->

ここで用いられている com.intel.analytics.bigdl.dataset.DataSet#rdd メソッドは以下の通り。

com/intel/analytics/bigdl/dataset/DataSet.scala:363

1
2
3
4
5
6
7
8
9
10
def rdd[T: ClassTag](data: RDD[T], partitionNum: Int = Engine.nodeNumber()
): DistributedDataSet[T] = {
new CachedDistriDataSet[T](
data.coalesce(partitionNum, true)
.mapPartitions(iter => {
Iterator.single(iter.toArray)
}).setName("cached dataset")
.cache()
)
}

com.intel.analytics.bigdl.dataset.CachedDistriDataSet#CachedDistriDataSet のコンストラクタ引数に、 org.apache.spark.rdd.RDD#cache を用いてキャッシュ化することを指定したRDDを渡していることがわかる。

共有

Generate PDF using pandoc

参考

メモ

TeX Liveのインストール手順 の通り、最新版をインストールする。 なお、フルセットでインストールした。

PDFを作成するときは以下のようにする。

1
$ pandoc test.md -o test.pdf --pdf-engine=lualatex -V documentclass=bxjsarticle -V classoption=pandoc

ただし、 default.latex に示すとおり、リンクに色を付けるなどしたいので、 上記マークダウンファイルには先頭部分にYAMLでコンフィグを記載することとした。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
---
title: テスト文章
date: 2020-10-30
author: dobachi
colorlinks: yes
numbersections: yes
toc: yes
---

# test

## これはテスト

[Google](https://www.google.com)

<!-- vim: set ft=markdown : -->
共有

Access AWS S3 from Hadoop3 and Spark3

参考

メモ

パッケージインポート

Sparkの公式ドキュメント には、org.apache.spark:hadoop-cloud_2.12を利用するよう記載されているが、 このMavenパッケージは見当たらなかった。

そこで、 Hadoopの公式ドキュメント の通り、hadoop-awsをインポートすると、それに関連した依存関係をインポートすることにした。 spark-shellで試すとしたら以下の通り。

1
$ /opt/spark/default/bin/spark-shell --packages org.apache.hadoop:hadoop-aws:3.2.0

profileを使ったクレデンシャル設定

awscliを利用しているとしたら、profileを設定しながらクレデンシャルを使い分けていることもあると思う。 その場合、起動時の環境変数でプロフィールを指定しながら、以下のようにproviderを指定することで、 profileを利用したクレデンシャル管理の仕組みを利用してアクセスできる。

1
$ AWS_PROFILE=s3test /opt/spark/default/bin/spark-shell --packages org.apache.hadoop:hadoop-aws:3.2.0
1
scala> spark.sparkContext.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

(補足)パラメータと指定するクラスについて

ちなみに、上記パラメータの説明には、

If unspecified, then the default list of credential provider classes, queried in sequence, is: 1. org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider: Uses the values of fs.s3a.access.key and fs.s3a.secret.key. 2. com.amazonaws.auth.EnvironmentVariableCredentialsProvider: supports configuration of AWS access key ID and secret access key in environment variables named AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, as documented in the AWS SDK. 3. com.amazonaws.auth.InstanceProfileCredentialsProvider: supports use of instance profile credentials if running in an EC2 VM.

のように記載されている。

また、「Using Named Profile Credentials with ProfileCredentialsProvider」の節には、

Declare com.amazonaws.auth.profile.ProfileCredentialsProvider as the provider.

と書かれており、上記Default...でなくても良い。(実際に試したところ動作した)

共有

qmk_firmware_202009

参考

メモ

2020/9に久しぶりにコンパイルしようとしたら、だいぶ勝手が変わっていた。

公式の環境構築手順 に従って環境を構築した。 なお、インストールするよう指定されていたパッケージだけでは足りなかったので、 以下のようにインストールした。

1
$ pacman --needed --noconfirm --disable-download-timeout -S git mingw-w64-x86_64-toolchain mingw-w64-x86_64-python3-pip python3 python3-pip make diffutils

最初、以下を忘れたため、失敗したので注意・・・。

1
$ qmk setup

公式のコンパイル手順 に従うと、qmkコマンドを利用してコンパイル、フラッシュするようだ。

ただ、 QMK Firmwareでファームウェアをビルドしようとしたらavr-gccでコケた話 に記載されているのと同様に、 make git-submodules が必要だった。

共有

Use GitHub actions to deploy documents

参考

メモ

GitHub Actionsを用いてGitHub Pagesへのデプロイを自動化する がSphinx JPのユーザ会による記事だったので参考になるが、 依存関係を都度pipインストールしているのが気になった。

マーケットプレイスのsphinx-build を利用すると良さそうだが、 この手順ではGITHUB TOKENを利用してデプロイしているのが気になった。 レポジトリごとに設定できる、Deploy Keyを利用したい。

そこでビルドには マーケットプレイスのsphinx-build を利用し、デプロイには GitHub Actionsを用いてGitHub Pagesへのデプロイを自動化する の手順を利用することにした。

レポジトリのDeploy Key設定

GitHub Actionsを用いてGitHub Pagesへのデプロイを自動化する の「Deploy keys の設定」章を参考に、 当該レポジトリのDeploy Keyを登録する。

任意の環境(ここではWSLのUbuntu18を利用した)で、 以下のコマンドを実行。

1
$ ssh-keygen -t rsa -b 4096 -C "<レポジトリで使用しているメールアドレス>" -f <任意の名前> -N ""

上記記事の通り、秘密鍵と公開鍵をGitHubのウェブUIで登録する。 なお、「Secrets」タブで登録した名称は、後ほどGitHub Actionsのワークフロー内で使用する。

GitHub Actionsのワークフローの記述

マーケットプレイスのsphinx-build の例を参考に、 ワークフローを記述する。 なお、最後のデプロイする部分は、 GitHub Actionsを用いてGitHub Pagesへのデプロイを自動化する を参考に、 Deploy Keyを利用するよう修正した。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
name: CI

# 今回はマスタブランチへのPushをトリガとする。
on:
push:
branches:
- master

jobs:
build:

runs-on: ubuntu-latest
# 今回はmasterブランチへのpushをトリガとしているので不要だが、gh-pagesへのpushをトリガとする場合など
# 無限ループを回避する際には以下のように記述する。
if: "!contains(github.event.head_commit.message, 'Update documentation via GitHub Actions')"

steps:
- uses: actions/checkout@v1

# 今回はMakefileを利用するので、makeコマンドを使用するよう元ネタから修正した。
# またドキュメントのソースが含まれているディレクトリは各自の定義に依存する。
- uses: ammaraskar/sphinx-action@master
with:
build-command: "make html"
docs-folder: "documents/"
# 先ほどGitHubのウェブUIで定義した秘密鍵名を使用する。
- name: Commit documentation changes and push it
run: |
mkdir ~/.ssh
ssh-keyscan -t rsa github.com >> ~/.ssh/known_hosts
echo "${{ secrets.<先ほどGitHubウェブUIで定義した秘密鍵名> }}" > ~/.ssh/id_rsa
chmod 400 ~/.ssh/id_rsa
git clone git@github.com:${GITHUB_REPOSITORY}.git --branch gh-pages --single-branch gh-pages
cp -r documents/_build/html/* gh-pages/
cd gh-pages
git config --local user.email "action@github.com"
git config --local user.name "GitHub Action"
git add .
git commit -m "Update documentation via GitHub Actions" -a || true
git push origin HEAD:gh-pages
# The above command will fail if no changes were present, so we ignore
# that.
# ===============================
共有

Install Bigtop RPMs using Yum

参考

メモ

今回は、2020/7/21時点の最新バージョンである Apache Bigtop 1.4.0のパッケージバージョン の Hadoopをインストールできるかどうかを試してみることとする。

Yumのrepoファイルは レポジトリ関連の資材置き場 以下にある。 例えば、今回はCentOS7を利用することにするので、 CentOS7のbigtop.repo あたりを利用する。

1
2
$ cd /etc/yum.repos.d
$ sudo wget https://downloads.apache.org/bigtop/bigtop-1.4.0/repos/centos7/bigtop.repo

ひとまずパッケージが見つかるかどうか、確認。

1
2
3
4
5
6
7
8
9
$ sudo yum search hadoop-conf-pseudo
読み込んだプラグイン:fastestmirror
Loading mirror speeds from cached hostfile
* base: d36uatko69830t.cloudfront.net
* epel: d2lzkl7pfhq30w.cloudfront.net
* extras: d36uatko69830t.cloudfront.net
* updates: d36uatko69830t.cloudfront.net
=========================================== N/S matched: hadoop-conf-pseudo ============================================
hadoop-conf-pseudo.x86_64 : Pseudo-distributed Hadoop configuration

確認できたので、試しにインストール。

1
$ sudo yum install hadoop-conf-pseudo

自分の手元の環境では、依存関係で以下のパッケージがインストールされた。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
========================================================================================================================
Package アーキテクチャー バージョン リポジトリー 容量
========================================================================================================================
インストール中:
hadoop-conf-pseudo x86_64 2.8.5-1.el7 bigtop 20 k
依存性関連でのインストールをします:
at x86_64 3.1.13-24.el7 base 51 k
bc x86_64 1.06.95-13.el7 base 115 k
bigtop-groovy noarch 2.4.10-1.el7 bigtop 9.8 M
bigtop-jsvc x86_64 1.0.15-1.el7 bigtop 29 k
bigtop-utils noarch 1.4.0-1.el7 bigtop 11 k
cups-client x86_64 1:1.6.3-43.el7 base 152 k
ed x86_64 1.9-4.el7 base 72 k
hadoop x86_64 2.8.5-1.el7 bigtop 24 M
hadoop-hdfs x86_64 2.8.5-1.el7 bigtop 24 M
hadoop-hdfs-datanode x86_64 2.8.5-1.el7 bigtop 5.7 k
hadoop-hdfs-namenode x86_64 2.8.5-1.el7 bigtop 5.8 k
hadoop-hdfs-secondarynamenode x86_64 2.8.5-1.el7 bigtop 5.8 k
hadoop-mapreduce x86_64 2.8.5-1.el7 bigtop 34 M
hadoop-mapreduce-historyserver x86_64 2.8.5-1.el7 bigtop 5.8 k
hadoop-yarn x86_64 2.8.5-1.el7 bigtop 20 M
hadoop-yarn-nodemanager x86_64 2.8.5-1.el7 bigtop 5.7 k
hadoop-yarn-resourcemanager x86_64 2.8.5-1.el7 bigtop 5.6 k
libpcap x86_64 14:1.5.3-12.el7 base 139 k
m4 x86_64 1.4.16-10.el7 base 256 k
mailx x86_64 12.5-19.el7 base 245 k
nmap-ncat x86_64 2:6.40-19.el7 base 206 k
patch x86_64 2.7.1-12.el7_7 base 111 k
psmisc x86_64 22.20-16.el7 base 141 k
redhat-lsb-core x86_64 4.1-27.el7.centos.1 base 38 k
redhat-lsb-submod-security x86_64 4.1-27.el7.centos.1 base 15 k
spax x86_64 1.5.2-13.el7 base 260 k
time x86_64 1.7-45.el7 base 30 k
zookeeper x86_64 3.4.6-1.el7 bigtop 7.0 M

initスクリプトがインストールされていることがわかる。

1
2
3
4
5
6
7
8
9
10
11
$ ls -1 /etc/init.d/
README
functions
hadoop-hdfs-datanode
hadoop-hdfs-namenode
hadoop-hdfs-secondarynamenode
hadoop-mapreduce-historyserver
hadoop-yarn-nodemanager
hadoop-yarn-resourcemanager
netconsole
network

ひとまずHDFSをフォーマット。

1
$ sudo -u hdfs hdfs namenode -format

あとは、上記の各種Hadoopサービスを立ち上げれば良い。

共有

Delta Lake 0.7.0

参考

メモ

0.7.0が出たので、本リリースの特徴を確認する。

SQL DDLへの対応やHive メタストアの対応

0.6系まではScala、Python APIのみであったが、SQL DDLにも対応した。 0.7.0のテーブル読み書き0.6.1のテーブル読み書き を見比べると、SQLの例が載っていることがわかる。 対応するSQL構文については src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4 あたりを見ると良い。

なお、 Spark3系でないとHiveメタストアに対応できない理由 を見る限り、 Spark3系のAPI(や、DataSourceV2も、かな)を使わないと、Data SourceのカスタムAPIを利用できないため、 これまでHiveメタストアのような外部メタストアと連携したDelta Lakeのメタデータ管理ができなかった、とのこと。

なお、今回の対応でSparkのカタログ機能を利用することになったので、起動時もしくはSparkSession生成時の オプション指定が必要になった。 その代わり、ライブラリの明示的なインポートが不要であり、クエリはDelta Lakeのパーサで解釈された後、 解釈できないようであれば通常のパーサで処理されるようになる。

起動時のオプション例

例:

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.12:0.7.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

なお、ここでは SparkSessionExtensions を利用し、SparkSession生成時にカスタムルール等を挿入している。 この機能は2020/06/19時点でSpark本体側でExperimentalであることに注意。 今後もSpark本体側の仕様変更に引きずられる可能性はある。

パーサの呼び出し流れ

セッション拡張機能を利用し、パーサが差し替えられている。

io/delta/sql/DeltaSparkSessionExtension.scala:73

1
2
3
4
5
6
7
class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
extensions.injectParser { (session, parser) =>
new DeltaSqlParser(parser)
}

(snip)

io.delta.sql.parser.DeltaSqlParser クラスでは デリゲート用のパーサを受け取り、自身のパーサで処理できなかった場合に処理をデリゲートパーサに渡す。

io/delta/sql/parser/DeltaSqlParser.scala:66

1
2
3
4
class DeltaSqlParser(val delegate: ParserInterface) extends ParserInterface {
private val builder = new DeltaSqlAstBuilder

(snip)

例えば、 SparkSessionsql メソッドを使って呼び出す場合を例にする。 このとき、内部では、 org.apache.spark.sql.catalyst.parser.ParserInterface#parsePlan メソッドが呼ばれて、 渡されたクエリ文 sqlText が処理される。

org/apache/spark/sql/SparkSession.scala:601

1
2
3
4
5
6
7
def sql(sqlText: String): DataFrame = withActive {
val tracker = new QueryPlanningTracker
val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
sessionState.sqlParser.parsePlan(sqlText)
}
Dataset.ofRows(self, plan, tracker)
}

この parsePlan がoverrideされており、以下のように定義されている。

io/delta/sql/parser/DeltaSqlParser.scala:69

1
2
3
4
5
6
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
builder.visit(parser.singleStatement()) match {
case plan: LogicalPlan => plan
case _ => delegate.parsePlan(sqlText)
}
}

まずは io.delta.sql.parser.DeltaSqlParser#parse メソッドを利用してパースがここ見られるが、 LogicalPlanが戻らなかったときは、デリゲート用パーサが呼び出されるようになっている。

カスタムカタログ

Spark3ではDataSourvV2の中で、プラガブルなカタログに対応した。 Delta Lake 0.7.0はこれを利用し、カスタムカタログを用いる。(これにより、Hiveメタストアを経由してDelta Lake形式のデータを読み書きできるようになっている) 使われているカタログは org.apache.spark.sql.delta.catalog.DeltaCatalog である。 (SparkSessionのインスタンス生成する際、もしくは起動時のオプション指定)

当該カタログ内部では、例えば org.apache.spark.sql.delta.catalog.DeltaCatalog#createDeltaTable メソッドが定義されており、 org.apache.spark.sql.delta.catalog.DeltaCatalog#createTable ※ しようとするときなどに呼び出されるようになっている。

org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension#createTable をoverrideしている

なお、このクラスもデリゲート用のカタログを用いるようになっている。 org.apache.spark.sql.delta.catalog.DeltaCatalog#createTable メソッドは以下のようになっており、 データソースが delta 出ない場合は、親クラスの createTable (つまり標準的なもの)が呼び出されるようになっている。

org/apache/spark/sql/delta/catalog/DeltaCatalog.scala:149

1
2
3
4
5
6
7
8
9
10
11
12
override def createTable(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): Table = {
if (DeltaSourceUtils.isDeltaDataSourceName(getProvider(properties))) {
createDeltaTable(
ident, schema, partitions, properties, sourceQuery = None, TableCreationModes.Create)
} else {
super.createTable(ident, schema, partitions, properties)
}
}

ScalaやPythonでの例

代表的にScalaの例を出す。公式サイトには以下のように載っている。

1
2
3
df.write.format("delta").saveAsTable("events")      // create table in the metastore

df.write.format("delta").save("/delta/events") // create table by path

Hiveメタストア経由で書き込むケースと、ストレージ上に直接書き出すケースが載っている。

SQLでのマージ処理

SQLを用いたマージの例 の通り、Delta Lakeの特徴であるマージ機能もSQLから呼び出させる。

1
2
3
4
5
6
spark.sql(s"""MERGE INTO $tableName USING newData
ON ${tableName}.id = newData.id
WHEN MATCHED THEN
UPDATE SET ${tableName}.id = newData.id
WHEN NOT MATCHED THEN INSERT *
""")

Spark SQLのカタログに登録されたDelta LakeのテーブルからDeltaTableを生成することもできる。

1
2
scala> import io.delta.tables.DeltaTable
scala> val tbl = DeltaTable.forName(tableName)

Presto / Athena用のメタデータの自動生成

Delta LakeはPresto、Athena用のメタデータを生成できるが、更新があった際にパーティションごとに自動で再生成できるようになった。

テーブル履歴の切り詰めの管理

Delta Lakeは更新の履歴を保持することも特徴の一つだが、 データ本体とログのそれぞれの切り詰め対象期間を指定できる。

CREATEやALTER句内で、TBLPROPERTIESとして指定することになっている。 例えば以下。

1
2
spark.sql(s"CREATE TABLE $tableName(id LONG) USING delta TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 1 day', 'delta.deletedFileRetentionDuration' = 'interval 1 day')")
spark.sql(s"ALTER TABLE $tableName SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 1 day', 'delta.deletedFileRetentionDuration' = 'interval 1 day')")

ユーザメタデータ

spark.databricks.delta.commitInfo.userMetadata プロパティを利用して、ユーザメタデータを付与できる。

1
df.write.option("spark.databricks.delta.commitInfo.userMetadata", "test").format("delta").mode("append").save("/tmp/test")
1
2
scala> spark.sql("SET spark.databricks.delta.commitInfo.userMetadata=test")
scala> spark.sql(s"INSERT INTO $tableName VALUES 0, 1, 2, 3, 4")

AzureのData Lake Storage Gen2

対応した。

しかし、 0.7.0で対応したAzure Data Lake Storage Gen2 の通り、 前提となっている各種ソフトウェアバージョンは高め。

ストリーム処理のone-timeトリガの改善

DataStreamReaderのオプションでmaxFilesPerTriggerを設定しているとしても、 one-time triggerでは一度に溜まったすべてのデータを読み込むようになった。(Spark 3系の話) 

共有