qmk_firmware_202009

参考

メモ

2020/9に久しぶりにコンパイルしようとしたら、だいぶ勝手が変わっていた。

公式の環境構築手順 に従って環境を構築した。 なお、インストールするよう指定されていたパッケージだけでは足りなかったので、 以下のようにインストールした。

1
$ pacman --needed --noconfirm --disable-download-timeout -S git mingw-w64-x86_64-toolchain mingw-w64-x86_64-python3-pip python3 python3-pip make diffutils

最初、以下を忘れたため、失敗したので注意・・・。

1
$ qmk setup

公式のコンパイル手順 に従うと、qmkコマンドを利用してコンパイル、フラッシュするようだ。

ただ、 QMK Firmwareでファームウェアをビルドしようとしたらavr-gccでコケた話 に記載されているのと同様に、 make git-submodules が必要だった。

共有

Use GitHub actions to deploy documents

参考

メモ

GitHub Actionsを用いてGitHub Pagesへのデプロイを自動化する がSphinx JPのユーザ会による記事だったので参考になるが、 依存関係を都度pipインストールしているのが気になった。

マーケットプレイスのsphinx-build を利用すると良さそうだが、 この手順ではGITHUB TOKENを利用してデプロイしているのが気になった。 レポジトリごとに設定できる、Deploy Keyを利用したい。

そこでビルドには マーケットプレイスのsphinx-build を利用し、デプロイには GitHub Actionsを用いてGitHub Pagesへのデプロイを自動化する の手順を利用することにした。

レポジトリのDeploy Key設定

GitHub Actionsを用いてGitHub Pagesへのデプロイを自動化する の「Deploy keys の設定」章を参考に、 当該レポジトリのDeploy Keyを登録する。

任意の環境(ここではWSLのUbuntu18を利用した)で、 以下のコマンドを実行。

1
$ ssh-keygen -t rsa -b 4096 -C "<レポジトリで使用しているメールアドレス>" -f <任意の名前> -N ""

上記記事の通り、秘密鍵と公開鍵をGitHubのウェブUIで登録する。 なお、「Secrets」タブで登録した名称は、後ほどGitHub Actionsのワークフロー内で使用する。

GitHub Actionsのワークフローの記述

マーケットプレイスのsphinx-build の例を参考に、 ワークフローを記述する。 なお、最後のデプロイする部分は、 GitHub Actionsを用いてGitHub Pagesへのデプロイを自動化する を参考に、 Deploy Keyを利用するよう修正した。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
name: CI

# 今回はマスタブランチへのPushをトリガとする。
on:
push:
branches:
- master

jobs:
build:

runs-on: ubuntu-latest
# 今回はmasterブランチへのpushをトリガとしているので不要だが、gh-pagesへのpushをトリガとする場合など
# 無限ループを回避する際には以下のように記述する。
if: "!contains(github.event.head_commit.message, 'Update documentation via GitHub Actions')"

steps:
- uses: actions/checkout@v1

