BigDL memory usage

参考

メモ

前提

Download によると、Sparkは2.4系まで対応しているようだ。 Issue-3070 によると、Spark3対応も進んでいるようだ。

Spark起動

Run で記載の通り、以下のように起動した。

1
2
3
$ export SPARK_HOME=/usr/local/spark/default
$ export BIGDL_HOME=/usr/local/bigdl/default
$ ${BIGDL_HOME}/bin/spark-shell-with-bigdl.sh --master local[*]
1
2
scala> import com.intel.analytics.bigdl.utils.Engine
scala> Engine.init

とりあえず初期化までは動いた。

サンプルを動かす

Examples に記載のサンプルを動かす。 LeNet Train にあるLeNetの例が良さそう。

MNISTの画像をダウンロードして、展開した。

1
2
3
4
5
6
7
$ mkdir ~/tmp
$ cd ~/tmp
$ wget http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
$ wget http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
$ wget http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
$ wget http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
$ gunzip *.gz

展開したディレクトリを指定しながら、LeNetの学習を実行。

1
2
3
4
5
6
7
8
$ spark-submit \
--master local[6] \
--driver-class-path ${BIGDL_HOME}/lib/bigdl-SPARK_2.4-0.11.1-jar-with-dependencies.jar \
--class com.intel.analytics.bigdl.models.lenet.Train \
${BIGDL_HOME}/lib/bigdl-SPARK_2.4-0.11.1-jar-with-dependencies.jar \
-f $HOME/tmp \
-b 12 \
--checkpoint ./model

実行中の様子は以下の通り。

ヒストリのキャプチャ

今回はローカルモードで実行したが、入力された学習データと同様のサイズのキャッシュがメモリ上に展開されていることがわかる。

サンプルの中身

2020/11/15時点のmasterブランチを確認する。

Trainクラス

上記サンプルで実行されているTrainを見る。 モデルを定義している箇所は以下の通り。

com/intel/analytics/bigdl/models/lenet/Train.scala:48

1
2
3
4
5
6
7
8
9
10
11
12
val model = if (param.modelSnapshot.isDefined) {
Module.load[Float](param.modelSnapshot.get)
} else {
if (param.graphModel) {
LeNet5.graph(classNum = 10)
} else {
Engine.getEngineType() match {
case MklBlas => LeNet5(10)
case MklDnn => LeNet5.dnnGraph(param.batchSize / Engine.nodeNumber(), 10)
}
}
}

Modelインスタンスは以下の通り、Optimizerオブジェクトに渡され、 データセットに合わせたOptimizerが返される。 (例:データセットが分散データセットかどうか、など)

com/intel/analytics/bigdl/models/lenet/Train.scala:83

1
2
3
4
val optimizer = Optimizer(
model = model,
dataset = trainSet,
criterion = criterion)

参考までに、Optimizerの種類と判定の処理は以下の通り。

com/intel/analytics/bigdl/optim/Optimizer.scala:688

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
dataset match {
case d: DistributedDataSet[_] =>
Engine.getOptimizerVersion() match {
case OptimizerV1 =>
new DistriOptimizer[T](
_model = model,
_dataset = d.toDistributed().asInstanceOf[DistributedDataSet[MiniBatch[T]]],
_criterion = criterion
).asInstanceOf[Optimizer[T, D]]
case OptimizerV2 =>
new DistriOptimizerV2[T](
_model = model,
_dataset = d.toDistributed().asInstanceOf[DistributedDataSet[MiniBatch[T]]],
_criterion = criterion
).asInstanceOf[Optimizer[T, D]]
}
case d: LocalDataSet[_] =>
new LocalOptimizer[T](
model = model,
dataset = d.toLocal().asInstanceOf[LocalDataSet[MiniBatch[T]]],
criterion = criterion
).asInstanceOf[Optimizer[T, D]]
case _ =>
throw new UnsupportedOperationException
}

返されたOptimizerの、Optimizer#optimizeメソッドを利用し学習が実行される。

com/intel/analytics/bigdl/models/lenet/Train.scala:98

1
2
3
4
5
6
7
8
optimizer
.setValidation(
trigger = Trigger.everyEpoch,
dataset = validationSet,
vMethods = Array(new Top1Accuracy, new Top5Accuracy[Float], new Loss[Float]))
.setOptimMethod(optimMethod)
.setEndWhen(Trigger.maxEpoch(param.maxEpoch))
.optimize()

Optimizerの種類

上記の内容を見るに、Optimizerにはいくつか種類がありそうだ。

  • DistriOptimizer
  • DistriOptimizerV2
  • LocalOptimizer

DistriOptimizer

ひとまずメモリ使用に関連する箇所ということで、入力データの準備の処理を確認する。 com.intel.analytics.bigdl.optim.DistriOptimizer#optimize メソッドには 以下のような箇所がある。

com/intel/analytics/bigdl/optim/DistriOptimizer.scala:870

1
prepareInput()

これは com.intel.analytics.bigdl.optim.DistriOptimizer#prepareInput メソッドであり、 内部的に com.intel.analytics.bigdl.optim.AbstractOptimizer#prepareInput メソッドを呼び出し、 入力データをSparkのキャッシュに載せるように処理する。

com/intel/analytics/bigdl/optim/DistriOptimizer.scala:808

1
2
3
4
if (!dataset.toDistributed().isCached) {
DistriOptimizer.logger.info("caching training rdd ...")
DistriOptimizer.prepareInput(this.dataset, this.validationDataSet)
}

キャシュに載せると箇所は以下の通り。

com/intel/analytics/bigdl/optim/AbstractOptimizer.scala:279

1
2
3
4
5
6
7
private[bigdl] def prepareInput[T: ClassTag](dataset: DataSet[MiniBatch[T]],
validationDataSet: Option[DataSet[MiniBatch[T]]]): Unit = {
dataset.asInstanceOf[DistributedDataSet[MiniBatch[T]]].cache()
if (validationDataSet.isDefined) {
validationDataSet.get.toDistributed().cache()
}
}

上記の DistributedDataSetchache メソッドは以下の通り。

com/intel/analytics/bigdl/dataset/DataSet.scala:216

1
2
3
4
5
6
def cache(): Unit = {
if (originRDD() != null) {
originRDD().count()
}
isCached = true
}

originRDD の戻り値に対して、count を読んでいる。 ここで count を呼ぶのは、入力データである originRDD の戻り値に入っているRDDをメモリ上にマテリアライズするためである。

count を呼ぶだけでマテリアライズできるのは、予め入力データを定義したときに Spark RDDの cache を利用してキャッシュ化することを指定されているからである。 今回の例では、Optimizerオブジェクトのapplyを利用する際に渡されるデータセット trainSet を 定義する際に予め cache が呼ばれる。

com/intel/analytics/bigdl/models/lenet/Train.scala:79

1
2
3
val trainSet = DataSet.array(load(trainData, trainLabel), sc) ->
BytesToGreyImg(28, 28) -> GreyImgNormalizer(trainMean, trainStd) -> GreyImgToBatch(
param.batchSize)

trainSet を定義する際、 com.intel.analytics.bigdl.dataset.DataSet$#array(T[], org.apache.spark.SparkContext) メソッドが 呼ばれるのだが、その中で以下のように Sparkの RDD#cache が呼ばれていることがわかる。

com/intel/analytics/bigdl/dataset/DataSet.scala:343

1
2
3
4
5
6
7
8
9
10
11
12
def array[T: ClassTag](localData: Array[T], sc: SparkContext): DistributedDataSet[T] = {
val nodeNumber = Engine.nodeNumber()
new CachedDistriDataSet[T](
sc.parallelize(localData, nodeNumber)
// Keep this line, or the array will be send to worker every time
.coalesce(nodeNumber, true)
.mapPartitions(iter => {
Iterator.single(iter.toArray)
}).setName("cached dataset")
.cache()
)
}

以下、一例。

具体的には、Optimizerのインスタンスを生成するための apply メソッドはいくつかあるが、 以下のように引数にデータセットを指定する箇所がある。(再掲)

com/intel/analytics/bigdl/optim/Optimizer.scala:619

