Getting started of Ansible Vault

参考

公式サイト

ブログ

メモ

Ansible内で使用する変数を平文でファイルに記載し、プレイブック集に入れ込むのに不安を感じるときがある。 そのようなとき、Ansible Vaultを利用すると暗号化された状態で変数を管理できる。 Ansible Vaultで管理された変数をプレイブック内で通常の変数と同様に扱えるため見通しが良くなる。

公式サイトの Encrypting content with Ansible Vault が網羅的でわかりやすいのだが、 具体例が足りない気がしたので以下に一例を示しておく。 ここに挙げた以外の使い方は、公式サイトを参照されたし。

Ansible-Vaultを用いた機密情報の暗号化のTips も参考になった。

暗号化されたファイルの作成

secret.yml 内に変数を記述し、暗号化する。 後ほど暗号化されたファイルを変数定義ファイルとして読み込む。

ここでは以下のような内容とする。

secret.yml

1
2
hoge:
fuga: foo

ファイルを暗号化する。

1
$ ansible-vault create secret.yml

上記ファイルを作成する際、Vault用パスワードを聞かれるので適切なパスワードを入力すること。 あとでパスワードは利用する。

(参考)復号して平文化

1
$ ansible-vault decrypt secret.yml

(参考)暗号化されたファイルの編集

1
$ ansible-vault edit secret.yml

(参考)Vaultパスワードをファイルとして渡す

プロンプトで入力する代わりに、どこかプレイブック集の外などにVaultパスワードを保存し利用することもできる。 ここでは、 ~/.vault_password にパスワードを記載したファイルを準備したものとする。 編集する例を示す。

1
$ ansible-vault edit secret.yml --vault-password-file ~/.vault_password

プレイブック内で変数として利用

変数定義ファイルとして渡し、プレイブック内で変数として利用する。 以下のようなプレイブックを作る。

test.yml

1
2
3
4
5
6
7
- hosts: localhost
vars_files:
- secret.yml
tasks:
- name: debug
debug:
msg: "{{ hoge.fuga }}"

以下、簡単な説明。

  • 変数定義ファイルとして secret.yml を指定( vars_files の箇所)
  • 今回はdebugモジュールを利用し、変数内の値を表示することとする。
  • 暗号化された変数定義ファイル secret.yml に記載されたとおり、構造化された変数 hoge.fuga の値を利用する。
  • 結果として、foo という内容がSTDOUTに表示されるはず。

プレイブックを実行する。

1
$ ansible-playbook test.yml --ask-vault-pass
共有

Use static image on Hexo blog

参考

メモ

Hexo 記事に画像を貼り付ける を参考に、もともとCacooのリンクを使っていた箇所を すべてスタティックな画像を利用するようにした。

post_asset_folderを利用して、記事ごとの画像ディレクトリを利用することも考えたが、 画像はひとところに集まっていてほしいので、 Global-Asset-Folder を利用することにした。

なお、上記ブログでは

1
プロジェクトトップ/images/site

以下に画像を置き、

1
![猫](/images/site/cat.png)

のようにリンクを指定していた。

自身の環境では、rootを指定しているので

1
プロジェクトトップ/image

以下にディレクトリを置き、

1
![猫](memo-blog/images/cat.png)

と指定することにした。

共有

Delta Lake with Alluxio

参考

論文

Hadoop環境

Alluxioドキュメント

メモ

Delta Lake High-Performance ACID Table Storage over Cloud Object StoresLakehouse A New Generation of Open Platforms that Unify Data Warehousing and Advanced Analytics の論文の通り、 Delta Lakeはキャッシュとの組み合わせが可能である。

今回は、ストレージにHDFS、キャッシュにAlluxioを使って動作確認する。

疑似分散で動作確認

実行環境の準備

ansible-bigdata あたりを参考に、Hadoopの疑似分散環境を構築する。 Bigtopベースの2.8.5とした。

併せて、同Ansibleプレイブック集などを用いて、Spark3.1.1のコミュニティ版を配備した。

併せて、Alluxioは2.5.0-2を利用。

Alluxioに関しては、以下のようにコンパイルしてパッケージ化して用いることできる。

1
2
3
4
5
6
$ sudo -u alluxio mkdir /usr/local/src/alluxio
$ sudo -u alluxio chown alluxio:alluxio /usr/local/src/alluxio
$ cd /usr/local/src/alluxio
$ sudo -u alluxio git clone git://github.com/alluxio/alluxio.git
$ sudu -u alluxio git checkout -b v2.5.0-2 refs/tags/v2.5.0-2
$ sudo -u alluxio mvn install -Phadoop-2 -Dhadoop.version=2.8.5 -DskipTests

コンフィグとしては以下を利用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
$ cat conf/alluxio-site.properties

(snip)

# Common properties
# alluxio.master.hostname=localhost
alluxio.master.hostname=localhost
# alluxio.master.mount.table.root.ufs=${alluxio.work.dir}/underFSStorage
# alluxio.master.mount.table.root.ufs=/tmp
alluxio.master.mount.table.root.ufs=hdfs://localhost:8020/alluxio

# Security properties
# alluxio.security.authorization.permission.enabled=true
# alluxio.security.authentication.type=SIMPLE
alluxio.master.security.impersonation.yarn.users=*
alluxio.master.security.impersonation.yarn.groups=*

# Worker properties
# alluxio.worker.ramdisk.size=1GB
alluxio.worker.ramdisk.size=1GB
# alluxio.worker.tieredstore.levels=1
# alluxio.worker.tieredstore.level0.alias=MEM
# alluxio.worker.tieredstore.level0.dirs.path=/mnt/ramdisk

# User properties
# alluxio.user.file.readtype.default=CACHE
# alluxio.user.file.writetype.default=MUST_CACHE

ポイントは以下の通り。

  • 疑似分散環境のHDFS利用
  • Alluxioの使用するディレクトリとして、/alluxioを利用
  • マスタはローカルホストで起動

フォーマット、起動

1
2
$ sudo -u alluxio ./bin/alluxio format
$ sudo -u alluxio ./bin/alluxio-start.sh local SudoMount

テストを実行

1
$ sudo -u alluxio ./bin/alluxio runTests

もしエラーが生じた場合は、例えばHDFSの/alluxioディレクトリに、 適切な権限設定、所有者設定がされているかどうかを確認すること。

Alluxioが起動すると以下のようなUIを確認できる(ポート19999)

AlluxioのUI

先程テストで書き込まれたファイル群が見られるはず。

Alluxioに書き込まれたテストファイル群I

ここでは、上記の通り、環境を整えた前提で以下説明する。

Sparkの起動確認

Examples: Use Alluxio as Input and Output を参考に、Alluxio経由での読み書きを試す。

予め、今回の動作確認で使用するテキストデータ(AlluxioのREADME)をアップロードしておく。

1
$ sudo -u alluxio /opt/alluxio/default//bin/alluxio fs copyFromLocal /opt/alluxio/default/LICENSE /LICENSE

予め、以下のような設定をspark-defaults.confに入れておく。 Alluxioのクライアントライブラリを用いられるように。

1
2
spark.driver.extraClassPath   /opt/alluxio/default/client/alluxio-2.5.0-2-client.jar
spark.executor.extraClassPath /opt/alluxio/default/client/alluxio-2.5.0-2-client.jar

Sparkが起動することを確認する。ここではDelta Lakeも含めて起動する。

1
2
3
4
$ /usr/local/spark/default/bin/spark-shell \
--packages io.delta:delta-core_2.12:0.8.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

起動したシェルでAlluxio上のREADMEファイルを読み取り、行数を確認する。

1
2
3
4
5
6
scala> val pathOnAlluxio = "alluxio://localhost:19998/LICENSE"

scala> val testDF = spark.read.text(pathOnAlluxio)

scala> testDF.count
res0: Long = 482

Delta Lakeを通じて書き込む動作確認

準備として、Alluxio上に、dobachiユーザ用のディレクトリを作成してみる。

1
2
3
$ sudo -u alluxio /opt/alluxio/default/bin/alluxio fs mkdir /users
$ sudo -u alluxio /opt/alluxio/default/bin/alluxio fs mkdir /users/dobachi
$ sudo -u alluxio /opt/alluxio/default/bin/alluxio fs chown dobachi:dobachi /users/dobachi

先程起動しておいたシェルで、Delta Lake形式のデータを書き込んで見る。

1
2
3
4
5
6
scala> val data = spark.range(0, 5)
data: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> val outputUrl = "alluxio://localhost:19998/users/dobachi/numbers"