# 今回はMakefileを利用するので、makeコマンドを使用するよう元ネタから修正した。
# またドキュメントのソースが含まれているディレクトリは各自の定義に依存する。
- uses: ammaraskar/sphinx-action@master
with:
build-command: "make html"
docs-folder: "documents/"
# 先ほどGitHubのウェブUIで定義した秘密鍵名を使用する。
- name: Commit documentation changes and push it
run: |
mkdir ~/.ssh
ssh-keyscan -t rsa github.com >> ~/.ssh/known_hosts
echo "${{ secrets.<先ほどGitHubウェブUIで定義した秘密鍵名> }}" > ~/.ssh/id_rsa
chmod 400 ~/.ssh/id_rsa
git clone git@github.com:${GITHUB_REPOSITORY}.git --branch gh-pages --single-branch gh-pages
cp -r documents/_build/html/* gh-pages/
cd gh-pages
git config --local user.email "action@github.com"
git config --local user.name "GitHub Action"
git add .
git commit -m "Update documentation via GitHub Actions" -a || true
git push origin HEAD:gh-pages
# The above command will fail if no changes were present, so we ignore
# that.
# ===============================
共有

Install Bigtop RPMs using Yum

参考

メモ

今回は、2020/7/21時点の最新バージョンである Apache Bigtop 1.4.0のパッケージバージョン の Hadoopをインストールできるかどうかを試してみることとする。

Yumのrepoファイルは レポジトリ関連の資材置き場 以下にある。 例えば、今回はCentOS7を利用することにするので、 CentOS7のbigtop.repo あたりを利用する。

1
2
$ cd /etc/yum.repos.d
$ sudo wget https://downloads.apache.org/bigtop/bigtop-1.4.0/repos/centos7/bigtop.repo

ひとまずパッケージが見つかるかどうか、確認。

1
2
3
4
5
6
7
8
9
$ sudo yum search hadoop-conf-pseudo
読み込んだプラグイン:fastestmirror
Loading mirror speeds from cached hostfile
* base: d36uatko69830t.cloudfront.net
* epel: d2lzkl7pfhq30w.cloudfront.net
* extras: d36uatko69830t.cloudfront.net
* updates: d36uatko69830t.cloudfront.net
=========================================== N/S matched: hadoop-conf-pseudo ============================================
hadoop-conf-pseudo.x86_64 : Pseudo-distributed Hadoop configuration

確認できたので、試しにインストール。

1
$ sudo yum install hadoop-conf-pseudo

自分の手元の環境では、依存関係で以下のパッケージがインストールされた。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
========================================================================================================================
Package アーキテクチャー バージョン リポジトリー 容量
========================================================================================================================
インストール中:
hadoop-conf-pseudo x86_64 2.8.5-1.el7 bigtop 20 k
依存性関連でのインストールをします:
at x86_64 3.1.13-24.el7 base 51 k
bc x86_64 1.06.95-13.el7 base 115 k
bigtop-groovy noarch 2.4.10-1.el7 bigtop 9.8 M
bigtop-jsvc x86_64 1.0.15-1.el7 bigtop 29 k
bigtop-utils noarch 1.4.0-1.el7 bigtop 11 k
cups-client x86_64 1:1.6.3-43.el7 base 152 k
ed x86_64 1.9-4.el7 base 72 k
hadoop x86_64 2.8.5-1.el7 bigtop 24 M
hadoop-hdfs x86_64 2.8.5-1.el7 bigtop 24 M
hadoop-hdfs-datanode x86_64 2.8.5-1.el7 bigtop 5.7 k
hadoop-hdfs-namenode x86_64 2.8.5-1.el7 bigtop 5.8 k
hadoop-hdfs-secondarynamenode x86_64 2.8.5-1.el7 bigtop 5.8 k
hadoop-mapreduce x86_64 2.8.5-1.el7 bigtop 34 M
hadoop-mapreduce-historyserver x86_64 2.8.5-1.el7 bigtop 5.8 k
hadoop-yarn x86_64 2.8.5-1.el7 bigtop 20 M
hadoop-yarn-nodemanager x86_64 2.8.5-1.el7 bigtop 5.7 k
hadoop-yarn-resourcemanager x86_64 2.8.5-1.el7 bigtop 5.6 k
libpcap x86_64 14:1.5.3-12.el7 base 139 k
m4 x86_64 1.4.16-10.el7 base 256 k
mailx x86_64 12.5-19.el7 base 245 k
nmap-ncat x86_64 2:6.40-19.el7 base 206 k
patch x86_64 2.7.1-12.el7_7 base 111 k
psmisc x86_64 22.20-16.el7 base 141 k
redhat-lsb-core x86_64 4.1-27.el7.centos.1 base 38 k
redhat-lsb-submod-security x86_64 4.1-27.el7.centos.1 base 15 k
spax x86_64 1.5.2-13.el7 base 260 k
time x86_64 1.7-45.el7 base 30 k
zookeeper x86_64 3.4.6-1.el7 bigtop 7.0 M

initスクリプトがインストールされていることがわかる。

1
2
3
4
5
6
7
8
9
10
11
$ ls -1 /etc/init.d/
README
functions
hadoop-hdfs-datanode
hadoop-hdfs-namenode
hadoop-hdfs-secondarynamenode
hadoop-mapreduce-historyserver
hadoop-yarn-nodemanager
hadoop-yarn-resourcemanager
netconsole
network

ひとまずHDFSをフォーマット。

1
$ sudo -u hdfs hdfs namenode -format

あとは、上記の各種Hadoopサービスを立ち上げれば良い。

共有

Delta Lake 0.7.0

参考

メモ

0.7.0が出たので、本リリースの特徴を確認する。

SQL DDLへの対応やHive メタストアの対応

0.6系まではScala、Python APIのみであったが、SQL DDLにも対応した。 0.7.0のテーブル読み書き0.6.1のテーブル読み書き を見比べると、SQLの例が載っていることがわかる。 対応するSQL構文については src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4 あたりを見ると良い。

なお、 Spark3系でないとHiveメタストアに対応できない理由 を見る限り、 Spark3系のAPI(や、DataSourceV2も、かな)を使わないと、Data SourceのカスタムAPIを利用できないため、 これまでHiveメタストアのような外部メタストアと連携したDelta Lakeのメタデータ管理ができなかった、とのこと。

なお、今回の対応でSparkのカタログ機能を利用することになったので、起動時もしくはSparkSession生成時の オプション指定が必要になった。 その代わり、ライブラリの明示的なインポートが不要であり、クエリはDelta Lakeのパーサで解釈された後、 解釈できないようであれば通常のパーサで処理されるようになる。

起動時のオプション例

例:

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.12:0.7.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

なお、ここでは SparkSessionExtensions を利用し、SparkSession生成時にカスタムルール等を挿入している。 この機能は2020/06/19時点でSpark本体側でExperimentalであることに注意。 今後もSpark本体側の仕様変更に引きずられる可能性はある。

パーサの呼び出し流れ

セッション拡張機能を利用し、パーサが差し替えられている。

io/delta/sql/DeltaSparkSessionExtension.scala:73

1
2
3
4
5
6
7
class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
extensions.injectParser { (session, parser) =>
new DeltaSqlParser(parser)
}

(snip)

io.delta.sql.parser.DeltaSqlParser クラスでは デリゲート用のパーサを受け取り、自身のパーサで処理できなかった場合に処理をデリゲートパーサに渡す。

io/delta/sql/parser/DeltaSqlParser.scala:66

1
2
3
4
class DeltaSqlParser(val delegate: ParserInterface) extends ParserInterface {
private val builder = new DeltaSqlAstBuilder

(snip)

例えば、 SparkSessionsql メソッドを使って呼び出す場合を例にする。 このとき、内部では、 org.apache.spark.sql.catalyst.parser.ParserInterface#parsePlan メソッドが呼ばれて、 渡されたクエリ文 sqlText が処理される。

org/apache/spark/sql/SparkSession.scala:601

1
2
3
4
5
6
7
def sql(sqlText: String): DataFrame = withActive {
val tracker = new QueryPlanningTracker
val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
sessionState.sqlParser.parsePlan(sqlText)
}
Dataset.ofRows(self, plan, tracker)
}

この parsePlan がoverrideされており、以下のように定義されている。

io/delta/sql/parser/DeltaSqlParser.scala:69

1
2
3
4
5
6
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
builder.visit(parser.singleStatement()) match {
case plan: LogicalPlan => plan
case _ => delegate.parsePlan(sqlText)
}
}

まずは io.delta.sql.parser.DeltaSqlParser#parse メソッドを利用してパースがここ見られるが、 LogicalPlanが戻らなかったときは、デリゲート用パーサが呼び出されるようになっている。

カスタムカタログ

Spark3ではDataSourvV2の中で、プラガブルなカタログに対応した。 Delta Lake 0.7.0はこれを利用し、カスタムカタログを用いる。(これにより、Hiveメタストアを経由してDelta Lake形式のデータを読み書きできるようになっている) 使われているカタログは org.apache.spark.sql.delta.catalog.DeltaCatalog である。 (SparkSessionのインスタンス生成する際、もしくは起動時のオプション指定)

当該カタログ内部では、例えば org.apache.spark.sql.delta.catalog.DeltaCatalog#createDeltaTable メソッドが定義されており、 org.apache.spark.sql.delta.catalog.DeltaCatalog#createTable ※ しようとするときなどに呼び出されるようになっている。

org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension#createTable をoverrideしている

なお、このクラスもデリゲート用のカタログを用いるようになっている。 org.apache.spark.sql.delta.catalog.DeltaCatalog#createTable メソッドは以下のようになっており、 データソースが delta 出ない場合は、親クラスの createTable (つまり標準的なもの)が呼び出されるようになっている。

org/apache/spark/sql/delta/catalog/DeltaCatalog.scala:149

1
2
3
4
5
6
7
8
9
10
11
12
override def createTable(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): Table = {
if (DeltaSourceUtils.isDeltaDataSourceName(getProvider(properties))) {
createDeltaTable(
ident, schema, partitions, properties, sourceQuery = None, TableCreationModes.Create)
} else {
super.createTable(ident, schema, partitions, properties)
}
}

ScalaやPythonでの例

代表的にScalaの例を出す。公式サイトには以下のように載っている。

1
2
3
df.write.format("delta").saveAsTable("events")      // create table in the metastore

df.write.format("delta").save("/delta/events") // create table by path

Hiveメタストア経由で書き込むケースと、ストレージ上に直接書き出すケースが載っている。

SQLでのマージ処理

SQLを用いたマージの例 の通り、Delta Lakeの特徴であるマージ機能もSQLから呼び出させる。

1
2
3
4
5
6
spark.sql(s"""MERGE INTO $tableName USING newData
ON ${tableName}.id = newData.id
WHEN MATCHED THEN
UPDATE SET ${tableName}.id = newData.id
WHEN NOT MATCHED THEN INSERT *
""")

Spark SQLのカタログに登録されたDelta LakeのテーブルからDeltaTableを生成することもできる。

1
2
scala> import io.delta.tables.DeltaTable
scala> val tbl = DeltaTable.forName(tableName)

Presto / Athena用のメタデータの自動生成

Delta LakeはPresto、Athena用のメタデータを生成できるが、更新があった際にパーティションごとに自動で再生成できるようになった。

テーブル履歴の切り詰めの管理

Delta Lakeは更新の履歴を保持することも特徴の一つだが、 データ本体とログのそれぞれの切り詰め対象期間を指定できる。

CREATEやALTER句内で、TBLPROPERTIESとして指定することになっている。 例えば以下。

1
2
spark.sql(s"CREATE TABLE $tableName(id LONG) USING delta TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 1 day', 'delta.deletedFileRetentionDuration' = 'interval 1 day')")
spark.sql(s"ALTER TABLE $tableName SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 1 day', 'delta.deletedFileRetentionDuration' = 'interval 1 day')")

ユーザメタデータ

spark.databricks.delta.commitInfo.userMetadata プロパティを利用して、ユーザメタデータを付与できる。

1
df.write.option("spark.databricks.delta.commitInfo.userMetadata", "test").format("delta").mode("append").save("/tmp/test")
1
2
scala> spark.sql("SET spark.databricks.delta.commitInfo.userMetadata=test")
scala> spark.sql(s"INSERT INTO $tableName VALUES 0, 1, 2, 3, 4")

AzureのData Lake Storage Gen2

対応した。

しかし、 0.7.0で対応したAzure Data Lake Storage Gen2 の通り、 前提となっている各種ソフトウェアバージョンは高め。

ストリーム処理のone-timeトリガの改善

DataStreamReaderのオプションでmaxFilesPerTriggerを設定しているとしても、 one-time triggerでは一度に溜まったすべてのデータを読み込むようになった。(Spark 3系の話) 

共有

About tream table theory

参考

メモ

Tyler AkidauらによるApache Beamにまつわる情報

Foundations of streaming SQL stream & table theory (General theory) の通り、 ストリームデータとテーブルデータの関係性を論じたもの。

上記の内容の全体は、 OReillyのStreaming Systems に記載あり。

これらは、ストリーム・テーブル理論をベースに、Apache Beamを利用したストリームデータ処理・活用システムについて論じたもの。

Matthias J. Sax、Guozhang Wangらによる論文

Streams and Tables Two Sides of the Same Coin

概要

ストリームデータの処理モデルを提案。 論理・物理の間の一貫性の崩れに対処する。

1 Introduction

データストリームのチャレンジ

  • 物理的なアウト・オブ・オーダへの対応
  • 処理のオペレータがステートフルでないといけない、レイテンシ、正しさ、処理コストなどのトレードオフがある、など
  • 分散処理への対応

物理的な並びが保証されない条件下で、 十分に表現力のあるオペレータ(モデル)を立案する。

2 Background

ストリームデータのモデル。

ポイント。レコード配下の構成。

  • オフセット:物理の並び
  • タイムスタンプ:論理の並び(生成時刻)
  • キー
  • バリュー

3 Duality of streams and tables

レイテンシは、データストリームの特性に依存する。

論理、物理の並びの差異を解決するため、結果を継続的に更新するモデルを提案した。

Dual Streaming Model

DUality of streams and tables

テーブルは、テーブルのバージョンのコレクションと考えることもできる。

4 STREAM PROCESSING OPERATORS

モデル定義を解説。 フィルターなどのステートレスな処理から、アグリゲーションなどのステートフルな処理まで。

out-of-order なレコードが届いても、最終的な出力結果テーブルがout-of-orderレコードがないときと同一になるように動くことを期待する。

Watermarkと似たような機構として、「retention time」を導入。 結果テーブル内の「現在時刻 - retention time」よりも古い結果をメンテナンス対象から外す。

Stream - Table Joinのケースで、「遅れテーブル更新」が生じると、 下流システムに対して結果の上書きデータを出力する必要がある。

5 CASE STUDY: APACHE KAFKA

Kafka Streamsの例。

Kafka Streamsの利用企業:The New York Times, Pinterest, LINE, Trivago, etc

Kafka Streamsのオペレーションはすべてノンブロッキングであり、 レコードを受領次第処理し、KTableにマテリアライズされたり、Topicに書き戻したりされる。

Window集計の例を題材に、retention timeを利用。 retention timeを長くするほどストレージを利用するが、 out-of-orderレコードに対してロバストになる。 また、retention timeが長いほど、ウィンドウ結果が確定したと言えるタイミングが遅くなる。

6 RELATED WORK

Relationを取り扱うモデル、out-of-orderを取り扱うモデル、テーブルバージョン管理を取り扱うモデルなど。

共有

Handle pictures in delta lake and hudi

参考

Delta Lake

まずはjpgをSparkで読み込む

予め、Delta Lakeを読み込みながら起動する。

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.11:0.6.0

Spark公式ドキュメントのImage data source を利用し、jpgをDataFrameとして読み込んでみる。 データは、 Kaggleのmnistデータ を利用。このデータはjpgになっているので便利。

1
2
3
scala> val home = sys.env("HOME")
scala> val imageDir = "Downloads/mnist_jpg/trainingSet/trainingSet/0"
scala> val df = spark.read.format("image").option("dropInvalid", true).load(home + "/" + imageDir)

データをDataFrameにする定義ができた。 内容は以下の通り。

1
2
3
4
5
6
7
8
scala> df.select("image.origin", "image.width", "image.height").show(3, truncate=false)
+-------------------------------------------------------------------------------+-----+------+
|origin |width|height|
+-------------------------------------------------------------------------------+-----+------+
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_20188.jpg|28 |28 |
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_12634.jpg|28 |28 |
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_26812.jpg|28 |28 |
+-------------------------------------------------------------------------------+-----+------+

公式ドキュメントのとおりだが、カラムは以下の通り。

  • origin: StringType (represents the file path of the image)
  • height: IntegerType (height of the image)
  • width: IntegerType (width of the image)
  • nChannels: IntegerType (number of image channels)
  • mode: IntegerType (OpenCV-compatible type)
  • data: BinaryType (Image bytes in OpenCV-compatible order: row-wise BGR in most cases)
1
2
3
4
5
6
7
8
9
scala> df.printSchema
root
|-- image: struct (nullable = true)
| |-- origin: string (nullable = true)
| |-- height: integer (nullable = true)
| |-- width: integer (nullable = true)
| |-- nChannels: integer (nullable = true)
| |-- mode: integer (nullable = true)
| |-- data: binary (nullable = true)

Delta Lakeで扱う

書き込み

これをDelta LakeのData Sourceで書き出してみる。

1
2
scala> val deltaImagePath = "/tmp/delta-lake-image"
scala> df.write.format("delta").save(deltaImagePath)

読み出し

できた。試しにDeltaTableとして読み出してみる。

1
2
3
scala> import io.delta.tables._
scala> import org.apache.spark.sql.functions._
scala> val deltaTable = DeltaTable.forPath(deltaImagePath)

更新

mergeを試す。 マージのためにキーを明示的に与えつつデータを準備する。

1
2
3
4
5
scala> val targetDf = df.select($"image.origin", $"image")
scala> val targetImagePath = "/tmp/delta-table"
scala> targetDf.write.format("delta").save(targetImagePath)
scala> val sourcePath = "Downloads/mnist_jpg/trainingSet/trainingSet/1"
scala> val sourceDf = spark.read.format("image").option("dropInvalid", true).load(home + "/" + sourcePath).select($"image.origin", $"image")

Delta Tableを定義。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
scala> val deltaTable = DeltaTable.forPath(targetImagePath)
scala> deltaTable
.as("target")
.merge(
sourceDf.as("source"),
"target.origin = source.origin")
.whenMatched
.updateExpr(Map(
"image" -> "source.image"))
.whenNotMatched
.insertExpr(Map(
"origin" -> "source.origin",
"image" -> "source.image"))
.execute()

実際に、追加データが入っていることが確かめられる。

1
2
3
4
5
6
7
8
9
scala> deltaTable.toDF.where($"origin" contains "trainingSet/1").select("image.origin", "image.width", "image.height").show(3, truncate=false)
+-------------------------------------------------------------------------------+-----+------+
|origin |width|height|
+-------------------------------------------------------------------------------+-----+------+
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/1/img_10266.jpg|28 |28 |
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/1/img_10665.jpg|28 |28 |
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/1/img_10772.jpg|28 |28 |
+-------------------------------------------------------------------------------+-----+------+
only showing top 3 rows

SparkのImage Data Sourceは、メタデータと画像本体(バイナリ)を構造で扱っているだけなので、 実は特に工夫することなく扱えた。

公式ドキュメントによると、OpenCV形式バイナリで保持されている。 これから先は、Python使って処理したいので、改めて開き直す。

Parquetの内容を確認する

parquet-mrparquet-tools を利用して、Parquetの内容を確認する。

なお、Hudi v0.5.2におけるParquetのバージョンは以下の通り、1.10.1である。

pom.xml:84

1
<parquet.version>1.10.1</parquet.version>

当該バージョンに対応するタグをチェックアウトし、予めparquet-toolsをパッケージしておくこと。

スキーマ

1
2
3
4
5
6
7
8
9
10
11
$ java -jar target/parquet-tools-1.10.1.jar schema /tmp/delta-lake-image/part-00000-c3d03d70-1785-435f-b055-b2a78903e732-c000.snappy.parquet
message spark_schema {
optional group image {
optional binary origin (UTF8);
optional int32 height;
optional int32 width;
optional int32 nChannels;
optional int32 mode;
optional binary data;
}
}

シンプルにParquet内に保持されていることがわかる。 Spark SQLのParquet取扱機能を利用している。

メタデータ

ファイル名やクリエイタなど。

1
2
3
file:        file:/tmp/delta-lake-image/part-00000-c3d03d70-1785-435f-b055-b2a78903e732-c000.snappy.parquet
creator: parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1)
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"image","type":{"type":"struct","fields":[{"name":"origin","type":"string","nullable":true,"metadata":{}},{"name":"height","type":"integer","nullable":true,"metadata":{}},{"name":"width","type":"integer","nullable":true,"metadata":{}},{"name":"nChannels","type":"integer","nullable":true,"metadata":{}},{"name":"mode","type":"integer","nullable":true,"metadata":{}},{"name":"data","type":"binary","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}

ファイルスキーマ

1
2
3
4
5
6
7
8
9
file schema: spark_schema
--------------------------------------------------------------------------------
image: OPTIONAL F:6
.origin: OPTIONAL BINARY O:UTF8 R:0 D:2
.height: OPTIONAL INT32 R:0 D:2
.width: OPTIONAL INT32 R:0 D:2
.nChannels: OPTIONAL INT32 R:0 D:2
.mode: OPTIONAL INT32 R:0 D:2
.data: OPTIONAL BINARY R:0 D:2

Rowグループ

1
2
3
4
5
6
7
8
9
10
11
row group 1: RC:32 TS:29939 OFFSET:4
--------------------------------------------------------------------------------
image:
.origin: BINARY SNAPPY DO:0 FPO:4 SZ:620/2838/4.58 VC:32 ENC:RLE,PLAIN,BIT_PACKED ST:[min: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_10203.jpg, max: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_9098.jpg, num_nulls: 0]
.height: INT32 SNAPPY DO:0 FPO:624 SZ:74/70/0.95 VC:32 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 28, max: 28, num_nulls: 0]
.width: INT32 SNAPPY DO:0 FPO:698 SZ:74/70/0.95 VC:32 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 28, max: 28, num_nulls: 0]
.nChannels: INT32 SNAPPY DO:0 FPO:772 SZ:74/70/0.95 VC:32 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 1, max: 1, num_nulls: 0]
.mode: INT32 SNAPPY DO:0 FPO:846 SZ:74/70/0.95 VC:32 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 0, max: 0, num_nulls: 0]
.data: BINARY SNAPPY DO:0 FPO:920 SZ:21175/26821/1.27 VC:32 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 0x00

(snip)

Pythonで画像処理してみる

とりあえずOpenCVをインストールしたPython環境のJupyterでPySparkを起動。

1
$ PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip 0.0.0.0" /opt/spark/default/bin/pyspark --packages io.delta:delta-core_2.11:0.6.0 --conf spark.pyspark.driver.python=$HOME/venv/jupyter/bin/jupyter

この後はJupyter内で処理する。

1
2
3
4
5
from delta.tables import *
from pyspark.sql.functions import *
delta_image_path = "/tmp/delta-lake-image"
deltaTable = DeltaTable.forPath(spark, delta_image_path)
deltaTable.toDF().select("image.origin", "image.width", "image.height").show(3, truncate=False)

とりあえず読み込めたことがわかる。

1
2
3
4
5
6
7
8
+-------------------------------------------------------------------------------+-----+------+
|origin |width|height|
+-------------------------------------------------------------------------------+-----+------+
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_20188.jpg|28 |28 |
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_12634.jpg|28 |28 |
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_26812.jpg|28 |28 |
+-------------------------------------------------------------------------------+-----+------+
only showing top 3 rows

中のデータを利用できることを確かめるため、1件だけ取り出して処理してみる。

1
img = deltaTable.toDF().where("image.origin == 'file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_1.jpg'").select("image.origin", "image.width", "image.height", "image.nChannels", "image.data").collect()[0]

とりあえずエッジ抽出をしてみる。

1
2
3
4
5
6
7
import numpy as np
import cv2
from matplotlib import pyplot as plt

nparr = np.frombuffer(img.data, np.uint8).reshape(img.height, img.width, img.nChannels)
edges = cv2.Canny(nparr,100,200)
plt.imshow(edges)

Jupyter上でエッジ抽出されたことが確かめられただろうか。

エッジ抽出された様

これをSparkでUDFで実行するようにする。 (とりあえず雑にライブラリをインポート…)

1
2
3
4
5
6
7
8
9
10
11
def get_edge(data, h, w, c):
import numpy as np
import cv2
nparr = np.frombuffer(data, np.uint8).reshape(h, w, c)
edge = cv2.Canny(nparr,100,200)
return edge.tobytes()

from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType

get_edge_udf = udf(get_edge, BinaryType())

エッジ抽出されたndarrayをバイト列に変換し、Spark上ではBinaryTypeで扱うように指定した。

1
2
3
with_edge = deltaTable.toDF().select("image.origin", "image.width", "image.height", "image.nChannels", get_edge_udf("image.data", "image.height", "image.width", "image.nChannels").alias("edge"), "image.data")

with_edge.show(3)
1
2
3
4
5
6
7
8
+--------------------+-----+------+---------+--------------------+--------------------+
| origin|width|height|nChannels| edge| data|
+--------------------+-----+------+---------+--------------------+--------------------+
|file:///home/cent...| 28| 28| 1|[00 00 00 00 00 0...|[05 00 04 08 00 0...|
|file:///home/cent...| 28| 28| 1|[00 00 00 00 00 0...|[05 00 0B 00 00 0...|
|file:///home/cent...| 28| 28| 1|[00 00 00 00 00 0...|[06 00 02 00 02 0...|
+--------------------+-----+------+---------+--------------------+--------------------+
only showing top 3 rows

こんな感じで、OpenCVを用いた処理をDataFrameに対して実行できる。

なお、当然ながら処理されたデータを取り出せば、また画像として表示も可能。

1
2
3
new_img = with_edge.where("origin == 'file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_1.jpg'").select("origin", "width", "height", "nChannels", "data", "edge").collect()[0]
new_nparr = np.frombuffer(new_img.edge, np.uint8).reshape(new_img.height, new_img.width)
plt.imshow(new_nparr)

Hudi

画像読み込み

まずはjpgをSparkで読み込む と同様に データを読み込む。

Hudiのquick-start-guide を参考にしながら、まずはHudiを読み込みながら、シェルを起動する。

1
2
$ /opt/spark/default/bin/spark-shell --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

ライブラリをインポート。

1
2
3
4
scala> import org.apache.spark.sql.SaveMode._
scala> import org.apache.hudi.DataSourceReadOptions._
scala> import org.apache.hudi.DataSourceWriteOptions._
scala> import org.apache.hudi.config.HoodieWriteConfig._

画像データを読み込み

1
2
3
4
5
scala> val tableName = "hudi_images"
scala> val basePath = "file:///tmp/hudi_images"
scala> val home = sys.env("HOME")
scala> val imageDir = "Downloads/mnist_jpg/trainingSet/trainingSet/*"
scala> val df = spark.read.format("image").option("dropInvalid", true).load(home + "/" + imageDir)

今回は、Hudiにおけるパーティション構成を確認するため、上記のようにワイルドカード指定した。

書き込み

Hudi書き込み用にデータを変換。パーティション用キーなどを定義する。

1
2
3
4
5
scala> import org.apache.spark.sql.functions.{col, udf}
scala> val partitionPath = udf((str: String) => {
str.split("/").takeRight(2).head
})
scala> val hudiDf = df.select($"image.origin", partitionPath($"image.origin") as "partitionpath", $"*")

書き込み。

1
2
3
4
5
6
7
scala> hudiDf.write.format("hudi").
option(TABLE_NAME, tableName).
option(PRECOMBINE_FIELD_OPT_KEY, "origin").
option(RECORDKEY_FIELD_OPT_KEY, "origin").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
mode(Overwrite).
save(basePath)

書き込まれたデータは以下のような感じ。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
$ ls -R /tmp/hudi_images/
/tmp/hudi_images/:
0 1 2 3 4 5 6 7 8 9

/tmp/hudi_images/0:
86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-29-14074_20200517135104.parquet

/tmp/hudi_images/1:
4912032f-3365-4852-b2ae-91ffb5ea9806-0_1-29-14075_20200517135104.parquet

/tmp/hudi_images/2:
0636bb93-dd32-40f1-b5c8-328130386528-0_2-29-14076_20200517135104.parquet

/tmp/hudi_images/3:
6f0154cd-4a9f-4001-935b-47d067416797-0_3-29-14077_20200517135104.parquet

/tmp/hudi_images/4:
b6f29c2c-7df0-481a-8149-2bbc93e92e2c-0_4-29-14078_20200517135104.parquet

/tmp/hudi_images/5:
24115172-10db-4c85-808a-bf4048e0f533-0_5-29-14079_20200517135104.parquet

/tmp/hudi_images/6:
64823f4b-26fb-4679-87e8-cd81d01b1181-0_6-29-14080_20200517135104.parquet

/tmp/hudi_images/7:
d0c097e3-9ab7-477a-b591-593c6cb7880f-0_7-29-14081_20200517135104.parquet

/tmp/hudi_images/8:
13a48b38-cbfb-4076-957d-9c580765bfca-0_8-29-14082_20200517135104.parquet

/tmp/hudi_images/9:
5d3ba9ae-8ede-410f-95f3-e8d69e8c0478-0_9-29-14083_20200517135104.parquet

メタデータは以下のような感じ。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ ls -1 /tmp/hudi_images/.hoodie/
20200517135104.clean
20200517135104.clean.inflight
20200517135104.clean.requested
20200517135104.commit
20200517135104.commit.requested
20200517135104.inflight
20200517140006.clean
20200517140006.clean.inflight
20200517140006.clean.requested
20200517140006.commit
20200517140006.commit.requested
20200517140006.inflight
archived
hoodie.properties

読み込み

読み込んで見る。

1
2
3
4
scala> val hudiImageDf = spark.
read.
format("hudi").
load(basePath + "/*")

スキーマは以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scala> hudiImageDf.printSchema
root
|-- _hoodie_commit_time: string (nullable = true)
|-- _hoodie_commit_seqno: string (nullable = true)
|-- _hoodie_record_key: string (nullable = true)
|-- _hoodie_partition_path: string (nullable = true)
|-- _hoodie_file_name: string (nullable = true)
|-- origin: string (nullable = true)
|-- partitionpath: string (nullable = true)
|-- image: struct (nullable = true)
| |-- origin: string (nullable = true)
| |-- height: integer (nullable = true)
| |-- width: integer (nullable = true)
| |-- nChannels: integer (nullable = true)
| |-- mode: integer (nullable = true)
| |-- data: binary (nullable = true)

すべてのただの書き込みAPIを試しただけだが、想定通り書き込み・読み込みできている。

更新

Hudiといえば、テーブルを更新可能であることも特徴のひとつなので、試しに更新をしてみる。 元は同じデータであるが、「0」の画像データを新しいデータとしてDataFrameを定義し、更新する。 意図的に、一部のデータだけ更新する。

1
2
3
4
5
6
7
8
9
10
scala> val updateImageDir = "Downloads/mnist_jpg/trainingSet/trainingSet/0"
scala> val updateDf = spark.read.format("image").option("dropInvalid", true).load(home + "/" + updateImageDir)
scala> val updateHudiDf = updateDf.select($"image.origin", partitionPath($"image.origin") as "partitionpath", $"*")
scala> updateHudiDf.write.format("hudi").
option(TABLE_NAME, tableName).
option(PRECOMBINE_FIELD_OPT_KEY, "origin").
option(RECORDKEY_FIELD_OPT_KEY, "origin").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
mode(Append).
save(basePath)

以下のように、「0」のデータが更新されていることがわかる。

1
2
3
4
5
6
7
8
9
10
11
12
$ ls -R /tmp/hudi_images/
/tmp/hudi_images/:
0 1 2 3 4 5 6 7 8 9

/tmp/hudi_images/0:
86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-29-14074_20200517135104.parquet
86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-60-27759_.parquet

/tmp/hudi_images/1:
4912032f-3365-4852-b2ae-91ffb5ea9806-0_1-29-14075_20200517135104.parquet

(snip)

つづいて、更新されたことを確かめるため、改めて読み込みDataFrameを定義し、

1
2
3
4
scala> val updatedHudiImageDf = spark.
read.
format("hudi").
load(basePath + "/*")

試しに、差分がわかるように「1」と「0」のデータをそれぞれ読んで見る。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
scala> updatedHudiImageDf.select($"_hoodie_commit_time", $"_hoodie_commit_seqno", $"_hoodie_partition_path").filter("partitionpath == 1").show(3)
+-------------------+--------------------+----------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_partition_path|
+-------------------+--------------------+----------------------+
| 20200517135104| 20200517135104_1_1| 1|
| 20200517135104| 20200517135104_1_12| 1|
| 20200517135104| 20200517135104_1_45| 1|
+-------------------+--------------------+----------------------+
only showing top 3 rows


scala> updatedHudiImageDf.select($"_hoodie_commit_time", $"_hoodie_commit_seqno", $"_hoodie_partition_path").filter("partitionpath == 0").show(3)
+-------------------+--------------------+----------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_partition_path|
+-------------------+--------------------+----------------------+
| 20200517140006|20200517140006_0_...| 0|
| 20200517140006|20200517140006_0_...| 0|
| 20200517140006|20200517140006_0_...| 0|
+-------------------+--------------------+----------------------+
only showing top 3 rows

上記のように、更新されたことがわかる。最新の更新日時は、 2020/05/17 14:00:06UTC である。

ここで読み込まれたデータも、Delta Lake同様に画像を元にしたベクトルデータとして扱える。 Pythonであれば ndarray に変換して用いれば良い。

Parquetの内容を確認する

parquet-mrparquet-tools を利用して、Parquetの内容を確認する。

なお、Hudi v0.5.2におけるParquetのバージョンは以下の通り、1.10.1である。

pom.xml:84

1
<parquet.version>1.10.1</parquet.version>

当該バージョンに対応するタグをチェックアウトし、予めparquet-toolsをパッケージしておくこと。

スキーマ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ java -jar target/parquet-tools-1.10.1.jar schema /tmp/hudi_images/0/86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-60-27759_20200517140006.parquet
message hoodie.hudi_images.hudi_images_record {
optional binary _hoodie_commit_time (UTF8);
optional binary _hoodie_commit_seqno (UTF8);
optional binary _hoodie_record_key (UTF8);
optional binary _hoodie_partition_path (UTF8);
optional binary _hoodie_file_name (UTF8);
optional binary origin (UTF8);
optional binary partitionpath (UTF8);
optional group image {
optional binary origin (UTF8);
optional int32 height;
optional int32 width;
optional int32 nChannels;
optional int32 mode;
optional binary data;
}
}

Hudiが独自に付加したカラムが追加されていることが見て取れる。

メタデータ

つづいて、メタデータを確認する。

ファイル名やクリエイタなど。

1
2
file:                   file:/tmp/hudi_images/0/86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-60-27759_20200517140006.parquet
creator: parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1)

ブルームフィルタはここでは省略

1
2
3
extra:                  org.apache.hudi.bloomfilter = ///

(snip)

レコードキーやAvroスキーマ

1
2
3
4
extra:                  hoodie_min_record_key = file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_1.jpg
extra: parquet.avro.schema = {"type":"record","name":"hudi_images_record","namespace":"hoodie.hudi_images","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"origin","type":["string","null"]},{"name":"partitionpath","type":["string","null"]},{"name":"image","type":[{"type":"record","name":"image","namespace":"hoodie.hudi_images.hudi_images_record","fields":[{"name":"origin","type":["string","null"]},{"name":"height","type":["int","null"]},{"name":"width","type":["int","null"]},{"name":"nChannels","type":["int","null"]},{"name":"mode","type":["int","null"]},{"name":"data","type":["bytes","null"]}]},"null"]}]}
extra: writer.model.name = avro
extra: hoodie_max_record_key = file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_9996.jpg

上記の通り、Hudiでは parquet-avro を利用し、Avro形式でParquet内のデータを保持する。

スキーマ詳細

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
file schema:            hoodie.hudi_images.hudi_images_record
--------------------------------------------------------------------------------
_hoodie_commit_time: OPTIONAL BINARY O:UTF8 R:0 D:1
_hoodie_commit_seqno: OPTIONAL BINARY O:UTF8 R:0 D:1
_hoodie_record_key: OPTIONAL BINARY O:UTF8 R:0 D:1
_hoodie_partition_path: OPTIONAL BINARY O:UTF8 R:0 D:1
_hoodie_file_name: OPTIONAL BINARY O:UTF8 R:0 D:1
origin: OPTIONAL BINARY O:UTF8 R:0 D:1
partitionpath: OPTIONAL BINARY O:UTF8 R:0 D:1
image: OPTIONAL F:6
.origin: OPTIONAL BINARY O:UTF8 R:0 D:2
.height: OPTIONAL INT32 R:0 D:2
.width: OPTIONAL INT32 R:0 D:2
.nChannels: OPTIONAL INT32 R:0 D:2
.mode: OPTIONAL INT32 R:0 D:2
.data: OPTIONAL BINARY R:0 D:2

Rowグループ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
row group 1:            RC:4132 TS:4397030 OFFSET:4
--------------------------------------------------------------------------------
_hoodie_commit_time: BINARY GZIP DO:0 FPO:4 SZ:165/127/0.77 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 20200517140006, max: 20200517140006, num_nulls: 0]
_hoodie_commit_seqno: BINARY GZIP DO:0 FPO:169 SZ:10497/107513/10.24 VC:4132 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 20200517140006_0_42001, max: 20200517140006_0_46132, num_nulls: 0]
_hoodie_record_key: BINARY GZIP DO:0 FPO:10666 SZ:16200/342036/21.11 VC:4132 ENC:RLE,PLAIN,BIT_PACKED ST:[min: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_1.jpg, max: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_9996.jpg, num_nulls: 0]
_hoodie_partition_path: BINARY GZIP DO:0 FPO:26866 SZ:100/62/0.62 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 0, max: 0, num_nulls: 0]
_hoodie_file_name: BINARY GZIP DO:0 FPO:26966 SZ:448/419/0.94 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-60-27759_20200517140006.parquet, max: 86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-60-27759_20200517140006.parquet, num_nulls: 0]
origin: BINARY GZIP DO:0 FPO:27414 SZ:16200/342036/21.11 VC:4132 ENC:RLE,PLAIN,BIT_PACKED ST:[min: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_1.jpg, max: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_9996.jpg, num_nulls: 0]
partitionpath: BINARY GZIP DO:0 FPO:43614 SZ:100/62/0.62 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 0, max: 0, num_nulls: 0]
image:
.origin: BINARY GZIP DO:0 FPO:43714 SZ:16200/342036/21.11 VC:4132 ENC:RLE,PLAIN,BIT_PACKED ST:[min: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_1.jpg, max: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_9996.jpg, num_nulls: 0]
.height: INT32 GZIP DO:0 FPO:59914 SZ:111/73/0.66 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 28, max: 28, num_nulls: 0]
.width: INT32 GZIP DO:0 FPO:60025 SZ:111/73/0.66 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 28, max: 28, num_nulls: 0]
.nChannels: INT32 GZIP DO:0 FPO:60136 SZ:111/73/0.66 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 1, max: 1, num_nulls: 0]
.mode: INT32 GZIP DO:0 FPO:60247 SZ:111/73/0.66 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 0, max: 0, num_nulls: 0]
.data: BINARY GZIP DO:0 FPO:60358 SZ:1609341/3262447/2.03 VC:4132 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 0x00

(snip)
共有

Research about BigTop

参考

メモ

Trunkのビルド方法

How to build Bigtop-trunk を見ると、現在ではDocker上でビルドする方法が推奨されているようだ。 ただ、

1
# ./gradlew spark-pkg-ind

で実行されるとき、都度Mavenキャッシュのない状態から開始される。 これが待ち時間長いので工夫が必要。

なお、上記コマンドで実行されるタスクは、以下の通り。

packages.gradle:629

1
2
3
4
5
6
7
8
  task "$target-pkg-ind" (
description: "Invoking a native binary packaging for $target in Docker. Usage: \$ ./gradlew " +
"-POS=[centos-7|fedora-26|debian-9|ubuntu-16.04|opensuse-42.3] " +
"-Pprefix=[trunk|1.2.1|1.2.0|1.1.0|...] $target-pkg-ind " +
"-Pnexus=[true|false]",
group: PACKAGES_GROUP) doLast {
def _prefix = project.hasProperty("prefix") ? prefix : "trunk"
(snip)

bigtop-ci/build.sh がタスク内で実行されるスクリプトである。 この中でDockerが呼び出されてビルドが実行される。

これをいじれば、一応ビルド時に使ったDockerコンテナを残すことができそうではある。

-> reuse_images というブランチに、コンテナンをリユースする機能をつけた。

デプロイ周りを読み解いてみる

Deployment and Integration Testing を見る限り、 Dockerベースのデプロイ方法Puppetベースのデプロイ方法 があるように見える。

Dockerベースのデプロイを試す

Dockerベースのデプロイ方法 に従って、Dockerベースでデプロイしてみる。 ソースコードは20200427時点でのmasterを利用。

Ubuntu16でDocker、Java、Ruby環境を整え、以下を実行した。

1
$ sudo ./gradlew -Pconfig=config_ubuntu-16.04.yaml -Pnum_instances=1 docker-provisioner

なお、20200427時点で Dockerベースのデプロイ方法 ではコンフィグファイルが config_ubuntu_xenial.yaml となっていたが、 BIGTOP-2814 の際に変更されたのに合わせた。

プロビジョニング完了後、コンテナを確認し、接続。

1
2
3
4
$ sudo docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
7311ae86181a bigtop/puppet:trunk-ubuntu-16.04 "/sbin/init" 4 minutes ago Up 4 minutes 20200426_151302_r30103_bigtop_1
$ sudo docker exec -it 20200426_151302_r30103_bigtop_1 bash

以下のように hdfs コマンドを実行できることがわかる。

1
2
3
4
5
6
7
8
9
root@7311ae86181a:/# hdfs dfs -ls /
Found 7 items
drwxr-xr-x - hdfs hadoop 0 2020-04-26 15:15 /apps
drwxrwxrwx - hdfs hadoop 0 2020-04-26 15:15 /benchmarks
drwxr-xr-x - hbase hbase 0 2020-04-26 15:15 /hbase
drwxr-xr-x - solr solr 0 2020-04-26 15:15 /solr
drwxrwxrwt - hdfs hadoop 0 2020-04-26 15:15 /tmp
drwxr-xr-x - hdfs hadoop 0 2020-04-26 15:15 /user
drwxr-xr-x - hdfs hadoop 0 2020-04-26 15:15 /var

なお、 -Pnum_instances=1 の値を3に変更すると、コンテナが3個立ち上がる。

1
$ sudo ./gradlew -Pconfig=config_ubuntu-16.04.yaml -Pnum_instances=3 docker-provisioner

試しに、dfsadmin を実行してみる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
root@53211353deec:/# sudo -u hdfs hdfs dfsadmin -report
Configured Capacity: 374316318720 (348.61 GB)
Present Capacity: 345302486224 (321.59 GB)
DFS Remaining: 345300606976 (321.59 GB)
DFS Used: 1879248 (1.79 MB)
DFS Used%: 0.00%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0
Pending deletion blocks: 0

-------------------------------------------------
Live datanodes (3):

Name: 172.17.0.2:50010 (53211353deec.bigtop.apache.org)
Hostname: 53211353deec.bigtop.apache.org
Decommission Status : Normal
Configured Capacity: 124772106240 (116.20 GB)
DFS Used: 626416 (611.73 KB)
Non DFS Used: 9637679376 (8.98 GB)
DFS Remaining: 115100246016 (107.20 GB)
DFS Used%: 0.00%
DFS Remaining%: 92.25%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 1
Last contact: Sun Apr 26 15:33:12 UTC 2020

(snip)

また、 --stack オプションを利用し、デプロイするコンポーネントを指定できる。 ここではHadoopに加え、Sparkをプロビジョニングしてみる。

1
$ sudo ./gradlew -Pconfig=config_ubuntu-16.04.yaml -Pstack=hdfs,yarn,spark -Pnum_instances=1 docker-provisioner

コンテナに接続し、Sparkを動かす。

1
2
3
4
$ sudo docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
56e1ff5670be bigtop/puppet:trunk-ubuntu-16.04 "/sbin/init" 3 minutes ago Up 3 minutes 20200426_155010_r21667_bigtop_1
$ sudo docker exec -it 20200426_155010_r21667_bigtop_1 bash
1
2
3
root@56e1ff5670be:/# spark-shell
scala> spark.sparkContext.master
res1: String = yarn

上記のように、マスタ=YARNで起動していることがわかる。 参考までに、 spark-env.sh は以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
root@56e1ff5670be:/# cat /etc/spark/conf/spark-env.sh
export SPARK_HOME=${SPARK_HOME:-/usr/lib/spark}
export SPARK_LOG_DIR=${SPARK_LOG_DIR:-/var/log/spark}
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=NONE"
export HADOOP_HOME=${HADOOP_HOME:-/usr/lib/hadoop}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop/conf}
export HIVE_CONF_DIR=${HIVE_CONF_DIR:-/etc/hive/conf}

export STANDALONE_SPARK_MASTER_HOST=56e1ff5670be.bigtop.apache.org
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_IP=$STANDALONE_SPARK_MASTER_HOST
export SPARK_MASTER_URL=yarn
export SPARK_MASTER_WEBUI_PORT=8080

export SPARK_WORKER_DIR=${SPARK_WORKER_DIR:-/var/run/spark/work}
export SPARK_WORKER_PORT=7078
export SPARK_WORKER_WEBUI_PORT=8081

export SPARK_DIST_CLASSPATH=$(hadoop classpath)

なお、構築したクラスタを破棄する際には以下の通り。(公式ドキュメントの通り)

1
$ sudo ./gradlew docker-provisioner-destroy

仕様確認

上記のプロビジョナのタスクは、 build.gradle 内にある。

build.gradle:263

1
2
3
task "docker-provisioner"(type:Exec,

(snip)

上記の中で、 ./docker-hadoop.sh が用いられている。

build.gradle:296

1
2
3
4
5
def command = [
'./docker-hadoop.sh',
'-C', _config,
'--create', _num_instances,
]

--create オプションが用いられているので、 create 関数が呼ばれる。

provisioner/docker/docker-hadoop.sh:387

1
2
3
if [ "$READY_TO_LAUNCH" = true ]; then
create $NUM_INSTANCES
fi

create 内では、以下のように docker-compose が用いられている。

provisioner/docker/docker-hadoop.sh:78

1
docker-compose -p $PROVISION_ID up -d --scale bigtop=$1 --no-recreate

また、コンポーネントの内容に応じて、Puppetマニフェスト(正確には、hieraファイル)が生成されるようになっている。

provisioner/docker/docker-hadoop.sh:101

1
generate-config "$hadoop_head_node" "$repo" "$components"

また、最終的には、 provision 関数、さらに bigtop-puppet 関数を通じ、各コンテナ内でpuppetが実行されるようになっている。

1
2
3
4
5
6
bigtop-puppet() {
if docker exec $1 bash -c "puppet --version" | grep ^3 >/dev/null ; then
future="--parser future"
fi
docker exec $1 bash -c "puppet apply --detailed-exitcodes $future --modulepath=/bigtop-home/bigtop-deploy/puppet/modules:/etc/puppet/modules:/usr/share/puppet/modules /bigtop-home/bigtop-deploy/puppet/manifests"
}

Ambariのモジュールを確認する。

bigtop-deploy/puppet/modules/ambari/manifests/init.pp にAmbariデプロイ用のモジュールがある。

内容は短い。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class ambari {

class deploy ($roles) {
if ("ambari-server" in $roles) {
include ambari::server
}

if ("ambari-agent" in $roles) {
include ambari::agent
}
}

class server {
package { "ambari-server":
ensure => latest,
}

exec {
"mpack install":
command => "/bin/bash -c 'echo yes | /usr/sbin/ambari-server install-mpack --purge --verbose --mpack=/var/lib/ambari-server/resources/odpi-ambari-mpack-1.0.0.0-SNAPSHOT.tar.gz'",
require => [ Package["ambari-server"] ]
}

exec {
"server setup":
command => "/usr/sbin/ambari-server setup -j $(readlink -f /usr/bin/java | sed 's@jre/bin/java@@') -s",
require => [ Package["ambari-server"], Package["jdk"], Exec["mpack install"] ]
}

service { "ambari-server":
ensure => running,
require => [ Package["ambari-server"], Exec["server setup"] ],
hasrestart => true,
hasstatus => true,
}
}

class agent($server_host = "localhost") {
package { "ambari-agent":
ensure => latest,
}

file {
"/etc/ambari-agent/conf/ambari-agent.ini":
content => template('ambari/ambari-agent.ini'),
require => [Package["ambari-agent"]],
}

service { "ambari-agent":
ensure => running,
require => [ Package["ambari-agent"], File["/etc/ambari-agent/conf/ambari-agent.ini"] ],
hasrestart => true,
hasstatus => true,
}
}
}

なお、上記の通り、サーバとエージェントそれぞれのマニフェストが存在する。 ODPiのMpackをインストールしているのが特徴。

逆にいうと、それをインストールしないと使えるVersionが存在しない。(ベンダ製のVDFを読み込めば使えるが) また、ODPiのMpackをインストールした上でクラスタを構成しようとしたところ、 ODPiのレポジトリが見当たらなかった。 プライベートレポジトリを立てる必要があるのだろうか。

いったん、公式のAmbariをインストールした上で動作確認することにする。

ホットIssue

BIGTOP-3123 が1.5リリース向けのIssue

共有

機械学習向けのFeature StoreないしStorage Layer Software

参考

プロダクト

企業アーキテクチャ

まとめ

メモ

傾向

  • Google Big Query、Big Table、Redisあたりを特徴量置き場として使っている例が見られた。

Feature Storeとして挙げられている特徴・機能

主に、featuer storeとしての特徴

機能・分析補助

計算

性能

連係

rawデータストアを含めた特徴

  • 画像、動画、音声など非テキストデータとテキストデータの統合的な取り扱い

特徴量エンジニアリングの例

Hopsworks Feature Store The missing data layer in ML pipelines? に一例が載っていたのでついでに転記。

  • Converting categorical data into numeric data;
  • Normalizing data (to alleviate ill-conditioned optimization when features originate from different distributions);
  • One-hot-encoding/binarization;
  • Feature binning (e.g., convert continuous features into discrete);
  • Feature hashing (e.g., to reduce the memory footprint of one-hot-encoded features);
  • Computing polynomial features;
  • Representation learning (e.g., extract features using clustering, embeddings, or generative models);
  • Computing aggregate features (e.g., count, min, max, stdev).

feature storeにおける画像の取扱は?

feature storeのレベルになると行列化されているので、画像を特別なものとして扱わない? rawデータストア上では画像は画像として扱う。

Feastにおけるデータフロー概要

※Feastから幾つか図を引用。

Feast Bridging ML Models and Data に載っていたイメージ。

Feastのデータフローから引用

データオーナ側はストリームデータ(Kafka)、DWH(BigQuery)、File(BigQuery)が書かれている。 また真ん中にはApache Beamが書かれており、ストリームETLを経ながらデータがサービングシステムに渡されている。 データは基本的にはストリームとして扱うようだ。

また特徴量を取得するときは以下のようにする。

特徴量の取得

hopsworksにおけるfeature store

※Hopsworksから幾つか図を引用。

Hopsworksの公式ドキュメントのFeature Store に掲載されていたイメージは以下の通り。 Rawデータストアとは異なる位置づけ。

hopsworksでのfeature storeの位置づけ

Feastでも言われているが、データエンジニアとデータサイエンティストの間にあるもの、とされている。

データストアする部分の全体アーキテクチャ。

feature storeのアーキテクチャ
feature storeのレイヤ構成

複数のコンポーネントを組み合わせて、ひとつのfeature storeを構成しているようである。

ストレージ製品の動向

Netapp

Accelerated AI and deep learning pipelines across edge, core, and cloud では、

  • Create a smooth, secure flow of data for your AI workloads.
  • Unify AI compute and data silos across sites and regions.​
  • Your data, always available: right place, right time.

が挙げられている。 また、クラウド・オンプレ、エッジ・センタを統合する、というのが重要なアピールポイントに見えた。 詳しくは、 Edge to Core to Cloud Architecture for AI を読めばわかりそう。

Dell EMC

単独の技術というより、コンピューティングの工夫を含めてのソリューションのようにみえる。 ENTERPRISE MACHINE & DEEP LEARNING WITH INTELLIGENT STORAGE に思想が書いてありそう。まだ読んでいない。

共有

Hudi

参考

メモ

公式ドキュメント

載っている特徴は、以下の通り。

  • Upsert support with fast, pluggable indexing.
  • Atomically publish data with rollback support.
  • Snapshot isolation between writer & queries.
  • Savepoints for data recovery.
  • Manages file sizes, layout using statistics.
  • Async compaction of row & columnar data.
  • Timeline metadata to track lineage.

クイックスタートから確認(version 0.5.2前提)

Quick Start Guide を参考に進める。

公式ドキュメントではSpark2.4.4を利用しているが、ここでは2.4.5を利用する。

1
2
3
4
$ export SPARK_HOME=/opt/spark/default
$ ${SPARK_HOME}/bin/spark-shell \
--packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

必要なライブラリをインポート

1
2
3
4
5
6
7
8
9
10
scala> import org.apache.hudi.QuickstartUtils._
scala> import scala.collection.JavaConversions._
scala> import org.apache.spark.sql.SaveMode._
scala> import org.apache.hudi.DataSourceReadOptions._
scala> import org.apache.hudi.DataSourceWriteOptions._
scala> import org.apache.hudi.config.HoodieWriteConfig._
scala>
scala> val tableName = "hudi_trips_cow"
scala> val basePath = "file:///tmp/hudi_trips_cow"
scala> val dataGen = new DataGenerator

ダミーデータには org.apache.hudi.QuickstartUtils.DataGenerator クラスを利用する。 以下の例では、 org.apache.hudi.QuickstartUtils.DataGenerator#generateInserts メソッドを利用しデータを生成するが、 どういうレコードが生成されるかは、 org.apache.hudi.QuickstartUtils.DataGenerator#generateRandomValue メソッドあたりを見るとわかる。

ダミーデータを生成し、Spark DataFrameに変換。

1
2
scala> val inserts = convertToStringList(dataGen.generateInserts(10))
scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

中身は以下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scala> df.show
+-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+---+--------------------+
| begin_lat| begin_lon| driver| end_lat| end_lon| fare| partitionpath| rider| ts| uuid|
+-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+---+--------------------+
| 0.4726905879569653|0.46157858450465483|driver-213| 0.754803407008858| 0.9671159942018241|34.158284716382845|americas/brazil/s...|rider-213|0.0|28432dec-53eb-402...|
| 0.6100070562136587| 0.8779402295427752|driver-213| 0.3407870505929602| 0.5030798142293655| 43.4923811219014|americas/brazil/s...|rider-213|0.0|1bd3905e-a6c4-404...|
| 0.5731835407930634| 0.4923479652912024|driver-213|0.08988581780930216|0.42520899698713666| 64.27696295884016|americas/united_s...|rider-213|0.0|c9cc8f4b-acee-413...|
|0.21624150367601136|0.14285051259466197|driver-213| 0.5890949624813784| 0.0966823831927115| 93.56018115236618|americas/united_s...|rider-213|0.0|4be1c199-86dc-489...|
| 0.40613510977307| 0.5644092139040959|driver-213| 0.798706304941517|0.02698359227182834|17.851135255091155| asia/india/chennai|rider-213|0.0|83f4d3df-46c1-48a...|
| 0.8742041526408587| 0.7528268153249502|driver-213| 0.9197827128888302| 0.362464770874404|19.179139106643607|americas/united_s...|rider-213|0.0|cb8b392d-c9d0-445...|
| 0.1856488085068272| 0.9694586417848392|driver-213|0.38186367037201974|0.25252652214479043| 33.92216483948643|americas/united_s...|rider-213|0.0|66aaf87d-4786-4d0...|
| 0.0750588760043035|0.03844104444445928|driver-213|0.04376353354538354| 0.6346040067610669| 66.62084366450246|americas/brazil/s...|rider-213|0.0|c5a335f5-c57f-4f5...|
| 0.651058505660742| 0.8192868687714224|driver-213|0.20714896002914462|0.06224031095826987| 41.06290929046368| asia/india/chennai|rider-213|0.0|53026eda-28c4-4d8...|
|0.11488393157088261| 0.6273212202489661|driver-213| 0.7454678537511295| 0.3954939864908973| 27.79478688582596|americas/united_s...|rider-213|0.0|cd42df54-5215-402...|
+-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+---+--------------------+
1
2
3
4
5
6
7
8
scala> df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)

なお、生成されたファイルは以下の通り。 PARTITIONPATH_FIELD_OPT_KEY で指定したカラムをパーティションキーとして用いていることがわかる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
$ ls -R /tmp/hudi_trips_cow/
/tmp/hudi_trips_cow/:
americas asia

/tmp/hudi_trips_cow/americas:
brazil united_states

/tmp/hudi_trips_cow/americas/brazil:
sao_paulo

/tmp/hudi_trips_cow/americas/brazil/sao_paulo:
ae28c85a-38f0-487f-a42d-3a0babc9d321-0_0-21-25_20200329002247.parquet

/tmp/hudi_trips_cow/americas/united_states:
san_francisco

/tmp/hudi_trips_cow/americas/united_states/san_francisco:
849db286-1cbe-4a1f-b544-9939893e99f8-0_1-21-26_20200329002247.parquet

/tmp/hudi_trips_cow/asia:
india

/tmp/hudi_trips_cow/asia/india:
chennai

/tmp/hudi_trips_cow/asia/india/chennai:
2ebfbab0-4f8f-42db-b79e-1c0cbcc3cf39-0_2-21-27_20200329002247.parquet

保存したデータを読み出してみる。

1
2
3
4
5
scala> val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath + "/*/*/*/*")
scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

中身は以下の通り。 元データに対し、Hudiのカラムが追加されていることがわかる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scala> tripsSnapshotDF.show
+-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+---+--------------------+
|_hoodie_commit_time|_hoodie_commit_seqno| _hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| begin_lat| begin_lon| driver| end_lat| end_lon| fare| partitionpath| rider| ts| uuid|
+-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+---+--------------------+
| 20200329002247| 20200329002247_1_1|7695c291-8530-473...| americas/united_s...|849db286-1cbe-4a1...|0.21624150367601136|0.14285051259466197|driver-213| 0.5890949624813784| 0.0966823831927115| 93.56018115236618|americas/united_s...|rider-213|0.0|7695c291-8530-473...|
| 20200329002247| 20200329002247_1_3|2f06fcd2-8296-423...| americas/united_s...|849db286-1cbe-4a1...| 0.5731835407930634| 0.4923479652912024|driver-213|0.08988581780930216|0.42520899698713666| 64.27696295884016|americas/united_s...|rider-213|0.0|2f06fcd2-8296-423...|
| 20200329002247| 20200329002247_1_5|6ebc4028-9aae-420...| americas/united_s...|849db286-1cbe-4a1...| 0.8742041526408587| 0.7528268153249502|driver-213| 0.9197827128888302| 0.362464770874404|19.179139106643607|americas/united_s...|rider-213|0.0|6ebc4028-9aae-420...|
| 20200329002247| 20200329002247_1_6|8bf60390-ad41-4b0...| americas/united_s...|849db286-1cbe-4a1...|0.11488393157088261| 0.6273212202489661|driver-213| 0.7454678537511295| 0.3954939864908973| 27.79478688582596|americas/united_s...|rider-213|0.0|8bf60390-ad41-4b0...|
| 20200329002247| 20200329002247_1_7|762e8cb2-8806-47d...| americas/united_s...|849db286-1cbe-4a1...| 0.1856488085068272| 0.9694586417848392|driver-213|0.38186367037201974|0.25252652214479043| 33.92216483948643|americas/united_s...|rider-213|0.0|762e8cb2-8806-47d...|
| 20200329002247| 20200329002247_0_8|28622337-d76b-442...| americas/brazil/s...|ae28c85a-38f0-487...| 0.6100070562136587| 0.8779402295427752|driver-213| 0.3407870505929602| 0.5030798142293655| 43.4923811219014|americas/brazil/s...|rider-213|0.0|28622337-d76b-442...|
| 20200329002247| 20200329002247_0_9|33aec15d-356f-475...| americas/brazil/s...|ae28c85a-38f0-487...| 0.0750588760043035|0.03844104444445928|driver-213|0.04376353354538354| 0.6346040067610669| 66.62084366450246|americas/brazil/s...|rider-213|0.0|33aec15d-356f-475...|
| 20200329002247| 20200329002247_0_10|2d71c9a3-26a3-40b...| americas/brazil/s...|ae28c85a-38f0-487...| 0.4726905879569653|0.46157858450465483|driver-213| 0.754803407008858| 0.9671159942018241|34.158284716382845|americas/brazil/s...|rider-213|0.0|2d71c9a3-26a3-40b...|
| 20200329002247| 20200329002247_2_2|a997a8f0-4ab6-4d5...| asia/india/chennai|2ebfbab0-4f8f-42d...| 0.40613510977307| 0.5644092139040959|driver-213| 0.798706304941517|0.02698359227182834|17.851135255091155| asia/india/chennai|rider-213|0.0|a997a8f0-4ab6-4d5...|
| 20200329002247| 20200329002247_2_4|271de424-a0f8-427...| asia/india/chennai|2ebfbab0-4f8f-42d...| 0.651058505660742| 0.8192868687714224|driver-213|0.20714896002914462|0.06224031095826987| 41.06290929046368| asia/india/chennai|rider-213|0.0|271de424-a0f8-427...|
+-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+--------------------+---------+---+--------------------+

上記の通り、SparkのData Source機能を利用している。 中では、org.apache.hudi.DefaultSource#createRelation メソッドが用いられる。

つづいて、更新を試す。

1
2
3
4
5
6
7
8
9
10
scala> val updates = convertToStringList(dataGen.generateUpdates(10))
scala> val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
scala> df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)

もう一度、DataFrameとして読み出すと、レコードが追加されていることを確かめられる。(ここでは省略) この後の、 incremental クエリタイプの実験のため、上記の更新を幾度か実行しておく。

つづいて、 incremental クエリタイプで読み出す。

一度読み出し、最初のコミット時刻を取り出す。

1
2
3
4
5
6
7
8
scala> spark.
read.
format("hudi").
load(basePath + "/*/*/*/*").
createOrReplaceTempView("hudi_trips_snapshot")

scala> val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
scala> val beginTime = commits(commits.length - 2) // commit time we are interested in

今回は、初回書き込みに加えて2回更新したので、 commits は以下の通り。

1
2
scala> commits
res12: Array[String] = Array(20200330002239, 20200330002354, 20200330003142)

また、今回「読み込みの最初」とするコミットは、以下の通り。 つまり、2回目の更新時。

1
2
scala> beginTime
res13: String = 20200330002354

では、 incremental クエリタイプで読み出す。

1
2
3
4
5
6
7
scala> val tripsIncrementalDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
load(basePath)
scala> tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()

結果は以下のようなイメージ。

1
2
3
4
5
6
7
8
9
10
11
scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
+-------------------+------------------+--------------------+-------------------+---+
|_hoodie_commit_time| fare| begin_lon| begin_lat| ts|
+-------------------+------------------+--------------------+-------------------+---+
| 20200330003142| 87.68271062363665| 0.9273857651526887| 0.1620033132033215|0.0|
| 20200330003142| 40.44073446276323|9.842943407509797E-4|0.47631824594751015|0.0|
| 20200330003142| 45.39370966816483| 0.65888271115305| 0.8535610661589833|0.0|
| 20200330003142|47.332186591003044| 0.8006023508896579| 0.9025851737325563|0.0|
| 20200330003142| 93.34457064050349| 0.6331319396951335| 0.5375953886834237|0.0|
| 20200330003142|31.065524210209226| 0.7608842984578864| 0.9514417909802292|0.0|
+-------------------+------------------+--------------------+-------------------+---+

なお、ここでbeginTimeを1遡ることにすると…。

1
scala> val beginTime = commits(commits.length - 3) // commit time we are interested in

以下のように、2回目のコミットも含まれるようになる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
+-------------------+------------------+--------------------+-------------------+---+
|_hoodie_commit_time| fare| begin_lon| begin_lat| ts|
+-------------------+------------------+--------------------+-------------------+---+
| 20200330003142| 87.68271062363665| 0.9273857651526887| 0.1620033132033215|0.0|
| 20200330003142| 40.44073446276323|9.842943407509797E-4|0.47631824594751015|0.0|
| 20200330003142| 45.39370966816483| 0.65888271115305| 0.8535610661589833|0.0|
| 20200330003142|47.332186591003044| 0.8006023508896579| 0.9025851737325563|0.0|
| 20200330002354| 39.09858962414072| 0.08151154133724581|0.21729959707372848|0.0|
| 20200330003142| 93.34457064050349| 0.6331319396951335| 0.5375953886834237|0.0|
| 20200330002354| 80.87869643345753| 0.0748253615757305| 0.9787639413761751|0.0|
| 20200330003142|31.065524210209226| 0.7608842984578864| 0.9514417909802292|0.0|
| 20200330002354|21.602186045036387| 0.772134626462835| 0.3291184473506418|0.0|
| 20200330002354| 43.41497201940956| 0.6226833057042072| 0.5501675314928346|0.0|
| 20200330002354| 35.71294622426758| 0.6696123015022845| 0.7318572150654761|0.0|
| 20200330002354| 67.30906296028802| 0.16768228612130764|0.29666655980198253|0.0|
+-------------------+------------------+--------------------+-------------------+---+

org.apache.hudi.DefaultSource#createRelation(書き込み)

クイックスタートで、例えば更新などする際の動作を確認する。

1
2
3
4
5
6
7
8
scala> df.write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY, "ts").
| option(RECORDKEY_FIELD_OPT_KEY, "uuid").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
| option(TABLE_NAME, tableName).
| mode(Append).
| save(basePath)

のような例を実行する際、内部的には org.apache.hudi.DefaultSource#createRelation が呼ばれる。

org/apache/hudi/DefaultSource.scala:85

1
2
3
4
5
6
7
8
9
override def createRelation(sqlContext: SQLContext,
mode: SaveMode,
optParams: Map[String, String],
df: DataFrame): BaseRelation = {

val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams)
HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
createRelation(sqlContext, parameters, df.schema)
}

上記メソッド内では、 org.apache.hudi.HoodieSparkSqlWriter$#write メソッドが呼ばれており、 これが書き込みの実態である。 なお、その下の org.apache.hudi.DefaultSource#createRelation は、読み込み時に呼ばれるものと同一。

ここでは org.apache.hudi.HoodieSparkSqlWriter#write メソッドを確認する。 当該メソッドの冒頭では、オペレーションの判定などいくつか前処理が行われた後、 以下の箇所から実際に書き出す処理が定義されている。

org/apache/hudi/HoodieSparkSqlWriter.scala:85

1
2
3
4
5
6
7
8
9
10
11
12
val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
// register classes & schemas
val structName = s"${tblName.get}_record"
val nameSpace = s"hoodie.${tblName.get}"
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
sparkContext.getConf.registerAvroSchemas(schema)

(snip)

まず delete オペレーションかどうかで処理が別れるが、上記の例では upsert オペレーションなので一旦そのまま読み進める。 ネームスペース(データベースやテーブル?)を取得した後、SparkのStructTypeで保持されたスキーマ情報を、AvroのSchemaに変換する。 変換されたスキーマをSparkで登録する。

つづいて、DataFrameをRDDに変換する。

org/apache/hudi/HoodieSparkSqlWriter.scala:97

1
2
3
4
5
6
7
8
9
// Convert to RDD[HoodieRecord]
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
val hoodieAllIncomingRecords = genericRecords.map(gr => {
val orderingVal = DataSourceUtils.getNestedFieldValAsString(
gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false).asInstanceOf[Comparable[_]]
DataSourceUtils.createHoodieRecord(gr,
orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
}).toJavaRDD()

RDDに一度変換した後、mapメソッドで加工する。

まず、 genericRecords の内容は以下のようなものが含まれる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
result = {GenericRecord[1]@27822} 
0 = {GenericData$Record@27827} "{"begin_lat": 0.09632451474505643, "begin_lon": 0.8989273848550128, "driver": "driver-164", "end_lat": 0.6431885917325862, "end_lon": 0.6664889106258252, "fare": 86.865568091804, "partitionpath": "americas/brazil/sao_paulo", "rider": "rider-164", "ts": 0.0, "uuid": "5d49cfb5-0db4-4172-bff4-e581eb1f9783"}"
schema = {Schema$RecordSchema@27835} "{"type":"record","name":"hudi_trips_cow_record","namespace":"hoodie.hudi_trips_cow","fields":[{"name":"begin_lat","type":["double","null"]},{"name":"begin_lon","type":["double","null"]},{"name":"driver","type":["string","null"]},{"name":"end_lat","type":["double","null"]},{"name":"end_lon","type":["double","null"]},{"name":"fare","type":["double","null"]},{"name":"partitionpath","type":["string","null"]},{"name":"rider","type":["string","null"]},{"name":"ts","type":["double","null"]},{"name":"uuid","type":["string","null"]}]}"
values = {Object[10]@27836}
0 = {Double@27838} 0.09632451474505643
1 = {Double@27839} 0.8989273848550128
2 = {Utf8@27840} "driver-164"
3 = {Double@27841} 0.6431885917325862
4 = {Double@27842} 0.6664889106258252
5 = {Double@27843} 86.865568091804
6 = {Utf8@27844} "americas/brazil/sao_paulo"
7 = {Utf8@27845} "rider-164"
8 = {Double@27846} 0.0
9 = {Utf8@27847} "5d49cfb5-0db4-4172-bff4-e581eb1f9783"

上記の通り、これは入ロクレコードそのものである。 その後、mapメソッドを使ってHudiで利用するキーを含む、Hudiのレコード形式に変換する。

変換された hoodieAllIncomingRecords は以下のような内容になる。

1
2
3
4
5
6
7
8
9
10
11
result = {Wrappers$SeqWrapper@27881}  size = 1
0 = {HoodieRecord@27883} "HoodieRecord{key=HoodieKey { recordKey=5d49cfb5-0db4-4172-bff4-e581eb1f9783 partitionPath=americas/brazil/sao_paulo}, currentLocation='null', newLocation='null'}"
key = {HoodieKey@27892} "HoodieKey { recordKey=5d49cfb5-0db4-4172-bff4-e581eb1f9783 partitionPath=americas/brazil/sao_paulo}"
recordKey = "5d49cfb5-0db4-4172-bff4-e581eb1f9783"
partitionPath = "americas/brazil/sao_paulo"
data = {OverwriteWithLatestAvroPayload@27893}
recordBytes = {byte[142]@27895}
orderingVal = "0.0"
currentLocation = null
newLocation = null
sealed = false

上記の例の通り、ペイロードは org.apache.hudi.common.model.OverwriteWithLatestAvroPayload で保持される。

その後、いくつかモードの確認が行われた後、もしテーブルがなければ org.apache.hudi.common.table.HoodieTableMetaClient#initTableType を用いて テーブルを初期化する。

その後、重複レコードを必要に応じて落とす。

org/apache/hudi/HoodieSparkSqlWriter.scala:132

1
2
3
4
5
6
7
8
9
val hoodieRecords =
if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
DataSourceUtils.dropDuplicates(
jsc,
hoodieAllIncomingRecords,
mapAsJavaMap(parameters), client.getTimelineServer)
} else {
hoodieAllIncomingRecords
}

レコードが空かどうかを改めて確認しつつ、 最後に書き込み実施。 org.apache.hudi.DataSourceUtils#doWriteOperation が実態である。

org/apache/hudi/HoodieSparkSqlWriter.scala:147

1
val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)

org.apache.hudi.DataSourceUtils#doWriteOperation メソッドは以下の通り。

org/apache/hudi/DataSourceUtils.java:162

1
2
3
4
5
6
7
8
9
10
11
public static JavaRDD<WriteStatus> doWriteOperation(HoodieWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
String commitTime, String operation) {
if (operation.equals(DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())) {
return client.bulkInsert(hoodieRecords, commitTime);
} else if (operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())) {
return client.insert(hoodieRecords, commitTime);
} else {
// default is upsert
return client.upsert(hoodieRecords, commitTime);
}
}

今回の例だと、 upseart オペレーションなので org.apache.hudi.client.HoodieWriteClient#upsert メソッドが呼ばれる。 このメソッドは以下のとおりだが、ポイントは、 org.apache.hudi.client.HoodieWriteClient#upsertRecordsInternal メソッドである。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
HoodieTable<T> table = getTableAndInitCtx(OperationType.UPSERT);
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
combineOnCondition(config.shouldCombineBeforeUpsert(), records, config.getUpsertShuffleParallelism());

Timer.Context indexTimer = metrics.getIndexCtx();
// perform index loop up to get existing location of records
JavaRDD<HoodieRecord<T>> taggedRecords = getIndex().tagLocation(dedupedRecords, jsc, table);
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
return upsertRecordsInternal(taggedRecords, commitTime, table, true);
} catch (Throwable e) {
if (e instanceof HoodieUpsertException) {
throw (HoodieUpsertException) e;
}
throw new HoodieUpsertException("Failed to upsert for commit time " + commitTime, e);
}
}

org.apache.hudi.client.HoodieWriteClient#upsertRecordsInternal メソッド内のポイントは、 以下の箇所。 org.apache.spark.api.java.AbstractJavaRDDLike#mapPartitionsWithIndex メソッドで、upsertやinsertの処理を定義している。

org/apache/hudi/client/HoodieWriteClient.java:470

1
2
3
4
5
6
7
8
9
JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
if (isUpsert) {
return hoodieTable.handleUpsertPartition(commitTime, partition, recordItr, partitioner);
} else {
return hoodieTable.handleInsertPartition(commitTime, partition, recordItr, partitioner);
}
}, true).flatMap(List::iterator);

return updateIndexAndCommitIfNeeded(writeStatusRDD, hoodieTable, commitTime);

ここでは、 org.apache.hudi.table.HoodieCopyOnWriteTable#handleUpsertPartition メソッドを確認してみる。

org/apache/hudi/table/HoodieCopyOnWriteTable.java:253

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public Iterator<List<WriteStatus>> handleUpsertPartition(String commitTime, Integer partition, Iterator recordItr,
Partitioner partitioner) {
UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
BucketType btype = binfo.bucketType;
try {
if (btype.equals(BucketType.INSERT)) {
return handleInsert(commitTime, binfo.fileIdPrefix, recordItr);
} else if (btype.equals(BucketType.UPDATE)) {
return handleUpdate(commitTime, binfo.fileIdPrefix, recordItr);
} else {
throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
}
} catch (Throwable t) {
String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
LOG.error(msg, t);
throw new HoodieUpsertException(msg, t);
}
}

真ん中あたりに、INSERTかUPDATEかで条件分岐しているが、ここでは例としてINSERT側を確認する。 org.apache.hudi.table.HoodieCopyOnWriteTable#handleInsert メソッドがポイントとなる。 なお、当該メッソッドには同期的な実装と、非同期的な実装があるよう。 ここでは上記呼び出しに基づき、非同期的な実装の方を確認する。

org/apache/hudi/table/HoodieCopyOnWriteTable.java:233

1
2
3
4
5
6
7
8
9
public Iterator<List<WriteStatus>> handleInsert(String commitTime, String idPfx, Iterator<HoodieRecord<T>> recordItr)
throws Exception {
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
if (!recordItr.hasNext()) {
LOG.info("Empty partition");
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
return new CopyOnWriteLazyInsertIterable<>(recordItr, config, commitTime, this, idPfx);
}

戻り値が、 org.apache.hudi.execution.CopyOnWriteLazyInsertIterable クラスのインスタンスになっていることがわかる。 このイテレータは、 org.apache.hudi.client.utils.LazyIterableIterator アブストラクトクラスを継承している。 org.apache.hudi.client.utils.LazyIterableIterator では、nextメソッドが

org/apache/hudi/client/utils/LazyIterableIterator.java:116

1
2
3
4
5
6
7
8
@Override
public O next() {
try {
return computeNext();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

のように定義されており、実態が org.apache.hudi.client.utils.LazyIterableIterator#computeNext であることがわかる。 当該メソッドは、 org.apache.hudi.execution.CopyOnWriteLazyInsertIterable#CopyOnWriteLazyInsertIterable クラスではオーバーライドされており、 以下のように定義されている。

org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java:93

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
protected List<WriteStatus> computeNext() {
// Executor service used for launching writer thread.
BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
null;
try {
final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
bufferedIteratorExecutor =
new SparkBoundedInMemoryExecutor<>(hoodieConfig, inputItr, getInsertHandler(), getTransformFunction(schema));
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
return result;
} catch (Exception e) {
throw new HoodieException(e);
} finally {
if (null != bufferedIteratorExecutor) {
bufferedIteratorExecutor.shutdownNow();
}
}
}

どうやら内部でFutureパターンを利用し、非同期化して書き込みを行っているようだ。(これが筋よしなのかどうかは要議論。update、つまりマージも同様になっている。) 処理内容を知る上でポイントとなるのは、

1
2
bufferedIteratorExecutor =
new SparkBoundedInMemoryExecutor<>(hoodieConfig, inputItr, getInsertHandler(), getTransformFunction(schema));

の箇所。 org.apache.hudi.execution.CopyOnWriteLazyInsertIterable#getInsertHandler あたり。 中で用いられている、 org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.CopyOnWriteInsertHandler クラスがポイントとなる。 このクラスは、書き込みデータのキュー(要確認)からレコードを受け取って、処理していると考えられる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
final HoodieRecord insertPayload = payload.record;
// lazily initialize the handle, for the first time
if (handle == null) {
handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(),
getNextFileId(idPrefix));
}

if (handle.canWrite(payload.record)) {
// write the payload, if the handle has capacity
handle.write(insertPayload, payload.insertValue, payload.exception);
} else {
// handle is full.
statuses.add(handle.close());
// Need to handle the rejected payload & open new handle
handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(),
getNextFileId(idPrefix));
handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
}
}

下の方にある org.apache.hudi.io.HoodieCreateHandle クラスを用いているあたりがポイント。 そのwriteメソッドは以下の通り。 org.apache.hudi.io.storage.HoodieStorageWriter#writeAvroWithMetadata を用いて書き出しているように見える。 (実際には org.apache.hudi.io.storage.HoodieParquetWriter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
Option recordMetadata = record.getData().getMetadata();
try {
if (avroRecord.isPresent()) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
// update the new location of record, so we know where to find it next
record.unseal();
record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
record.seal();
recordsWritten++;
insertRecordsWritten++;
} else {
recordsDeleted++;
}
writeStatus.markSuccess(record, recordMetadata);
// deflate record payload after recording success. This will help users access payload as a
// part of marking
// record successful.
record.deflate();
} catch (Throwable t) {
// Not throwing exception from here, since we don't want to fail the entire job
// for a single record
writeStatus.markFailure(record, t, recordMetadata);
LOG.error("Error writing record " + record, t);
}
}

org.apache.hudi.io.storage.HoodieParquetWriter#writeAvroWithMetadata メソッドは以下の通りである。 つまり、 org.apache.parquet.hadoop.ParquetWriter#write を用いてParquet内に、Avroレコードを書き出していることがわかる。

1
2
3
4
5
6
7
8
9
10
@Override
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
String seqId =
HoodieRecord.generateSequenceId(commitTime, TaskContext.getPartitionId(), recordIndex.getAndIncrement());
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(),
file.getName());
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, commitTime, seqId);
super.write(avroRecord);
writeSupport.add(record.getRecordKey());
}

今回のクイックスタートの例では、 avroRecord には以下のような内容が含まれていた。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
result = {GenericData$Record@18566} "{"_hoodie_commit_time": "20200331002133", "_hoodie_commit_seqno": "20200331002133_0_44", "_hoodie_record_key": "7b887fb5-2837-4cac-b075-a8a8450f453d", "_hoodie_partition_path": "asia/india/chennai", "_hoodie_file_name": "317a54b0-70b8-4bdc-bfde-12ba4fde982b-0_0-207-301_20200331002133.parquet", "begin_lat": 0.4789745387904072, "begin_lon": 0.14781856144057215, "driver": "driver-022", "end_lat": 0.10509642405359532, "end_lon": 0.07682825311613706, "fare": 30.429177017810616, "partitionpath": "asia/india/chennai", "rider": "rider-022", "ts": 0.0, "uuid": "7b887fb5-2837-4cac-b075-a8a8450f453d"}"
schema = {Schema$RecordSchema@18582} "{"type":"record","name":"hudi_trips_cow_record","namespace":"hoodie.hudi_trips_cow","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"begin_lat","type":["double","null"]},{"name":"begin_lon","type":["double","null"]},{"name":"driver","type":["string","null"]},{"name":"end_lat","type":["double","null"]},{"name":"end_lon","type":["double","null"]},{"name":"fare","type":["double","null"]},{"name":"partitionpath","type":["string","null"]},{"name":"rider","type":["string","null"]},{"name":"ts","type":["double","null"]},{"name":"uuid","type":["string","null"]}]}"
values = {Object[15]@18583}
0 = "20200331002133"
1 = "20200331002133_0_44"
2 = "7b887fb5-2837-4cac-b075-a8a8450f453d"
3 = "asia/india/chennai"
4 = "317a54b0-70b8-4bdc-bfde-12ba4fde982b-0_0-207-301_20200331002133.parquet"
5 = {Double@18596} 0.4789745387904072
6 = {Double@18597} 0.14781856144057215
7 = {Utf8@18598} "driver-022"
8 = {Double@18599} 0.10509642405359532
9 = {Double@18600} 0.07682825311613706
10 = {Double@18601} 30.429177017810616
11 = {Utf8@18602} "asia/india/chennai"
12 = {Utf8@18603} "rider-022"
13 = {Double@18604} 0.0
14 = {Utf8@18605} "7b887fb5-2837-4cac-b075-a8a8450f453d"

org.apache.hudi.DefaultSource#createRelation(読み込み)

当該メソッドのポイントを確認する。

hoodie.datasource.query.type の種類によって返すRelationが変わる。

org/apache/hudi/DefaultSource.scala:60

1
2
3
4
5
6
7
8
9
10
11
if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {

(snip)

} else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {

(snip)

} else {
throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY))
}

上記の通り、 snapshot 、もしくは incremental クエリタイプである。 なお、以下の通り、 MERGE_ON_READ テーブルに対する snapshot クエリタイプは利用できない。 もし使いたければ、SparkのData Source機能ではなく、Hiveテーブルとして読み込むこと。

org/apache/hudi/DefaultSource.scala:69

1
2
log.warn("Snapshot view not supported yet via data source, for MERGE_ON_READ tables. " +
"Please query the Hive table registered using Spark SQL.")

まずクエリタイプが snapshot である場合は、 以下の通り、Parquetとして読み込みが定義され、Relationが返される。

org/apache/hudi/DefaultSource.scala:72

1
2
3
4
5
6
DataSource.apply(
sparkSession = sqlContext.sparkSession,
userSpecifiedSchema = Option(schema),
className = "parquet",
options = parameters)
.resolveRelation()

例えば、クイックスタートの例

1
2
3
4
5
scala> val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath + "/*/*/*/*")
scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

では、こちらのタイプ。 ParquetベースのRelation(実際には、HadoopFsRelation)が返される。 上記の例では、当該RelationのrootPathsに、以下のような値が含まれる。

1
2
3
4
5
6
7
rootPaths = {$colon$colon@14885} "::" size = 6
0 = {Path@15421} "file:/tmp/hudi_trips_cow/americas/brazil/sao_paulo/ae28c85a-38f0-487f-a42d-3a0babc9d321-0_0-21-25_20200329002247.parquet"
1 = {Path@15422} "file:/tmp/hudi_trips_cow/americas/brazil/sao_paulo/.hoodie_partition_metadata"
2 = {Path@15423} "file:/tmp/hudi_trips_cow/americas/united_states/san_francisco/849db286-1cbe-4a1f-b544-9939893e99f8-0_1-21-26_20200329002247.parquet"
3 = {Path@15424} "file:/tmp/hudi_trips_cow/americas/united_states/san_francisco/.hoodie_partition_metadata"
4 = {Path@15425} "file:/tmp/hudi_trips_cow/asia/india/chennai/2ebfbab0-4f8f-42db-b79e-1c0cbcc3cf39-0_2-21-27_20200329002247.parquet"
5 = {Path@15426} "file:/tmp/hudi_trips_cow/asia/india/chennai/.hoodie_partition_metadata"

次にクエリタイプが incremental である場合は、 以下の通り、 org.apache.hudi.IncrementalRelation#IncrementalRelation が返される。

org/apache/hudi/DefaultSource.scala:79

1
new IncrementalRelation(sqlContext, path.get, optParams, schema)

クイックスタートの例

1
2
3
4
5
6
7
scala> val tripsIncrementalDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
load(basePath)
scala> tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()

では、 org.apache.hudi.IncrementalRelation#IncrementalRelation が戻り値として返される。

IncrementalRelation

コンストラクタ

Parquetをファイルを単純に読めば良いのと比べて、格納された最新データを返すようにしないとならないので それなりに複雑なRelationとなっている。

以下、簡単にコンストラクタのポイントを確認する。

最初にメタデータを取得するクライアント。 コミット、セーブポイント、コンパクションなどの情報を得られるようになる。

org/apache/hudi/IncrementalRelation.scala:51

1
val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)

クイックスタートの例では、 metaPath は、 file:/tmp/hudi_trips_cow/.hoodie だった。

続いてテーブル情報のインスタンスを取得する。 テーブル情報からタイムラインを取り出す。

org/apache/hudi/IncrementalRelation.scala:57

1
2
3
4
5
6
7
8
9
10
private val hoodieTable = HoodieTable.getHoodieTable(metaClient, HoodieWriteConfig.newBuilder().withPath(basePath).build(),
sqlContext.sparkContext)
val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()
if (commitTimeline.empty()) {
throw new HoodieException("No instants to incrementally pull")
}
if (!optParams.contains(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY)) {
throw new HoodieException(s"Specify the begin instant time to pull from using " +
s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}")
}

クイックスタートの例で実際に生成されたタイムラインは以下の通り。

1
2
3
4
instants = {ArrayList@25586}  size = 3
0 = {HoodieInstant@25589} "[20200330002239__commit__COMPLETED]"
1 = {HoodieInstant@25590} "[20200330002354__commit__COMPLETED]"
2 = {HoodieInstant@25591} "[20200330003142__commit__COMPLETED]"

オプションとして与えられた「はじめ」と「おわり」から、 対象となるタイムラインを構成する。 タイムライン上、最も新しいインスタンスを取得し、 Parquetファイルからスキーマを読んでいる。

org/apache/hudi/IncrementalRelation.scala:68

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
val lastInstant = commitTimeline.lastInstant().get()

val commitsToReturn = commitTimeline.findInstantsInRange(
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY),
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp))
.getInstants.iterator().toList

// use schema from a file produced in the latest instant
val latestSchema = {
// use last instant if instant range is empty
val instant = commitsToReturn.lastOption.getOrElse(lastInstant)
val latestMeta = HoodieCommitMetadata
.fromBytes(commitTimeline.getInstantDetails(instant).get, classOf[HoodieCommitMetadata])
val metaFilePath = latestMeta.getFileIdAndFullPaths(basePath).values().iterator().next()
AvroConversionUtils.convertAvroSchemaToStructType(ParquetUtils.readAvroSchema(
sqlContext.sparkContext.hadoopConfiguration, new Path(metaFilePath)))
}

クイックスタートの例では、 commitsToReturn は以下の通り。

1
2
3
4
5
result = {$colon$colon@25626} "::" size = 1
0 = {HoodieInstant@25591} "[20200330003142__commit__COMPLETED]"
state = {HoodieInstant$State@25602} "COMPLETED"
action = "commit"
timestamp = "20200330003142"

また、少々気になるのは、

1
2
AvroConversionUtils.convertAvroSchemaToStructType(ParquetUtils.readAvroSchema(
sqlContext.sparkContext.hadoopConfiguration, new Path(metaFilePath)))

という箇所で、もともとParquet形式のデータからAvro形式のスキーマを取り出し、それをさらにSparkのStructTypeに変換しているところ。 実際にParquetのfooterから取り出したスキーマ情報を、AvroのSchemaに変換しているのは以下の箇所。

org/apache/hudi/common/util/ParquetUtils.java:140

1
2
3
public static Schema readAvroSchema(Configuration configuration, Path parquetFilePath) {
return new AvroSchemaConverter().convert(readSchema(configuration, parquetFilePath));
}
  • Parquet自身にAvroへの変換器 org.apache.parquet.avro.AvroSchemaConverter が備わっているので便利?
  • SparkのData Source機能でDataFrame化してからスキーマを取り出すと、一度読み込みが生じていしまうから非効率?

という理由が想像されるが、やや回りくどいような印象を持った。 ★要確認

本編に戻る。続いてフィルタを定義。

org/apache/hudi/IncrementalRelation.scala:86

1
2
3
4
5
6
7
8
9
10
val filters = {
if (optParams.contains(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY)) {
val filterStr = optParams.getOrElse(
DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY,
DataSourceReadOptions.DEFAULT_PUSH_DOWN_FILTERS_OPT_VAL)
filterStr.split(",").filter(!_.isEmpty)
} else {
Array[String]()
}
}

ここまでがコンストラクタ。

buildScan

実際にSparkのData Sourceで読み込むときに用いられる読み込みの手段が定義されている。 以下にポイントを述べる。

org/apache/hudi/IncrementalRelation.scala:99

1
2
3
override def buildScan(): RDD[Row] = {

(snip)

ファイルIDとフルPATHのマップを作る。

1
2
3
4
5
6
val fileIdToFullPath = mutable.HashMap[String, String]()
for (commit <- commitsToReturn) {
val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
.get, classOf[HoodieCommitMetadata])
fileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
}

上記マップに対し、必要に応じてフィルタを適用する。

org/apache/hudi/IncrementalRelation.scala:106

1
2
3
4
5
6
7
8
9
val pathGlobPattern = optParams.getOrElse(
DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY,
DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)
val filteredFullPath = if(!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) {
val globMatcher = new GlobPattern("*" + pathGlobPattern)
fileIdToFullPath.filter(p => globMatcher.matches(p._2))
} else {
fileIdToFullPath
}

コンストラクタで定義されたフィルタを適用しながら、 対象となるParquetファイルを読み込み、RDDを生成する。

org/apache/hudi/IncrementalRelation.scala:117

1
2
3
4
5
6
7
8
9
10
11
12
13
14
sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
if (filteredFullPath.isEmpty) {
sqlContext.sparkContext.emptyRDD[Row]
} else {
log.info("Additional Filters to be applied to incremental source are :" + filters)
filters.foldLeft(sqlContext.read.options(sOpts)
.schema(latestSchema)
.parquet(filteredFullPath.values.toList: _*)
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp))
.filter(String.format("%s <= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)))((e, f) => e.filter(f))
.toDF().rdd
}

Hudiへの書き込み

Writing Hudi Tables をベースに調べる。

オペレーション種類

書き込みのオペレーション種類は、upsert、insert、bulk_insert。 クイックスタートにはbulk_insertはなかった。

DeltaStreamer

ユーティリティとして付属するDeltaStreamerを用いると、 Kafka等からデータを取り込める。 Avro等のスキーマのデータを読み取れる。

動作確認

パッケージ化

公式ドキュメントのData Streamer の手順に基づくと、 ビルドされたユーティリティを使うことになるので、 予めパッケージ化しておく。

1
2
3
4
5
6
$ mkdir -p ~/Sources
$ cd ~/Sources
$ git clone https://github.com/apache/incubator-hudi.git incubator-hudi-052
$ cd incubator-hudi-052
$ git checkout -b release-0.5.2-incubating refs/tags/release-0.5.2-incubating
$ mvn clean package -DskipTests -DskipITs

実行

公式ドキュメントのData Streamerに基づくと、Confluentメンバが作成した ( apurvam streams-prototyping )サンプルデータ作成用のAvroスキーマと Confluent PlatformのKSQLのユーティリティを 使ってサンプルデータを作成する。

ついては。予めConfluent Platformをインストールしておくこと。

まずはスキーマをダウンロードする。

1
$ curl https://raw.githubusercontent.com/apurvam/streams-prototyping/master/src/main/resources/impressions.avro > /tmp/impressions.avro

テストデータを生成する。

1
$ ksql-datagen schema=/tmp/impressions.avro format=avro topic=impressions key=impressionid

別の端末を開き、ユーティリティを起動する。

1
2
3
4
5
6
7
8
9
10
11
$ export SPARK_HOME=/opt/spark/default
$ cd ~/Sources/incubator-hudi-052
$ ${SPARK_HOME}/bin/spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--props file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field impresssiontime \
--target-base-path file:\/\/\/tmp/hudi-deltastreamer-op \
--target-table uber.impressions \
--table-type COPY_ON_WRITE \
--op BULK_INSERT

なお、 公式ドキュメントのData Streamer から2箇所修正した。(JarファイルPATH、 --table-type オプション追加。

Kafkaから読み込んで書き出したデータ( /tmp/hudi-deltastreamer-op )を確認してみる。

1
2
3
$ ${SPARK_HOME}/bin/spark-shell \
--packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

シェルが起動したら、以下の通り読み込んで見る。 なお、ここでは userid がパーティションキーとなっているので、ロード時にそれを指定した。

1
2
3
4
5
scala> val basePath = "file:///tmp/hudi-deltastreamer-op"
scala> val impressionDF = spark.
read.
format("hudi").
load(basePath + "/*/*")

