Read and write data on Delta Lake streaming manner

参考

メモ

準備(SBT)

今回はSBTを利用してSalaを用いたSparkアプリケーションで試すことにする。 sbt を参考にSBTをセットアップする。(基本的には対象バージョンをダウンロードして置くだけ)

1
2
3
$ mkdir -p ~/Sources
$ cd ~/Sources
$ sbt new dobachi/spark-sbt.g8

ここでは、 [dobachi's spark-sbt.g8] を利用してSpark3.0.1の雛形を作った。 対話的に色々聞かれるので、ほぼデフォルトで生成。

ここでは、 StructuredNetworkWordCount を参考に、Word Countした結果をDelta Lakeに書き込むことにする。

なお、以降の例で示しているアプリケーションは、すべて dobachi's StructuredStreamingDeltaLakeExample に含まれている。

簡単なビルドと実行の例

以下のように予めncコマンドを起動しておく。

1
$ nc -lk 9999

続いて、別のターミナルでアプリケーションをビルドして実行。(ncコマンドを起動したターミナルは一旦保留)

1
2
3
$ cd structured_streaming_deltalake
$ sbt assembly
$ /opt/spark/default/bin/spark-submit --class com.example.StructuredStreamingDeltaLakeExample target/scala-2.12/structured_streaming_deltalake-assembly-0.0.1.jar localhost 9999

先程起動したncコマンドの引数に合わせ、9999ポートに接続する。 Structured Streamingが起動したら、ncコマンド側で、適当な文字列(単語)を入力する(以下は例)

1
2
3
hoge hoge fuga
fuga fuga
fuga fuga hoge

もうひとつターミナルを開き、spark-shellを起動する。

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.12:0.7.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

アプリケーションで出力先としたディレクトリ内のDelta Lakeテーブルを参照すると、以下のようにテーブルが更新されていることがわかる。 ひとまずncコマンドでいろいろな単語を入力して挙動を試すと良い。

1
2
3
4
5
6
7
8
9
10
scala> val df = spark.read.format("delta").load("/tmp/delta/wordcount")
df: org.apache.spark.sql.DataFrame = [value: string, count: bigint]

scala> df.show
+-----+-----+
|value|count|
+-----+-----+
| fuga| 5|
| hoge| 3|
+-----+-----+

(wip)

書き込み

Delta table as a sink の通り、Delta LakeテーブルをSink(つまり書き込み先)として利用する際には、 AppendモードとCompleteモードのそれぞれが使える。

Appendモード

例のごとく、ncコマンドを起動し、アプリケーションを実行。(予めsbt assemblyしておく) もうひとつターミナルを開き、spark-shellでDelta Lakeテーブルの中身を確認する。

ターミナル1

1
$ nc -lk 9999

ターミナル2

1
2
$ /opt/spark/default/bin/spark-submit --class com.example.StructuredStreamingDeltaLakeAppendExample
target/scala-2.12/structured_streaming_deltalake-assembly-0.0.1.jar localhost 9999

ターミナル3

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.12:0.7.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

ターミナル1のncで適当な単語列を入力する。

1
hoge hoge

ターミナル3のspark-shellでDelta Lakeテーブルの中身を確認する。

1
2
3
4
5
6
7
8
9
scala> val df = spark.read.format("delta").load("/tmp/delta/wordcount_per_line")
df: org.apache.spark.sql.DataFrame = [unixtime: bigint, count: bigint]

scala> df.show
+-------------+-----+
| unixtime|count|
+-------------+-----+
|1605968491821| 2|
+-------------+-----+

何度かncコマンド経由でテキストを流し込むと、以下のように行が加わるkとがわかる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scala> df.show
+-------------+-----+
| unixtime|count|
+-------------+-----+
|1605968491821| 2|
+-------------+-----+


scala> df.show
+-------------+-----+
| unixtime|count|
+-------------+-----+
|1605968518584| 2|
|1605968522461| 3|
|1605968491821| 2|
+-------------+-----+

Completeモード

Completeモードは、上記のWordCountの例がそのまま例になっている。 com.example.StructuredStreamingDeltaLakeExample 参照。

読み出し

Delta table as a stream source の通り、既存のDelta Lakeテーブルからストリームデータを取り出す。

準備

ひとまずDelta Lakeのテーブルを作る。 ここではspark-shellで適当に作成する。

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.12:0.7.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
1
2
3
4
5
6
7
8
9
10
scala> val df = spark.read.format("parquet").load("/opt/spark/default/examples/src/main/resources/users.parquet")
scala> df.show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+

scala> df.write.format("delta").save("/tmp/delta/users")

上記のDataFrameのスキーマは以下の通り。

1
2
3
4
5
6
scala> df.printSchema
root
|-- name: string (nullable = true)
|-- favorite_color: string (nullable = true)
|-- favorite_numbers: array (nullable = true)
| |-- element: integer (containsNull = true)

ストリームで読み込んでみる

もうひとつターミナルを立ち上げる。

1
$ /opt/spark/default/bin/spark-submit --class com.example.StructuredStreamingDeltaLakeReadExample target/scala-2.12/structured_streaming_deltalake-assembly-0.0.1.jar /tmp/delta/users

先程作成しておいたテーブルの中身が読み込まれる。

1
2
3
4
5
6
7
8
9
10
11
-------------------------------------------
Batch: 0
-------------------------------------------
20/11/22 00:29:41 INFO CodeGenerator: Code generated in 3.212 ms
20/11/22 00:29:41 INFO CodeGenerator: Code generated in 4.1238 ms
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+

先程の、Delta Lakeテーブルを作成したターミナルのspark-shellで、 追加データを作り、Delta Lakeテーブルに挿入する。

まず、追加データ用のスキーマを持つcase classを作成する。

1
scala> case class User(name: String, favorite_color: String, favorite_numbers: Array[Int])

つづいて、作られたcase classを利用して、追加用のDataFrameを作る。 (SeqからDataFrameを生成する)

1
scala> val addRecord = Seq(User("Bob", "yellow", Array(1,2))).toDF

appendモードで既存のDelta Lakeテーブルに追加する。

1
scala> addRecord.write.format("delta").mode("append").save("/tmp/delta/users")

ここで、ストリーム処理を動かしている方のターミナルを見ると、 以下のように追記された内容が表示されていることがわかる。

1
2
3
4
5
6
7
8
-------------------------------------------
Batch: 1
-------------------------------------------
+----+--------------+----------------+
|name|favorite_color|favorite_numbers|
+----+--------------+----------------+
| Bob| yellow| [1, 2]|
+----+--------------+----------------+

読み込み時の maxFilesPerTrigger オプション

ストリーム読み込み時には、 maxFilesPerTrigger オプションを指定できる。 このオプションは、以下のパラメータとして利用される。

org/apache/spark/sql/delta/DeltaOptions.scala:98

1
2
3
4
5
6
val maxFilesPerTrigger = options.get(MAX_FILES_PER_TRIGGER_OPTION).map { str =>
Try(str.toInt).toOption.filter(_ > 0).getOrElse {
throw DeltaErrors.illegalDeltaOptionException(
MAX_FILES_PER_TRIGGER_OPTION, str, "must be a positive integer")
}
}

このパラメータは org.apache.spark.sql.delta.sources.DeltaSource.AdmissionLimits#toReadLimit メソッド内で用いられる。

org/apache/spark/sql/delta/sources/DeltaSource.scala:350

1
2
3
4
5
6
7
8
9
10
11
12
13
  def toReadLimit: ReadLimit = {
if (options.maxFilesPerTrigger.isDefined && options.maxBytesPerTrigger.isDefined) {
CompositeLimit(
ReadMaxBytes(options.maxBytesPerTrigger.get),
ReadLimit.maxFiles(options.maxFilesPerTrigger.get).asInstanceOf[ReadMaxFiles])
} else if (options.maxBytesPerTrigger.isDefined) {
ReadMaxBytes(options.maxBytesPerTrigger.get)
} else {
ReadLimit.maxFiles(
options.maxFilesPerTrigger.getOrElse(DeltaOptions.MAX_FILES_PER_TRIGGER_OPTION_DEFAULT))
}
}
}

このメソッドの呼び出し階層は以下の通り。

1
2
3
AdmissionLimits in DeltaSource.toReadLimit()  (org.apache.spark.sql.delta.sources)
DeltaSource.getDefaultReadLimit() (org.apache.spark.sql.delta.sources)
MicroBatchExecution

MicroBatchExecutionは、 org.apache.spark.sql.execution.streaming.MicroBatchExecution クラスであり、 この仕組みを通じてSparkのStructured Streamingの持つレートコントロールの仕組みに設定値をベースにした値が渡される。

読み込み時の maxBytesPerTrigger オプション

原則的には、 maxFilesPerTrigger と同じようなもの。 指定された値を用いてcase classを定義し、最大バイトサイズの目安を保持する。

追記ではなく更新したらどうなるか?

Delta Lakeでoverwriteモードで書き込む際に、パーティションカラムに対して条件を指定できることを利用し、 部分更新することにする。

まず name をパーティションカラムとしたDataFrameを作ることにする。

1
2
3
4
5
6
7
8
9
10
scala> val df = spark.read.format("parquet").load("/opt/spark/default/examples/src/main/resources/users.parquet")
scala> df.show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+