scala> data.write.format("delta").save(outputUrl)

すると以下のようなエラーが生じた。

1
2
3
4
5
6
7
8
9
10
scala> data.write.format("delta").save(outputUrl)
21/01/05 22:47:50 ERROR HDFSLogStore: The error typically occurs when the default LogStore implementation, that
is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system.
In order to get the transactional ACID guarantees on table updates, you have to use the
correct implementation of LogStore that is appropriate for your storage system.
See https://docs.delta.io/latest/delta-storage.html " for details.

org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.alluxio.impl=null: No AbstractFileSystem configured for scheme: alluxio

(snip)

当たり前だが、Delta Lakeの下回りのストレージとして標準では、 Alluxioが対応しておらず、LogStoreからエラーが生じた、ということのようだ。

一瞬、LogStoreを新たに開発しないといけないか?と思ったものの、よく考えたら、HDFSHadoopFileSystemLogStoreから Alluxioのスキーマを認識させてアクセスできるようにすればよいだけでは?と思った。 そこで、Hadoopの設定でAlluxioFileSystemをalluxioスキーマ(ファイルシステムのスキーマ)に明示的に登録してみる。

/etc/hadoop/conf/core-site.xmlに以下を追記。

1
2
3
4
5
<property>
<name>fs.AbstractFileSystem.alluxio.impl</name>
<value>alluxio.hadoop.AlluxioFileSystem</value>
<description>The FileSystem for alluxio uris.</description>
</property>

再びSparkを立ち上げ、適当なデータを書き込み。

1
2
3
4
5
scala> val data = spark.range(0, 5)

scala> val outputUrl = "alluxio://localhost:19998/users/dobachi/numbers"

scala> data.write.format("delta").save(outputUrl)
Alluxio上にDelta Lakeで保存された様子

以上のように書き込みに成功した。

つづいて、テーブルとして読み出す。

1
2
3
4
5
6
7
8
9
10
11
scala> val df = spark.read.format("delta").load(outputUrl)
scala> df.show
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
| 0|
+---+

テーブルへの追記。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
scala> val addData = spark.range(5, 10)
scala> addData.write.format("delta").mode("append").save(outputUrl)
scala> df.show
+---+
| id|
+---+
| 3|
| 9|
| 6|
| 5|
| 1|
| 8|
| 4|
| 2|
| 7|
| 0|
+---+

また、追記書き込みをしたのでDeltaログが増えていることが分かる。 (3回ぶんのログがあるのは、↑には記載していないがミスったため)

Alluxio上にDelta LakeのDeltaログ

(補足)HDFS上のディレクトリ権限に関するエラー

Sparkでの処理実行時にYARNで実行していたところ、Executorにおける処理からAlluxioを呼び出すときにエラー。 yarnユーザでのアクセスとなり、HDFS上の /alluxio へのアクセス権がなかったと考えられる。

1
2
21/01/05 02:54:35 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, hadoop-pseudo, executor 2): alluxio.exception.status.UnauthenticatedException: Channel authentication failed with code:UNAUTHENTICATED. Channel: GrpcChannelKey{ClientType=FileSystemMasterClient, ClientHostname=hadoop-pseudo.mshome.net, ServerAddress=GrpcServerAddress{HostName=localhost, SocketAddress=localhost:19998}, ChannelId=81f7d97f-8e32-4289-bcab-ea6008d5ffac}, AuthType: SIMPLE, Error: alluxio.exception.status.UnauthenticatedException: Plain authentication failed: Failed to authenticate client user="yarn" connecting to Alluxio server and impersonating as impersonationUser="vagrant" to access Alluxio file system. User "yarn" is not configured to allow any impersonation. Please read the guide to configure impersonation at https://docs.alluxio.io/os/user/2.4/en/operation/Security.html
at alluxio.exception.status.AlluxioStatusException.from(AlluxioStatusException.java:141)

Alluxio Security のドキュメント中に「Client-Side Hadoop Impersonation」を読むと、 「なりすまし」を許可する設定があるようだ。

そこで、yarnユーザが様々なユーザになりすませるような簡易設定を以下のように加えることにした。 実際の運用する際は、なりすましのスコープに注意したほうが良さそうだ。

conf/alluxio-site.properties

1
2
alluxio.master.security.impersonation.yarn.users=*
alluxio.master.security.impersonation.yarn.groups=*

ドキュメントではクライアントで alluxio.security.login.impersonation.username も指定するよう書かれていたが、 起動時にしてしなくてもアクセスできるようになった。 あとで実装を調べたほうが良さそうだ。

共有

Information about trends in 2020

参考

ガートナー

データ・ファブリック

DNAコンピューティング

アダプティブラーニング

デジタル・ツイン

IOWN

メモ

オープンになっている情報を中心に既存情報を軽くまとめる。 特にデータ処理、データ活用との関連を探る。

先進テクノロジのハイプ・サイクル:2020年

ガートナー、「先進テクノロジのハイプ・サイクル:2020年」を発表 の記事によると、 2020/8/19にハイプ・サイクルが発表された。

記事冒頭で触れられているとおり、COVID-19の影響は否定できない。 例ではヘルス・パスポート、ソーシャル・ディスタンシング・テクノロジが挙げられていた。 記載されているととおり、「過度な期待」として初登場する技術は少ないのだが、 それに該当するということで注目されている。

挙げられていたトレンドと所感

  • デジタル・ミー
    • つまりは、デジタル・ツイン
    • 多分にもれずデジタル・ツイン関連は取り上げられていた。
    • 過去になかったデータを使うようになるという側面はあるが、 それに合わせた新たなアーキテクチャが必要になるか。
      • 過去のブーム面との例
        • 大量の顧客行動情報、ログ:非構造データ処理に適した並列分散処理フレームワーク(MapReduceなど)
        • 大量の画像データ:深層学習
        • 大量のセンサーデータ:IoT、ストリーム処理
        • 大量のデジタル・ツイン・データ:?
  • コンポジット・アーキテクチャ
    • 一言で言うとデータ・ファブリック※1を基礎としたアーキテクチャ
      • ※1)つまりオンプレ、クラウドに点在するデータを最適な場所に配置し、いつでも利用可能にする
    • 重要なのは柔軟性を実現するという目論見
    • 柔軟性を求める組織に必要とされるアーキテクチャと考えることもできそうだ
  • フォーマティブAI
    • 一言で言うと状況に動的に対応するAI
    • 動的に適応していく、さらに突き詰めると「自律的に…」というニュアンスが生じそう
    • AIに関する倫理の議論と関連すると考えられる。
  • アルゴリズムによる信頼
    • 一言で言うと、責任ある権限に基づく信頼モデルから、アルゴリズムによる信頼への転換
  • シリコンの先へ

個人的にピックアップしたキーワードと所感

週刊ダイヤモンド 2020年12/12号

週刊ダイヤモンド 2020年12/12号 の中に、 「NTT帝国の逆襲」という特集記事がある。

技術というより、事業職が強いが念の為に(あえて)技術に関連するキーワードだけ述べる。 なお、この書籍で重要なのは、本来はどちらかというとビジネス戦略側面である。 詳細は当該書籍を参照されたし。

技術キーワード

  • IOWN構想
    • 光ファイバー、ネットワーク転送、音声等のNTT特許数は圧倒的である。
    • 電力効率100話い、伝送容量125倍、遅延を200分の1を目指す
    • 光電子融合型の光トランジスタ開発によりリード。
  • O-RAN
    • 通信規格を主導。オープン化で囲い込みを打破する。
  • V-RAN
    • 通信の仮想化

IOWN公開情報

あくまで公開された情報ではあるがリスト化しておく。

  • IOWN公式ウェブサイト
    • 概要や目指しているところを理解できるコンテンツが公開されている
    • 構成要素は以下の通り。
      • 「オールフォトニクス・ネットワーク」
        • 「大容量光伝送システム・デバイス」、「イジングマシン」、「光格子時計ネットワーク」
      • 「デジタルツインコンピューティング」
        • 「多様な産業やモノとヒトのデジタルツインを自在に掛け合わせて演算を行う」ことが特色
      • 「コグニティブ・ファウンデーション」
        • ICTリソースを全体最適に調和させ、必要な情報をNW内に流通させる。
共有

Reference of connector plugin

参考

メモ

2020/12時点で、Kafka Connectのプラグインの参考になるもの探す。

kafka-connect-syslog