内容は以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
scala> impressionDF.show
+-------------------+--------------------+------------------+----------------------+--------------------+---------------+--------------+-------+-----+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name|impresssiontime| impressionid| userid| adid|
+-------------------+--------------------+------------------+----------------------+--------------------+---------------+--------------+-------+-----+
| 20200406002420|20200406002420_1_...| impression_106| user_83|fb381e12-f9ec-4fb...| 1586096500438|impression_106|user_83|ad_57|
| 20200406002420|20200406002420_1_...| impression_107| user_83|fb381e12-f9ec-4fb...| 1586096464324|impression_107|user_83|ad_11|
| 20200406002420|20200406002420_1_...| impression_111| user_83|fb381e12-f9ec-4fb...| 1586096366450|impression_111|user_83|ad_14|
| 20200406002420|20200406002420_1_...| impression_111| user_83|fb381e12-f9ec-4fb...| 1586099019181|impression_111|user_83|ad_38|
| 20200406002420|20200406002420_1_...| impression_116| user_83|fb381e12-f9ec-4fb...| 1586099146437|impression_116|user_83|ad_48|
| 20200406002420|20200406002420_1_...| impression_121| user_83|fb381e12-f9ec-4fb...| 1586098316334|impression_121|user_83|ad_26|

(snip)