1
_dataset = (DataSet.rdd(sampleRDD) ->

ここで用いられている com.intel.analytics.bigdl.dataset.DataSet#rdd メソッドは以下の通り。

com/intel/analytics/bigdl/dataset/DataSet.scala:363

1
2
3
4
5
6
7
8
9
10
def rdd[T: ClassTag](data: RDD[T], partitionNum: Int = Engine.nodeNumber()
): DistributedDataSet[T] = {
new CachedDistriDataSet[T](
data.coalesce(partitionNum, true)
.mapPartitions(iter => {
Iterator.single(iter.toArray)
}).setName("cached dataset")
.cache()
)
}

com.intel.analytics.bigdl.dataset.CachedDistriDataSet#CachedDistriDataSet のコンストラクタ引数に、 org.apache.spark.rdd.RDD#cache を用いてキャッシュ化することを指定したRDDを渡していることがわかる。

共有

Generate PDF using pandoc

参考

メモ

TeX Liveのインストール手順 の通り、最新版をインストールする。 なお、フルセットでインストールした。

PDFを作成するときは以下のようにする。

1
$ pandoc test.md -o test.pdf --pdf-engine=lualatex -V documentclass=bxjsarticle -V classoption=pandoc

ただし、 default.latex に示すとおり、リンクに色を付けるなどしたいので、 上記マークダウンファイルには先頭部分にYAMLでコンフィグを記載することとした。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
---
title: テスト文章
date: 2020-10-30
author: dobachi
colorlinks: yes
numbersections: yes
toc: yes
---

# test

## これはテスト

[Google](https://www.google.com)

<!-- vim: set ft=markdown : -->
共有

Access AWS S3 from Hadoop3 and Spark3

参考

メモ

パッケージインポート

Sparkの公式ドキュメント には、org.apache.spark:hadoop-cloud_2.12を利用するよう記載されているが、 このMavenパッケージは見当たらなかった。

そこで、 Hadoopの公式ドキュメント の通り、hadoop-awsをインポートすると、それに関連した依存関係をインポートすることにした。 spark-shellで試すとしたら以下の通り。

1
$ /opt/spark/default/bin/spark-shell --packages org.apache.hadoop:hadoop-aws:3.2.0

profileを使ったクレデンシャル設定

awscliを利用しているとしたら、profileを設定しながらクレデンシャルを使い分けていることもあると思う。 その場合、起動時の環境変数でプロフィールを指定しながら、以下のようにproviderを指定することで、 profileを利用したクレデンシャル管理の仕組みを利用してアクセスできる。

1
$ AWS_PROFILE=s3test /opt/spark/default/bin/spark-shell --packages org.apache.hadoop:hadoop-aws:3.2.0
1
scala> spark.sparkContext.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

(補足)パラメータと指定するクラスについて

ちなみに、上記パラメータの説明には、

If unspecified, then the default list of credential provider classes, queried in sequence, is: 1. org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider: Uses the values of fs.s3a.access.key and fs.s3a.secret.key. 2. com.amazonaws.auth.EnvironmentVariableCredentialsProvider: supports configuration of AWS access key ID and secret access key in environment variables named AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, as documented in the AWS SDK. 3. com.amazonaws.auth.InstanceProfileCredentialsProvider: supports use of instance profile credentials if running in an EC2 VM.

のように記載されている。

また、「Using Named Profile Credentials with ProfileCredentialsProvider」の節には、

Declare com.amazonaws.auth.profile.ProfileCredentialsProvider as the provider.

と書かれており、上記Default...でなくても良い。(実際に試したところ動作した)

共有

qmk_firmware_202009

参考

メモ

2020/9に久しぶりにコンパイルしようとしたら、だいぶ勝手が変わっていた。

公式の環境構築手順 に従って環境を構築した。 なお、インストールするよう指定されていたパッケージだけでは足りなかったので、 以下のようにインストールした。

1
$ pacman --needed --noconfirm --disable-download-timeout -S git mingw-w64-x86_64-toolchain mingw-w64-x86_64-python3-pip python3 python3-pip make diffutils

最初、以下を忘れたため、失敗したので注意・・・。

1
$ qmk setup

公式のコンパイル手順 に従うと、qmkコマンドを利用してコンパイル、フラッシュするようだ。

ただ、 QMK Firmwareでファームウェアをビルドしようとしたらavr-gccでコケた話 に記載されているのと同様に、 make git-submodules が必要だった。

共有

Use GitHub actions to deploy documents

参考

メモ

GitHub Actionsを用いてGitHub Pagesへのデプロイを自動化する がSphinx JPのユーザ会による記事だったので参考になるが、 依存関係を都度pipインストールしているのが気になった。

マーケットプレイスのsphinx-build を利用すると良さそうだが、 この手順ではGITHUB TOKENを利用してデプロイしているのが気になった。 レポジトリごとに設定できる、Deploy Keyを利用したい。

そこでビルドには マーケットプレイスのsphinx-build を利用し、デプロイには GitHub Actionsを用いてGitHub Pagesへのデプロイを自動化する の手順を利用することにした。

レポジトリのDeploy Key設定

GitHub Actionsを用いてGitHub Pagesへのデプロイを自動化する の「Deploy keys の設定」章を参考に、 当該レポジトリのDeploy Keyを登録する。

任意の環境(ここではWSLのUbuntu18を利用した)で、 以下のコマンドを実行。

1
$ ssh-keygen -t rsa -b 4096 -C "<レポジトリで使用しているメールアドレス>" -f <任意の名前> -N ""

上記記事の通り、秘密鍵と公開鍵をGitHubのウェブUIで登録する。 なお、「Secrets」タブで登録した名称は、後ほどGitHub Actionsのワークフロー内で使用する。

GitHub Actionsのワークフローの記述

マーケットプレイスのsphinx-build の例を参考に、 ワークフローを記述する。 なお、最後のデプロイする部分は、 GitHub Actionsを用いてGitHub Pagesへのデプロイを自動化する を参考に、 Deploy Keyを利用するよう修正した。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
name: CI

# 今回はマスタブランチへのPushをトリガとする。
on:
push:
branches:
- master

jobs:
build:

runs-on: ubuntu-latest
# 今回はmasterブランチへのpushをトリガとしているので不要だが、gh-pagesへのpushをトリガとする場合など
# 無限ループを回避する際には以下のように記述する。
if: "!contains(github.event.head_commit.message, 'Update documentation via GitHub Actions')"

steps:
- uses: actions/checkout@v1

# 今回はMakefileを利用するので、makeコマンドを使用するよう元ネタから修正した。
# またドキュメントのソースが含まれているディレクトリは各自の定義に依存する。
- uses: ammaraskar/sphinx-action@master
with:
build-command: "make html"
docs-folder: "documents/"
# 先ほどGitHubのウェブUIで定義した秘密鍵名を使用する。
- name: Commit documentation changes and push it
run: |
mkdir ~/.ssh
ssh-keyscan -t rsa github.com >> ~/.ssh/known_hosts
echo "${{ secrets.<先ほどGitHubウェブUIで定義した秘密鍵名> }}" > ~/.ssh/id_rsa
chmod 400 ~/.ssh/id_rsa
git clone git@github.com:${GITHUB_REPOSITORY}.git --branch gh-pages --single-branch gh-pages
cp -r documents/_build/html/* gh-pages/
cd gh-pages
git config --local user.email "action@github.com"
git config --local user.name "GitHub Action"
git add .
git commit -m "Update documentation via GitHub Actions" -a || true
git push origin HEAD:gh-pages
# The above command will fail if no changes were present, so we ignore
# that.
# ===============================
共有

Install Bigtop RPMs using Yum

参考

メモ

今回は、2020/7/21時点の最新バージョンである Apache Bigtop 1.4.0のパッケージバージョン の Hadoopをインストールできるかどうかを試してみることとする。

Yumのrepoファイルは レポジトリ関連の資材置き場 以下にある。 例えば、今回はCentOS7を利用することにするので、 CentOS7のbigtop.repo あたりを利用する。

1
2
$ cd /etc/yum.repos.d
$ sudo wget https://downloads.apache.org/bigtop/bigtop-1.4.0/repos/centos7/bigtop.repo

ひとまずパッケージが見つかるかどうか、確認。

1
2
3
4
5
6
7
8
9
$ sudo yum search hadoop-conf-pseudo
読み込んだプラグイン:fastestmirror
Loading mirror speeds from cached hostfile
* base: d36uatko69830t.cloudfront.net
* epel: d2lzkl7pfhq30w.cloudfront.net
* extras: d36uatko69830t.cloudfront.net
* updates: d36uatko69830t.cloudfront.net
=========================================== N/S matched: hadoop-conf-pseudo ============================================
hadoop-conf-pseudo.x86_64 : Pseudo-distributed Hadoop configuration

確認できたので、試しにインストール。

1
$ sudo yum install hadoop-conf-pseudo

自分の手元の環境では、依存関係で以下のパッケージがインストールされた。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
========================================================================================================================
Package アーキテクチャー バージョン リポジトリー 容量
========================================================================================================================
インストール中:
hadoop-conf-pseudo x86_64 2.8.5-1.el7 bigtop 20 k
依存性関連でのインストールをします:
at x86_64 3.1.13-24.el7 base 51 k
bc x86_64 1.06.95-13.el7 base 115 k
bigtop-groovy noarch 2.4.10-1.el7 bigtop 9.8 M
bigtop-jsvc x86_64 1.0.15-1.el7 bigtop 29 k
bigtop-utils noarch 1.4.0-1.el7 bigtop 11 k
cups-client x86_64 1:1.6.3-43.el7 base 152 k
ed x86_64 1.9-4.el7 base 72 k
hadoop x86_64 2.8.5-1.el7 bigtop 24 M
hadoop-hdfs x86_64 2.8.5-1.el7 bigtop 24 M
hadoop-hdfs-datanode x86_64 2.8.5-1.el7 bigtop 5.7 k
hadoop-hdfs-namenode x86_64 2.8.5-1.el7 bigtop 5.8 k
hadoop-hdfs-secondarynamenode x86_64 2.8.5-1.el7 bigtop 5.8 k
hadoop-mapreduce x86_64 2.8.5-1.el7 bigtop 34 M
hadoop-mapreduce-historyserver x86_64 2.8.5-1.el7 bigtop 5.8 k
hadoop-yarn x86_64 2.8.5-1.el7 bigtop 20 M
hadoop-yarn-nodemanager x86_64 2.8.5-1.el7 bigtop 5.7 k
hadoop-yarn-resourcemanager x86_64 2.8.5-1.el7 bigtop 5.6 k
libpcap x86_64 14:1.5.3-12.el7 base 139 k
m4 x86_64 1.4.16-10.el7 base 256 k
mailx x86_64 12.5-19.el7 base 245 k
nmap-ncat x86_64 2:6.40-19.el7 base 206 k
patch x86_64 2.7.1-12.el7_7 base 111 k
psmisc x86_64 22.20-16.el7 base 141 k
redhat-lsb-core x86_64 4.1-27.el7.centos.1 base 38 k
redhat-lsb-submod-security x86_64 4.1-27.el7.centos.1 base 15 k
spax x86_64 1.5.2-13.el7 base 260 k
time x86_64 1.7-45.el7 base 30 k
zookeeper x86_64 3.4.6-1.el7 bigtop 7.0 M

initスクリプトがインストールされていることがわかる。

1
2
3
4
5
6
7
8
9
10
11
$ ls -1 /etc/init.d/
README
functions
hadoop-hdfs-datanode
hadoop-hdfs-namenode
hadoop-hdfs-secondarynamenode
hadoop-mapreduce-historyserver
hadoop-yarn-nodemanager
hadoop-yarn-resourcemanager
netconsole
network

ひとまずHDFSをフォーマット。

1
$ sudo -u hdfs hdfs namenode -format

あとは、上記の各種Hadoopサービスを立ち上げれば良い。

共有

Delta Lake 0.7.0

参考

メモ

0.7.0が出たので、本リリースの特徴を確認する。

SQL DDLへの対応やHive メタストアの対応

0.6系まではScala、Python APIのみであったが、SQL DDLにも対応した。 0.7.0のテーブル読み書き0.6.1のテーブル読み書き を見比べると、SQLの例が載っていることがわかる。 対応するSQL構文については src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4 あたりを見ると良い。

なお、 Spark3系でないとHiveメタストアに対応できない理由 を見る限り、 Spark3系のAPI(や、DataSourceV2も、かな)を使わないと、Data SourceのカスタムAPIを利用できないため、 これまでHiveメタストアのような外部メタストアと連携したDelta Lakeのメタデータ管理ができなかった、とのこと。

なお、今回の対応でSparkのカタログ機能を利用することになったので、起動時もしくはSparkSession生成時の オプション指定が必要になった。 その代わり、ライブラリの明示的なインポートが不要であり、クエリはDelta Lakeのパーサで解釈された後、 解釈できないようであれば通常のパーサで処理されるようになる。

起動時のオプション例

例:

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.12:0.7.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

なお、ここでは SparkSessionExtensions を利用し、SparkSession生成時にカスタムルール等を挿入している。 この機能は2020/06/19時点でSpark本体側でExperimentalであることに注意。 今後もSpark本体側の仕様変更に引きずられる可能性はある。

パーサの呼び出し流れ

セッション拡張機能を利用し、パーサが差し替えられている。

io/delta/sql/DeltaSparkSessionExtension.scala:73

1
2
3
4
5
6
7
class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
extensions.injectParser { (session, parser) =>
new DeltaSqlParser(parser)
}

(snip)

io.delta.sql.parser.DeltaSqlParser クラスでは デリゲート用のパーサを受け取り、自身のパーサで処理できなかった場合に処理をデリゲートパーサに渡す。

io/delta/sql/parser/DeltaSqlParser.scala:66

1
2
3
4
class DeltaSqlParser(val delegate: ParserInterface) extends ParserInterface {
private val builder = new DeltaSqlAstBuilder

(snip)

例えば、 SparkSessionsql メソッドを使って呼び出す場合を例にする。 このとき、内部では、 org.apache.spark.sql.catalyst.parser.ParserInterface#parsePlan メソッドが呼ばれて、 渡されたクエリ文 sqlText が処理される。

org/apache/spark/sql/SparkSession.scala:601

1
2
3
4
5
6
7
def sql(sqlText: String): DataFrame = withActive {
val tracker = new QueryPlanningTracker
val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
sessionState.sqlParser.parsePlan(sqlText)
}
Dataset.ofRows(self, plan, tracker)
}

この parsePlan がoverrideされており、以下のように定義されている。

io/delta/sql/parser/DeltaSqlParser.scala:69

1
2
3
4
5
6
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
builder.visit(parser.singleStatement()) match {
case plan: LogicalPlan => plan
case _ => delegate.parsePlan(sqlText)
}
}

まずは io.delta.sql.parser.DeltaSqlParser#parse メソッドを利用してパースがここ見られるが、 LogicalPlanが戻らなかったときは、デリゲート用パーサが呼び出されるようになっている。

カスタムカタログ

Spark3ではDataSourvV2の中で、プラガブルなカタログに対応した。 Delta Lake 0.7.0はこれを利用し、カスタムカタログを用いる。(これにより、Hiveメタストアを経由してDelta Lake形式のデータを読み書きできるようになっている) 使われているカタログは org.apache.spark.sql.delta.catalog.DeltaCatalog である。 (SparkSessionのインスタンス生成する際、もしくは起動時のオプション指定)

当該カタログ内部では、例えば org.apache.spark.sql.delta.catalog.DeltaCatalog#createDeltaTable メソッドが定義されており、 org.apache.spark.sql.delta.catalog.DeltaCatalog#createTable ※ しようとするときなどに呼び出されるようになっている。

org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension#createTable をoverrideしている

なお、このクラスもデリゲート用のカタログを用いるようになっている。 org.apache.spark.sql.delta.catalog.DeltaCatalog#createTable メソッドは以下のようになっており、 データソースが delta 出ない場合は、親クラスの createTable (つまり標準的なもの)が呼び出されるようになっている。

org/apache/spark/sql/delta/catalog/DeltaCatalog.scala:149

1
2
3
4
5
6
7
8
9
10
11
12
override def createTable(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): Table = {
if (DeltaSourceUtils.isDeltaDataSourceName(getProvider(properties))) {
createDeltaTable(
ident, schema, partitions, properties, sourceQuery = None, TableCreationModes.Create)
} else {
super.createTable(ident, schema, partitions, properties)
}
}

ScalaやPythonでの例

代表的にScalaの例を出す。公式サイトには以下のように載っている。

1
2
3
df.write.format("delta").saveAsTable("events")      // create table in the metastore

df.write.format("delta").save("/delta/events") // create table by path

Hiveメタストア経由で書き込むケースと、ストレージ上に直接書き出すケースが載っている。

SQLでのマージ処理

SQLを用いたマージの例 の通り、Delta Lakeの特徴であるマージ機能もSQLから呼び出させる。

1
2
3
4
5
6
spark.sql(s"""MERGE INTO $tableName USING newData
ON ${tableName}.id = newData.id
WHEN MATCHED THEN
UPDATE SET ${tableName}.id = newData.id
WHEN NOT MATCHED THEN INSERT *
""")

Spark SQLのカタログに登録されたDelta LakeのテーブルからDeltaTableを生成することもできる。

1
2
scala> import io.delta.tables.DeltaTable
scala> val tbl = DeltaTable.forName(tableName)

Presto / Athena用のメタデータの自動生成

Delta LakeはPresto、Athena用のメタデータを生成できるが、更新があった際にパーティションごとに自動で再生成できるようになった。

テーブル履歴の切り詰めの管理

Delta Lakeは更新の履歴を保持することも特徴の一つだが、 データ本体とログのそれぞれの切り詰め対象期間を指定できる。

CREATEやALTER句内で、TBLPROPERTIESとして指定することになっている。 例えば以下。

1
2
spark.sql(s"CREATE TABLE $tableName(id LONG) USING delta TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 1 day', 'delta.deletedFileRetentionDuration' = 'interval 1 day')")
spark.sql(s"ALTER TABLE $tableName SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 1 day', 'delta.deletedFileRetentionDuration' = 'interval 1 day')")

ユーザメタデータ

spark.databricks.delta.commitInfo.userMetadata プロパティを利用して、ユーザメタデータを付与できる。

1
df.write.option("spark.databricks.delta.commitInfo.userMetadata", "test").format("delta").mode("append").save("/tmp/test")
1
2
scala> spark.sql("SET spark.databricks.delta.commitInfo.userMetadata=test")
scala> spark.sql(s"INSERT INTO $tableName VALUES 0, 1, 2, 3, 4")

AzureのData Lake Storage Gen2

対応した。

しかし、 0.7.0で対応したAzure Data Lake Storage Gen2 の通り、 前提となっている各種ソフトウェアバージョンは高め。

ストリーム処理のone-timeトリガの改善

DataStreamReaderのオプションでmaxFilesPerTriggerを設定しているとしても、 one-time triggerでは一度に溜まったすべてのデータを読み込むようになった。(Spark 3系の話) 

共有

About tream table theory

参考

メモ

Tyler AkidauらによるApache Beamにまつわる情報

Foundations of streaming SQL stream & table theory (General theory) の通り、 ストリームデータとテーブルデータの関係性を論じたもの。

上記の内容の全体は、 OReillyのStreaming Systems に記載あり。

これらは、ストリーム・テーブル理論をベースに、Apache Beamを利用したストリームデータ処理・活用システムについて論じたもの。

Matthias J. Sax、Guozhang Wangらによる論文

Streams and Tables Two Sides of the Same Coin

概要

ストリームデータの処理モデルを提案。 論理・物理の間の一貫性の崩れに対処する。

1 Introduction

データストリームのチャレンジ

  • 物理的なアウト・オブ・オーダへの対応
  • 処理のオペレータがステートフルでないといけない、レイテンシ、正しさ、処理コストなどのトレードオフがある、など
  • 分散処理への対応

物理的な並びが保証されない条件下で、 十分に表現力のあるオペレータ(モデル)を立案する。

2 Background

ストリームデータのモデル。

ポイント。レコード配下の構成。

  • オフセット:物理の並び
  • タイムスタンプ:論理の並び(生成時刻)
  • キー
  • バリュー

3 Duality of streams and tables

レイテンシは、データストリームの特性に依存する。

論理、物理の並びの差異を解決するため、結果を継続的に更新するモデルを提案した。

Dual Streaming Model

DUality of streams and tables

テーブルは、テーブルのバージョンのコレクションと考えることもできる。

4 STREAM PROCESSING OPERATORS

モデル定義を解説。 フィルターなどのステートレスな処理から、アグリゲーションなどのステートフルな処理まで。

out-of-order なレコードが届いても、最終的な出力結果テーブルがout-of-orderレコードがないときと同一になるように動くことを期待する。

Watermarkと似たような機構として、「retention time」を導入。 結果テーブル内の「現在時刻 - retention time」よりも古い結果をメンテナンス対象から外す。

Stream - Table Joinのケースで、「遅れテーブル更新」が生じると、 下流システムに対して結果の上書きデータを出力する必要がある。

5 CASE STUDY: APACHE KAFKA

Kafka Streamsの例。

Kafka Streamsの利用企業:The New York Times, Pinterest, LINE, Trivago, etc

Kafka Streamsのオペレーションはすべてノンブロッキングであり、 レコードを受領次第処理し、KTableにマテリアライズされたり、Topicに書き戻したりされる。

Window集計の例を題材に、retention timeを利用。 retention timeを長くするほどストレージを利用するが、 out-of-orderレコードに対してロバストになる。 また、retention timeが長いほど、ウィンドウ結果が確定したと言えるタイミングが遅くなる。

6 RELATED WORK

Relationを取り扱うモデル、out-of-orderを取り扱うモデル、テーブルバージョン管理を取り扱うモデルなど。

共有

Handle pictures in delta lake and hudi

参考

Delta Lake

まずはjpgをSparkで読み込む

予め、Delta Lakeを読み込みながら起動する。

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.11:0.6.0

Spark公式ドキュメントのImage data source を利用し、jpgをDataFrameとして読み込んでみる。 データは、 Kaggleのmnistデータ を利用。このデータはjpgになっているので便利。

1
2
3
scala> val home = sys.env("HOME")
scala> val imageDir = "Downloads/mnist_jpg/trainingSet/trainingSet/0"
scala> val df = spark.read.format("image").option("dropInvalid", true).load(home + "/" + imageDir)

データをDataFrameにする定義ができた。 内容は以下の通り。

1
2
3
4
5
6
7
8
scala> df.select("image.origin", "image.width", "image.height").show(3, truncate=false)
+-------------------------------------------------------------------------------+-----+------+
|origin |width|height|
+-------------------------------------------------------------------------------+-----+------+
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_20188.jpg|28 |28 |
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_12634.jpg|28 |28 |
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_26812.jpg|28 |28 |
+-------------------------------------------------------------------------------+-----+------+

公式ドキュメントのとおりだが、カラムは以下の通り。

  • origin: StringType (represents the file path of the image)
  • height: IntegerType (height of the image)
  • width: IntegerType (width of the image)
  • nChannels: IntegerType (number of image channels)
  • mode: IntegerType (OpenCV-compatible type)
  • data: BinaryType (Image bytes in OpenCV-compatible order: row-wise BGR in most cases)
1
2
3
4
5
6
7
8
9
scala> df.printSchema
root
|-- image: struct (nullable = true)
| |-- origin: string (nullable = true)
| |-- height: integer (nullable = true)
| |-- width: integer (nullable = true)
| |-- nChannels: integer (nullable = true)
| |-- mode: integer (nullable = true)
| |-- data: binary (nullable = true)

Delta Lakeで扱う

書き込み

これをDelta LakeのData Sourceで書き出してみる。

1
2
scala> val deltaImagePath = "/tmp/delta-lake-image"
scala> df.write.format("delta").save(deltaImagePath)

読み出し

できた。試しにDeltaTableとして読み出してみる。

1
2
3
scala> import io.delta.tables._
scala> import org.apache.spark.sql.functions._
scala> val deltaTable = DeltaTable.forPath(deltaImagePath)

更新

mergeを試す。 マージのためにキーを明示的に与えつつデータを準備する。

1
2
3
4
5
scala> val targetDf = df.select($"image.origin", $"image")
scala> val targetImagePath = "/tmp/delta-table"
scala> targetDf.write.format("delta").save(targetImagePath)
scala> val sourcePath = "Downloads/mnist_jpg/trainingSet/trainingSet/1"
scala> val sourceDf = spark.read.format("image").option("dropInvalid", true).load(home + "/" + sourcePath).select($"image.origin", $"image")

Delta Tableを定義。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
scala> val deltaTable = DeltaTable.forPath(targetImagePath)
scala> deltaTable
.as("target")
.merge(
sourceDf.as("source"),
"target.origin = source.origin")
.whenMatched
.updateExpr(Map(
"image" -> "source.image"))
.whenNotMatched
.insertExpr(Map(
"origin" -> "source.origin",
"image" -> "source.image"))
.execute()

実際に、追加データが入っていることが確かめられる。

1
2
3
4
5
6
7
8
9
scala> deltaTable.toDF.where($"origin" contains "trainingSet/1").select("image.origin", "image.width", "image.height").show(3, truncate=false)
+-------------------------------------------------------------------------------+-----+------+
|origin |width|height|
+-------------------------------------------------------------------------------+-----+------+
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/1/img_10266.jpg|28 |28 |
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/1/img_10665.jpg|28 |28 |
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/1/img_10772.jpg|28 |28 |
+-------------------------------------------------------------------------------+-----+------+
only showing top 3 rows

SparkのImage Data Sourceは、メタデータと画像本体(バイナリ)を構造で扱っているだけなので、 実は特に工夫することなく扱えた。

公式ドキュメントによると、OpenCV形式バイナリで保持されている。 これから先は、Python使って処理したいので、改めて開き直す。

Parquetの内容を確認する

parquet-mrparquet-tools を利用して、Parquetの内容を確認する。

なお、Hudi v0.5.2におけるParquetのバージョンは以下の通り、1.10.1である。

pom.xml:84

1
<parquet.version>1.10.1</parquet.version>

当該バージョンに対応するタグをチェックアウトし、予めparquet-toolsをパッケージしておくこと。

スキーマ

1
2
3
4
5
6
7
8
9
10
11
$ java -jar target/parquet-tools-1.10.1.jar schema /tmp/delta-lake-image/part-00000-c3d03d70-1785-435f-b055-b2a78903e732-c000.snappy.parquet
message spark_schema {
optional group image {
optional binary origin (UTF8);
optional int32 height;
optional int32 width;
optional int32 nChannels;
optional int32 mode;
optional binary data;
}
}

シンプルにParquet内に保持されていることがわかる。 Spark SQLのParquet取扱機能を利用している。

メタデータ

ファイル名やクリエイタなど。

1
2
3
file:        file:/tmp/delta-lake-image/part-00000-c3d03d70-1785-435f-b055-b2a78903e732-c000.snappy.parquet
creator: parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1)
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"image","type":{"type":"struct","fields":[{"name":"origin","type":"string","nullable":true,"metadata":{}},{"name":"height","type":"integer","nullable":true,"metadata":{}},{"name":"width","type":"integer","nullable":true,"metadata":{}},{"name":"nChannels","type":"integer","nullable":true,"metadata":{}},{"name":"mode","type":"integer","nullable":true,"metadata":{}},{"name":"data","type":"binary","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}

ファイルスキーマ

1
2
3
4
5
6
7
8
9
file schema: spark_schema
--------------------------------------------------------------------------------
image: OPTIONAL F:6
.origin: OPTIONAL BINARY O:UTF8 R:0 D:2
.height: OPTIONAL INT32 R:0 D:2
.width: OPTIONAL INT32 R:0 D:2
.nChannels: OPTIONAL INT32 R:0 D:2
.mode: OPTIONAL INT32 R:0 D:2
.data: OPTIONAL BINARY R:0 D:2

Rowグループ

1
2
3
4
5
6
7
8
9
10
11
row group 1: RC:32 TS:29939 OFFSET:4
--------------------------------------------------------------------------------
image:
.origin: BINARY SNAPPY DO:0 FPO:4 SZ:620/2838/4.58 VC:32 ENC:RLE,PLAIN,BIT_PACKED ST:[min: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_10203.jpg, max: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_9098.jpg, num_nulls: 0]
.height: INT32 SNAPPY DO:0 FPO:624 SZ:74/70/0.95 VC:32 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 28, max: 28, num_nulls: 0]
.width: INT32 SNAPPY DO:0 FPO:698 SZ:74/70/0.95 VC:32 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 28, max: 28, num_nulls: 0]
.nChannels: INT32 SNAPPY DO:0 FPO:772 SZ:74/70/0.95 VC:32 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 1, max: 1, num_nulls: 0]
.mode: INT32 SNAPPY DO:0 FPO:846 SZ:74/70/0.95 VC:32 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 0, max: 0, num_nulls: 0]
.data: BINARY SNAPPY DO:0 FPO:920 SZ:21175/26821/1.27 VC:32 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 0x00

(snip)

Pythonで画像処理してみる

とりあえずOpenCVをインストールしたPython環境のJupyterでPySparkを起動。

1
$ PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip 0.0.0.0" /opt/spark/default/bin/pyspark --packages io.delta:delta-core_2.11:0.6.0 --conf spark.pyspark.driver.python=$HOME/venv/jupyter/bin/jupyter

この後はJupyter内で処理する。

1
2
3
4
5
from delta.tables import *
from pyspark.sql.functions import *
delta_image_path = "/tmp/delta-lake-image"
deltaTable = DeltaTable.forPath(spark, delta_image_path)
deltaTable.toDF().select("image.origin", "image.width", "image.height").show(3, truncate=False)

とりあえず読み込めたことがわかる。

1
2
3
4
5
6
7
8
+-------------------------------------------------------------------------------+-----+------+
|origin |width|height|
+-------------------------------------------------------------------------------+-----+------+
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_20188.jpg|28 |28 |
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_12634.jpg|28 |28 |
|file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_26812.jpg|28 |28 |
+-------------------------------------------------------------------------------+-----+------+
only showing top 3 rows

中のデータを利用できることを確かめるため、1件だけ取り出して処理してみる。

1
img = deltaTable.toDF().where("image.origin == 'file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_1.jpg'").select("image.origin", "image.width", "image.height", "image.nChannels", "image.data").collect()[0]

とりあえずエッジ抽出をしてみる。

1
2
3
4
5
6
7
import numpy as np
import cv2
from matplotlib import pyplot as plt

nparr = np.frombuffer(img.data, np.uint8).reshape(img.height, img.width, img.nChannels)
edges = cv2.Canny(nparr,100,200)
plt.imshow(edges)

Jupyter上でエッジ抽出されたことが確かめられただろうか。

エッジ抽出された様

これをSparkでUDFで実行するようにする。 (とりあえず雑にライブラリをインポート…)

1
2
3
4
5
6
7
8
9
10
11
def get_edge(data, h, w, c):
import numpy as np
import cv2
nparr = np.frombuffer(data, np.uint8).reshape(h, w, c)
edge = cv2.Canny(nparr,100,200)
return edge.tobytes()

from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType

get_edge_udf = udf(get_edge, BinaryType())

エッジ抽出されたndarrayをバイト列に変換し、Spark上ではBinaryTypeで扱うように指定した。

1
2
3
with_edge = deltaTable.toDF().select("image.origin", "image.width", "image.height", "image.nChannels", get_edge_udf("image.data", "image.height", "image.width", "image.nChannels").alias("edge"), "image.data")

with_edge.show(3)
1
2
3
4
5
6
7
8
+--------------------+-----+------+---------+--------------------+--------------------+
| origin|width|height|nChannels| edge| data|
+--------------------+-----+------+---------+--------------------+--------------------+
|file:///home/cent...| 28| 28| 1|[00 00 00 00 00 0...|[05 00 04 08 00 0...|
|file:///home/cent...| 28| 28| 1|[00 00 00 00 00 0...|[05 00 0B 00 00 0...|
|file:///home/cent...| 28| 28| 1|[00 00 00 00 00 0...|[06 00 02 00 02 0...|
+--------------------+-----+------+---------+--------------------+--------------------+
only showing top 3 rows

こんな感じで、OpenCVを用いた処理をDataFrameに対して実行できる。

なお、当然ながら処理されたデータを取り出せば、また画像として表示も可能。

1
2
3
new_img = with_edge.where("origin == 'file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_1.jpg'").select("origin", "width", "height", "nChannels", "data", "edge").collect()[0]
new_nparr = np.frombuffer(new_img.edge, np.uint8).reshape(new_img.height, new_img.width)
plt.imshow(new_nparr)

Hudi

画像読み込み

まずはjpgをSparkで読み込む と同様に データを読み込む。

Hudiのquick-start-guide を参考にしながら、まずはHudiを読み込みながら、シェルを起動する。

1
2
$ /opt/spark/default/bin/spark-shell --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

ライブラリをインポート。

1
2
3
4
scala> import org.apache.spark.sql.SaveMode._
scala> import org.apache.hudi.DataSourceReadOptions._
scala> import org.apache.hudi.DataSourceWriteOptions._
scala> import org.apache.hudi.config.HoodieWriteConfig._

画像データを読み込み

1
2
3
4
5
scala> val tableName = "hudi_images"
scala> val basePath = "file:///tmp/hudi_images"
scala> val home = sys.env("HOME")
scala> val imageDir = "Downloads/mnist_jpg/trainingSet/trainingSet/*"
scala> val df = spark.read.format("image").option("dropInvalid", true).load(home + "/" + imageDir)

今回は、Hudiにおけるパーティション構成を確認するため、上記のようにワイルドカード指定した。

書き込み

Hudi書き込み用にデータを変換。パーティション用キーなどを定義する。

1
2
3
4
5
scala> import org.apache.spark.sql.functions.{col, udf}
scala> val partitionPath = udf((str: String) => {
str.split("/").takeRight(2).head
})
scala> val hudiDf = df.select($"image.origin", partitionPath($"image.origin") as "partitionpath", $"*")

書き込み。

1
2
3
4
5
6
7
scala> hudiDf.write.format("hudi").
option(TABLE_NAME, tableName).
option(PRECOMBINE_FIELD_OPT_KEY, "origin").
option(RECORDKEY_FIELD_OPT_KEY, "origin").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
mode(Overwrite).
save(basePath)

書き込まれたデータは以下のような感じ。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
$ ls -R /tmp/hudi_images/
/tmp/hudi_images/:
0 1 2 3 4 5 6 7 8 9

/tmp/hudi_images/0:
86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-29-14074_20200517135104.parquet

/tmp/hudi_images/1:
4912032f-3365-4852-b2ae-91ffb5ea9806-0_1-29-14075_20200517135104.parquet

/tmp/hudi_images/2:
0636bb93-dd32-40f1-b5c8-328130386528-0_2-29-14076_20200517135104.parquet

/tmp/hudi_images/3:
6f0154cd-4a9f-4001-935b-47d067416797-0_3-29-14077_20200517135104.parquet

/tmp/hudi_images/4:
b6f29c2c-7df0-481a-8149-2bbc93e92e2c-0_4-29-14078_20200517135104.parquet

/tmp/hudi_images/5:
24115172-10db-4c85-808a-bf4048e0f533-0_5-29-14079_20200517135104.parquet

/tmp/hudi_images/6:
64823f4b-26fb-4679-87e8-cd81d01b1181-0_6-29-14080_20200517135104.parquet

/tmp/hudi_images/7:
d0c097e3-9ab7-477a-b591-593c6cb7880f-0_7-29-14081_20200517135104.parquet

/tmp/hudi_images/8:
13a48b38-cbfb-4076-957d-9c580765bfca-0_8-29-14082_20200517135104.parquet

/tmp/hudi_images/9:
5d3ba9ae-8ede-410f-95f3-e8d69e8c0478-0_9-29-14083_20200517135104.parquet

メタデータは以下のような感じ。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ ls -1 /tmp/hudi_images/.hoodie/
20200517135104.clean
20200517135104.clean.inflight
20200517135104.clean.requested
20200517135104.commit
20200517135104.commit.requested
20200517135104.inflight
20200517140006.clean
20200517140006.clean.inflight
20200517140006.clean.requested
20200517140006.commit
20200517140006.commit.requested
20200517140006.inflight
archived
hoodie.properties

読み込み

読み込んで見る。

1
2
3
4
scala> val hudiImageDf = spark.
read.
format("hudi").
load(basePath + "/*")

スキーマは以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scala> hudiImageDf.printSchema
root
|-- _hoodie_commit_time: string (nullable = true)
|-- _hoodie_commit_seqno: string (nullable = true)
|-- _hoodie_record_key: string (nullable = true)
|-- _hoodie_partition_path: string (nullable = true)
|-- _hoodie_file_name: string (nullable = true)
|-- origin: string (nullable = true)
|-- partitionpath: string (nullable = true)
|-- image: struct (nullable = true)
| |-- origin: string (nullable = true)
| |-- height: integer (nullable = true)
| |-- width: integer (nullable = true)
| |-- nChannels: integer (nullable = true)
| |-- mode: integer (nullable = true)
| |-- data: binary (nullable = true)

すべてのただの書き込みAPIを試しただけだが、想定通り書き込み・読み込みできている。

更新

Hudiといえば、テーブルを更新可能であることも特徴のひとつなので、試しに更新をしてみる。 元は同じデータであるが、「0」の画像データを新しいデータとしてDataFrameを定義し、更新する。 意図的に、一部のデータだけ更新する。

1
2
3
4
5
6
7
8
9
10
scala> val updateImageDir = "Downloads/mnist_jpg/trainingSet/trainingSet/0"
scala> val updateDf = spark.read.format("image").option("dropInvalid", true).load(home + "/" + updateImageDir)
scala> val updateHudiDf = updateDf.select($"image.origin", partitionPath($"image.origin") as "partitionpath", $"*")
scala> updateHudiDf.write.format("hudi").
option(TABLE_NAME, tableName).
option(PRECOMBINE_FIELD_OPT_KEY, "origin").
option(RECORDKEY_FIELD_OPT_KEY, "origin").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
mode(Append).
save(basePath)

以下のように、「0」のデータが更新されていることがわかる。

1
2
3
4
5
6
7
8
9
10
11
12
$ ls -R /tmp/hudi_images/
/tmp/hudi_images/:
0 1 2 3 4 5 6 7 8 9

/tmp/hudi_images/0:
86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-29-14074_20200517135104.parquet
86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-60-27759_.parquet

/tmp/hudi_images/1:
4912032f-3365-4852-b2ae-91ffb5ea9806-0_1-29-14075_20200517135104.parquet

(snip)

つづいて、更新されたことを確かめるため、改めて読み込みDataFrameを定義し、

1
2
3
4
scala> val updatedHudiImageDf = spark.
read.
format("hudi").
load(basePath + "/*")

試しに、差分がわかるように「1」と「0」のデータをそれぞれ読んで見る。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
scala> updatedHudiImageDf.select($"_hoodie_commit_time", $"_hoodie_commit_seqno", $"_hoodie_partition_path").filter("partitionpath == 1").show(3)
+-------------------+--------------------+----------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_partition_path|
+-------------------+--------------------+----------------------+
| 20200517135104| 20200517135104_1_1| 1|
| 20200517135104| 20200517135104_1_12| 1|
| 20200517135104| 20200517135104_1_45| 1|
+-------------------+--------------------+----------------------+
only showing top 3 rows


scala> updatedHudiImageDf.select($"_hoodie_commit_time", $"_hoodie_commit_seqno", $"_hoodie_partition_path").filter("partitionpath == 0").show(3)
+-------------------+--------------------+----------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_partition_path|
+-------------------+--------------------+----------------------+
| 20200517140006|20200517140006_0_...| 0|
| 20200517140006|20200517140006_0_...| 0|
| 20200517140006|20200517140006_0_...| 0|
+-------------------+--------------------+----------------------+
only showing top 3 rows

上記のように、更新されたことがわかる。最新の更新日時は、 2020/05/17 14:00:06UTC である。

ここで読み込まれたデータも、Delta Lake同様に画像を元にしたベクトルデータとして扱える。 Pythonであれば ndarray に変換して用いれば良い。

Parquetの内容を確認する

parquet-mrparquet-tools を利用して、Parquetの内容を確認する。

なお、Hudi v0.5.2におけるParquetのバージョンは以下の通り、1.10.1である。

pom.xml:84

1
<parquet.version>1.10.1</parquet.version>

当該バージョンに対応するタグをチェックアウトし、予めparquet-toolsをパッケージしておくこと。

スキーマ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ java -jar target/parquet-tools-1.10.1.jar schema /tmp/hudi_images/0/86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-60-27759_20200517140006.parquet
message hoodie.hudi_images.hudi_images_record {
optional binary _hoodie_commit_time (UTF8);
optional binary _hoodie_commit_seqno (UTF8);
optional binary _hoodie_record_key (UTF8);
optional binary _hoodie_partition_path (UTF8);
optional binary _hoodie_file_name (UTF8);
optional binary origin (UTF8);
optional binary partitionpath (UTF8);
optional group image {
optional binary origin (UTF8);
optional int32 height;
optional int32 width;
optional int32 nChannels;
optional int32 mode;
optional binary data;
}
}

Hudiが独自に付加したカラムが追加されていることが見て取れる。

メタデータ

つづいて、メタデータを確認する。

ファイル名やクリエイタなど。

1
2
file:                   file:/tmp/hudi_images/0/86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-60-27759_20200517140006.parquet
creator: parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1)

ブルームフィルタはここでは省略

1
2
3
extra:                  org.apache.hudi.bloomfilter = ///

(snip)

レコードキーやAvroスキーマ

1
2
3
4
extra:                  hoodie_min_record_key = file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_1.jpg
extra: parquet.avro.schema = {"type":"record","name":"hudi_images_record","namespace":"hoodie.hudi_images","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"origin","type":["string","null"]},{"name":"partitionpath","type":["string","null"]},{"name":"image","type":[{"type":"record","name":"image","namespace":"hoodie.hudi_images.hudi_images_record","fields":[{"name":"origin","type":["string","null"]},{"name":"height","type":["int","null"]},{"name":"width","type":["int","null"]},{"name":"nChannels","type":["int","null"]},{"name":"mode","type":["int","null"]},{"name":"data","type":["bytes","null"]}]},"null"]}]}
extra: writer.model.name = avro
extra: hoodie_max_record_key = file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_9996.jpg

上記の通り、Hudiでは parquet-avro を利用し、Avro形式でParquet内のデータを保持する。

スキーマ詳細

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
file schema:            hoodie.hudi_images.hudi_images_record
--------------------------------------------------------------------------------
_hoodie_commit_time: OPTIONAL BINARY O:UTF8 R:0 D:1
_hoodie_commit_seqno: OPTIONAL BINARY O:UTF8 R:0 D:1
_hoodie_record_key: OPTIONAL BINARY O:UTF8 R:0 D:1
_hoodie_partition_path: OPTIONAL BINARY O:UTF8 R:0 D:1
_hoodie_file_name: OPTIONAL BINARY O:UTF8 R:0 D:1
origin: OPTIONAL BINARY O:UTF8 R:0 D:1
partitionpath: OPTIONAL BINARY O:UTF8 R:0 D:1
image: OPTIONAL F:6
.origin: OPTIONAL BINARY O:UTF8 R:0 D:2
.height: OPTIONAL INT32 R:0 D:2
.width: OPTIONAL INT32 R:0 D:2
.nChannels: OPTIONAL INT32 R:0 D:2
.mode: OPTIONAL INT32 R:0 D:2
.data: OPTIONAL BINARY R:0 D:2

Rowグループ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
row group 1:            RC:4132 TS:4397030 OFFSET:4
--------------------------------------------------------------------------------
_hoodie_commit_time: BINARY GZIP DO:0 FPO:4 SZ:165/127/0.77 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 20200517140006, max: 20200517140006, num_nulls: 0]
_hoodie_commit_seqno: BINARY GZIP DO:0 FPO:169 SZ:10497/107513/10.24 VC:4132 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 20200517140006_0_42001, max: 20200517140006_0_46132, num_nulls: 0]
_hoodie_record_key: BINARY GZIP DO:0 FPO:10666 SZ:16200/342036/21.11 VC:4132 ENC:RLE,PLAIN,BIT_PACKED ST:[min: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_1.jpg, max: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_9996.jpg, num_nulls: 0]
_hoodie_partition_path: BINARY GZIP DO:0 FPO:26866 SZ:100/62/0.62 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 0, max: 0, num_nulls: 0]
_hoodie_file_name: BINARY GZIP DO:0 FPO:26966 SZ:448/419/0.94 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-60-27759_20200517140006.parquet, max: 86c74288-e541-4a25-addf-0c00e17ef6bf-0_0-60-27759_20200517140006.parquet, num_nulls: 0]
origin: BINARY GZIP DO:0 FPO:27414 SZ:16200/342036/21.11 VC:4132 ENC:RLE,PLAIN,BIT_PACKED ST:[min: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_1.jpg, max: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_9996.jpg, num_nulls: 0]
partitionpath: BINARY GZIP DO:0 FPO:43614 SZ:100/62/0.62 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 0, max: 0, num_nulls: 0]
image:
.origin: BINARY GZIP DO:0 FPO:43714 SZ:16200/342036/21.11 VC:4132 ENC:RLE,PLAIN,BIT_PACKED ST:[min: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_1.jpg, max: file:///home/centos/Downloads/mnist_jpg/trainingSet/trainingSet/0/img_9996.jpg, num_nulls: 0]
.height: INT32 GZIP DO:0 FPO:59914 SZ:111/73/0.66 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 28, max: 28, num_nulls: 0]
.width: INT32 GZIP DO:0 FPO:60025 SZ:111/73/0.66 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 28, max: 28, num_nulls: 0]
.nChannels: INT32 GZIP DO:0 FPO:60136 SZ:111/73/0.66 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 1, max: 1, num_nulls: 0]
.mode: INT32 GZIP DO:0 FPO:60247 SZ:111/73/0.66 VC:4132 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 0, max: 0, num_nulls: 0]
.data: BINARY GZIP DO:0 FPO:60358 SZ:1609341/3262447/2.03 VC:4132 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 0x00

(snip)
共有

Research about BigTop

参考

メモ

Trunkのビルド方法

How to build Bigtop-trunk を見ると、現在ではDocker上でビルドする方法が推奨されているようだ。 ただ、

1
# ./gradlew spark-pkg-ind

で実行されるとき、都度Mavenキャッシュのない状態から開始される。 これが待ち時間長いので工夫が必要。

なお、上記コマンドで実行されるタスクは、以下の通り。

packages.gradle:629

1
2
3
4
5
6
7
8
  task "$target-pkg-ind" (
description: "Invoking a native binary packaging for $target in Docker. Usage: \$ ./gradlew " +
"-POS=[centos-7|fedora-26|debian-9|ubuntu-16.04|opensuse-42.3] " +
"-Pprefix=[trunk|1.2.1|1.2.0|1.1.0|...] $target-pkg-ind " +
"-Pnexus=[true|false]",
group: PACKAGES_GROUP) doLast {
def _prefix = project.hasProperty("prefix") ? prefix : "trunk"
(snip)

bigtop-ci/build.sh がタスク内で実行されるスクリプトである。 この中でDockerが呼び出されてビルドが実行される。

これをいじれば、一応ビルド時に使ったDockerコンテナを残すことができそうではある。

-> reuse_images というブランチに、コンテナンをリユースする機能をつけた。

デプロイ周りを読み解いてみる

Deployment and Integration Testing を見る限り、 Dockerベースのデプロイ方法Puppetベースのデプロイ方法 があるように見える。

Dockerベースのデプロイを試す

Dockerベースのデプロイ方法 に従って、Dockerベースでデプロイしてみる。 ソースコードは20200427時点でのmasterを利用。

Ubuntu16でDocker、Java、Ruby環境を整え、以下を実行した。

1
$ sudo ./gradlew -Pconfig=config_ubuntu-16.04.yaml -Pnum_instances=1 docker-provisioner

なお、20200427時点で Dockerベースのデプロイ方法 ではコンフィグファイルが config_ubuntu_xenial.yaml となっていたが、 BIGTOP-2814 の際に変更されたのに合わせた。

プロビジョニング完了後、コンテナを確認し、接続。

1
2
3
4
$ sudo docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
7311ae86181a bigtop/puppet:trunk-ubuntu-16.04 "/sbin/init" 4 minutes ago Up 4 minutes 20200426_151302_r30103_bigtop_1
$ sudo docker exec -it 20200426_151302_r30103_bigtop_1 bash

以下のように hdfs コマンドを実行できることがわかる。

1
2
3
4
5
6
7
8
9
root@7311ae86181a:/# hdfs dfs -ls /
Found 7 items
drwxr-xr-x - hdfs hadoop 0 2020-04-26 15:15 /apps
drwxrwxrwx - hdfs hadoop 0 2020-04-26 15:15 /benchmarks
drwxr-xr-x - hbase hbase 0 2020-04-26 15:15 /hbase
drwxr-xr-x - solr solr 0 2020-04-26 15:15 /solr
drwxrwxrwt - hdfs hadoop 0 2020-04-26 15:15 /tmp
drwxr-xr-x - hdfs hadoop 0 2020-04-26 15:15 /user
drwxr-xr-x - hdfs hadoop 0 2020-04-26 15:15 /var

なお、 -Pnum_instances=1 の値を3に変更すると、コンテナが3個立ち上がる。

1
$ sudo ./gradlew -Pconfig=config_ubuntu-16.04.yaml -Pnum_instances=3 docker-provisioner

試しに、dfsadmin を実行してみる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
root@53211353deec:/# sudo -u hdfs hdfs dfsadmin -report
Configured Capacity: 374316318720 (348.61 GB)
Present Capacity: 345302486224 (321.59 GB)
DFS Remaining: 345300606976 (321.59 GB)
DFS Used: 1879248 (1.79 MB)
DFS Used%: 0.00%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0
Pending deletion blocks: 0

-------------------------------------------------
Live datanodes (3):

Name: 172.17.0.2:50010 (53211353deec.bigtop.apache.org)
Hostname: 53211353deec.bigtop.apache.org
Decommission Status : Normal
Configured Capacity: 124772106240 (116.20 GB)
DFS Used: 626416 (611.73 KB)
Non DFS Used: 9637679376 (8.98 GB)
DFS Remaining: 115100246016 (107.20 GB)
DFS Used%: 0.00%
DFS Remaining%: 92.25%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 1
Last contact: Sun Apr 26 15:33:12 UTC 2020

(snip)

また、 --stack オプションを利用し、デプロイするコンポーネントを指定できる。 ここではHadoopに加え、Sparkをプロビジョニングしてみる。

1
$ sudo ./gradlew -Pconfig=config_ubuntu-16.04.yaml -Pstack=hdfs,yarn,spark -Pnum_instances=1 docker-provisioner

コンテナに接続し、Sparkを動かす。

1
2
3
4
$ sudo docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
56e1ff5670be bigtop/puppet:trunk-ubuntu-16.04 "/sbin/init" 3 minutes ago Up 3 minutes 20200426_155010_r21667_bigtop_1
$ sudo docker exec -it 20200426_155010_r21667_bigtop_1 bash
1
2
3
root@56e1ff5670be:/# spark-shell
scala> spark.sparkContext.master
res1: String = yarn

上記のように、マスタ=YARNで起動していることがわかる。 参考までに、 spark-env.sh は以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
root@56e1ff5670be:/# cat /etc/spark/conf/spark-env.sh
export SPARK_HOME=${SPARK_HOME:-/usr/lib/spark}
export SPARK_LOG_DIR=${SPARK_LOG_DIR:-/var/log/spark}
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=NONE"
export HADOOP_HOME=${HADOOP_HOME:-/usr/lib/hadoop}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop/conf}
export HIVE_CONF_DIR=${HIVE_CONF_DIR:-/etc/hive/conf}

export STANDALONE_SPARK_MASTER_HOST=56e1ff5670be.bigtop.apache.org
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_IP=$STANDALONE_SPARK_MASTER_HOST
export SPARK_MASTER_URL=yarn
export SPARK_MASTER_WEBUI_PORT=8080

export SPARK_WORKER_DIR=${SPARK_WORKER_DIR:-/var/run/spark/work}
export SPARK_WORKER_PORT=7078
export SPARK_WORKER_WEBUI_PORT=8081

export SPARK_DIST_CLASSPATH=$(hadoop classpath)

なお、構築したクラスタを破棄する際には以下の通り。(公式ドキュメントの通り)

1
$ sudo ./gradlew docker-provisioner-destroy

仕様確認

上記のプロビジョナのタスクは、 build.gradle 内にある。

build.gradle:263

1
2
3
task "docker-provisioner"(type:Exec,

(snip)

上記の中で、 ./docker-hadoop.sh が用いられている。

build.gradle:296

1
2
3
4
5
def command = [
'./docker-hadoop.sh',
'-C', _config,
'--create', _num_instances,
]

--create オプションが用いられているので、 create 関数が呼ばれる。

provisioner/docker/docker-hadoop.sh:387

1
2
3
if [ "$READY_TO_LAUNCH" = true ]; then
create $NUM_INSTANCES
fi

create 内では、以下のように docker-compose が用いられている。

provisioner/docker/docker-hadoop.sh:78

1
docker-compose -p $PROVISION_ID up -d --scale bigtop=$1 --no-recreate

また、コンポーネントの内容に応じて、Puppetマニフェスト(正確には、hieraファイル)が生成されるようになっている。

provisioner/docker/docker-hadoop.sh:101

1
generate-config "$hadoop_head_node" "$repo" "$components"

また、最終的には、 provision 関数、さらに bigtop-puppet 関数を通じ、各コンテナ内でpuppetが実行されるようになっている。

1
2
3
4
5
6
bigtop-puppet() {
if docker exec $1 bash -c "puppet --version" | grep ^3 >/dev/null ; then
future="--parser future"
fi
docker exec $1 bash -c "puppet apply --detailed-exitcodes $future --modulepath=/bigtop-home/bigtop-deploy/puppet/modules:/etc/puppet/modules:/usr/share/puppet/modules /bigtop-home/bigtop-deploy/puppet/manifests"
}

Ambariのモジュールを確認する。

bigtop-deploy/puppet/modules/ambari/manifests/init.pp にAmbariデプロイ用のモジュールがある。

内容は短い。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class ambari {

class deploy ($roles) {
if ("ambari-server" in $roles) {
include ambari::server
}

if ("ambari-agent" in $roles) {
include ambari::agent
}
}

class server {
package { "ambari-server":
ensure => latest,
}

exec {
"mpack install":
command => "/bin/bash -c 'echo yes | /usr/sbin/ambari-server install-mpack --purge --verbose --mpack=/var/lib/ambari-server/resources/odpi-ambari-mpack-1.0.0.0-SNAPSHOT.tar.gz'",
require => [ Package["ambari-server"] ]
}

exec {
"server setup":
command => "/usr/sbin/ambari-server setup -j $(readlink -f /usr/bin/java | sed 's@jre/bin/java@@') -s",
require => [ Package["ambari-server"], Package["jdk"], Exec["mpack install"] ]
}

service { "ambari-server":
ensure => running,
require => [ Package["ambari-server"], Exec["server setup"] ],
hasrestart => true,
hasstatus => true,
}
}

class agent($server_host = "localhost") {
package { "ambari-agent":
ensure => latest,
}

file {
"/etc/ambari-agent/conf/ambari-agent.ini":
content => template('ambari/ambari-agent.ini'),
require => [Package["ambari-agent"]],
}

service { "ambari-agent":
ensure => running,
require => [ Package["ambari-agent"], File["/etc/ambari-agent/conf/ambari-agent.ini"] ],
hasrestart => true,
hasstatus => true,
}
}
}

なお、上記の通り、サーバとエージェントそれぞれのマニフェストが存在する。 ODPiのMpackをインストールしているのが特徴。

逆にいうと、それをインストールしないと使えるVersionが存在しない。(ベンダ製のVDFを読み込めば使えるが) また、ODPiのMpackをインストールした上でクラスタを構成しようとしたところ、 ODPiのレポジトリが見当たらなかった。 プライベートレポジトリを立てる必要があるのだろうか。

いったん、公式のAmbariをインストールした上で動作確認することにする。

ホットIssue

BIGTOP-3123 が1.5リリース向けのIssue

共有