kafka-connect-syslog が最も簡易に動作確認できそうだった。 ただ、 ConfluentのGitHub を見る限り、GitHub上には実装が公開されていないようだった。

動作確認

ここでは、ローカルに簡易実験用の1プロセスのKafkaを起動した前提とする。 起動方法は Kafkaのクイックスタート を参照。

kafka-connect-syslog のパッケージをダウンロードして /opt/connectors 以下に展開。

1
2
3
$ cd /opt/connectors
$ sudo unzip confluentinc-kafka-connect-syslog-1.3.2.zip
$ sudo chown -R kafka:kafka confluentinc-kafka-connect-syslog-1.3.2

というパッケージが展開される。

動作確認に使用するプロパティは以下。

1
2
3
4
5
6
7
8
9
10
11
12
13
$ cat etc/minimal-tcp.properties
#
# Copyright [2016 - 2019] Confluent Inc.
#

name=syslog-tcp
tasks.max=1
connector.class=io.confluent.connect.syslog.SyslogSourceConnector
syslog.port=5515
syslog.listener=TCP
confluent.license
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

また、Connectの設定には以下を追加する。

/opt/kafka_pseudo/default/config/connect-standalone.properties

1
2
3
(snip)

plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,

プラグインを置く場所として、 /opt/connectors を指定した。

/opt/kafka_pseudo/default/bin/connect-standalone.sh を利用して、 スタンドアローンモードでKafka Connectを起動。

1
2
$ sudo -u kafka /opt/kafka_pseudo/default/bin/connect-standalone.sh /opt/kafka_pseudo/default/config/connect-standalone.properties \
/opt/connectors/confluentinc-kafka-connect-syslog-1.3.2/etc/minimal-tcp.properties

起動したのを確認し、別の端末から適当なデータを送信。

1
$ echo "<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - Your refrigerator is running" | nc -v -w 1 localhost 5515

Console Consumerを利用して書き込み状況を確認。

1
2
$ cd ${KAFKA_HOME}
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic syslog --from-beginning

先程書き込んだものが表示されるはずである。

kafka-connect-datagen

kafka-connect-datagen もあった。 kafka-connect-datagenのGitHub に実装も公開されているように見える。 ドキュメントのリンクから、当該レポジトリのREADMEにジャンプしたため、そのように判断。

以降、v0.4.0を対象として確認したものである。

概要

指定されたスキーマで、ダミーデータを生成するコネクタ。 avro-random-generator を内部的に利用している。

スキーマ指定はAvroのスキーマファイルを渡す方法もあるし、 組み込みのスキーマを指定する方法もある。 kafka-connect-datagenのサンプルスキーマ を参照。

また、Kafkaに出力する際のフォーマットは指定可能。 Kafka Connect自体の一般的なパラメータである、 value.converter を指定すれば良い。 例えば以下の通り。

1
"value.converter": "org.apache.kafka.connect.json.JsonConverter",

実装状況

2020/12/21時点では本プロジェクトのバージョンは0.4.0であり、

1
2
3
4
5
<parent>
<groupId>io.confluent</groupId>
<artifactId>common-parent</artifactId>
<version>6.0.0</version>
</parent>

の通り、Confluent Platformのバージョンとしては、6系である。

Confluent Platform and Apache Kafka Compatibility によると、 Confluent Platform 6系のKafkaバージョンは2.6.Xである。

io.confluent.kafka.connect.datagen.DatagenConnector

io.confluent.kafka.connect.datagen.DatagenConnector クラスは、 org.apache.kafka.connect.source.SourceConnector を継承している。 割と素直な実装。

io.confluent.kafka.connect.datagen.DatagenConnector#start メソッドは特別なことはしておらず、 コンフィグをロードするだけ。

io.confluent.kafka.connect.datagen.DatagenConnector#taskConfigs メソッドも 特別なことはしていない。start時に受け取ったプロパティから taskConfigを生成して返す。

io.confluent.kafka.connect.datagen.DatagenConnector#stop メソッド および、 io.confluent.kafka.connect.datagen.DatagenConnector#config もほぼ何もしない。

タスクには io.confluent.kafka.connect.datagen.DatagenTask クラスを利用する。

io.confluent.kafka.connect.datagen.DatagenTask クラス

io.confluent.kafka.connect.datagen.DatagenTask#start メソッドが overrideされている。 以下、ポイントを確認する。

オフセット管理の仕組みあり。

io/confluent/kafka/connect/datagen/DatagenTask.java:133

1
2
3
4
5
6
7
Map<String, Object> offset = context.offsetStorageReader().offset(sourcePartition);
if (offset != null) {
// The offset as it is stored contains our next state, so restore it as-is.
taskGeneration = ((Long) offset.get(TASK_GENERATION)).intValue();
count = ((Long) offset.get(CURRENT_ITERATION));
random.setSeed((Long) offset.get(RANDOM_SEED));
}

io.confluent.avro.random.generator.Generator のジェネレータ(のビルダ)を利用する。

io/confluent/kafka/connect/datagen/DatagenTask.java:141

1
2
3
Generator.Builder generatorBuilder = new Generator.Builder()
.random(random)
.generation(count);

クイックスタートの設定があれば、それに従ってスキーマを読み込む。

io/confluent/kafka/connect/datagen/DatagenTask.java:144

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
String quickstartName = config.getQuickstart();
if (quickstartName != "") {
try {
quickstart = Quickstart.valueOf(quickstartName.toUpperCase());
if (quickstart != null) {
schemaFilename = quickstart.getSchemaFilename();
schemaKeyField = schemaKeyField.equals("")
? quickstart.getSchemaKeyField() : schemaKeyField;
try {
generator = generatorBuilder
.schemaStream(getClass().getClassLoader().getResourceAsStream(schemaFilename))
.build();
} catch (IOException e) {
throw new ConnectException("Unable to read the '"
+ schemaFilename + "' schema file", e);
}
}
} catch (IllegalArgumentException e) {
log.warn("Quickstart '{}' not found: ", quickstartName, e);
}

指定されたクイックスタート名に従い、パッケージに含まれるスキーマファイルを読み込み、 それを適用しながらジェネレータを生成する。

なお、クイックスタートのたぐいはenumで定義されている。

io/confluent/kafka/connect/datagen/DatagenTask.java:75

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected enum Quickstart {
CLICKSTREAM_CODES("clickstream_codes_schema.avro", "code"),
CLICKSTREAM("clickstream_schema.avro", "ip"),
CLICKSTREAM_USERS("clickstream_users_schema.avro", "user_id"),
ORDERS("orders_schema.avro", "orderid"),
RATINGS("ratings_schema.avro", "rating_id"),
USERS("users_schema.avro", "userid"),
USERS_("users_array_map_schema.avro", "userid"),
PAGEVIEWS("pageviews_schema.avro", "viewtime"),
STOCK_TRADES("stock_trades_schema.avro", "symbol"),
INVENTORY("inventory.avro", "id"),
PRODUCT("product.avro", "id");

private final String schemaFilename;
private final String keyName;

Quickstart(String schemaFilename, String keyName) {
this.schemaFilename = schemaFilename;
this.keyName = keyName;
}

public String getSchemaFilename() {
return schemaFilename;
}

public String getSchemaKeyField() {
return keyName;
}
}

クイックスタートが設定されておらず、スキーマの文字列が与えられた場合は、 それを用いてジェネレータが生成される。

io/confluent/kafka/connect/datagen/DatagenTask.java:164

1
2
} else if (schemaString != "") {
generator = generatorBuilder.schemaString(schemaString).build();

それ以外の場合、つまりスキーマ定義の書かれたファイルを指定する場合は、 以下の通り。

io/confluent/kafka/connect/datagen/DatagenTask.java:166

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
String err = "Unable to read the '" + schemaFilename + "' schema file";
try {
generator = generatorBuilder.schemaStream(new FileInputStream(schemaFilename)).build();
} catch (FileNotFoundException e) {
// also look in jars on the classpath
try {
generator = generatorBuilder
.schemaStream(DatagenTask.class.getClassLoader().getResourceAsStream(schemaFilename))
.build();
} catch (IOException inner) {
throw new ConnectException(err, e);
}
} catch (IOException e) {
throw new ConnectException(err, e);
}
}

最後のAvroに関連する情報を生成して終了。

io/confluent/kafka/connect/datagen/DatagenTask.java:184

1
2
3
avroSchema = generator.schema();
avroData = new AvroData(1);
ksqlSchema = avroData.toConnectSchema(avroSchema);