実装確認

org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer クラスの実装を確認する。

まずmainは以下の通り。

org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java:298

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) throws Exception {
final Config cfg = new Config();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}

Map<String, String> additionalSparkConfigs = SchedulerConfGenerator.getSparkSchedulingConfigs(cfg);
JavaSparkContext jssc =
UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster, additionalSparkConfigs);
try {
new HoodieDeltaStreamer(cfg, jssc).sync();
} finally {
jssc.stop();
}
}

上記の通り、 org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer#sync メソッドがエントリポイント。

org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java:116

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void sync() throws Exception {
if (cfg.continuousMode) {
deltaSyncService.start(this::onDeltaSyncShutdown);
deltaSyncService.waitForShutdown();
LOG.info("Delta Sync shutting down");
} else {
LOG.info("Delta Streamer running only single round");
try {
deltaSyncService.getDeltaSync().syncOnce();
} catch (Exception ex) {
LOG.error("Got error running delta sync once. Shutting down", ex);
throw ex;
} finally {
deltaSyncService.close();
LOG.info("Shut down delta streamer");
}
}
}

上記の通り、 continous モードかどうかで動作が変わる。

ここでは一旦、ワンショットの場合を確認する。

上記の通り、 org.apache.hudi.utilities.deltastreamer.DeltaSync#syncOnce メソッドがエントリポイント。 当該メソッドは以下のようにシンプルな内容。