scala> df.write.format("delta").partitionBy("name").save("/tmp/delta/partitioned_users")

上記例と同様に、ストリームでデータを読み込むため、もうひとつターミナルを起動し、以下を実行。

1
$ /opt/spark/default/bin/spark-submit --class com.example.StructuredStreamingDeltaLakeReadExample target/scala-2.12/structured_streaming_deltalake-assembly-0.0.1.jar /tmp/delta/partitioned_users

spark-shellを起動したターミナルで、更新用のデータを準備。

1
scala> val updateRecord = Seq(User("Ben", "green", Array(1,2))).toDF

条件付きのoverwriteモードで既存のDelta Lakeテーブルの一部レコードを更新する。

1
2
3
4
5
6
scala> updateRecord
.write
.format("delta")
.mode("overwrite")
.option("replaceWhere", "name == 'Ben'")
.save("/tmp/delta/partitioned_users")

上記を実行したときに、ストリーム処理を実行しているターミナル側で、 以下のエラーが生じ、プロセスが停止した。

1
2
3
4
5
6
20/11/23 22:10:55 ERROR MicroBatchExecution: Query [id = 13cf0aa0-116c-4c95-aea0-3e6f779e02c8, runId = 6f71a4bb-c067-4f6d-aa17-6bf04eea3520] terminated
with error
java.lang.UnsupportedOperationException: Detected a data update in the source table. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory. at org.apache.spark.sql.delta.sources.DeltaSource.verifyStreamHygieneAndFilterAddFiles(DeltaSource.scala:273)
at org.apache.spark.sql.delta.sources.DeltaSource.$anonfun$getChanges$1(DeltaSource.scala:117)

(snip)

後ほど検証するが、Delta Lakeのストリーム処理では、既存レコードのupdate、deleteなどの既存レコードの更新となる処理は基本的にサポートされていない。 オプションを指定することで、更新を無視することができる。

読み込み時の ignoreDeletes オプション

基本的には、上記の通り、元テーブルに変化があった場合はエラーを生じるようになっている。 Ignore updates and deletes に記載の通り、update、merge into、delete、overwriteがエラーの対象である。

ひとまずdeleteでエラーになるのを避けるため、 ignoreDeletes で無視するオプションを指定できる。

動作確認する。

spark-shellを立ち上げ、検証用のテーブルを作成する。

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.12:0.7.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
1
2
3
4
5
6
7
8
9
10
scala> val df = spark.read.format("parquet").load("/opt/spark/default/examples/src/main/resources/users.parquet")
scala> df.show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+

scala> df.write.format("delta").save("/tmp/delta/users_for_delete")

別のターミナルを立ち上げ、当該テーブルをストリームで読み込む。

1
$ /opt/spark/default/bin/spark-submit --class com.example.StructuredStreamingDeltaLakeReadExample target/scala-2.12/structured_streaming_deltalake-assembly-0.0.1.jar /tmp/delta/users_for_delete

spark-shellでテーブルとして読み込む。

1
2
scala> import io.delta.tables._
scala> val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/users_for_delete")
1
scala> deltaTable.delete("name == 'Ben'")

上記の通り、テーブルを更新(削除)した結果、ストリーム処理が以下のエラーを出力して終了した。

1
2
3
4
5
6
20/11/23 22:26:21 ERROR MicroBatchExecution: Query [id = 660b82a9-ca40-4b91-8032-d75807b11c18, runId = 7e46e9a7-1ba2-48b2-b264-53c254cfa6fc] terminated
with error
java.lang.UnsupportedOperationException: Detected a data update in the source table. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory. at org.apache.spark.sql.delta.sources.DeltaSource.verifyStreamHygieneAndFilterAddFiles(DeltaSource.scala:273)
at org.apache.spark.sql.delta.sources.DeltaSource.$anonfun$getChanges$1(DeltaSource.scala:117)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
(snip)

そこで、同じことをもう一度 ignoreDelte オプションを利用して実行してみる。

1
scala> df.write.format("delta").partitionBy("name").save("/tmp/delta/users_for_delete2")

なお、 ignoreDelete オプションはパーティションカラムに指定されたカラムに対し、 where句を利用して削除するものに対して有効である。 (パーティションカラムに指定されていないカラムをwhere句に使用してdeleteしたところ、エラーが生じた)

別のターミナルを立ち上げ、当該テーブルをストリームで読み込む。

1
$ /opt/spark/default/bin/spark-submit --class com.example.StructuredStreamingDeltaLakeDeleteExample target/scala-2.12/structured_streaming_deltalake-assembly-0.0.1.jar /tmp/delta/users_for_delete2

spark-shellでテーブルとして読み込む。

1
2
scala> import io.delta.tables._
scala> val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/users_for_delete2")
1
scala> deltaTable.delete("name == 'Ben'")

このとき、特にエラーなくストリーム処理が続いた。

またテーブルを見たところ、

1
2
3
4
5
6
scala> deltaTable.toDF.show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
+------+--------------+----------------+

削除対象のレコードが消えていることが確かめられる。

読み込み時の ignoreChanges オプション

ignoreChanges オプションは、いったん概ね ignoreDelete オプションと同様だと思えば良い。 ただしdeleteに限らない。(deleteも含む) 細かな点は後で調査する。

org.apache.spark.sql.delta.sources.DeltaSource クラスについて

Delta Lake用のストリームData Sourceである。 親クラスは以下の通り。

1
2
3
DeltaSource (org.apache.spark.sql.delta.sources)
Source (org.apache.spark.sql.execution.streaming)
SparkDataStream (org.apache.spark.sql.connector.read.streaming)

気になったメンバ変数は以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/** A check on the source table that disallows deletes on the source data. */
private val ignoreChanges = options.ignoreChanges || ignoreFileDeletion
--> レコードの変更(ファイル変更)を無視するかどうか

/** A check on the source table that disallows commits that only include deletes to the data. */
private val ignoreDeletes = options.ignoreDeletes || ignoreFileDeletion || ignoreChanges
--> レコードの削除(ファイル削除)を無視するかどうか

private val excludeRegex: Option[Regex] = options.excludeRegex
--> ADDファイルのリストする際、無視するファイルを指定する

override val schema: StructType = deltaLog.snapshot.metadata.schema
--> 対象テーブルのスキーマ

(snip)

private val tableId = deltaLog.snapshot.metadata.id
--> テーブルのID

private var previousOffset: DeltaSourceOffset = null
--> バッチの取得時のオフセットを保持する

// A metadata snapshot when starting the query.
private var initialState: DeltaSourceSnapshot = null
private var initialStateVersion: Long = -1L
--> org.apache.spark.sql.delta.sources.DeltaSource#getBatch などから呼ばれる、スナップショットを保持するための変数

org.apache.spark.sql.delta.sources.DeltaSource#getChanges メソッド

ストリーム処理のバッチ取得メソッド org.apache.spark.sql.delta.sources.DeltaSource#getBatch などから呼ばれるメソッド。 スナップショットなどの情報から、開始時点のバージョンから現在までの変化を返す。

org.apache.spark.sql.delta.sources.DeltaSource#getSnapshotAt メソッド

上記の org.apache.spark.sql.delta.sources.DeltaSource#getChanges メソッドなどで利用される、 指定されたバージョンでのスナップショットを返す。

org.apache.spark.sql.delta.SnapshotManagement#getSnapshotAt を内部的に利用する。

org.apache.spark.sql.delta.sources.DeltaSource#getChangesWithRateLimit メソッド

org.apache.spark.sql.delta.sources.DeltaSource#getChanges メソッドに比べ、 レート制限を考慮した上での変化を返す。

org.apache.spark.sql.delta.sources.DeltaSource#getStartingOffset メソッド

org.apache.spark.sql.delta.sources.DeltaSource#latestOffset メソッド内で呼ばれる。 ストリーム処理でマイクロバッチを構成する際、対象としているデータのオフセットを返すのだが、 それらのうち、初回の場合に呼ばれる。★要確認

当該メソッド内で、「開始時点か、変化をキャプチャして処理しているのか」、「コミット内のオフセット位置」などの確認が行われ、 org.apache.spark.sql.delta.sources.DeltaSourceOffset にラップした上でメタデータが返される。

org.apache.spark.sql.delta.sources.DeltaSource#latestOffset メソッド

org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl#latestOffset メソッドをオーバーライドしている。 今回のマイクロバッチで対象となるデータのうち、最後のデータを示すオフセットを返す。

org.apache.spark.sql.delta.sources.DeltaSource#verifyStreamHygieneAndFilterAddFiles メソッド

org.apache.spark.sql.delta.sources.DeltaSource#getChanges メソッドで呼ばれる。 getChanges メソッドは指定されたバージョン以降の「変更」を取得するものである。 その中において、verifyStreamHygieneAndFilterAddFiles メソッドは関係あるアクションだけ取り出すために用いられる。

org.apache.spark.sql.delta.sources.DeltaSource#getBatch メソッド

org.apache.spark.sql.execution.streaming.Source#getBatch メソッドをオーバライドしたもの。 与えられたオフセット(開始、終了)をもとに、そのタイミングで処理すべきバッチとなるDataFrameを返す。

org.apache.spark.sql.delta.sources.DeltaSource.AdmissionLimits クラスについて

レート管理のためのクラス。