io.confluent.kafka.connect.datagen.DatagenTask#poll メソッドもoverrideされている。 以下、ポイントを記載する。

インターバル機能あり。

io/confluent/kafka/connect/datagen/DatagenTask.java:192

1
2
3
4
5
6
7
8
if (maxInterval > 0) {
try {
Thread.sleep((long) (maxInterval * Math.random()));
} catch (InterruptedException e) {
Thread.interrupted();
return null;
}
}

ジェネレータを利用し、オブジェクトを生成する。

io/confluent/kafka/connect/datagen/DatagenTask.java:201

1
2
3
4
5
6
7
8
final Object generatedObject = generator.generate();
if (!(generatedObject instanceof GenericRecord)) {
throw new RuntimeException(String.format(
"Expected Avro Random Generator to return instance of GenericRecord, found %s instead",
generatedObject.getClass().getName()
));
}
final GenericRecord randomAvroMessage = (GenericRecord) generatedObject;

生成されたオブジェクトから、スキーマ定義に基づいてフィールドの値を取り出し、 バリューのArrayListを生成する。

io/confluent/kafka/connect/datagen/DatagenTask.java:210

1
2
3
4
5
6
7
8
9
10
11
12
final List<Object> genericRowValues = new ArrayList<>();
for (org.apache.avro.Schema.Field field : avroSchema.getFields()) {
final Object value = randomAvroMessage.get(field.name());
if (value instanceof Record) {
final Record record = (Record) value;
final Object ksqlValue = avroData.toConnectData(record.getSchema(), record).value();
Object optionValue = getOptionalValue(ksqlSchema.field(field.name()).schema(), ksqlValue);
genericRowValues.add(optionValue);
} else {
genericRowValues.add(value);
}
}

キーも同様に取り出し、Kafka Connectの形式に変換する。

io/confluent/kafka/connect/datagen/DatagenTask.java:224

1
2
3
4
5
6
7
SchemaAndValue key = new SchemaAndValue(DEFAULT_KEY_SCHEMA, null);
if (!schemaKeyField.isEmpty()) {
key = avroData.toConnectData(
randomAvroMessage.getSchema().getField(schemaKeyField).schema(),
randomAvroMessage.get(schemaKeyField)
);
}

先程ArrayListとして取り出したバリューもKafka Connect形式に変換する。

io/confluent/kafka/connect/datagen/DatagenTask.java:233

1
2
final org.apache.kafka.connect.data.Schema messageSchema = avroData.toConnectSchema(avroSchema);
final Object messageValue = avroData.toConnectData(avroSchema, randomAvroMessage).value();

イテレートのたびに、メタデータを更新する。

io/confluent/kafka/connect/datagen/DatagenTask.java:246

1
2
3
4
5
6
7
8
9
// The source offsets will be the values that the next task lifetime will restore from
// Essentially, the "next" state of the connector after this loop completes
Map<String, Object> sourceOffset = new HashMap<>();
// The next lifetime will be a member of the next generation.
sourceOffset.put(TASK_GENERATION, taskGeneration + 1);
// We will have produced this record
sourceOffset.put(CURRENT_ITERATION, count + 1);
// This is the seed that we just re-seeded for our own next iteration.
sourceOffset.put(RANDOM_SEED, seed);

最後に、SourceRecordのリスト形式に変換し、 レコードとして生成して戻り値として返す。

io/confluent/kafka/connect/datagen/DatagenTask.java:261

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final List<SourceRecord> records = new ArrayList<>();
SourceRecord record = new SourceRecord(
sourcePartition,
sourceOffset,
topic,
null,
key.schema(),
key.value(),
messageSchema,
messageValue,
null,
headers
);
records.add(record);
count += records.size();
return records;

つづいて、 io.confluent.kafka.connect.datagen.DatagenTask#stop メソッドだが、 これは特に何もしない。

io.confluent.kafka.connect.datagen.DatagenTask#getOptionalSchema という オプショナルなフィールドに関するスキーマを取得するためのヘルパーメソッドもある。 io.confluent.kafka.connect.datagen.DatagenTask#getOptionalValue メソッドもある。

動作確認

confluentinc-kafka-connect-datagen-0.4.0.zip をダウンロードし、 /opt/connectors以下に展開したものとする。

今回は以下の設定ファイルを参考に、データ生成してみる。 なお、イテレーション回数は適度に修正して用いることを推奨する。

confluentinc-kafka-connect-datagen-0.4.0/etc/connector_users.config

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"name": "datagen-users",
"config": {
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "users",
"quickstart": "users",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"max.interval": 1000,
"iterations": 10000000,
"tasks.max": "1"
}
}

上記はconfluentコマンド用で利用する際のコンフィグファイルである。 そこで以下のようなKafka Connect用の設定ファイルを生成する。

/opt/connectors/confluentinc-kafka-connect-datagen-0.4.0/etc/connector_users.properties

1
2
3
4
5
6
7
8
9
10
name=users
connector.class=io.confluent.kafka.connect.datagen.DatagenConnector
kafka.topic=users
quickstart=users
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
max.interval=1000
iterations=10
tasks.max=1

スタンドアローンモードでKafka Connectを起動する。

1
2
3
$ sudo -u kafka /opt/kafka_pseudo/default/bin/connect-standalone.sh \
/opt/kafka_pseudo/default/config/connect-standalone.properties \
/opt/connectors/confluentinc-kafka-connect-datagen-0.4.0/etc/connector_users.properties

停止した後、結果を確認する。 トピックが作られたことがわかる。

1
2
3
4
5
$ /opt/kafka_pseudo/default/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
_confluent-command
syslog
users

データを確認する。

1
2
3
4
5
6
7
8
9
10
11
$ /opt/kafka_pseudo/default/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic users --from-beginning
{"registertime":1501850210149,"userid":"User_8","regionid":"Region_3","gender":"FEMALE"}
{"registertime":1516539876299,"userid":"User_2","regionid":"Region_7","gender":"OTHER"}
{"registertime":1505292095234,"userid":"User_4","regionid":"Region_1","gender":"OTHER"}
{"registertime":1502118362741,"userid":"User_3","regionid":"Region_1","gender":"FEMALE"}
{"registertime":1503193759324,"userid":"User_9","regionid":"Region_5","gender":"MALE"}
{"registertime":1507693509191,"userid":"User_1","regionid":"Region_8","gender":"OTHER"}
{"registertime":1497764008309,"userid":"User_1","regionid":"Region_6","gender":"OTHER"}
{"registertime":1514606256206,"userid":"User_1","regionid":"Region_3","gender":"MALE"}
{"registertime":1492595638722,"userid":"User_2","regionid":"Region_6","gender":"MALE"}
{"registertime":1500602208014,"userid":"User_3","regionid":"Region_1","gender":"MALE"}

ダミーデータが生成されていることが確認できた。

(WIP)

共有

Create projects includes sbt launcher

参考

メモ

プロジェクト内に、sbt自体を含めてクローンしただけでビルドできるようにしたい、という動機。 現時点ではCoursierプロジェクトが汎用性※が高く便利であると、個人的には感じた。

※ここでは、多くの環境で実行可能で、様々なツールを一度にセットアップ可能という意味。

Coursierプロジェクト

Coursier を使って最速でScalaの開発環境を整える のブログに記載されているとおり、 Scala開発環境を簡単に整えられる。

ひとまずDocker内で試す。

1
2
3
$ sudo docker pull ubuntu:18.04
$ sudo docker run -rm -it -d --name ubu ubuntu:18.04
$ sudo docker exec -it ubu /bin/bash

Dockerコンテナ内でインストール。

1
2
3
4
5
# apt update
# apt install -y curl
# curl -fLo cs https://git.io/coursier-cli-linux &&
chmod +x cs &&
./cs

結果として以下がインストールされた様子。

1
2
3
4
5
6
7
8
9
Checking if the standard Scala applications are installed
Installed ammonite
Installed cs
Installed coursier
Installed scala
Installed scalac
Installed sbt
Installed sbtn
Installed scalafmt

~/.profileに環境変数等がまとまっているので有効化する。

1
# source ~/.profile

これでインストールされたコマンドが使用できるようになった。