org/apache/hudi/utilities/deltastreamer/DeltaSync.java:218

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public Option<String> syncOnce() throws Exception {
Option<String> scheduledCompaction = Option.empty();
HoodieDeltaStreamerMetrics metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(schemaProvider));
Timer.Context overallTimerContext = metrics.getOverallTimerContext();

// Refresh Timeline
refreshTimeline();

Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> srcRecordsWithCkpt = readFromSource(commitTimelineOpt);

if (null != srcRecordsWithCkpt) {
// this is the first input batch. If schemaProvider not set, use it and register Avro Schema and start
// compactor
if (null == schemaProvider) {
// Set the schemaProvider if not user-provided
this.schemaProvider = srcRecordsWithCkpt.getKey();
// Setup HoodieWriteClient and compaction now that we decided on schema
setupWriteClient();
}

scheduledCompaction = writeToSink(srcRecordsWithCkpt.getRight().getRight(),
srcRecordsWithCkpt.getRight().getLeft(), metrics, overallTimerContext);
}

// Clear persistent RDDs
jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist);
return scheduledCompaction;
}

最初にメトリクスの準備、データソースから読み出してRDD化する定義( org.apache.hudi.utilities.deltastreamer.DeltaSync#readFromSource メソッド) その後、 org.apache.hudi.utilities.deltastreamer.DeltaSync#writeToSink メソッドにより、定義されたRDDの内容を実際に書き込む。

ここでは上記メソッドを確認する。

まず与えられたRDDから重複排除する。

org/apache/hudi/utilities/deltastreamer/DeltaSync.java:352

1
2
3
4
5
6
7
8
9
10
11
12
13
14
  private Option<String> writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr,
HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception {

Option<String> scheduledCompactionInstant = Option.empty();
// filter dupes if needed
if (cfg.filterDupes) {
// turn upserts to insert
cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig());
}

boolean isEmpty = records.isEmpty();

(snip)

その後実際の書き込みになるが、そのとき採用したオペレーション種類によって動作が異なる。

org/apache/hudi/utilities/deltastreamer/DeltaSync.java:369

1
2
3
4
5
6
7
8
9
if (cfg.operation == Operation.INSERT) {
writeStatusRDD = writeClient.insert(records, instantTime);
} else if (cfg.operation == Operation.UPSERT) {
writeStatusRDD = writeClient.upsert(records, instantTime);
} else if (cfg.operation == Operation.BULK_INSERT) {
writeStatusRDD = writeClient.bulkInsert(records, instantTime);
} else {
throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation);
}