org.apache.spark.sql.delta.sources.DeltaSource.AdmissionLimits#toReadLimit メソッド

org.apache.spark.sql.delta.sources.DeltaSource#getDefaultReadLimit メソッド内で呼ばれる。 オプションで与えられたレート制限のヒント値を、 org.apache.spark.sql.connector.read.streaming.ReadLimit に変換する。

org.apache.spark.sql.delta.sources.DeltaSource.AdmissionLimits#admit メソッド

org.apache.spark.sql.delta.sources.DeltaSource#getChangesWithRateLimit メソッド内で呼ばれる。 上記メソッドは、レート制限を考慮して変化に関する情報のイテレータを返す。 admit メソッドはその中において、レートリミットに達しているかどうかを判断するために用いられる。

共有

BigDL memory usage

参考

メモ

前提

Download によると、Sparkは2.4系まで対応しているようだ。 Issue-3070 によると、Spark3対応も進んでいるようだ。

Spark起動

Run で記載の通り、以下のように起動した。

1
2
3
$ export SPARK_HOME=/usr/local/spark/default
$ export BIGDL_HOME=/usr/local/bigdl/default
$ ${BIGDL_HOME}/bin/spark-shell-with-bigdl.sh --master local[*]
1
2
scala> import com.intel.analytics.bigdl.utils.Engine
scala> Engine.init

とりあえず初期化までは動いた。

サンプルを動かす

Examples に記載のサンプルを動かす。 LeNet Train にあるLeNetの例が良さそう。

MNISTの画像をダウンロードして、展開した。

1
2
3
4
5
6
7
$ mkdir ~/tmp
$ cd ~/tmp
$ wget http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
$ wget http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
$ wget http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
$ wget http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
$ gunzip *.gz

展開したディレクトリを指定しながら、LeNetの学習を実行。

1
2
3
4
5
6
7
8
$ spark-submit \
--master local[6] \
--driver-class-path ${BIGDL_HOME}/lib/bigdl-SPARK_2.4-0.11.1-jar-with-dependencies.jar \
--class com.intel.analytics.bigdl.models.lenet.Train \
${BIGDL_HOME}/lib/bigdl-SPARK_2.4-0.11.1-jar-with-dependencies.jar \
-f $HOME/tmp \
-b 12 \
--checkpoint ./model

実行中の様子は以下の通り。

ヒストリのキャプチャ

今回はローカルモードで実行したが、入力された学習データと同様のサイズのキャッシュがメモリ上に展開されていることがわかる。

サンプルの中身

2020/11/15時点のmasterブランチを確認する。

Trainクラス

上記サンプルで実行されているTrainを見る。 モデルを定義している箇所は以下の通り。

com/intel/analytics/bigdl/models/lenet/Train.scala:48

1
2
3
4
5
6
7
8
9
10
11
12
val model = if (param.modelSnapshot.isDefined) {
Module.load[Float](param.modelSnapshot.get)
} else {
if (param.graphModel) {
LeNet5.graph(classNum = 10)
} else {
Engine.getEngineType() match {
case MklBlas => LeNet5(10)
case MklDnn => LeNet5.dnnGraph(param.batchSize / Engine.nodeNumber(), 10)
}
}
}

Modelインスタンスは以下の通り、Optimizerオブジェクトに渡され、 データセットに合わせたOptimizerが返される。 (例:データセットが分散データセットかどうか、など)

com/intel/analytics/bigdl/models/lenet/Train.scala:83

1
2
3
4
val optimizer = Optimizer(
model = model,
dataset = trainSet,
criterion = criterion)

参考までに、Optimizerの種類と判定の処理は以下の通り。

com/intel/analytics/bigdl/optim/Optimizer.scala:688

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
dataset match {
case d: DistributedDataSet[_] =>
Engine.getOptimizerVersion() match {
case OptimizerV1 =>
new DistriOptimizer[T](
_model = model,
_dataset = d.toDistributed().asInstanceOf[DistributedDataSet[MiniBatch[T]]],
_criterion = criterion
).asInstanceOf[Optimizer[T, D]]
case OptimizerV2 =>
new DistriOptimizerV2[T](
_model = model,
_dataset = d.toDistributed().asInstanceOf[DistributedDataSet[MiniBatch[T]]],
_criterion = criterion
).asInstanceOf[Optimizer[T, D]]
}
case d: LocalDataSet[_] =>
new LocalOptimizer[T](
model = model,
dataset = d.toLocal().asInstanceOf[LocalDataSet[MiniBatch[T]]],
criterion = criterion
).asInstanceOf[Optimizer[T, D]]
case _ =>
throw new UnsupportedOperationException
}

返されたOptimizerの、Optimizer#optimizeメソッドを利用し学習が実行される。

com/intel/analytics/bigdl/models/lenet/Train.scala:98

1
2
3
4
5
6
7
8
optimizer
.setValidation(
trigger = Trigger.everyEpoch,
dataset = validationSet,
vMethods = Array(new Top1Accuracy, new Top5Accuracy[Float], new Loss[Float]))
.setOptimMethod(optimMethod)
.setEndWhen(Trigger.maxEpoch(param.maxEpoch))
.optimize()

Optimizerの種類

上記の内容を見るに、Optimizerにはいくつか種類がありそうだ。

  • DistriOptimizer
  • DistriOptimizerV2
  • LocalOptimizer

DistriOptimizer

ひとまずメモリ使用に関連する箇所ということで、入力データの準備の処理を確認する。 com.intel.analytics.bigdl.optim.DistriOptimizer#optimize メソッドには 以下のような箇所がある。

com/intel/analytics/bigdl/optim/DistriOptimizer.scala:870

1
prepareInput()

これは com.intel.analytics.bigdl.optim.DistriOptimizer#prepareInput メソッドであり、 内部的に com.intel.analytics.bigdl.optim.AbstractOptimizer#prepareInput メソッドを呼び出し、 入力データをSparkのキャッシュに載せるように処理する。

com/intel/analytics/bigdl/optim/DistriOptimizer.scala:808

1
2
3
4
if (!dataset.toDistributed().isCached) {
DistriOptimizer.logger.info("caching training rdd ...")
DistriOptimizer.prepareInput(this.dataset, this.validationDataSet)
}

キャシュに載せると箇所は以下の通り。

com/intel/analytics/bigdl/optim/AbstractOptimizer.scala:279

1
2
3
4
5
6
7
private[bigdl] def prepareInput[T: ClassTag](dataset: DataSet[MiniBatch[T]],
validationDataSet: Option[DataSet[MiniBatch[T]]]): Unit = {
dataset.asInstanceOf[DistributedDataSet[MiniBatch[T]]].cache()
if (validationDataSet.isDefined) {
validationDataSet.get.toDistributed().cache()
}
}

上記の DistributedDataSetchache メソッドは以下の通り。

com/intel/analytics/bigdl/dataset/DataSet.scala:216

1
2
3
4
5
6
def cache(): Unit = {
if (originRDD() != null) {
originRDD().count()
}
isCached = true
}

originRDD の戻り値に対して、count を読んでいる。 ここで count を呼ぶのは、入力データである originRDD の戻り値に入っているRDDをメモリ上にマテリアライズするためである。

count を呼ぶだけでマテリアライズできるのは、予め入力データを定義したときに Spark RDDの cache を利用してキャッシュ化することを指定されているからである。 今回の例では、Optimizerオブジェクトのapplyを利用する際に渡されるデータセット trainSet を 定義する際に予め cache が呼ばれる。

com/intel/analytics/bigdl/models/lenet/Train.scala:79

1
2
3
val trainSet = DataSet.array(load(trainData, trainLabel), sc) ->
BytesToGreyImg(28, 28) -> GreyImgNormalizer(trainMean, trainStd) -> GreyImgToBatch(
param.batchSize)

trainSet を定義する際、 com.intel.analytics.bigdl.dataset.DataSet$#array(T[], org.apache.spark.SparkContext) メソッドが 呼ばれるのだが、その中で以下のように Sparkの RDD#cache が呼ばれていることがわかる。

com/intel/analytics/bigdl/dataset/DataSet.scala:343

1
2
3
4
5
6
7
8
9
10
11
12
def array[T: ClassTag](localData: Array[T], sc: SparkContext): DistributedDataSet[T] = {
val nodeNumber = Engine.nodeNumber()
new CachedDistriDataSet[T](
sc.parallelize(localData, nodeNumber)
// Keep this line, or the array will be send to worker every time
.coalesce(nodeNumber, true)
.mapPartitions(iter => {
Iterator.single(iter.toArray)
}).setName("cached dataset")
.cache()
)
}

以下、一例。

具体的には、Optimizerのインスタンスを生成するための apply メソッドはいくつかあるが、 以下のように引数にデータセットを指定する箇所がある。(再掲)

com/intel/analytics/bigdl/optim/Optimizer.scala:619