なお、参考までに上記でダウンロードしたcsコマンドは以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
$ ./cs setup --help
Command: setup
Usage: cs setup
--jvm <string?>
--jvm-dir <string?>
--system-jvm <bool?>
--local-only <bool>
--update <bool>
--jvm-index <string?>
--graalvm-home <string?>
--graalvm-option <string*>
--graalvm-default-version <string?>
--install-dir | --dir <string?>
--install-platform <string?>
Platform for prebuilt binaries (e.g. "x86_64-pc-linux", "x86_64-apple-darwin", "x86_64-pc-win32")
--install-prefer-prebuilt <bool>
--only-prebuilt <bool>
Require prebuilt artifacts for native applications, don't try to build native executable ourselves
--repository | -r <maven|sonatype:$repo|ivy2local|bintray:$org/$repo|bintray-ivy:$org/$repo|typesafe:ivy-$repo|typesafe:$repo|sbt-plugin:$repo|ivy:$pattern>
Repository - for multiple repositories, separate with comma and/or add this option multiple times (e.g. -r central,ivy2local -r sonatype:snapshots, or equivalently -r central,ivy2local,sonatype:snapshots)
--default-repositories <bool>
--proguarded <bool?>
--channel <org:name>
Channel for apps
--default-channels <bool>
Add default channels
--contrib <bool>
Add contrib channel
--file-channels <bool>
Add channels read from the configuration directory
--cache <string?>
Cache directory (defaults to environment variable COURSIER_CACHE, or ~/.cache/coursier/v1 on Linux and ~/Library/Caches/Coursier/v1 on Mac)
--mode | -m <offline|update-changing|update|missing|force>
Download mode (default: missing, that is fetch things missing from cache)
--ttl | -l <duration>
TTL duration (e.g. "24 hours")
--parallel | -n <int>
Maximum number of parallel downloads (default: 6)
--checksum <checksum1,checksum2,...>
Checksum types to check - end with none to allow for no checksum validation if no checksum is available, example: SHA-256,SHA-1,none
--retry-count <int>
Retry limit for Checksum error when fetching a file
--cache-file-artifacts | --cfa <bool>
Flag that specifies if a local artifact should be cached.
--follow-http-to-https-redirect <bool>
Whether to follow http to https redirections
--credentials <host(realm) user:pass|host user:pass>
Credentials to be used when fetching metadata or artifacts. Specify multiple times to pass multiple credentials. Alternatively, use the COURSIER_CREDENTIALS environment variable
--credential-file <string*>
Path to credential files to read credentials from
--use-env-credentials <bool>
Whether to read credentials from COURSIER_CREDENTIALS (env) or coursier.credentials (Java property), along those passed with --credentials and --credential-file
--quiet | -q <counter>
Quiet output
--verbose | -v <counter>
Increase verbosity (specify several times to increase more)
--progress | -P <bool>
Force display of progress bars
--env <bool>
--user-home <string?>
--banner <bool?>
--yes | -y <bool?>
--try-revert <bool>
--apps <string*>

(補足)sbt-launcher-packageプロジェクト

sbt-launcher-package をビルドすることでSBTランチャーを提供できそうだった。

(補足)launcherプロジェクト

https://github.com/sbt/launcher/blob/1.x/.java-version にも記載されているとおり、 JDK7系にしか公式に対応していなかった。

共有

Read and write data on Delta Lake streaming manner

参考

メモ

準備(SBT)

今回はSBTを利用してSalaを用いたSparkアプリケーションで試すことにする。 sbt を参考にSBTをセットアップする。(基本的には対象バージョンをダウンロードして置くだけ)

1
2
3
$ mkdir -p ~/Sources
$ cd ~/Sources
$ sbt new dobachi/spark-sbt.g8

ここでは、 [dobachi's spark-sbt.g8] を利用してSpark3.0.1の雛形を作った。 対話的に色々聞かれるので、ほぼデフォルトで生成。

ここでは、 StructuredNetworkWordCount を参考に、Word Countした結果をDelta Lakeに書き込むことにする。

なお、以降の例で示しているアプリケーションは、すべて dobachi's StructuredStreamingDeltaLakeExample に含まれている。

簡単なビルドと実行の例

以下のように予めncコマンドを起動しておく。

1
$ nc -lk 9999

続いて、別のターミナルでアプリケーションをビルドして実行。(ncコマンドを起動したターミナルは一旦保留)

1
2
3
$ cd structured_streaming_deltalake
$ sbt assembly
$ /opt/spark/default/bin/spark-submit --class com.example.StructuredStreamingDeltaLakeExample target/scala-2.12/structured_streaming_deltalake-assembly-0.0.1.jar localhost 9999

先程起動したncコマンドの引数に合わせ、9999ポートに接続する。 Structured Streamingが起動したら、ncコマンド側で、適当な文字列(単語)を入力する(以下は例)

1
2
3
hoge hoge fuga
fuga fuga
fuga fuga hoge

もうひとつターミナルを開き、spark-shellを起動する。

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.12:0.7.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

アプリケーションで出力先としたディレクトリ内のDelta Lakeテーブルを参照すると、以下のようにテーブルが更新されていることがわかる。 ひとまずncコマンドでいろいろな単語を入力して挙動を試すと良い。

1
2
3
4
5
6
7
8
9
10
scala> val df = spark.read.format("delta").load("/tmp/delta/wordcount")
df: org.apache.spark.sql.DataFrame = [value: string, count: bigint]

scala> df.show
+-----+-----+
|value|count|
+-----+-----+
| fuga| 5|
| hoge| 3|
+-----+-----+

(wip)

書き込み

Delta table as a sink の通り、Delta LakeテーブルをSink(つまり書き込み先)として利用する際には、 AppendモードとCompleteモードのそれぞれが使える。

Appendモード

例のごとく、ncコマンドを起動し、アプリケーションを実行。(予めsbt assemblyしておく) もうひとつターミナルを開き、spark-shellでDelta Lakeテーブルの中身を確認する。

ターミナル1

1
$ nc -lk 9999

ターミナル2

1
2
$ /opt/spark/default/bin/spark-submit --class com.example.StructuredStreamingDeltaLakeAppendExample
target/scala-2.12/structured_streaming_deltalake-assembly-0.0.1.jar localhost 9999

ターミナル3

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.12:0.7.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

ターミナル1のncで適当な単語列を入力する。

1
hoge hoge

ターミナル3のspark-shellでDelta Lakeテーブルの中身を確認する。

1
2
3
4
5
6
7
8
9
scala> val df = spark.read.format("delta").load("/tmp/delta/wordcount_per_line")
df: org.apache.spark.sql.DataFrame = [unixtime: bigint, count: bigint]

scala> df.show
+-------------+-----+
| unixtime|count|
+-------------+-----+
|1605968491821| 2|
+-------------+-----+

何度かncコマンド経由でテキストを流し込むと、以下のように行が加わるkとがわかる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scala> df.show
+-------------+-----+
| unixtime|count|
+-------------+-----+
|1605968491821| 2|
+-------------+-----+


scala> df.show
+-------------+-----+
| unixtime|count|
+-------------+-----+
|1605968518584| 2|
|1605968522461| 3|
|1605968491821| 2|
+-------------+-----+

Completeモード

Completeモードは、上記のWordCountの例がそのまま例になっている。 com.example.StructuredStreamingDeltaLakeExample 参照。

読み出し

Delta table as a stream source の通り、既存のDelta Lakeテーブルからストリームデータを取り出す。

準備

ひとまずDelta Lakeのテーブルを作る。 ここではspark-shellで適当に作成する。

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.12:0.7.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
1
2
3
4
5
6
7
8
9
10
scala> val df = spark.read.format("parquet").load("/opt/spark/default/examples/src/main/resources/users.parquet")
scala> df.show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+

scala> df.write.format("delta").save("/tmp/delta/users")

上記のDataFrameのスキーマは以下の通り。

1
2
3
4
5
6
scala> df.printSchema
root
|-- name: string (nullable = true)
|-- favorite_color: string (nullable = true)
|-- favorite_numbers: array (nullable = true)
| |-- element: integer (containsNull = true)

ストリームで読み込んでみる

もうひとつターミナルを立ち上げる。

1
$ /opt/spark/default/bin/spark-submit --class com.example.StructuredStreamingDeltaLakeReadExample target/scala-2.12/structured_streaming_deltalake-assembly-0.0.1.jar /tmp/delta/users

先程作成しておいたテーブルの中身が読み込まれる。

1
2
3
4
5
6
7
8
9
10
11
-------------------------------------------
Batch: 0
-------------------------------------------
20/11/22 00:29:41 INFO CodeGenerator: Code generated in 3.212 ms
20/11/22 00:29:41 INFO CodeGenerator: Code generated in 4.1238 ms
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+

先程の、Delta Lakeテーブルを作成したターミナルのspark-shellで、 追加データを作り、Delta Lakeテーブルに挿入する。

まず、追加データ用のスキーマを持つcase classを作成する。

1
scala> case class User(name: String, favorite_color: String, favorite_numbers: Array[Int])

つづいて、作られたcase classを利用して、追加用のDataFrameを作る。 (SeqからDataFrameを生成する)

1
scala> val addRecord = Seq(User("Bob", "yellow", Array(1,2))).toDF

appendモードで既存のDelta Lakeテーブルに追加する。

1
scala> addRecord.write.format("delta").mode("append").save("/tmp/delta/users")

ここで、ストリーム処理を動かしている方のターミナルを見ると、 以下のように追記された内容が表示されていることがわかる。

1
2
3
4
5
6
7
8
-------------------------------------------
Batch: 1
-------------------------------------------
+----+--------------+----------------+
|name|favorite_color|favorite_numbers|
+----+--------------+----------------+
| Bob| yellow| [1, 2]|
+----+--------------+----------------+

読み込み時の maxFilesPerTrigger オプション

ストリーム読み込み時には、 maxFilesPerTrigger オプションを指定できる。 このオプションは、以下のパラメータとして利用される。

org/apache/spark/sql/delta/DeltaOptions.scala:98

1
2
3
4
5
6
val maxFilesPerTrigger = options.get(MAX_FILES_PER_TRIGGER_OPTION).map { str =>
Try(str.toInt).toOption.filter(_ > 0).getOrElse {
throw DeltaErrors.illegalDeltaOptionException(
MAX_FILES_PER_TRIGGER_OPTION, str, "must be a positive integer")
}
}

このパラメータは org.apache.spark.sql.delta.sources.DeltaSource.AdmissionLimits#toReadLimit メソッド内で用いられる。

org/apache/spark/sql/delta/sources/DeltaSource.scala:350

1
2
3
4
5
6
7
8
9
10
11
12
13
  def toReadLimit: ReadLimit = {
if (options.maxFilesPerTrigger.isDefined && options.maxBytesPerTrigger.isDefined) {
CompositeLimit(
ReadMaxBytes(options.maxBytesPerTrigger.get),
ReadLimit.maxFiles(options.maxFilesPerTrigger.get).asInstanceOf[ReadMaxFiles])
} else if (options.maxBytesPerTrigger.isDefined) {
ReadMaxBytes(options.maxBytesPerTrigger.get)
} else {
ReadLimit.maxFiles(
options.maxFilesPerTrigger.getOrElse(DeltaOptions.MAX_FILES_PER_TRIGGER_OPTION_DEFAULT))
}
}
}