bulkInsert

ここではためしに org.apache.hudi.client.HoodieWriteClient#bulkInsert メソッドを確認してみる。

当該メソッドでは、最初にCOPY_ON_WRITEかMERGE_ON_READかに応じて、それぞれの種類のテーブル情報を取得する。 その後、 org.apache.hudi.client.HoodieWriteClient#bulkInsertInternal メソッドを使ってデータを書き込む。

なお、その間で重複排除されているが、上記の通り、もともと重複排除しているはずなので、要確認。(重複排除のロジックが異なるのかどうか、など) パット見た感じ、 org.apache.hudi.DataSourceUtils#dropDuplicates メソッドはロケーション情報(インデックス?)がない場合をドロップする。 org.apache.hudi.client.HoodieWriteClient#combineOnCondition メソッドはキーに基づきreduce処理する。 という違いがあるようだ。

org/apache/hudi/client/HoodieWriteClient.java:300

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String instantTime,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT);
setOperationType(WriteOperationType.BULK_INSERT);
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism());

return bulkInsertInternal(dedupedRecords, instantTime, table, bulkInsertPartitioner);
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
}
}

上記の通り、 org.apache.hudi.client.HoodieWriteClient#bulkInsertInternal メソッドが中で用いられている。 当該メソッドでは、再パーティションないしソートが行われた後、書き込みが実行される。