1
_dataset = (DataSet.rdd(sampleRDD) ->

ここで用いられている com.intel.analytics.bigdl.dataset.DataSet#rdd メソッドは以下の通り。

com/intel/analytics/bigdl/dataset/DataSet.scala:363

1
2
3
4
5
6
7
8
9
10
def rdd[T: ClassTag](data: RDD[T], partitionNum: Int = Engine.nodeNumber()
): DistributedDataSet[T] = {
new CachedDistriDataSet[T](
data.coalesce(partitionNum, true)
.mapPartitions(iter => {
Iterator.single(iter.toArray)
}).setName("cached dataset")
.cache()
)
}

com.intel.analytics.bigdl.dataset.CachedDistriDataSet#CachedDistriDataSet のコンストラクタ引数に、 org.apache.spark.rdd.RDD#cache を用いてキャッシュ化することを指定したRDDを渡していることがわかる。

共有

Generate PDF using pandoc

参考

メモ

TeX Liveのインストール手順 の通り、最新版をインストールする。 なお、フルセットでインストールした。

PDFを作成するときは以下のようにする。

1
$ pandoc test.md -o test.pdf --pdf-engine=lualatex -V documentclass=bxjsarticle -V classoption=pandoc

ただし、 default.latex に示すとおり、リンクに色を付けるなどしたいので、 上記マークダウンファイルには先頭部分にYAMLでコンフィグを記載することとした。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
---
title: テスト文章
date: 2020-10-30
author: dobachi
colorlinks: yes
numbersections: yes
toc: yes
---

# test

## これはテスト

[Google](https://www.google.com)

<!-- vim: set ft=markdown : -->
共有

Access AWS S3 from Hadoop3 and Spark3

参考

メモ

パッケージインポート

Sparkの公式ドキュメント には、org.apache.spark:hadoop-cloud_2.12を利用するよう記載されているが、 このMavenパッケージは見当たらなかった。

そこで、 Hadoopの公式ドキュメント の通り、hadoop-awsをインポートすると、それに関連した依存関係をインポートすることにした。 spark-shellで試すとしたら以下の通り。

1
$ /opt/spark/default/bin/spark-shell --packages org.apache.hadoop:hadoop-aws:3.2.0

profileを使ったクレデンシャル設定

awscliを利用しているとしたら、profileを設定しながらクレデンシャルを使い分けていることもあると思う。 その場合、起動時の環境変数でプロフィールを指定しながら、以下のようにproviderを指定することで、 profileを利用したクレデンシャル管理の仕組みを利用してアクセスできる。

1
$ AWS_PROFILE=s3test /opt/spark/default/bin/spark-shell --packages org.apache.hadoop:hadoop-aws:3.2.0
1
scala> spark.sparkContext.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

(補足)パラメータと指定するクラスについて

ちなみに、上記パラメータの説明には、

If unspecified, then the default list of credential provider classes, queried in sequence, is: 1. org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider: Uses the values of fs.s3a.access.key and fs.s3a.secret.key. 2. com.amazonaws.auth.EnvironmentVariableCredentialsProvider: supports configuration of AWS access key ID and secret access key in environment variables named AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, as documented in the AWS SDK. 3. com.amazonaws.auth.InstanceProfileCredentialsProvider: supports use of instance profile credentials if running in an EC2 VM.

のように記載されている。

また、「Using Named Profile Credentials with ProfileCredentialsProvider」の節には、

Declare com.amazonaws.auth.profile.ProfileCredentialsProvider as the provider.

と書かれており、上記Default...でなくても良い。(実際に試したところ動作した)

共有

qmk_firmware_202009

参考

メモ

2020/9に久しぶりにコンパイルしようとしたら、だいぶ勝手が変わっていた。

公式の環境構築手順 に従って環境を構築した。 なお、インストールするよう指定されていたパッケージだけでは足りなかったので、 以下のようにインストールした。

1
$ pacman --needed --noconfirm --disable-download-timeout -S git mingw-w64-x86_64-toolchain mingw-w64-x86_64-python3-pip python3 python3-pip make diffutils

最初、以下を忘れたため、失敗したので注意・・・。

1
$ qmk setup

公式のコンパイル手順 に従うと、qmkコマンドを利用してコンパイル、フラッシュするようだ。

ただ、 QMK Firmwareでファームウェアをビルドしようとしたらavr-gccでコケた話 に記載されているのと同様に、 make git-submodules が必要だった。

共有

Use GitHub actions to deploy documents

参考

メモ

GitHub Actionsを用いてGitHub Pagesへのデプロイを自動化する がSphinx JPのユーザ会による記事だったので参考になるが、 依存関係を都度pipインストールしているのが気になった。

マーケットプレイスのsphinx-build を利用すると良さそうだが、 この手順ではGITHUB TOKENを利用してデプロイしているのが気になった。 レポジトリごとに設定できる、Deploy Keyを利用したい。

そこでビルドには マーケットプレイスのsphinx-build を利用し、デプロイには GitHub Actionsを用いてGitHub Pagesへのデプロイを自動化する の手順を利用することにした。

レポジトリのDeploy Key設定

GitHub Actionsを用いてGitHub Pagesへのデプロイを自動化する の「Deploy keys の設定」章を参考に、 当該レポジトリのDeploy Keyを登録する。

任意の環境(ここではWSLのUbuntu18を利用した)で、 以下のコマンドを実行。

1
$ ssh-keygen -t rsa -b 4096 -C "<レポジトリで使用しているメールアドレス>" -f <任意の名前> -N ""

上記記事の通り、秘密鍵と公開鍵をGitHubのウェブUIで登録する。 なお、「Secrets」タブで登録した名称は、後ほどGitHub Actionsのワークフロー内で使用する。

GitHub Actionsのワークフローの記述

マーケットプレイスのsphinx-build の例を参考に、 ワークフローを記述する。 なお、最後のデプロイする部分は、 GitHub Actionsを用いてGitHub Pagesへのデプロイを自動化する を参考に、 Deploy Keyを利用するよう修正した。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
name: CI

# 今回はマスタブランチへのPushをトリガとする。
on:
push:
branches:
- master

jobs:
build:

runs-on: ubuntu-latest
# 今回はmasterブランチへのpushをトリガとしているので不要だが、gh-pagesへのpushをトリガとする場合など
# 無限ループを回避する際には以下のように記述する。
if: "!contains(github.event.head_commit.message, 'Update documentation via GitHub Actions')"

steps:
- uses: actions/checkout@v1

# 今回はMakefileを利用するので、makeコマンドを使用するよう元ネタから修正した。
# またドキュメントのソースが含まれているディレクトリは各自の定義に依存する。
- uses: ammaraskar/sphinx-action@master
with:
build-command: "make html"
docs-folder: "documents/"
# 先ほどGitHubのウェブUIで定義した秘密鍵名を使用する。
- name: Commit documentation changes and push it
run: |
mkdir ~/.ssh
ssh-keyscan -t rsa github.com >> ~/.ssh/known_hosts
echo "${{ secrets.<先ほどGitHubウェブUIで定義した秘密鍵名> }}" > ~/.ssh/id_rsa
chmod 400 ~/.ssh/id_rsa
git clone git@github.com:${GITHUB_REPOSITORY}.git --branch gh-pages --single-branch gh-pages
cp -r documents/_build/html/* gh-pages/
cd gh-pages
git config --local user.email "action@github.com"
git config --local user.name "GitHub Action"
git add .
git commit -m "Update documentation via GitHub Actions" -a || true
git push origin HEAD:gh-pages
# The above command will fail if no changes were present, so we ignore
# that.
# ===============================
共有

Install Bigtop RPMs using Yum

参考

メモ

今回は、2020/7/21時点の最新バージョンである Apache Bigtop 1.4.0のパッケージバージョン の Hadoopをインストールできるかどうかを試してみることとする。

Yumのrepoファイルは レポジトリ関連の資材置き場 以下にある。 例えば、今回はCentOS7を利用することにするので、 CentOS7のbigtop.repo あたりを利用する。

1
2
$ cd /etc/yum.repos.d
$ sudo wget https://downloads.apache.org/bigtop/bigtop-1.4.0/repos/centos7/bigtop.repo

ひとまずパッケージが見つかるかどうか、確認。

1
2
3
4
5
6
7
8
9
$ sudo yum search hadoop-conf-pseudo
読み込んだプラグイン:fastestmirror
Loading mirror speeds from cached hostfile
* base: d36uatko69830t.cloudfront.net
* epel: d2lzkl7pfhq30w.cloudfront.net
* extras: d36uatko69830t.cloudfront.net
* updates: d36uatko69830t.cloudfront.net
=========================================== N/S matched: hadoop-conf-pseudo ============================================
hadoop-conf-pseudo.x86_64 : Pseudo-distributed Hadoop configuration

確認できたので、試しにインストール。

1
$ sudo yum install hadoop-conf-pseudo

自分の手元の環境では、依存関係で以下のパッケージがインストールされた。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
========================================================================================================================
Package アーキテクチャー バージョン リポジトリー 容量
========================================================================================================================
インストール中:
hadoop-conf-pseudo x86_64 2.8.5-1.el7 bigtop 20 k
依存性関連でのインストールをします:
at x86_64 3.1.13-24.el7 base 51 k
bc x86_64 1.06.95-13.el7 base 115 k
bigtop-groovy noarch 2.4.10-1.el7 bigtop 9.8 M
bigtop-jsvc x86_64 1.0.15-1.el7 bigtop 29 k
bigtop-utils noarch 1.4.0-1.el7 bigtop 11 k
cups-client x86_64 1:1.6.3-43.el7 base 152 k
ed x86_64 1.9-4.el7 base 72 k
hadoop x86_64 2.8.5-1.el7 bigtop 24 M
hadoop-hdfs x86_64 2.8.5-1.el7 bigtop 24 M
hadoop-hdfs-datanode x86_64 2.8.5-1.el7 bigtop 5.7 k
hadoop-hdfs-namenode x86_64 2.8.5-1.el7 bigtop 5.8 k
hadoop-hdfs-secondarynamenode x86_64 2.8.5-1.el7 bigtop 5.8 k
hadoop-mapreduce x86_64 2.8.5-1.el7 bigtop 34 M
hadoop-mapreduce-historyserver x86_64 2.8.5-1.el7 bigtop 5.8 k
hadoop-yarn x86_64 2.8.5-1.el7 bigtop 20 M
hadoop-yarn-nodemanager x86_64 2.8.5-1.el7 bigtop 5.7 k
hadoop-yarn-resourcemanager x86_64 2.8.5-1.el7 bigtop 5.6 k
libpcap x86_64 14:1.5.3-12.el7 base 139 k
m4 x86_64 1.4.16-10.el7 base 256 k
mailx x86_64 12.5-19.el7 base 245 k
nmap-ncat x86_64 2:6.40-19.el7 base 206 k
patch x86_64 2.7.1-12.el7_7 base 111 k
psmisc x86_64 22.20-16.el7 base 141 k
redhat-lsb-core x86_64 4.1-27.el7.centos.1 base 38 k
redhat-lsb-submod-security x86_64 4.1-27.el7.centos.1 base 15 k
spax x86_64 1.5.2-13.el7 base 260 k
time x86_64 1.7-45.el7 base 30 k
zookeeper x86_64 3.4.6-1.el7 bigtop 7.0 M

initスクリプトがインストールされていることがわかる。

1
2
3
4
5
6
7
8
9
10
11
$ ls -1 /etc/init.d/
README
functions
hadoop-hdfs-datanode
hadoop-hdfs-namenode
hadoop-hdfs-secondarynamenode
hadoop-mapreduce-historyserver
hadoop-yarn-nodemanager
hadoop-yarn-resourcemanager
netconsole
network

ひとまずHDFSをフォーマット。

1
$ sudo -u hdfs hdfs namenode -format

あとは、上記の各種Hadoopサービスを立ち上げれば良い。

共有

Delta Lake 0.7.0

参考

メモ

0.7.0が出たので、本リリースの特徴を確認する。

SQL DDLへの対応やHive メタストアの対応

0.6系まではScala、Python APIのみであったが、SQL DDLにも対応した。 0.7.0のテーブル読み書き0.6.1のテーブル読み書き を見比べると、SQLの例が載っていることがわかる。 対応するSQL構文については src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4 あたりを見ると良い。

なお、 Spark3系でないとHiveメタストアに対応できない理由 を見る限り、 Spark3系のAPI(や、DataSourceV2も、かな)を使わないと、Data SourceのカスタムAPIを利用できないため、 これまでHiveメタストアのような外部メタストアと連携したDelta Lakeのメタデータ管理ができなかった、とのこと。

なお、今回の対応でSparkのカタログ機能を利用することになったので、起動時もしくはSparkSession生成時の オプション指定が必要になった。 その代わり、ライブラリの明示的なインポートが不要であり、クエリはDelta Lakeのパーサで解釈された後、 解釈できないようであれば通常のパーサで処理されるようになる。

起動時のオプション例

例:

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.12:0.7.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

なお、ここでは SparkSessionExtensions を利用し、SparkSession生成時にカスタムルール等を挿入している。 この機能は2020/06/19時点でSpark本体側でExperimentalであることに注意。 今後もSpark本体側の仕様変更に引きずられる可能性はある。

パーサの呼び出し流れ

セッション拡張機能を利用し、パーサが差し替えられている。

io/delta/sql/DeltaSparkSessionExtension.scala:73

1
2
3
4
5
6
7
class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
extensions.injectParser { (session, parser) =>
new DeltaSqlParser(parser)
}

(snip)

io.delta.sql.parser.DeltaSqlParser クラスでは デリゲート用のパーサを受け取り、自身のパーサで処理できなかった場合に処理をデリゲートパーサに渡す。

io/delta/sql/parser/DeltaSqlParser.scala:66

1
2
3
4
class DeltaSqlParser(val delegate: ParserInterface) extends ParserInterface {
private val builder = new DeltaSqlAstBuilder

(snip)

例えば、 SparkSessionsql メソッドを使って呼び出す場合を例にする。 このとき、内部では、 org.apache.spark.sql.catalyst.parser.ParserInterface#parsePlan メソッドが呼ばれて、 渡されたクエリ文 sqlText が処理される。

org/apache/spark/sql/SparkSession.scala:601

1
2
3
4
5
6
7
def sql(sqlText: String): DataFrame = withActive {
val tracker = new QueryPlanningTracker
val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
sessionState.sqlParser.parsePlan(sqlText)
}
Dataset.ofRows(self, plan, tracker)
}

この parsePlan がoverrideされており、以下のように定義されている。

io/delta/sql/parser/DeltaSqlParser.scala:69

1
2
3
4
5
6
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
builder.visit(parser.singleStatement()) match {
case plan: LogicalPlan => plan
case _ => delegate.parsePlan(sqlText)
}
}

まずは io.delta.sql.parser.DeltaSqlParser#parse メソッドを利用してパースがここ見られるが、 LogicalPlanが戻らなかったときは、デリゲート用パーサが呼び出されるようになっている。

カスタムカタログ

Spark3ではDataSourvV2の中で、プラガブルなカタログに対応した。 Delta Lake 0.7.0はこれを利用し、カスタムカタログを用いる。(これにより、Hiveメタストアを経由してDelta Lake形式のデータを読み書きできるようになっている) 使われているカタログは org.apache.spark.sql.delta.catalog.DeltaCatalog である。 (SparkSessionのインスタンス生成する際、もしくは起動時のオプション指定)

当該カタログ内部では、例えば org.apache.spark.sql.delta.catalog.DeltaCatalog#createDeltaTable メソッドが定義されており、 org.apache.spark.sql.delta.catalog.DeltaCatalog#createTable ※ しようとするときなどに呼び出されるようになっている。

org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension#createTable をoverrideしている

なお、このクラスもデリゲート用のカタログを用いるようになっている。 org.apache.spark.sql.delta.catalog.DeltaCatalog#createTable メソッドは以下のようになっており、 データソースが delta 出ない場合は、親クラスの createTable (つまり標準的なもの)が呼び出されるようになっている。

org/apache/spark/sql/delta/catalog/DeltaCatalog.scala:149

1
2
3
4
5
6
7
8
9
10
11
12
override def createTable(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): Table = {
if (DeltaSourceUtils.isDeltaDataSourceName(getProvider(properties))) {
createDeltaTable(
ident, schema, partitions, properties, sourceQuery = None, TableCreationModes.Create)
} else {
super.createTable(ident, schema, partitions, properties)
}
}

ScalaやPythonでの例

代表的にScalaの例を出す。公式サイトには以下のように載っている。

1
2
3
df.write.format("delta").saveAsTable("events")      // create table in the metastore

df.write.format("delta").save("/delta/events") // create table by path

Hiveメタストア経由で書き込むケースと、ストレージ上に直接書き出すケースが載っている。

SQLでのマージ処理

SQLを用いたマージの例 の通り、Delta Lakeの特徴であるマージ機能もSQLから呼び出させる。

1
2
3
4
5
6
spark.sql(s"""MERGE INTO $tableName USING newData
ON ${tableName}.id = newData.id
WHEN MATCHED THEN
UPDATE SET ${tableName}.id = newData.id
WHEN NOT MATCHED THEN INSERT *
""")

Spark SQLのカタログに登録されたDelta LakeのテーブルからDeltaTableを生成することもできる。

1
2
scala> import io.delta.tables.DeltaTable
scala> val tbl = DeltaTable.forName(tableName)

Presto / Athena用のメタデータの自動生成

Delta LakeはPresto、Athena用のメタデータを生成できるが、更新があった際にパーティションごとに自動で再生成できるようになった。

テーブル履歴の切り詰めの管理

Delta Lakeは更新の履歴を保持することも特徴の一つだが、 データ本体とログのそれぞれの切り詰め対象期間を指定できる。

CREATEやALTER句内で、TBLPROPERTIESとして指定することになっている。 例えば以下。

1
2
spark.sql(s"CREATE TABLE $tableName(id LONG) USING delta TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 1 day', 'delta.deletedFileRetentionDuration' = 'interval 1 day')")
spark.sql(s"ALTER TABLE $tableName SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 1 day', 'delta.deletedFileRetentionDuration' = 'interval 1 day')")

ユーザメタデータ

spark.databricks.delta.commitInfo.userMetadata プロパティを利用して、ユーザメタデータを付与できる。

1
df.write.option("spark.databricks.delta.commitInfo.userMetadata", "test").format("delta").mode("append").save("/tmp/test")
1
2
scala> spark.sql("SET spark.databricks.delta.commitInfo.userMetadata=test")
scala> spark.sql(s"INSERT INTO $tableName VALUES 0, 1, 2, 3, 4")

AzureのData Lake Storage Gen2

対応した。

しかし、 0.7.0で対応したAzure Data Lake Storage Gen2 の通り、 前提となっている各種ソフトウェアバージョンは高め。

ストリーム処理のone-timeトリガの改善

DataStreamReaderのオプションでmaxFilesPerTriggerを設定しているとしても、 one-time triggerでは一度に溜まったすべてのデータを読み込むようになった。(Spark 3系の話) 

共有

About tream table theory

参考

メモ

Tyler AkidauらによるApache Beamにまつわる情報

Foundations of streaming SQL stream & table theory (General theory) の通り、 ストリームデータとテーブルデータの関係性を論じたもの。

上記の内容の全体は、 OReillyのStreaming Systems に記載あり。

これらは、ストリーム・テーブル理論をベースに、Apache Beamを利用したストリームデータ処理・活用システムについて論じたもの。

Matthias J. Sax、Guozhang Wangらによる論文

Streams and Tables Two Sides of the Same Coin

概要

ストリームデータの処理モデルを提案。 論理・物理の間の一貫性の崩れに対処する。

1 Introduction

データストリームのチャレンジ

  • 物理的なアウト・オブ・オーダへの対応
  • 処理のオペレータがステートフルでないといけない、レイテンシ、正しさ、処理コストなどのトレードオフがある、など
  • 分散処理への対応

物理的な並びが保証されない条件下で、 十分に表現力のあるオペレータ(モデル)を立案する。

2 Background

ストリームデータのモデル。

ポイント。レコード配下の構成。

  • オフセット:物理の並び
  • タイムスタンプ:論理の並び(生成時刻)
  • キー
  • バリュー

3 Duality of streams and tables

レイテンシは、データストリームの特性に依存する。

論理、物理の並びの差異を解決するため、結果を継続的に更新するモデルを提案した。

Dual Streaming Model

DUality of streams and tables

テーブルは、テーブルのバージョンのコレクションと考えることもできる。

4 STREAM PROCESSING OPERATORS

モデル定義を解説。 フィルターなどのステートレスな処理から、アグリゲーションなどのステートフルな処理まで。

out-of-order なレコードが届いても、最終的な出力結果テーブルがout-of-orderレコードがないときと同一になるように動くことを期待する。

Watermarkと似たような機構として、「retention time」を導入。 結果テーブル内の「現在時刻 - retention time」よりも古い結果をメンテナンス対象から外す。

Stream - Table Joinのケースで、「遅れテーブル更新」が生じると、 下流システムに対して結果の上書きデータを出力する必要がある。

5 CASE STUDY: APACHE KAFKA

Kafka Streamsの例。

Kafka Streamsの利用企業:The New York Times, Pinterest, LINE, Trivago, etc

Kafka Streamsのオペレーションはすべてノンブロッキングであり、 レコードを受領次第処理し、KTableにマテリアライズされたり、Topicに書き戻したりされる。

Window集計の例を題材に、retention timeを利用。 retention timeを長くするほどストレージを利用するが、 out-of-orderレコードに対してロバストになる。 また、retention timeが長いほど、ウィンドウ結果が確定したと言えるタイミングが遅くなる。

6 RELATED WORK

Relationを取り扱うモデル、out-of-orderを取り扱うモデル、テーブルバージョン管理を取り扱うモデルなど。

共有

Handle pictures in delta lake and hudi

参考

Delta Lake

まずはjpgをSparkで読み込む

予め、Delta Lakeを読み込みながら起動する。

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.11:0.6.0

Spark公式ドキュメントのImage data source を利用し、jpgをDataFrameとして読み込んでみる。 データは、 Kaggleのmnistデータ を利用。このデータはjpgになっているので便利。

1
2
3
scala> val home = sys.env("HOME")
scala> val imageDir = "Downloads/mnist_jpg/trainingSet/trainingSet/0"
scala> val df = spark.read.format("image").option("dropInvalid", true).load(home + "/" + imageDir)

データをDataFrameにする定義ができた。 内容は以下の通り。

1
2
3
4
5
6
7
8
scala> df.select("image.origin", "image.width", "image.height").show(3, truncate=false)
+-------------------------------------------------------------------------------+-----+------+
|origin |width|height|
+-------------------------------------------------------------------------------+-----+------+
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_20188.jpg|28 |28 |
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_12634.jpg|28 |28 |
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_26812.jpg|28 |28 |
+-------------------------------------------------------------------------------+-----+------+

公式ドキュメントのとおりだが、カラムは以下の通り。

  • origin: StringType (represents the file path of the image)
  • height: IntegerType (height of the image)
  • width: IntegerType (width of the image)
  • nChannels: IntegerType (number of image channels)
  • mode: IntegerType (OpenCV-compatible type)
  • data: BinaryType (Image bytes in OpenCV-compatible order: row-wise BGR in most cases)
1
2
3
4
5
6
7
8
9
scala> df.printSchema
root
|-- image: struct (nullable = true)
| |-- origin: string (nullable = true)
| |-- height: integer (nullable = true)
| |-- width: integer (nullable = true)
| |-- nChannels: integer (nullable = true)
| |-- mode: integer (nullable = true)
| |-- data: binary (nullable = true)

Delta Lakeで扱う

書き込み

これをDelta LakeのData Sourceで書き出してみる。

1
2
scala> val deltaImagePath = "/tmp/delta-lake-image"
scala> df.write.format("delta").save(deltaImagePath)

読み出し

できた。試しにDeltaTableとして読み出してみる。

1
2
3
scala> import io.delta.tables._
scala> import org.apache.spark.sql.functions._
scala> val deltaTable = DeltaTable.forPath(deltaImagePath)

更新

mergeを試す。 マージのためにキーを明示的に与えつつデータを準備する。

1
2
3
4
5
scala> val targetDf = df.select($"image.origin", $"image")
scala> val targetImagePath = "/tmp/delta-table"
scala> targetDf.write.format("delta").save(targetImagePath)
scala> val sourcePath = "Downloads/mnist_jpg/trainingSet/trainingSet/1"
scala> val sourceDf = spark.read.format("image").option("dropInvalid", true).load(home + "/" + sourcePath).select($"image.origin", $"image")

Delta Tableを定義。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
scala> val deltaTable = DeltaTable.forPath(targetImagePath)
scala> deltaTable
.as("target")
.merge(
sourceDf.as("source"),
"target.origin = source.origin")
.whenMatched
.updateExpr(Map(
"image" -> "source.image"))
.whenNotMatched
.insertExpr(Map(
"origin" -> "source.origin",
"image" -> "source.image"))
.execute()

実際に、追加データが入っていることが確かめられる。

1
2
3
4
5
6
7
8
9
scala> deltaTable.toDF.where($"origin" contains "trainingSet/1").select("image.origin", "image.width", "image.height").show(3, truncate=false)
+-------------------------------------------------------------------------------+-----+------+
|origin |width|height|
+-------------------------------------------------------------------------------+-----+------+
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/1/img_10266.jpg|28 |28 |
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/1/img_10665.jpg|28 |28 |
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/1/img_10772.jpg|28 |28 |
+-------------------------------------------------------------------------------+-----+------+
only showing top 3 rows

SparkのImage Data Sourceは、メタデータと画像本体(バイナリ)を構造で扱っているだけなので、 実は特に工夫することなく扱えた。

公式ドキュメントによると、OpenCV形式バイナリで保持されている。 これから先は、Python使って処理したいので、改めて開き直す。

Parquetの内容を確認する

parquet-mrparquet-tools を利用して、Parquetの内容を確認する。

なお、Hudi v0.5.2におけるParquetのバージョンは以下の通り、1.10.1である。

pom.xml:84

1
<parquet.version>1.10.1</parquet.version>

当該バージョンに対応するタグをチェックアウトし、予めparquet-toolsをパッケージしておくこと。

スキーマ

1
2
3
4
5
6
7
8
9
10
11
$ java -jar target/parquet-tools-1.10.1.jar schema /tmp/delta-lake-image/part-00000-c3d03d70-1785-435f-b055-b2a78903e732-c000.snappy.parquet
message spark_schema {
optional group image {
optional binary origin (UTF8);
optional int32 height;
optional int32 width;
optional int32 nChannels;
optional int32 mode;
optional binary data;
}
}

シンプルにParquet内に保持されていることがわかる。 Spark SQLのParquet取扱機能を利用している。

メタデータ

ファイル名やクリエイタなど。

1
2
3
file:        file:/tmp/delta-lake-image/part-00000-c3d03d70-1785-435f-b055-b2a78903e732-c000.snappy.parquet
creator: parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1)
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"image","type":{"type":"struct","fields":[{"name":"origin","type":"string","nullable":true,"metadata":{}},{"name":"height","type":"integer","nullable":true,"metadata":{}},{"name":"width","type":"integer","nullable":true,"metadata":{}},{"name":"nChannels","type":"integer","nullable":true,"metadata":{}},{"name":"mode","type":"integer","nullable":true,"metadata":{}},{"name":"data","type":"binary","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}

ファイルスキーマ

1
2
3
4
5
6
7
8
9
file schema: spark_schema
--------------------------------------------------------------------------------
image: OPTIONAL F:6
.origin: OPTIONAL BINARY O:UTF8 R:0 D:2
.height: OPTIONAL INT32 R:0 D:2
.width: OPTIONAL INT32 R:0 D:2
.nChannels: OPTIONAL INT32 R:0 D:2
.mode: OPTIONAL INT32 R:0 D:2
.data: OPTIONAL BINARY R:0 D:2

Rowグループ

1
2
3
4
5
6
7
8
9
10
11
row group 1: RC:32 TS:29939 OFFSET:4
--------------------------------------------------------------------------------
image:
.origin: BINARY SNAPPY DO:0 FPO:4 SZ:620/2838/4.58 VC:32 ENC:RLE,PLAIN,BIT_PACKED ST:[min: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_10203.jpg, max: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_9098.jpg, num_nulls: 0]
.height: INT32 SNAPPY DO:0 FPO:624 SZ:74/70/0.95 VC:32 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 28, max: 28, num_nulls: 0]
.width: INT32 SNAPPY DO:0 FPO:698 SZ:74/70/0.95 VC:32 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 28, max: 28, num_nulls: 0]
.nChannels: INT32 SNAPPY DO:0 FPO:772 SZ:74/70/0.95 VC:32 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 1, max: 1, num_nulls: 0]
.mode: INT32 SNAPPY DO:0 FPO:846 SZ:74/70/0.95 VC:32 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 0, max: 0, num_nulls: 0]
.data: BINARY SNAPPY DO:0 FPO:920 SZ:21175/26821/1.27 VC:32 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 0x00

(snip)

Pythonで画像処理してみる

とりあえずOpenCVをインストールしたPython環境のJupyterでPySparkを起動。

1
$ PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip 0.0.0.0" /opt/spark/default/bin/pyspark --packages io.delta:delta-core_2.11:0.6.0 --conf spark.pyspark.driver.python=$HOME/venv/jupyter/bin/jupyter

この後はJupyter内で処理する。

1
2
3
4
5
from delta.tables import *
from pyspark.sql.functions import *
delta_image_path = "/tmp/delta-lake-image"
deltaTable = DeltaTable.forPath(spark, delta_image_path)
deltaTable.toDF().select("image.origin", "image.width", "image.height").show(3, truncate=False)

とりあえず読み込めたことがわかる。

1
2
3
4
5
6
7
8
+-------------------------------------------------------------------------------+-----+------+
|origin |width|height|
+-------------------------------------------------------------------------------+-----+------+
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_20188.jpg|28 |28 |
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_12634.jpg|28 |28 |
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_26812.jpg|28 |28 |
+-------------------------------------------------------------------------------+-----+------+
only showing top 3 rows

中のデータを利用できることを確かめるため、1件だけ取り出して処理してみる。

1
img = deltaTable.toDF().where("image.origin == 'file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_1.jpg'").select("image.origin", "image.width", "image.height", "image.nChannels", "image.data").collect()[0]

とりあえずエッジ抽出をしてみる。

1
2
3
4
5
6
7
import numpy as np
import cv2
from matplotlib import pyplot as plt

nparr = np.frombuffer(img.data, np.uint8).reshape(img.height, img.width, img.nChannels)
edges = cv2.Canny(nparr,100,200)
plt.imshow(edges)

Jupyter上でエッジ抽出されたことが確かめられただろうか。

エッジ抽出された様

これをSparkでUDFで実行するようにする。 (とりあえず雑にライブラリをインポート…)

1
2
3
4
5
6
7
8
9
10
11
def get_edge(data, h, w, c):
import numpy as np
import cv2
nparr = np.frombuffer(data, np.uint8).reshape(h, w, c)
edge = cv2.Canny(nparr,100,200)
return edge.tobytes()

from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType

get_edge_udf = udf(get_edge, BinaryType())

エッジ抽出されたndarrayをバイト列に変換し、Spark上ではBinaryTypeで扱うように指定した。

1
2
3
with_edge = deltaTable.toDF().select("image.origin", "image.width", "image.height", "image.nChannels", get_edge_udf("image.data", "image.height", "image.width", "image.nChannels").alias("edge"), "image.data")

with_edge.show(3)
1
2
3
4
5
6
7
8
+--------------------+-----+------+---------+--------------------+--------------------+
| origin|width|height|nChannels| edge| data|
+--------------------+-----+------+---------+--------------------+--------------------+
|file:///home/cent...| 28| 28| 1|[00 00 00 00 00 0...|[05 00 04 08 00 0...|
|file:///home/cent...| 28| 28| 1|[00 00 00 00 00 0...|[05 00 0B 00 00 0...|
|file:///home/cent...| 28| 28| 1|[00 00 00 00 00 0...|[06 00 02 00 02 0...|
+--------------------+-----+------+---------+--------------------+--------------------+
only showing top 3 rows

こんな感じで、OpenCVを用いた処理をDataFrameに対して実行できる。

なお、当然ながら処理されたデータを取り出せば、また画像として表示も可能。

1
2
3
new_img = with_edge.where("origin == 'file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_1.jpg'").select("origin", "width", "height", "nChannels", "data", "edge").collect()[0]
new_nparr = np.frombuffer(new_img.edge, np.uint8).reshape(new_img.height, new_img.width)
plt.imshow(new_nparr)

Hudi

画像読み込み

まずはjpgをSparkで読み込む と同様に データを読み込む。

Hudiのquick-start-guide を参考にしながら、まずはHudiを読み込みながら、シェルを起動する。

1
2
$ /opt/spark/default/bin/spark-shell --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

ライブラリをインポート。

1
2
3
4
scala> import org.apache.spark.sql.SaveMode._
scala> import org.apache.hudi.DataSourceReadOptions._
scala> import org.apache.hudi.DataSourceWriteOptions._
scala> import org.apache.hudi.config.HoodieWriteConfig._

画像データを読み込み

1
2
3
4
5
scala> val tableName = "hudi_images"
scala> val basePath = "file:///tmp/hudi_images"
scala> val home = sys.env("HOME")
scala> val imageDir = "Downloads/mnist_jpg/trainingSet/trainingSet/*"
scala> val df = spark.read.format("image").option("dropInvalid", true).load(home + "/" + imageDir)

今回は、Hudiにおけるパーティション構成を確認するため、上記のようにワイルドカード指定した。

書き込み

Hudi書き込み用にデータを変換。パーティション用キーなどを定義する。

1
2
3
4
5
scala> import org.apache.spark.sql.functions.{col, udf}
scala> val partitionPath = udf((str: String) => {
str.split("/").takeRight(2).head
})
scala> val hudiDf = df.select($"image.origin", partitionPath($"image.origin") as "partitionpath", $"*")

書き込み。

1
2
3
4
5
6
7
scala> hudiDf.write.format("hudi").
option(TABLE_NAME, tableName).
option(PRECOMBINE_FIELD_OPT_KEY, "origin").
option(RECORDKEY_FIELD_OPT_KEY, "origin").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
mode(Overwrite).
save(basePath)

書き込まれたデータは以下のような感じ。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
$ ls -R /tmp/hudi_images/
/tmp/hudi_images/:
0 1 2 3 4 5 6 7 8 9

/tmp/hudi_images/0:
86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-29-14074_20200517135104.parquet

/tmp/hudi_images/1:
4912032f-3365-4852-b2ae-91ffb5ea9806-0_1-29-14075_20200517135104.parquet

/tmp/hudi_images/2:
0636bb93-dd32-40f1-b5c8-328130386528-0_2-29-14076_20200517135104.parquet

/tmp/hudi_images/3:
6f0154cd-4a9f-4001-935b-47d067416797-0_3-29-14077_20200517135104.parquet

/tmp/hudi_images/4:
b6f29c2c-7df0-481a-8149-2bbc93e92e2c-0_4-29-14078_20200517135104.parquet

/tmp/hudi_images/5:
24115172-10db-4c85-808a-bf4048e0f533-0_5-29-14079_20200517135104.parquet

/tmp/hudi_images/6:
64823f4b-26fb-4679-87e8-cd81d01b1181-0_6-29-14080_20200517135104.parquet

/tmp/hudi_images/7:
d0c097e3-9ab7-477a-b591-593c6cb7880f-0_7-29-14081_20200517135104.parquet

/tmp/hudi_images/8:
13a48b38-cbfb-4076-957d-9c580765bfca-0_8-29-14082_20200517135104.parquet

/tmp/hudi_images/9:
5d3ba9ae-8ede-410f-95f3-e8d69e8c0478-0_9-29-14083_20200517135104.parquet

メタデータは以下のような感じ。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ ls -1 /tmp/hudi_images/.hoodie/
20200517135104.clean
20200517135104.clean.inflight
20200517135104.clean.requested
20200517135104.commit
20200517135104.commit.requested
20200517135104.inflight
20200517140006.clean
20200517140006.clean.inflight
20200517140006.clean.requested
20200517140006.commit
20200517140006.commit.requested
20200517140006.inflight
archived
hoodie.properties

読み込み

読み込んで見る。

1
2
3
4
scala> val hudiImageDf = spark.
read.
format("hudi").
load(basePath + "/*")

スキーマは以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scala> hudiImageDf.printSchema
root
|-- _hoodie_commit_time: string (nullable = true)
|-- _hoodie_commit_seqno: string (nullable = true)
|-- _hoodie_record_key: string (nullable = true)
|-- _hoodie_partition_path: string (nullable = true)
|-- _hoodie_file_name: string (nullable = true)
|-- origin: string (nullable = true)
|-- partitionpath: string (nullable = true)
|-- image: struct (nullable = true)
| |-- origin: string (nullable = true)
| |-- height: integer (nullable = true)
| |-- width: integer (nullable = true)
| |-- nChannels: integer (nullable = true)
| |-- mode: integer (nullable = true)
| |-- data: binary (nullable = true)

すべてのただの書き込みAPIを試しただけだが、想定通り書き込み・読み込みできている。

更新

Hudiといえば、テーブルを更新可能であることも特徴のひとつなので、試しに更新をしてみる。 元は同じデータであるが、「0」の画像データを新しいデータとしてDataFrameを定義し、更新する。 意図的に、一部のデータだけ更新する。

1
2
3
4
5
6
7
8
9
10
scala> val updateImageDir = "Downloads/mnist_jpg/trainingSet/trainingSet/0"
scala> val updateDf = spark.read.format("image").option("dropInvalid", true).load(home + "/" + updateImageDir)
scala> val updateHudiDf = updateDf.select($"image.origin", partitionPath($"image.origin") as "partitionpath", $"*")
scala> updateHudiDf.write.format("hudi").
option(TABLE_NAME, tableName).
option(PRECOMBINE_FIELD_OPT_KEY, "origin").
option(RECORDKEY_FIELD_OPT_KEY, "origin").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
mode(Append).
save(basePath)

以下のように、「0」のデータが更新されていることがわかる。

1
2
3
4
5
6
7
8
9
10
11
12
$ ls -R /tmp/hudi_images/
/tmp/hudi_images/:
0 1 2 3 4 5 6 7 8 9

/tmp/hudi_images/0:
86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-29-14074_20200517135104.parquet
86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-60-27759_.parquet

/tmp/hudi_images/1:
4912032f-3365-4852-b2ae-91ffb5ea9806-0_1-29-14075_20200517135104.parquet

(snip)

つづいて、更新されたことを確かめるため、改めて読み込みDataFrameを定義し、

1
2
3
4
scala> val updatedHudiImageDf = spark.
read.
format("hudi").
load(basePath + "/*")

試しに、差分がわかるように「1」と「0」のデータをそれぞれ読んで見る。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
scala> updatedHudiImageDf.select($"_hoodie_commit_time", $"_hoodie_commit_seqno", $"_hoodie_partition_path").filter("partitionpath == 1").show(3)
+-------------------+--------------------+----------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_partition_path|
+-------------------+--------------------+----------------------+
| 20200517135104| 20200517135104_1_1| 1|
| 20200517135104| 20200517135104_1_12| 1|
| 20200517135104| 20200517135104_1_45| 1|
+-------------------+--------------------+----------------------+
only showing top 3 rows


scala> updatedHudiImageDf.select($"_hoodie_commit_time", $"_hoodie_commit_seqno", $"_hoodie_partition_path").filter("partitionpath == 0").show(3)
+-------------------+--------------------+----------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_partition_path|
+-------------------+--------------------+----------------------+
| 20200517140006|20200517140006_0_...| 0|
| 20200517140006|20200517140006_0_...| 0|
| 20200517140006|20200517140006_0_...| 0|
+-------------------+--------------------+----------------------+
only showing top 3 rows

上記のように、更新されたことがわかる。最新の更新日時は、 2020/05/17 14:00:06UTC である。

ここで読み込まれたデータも、Delta Lake同様に画像を元にしたベクトルデータとして扱える。 Pythonであれば ndarray に変換して用いれば良い。

Parquetの内容を確認する

parquet-mrparquet-tools を利用して、Parquetの内容を確認する。

なお、Hudi v0.5.2におけるParquetのバージョンは以下の通り、1.10.1である。

pom.xml:84

1
<parquet.version>1.10.1</parquet.version>

当該バージョンに対応するタグをチェックアウトし、予めparquet-toolsをパッケージしておくこと。

スキーマ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ java -jar target/parquet-tools-1.10.1.jar schema /tmp/hudi_images/0/86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-60-27759_20200517140006.parquet
message hoodie.hudi_images.hudi_images_record {
optional binary _hoodie_commit_time (UTF8);
optional binary _hoodie_commit_seqno (UTF8);
optional binary _hoodie_record_key (UTF8);
optional binary _hoodie_partition_path (UTF8);
optional binary _hoodie_file_name (UTF8);
optional binary origin (UTF8);
optional binary partitionpath (UTF8);
optional group image {
optional binary origin (UTF8);
optional int32 height;
optional int32 width;
optional int32 nChannels;
optional int32 mode;
optional binary data;
}
}

Hudiが独自に付加したカラムが追加されていることが見て取れる。

メタデータ

つづいて、メタデータを確認する。

ファイル名やクリエイタなど。

1
2
file:                   file:/tmp/hudi_images/0/86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-60-27759_20200517140006.parquet
creator: parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1)

ブルームフィルタはここでは省略

1
2
3
extra:                  org.apache.hudi.bloomfilter = ///

(snip)

レコードキーやAvroスキーマ

1
2
3
4
extra:                  hoodie_min_record_key = file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_1.jpg
extra: parquet.avro.schema = {"type":"record","name":"hudi_images_record","namespace":"hoodie.hudi_images","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"origin","type":["string","null"]},{"name":"partitionpath","type":["string","null"]},{"name":"image","type":[{"type":"record","name":"image","namespace":"hoodie.hudi_images.hudi_images_record","fields":[{"name":"origin","type":["string","null"]},{"name":"height","type":["int","null"]},{"name":"width","type":["int","null"]},{"name":"nChannels","type":["int","null"]},{"name":"mode","type":["int","null"]},{"name":"data","type":["bytes","null"]}]},"null"]}]}
extra: writer.model.name = avro
extra: hoodie_max_record_key = file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_9996.jpg

上記の通り、Hudiでは parquet-avro を利用し、Avro形式でParquet内のデータを保持する。

スキーマ詳細

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
file schema:            hoodie.hudi_images.hudi_images_record
--------------------------------------------------------------------------------
_hoodie_commit_time: OPTIONAL BINARY O:UTF8 R:0 D:1
_hoodie_commit_seqno: OPTIONAL BINARY O:UTF8 R:0 D:1
_hoodie_record_key: OPTIONAL BINARY O:UTF8 R:0 D:1
_hoodie_partition_path: OPTIONAL BINARY O:UTF8 R:0 D:1
_hoodie_file_name: OPTIONAL BINARY O:UTF8 R:0 D:1
origin: OPTIONAL BINARY O:UTF8 R:0 D:1
partitionpath: OPTIONAL BINARY O:UTF8 R:0 D:1
image: OPTIONAL F:6
.origin: OPTIONAL BINARY O:UTF8 R:0 D:2
.height: OPTIONAL INT32 R:0 D:2
.width: OPTIONAL INT32 R:0 D:2
.nChannels: OPTIONAL INT32 R:0 D:2
.mode: OPTIONAL INT32 R:0 D:2
.data: OPTIONAL BINARY R:0 D:2

Rowグループ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
row group 1:            RC:4132 TS:4397030 OFFSET:4
--------------------------------------------------------------------------------
_hoodie_commit_time: BINARY GZIP DO:0 FPO:4 SZ:165/127/0.77 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 20200517140006, max: 20200517140006, num_nulls: 0]
_hoodie_commit_seqno: BINARY GZIP DO:0 FPO:169 SZ:10497/107513/10.24 VC:4132 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 20200517140006_0_42001, max: 20200517140006_0_46132, num_nulls: 0]
_hoodie_record_key: BINARY GZIP DO:0 FPO:10666 SZ:16200/342036/21.11 VC:4132 ENC:RLE,PLAIN,BIT_PACKED ST:[min: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_1.jpg, max: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_9996.jpg, num_nulls: 0]
_hoodie_partition_path: BINARY GZIP DO:0 FPO:26866 SZ:100/62/0.62 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 0, max: 0, num_nulls: 0]
_hoodie_file_name: BINARY GZIP DO:0 FPO:26966 SZ:448/419/0.94 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-60-27759_20200517140006.parquet, max: 86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-60-27759_20200517140006.parquet, num_nulls: 0]
origin: BINARY GZIP DO:0 FPO:27414 SZ:16200/342036/21.11 VC:4132 ENC:RLE,PLAIN,BIT_PACKED ST:[min: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_1.jpg, max: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_9996.jpg, num_nulls: 0]
partitionpath: BINARY GZIP DO:0 FPO:43614 SZ:100/62/0.62 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 0, max: 0, num_nulls: 0]
image:
.origin: BINARY GZIP DO:0 FPO:43714 SZ:16200/342036/21.11 VC:4132 ENC:RLE,PLAIN,BIT_PACKED ST:[min: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_1.jpg, max: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_9996.jpg, num_nulls: 0]
.height: INT32 GZIP DO:0 FPO:59914 SZ:111/73/0.66 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 28, max: 28, num_nulls: 0]
.width: INT32 GZIP DO:0 FPO:60025 SZ:111/73/0.66 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 28, max: 28, num_nulls: 0]
.nChannels: INT32 GZIP DO:0 FPO:60136 SZ:111/73/0.66 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 1, max: 1, num_nulls: 0]
.mode: INT32 GZIP DO:0 FPO:60247 SZ:111/73/0.66 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 0, max: 0, num_nulls: 0]
.data: BINARY GZIP DO:0 FPO:60358 SZ:1609341/3262447/2.03 VC:4132 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 0x00

(snip)
共有