このメソッドの呼び出し階層は以下の通り。

1
2
3
AdmissionLimits in DeltaSource.toReadLimit()  (org.apache.spark.sql.delta.sources)
DeltaSource.getDefaultReadLimit() (org.apache.spark.sql.delta.sources)
MicroBatchExecution

MicroBatchExecutionは、 org.apache.spark.sql.execution.streaming.MicroBatchExecution クラスであり、 この仕組みを通じてSparkのStructured Streamingの持つレートコントロールの仕組みに設定値をベースにした値が渡される。

読み込み時の maxBytesPerTrigger オプション

原則的には、 maxFilesPerTrigger と同じようなもの。 指定された値を用いてcase classを定義し、最大バイトサイズの目安を保持する。

追記ではなく更新したらどうなるか?

Delta Lakeでoverwriteモードで書き込む際に、パーティションカラムに対して条件を指定できることを利用し、 部分更新することにする。

まず name をパーティションカラムとしたDataFrameを作ることにする。

1
2
3
4
5
6
7
8
9
10
scala> val df = spark.read.format("parquet").load("/opt/spark/default/examples/src/main/resources/users.parquet")
scala> df.show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+

scala> df.write.format("delta").partitionBy("name").save("/tmp/delta/partitioned_users")

上記例と同様に、ストリームでデータを読み込むため、もうひとつターミナルを起動し、以下を実行。

1
$ /opt/spark/default/bin/spark-submit --class com.example.StructuredStreamingDeltaLakeReadExample target/scala-2.12/structured_streaming_deltalake-assembly-0.0.1.jar /tmp/delta/partitioned_users

spark-shellを起動したターミナルで、更新用のデータを準備。

1
scala> val updateRecord = Seq(User("Ben", "green", Array(1,2))).toDF

条件付きのoverwriteモードで既存のDelta Lakeテーブルの一部レコードを更新する。

1
2
3
4
5
6
scala> updateRecord
.write
.format("delta")
.mode("overwrite")
.option("replaceWhere", "name == 'Ben'")
.save("/tmp/delta/partitioned_users")

上記を実行したときに、ストリーム処理を実行しているターミナル側で、 以下のエラーが生じ、プロセスが停止した。

1
2
3
4
5
6
20/11/23 22:10:55 ERROR MicroBatchExecution: Query [id = 13cf0aa0-116c-4c95-aea0-3e6f779e02c8, runId = 6f71a4bb-c067-4f6d-aa17-6bf04eea3520] terminated
with error
java.lang.UnsupportedOperationException: Detected a data update in the source table. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory. at org.apache.spark.sql.delta.sources.DeltaSource.verifyStreamHygieneAndFilterAddFiles(DeltaSource.scala:273)
at org.apache.spark.sql.delta.sources.DeltaSource.$anonfun$getChanges$1(DeltaSource.scala:117)

(snip)

後ほど検証するが、Delta Lakeのストリーム処理では、既存レコードのupdate、deleteなどの既存レコードの更新となる処理は基本的にサポートされていない。 オプションを指定することで、更新を無視することができる。

読み込み時の ignoreDeletes オプション

基本的には、上記の通り、元テーブルに変化があった場合はエラーを生じるようになっている。 Ignore updates and deletes に記載の通り、update、merge into、delete、overwriteがエラーの対象である。

ひとまずdeleteでエラーになるのを避けるため、 ignoreDeletes で無視するオプションを指定できる。

動作確認する。

spark-shellを立ち上げ、検証用のテーブルを作成する。

1
$ /opt/spark/default/bin/spark-shell --packages io.delta:delta-core_2.12:0.7.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
1
2
3
4
5
6
7
8
9
10
scala> val df = spark.read.format("parquet").load("/opt/spark/default/examples/src/main/resources/users.parquet")
scala> df.show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+

scala> df.write.format("delta").save("/tmp/delta/users_for_delete")

別のターミナルを立ち上げ、当該テーブルをストリームで読み込む。

1
$ /opt/spark/default/bin/spark-submit --class com.example.StructuredStreamingDeltaLakeReadExample target/scala-2.12/structured_streaming_deltalake-assembly-0.0.1.jar /tmp/delta/users_for_delete

spark-shellでテーブルとして読み込む。

1
2
scala> import io.delta.tables._
scala> val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/users_for_delete")
1
scala> deltaTable.delete("name == 'Ben'")

上記の通り、テーブルを更新(削除)した結果、ストリーム処理が以下のエラーを出力して終了した。

1
2
3
4
5
6
20/11/23 22:26:21 ERROR MicroBatchExecution: Query [id = 660b82a9-ca40-4b91-8032-d75807b11c18, runId = 7e46e9a7-1ba2-48b2-b264-53c254cfa6fc] terminated
with error
java.lang.UnsupportedOperationException: Detected a data update in the source table. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory. at org.apache.spark.sql.delta.sources.DeltaSource.verifyStreamHygieneAndFilterAddFiles(DeltaSource.scala:273)
at org.apache.spark.sql.delta.sources.DeltaSource.$anonfun$getChanges$1(DeltaSource.scala:117)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
(snip)

そこで、同じことをもう一度 ignoreDelte オプションを利用して実行してみる。

1
scala> df.write.format("delta").partitionBy("name").save("/tmp/delta/users_for_delete2")

なお、 ignoreDelete オプションはパーティションカラムに指定されたカラムに対し、 where句を利用して削除するものに対して有効である。 (パーティションカラムに指定されていないカラムをwhere句に使用してdeleteしたところ、エラーが生じた)

別のターミナルを立ち上げ、当該テーブルをストリームで読み込む。

1
$ /opt/spark/default/bin/spark-submit --class com.example.StructuredStreamingDeltaLakeDeleteExample target/scala-2.12/structured_streaming_deltalake-assembly-0.0.1.jar /tmp/delta/users_for_delete2

spark-shellでテーブルとして読み込む。

1
2
scala> import io.delta.tables._
scala> val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/users_for_delete2")
1
scala> deltaTable.delete("name == 'Ben'")