org/apache/hudi/client/HoodieWriteClient.java:412

1
2
3
JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(instantTime, config, table, fileIDPrefixes), true)
.flatMap(List::iterator);

ポイントは、org.apache.hudi.execution.BulkInsertMapFunction クラスである。 このクラスが関数として渡されている。 org.apache.hudi.execution.BulkInsertMapFunction#call メソッドは以下の通り。

org/apache/hudi/execution/BulkInsertMapFunction.java:52

1
2
3
4
public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> sortedRecordItr) {
return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable,
fileIDPrefixes.get(partition), hoodieTable.getSparkTaskContextSupplier());
}

org.apache.hudi.execution.CopyOnWriteLazyInsertIterable クラスについては、別の節で書いたとおり。

insert

org.apache.hudi.client.HoodieWriteClient#insert メソッド。

大まかな構造は、 bulkInsert と同様。 ポイントは、 org.apache.hudi.client.HoodieWriteClient#upsertRecordsInternal メソッド。

org/apache/hudi/client/HoodieWriteClient.java:229

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT);
setOperationType(WriteOperationType.INSERT);
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
combineOnCondition(config.shouldCombineBeforeInsert(), records, config.getInsertShuffleParallelism());

return upsertRecordsInternal(dedupedRecords, instantTime, table, false);
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to insert for commit time " + instantTime, e);
}
}

上記の通り、挿入対象のデータを表すRDDを引数に取り、データを書き込む。 これは、upsertのときと同じメソッドである。第4引数でinsertかupsertかを分ける。

当該メソッドは以下の通り。 bulkInsert と同様にリパーティションなどを経て、 org.apache.hudi.table.HoodieTable#handleUpsertPartition が呼び出される。

org/apache/hudi/client/HoodieWriteClient.java:457

1
2
3
4
5
6
7
8
9
10
11
12
13
14
  private JavaRDD<WriteStatus> upsertRecordsInternal(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime,
HoodieTable<T> hoodieTable, final boolean isUpsert) {

(snip)

JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
if (isUpsert) {
return hoodieTable.handleUpsertPartition(instantTime, partition, recordItr, partitioner);
} else {
return hoodieTable.handleInsertPartition(instantTime, partition, recordItr, partitioner);
}
}, true).flatMap(List::iterator);

(snip)

org.apache.hudi.table.HoodieTable#handleUpsertPartitionorg.apache.hudi.table.HoodieTable#handleInsertPartition が用いられている。 今回は、insertなので後者。

なお、 org.apache.hudi.table.HoodieCopyOnWriteTable#handleInsertPartition は以下の通り、実態としては org.apache.hudi.table.HoodieCopyOnWriteTable#handleUpsertPartition である。

1
2
3
4
public Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
Partitioner partitioner) {
return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
}

当該メソッドは以下の通り。 insertやupsertでは、RDDひとつを1バケットと表現している。 バケットの情報から、insertやupdateの情報を取得して用いる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
Partitioner partitioner) {
UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
BucketType btype = binfo.bucketType;
try {
if (btype.equals(BucketType.INSERT)) {
return handleInsert(instantTime, binfo.fileIdPrefix, recordItr);
} else if (btype.equals(BucketType.UPDATE)) {
return handleUpdate(instantTime, binfo.partitionPath, binfo.fileIdPrefix, recordItr);
} else {
throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
}
} catch (Throwable t) {
String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
LOG.error(msg, t);
throw new HoodieUpsertException(msg, t);
}
}

例えば、insertの場合は、 org.apache.hudi.table.HoodieCopyOnWriteTable#handleInsert が呼び出される。 当該メソッドでは、戻り値として org.apache.hudi.execution.CopyOnWriteLazyInsertIterable#CopyOnWriteLazyInsertIterable が返される。

org/apache/hudi/table/HoodieCopyOnWriteTable.java:186

1
2
3
4
5
6
7
8
9
public Iterator<List<WriteStatus>> handleInsert(String instantTime, String idPfx, Iterator<HoodieRecord<T>> recordItr)
throws Exception {
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
if (!recordItr.hasNext()) {
LOG.info("Empty partition");
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx, sparkTaskContextSupplier);
}

このメソッドについては上記ですでに説明したとおり。

HoodieCopyOnWriteTable と HoodieMergeOnReadTable

テーブルの種類によって、書き込みの実装上どういう違いがあるかを確認する。

例えば、handleInsert メソッドを確認する。なお、当該メソッドには同期的、非同期的な処理方式がそれぞれ実装されている。

HoodieCopyOnWriteTableの場合は以下の通り。

org/apache/hudi/table/HoodieCopyOnWriteTable.java:186

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public Iterator<List<WriteStatus>> handleInsert(String instantTime, String idPfx, Iterator<HoodieRecord<T>> recordItr)
throws Exception {
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
if (!recordItr.hasNext()) {
LOG.info("Empty partition");
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx, sparkTaskContextSupplier);
}

public Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
Iterator<HoodieRecord<T>> recordItr) {
HoodieCreateHandle createHandle =
new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordItr, sparkTaskContextSupplier);
createHandle.write();
return Collections.singletonList(Collections.singletonList(createHandle.close())).iterator();
}

上が非同期的な方式、下が同期的な方式と見られる。 なお、実装上は同期的な処理方式は今は使われていないようにもみえるが、要確認。

HoodieMergeOnReadTableの場合は、非同期的な処理だけoverrideされている。

org/apache/hudi/table/HoodieMergeOnReadTable.java:120

1
2
3
4
5
6
7
8
9
public Iterator<List<WriteStatus>> handleInsert(String instantTime, String idPfx, Iterator<HoodieRecord<T>> recordItr)
throws Exception {
// If canIndexLogFiles, write inserts to log files else write inserts to parquet files
if (index.canIndexLogFiles()) {
return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, this, idPfx, sparkTaskContextSupplier);
} else {
return super.handleInsert(instantTime, idPfx, recordItr);
}
}
共有

pandoc template and css

参考

メモ

Pandocのバージョンは、 2.9.2を使用。

Pandocを使ってMarkdownを整形されたHTMLに変換する を参考に、テンプレートを作成してCSSを用いた。

1
2
$ mkdir -p ~/.pandoc/templates
$ pandoc -D html5 > ~/.pandoc/templates/mytemplate.html

テンプレートを適当にいじる。

その後、HTMLを以下のように生成。

1
$ pandoc --css ./pandoc-github.css --template=mytemplate -i ./README.md -o ./README.html

なお、GitHub風になるCSSは、 dashed/github-pandoc.css に公開されていたものを利用。 --css はcssのURLを表すだけなので、上記の例ではREADME.htmlと同じディレクトリに pandoc-github.css があることを期待する。

共有