このとき、特にエラーなくストリーム処理が続いた。

またテーブルを見たところ、

1
2
3
4
5
6
scala> deltaTable.toDF.show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
+------+--------------+----------------+

削除対象のレコードが消えていることが確かめられる。

読み込み時の ignoreChanges オプション

ignoreChanges オプションは、いったん概ね ignoreDelete オプションと同様だと思えば良い。 ただしdeleteに限らない。(deleteも含む) 細かな点は後で調査する。

org.apache.spark.sql.delta.sources.DeltaSource クラスについて

Delta Lake用のストリームData Sourceである。 親クラスは以下の通り。

1
2
3
DeltaSource (org.apache.spark.sql.delta.sources)
Source (org.apache.spark.sql.execution.streaming)
SparkDataStream (org.apache.spark.sql.connector.read.streaming)

気になったメンバ変数は以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/** A check on the source table that disallows deletes on the source data. */
private val ignoreChanges = options.ignoreChanges || ignoreFileDeletion
--> レコードの変更(ファイル変更)を無視するかどうか

/** A check on the source table that disallows commits that only include deletes to the data. */
private val ignoreDeletes = options.ignoreDeletes || ignoreFileDeletion || ignoreChanges
--> レコードの削除(ファイル削除)を無視するかどうか

private val excludeRegex: Option[Regex] = options.excludeRegex
--> ADDファイルのリストする際、無視するファイルを指定する

override val schema: StructType = deltaLog.snapshot.metadata.schema
--> 対象テーブルのスキーマ

(snip)

private val tableId = deltaLog.snapshot.metadata.id
--> テーブルのID

private var previousOffset: DeltaSourceOffset = null
--> バッチの取得時のオフセットを保持する

// A metadata snapshot when starting the query.
private var initialState: DeltaSourceSnapshot = null
private var initialStateVersion: Long = -1L
--> org.apache.spark.sql.delta.sources.DeltaSource#getBatch などから呼ばれる、スナップショットを保持するための変数

org.apache.spark.sql.delta.sources.DeltaSource#getChanges メソッド

ストリーム処理のバッチ取得メソッド org.apache.spark.sql.delta.sources.DeltaSource#getBatch などから呼ばれるメソッド。 スナップショットなどの情報から、開始時点のバージョンから現在までの変化を返す。

org.apache.spark.sql.delta.sources.DeltaSource#getSnapshotAt メソッド

上記の org.apache.spark.sql.delta.sources.DeltaSource#getChanges メソッドなどで利用される、 指定されたバージョンでのスナップショットを返す。

org.apache.spark.sql.delta.SnapshotManagement#getSnapshotAt を内部的に利用する。

org.apache.spark.sql.delta.sources.DeltaSource#getChangesWithRateLimit メソッド

org.apache.spark.sql.delta.sources.DeltaSource#getChanges メソッドに比べ、 レート制限を考慮した上での変化を返す。

org.apache.spark.sql.delta.sources.DeltaSource#getStartingOffset メソッド

org.apache.spark.sql.delta.sources.DeltaSource#latestOffset メソッド内で呼ばれる。 ストリーム処理でマイクロバッチを構成する際、対象としているデータのオフセットを返すのだが、 それらのうち、初回の場合に呼ばれる。★要確認

当該メソッド内で、「開始時点か、変化をキャプチャして処理しているのか」、「コミット内のオフセット位置」などの確認が行われ、 org.apache.spark.sql.delta.sources.DeltaSourceOffset にラップした上でメタデータが返される。

org.apache.spark.sql.delta.sources.DeltaSource#latestOffset メソッド

org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl#latestOffset メソッドをオーバーライドしている。 今回のマイクロバッチで対象となるデータのうち、最後のデータを示すオフセットを返す。

org.apache.spark.sql.delta.sources.DeltaSource#verifyStreamHygieneAndFilterAddFiles メソッド

org.apache.spark.sql.delta.sources.DeltaSource#getChanges メソッドで呼ばれる。 getChanges メソッドは指定されたバージョン以降の「変更」を取得するものである。 その中において、verifyStreamHygieneAndFilterAddFiles メソッドは関係あるアクションだけ取り出すために用いられる。

org.apache.spark.sql.delta.sources.DeltaSource#getBatch メソッド

org.apache.spark.sql.execution.streaming.Source#getBatch メソッドをオーバライドしたもの。 与えられたオフセット(開始、終了)をもとに、そのタイミングで処理すべきバッチとなるDataFrameを返す。

org.apache.spark.sql.delta.sources.DeltaSource.AdmissionLimits クラスについて

レート管理のためのクラス。

org.apache.spark.sql.delta.sources.DeltaSource.AdmissionLimits#toReadLimit メソッド

org.apache.spark.sql.delta.sources.DeltaSource#getDefaultReadLimit メソッド内で呼ばれる。 オプションで与えられたレート制限のヒント値を、 org.apache.spark.sql.connector.read.streaming.ReadLimit に変換する。

org.apache.spark.sql.delta.sources.DeltaSource.AdmissionLimits#admit メソッド

org.apache.spark.sql.delta.sources.DeltaSource#getChangesWithRateLimit メソッド内で呼ばれる。 上記メソッドは、レート制限を考慮して変化に関する情報のイテレータを返す。 admit メソッドはその中において、レートリミットに達しているかどうかを判断するために用いられる。

共有

BigDL memory usage

参考

メモ

前提

Download によると、Sparkは2.4系まで対応しているようだ。 Issue-3070 によると、Spark3対応も進んでいるようだ。

Spark起動

Run で記載の通り、以下のように起動した。

1
2
3
$ export SPARK_HOME=/usr/local/spark/default
$ export BIGDL_HOME=/usr/local/bigdl/default
$ ${BIGDL_HOME}/bin/spark-shell-with-bigdl.sh --master local[*]
1
2
scala> import com.intel.analytics.bigdl.utils.Engine
scala> Engine.init

とりあえず初期化までは動いた。

サンプルを動かす

Examples に記載のサンプルを動かす。 LeNet Train にあるLeNetの例が良さそう。

MNISTの画像をダウンロードして、展開した。

1
2
3
4
5
6
7
$ mkdir ~/tmp
$ cd ~/tmp
$ wget http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
$ wget http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
$ wget http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
$ wget http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
$ gunzip *.gz

展開したディレクトリを指定しながら、LeNetの学習を実行。

1
2
3
4
5
6
7
8
$ spark-submit \
--master local[6] \
--driver-class-path ${BIGDL_HOME}/lib/bigdl-SPARK_2.4-0.11.1-jar-with-dependencies.jar \
--class com.intel.analytics.bigdl.models.lenet.Train \
${BIGDL_HOME}/lib/bigdl-SPARK_2.4-0.11.1-jar-with-dependencies.jar \
-f $HOME/tmp \
-b 12 \
--checkpoint ./model

実行中の様子は以下の通り。

ヒストリのキャプチャ

今回はローカルモードで実行したが、入力された学習データと同様のサイズのキャッシュがメモリ上に展開されていることがわかる。

サンプルの中身

2020/11/15時点のmasterブランチを確認する。

Trainクラス

上記サンプルで実行されているTrainを見る。 モデルを定義している箇所は以下の通り。

com/intel/analytics/bigdl/models/lenet/Train.scala:48

1
2
3
4
5
6
7
8
9
10
11
12
val model = if (param.modelSnapshot.isDefined) {
Module.load[Float](param.modelSnapshot.get)
} else {
if (param.graphModel) {
LeNet5.graph(classNum = 10)
} else {
Engine.getEngineType() match {
case MklBlas => LeNet5(10)
case MklDnn => LeNet5.dnnGraph(param.batchSize / Engine.nodeNumber(), 10)
}
}
}

Modelインスタンスは以下の通り、Optimizerオブジェクトに渡され、 データセットに合わせたOptimizerが返される。 (例:データセットが分散データセットかどうか、など)

com/intel/analytics/bigdl/models/lenet/Train.scala:83

1
2
3
4
val optimizer = Optimizer(
model = model,
dataset = trainSet,
criterion = criterion)

参考までに、Optimizerの種類と判定の処理は以下の通り。

com/intel/analytics/bigdl/optim/Optimizer.scala:688

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
dataset match {
case d: DistributedDataSet[_] =>
Engine.getOptimizerVersion() match {
case OptimizerV1 =>
new DistriOptimizer[T](
_model = model,
_dataset = d.toDistributed().asInstanceOf[DistributedDataSet[MiniBatch[T]]],
_criterion = criterion
).asInstanceOf[Optimizer[T, D]]
case OptimizerV2 =>
new DistriOptimizerV2[T](
_model = model,
_dataset = d.toDistributed().asInstanceOf[DistributedDataSet[MiniBatch[T]]],
_criterion = criterion
).asInstanceOf[Optimizer[T, D]]
}
case d: LocalDataSet[_] =>
new LocalOptimizer[T](
model = model,
dataset = d.toLocal().asInstanceOf[LocalDataSet[MiniBatch[T]]],
criterion = criterion
).asInstanceOf[Optimizer[T, D]]
case _ =>
throw new UnsupportedOperationException
}

返されたOptimizerの、Optimizer#optimizeメソッドを利用し学習が実行される。

com/intel/analytics/bigdl/models/lenet/Train.scala:98

1
2
3
4
5
6
7
8
optimizer
.setValidation(
trigger = Trigger.everyEpoch,
dataset = validationSet,
vMethods = Array(new Top1Accuracy, new Top5Accuracy[Float], new Loss[Float]))
.setOptimMethod(optimMethod)
.setEndWhen(Trigger.maxEpoch(param.maxEpoch))
.optimize()

Optimizerの種類

上記の内容を見るに、Optimizerにはいくつか種類がありそうだ。

  • DistriOptimizer
  • DistriOptimizerV2
  • LocalOptimizer

DistriOptimizer

ひとまずメモリ使用に関連する箇所ということで、入力データの準備の処理を確認する。 com.intel.analytics.bigdl.optim.DistriOptimizer#optimize メソッドには 以下のような箇所がある。

com/intel/analytics/bigdl/optim/DistriOptimizer.scala:870

1
prepareInput()

これは com.intel.analytics.bigdl.optim.DistriOptimizer#prepareInput メソッドであり、 内部的に com.intel.analytics.bigdl.optim.AbstractOptimizer#prepareInput メソッドを呼び出し、 入力データをSparkのキャッシュに載せるように処理する。

com/intel/analytics/bigdl/optim/DistriOptimizer.scala:808

1
2
3
4
if (!dataset.toDistributed().isCached) {
DistriOptimizer.logger.info("caching training rdd ...")
DistriOptimizer.prepareInput(this.dataset, this.validationDataSet)
}

キャシュに載せると箇所は以下の通り。

com/intel/analytics/bigdl/optim/AbstractOptimizer.scala:279

1
2
3
4
5
6
7
private[bigdl] def prepareInput[T: ClassTag](dataset: DataSet[MiniBatch[T]],
validationDataSet: Option[DataSet[MiniBatch[T]]]): Unit = {
dataset.asInstanceOf[DistributedDataSet[MiniBatch[T]]].cache()
if (validationDataSet.isDefined) {
validationDataSet.get.toDistributed().cache()
}
}

上記の DistributedDataSetchache メソッドは以下の通り。

com/intel/analytics/bigdl/dataset/DataSet.scala:216

1
2
3
4
5
6
def cache(): Unit = {
if (originRDD() != null) {
originRDD().count()
}
isCached = true
}

originRDD の戻り値に対して、count を読んでいる。 ここで count を呼ぶのは、入力データである originRDD の戻り値に入っているRDDをメモリ上にマテリアライズするためである。

count を呼ぶだけでマテリアライズできるのは、予め入力データを定義したときに Spark RDDの cache を利用してキャッシュ化することを指定されているからである。 今回の例では、Optimizerオブジェクトのapplyを利用する際に渡されるデータセット trainSet を 定義する際に予め cache が呼ばれる。

com/intel/analytics/bigdl/models/lenet/Train.scala:79

1
2
3
val trainSet = DataSet.array(load(trainData, trainLabel), sc) ->
BytesToGreyImg(28, 28) -> GreyImgNormalizer(trainMean, trainStd) -> GreyImgToBatch(
param.batchSize)

trainSet を定義する際、 com.intel.analytics.bigdl.dataset.DataSet$#array(T[], org.apache.spark.SparkContext) メソッドが 呼ばれるのだが、その中で以下のように Sparkの RDD#cache が呼ばれていることがわかる。

com/intel/analytics/bigdl/dataset/DataSet.scala:343

1
2
3
4
5
6
7
8
9
10
11
12
def array[T: ClassTag](localData: Array[T], sc: SparkContext): DistributedDataSet[T] = {
val nodeNumber = Engine.nodeNumber()
new CachedDistriDataSet[T](
sc.parallelize(localData, nodeNumber)
// Keep this line, or the array will be send to worker every time
.coalesce(nodeNumber, true)
.mapPartitions(iter => {
Iterator.single(iter.toArray)
}).setName("cached dataset")
.cache()
)
}

以下、一例。

具体的には、Optimizerのインスタンスを生成するための apply メソッドはいくつかあるが、 以下のように引数にデータセットを指定する箇所がある。(再掲)

com/intel/analytics/bigdl/optim/Optimizer.scala:619

1
_dataset = (DataSet.rdd(sampleRDD) ->

ここで用いられている com.intel.analytics.bigdl.dataset.DataSet#rdd メソッドは以下の通り。

com/intel/analytics/bigdl/dataset/DataSet.scala:363

1
2
3
4
5
6
7
8
9
10
def rdd[T: ClassTag](data: RDD[T], partitionNum: Int = Engine.nodeNumber()
): DistributedDataSet[T] = {
new CachedDistriDataSet[T](
data.coalesce(partitionNum, true)
.mapPartitions(iter => {
Iterator.single(iter.toArray)
}).setName("cached dataset")
.cache()
)
}

com.intel.analytics.bigdl.dataset.CachedDistriDataSet#CachedDistriDataSet のコンストラクタ引数に、 org.apache.spark.rdd.RDD#cache を用いてキャッシュ化することを指定したRDDを渡していることがわかる。

共有

Generate PDF using pandoc

参考

メモ

TeX Liveのインストール手順 の通り、最新版をインストールする。 なお、フルセットでインストールした。

PDFを作成するときは以下のようにする。

1
$ pandoc test.md -o test.pdf --pdf-engine=lualatex -V documentclass=bxjsarticle -V classoption=pandoc

ただし、 default.latex に示すとおり、リンクに色を付けるなどしたいので、 上記マークダウンファイルには先頭部分にYAMLでコンフィグを記載することとした。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
---
title: テスト文章
date: 2020-10-30
author: dobachi
colorlinks: yes
numbersections: yes
toc: yes
---

# test

## これはテスト

[Google](https://www.google.com)

<!-- vim: set ft=markdown : -->
共有

Access AWS S3 from Hadoop3 and Spark3

参考

メモ

パッケージインポート

Sparkの公式ドキュメント には、org.apache.spark:hadoop-cloud_2.12を利用するよう記載されているが、 このMavenパッケージは見当たらなかった。

そこで、 Hadoopの公式ドキュメント の通り、hadoop-awsをインポートすると、それに関連した依存関係をインポートすることにした。 spark-shellで試すとしたら以下の通り。

1
$ /opt/spark/default/bin/spark-shell --packages org.apache.hadoop:hadoop-aws:3.2.0

profileを使ったクレデンシャル設定

awscliを利用しているとしたら、profileを設定しながらクレデンシャルを使い分けていることもあると思う。 その場合、起動時の環境変数でプロフィールを指定しながら、以下のようにproviderを指定することで、 profileを利用したクレデンシャル管理の仕組みを利用してアクセスできる。

1
$ AWS_PROFILE=s3test /opt/spark/default/bin/spark-shell --packages org.apache.hadoop:hadoop-aws:3.2.0
1
scala> spark.sparkContext.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

(補足)パラメータと指定するクラスについて

ちなみに、上記パラメータの説明には、

If unspecified, then the default list of credential provider classes, queried in sequence, is: 1. org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider: Uses the values of fs.s3a.access.key and fs.s3a.secret.key. 2. com.amazonaws.auth.EnvironmentVariableCredentialsProvider: supports configuration of AWS access key ID and secret access key in environment variables named AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, as documented in the AWS SDK. 3. com.amazonaws.auth.InstanceProfileCredentialsProvider: supports use of instance profile credentials if running in an EC2 VM.

のように記載されている。

また、「Using Named Profile Credentials with ProfileCredentialsProvider」の節には、

Declare com.amazonaws.auth.profile.ProfileCredentialsProvider as the provider.

と書かれており、上記Default...でなくても良い。(実際に試したところ動作した)

共有