Getting started Spark on k8s

メモ

このメモは、Spark3.2.0をKubernetes上で動かすことを簡単に紹介するものである。 なお、日本語での説明としては、 Apache Spark on Kubernetes入門(Open Source Conference 2021 Online Hiroshima 発表資料) がとても分かりやすいので参考になる。

基本的な流れ

公式の Running Spark on Kubernetes の通りでよい。

ビルド

まずは、パッケージに含まれているDockerfileを利用して、Dockerイメージを自分でビルドする。 今回はMinikube環境で動かしているので -m オプションを利用した。

簡易的な例

1
$ /opt/spark/default/bin/docker-image-tool.sh -m -t testing build

Minikube内のDockerでイメージ一覧を確認すると以下の通り。

1
2
3
4
5
$ eval $(minikube -p minikube docker-env)
$ docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
spark testing e19cebbf23e7 44 minutes ago 602MB
(snip)

実行

サービスアカウントを作る。

1
2
3
4
5
6
$ minikube kubectl -- create serviceaccount spark
$ minikube kubectl -- create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:spark --namespace=default
$ minikube kubectl -- get serviceaccount
NAME SECRETS AGE
default 1 179m
spark 1 2m39s

実行は以下の通り。先ほど作ったサービスアカウントを使用するようにする。

1
$ /opt/spark/default/bin/spark-submit --master k8s://https://192.168.49.2:8443 --deploy-mode cluster --name pi --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=2 --conf spark.kubernetes.container.image=spark:testing --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark local:///opt/spark/examples/jars/spark-examples_2.12-3.2.0.jar

なお、今回はMinikube上で実行しており、Jarファイルとして指定するのはMinikube内で起動したドライバのコンテナ内のローカルファイルシステムパスである。試しに、当該近店をアタッチして起動するとわかる。

ドライバのログを確認する。

1
2
3
4
5
6
7
8
$ minikube kubectl -- logs pi-43bff77e450bdba3-driver
(snip)
22/01/10 17:32:02 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
22/01/10 17:32:02 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 0.824388 s
Pi is roughly 3.142475712378562
22/01/10 17:32:02 INFO SparkUI: Stopped Spark web UI at http://pi-43bff77e450bdba3-driver-svc.default.svc:4040
22/01/10 17:32:02 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
(snip)

pi計算の結果がログに出力されているのがわかる。

(補足)ビルドされたイメージを起動して内容確認

以下は、MinikubeのDockerを利用しビルドしたイメージのコンテナを起動して、シェルをアタッチした例。

1
2
$ eval $(minikube -p minikube docker-env)
$ docker run -it --rm spark:testing /bin/bash

(補足)Minikube使う場合のリソース設定についての注意事項

Prerequisites の通り、Minikube等を使うようであれば、リソースに対して注意がある。 以下、Minikube立ち上げ例。

1
$ minikube start --memory='4g' --cpus=3

(補足)Kubernetes環境

動作確認のためには、Kubernetes環境が必要である。

minikube start あたりを参考に環境構築しておくこと。

(補足)ボリュームのマウントについて

volume-mounts の通り、DriverやExecutorのPodにボリュームをマウントできるのだが、 HostPathに関するリスクがあるようだ。 KubernetesのhostPathについて を参照。

(補足)上記例ではサービスアカウントを作成しているが・・・

もし default サービスアカウントを利用すると以下に記載されたのと同様のエラーを生じる。

How to fix "Forbidden!Configured service account doesn't have access" with Spark on Kubernetes?

そこで、あらかじめサービスアカウントを作成して使うようにした。

参考

共有

Delta Sharing with MinIO

メモ

Delta SharingはAWS S3、Azure Blob Storage、Azure Data Lake Storage Gen2に対応している。 そこで、AWS S3互換のストレージソフトウェアだったら使えるのではないか、ということで、 MinIO を利用してみることにする。

もろもろのサンプルコードは dobachi DeltaSharingMinioExample にある。

MinIOの起動

MinIO Quickstart Guide を参考に、手元でMinIOサーバを構成する。 以下は手順の一例。

バイナリをダウンロードし、データ保存ディレクトリを作って立ち上げる。

1
2
3
4
5
6
$ mkdir -p ~/Minio
$ cd ~/Minio
$ wget https://dl.min.io/server/minio/release/linux-amd64/minio
$ mkdir -p data
$ chmod +x minio
$ ./minio server data

コンソールにメッセージが色々出る。 今回はテスト用なので特段指定しなかったが、Rootユーザの名称、パスワードが表示されているはずである。 これは後ほど、AWS S3プロトコルでMinIOサービスにアクセスする際のID、シークレットキーとしても利用する。

MinIOのクライアントをダウンロードし、エイリアスを設定する。 別のターミナルを開く。

1
2
3
4
$ cd ~/Minio
$ wget https://dl.min.io/client/mc/release/linux-amd64/mc
$ chmod +x mc
$ ./mc alias set myminio http://your_host:9000 minioadmin minioadmin

your_hostのところは各自の環境に合わせて変更してほしい。 これにより、myminio という名前でエイリアスが作成された。

ちなみに、http://your_host:40915/ からアクセスできる。

なお、試しにAWS S3クライアントで接続してみることにする。

1
2
3
4
5
6
7
$ ./mc mb data/test_bucket
$ echo 'test' > test.txt
$ export AWS_ACCESS_KEY_ID=minioadmin
$ export AWS_SECRET_ACCESS_KEY=minioadmin
$ aws --endpoint-url http://localhost:9000 s3 ls
2021-10-22 12:26:13 test_bucket
$ aws --endpoint-url http://localhost:9000 s3 cp test.txt s3://test_bucket/test.txt

ここで、今回はMinioを利用していることから、エンドポイントURLを指定していることに注意。

テストバケットに入ったデータ

Delta Sharingサーバの起動

Delta Sharing Reference Server を参考に、サーバを起動する。

リリースされたパッケージを利用する場合

パッケージを公式リリースからダウンロードして展開する。

1
2
3
4
5
$ mkdir -p ~/DeltaSharing
$ cd ~/DeltaSharing
$ wget https://github.com/delta-io/delta-sharing/releases/download/v0.2.0/delta-sharing-server-0.2.0.zip
$ unzip delta-sharing-server-0.2.0.zip
$ cd delta-sharing-server-0.2.0

現在のmainブランチを利用する場合

1
2
3
4
5
$ mkdir -p ~/Sources/
$ cd ~/Sources
$ git clone git@github.com:delta-io/delta-sharing.git
$ delta-sharing
$ ./build/sbt server/universal:packageBin

server/target/universal/delta-sharing-server-x.y.z-SNAPSHOT.zip にパッケージが保存される。 なお、x.y.zにはバージョン情報が入る。ここでは0.3.0とする。

1
2
3
4
$ cp server/target/universal/delta-sharing-server-x.y.z-SNAPSHOT.zip ~/DeltaSharing/
$ cd ~/DeltaSharing/
$ unzip delta-sharing-server-0.3.0-SNAPSHOT.zip
$ cd delta-sharing-server-0.3.0-SNAPSHOT

設定ファイルの作成

Delta Sharingはデータストアのアクセスについて、間接的にHadoopライブラリに依存している。 そこで、AWS S3プロトコルでアクセスするためのHadoop設定ファイル core-site.xml を作成する。

conf/core-site.xml

1
2
3
4
5
6
7
8
9
10
<configuration>
<property>
<name>fs.s3a.endpoint</name>
<value>http://localhost:9000</value>
</property>
<property>
<name>fs.s3a.path.style.access</name>
<value>true</value>
</property>
</configuration>

ここではエンドポイントURL、パス指定方法の設定をしている。 なおパス指定方法の設定行わないと、Delta SharingからMinIOサーバにリクエストを送る際に、 ホスト名が bucket_name.host のような指定になってしまい、 400 But Reuqest エラーを生じてしまう。 詳しくは、 トラブルシュートの節を参照。

続いて、Delta Sharingサーバの設定ファイルを作成する。 以下は参考。

conf/delta-sharing-server.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# The format version of this config file
version: 1
# Config shares/schemas/tables to share
shares:
- name: "share1"
schemas:
- name: "schema1"
tables:
- name: "table1"
location: "s3a://test_bucket/delta_table"
# Set the host name that the server will use
host: "localhost"
# Set the port that the server will listen on
port: 18080
# Set the url prefix for the REST APIs
endpoint: "/delta-sharing"
# Set the timeout of S3 presigned url in seconds
preSignedUrlTimeoutSeconds: 900
# How many tables to cache in the server
deltaTableCacheSize: 10
# Whether we can accept working with a stale version of the table. This is useful when sharing
# static tables that will never be changed.
stalenessAcceptable: false
# Whether to evaluate user provided `predicateHints`
evaluatePredicateHints: false

ここで s3a://test_bucket/delta_table に指定しているのがMinIO内のデータである。 ここでは予め Delta Lakeフォーマットのデータを保存してあるものとする。

Delta Sharingサーバの起動

1
$ ./bin/delta-sharing-server -- --conf conf/delta-sharing-server.yaml

Delta SharingのPythonクライアントを利用してアクセスする。

まずクライアント用のプロファイルを準備する。

minio.share

1
2
3
4
5
{
"shareCredentialsVersion": 1,
"endpoint": "http://localhost:18080/delta-sharing/",
"bearerToken": ""
}

これを利用しながら、テーブルにアクセスする。

Delta Sharingのクライアントでアクセス

ここでは試しにPandas DataFrameとして読み取っている。

トラブルシュート

パス指定方法の誤り

Hadoop設定上で、パス指定方法の設定を行わないと、以下のようなエラーが生じる。

Delta Sharingサーバ側のエラー

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Caused by: org.apache.hadoop.fs.s3a.AWSS3IOException: doesBucketExist on test: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: null; S3 Extended Request ID: null), S3 Extended Request ID: null: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: null; S3 Extended Request ID: null)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:194)
at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:335)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:280)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3247)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:121)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3296)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3264)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:475)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356)
at io.delta.standalone.internal.DeltaSharedTable.$anonfun$deltaLog$1(DeltaSharedTableLoader.scala:76)
at io.delta.standalone.internal.DeltaSharedTable.withClassLoader(DeltaSharedTableLoader.scala:95)
at io.delta.standalone.internal.DeltaSharedTable.<init>(DeltaSharedTableLoader.scala:74)
at io.delta.standalone.internal.DeltaSharedTableLoader.$anonfun$loadTable$1(DeltaSharedTableLoader.scala:53)
at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4693)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3445)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2194)
... 60 more

Minioサーバ側でのエラー

1
$ ./mc admin trace myminio --debug -v
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
test.localhost:9000 [REQUEST methodNotAllowedHandler.func1] [2021-10-22T03:57:08:000] [Client IP: 127.0.0.1]
test.localhost:9000 HEAD /
test.localhost:9000 Proto: HTTP/1.1
test.localhost:9000 Host: test.localhost:9000
test.localhost:9000 Amz-Sdk-Invocation-Id: 821962e2-e1f8-31d7-4d0d-431529a3725c
test.localhost:9000 Authorization: AWS4-HMAC-SHA256 Credential=minioadmin/20211022/us-east-1/s3/aws4_request, SignedHeaders=amz-sdk-invocation-id;amz-sdk-retry;content-type;host;user-agent;x-amz-content-sha256;x-amz-date, Signature=07977380f3fd92b149e8c60937554fc5ee5287a0863c101431ef51aa3968c37b
test.localhost:9000 Connection: Keep-Alive
test.localhost:9000 Content-Type: application/octet-stream
test.localhost:9000 X-Amz-Content-Sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
test.localhost:9000 Amz-Sdk-Retry: 0/0/500
test.localhost:9000 Content-Length: 0
test.localhost:9000 User-Agent: Hadoop 2.10.1, aws-sdk-java/1.11.271 Linux/4.19.104-microsoft-standard OpenJDK_64-Bit_Server_VM/11.0.12+7-LTS java/11.0.12 scala/2.12.10
test.localhost:9000 X-Amz-Date: 20211022T035708Z
test.localhost:9000
test.localhost:9000 [RESPONSE] [2021-10-22T03:57:08:000] [ Duration 66µs ↑ 137 B ↓ 375 B ]
test.localhost:9000 400 Bad Request
test.localhost:9000 Content-Type: application/xml
test.localhost:9000 Server: MinIO
test.localhost:9000 Vary: Origin
test.localhost:9000 Accept-Ranges: bytes
test.localhost:9000 Content-Length: 261
test.localhost:9000 <?xml version="1.0" encoding="UTF-8"?>
<Error><Code>BadRequest</Code><Message>An error occurred when parsing the HTTP request HEAD at &#39;/&#39;</Message><Resource>/</Resource><RequestId></RequestId><HostId>baec9ee7-bfb0-441b-a70a-493bfd80d745</HostId></Error>

参考

MinIO

Delta Sharing

Sample code

共有

Getting Started of Delta Sharing

参考

メモ

特徴

特徴については以下を参照。

リファレンスサーバを動かす

公式GitHub のREADMEを参考に、リファレンスサーバを動かす。

前提

  • OS: CentOS Linux release 7.8.2003 (Core)、CentOS Linux release 7.9.2009 (Core)
  • 必要なライブラリ
    • bzip2-devel、readline-devel、openssl-devel、sqlite-devel、libffi-devel
  • Python: 3.7.10
    • pipでjupyter labを入れておく。
  • Spark:3.1.2 w/ Hadoop3.2
    • pipか公式サイトからダウンロードしたパッケージを利用してインストールしておく。
    • 今回は簡易的な動作確認のため、Spark単体(ローカルモード)で動作させる。Hadoopとの連係はさせない。

サンプルスクリプト

Delta Sharing Example にこの記事で取り扱うサンプルスクリプト(Jupyterのノートブック)を置いてある。 なお、このプロジェクトには、Jupyter LabをPySparkと一緒に起動するサンプル補助スクリプト ./bin/pyspark_jupyter.sh が入っている。 適宜編集して利用されたし。

以降の手順では、このノートブック群を利用した例を示す。 利用する場合はサブモジュールごと以下のようにクローンすると便利。

1
2
3
$ mkdir -p -/Sources
$ cd -/Sources
$ git clone --recursive https://github.com/dobachi/delta_sharing_example.git

.profileについて

なお、この補助スクリプトは同一ディレクトリに .profile があれば、それを読み込むようになっている。 特に、環境変数 OPTIONSS3_TEST_URL に個人的な値を設定するために利用するとよい。

.profileで設定する環境変数について

  • OPTIONS : Jupyter Lab起動時に、Sparkに渡すオプションを設定するために用意した。
  • S3_TEST_URL : ノートブック内で読み書き動作確認用に用いるS3のURLを設定するために用意した。

基本的な動作

以下の流れで試す。

  • S3上にサンプルデータを作成する
  • 手元のマシンでDelta Sharingのサーバを立ち上げる
  • githubからクローンしたライブラリをローカル環境にインストール
  • S3に置いたサンプルデータをDelta Sharingのサーバ経由で取得する

S3上にサンプルデータを作成する

まずはS3上にデータを置く。なんの手段でも良いが、S3へのアクセスロールを持つEC2インスタンス上で CreateDeltaTableS3 を実行する。

なお、PySparkをJupyter Labで起動する補助するスクリプトの例がpyspark_jupyterに載っている。 CreateDeltaTableS3 のレポジトリにおいても当該スクリプトがサブモジュールとして読み込まれる。

手元のマシンでDelta Sharingのサーバを立ち上げる

つづいて、Delta Sharingのソースコードをクローンする。 なお、公式でリリースされたパッケージを用いてもよいのだが、 Delta Sharingはまだプロダクトが若く、変更も多いためmainブランチを パッケージ化して用いることにする。

1
2
3
4
5
$ mkdir -p -/Sources
$ cd -/Sources
$ git clone https://github.com/delta-io/delta-sharing.git
$ cd delta-sharing
$ ./build/sbt server/universal:packageBin

これで server/target/universal/delta-sharing-server-0.3.0-SNAPSHOT.zip ができたはず。(2021/9現在。これ以降だと、バージョンが上がっている可能性がある) これを適当なディレクトリに展開して用いるようにする。

1
2
3
4
5
$ mkdir -p ~/DeltaSharing
$ cp server/target/universal/delta-sharing-server-0.3.0-SNAPSHOT.zip ~/DeltaSharing
$ cd ~/DeltaSharing/
$ if [ -d delta-sharing-server-0.3.0-SNAPSHOT ]; then rm -r delta-sharing-server-0.3.0-SNAPSHOT; fi
$ unzip delta-sharing-server-0.3.0-SNAPSHOT.zip

展開したパッケージの中に、設定のテンプレートが入っているのでコピーして 自分の環境に合わせて編集する。

1
2
3
$ cd ~/DeltaSharing/delta-sharing-server-0.2.0-SNAPSHOT
$ cp conf/delta-sharing-server.yaml{.template,}
$ vim conf/delta-sharing-server.yaml

設定ファイルの例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# The format version of this config file
version: 1
# Config shares/schemas/tables to share
shares:
- name: "share1"
schemas:
- name: "schema1"
tables:
- name: "table1"
location: "s3a://<your configuration>"
# Set the host name that the server will use
host: "localhost"
# Set the port that the server will listen on
port: 80
# Set the url prefix for the REST APIs
endpoint: "/delta-sharing"
# Set the timeout of S3 presigned url in seconds
preSignedUrlTimeoutSeconds: 900
# How many tables to cache in the server
deltaTableCacheSize: 10
# Whether we can accept working with a stale version of the table. This is useful when sharing
# static tables that will never be changed.
stalenessAcceptable: false
# Whether to evaluate user provided `predicateHints`
evaluatePredicateHints: false

サーバを起動する。

1
$ ./bin/delta-sharing-server -- --config conf/delta-sharing-server.yaml

githubからクローンしたライブラリをローカル環境にインストール

起動したサーバとは別のターミナルを開き、Pythonクライアントを試す。

先ほどクローンしたDelta Sharingのレポジトリを利用し、 venvなどで構築した環境下にpipでdelta sharingのPythonクライアントライブラリをインストールする。

1
$ pip install ~/Sources/delta-sharing/python/

なお、もしすでに一度インストールしたことがあるようであれば、アップデートするようにするなど工夫すること。

つづいて、Spark用のパッケージを作る。

1
2
$ cd ~/Sources/delta-sharing
$ ./build/sbt spark/package

spark/target/scala-2.12/delta-sharing-spark_2.12-0.3.0-SNAPSHOT.jar にJarファイルができる。

これをコネクタの起動時にロードするようにする。 例えば、PySparkのJupyter Lab起動時に以下のようなオプションを渡す。

1
--jars /home/centos/Sources/delta-sharing/spark/target/scala-2.12/delta-sharing-spark_2.12-0.3.0-SNAPSHOT.jar

のような

S3に置いたサンプルデータをDelta Sharingのサーバ経由で取得する

PythonConnectorExample にPythonのクライアントライブラリを用いた例を示す。 Pandas DataFrameで取得する例を掲載している。

上記スクリプト内でも利用している通り、クライアントがアクセスするためには、 以下のようなプロファイルを渡す必要がある。

1
2
3
4
5
{
"shareCredentialsVersion": 1,
"endpoint": "http://localhost:80/delta-sharing/",
"bearerToken": ""
}

このプロファイルは以下の通り、ファイルのPATH等を渡すか、delta_sharing.protocol.DeltaSharingProfileインスタンスを渡すかすれば良さそう。 後者の場合、JSONテキストから生成できる。

delta_sharing/delta_sharing.py:92

1
def __init__(self, profile: Union[str, BinaryIO, TextIO, Path, DeltaSharingProfile]):

一方、 delta_sharing.delta_sharing.load_as_pandas メソッドを用いて、 Pandas DataFrameで取得する場合は、その引数に渡すプロファイルはURLのテキストが期待されている。

1
2
3
4
5
6
7
8
9
10
11
12
13
def load_as_pandas(url: str) -> pd.DataFrame:
"""
Load the shared table using the give url as a pandas DataFrame.

:param url: a url under the format "<profile>#<share>.<schema>.<table>"
:return: A pandas DataFrame representing the shared table.
"""
profile_json, share, schema, table = _parse_url(url)
profile = DeltaSharingProfile.read_from_file(profile_json)
return DeltaSharingReader(
table=Table(name=table, share=share, schema=schema),
rest_client=DataSharingRestClient(profile),
).to_pandas()

クライアントと同様に、JSONのテキストでも受け付けられるようにしたら便利か。 そもそもプロファイルとスキーマ等の指定が必ずしもひとつのURLになっていなくてもよいのでは…?と思う節もある。 が、共有は基本的にすべてURLで…という統一性を大事にするのも分かる。

プロファイルの場所にURLを指定できるか?

Spark Connector利用時は、例えばS3に置いたプロファイルを使用できるか?の確認をする。

◇補足: というのも、Spark Connectorを利用している際、Sparkのコンフィグにて、 HADOOP_HOME を設定し、Hadoopを利用するようにしてみたらどうやらHDFS を探しに行っているようだったため、SparkのAPIを通じてプロファイルを読みに行っているのだとしたら、S3等に置かれたプロファイルを読めるはずだ、と考えたため。

ここでは、 s3://hoge/fuga/deltasharing.json のようなURLを渡すことにする。

結論から言えば、delta sharingはプロファイルの読み出しにfsspecを利用しているため、仕様上はリモートのファイルを読み出せるようになっている。

ここで実行したノートブックは TestProfileOnS3 に置いてある。

delta sharing clientの生成

まず、共有データ一覧を取得するために用いるクライアントだが、

1
client = delta_sharing.SharingClient(profile_file)

の引数にS3のURLを渡したら、以下のエラーになった。

1
ImportError: Install s3fs to access S3

これは、delta sharing内で用いられるfsspecにより出された例外である。

python/delta_sharing/protocol.py:41

1
2
3
4
5
6
7
8
9
10
11
12
@staticmethod
def read_from_file(profile: Union[str, IO, Path]) -> "DeltaSharingProfile":
if isinstance(profile, str):
infile = fsspec.open(profile).open()
elif isinstance(profile, Path):
infile = fsspec.open(profile.as_uri()).open()
else:
infile = profile
try:
return DeltaSharingProfile.from_json(infile.read())
finally:
infile.close()

ということで、Python環境にpipでs3fsをインストールしてからもう一度試したところ、ひとまず動作した。

fsspecを利用しているということは、仕様上はリモートに置いてあるファイルシステムにも対応可能である、ということだった。

Pandas DataFrame、Spark DataFrameそれぞれへの読み出しについて、動作した。

認可

Autorization の通り、Bearer認証を利用できるようだ。

REST APIでアクセス

REST API を参考に確認する。

Shareのリスト

ひとまず一番簡単な、Share一覧を取得する。

1
2
3
4
5
6
7
8
$ curl http://127.0.0.1:80/delta-sharing/shares | jq
{
"items": [
{
"name": "share1"
}
]
}

テーブルバージョンの取得

Tableバージョンの取得 の通り。 なお、なぜか2行出力される。

1
2
3
4
5
6
7
$ curl -I -D - http://127.0.0.1:80/delta-sharing/shares/share1/schemas/schema1/tables/table1
HTTP/1.1 200 OK
HTTP/1.1 200 OK
delta-table-version: 4
delta-table-version: 4
content-length: 0
content-length: 0

Shareの取得にあるようにクエリパラメタとして maxResult やページングの情報を渡せる。

メタデータの取得

テーブルメタデータの取得 の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
$ curl http://127.0.0.1:80/delta-sharing/shares/share1/schemas/schema1/tables/table1/metadata | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 271 0 271 0 0 919 0 --:--:-- --:--:-- --:--:-- 921
{
"protocol": {
"minReaderVersion": 1
}
}
{
"metaData": {
"id": "cea27a35-d139-4a74-a5f7-5596985784b8",
"format": {
"provider": "parquet"
},
"schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}",
"partitionColumns": []
}
}

データの取得

テーブルデータの読み出し に従うと、「ヒント句」を渡しながら、データ(のアクセスURL)を取得できるようだ。 ドキュメントを読む限り、このヒントが働くかどうかはベストエフォートとのこと。

1
2
3
4
5
6
7
$ curl -X POST -H "Content-Type: application/json; charset=utf-8" http://127.0.0.1:80/delta-sharing/shares/share1/schemas/schema1/tables/table1/query -d @- << EOL
{
"predicateHints": [ "id < 1"
],
"limitHint": 1
}
EOL

このヒントの働きについては、別途調査する。

共有

X-Road

参考

メモ

X-Roadとは?

一言で言えば、エストニアの電子政府や公共と民間の間のデータ連携に用いられている データ流通のためのオープンソースソフトウェアである。 他の簡単な紹介は X-Roadについての簡易メモ あたりを参照。

最初の動作確認

X-RoadのGitHub のREADMEによると、AnsibleのPlaybookがある。 READMEから簡単に読み解いてみる。

Ansibleプレイブックの確認

インベントリ

ansible/hosts/example_xroad_hosts.txt によると、 central server(cs)、security server(ss)、configuration proxy、certification authorityの グループが定義されており、csやssは複数台のノードが記載されている。冗長化されている?

ansible/hosts/lxd_hosts.txt はコンテナ向けに見える。

なお、エストニア等の国?エリア?に特化するかどうか、という変数がある。 該当しない場合は、 vanilla でよさそう。

プレイブック(エントリポイント)

デプロイするときは xroad_init.yml を利用する。

ロール

プレイブックの通り、 xroad-csxroad-cp といったロールが各種類のサーバに対応したロールである。 また、このロールは依存関係に

1
2
dependencies:
- { role: xroad-base }

のようなものを持っており、 xroad-base がレポジトリ等の設定をする役割を担っている。

LXDコンテナを利用した動作確認用のデプロイ

Using LXD-hosts によると、LXDコンテナを使用し動作確認用のX-Road環境を構築できる。 ansible/hosts/lxd_hosts.txt がインベントリファイル(の例)である。 中で定義されているコンテナは、 ansible/roles/init-lxd ロールで起動される。

Ubuntu18で試す。

あらかじめLXD環境をセットアップしておく。

1
2
3
$ sudo apt-get install lxd
$ newgrp lxd
$ sudo lxd init

特に問題なければLXDコンテナが起動できるはず。詳しくは、 https://linuxcontainers.org/ja/lxd/getting-started-cli あたりを参照。

Ansibleもインストールしておく。

1
2
3
4
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt-add-repository --yes --update ppa:ansible/ansible
$ sudo apt install ansible

詳しくは、 https://docs.ansible.com/ansible/2.9_ja/installation_guide/intro_installation.html#ubuntu-ansible あたりを参照。

X-Roadのコンテナを準備する。

1
$ ansible-playbook  -i hosts/lxd_hosts.txt xroad_init.yml

実際に以下のようなコンテナが立ち上がる。

1
2
3
4
5
6
7
8
$ lxc list
+---------------+---------+-----------------------+-----------------------------------------------+------------+-----------+
| NAME | STATE | IPV4 | IPV6 | TYPE | SNAPSHOTS |
+---------------+---------+-----------------------+-----------------------------------------------+------------+-----------+
| xroad-lxd-cs | RUNNING | xx.xx.xx.xx (eth0) | xxxx:xxxx:xxxx:xxxx:xxx:xxxx:xxxx:xxxx (eth0) | PERSISTENT | 0 |
+---------------+---------+-----------------------+-----------------------------------------------+------------+-----------+
| xroad-lxd-ss1 | RUNNING | xx.xx.xx.xx (eth0) | xxxx:xxxx:xxxx:xxxx:xxx:xxxx:xxxx:xxxx (eth0) | PERSISTENT | 0 |
+---------------+---------+-----------------------+-----------------------------------------------+------------+-----------+

なお、パッケージインストール(内部的にはaptを利用してインストール)している箇所の進みが遅く、一度インタラプトして手動でaptコマンドでインストールした。

開発用のプレイブック

開発用のAnsibleプレイブックもある。 ansible/xroad_dev.ymlansible/xroad_dev_partial.yml である。

起動した環境の動作確認

X Road Academy Session 1 Setting up a local X-Road development environmentが参考になりそう。

コンポーネントの確認

Security Server

X-Road Security Server Architecture にSecurity Serverのアーキテクチャが記載されている。 特に X-Road Security Server Architecture Overview に記載されている図が分かりやすい。

Security Serverはサービスクライアントとプロバイダの仲介を担う。 電子署名と暗号を用いてメッセージのやり取りは保護される。

Proxyがメッセージを受けとるのだが、メッセージは署名と一緒に保存される。

設定情報はPostgreSQL内に保持される。 設定はユーザインターフェースを通じて行われるようになっている。

グローバルコンフィグレーションをダウンロードするConfiguration Client というのもあるようだ。

そのほかのコンポーネントは名称の通りの機能である。

X-Road Security Server Architecture Process View に処理の流れが記載されている。

共有

Softwares for Data Sharing

参考

共有対象の情報

まとめ

ソリューション

OSS

組織

メモ

そもそもデータ共有対象のデータって何があるのか?

機密性を保持するべきデータ

セキュリティ対策は何故必要か? によると企業が保護すべき情報には「個人情報」と「企業内機密情報」があるとのこと。 機密情報とは?営業秘密との違いや管理方法、漏洩リスク等を解説! には、「機密情報」の定義が記載されている。

企業が保有している情報のうち、外部への開示が予定されていないこと 情報秘密として管理されている情報 開示されれば、企業に損害が生じ得る情報

また、不正競争防止法に「営業秘密」の定義が載っている。(ようだ)

秘密管理性(秘密として管理されていること) 有用性(有用な営業上又は技術上の情報であること) 非公知性(公然と知られていないこと)

オープンなデータ

網羅性、という意味ではオープンなデータも共有対象になる。

いわゆる「オープンデータ」の意義・目的が総務省の情報通信白書に記載されている。 第1部 特集 「スマートICT」の戦略的活用でいかに日本に元気と成長をもたらすか 参照。

これに合致しなくとも、企業などで取り扱うデータには機密性を保たなくてもよい類の情報もあるだろう。

まとめ系の情報源

DATA COLLABORATION GUIDE にデータコラボレーションに関するまとめが記載されている。 データコラボレーションの具体例(パートナーとアライアンス、チャネル販売、など)に触れながら、 データエスクローについて紹介している。

What is data collaboration?にデータコラボレーションによってもたらされるメリットなどに少し触れている生地記事。 「データブリッジ」というクエリを移動させる方式(考え方?)が紹介されていた。 データコラボレーションは、線シティブナデータを共有することなく、サイロ化したデータを組み合わせられる仕組みであると紹介されている。

データ共有のためのオープンソースソフトウェア

CKAN

CKAN の通り、データ共有のためのAPI、データストア管理、機能拡張の仕組みなどを提供する。 ポータルサイトを構築可能。

そのほかにも、地理情報に基づく管理、メタデータ管理、データ管理用ウェブフロントエンド、検索などを提供する。

フェデレーションも可能そうである。 ハーベスト(収穫?)機能を利用し、リモートのメタデータを収集する。

ファイルストアには、ローカルのデータストアのほか、クラウドのデータストアも利用可能。

データカタログソフトウェア「CKAN」の使い方について備忘録 にAPIの特徴についての考察が記載されている。

CKANのショーケース には公式の例が載っている。

Eclipse Dataspace Connector

Business Data Sharing Based on Open-Source Technology の通り、IDS企画に準拠したコネクター。

Eclipse Dataspace Connector がプロジェクトディレクトリのようだがメタな情報しかない。

アクティブな開発者としては、BMWなどが挙げられるがMicrosoftも重要な位置付けにあるようだ。 もともとMicrosoftが開発していた(?)Data Appliance GXとIDS Dataspace Connectorを基にしているからだろうか。

eclipse-dataspaceconnector / DataSpaceConnector にソースコードがある。

X-Road

X-Road の通り、「distributed data exchage layer」とのこと。 X-Roadのインスタンス同士でデータ交換が可能になる。

X-RoadのGitHub によると、

The following activities, among others, are undertaken by the Nordic Institute for Interoperability Solutions (NIIS) with regard to the X-Road core

とのこと。NIISが主にメンテ活動しているようだ。

特徴

特徴:

  • address management
  • message routing
  • access rights management
  • organization-level authentication
  • machine-level authentication
  • transport-level encryption
  • time-stamping
  • digital signature of messages
  • logging
  • error handling.

大まかな構成

X-Roadで作られたエコシステムに接続するにはCAによって証明されないといけない。

X-Roadの組織モデル にある通り、

  • X-Road Operator ... エコシステムの責任者。規約・ルールの策定・事項、新しいメンバの受け入れ、など。
  • Member組織 ... サービスの提供者・利用者。オンボードプロセスを経て、エコシステムに参加可能
  • Trust Service Provider ... Time-stamp Authority(TSA)、Certification Authority(CA)機能を提供。第三者機関でもよいし、X-Road Operatorが兼ねることもある。

という関係者で構成される。

モデルとしては、中央集権的である。 X-Roadのアーキテクチャ にあるように、Central Serivceを提供するためのCentral Serverがあり、 X-Road Operatorが責任をもって運用する。 そこにはMemberの登録情報やポリシーが管理されている。 なお、分散配置されたSecurity Serverを通じて、これらの情報を利用可能になっている。

MemberはSecurity Serverを通じてX-Roadのセキュリティ面の機能を利用可能。 Security ServerはCentral Serverに保持されたコンフィグやTrust Service Provierが持っているAuthorityの情報を取得し、ローカルにキャッシュし動作する。

Information SystemはREST or SOAPプロトコルを用いてMemberが持つデータを提供・利用可能。 Security Serverがエントリポイントになり、Information Systemを利用可能。

データ交換のログが保存される。やり取りされるメッセージにはタイムスタンプが付く。 ログにはタイムスタンプとデジタル署名が用いられ、訴訟手続などにも利用可能。 TSAがタイムスタンプを提供し、Security Serverがログを保存する。 CAがメッセージへのサインを提供する。

詳しくは、 X-Roadのアーキテクチャ詳細 参照。そこの絵がわかりやすい。

フェデレーション

X-Roadエコシステム間のフェデレーション にあるように、異なるX-Roadエコシステム間をフェデレーションできる。 複数のエコシステムとフェデレーション可能だが、推移的なつながりは不可能であり、直接つながった同士のみ。

Federating X-Road System Overview にも記載がある。

開発ロードマップ

X-Roadの開発ロードマップ によると、

  • 同期的なメッセージングからPub/Subモデルへの対応(Kafkaなど)
  • 大量データノストリームへの対応
  • アーキテクチャの改善。モジュライズの促進。拡張機能を取り込みやすくする

といった改善が見込まれているようだ。次期バージョンはX-Road 7。

歴史

X-Roadの歴史 にあるように、2001年にエストニアにおいてプロジェクトが始まった。 前身から考えるとかなり歴史のあるプロダクトである。 また、歴史的背景から、最初は公的機関での利用が前提となっており、そこから民間に広がりつつあるような印象である。 民間での利用を促進するにはどうしたらよいのか?という主旨と思われる論文も公開されている。 -> On the Systematic Exploitation of the Estonian Data Exchange Layer X-Road for Strengthening Public-Private Partnerships

世界中で採用が進む

X-ROAD® WORLD MAP を見ると、世界各地でX-Road採用実績がある。 日本にも採用実績がある。 ニチガスの例。 なお、本事例にPlanetway社が関係している。Planetway社のプロフィール 参照。

実装モデル

X-Road Implementation Models に示されている通り、X-Roadの実装モデルは複数挙げられる。 上記サイトに掲載されていた例は以下の通り。

  • 国家で単一のX-Roadモデル
  • 地域ごとのX-Roadモデル
  • ビジネスドメインやセクタごとのX-Roadモデル
  • 組織ごとのX-Roadモデル

またX-Roadはフェデレーション機能を持っているので、 異なるX-Roadエコシステム間を繋ぎ利用することができる。 国際間で繋いでいる例もある。

またIstioのようなマイクロサービスアーキテクチャに対し、 X-Roadはあくまで異なるステークホルダ間のデータ連携を対象としているため、 一見似たように見えても異なるモデルである。

開発(ソースコード)

X-RoadのGitHub

関連文献

関連する論文がいくつかありそうだった。 On the Systematic Exploitation of the Estonian Data Exchange Layer X-Road for Strengthening Public-Private Partnerships 等。

なお、X-Roadのドキュメントライブラリに一覧が載っている。

X-Roadの広がりに関する課題

On the Systematic Exploitation of the Estonian Data Exchange Layer X-Road for Strengthening Public-Private Partnerships にて X-Roadの民間利用が進まない原因と対策のマトリクスが紹介されていた。 以下に簡単に翻訳したものを示す。

問題/ソリューション 認知を挙げる 契約の簡素化(単一化) セミナー・ハッカソン 導入マテリアル 収益性に関する具体的な例 メンバーシップパッケージの充実(?) セクター別のアプローチ 官僚を減らす(?) 様々なチャンネルでの情報発信
X-Roadのアクセスや利用が複雑すぎる y y y y y
プロセスに関係者が多すぎる y y y
プラットフォームが高すぎる y y
十分に認知されていない y y y
収益性についての理解が足りない y y y

ソリューションに付帯されるデータ共有の仕組み

Snowflake Data Sharing

Snowflake Secure Data Sharingの紹介 のとおり、 Snowflakeに保存したオブジェクトを他のSnowflakeアカウントに共有できる。 ポイントは「他のSnowflakeアカウントに」というところである。つまり異るサービス間で共有する目的のものではなさそう。 ただ、その後の説明を読むと「リーダーアカウント」(ここではReaderか)を発行でき、リーダーアカウント向けに読み取り専用でデータを共有できるようだ。

共有できるオブジェクトは以下の通り。

  • テーブル
  • 外部テーブル
  • 安全なビュー
  • 安全なマテリアライズドビュー
  • 安全な UDFs

共有されたデータを利用する際は、「読み取り専用」のデータベースが定義される。 また実際にコピー/転送されるわけではない。(同じ実態をコンピュートリソースが参照するという意味だと理解)

S3 Access Points

S3 Access Pointsに関するAWSブログ に記載の通り、2019年時点でS3に個別のアクセスポイントを設ける機能がリリースされている。

アクセスポイントを発行し、アクセスポイントポリシーを適用可能。 以前であれば、バケットに対してバケットポリシーを適用していたのだが、S3 Access Pointsを利用して個別のアクセスポイントを払い出すことができるようになった。 つまり、バケットポリシーで個別の制御を実現しようとすると、たくさんのポリシーをひとつのバケットポリシーに記載する必要があったが、 ひとつの接続可否をひとつのポリシーで管理出来るようになった、と言える。

S3アクセスポイントのうれしい点を自分なりの理解で解説してみる あたりにユーザ視点での意義が書かれている。分かり易い。

Amazon S3 Access Points が公式情報。

AWS Storage Gateway

AWS Storage Gateway にアーキテクチャの絵がまとまっているが、S3とオンプレのシステムを繋いでデータ共有するにあたり、 ゲートウェイの機能を提供する。

AWS上で素朴にS3バケットポリシーを使ってアカウント間共有

もっと素朴に実現する方法はあるのか?という意味では、Amazon S3のバケットを特定のAWSアカウントに共有する のブログの通り、S3のバケットポリシー上の工夫で対応可能。 バケットポリシーで特定のAWSアカウントを指定して共有できる。

ただし、この場合は、S3 Access Pointsで示したようなバケットポリシーの課題が生じる。

AWS Redshift Data Sharing

Amazon Redshift Data Sharing が一般提供開始となり、東京リージョンでもご利用可能 のとおり、 Redshift間でデータを共有する機能がある。

Announcing Amazon Redshift data sharing (preview) の通り、アカウントまたぎも可能そうに見える。

Treasure Data Data Exchange

トレジャーデータ契約企業同士のデータ連携を簡単に実現する「Data Exchange」機能を紹介! の通り、 ユーザが自身で所有しているデータを他のアカウントにデータを共有する事が可能。

また共有する際に、データ抽出量のクエリを渡すことができる。 これにより、特定のデータだけ渡す、などが可能。

PlanetCross

Planetway 社が提供するソリューション。 PlanetCorss の通り、エストニアの電子政府で利用されているX-Roadを 民間企業向けにカスタマイズして提供している。

上記ウェブサイトに記載されているが、ユーザ企業ごとにPlanetCross環境を立て、 それを通じて互いにデータ連携できるようになっている。

このモデルだと、GAIA-X/IDSのコネクタでデータスペースをつなげるモデルに近い。

受賞歴を見ると、特に2016、2017年あたりに注目を集めた様子。

(関連)Nortal社

Nortal 社はエストニアの電子政府ソリューションの40%異常を計画・実行した実績を持つ。 eIDに強みを持つようだ。

Aktors社

Aktors 社はX-Roadの先駆者。 Aktors CEOのAleksander Reitsakasは、2001年の最初のX-Roadプロジェクトマネージャ。

データエスクロー(第三者預託)

Crossbeam

CROSSBEAMDATA ESCROW 101あたりに紹介されている。 データを預かり、利害関係者に部分的に共有する機能を提供する。 パートナーマネジメントの一貫。

codekeeper

Codekeeperのエスクローサービス に記載の通り、重要なデータを預かるサービス。

データコラボレーションに関する研究機関

AIDAC

AIDAC の通り、筑波大学の研究グループ。 データコラボレーション 解析 を中心とした研究。

主旨は以下の通り。

組織や分野を越えたデータ統合には、データサイズが巨大であることや組織ごとのデータ特性の違いなどの課題があります。またさらにデータの秘匿性についても考慮する必要があります。本事業では、組織や分野にまたがって分散したデータに対して、元データの直接的な統合を行うことなく、高度な統合解析を可能とするための技術開発を行います。

また、以下の通りNEDOプロジェクトとして手がけた様だ。

本プロジェクトは、独立行政法人新エネルギー・産業技術総合開発機構(NEDO)の委託を受けて、 「人工知能技術適用によるスマート社会の実現(人工知能技術の社会実装に関する日米共同研究開発)」において実施するものです。

人工知能技術適用によるスマート社会の実現によると、 実施期間は以下の通り。

事業期間:2018年度~2022年度、2020年度予算:19.5億円

人工知能技術適用によるスマート社会の実現 プロジェクト紹介 に具体的なプロジェクトタイトルが載っている。

気になったプロジェクトは以下の通り。

  • MyData に基づく人工知能開発運用プラットフォームの構築:国立大学法人東京大学他
  • データコラボレーション解析による生産性向上を目指した次世代人工知能技術の研究開発:国立大学法人筑波大学他

セキュリティの情報共有

MISP

MISP にあるとおり、セキュリティの脅威を共有するプラットフォームとオープンソースソフトウェア。

共有

Source Code Reading of Delta Sharing

参考

メモ

サーバの参考実装

io.delta.sharing.server.DeltaSharingServiceの概略(起動の流れ)

サーバ起動スクリプトを見ると、以下のように io.delta.sharing.server.DeltaSharingService クラスが用いられている事が分かる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
declare -r lib_dir="$(realpath "${app_home}/../lib")"
declare -a app_mainclass=(io.delta.sharing.server.DeltaSharingService)
```

エントリポイントは以下の通り、

io/delta/sharing/server/DeltaSharingService.scala:276

```scala
def main(args: Array[String]): Unit = {
val ns = parser.parseArgsOrFail(args)
val serverConfigPath = ns.getString("config")
val serverConf = ServerConfig.load(serverConfigPath)
start(serverConf).blockUntilShutdown()
}

コンフィグのロード

渡された設定ファイルのPATHを用いて、設定を読み込む。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Load the configurations for the server from the config file. If the file name ends with
* `.yaml` or `.yml`, load it using the YAML parser. Otherwise, throw an error.
*/
def load(configFile: String): ServerConfig = {
if (configFile.endsWith(".yaml") || configFile.endsWith(".yml")) {
val serverConfig =
createYamlObjectMapper.readValue(new File(configFile), classOf[ServerConfig])
serverConfig.checkConfig()
serverConfig
} else {
throw new IOException("The server config file must be a yml or yaml file")
}
}

startメソッド

オブジェクトのmainメソッド。その中で用いられている、 start メソッド内を確認していく。

io/delta/sharing/server/DeltaSharingService.scala:230

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def start(serverConfig: ServerConfig): Server = {
lazy val server = {
updateDefaultJsonPrinterForScalaPbConverterUtil()
val builder = Server.builder()
.defaultHostname(serverConfig.getHost)
.disableDateHeader()
.disableServerHeader()
.annotatedService(serverConfig.endpoint, new DeltaSharingService(serverConfig): Any)
if (serverConfig.ssl == null) {
builder.http(serverConfig.getPort)
} else {
builder.https(serverConfig.getPort)
if (serverConfig.ssl.selfSigned) {
builder.tlsSelfSigned()
} else {
if (serverConfig.ssl.certificatePasswordFile == null) {
builder.tls(
new File(serverConfig.ssl.certificateFile),
new File(serverConfig.ssl.certificateKeyFile))
} else {
builder.tls(
new File(serverConfig.ssl.certificateFile),
new File(serverConfig.ssl.certificateKeyFile),
FileUtils.readFileToString(new File(serverConfig.ssl.certificatePasswordFile), UTF_8)
)
}
}
}
if (serverConfig.getAuthorization != null) {
// Authorization is set. Set up the authorization using the token in the server config.
val authServiceBuilder =
AuthService.builder.addOAuth2((_: ServiceRequestContext, token: OAuth2Token) => {
// Use `MessageDigest.isEqual` to do a time-constant comparison to avoid timing attacks
val authorized = MessageDigest.isEqual(
token.accessToken.getBytes(UTF_8),
serverConfig.getAuthorization.getBearerToken.getBytes(UTF_8))
CompletableFuture.completedFuture(authorized)
})
builder.decorator(authServiceBuilder.newDecorator)
}
builder.build()
}
server.start().get()
server
}

一番最後の箇所の通り、

1
2
server.start().get()
server

server は、 Armeria のビルダを用いてインスタンス化されたサーバを起動する。 なお、startメソッド内ではTLS周りの設定、トークンの設定などが行われる。

なお、サーバに渡されるクラスは以下の通り、

1
2
3
4
5
val builder = Server.builder()
.defaultHostname(serverConfig.getHost)
.disableDateHeader()
.disableServerHeader()
.annotatedService(serverConfig.endpoint, new DeltaSharingService(serverConfig): Any)

com.linecorp.armeria.server.ServerBuilder#annotatedService(java.lang.String, java.lang.Object) メソッドを用いて渡される。 渡されているのは io.delta.sharing.server.DeltaSharingService クラスである。 annotatedService メソッドについては、 ArmeriaのAnnotated service を参照。

Shareのリストを返す箇所の実装

以下の通り、

io/delta/sharing/server/DeltaSharingService.scala:108

1
2
3
4
5
6
7
8
@Get("/shares")
@ProducesJson
def listShares(
@Param("maxResults") @Default("500") maxResults: Int,
@Param("pageToken") @Nullable pageToken: String): ListSharesResponse = processRequest {
val (shares, nextPageToken) = sharedTableManager.listShares(Option(pageToken), Some(maxResults))
ListSharesResponse(shares, nextPageToken)
}

io.delta.sharing.server.SharedTableManager#SharedTableManager クラスのインスタンスを利用し、 io.delta.sharing.server.SharedTableManager#listShares メソッドを用いて、 設定ファイルから読み込んだShareのリストを取得し返す。

つまり、このあたりの値を返す時にはデータ本体にアクセスしていない、ということになる。

仮に...もし設定ファイルに書かれたデータの実体が無かった場合はどうなるのだろうか。 ★要確認

テーブルのリスト取得も同じような感じだった。

テーブルのバージョンを返す箇所の実装

io/delta/sharing/server/DeltaSharingService.scala:144

1
2
3
4
5
6
7
8
9
10
@Head("/shares/{share}/schemas/{schema}/tables/{table}")
def getTableVersion(
@Param("share") share: String,
@Param("schema") schema: String,
@Param("table") table: String): HttpResponse = processRequest {
val tableConfig = sharedTableManager.getTable(share, schema, table)
val version = deltaSharedTableLoader.loadTable(tableConfig).tableVersion
val headers = createHeadersBuilderForTableVersion(version).build()
HttpResponse.of(headers)
}

io.delta.sharing.server.SharedTableManager#getTable メソッドを利用し、コンフィグから読み込んだ情報を元に テーブル情報を取得する。

つづいて、 io.delta.standalone.internal.DeltaSharedTableLoader#loadTable メソッドを利用し、 io.delta.standalone.internal.DeltaSharedTable#DeltaSharedTable クラスのインスタンスを取得する。

DeltaSharedTable クラスは、 DeltaLog クラスをラップした管理用のクラス。

io.delta.standalone.internal.DeltaSharedTable クラス

このクラスは、DeltaLog クラスをラップしたものであり、サーバの管理機能の主要なコンポーネントである。

deltaLogの取得

例えば deltaLog インスタンスを取得できる。

io/delta/standalone/internal/DeltaSharedTableLoader.scala:74

1
2
3
4
5
6
7
8
private val deltaLog = withClassLoader {
val tablePath = new Path(tableConfig.getLocation)
val fs = tablePath.getFileSystem(conf)
if (!fs.isInstanceOf[S3AFileSystem]) {
throw new IllegalStateException("Cannot share tables on non S3 file systems")
}
DeltaLog.forTable(conf, tablePath).asInstanceOf[DeltaLogImpl]
}

上記の通り、内部では io.delta.standalone.DeltaLog#forTable(org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.Path) メソッドが 用いられており、実際にDelta Lake形式で保存されたデータ本体(Delta Lakeのメタデータ)にアクセスする。

テーブルバージョン取得

Delta Logの機能を利用し、テーブルのバージョンを取得できる。

1
2
3
4
5
6
/** Return the current table version */
def tableVersion: Long = withClassLoader {
val snapshot = deltaLog.snapshot
validateDeltaTable(snapshot)
snapshot.version
}

この機能は io.delta.sharing.server.DeltaSharingService#getTableVersion の実装に利用されている。

クエリ

io.delta.standalone.internal.DeltaSharedTable#query メソッドは、 後述の通り、DeltaSharingSerivce 内でファイルリストを取得したり、メタデータを取得したりするときに用いられる。

ファイルリストを返す箇所の実装

query はファイルリストを返すAPIである。

io/delta/sharing/server/DeltaSharingService.scala:169

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Post("/shares/{share}/schemas/{schema}/tables/{table}/query")
@ConsumesJson
def listFiles(
@Param("share") share: String,
@Param("schema") schema: String,
@Param("table") table: String,
queryTableRequest: QueryTableRequest): HttpResponse = processRequest {
val tableConfig = sharedTableManager.getTable(share, schema, table)
val (version, actions) = deltaSharedTableLoader.loadTable(tableConfig).query(
includeFiles = true,
queryTableRequest.predicateHints,
queryTableRequest.limitHint)
streamingOutput(version, actions)
}

特徴的なのは、 predicateHintslimitHint が渡されており、フィルタ条件が指定できる点。 ただし、公式ドキュメント上では「ベストエフォート」と書かれている。

このメソッドのポイントは、 io.delta.standalone.internal.DeltaSharedTableLoader#DeltaSharedTableLoader クラスを用いたクエリの部分だと考えられる。 このクラスは、Deltaテーブルの本体にアクセスし、各種情報を読みこむためのもので、コメントを読む限り、キャッシュする機能なども有している。 内部では io.delta.standalone.internal.DeltaSharedTable クラスが利用されている。 DeltaSharedTable クラスは、 DeltaLog をラップしたものであり、Delta Logの各種情報をサーバ内で扱うための機能を提供する。

特に、今回用いられているのは io.delta.standalone.internal.DeltaSharedTable#query メソッドである。

io/delta/standalone/internal/DeltaSharedTableLoader.scala:118

1
2
3
4
5
6
7
  def query(
includeFiles: Boolean,
predicateHits: Seq[String],
limitHint: Option[Int]): (Long, Seq[model.SingleAction]) = withClassLoader {
// TODO Support `limitHint`

(snip)

メソッドの定義の通り、このクエリは最終的に、

io/delta/standalone/internal/DeltaSharedTableLoader.scala:165

1
snapshot.version -> actions

Delta Logから情報取得したスナップショットのバージョン、ファイル情報のシーケンスを返す。

なお、 actions に格納されている SingleAction クラスは以下の通り、 ファイルひとつに関する情報、プロトコルの情報、メタデータについての情報を保持している。

io/delta/sharing/server/model.scala:22

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
case class SingleAction(
file: AddFile = null,
metaData: Metadata = null,
protocol: Protocol = null) {

def unwrap: Action = {
if (file != null) {
file
} else if (metaData != null) {
metaData
} else if (protocol != null) {
protocol
} else {
null
}
}
}

アクションの生成される箇所

上記の通り、ここでいう「アクション」とはデータの実体であるファイル1個に関連する情報の塊である。 生成されるのは以下の箇所。

io/delta/standalone/internal/DeltaSharedTableLoader.scala:137

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
val actions = Seq(modelProtocol.wrap, modelMetadata.wrap) ++ {
if (includeFiles) {
val selectedFiles = state.activeFiles.values.toSeq
val filteredFilters =
if (evaluatePredicateHints && modelMetadata.partitionColumns.nonEmpty) {
PartitionFilterUtils.evaluatePredicate(
modelMetadata.schemaString,
modelMetadata.partitionColumns,
predicateHits,
selectedFiles
)
} else {
selectedFiles
}
filteredFilters.map { addFile =>
val cloudPath = absolutePath(deltaLog.dataPath, addFile.path)
val signedUrl = signFile(cloudPath)
val modelAddFile = model.AddFile(url = signedUrl,
id = Hashing.md5().hashString(addFile.path, UTF_8).toString,
partitionValues = addFile.partitionValues,
size = addFile.size,
stats = addFile.stats)
modelAddFile.wrap
}
} else {
Nil
}
}

最初から主要な部分の実装を確認する。

まず、シーケンスを作る際、先頭にプロトコルとメタデータをラップしたアクションが保持される。

1
val actions = Seq(modelProtocol.wrap, modelMetadata.wrap) ++ {

その後、適宜ファイルに関する情報が格納される。 一応 includeFiles で空情報を返すかどうかの判定があるが、ここではTrueだとして話を進める。

まず、Delta Lakeテーブルのアクティブなファイルが取得される。

1
val selectedFiles = state.activeFiles.values.toSeq

つづいて、Predicateヒントがあり、パーティションカラムが設定されている場合は、 それに基づいてフィルタが行われる。

1
2
3
4
5
6
7
8
9
10
11
val filteredFilters =
if (evaluatePredicateHints && modelMetadata.partitionColumns.nonEmpty) {
PartitionFilterUtils.evaluatePredicate(
modelMetadata.schemaString,
modelMetadata.partitionColumns,
predicateHits,
selectedFiles
)
} else {
selectedFiles
}

ヒントを用いてフィルタしている箇所については後述する。

ここではフィルタされたファイルのリストが返されたとして話を続ける。

次に、AddFile の情報を用いて、より詳細な情報が確認される。

1
2
3
4
5
6
7
8
9
10
filteredFilters.map { addFile =>
val cloudPath = absolutePath(deltaLog.dataPath, addFile.path)
val signedUrl = signFile(cloudPath)
val modelAddFile = model.AddFile(url = signedUrl,
id = Hashing.md5().hashString(addFile.path, UTF_8).toString,
partitionValues = addFile.partitionValues,
size = addFile.size,
stats = addFile.stats)
modelAddFile.wrap
}

絶対PATH(というかURL)の取得、下回りのストレージ(現時点ではS3のみ対応)の署名済みURL取得が行われ、 改めて AddFile に格納されたのち、アクションにラップされて返される。

ここで、1点だけ署名済みURL取得する部分だけ確認する。 取得には、 io.delta.standalone.internal.DeltaSharedTable#signFile メソッドが用いられる。

io/delta/standalone/internal/DeltaSharedTableLoader.scala:192

1
2
3
4
5
6
private def signFile(path: Path): String = {
val absPath = path.toUri
val bucketName = absPath.getHost
val objectKey = absPath.getPath.stripPrefix("/")
signer.sign(bucketName, objectKey).toString
}

このメソッドでは、上記の通り、内部でバケット名やオブジェクトキーを渡しながら、 io.delta.sharing.server.S3FileSigner#sign メソッドが利用されている。

なお、signer インスタンスは以下のようになっており、

1
2
3
private val signer = withClassLoader {
new S3FileSigner(deltaLog.dataPath.toUri, conf, preSignedUrlTimeoutSeconds)
}

将来的にS3以外にも対応できるような余地が残されている。

さて、 sign メソッドに戻る。

io/delta/sharing/server/CloudFileSigner.scala:41

1
2
3
4
5
6
7
8
override def sign(bucket: String, objectKey: String): URL = {
val expiration =
new Date(System.currentTimeMillis() + SECONDS.toMillis(preSignedUrlTimeoutSeconds))
val request = new GeneratePresignedUrlRequest(bucket, objectKey)
.withMethod(HttpMethod.GET)
.withExpiration(expiration)
s3Client.generatePresignedUrl(request)
}

上記の通り、 com.amazonaws.services.s3.model.GeneratePresignedUrlRequest#GeneratePresignedUrlRequest(java.lang.String, java.lang.String) メソッドを利用してS3の署名付きURLが取得されている事が分かる。

ヒントが利用されている箇所

2021/9/18現在では、 predicateHits は用いられているが、limitHint は用いていないように見える。

predicateHints が用いられるのは以下の個所。

io/delta/standalone/internal/DeltaSharedTableLoader.scala:142

1
2
3
4
5
6
PartitionFilterUtils.evaluatePredicate(
modelMetadata.schemaString,
modelMetadata.partitionColumns,
predicateHits,
selectedFiles
)

io.delta.standalone.internal.PartitionFilterUtils$#evaluatePredicate メソッドは、 引数に predicateHits や対象となるDelta Tableに紐づいているアクティブなファイルリストを引数に渡される。 内部で、有効なpredicateかどうかなどをチェックされたのち、有効なpredicate文言があれば、 それに従いファイルリストがフィルタされる。

なお、内部的にはDelta LakeやSpark(の特にCatalyst)の実装に依存しているような箇所がみられる。

具体的には、 evaluatePredicate メソッド内の以下の個所。

io/delta/standalone/internal/PartitionFilterUtils.scala:61

1
2
3
4
addFiles.filter { addFile =>
val converter = CatalystTypeConverters.createToCatalystConverter(addSchema)
predicate.eval(converter(addFile).asInstanceOf[InternalRow])
}

ここで predicateorg.apache.spark.sql.catalyst.expressions.InterpretedPredicate クラスの インスタンスである。(SparkのCatalystで用いられる、Predicate表現のひとつ) このとき、eval メソッドが呼び出されていることがわかる。

/home/dobachi/.cache/coursier/v1/https/repo1.maven.org/maven2/org/apache/spark/spark-catalyst_2.12/2.4.7/spark-catalyst_2.12-2.4.7-sources.jar!/org/apache/spark/sql/catalyst/expressions/predicates.scala:39

1
2
case class InterpretedPredicate(expression: Expression) extends BasePredicate {
override def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean]

このように、SparkのCatalystにおいては eval メソッドには、Spark SQLの行の表現である InternalRow のインスタンスが渡され、当該 predicate に合致するかどうかがチェックされる。

クラスローダについて

このクラスには、 withClassLoader が定義されており、 Armeriaではなく、DeltaSharedTableのクラスローダの下で関数を実行できるようになっている。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Run `func` under the classloader of `DeltaSharedTable`. We cannot use the classloader set by
* Armeria as Hadoop needs to search the classpath to find its classes.
*/
private def withClassLoader[T](func: => T): T = {
val classLoader = Thread.currentThread().getContextClassLoader
if (classLoader == null) {
Thread.currentThread().setContextClassLoader(this.getClass.getClassLoader)
try func finally {
Thread.currentThread().setContextClassLoader(null)
}
} else {
func
}
}
共有

Recent hot activities of Delta Sharing

参考

メモ

いま何がどのくらいできるようになっているのか?

2021/9時点で何ができるのか? Delta Sharing公式GitHub のREADMEによると以下の通り。

クライアント、開発言語バインディング

  • SQL(Spark SQL)
  • Python
  • Scala
  • Java
  • R

サーバの参考実装

Delta Sharing Protocal に乗っ取ってデータ共有の動作確認が可能な 参考実装が公開されている。

ただし、この実装にはセキュリティ周りの機能が十分に実装されておらず、 もし公開して利用する場合はプロキシの裏側で使うことが推奨されている。

サーバを立ち上げるときに共有するデータの定義を渡せる。 delta-sharing-server.yaml.template にコンフィグのサンプルが載っている。

Concept にデータモデル、主要なコンポーネントが載っている。

  • Share(シェア):
    • 共有の論理的なグループ
    • このグループに含まれるものが受信者に共有される。
  • Schema(スキーマ):
    • 複数のTableを含む
  • Table(テーブル):
    • Delta Lakeのテーブルもしくはビュー
  • Recipient(受信者):
    • シェアにアクセス可能なBearerトークンをもつもの
  • Sharing Server(共有サーバ):
    • プロトコルを実装したサーバ

プロトコル

Delta Sharing Protocal に載っている通り、 Delta Sharingのサーバとクライアントの間のプロトコルが規定されている。

  • シェアをリスト
  • シェア内のスキーマをリスト
  • スキーマ内のテーブルをリスト
  • テーブルのバージョン情報取得
    • おそらくDelta Lakeとしてのバージョン情報
  • テーブルのメタデータ取得
  • データ取得
    • データ取得時には、PredicateHint、LimitHintを渡せる

テーブルメタデータ

Metadata にテーブルに付帯できるメタデータの定義が載っている。 スキーマやフォーマットなどの情報に加え、任意の説明を保持できるようだ。

Support writing to Delta Shares

Support writing to Delta Shares #33

書き込みへの対応について提案がなされたが、2021/8/27現在は保留となっている。 いったん読み込みに注力、とのこと。

なお、Treasure DataのTaroさんがTreasure Dataにおける対応例を紹介している。 (が、それに対する応答はない)

Add pre-signed URL request logging to server

Add pre-signed URL request logging to server

今のところ、プリサインドURLの件数を知る術がない。 そこで、DEBUGログにプリサインドURLの発行情報を出力するのはどうか、というチケットが挙がっていた。

Support Azure Storage in Delta Sharing Server

Support Azure Storage in Delta Sharing Server

Azureでは、 Delegate access with a shared access signature の機能が提供されているので、割と対応しやすいのでは?という議論になっている。 AzureのSASは、定められたパーミッションで定められた期間だけリソースにアクセス可能にする。

なお、SASにはアカウントレベル、サービスレベル、ユーザデリゲーションの3種類がある。 それぞれスコープやできることが異なる。

Delta Sharingがストレージに特化しているのであれば、まずはBlobに対するデリゲーションURLを発行できれば良さそうである。

Support Google Cloud Storage in Delta Sharing Server

Support Google Cloud Storage in Delta Sharing Server

Azureと同じく、 署名付きURL に対応しているため、そこまで難しくは無いだろう、というコメントがあった。

これはどうだろうか?

共有

Spark Summit 2021の前にきになってみたセッションのメモ

参考

メモ

Spark Summit 2021の気になったセッションのメモ

Lakehouse with Delta Lake Deep Dive Training

Manufacturing and Distribution Industry Forum

Data Distribution and Ordering for Efficient Data Source V2

Delta Lake Streaming Under the Hood

Introducing Delta Live Tables Make Reliable ETL Easy on Delta Lake

Magnet Shuffle Service Push-based Shuffle at LinkedIn

共有

Socks proxy for Jupyter Lab

参考

メモ

以下の条件に当てはまる場合に設定する項目。

  • Jupyter Labをリモート環境で実行している
  • Socksプロキシを使っている
  • ChromeやFirefoxのエクステンションでFoxyProxyのようなURLマッチングでプロキシを差し替えている

このとき、https?に加え、wsも指定する必要があります。 具体的には以下の通り。

Foxy Proxyでの設定例1
Foxy Proxyでの設定例2

つまりは、ウェブソケットのことを忘れることなかれ、ということである。 (いつも忘れるから、備忘録として記載しておく)

共有

Minio getting started

参考

公式

ブログ

メモ

S3互換の仕組みを導入したく、検索してヒットしたので試した。

動作確認

前提

1
2
$ cat /etc/redhat-release
CentOS Linux release 7.9.2009 (Core)

サーバ起動

Minioの公式ウェブサイト を確認し、

1
2
3
4
5
6
$ mkdir ~/Minio
$ cd ~/Minio
$ wget https://dl.min.io/server/minio/release/linux-amd64/minio
$ chmod +x minio
$ mkdir data
$ ./minio server data

http://localhost:9000/minio/ にアクセスすると、ログイン画面が出るので、 デフォルトのID、パスワードを入力する。

コンフィグ

Minioのコンフィグガイド を見ると、アドミンユーザとパスワードの設定方法が載っていた。

1
2
3
$ export MINIO_ROOT_USER=minio
$ export MINIO_ROOT_PASSWORD=miniominio
$ ./minio server data

上記のようにすればよさそうである。なお、USERは3文字以上、PASSWORDは8文字以上が必要。 ここでは、ローカル環境用にお試し用のID、パスワードとした。

mcコマンドの利用

MinioのアドミンガイドMinioのクライアントガイド を参考に、いくつか動作確認、設定してみる。

1
2
$ cd ~/Minio
$ wget https://dl.min.io/client/mc/release/linux-amd64/mc

エイリアスを設定。

1
$ ./mc alias set myminio http://172.24.88.24:9000 minio miniominio

情報を確認。

1
2
3
4
5
$ ./mc admin info myminio
● 172.24.88.24:9000
Uptime: 6 minutes
Version: 2021-06-09T18:51:39Z
Network: 1/1 OK

バケットを作成

1
2
$ ./mc mb myminio/test
Bucket created successfully `myminio/test`.

ファイルを書き込み

1
2
3
$ echo test > /tmp/test.txt
$ ./mc cp /tmp/test.txt myminio/test/
/tmp/test.txt: 5 B / 5 B ┃┃ 224 B/s 0s

s3コマンドで利用

開発のためにローカルにもS3が欲しいというわがまま、MINIOが叶えてくれました を参照し、 S3としてアクセスしてみる。

1
2
3
4
5
6
$ aws --profile myminio configure
AWS Access Key ID [None]: minio
AWS Secret Access Key [None]: miniominio
Default region name [None]:
Default output format [None]:
$ aws --endpoint-url http://127.0.0.1:9000 --profile myminio s3 mb s3://test2

上記のように、エンドポイントを指定して利用する。

1
2
$ aws --endpoint-url http://127.0.0.1:9000 --profile myminio s3 cp /tmp/test.txt s3://test2
upload: ../../../tmp/test.txt to s3://test2/test.txt

アップデート

以下のメッセージが出たので、試しにアップデートしてみた。

1
2
You are running an older version of MinIO released 2 months ago
Update: Run `mc admin update`

まず、Minioのサーバを起動しておく。

1
$ ./minio server data

この後別のターミナルを開き、エイリアすを設定しておく。

1
$ ./mc alias set myminio http://172.24.32.250:9000 minioadmin minioadmin

アップデート実行

1
2
$ ./mc admin update myminio
Server `myminio` updated successfully from 2021-06-09T18:51:39Z to 2021-08-25T00-41-18Z

署名済みURL発行

boto3 を用いて MinIO でユーザの追加と Security Token Service(STS) による一時認証情報での署名済み URL を発行する を参考に、 上記環境で署名済みURLを発行してみる。 これにより、あるオブジェクトへのアクセス権をデリゲート可能になるはずである。

S3互換サービスのためのエイリアス

前述の通り、 mc alias を利用して、S3互換サービスのためのエイリアスを作成済みである。 ここでは、その作成した myminio を利用する。

作業用バケット作成

今回の動作確認で使用するバケットを作成する。

1
$ ./mc mb myminio/share

ポリシーファイルの作成

以下のファイルを作成する。

share.json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ cat << EOF > share.json
> {
> "Version": "2012-10-17",
> "Statement": [
> {
> "Action": [
> "s3:GetObject",
> "s3:PutObject"
> ],
> "Effect": "Allow",
> "Resource": [
> "arn:aws:s3:::share/*"
> ],
> "Sid": ""
> }
> ]
> }
> EOF

ポリシーを適用

mc admin policy の通り、AWSのIAM互換のポリシーを利用し、Minioのポリシーを設定する。 先程作成したファイルを用いる。

1
$ ./mc admin policy add myminio share share.json

適用されたことを確かめる

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$ ./mc admin policy info  myminio share
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::share/*"
]
}
]
}

ユーザ作成

mc admin userを利用したユーザ作成 を利用して、 動作確認のためのユーザを作成する。

1
$ ./mc admin user add myminio bob bob123456

なお、上記ドキュメントでは

mc admin user add ALIAS ACCESSKEY SECRETKEY

と記載されているが、ACCESSKEY にしていた内容がユーザ名になるようだ。

ユーザにポリシーを設定

1
$ ./mc admin policy set myminio share user=bob

ユーザ作成され、ポリシーが設定されたことを確かめる。

1
2
$ ./mc admin user list myminio
enabled bob share

共有の動作確認用のファイルの準備

手元でファイルを作り、Minio上にアップロードしておく。

1
2
$ echo "hoge" > test.txt
$ ./mc cp test.txt myminio/share/test.txt

Python botoを利用して一時アカウント発行&署名済みURL発行

ウェブUIからの発行

実は、ウェブUIからも発行できるため、先にそちらを試した。

ウェブUIからの署名済みURL発行の例

オブジェクトブラウザから辿りオブジェクトのページを開き、共有マークを押下すると発行できた。 実際にアクセスしたところ、ファイル本体にアクセスできた。

前提

  • Pythonバージョン: 3.7.10
  • 利用パッケージ: boto3、jupyter

ソースコードの例

https://github.com/dobachi/MinioPresignedURLExample にサンプルを載せておくことにする。 こちらの手順で特に問題なく、発行できた。

(トラブルシュート)署名済みURLにアクセスしてみたらエラー

署名済みURLを発行してアクセスしてみたら、以下のようなエラーが生じた。

1
2
3
4
5
6
7
8
9
<Error>
<Code>AccessDenied</Code>
<Message>Access Denied.</Message>
<Key>test.txt</Key>
<BucketName>share</BucketName>
<Resource>/share/test.txt</Resource>
<RequestId>16A34CDD784CBCCC</RequestId>
<HostId>baec9ee7-bfb0-441b-a70a-493bfd80d745</HostId>
</Error>

ひとまず、./mc admin traceを利用してデバッグメッセージを確認しながらアクセスしてみる。 生じたエラーメッセージは以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
172.24.32.250:9000 [REQUEST s3.GetObject] [2021-09-10T00:34:26:000] [Client IP: 172.24.32.1]
172.24.32.250:9000 GET /share/test.txt?xxxxxxxx
172.24.32.250:9000 Proto: HTTP/1.1
172.24.32.250:9000 Host: 172.24.32.250:9000
172.24.32.250:9000 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9
172.24.32.250:9000 Accept-Encoding: gzip, deflate
172.24.32.250:9000 Cache-Control: max-age=0
172.24.32.250:9000 Content-Length: 0
172.24.32.250:9000 Dnt: 1
172.24.32.250:9000 Upgrade-Insecure-Requests: 1
172.24.32.250:9000 Accept-Language: ja,en;q=0.9,en-GB;q=0.8,en-US;q=0.7
172.24.32.250:9000 Connection: keep-alive
172.24.32.250:9000 Cookie: token=xxxxxxxxx
172.24.32.250:9000 User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36 Edg/92.0.902.84
172.24.32.250:9000 <BODY>
172.24.32.250:9000 [RESPONSE] [2021-09-10T00:34:26:000] [ Duration 387µs ↑ 132 B ↓ 617 B ]
172.24.32.250:9000 403 Forbidden
172.24.32.250:9000 Accept-Ranges: bytes
172.24.32.250:9000 Content-Length: 294
172.24.32.250:9000 Server: MinIO
172.24.32.250:9000 X-Content-Type-Options: nosniff
172.24.32.250:9000 X-Amz-Request-Id: 16A34EBDA67B7B88
172.24.32.250:9000 X-Xss-Protection: 1; mode=block
172.24.32.250:9000 Content-Security-Policy: block-all-mixed-content
172.24.32.250:9000 Content-Type: application/xml
172.24.32.250:9000 Strict-Transport-Security: max-age=31536000; includeSubDomains
172.24.32.250:9000 Vary: Origin
172.24.32.250:9000 <?xml version="1.0" encoding="UTF-8"?>
<Error><Code>AccessDenied</Code><Message>Request has expired</Message><Key>test.txt</Key><BucketName>share</BucketName><Resource>/share/test.txt</Resource><RequestId>16A34EBDA67B7B88</RequestId><HostId>baec9ee7-bfb0-441b-a70a-493bfd80d745</HostId></Error>
172.24.32.250:9000
172.24.32.250:9000 [REQUEST s3.ListObjectsV1] [2021-09-10T00:34:26:000] [Client IP: 172.24.32.1]
172.24.32.250:9000 GET /favicon.ico
172.24.32.250:9000 Proto: HTTP/1.1
172.24.32.250:9000 Host: 172.24.32.250:9000
172.24.32.250:9000 Accept: image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8
172.24.32.250:9000 Cache-Control: no-cache
172.24.32.250:9000 Connection: keep-alive
172.24.32.250:9000 Cookie: token=xxxxxxxxxxxxxxxxxxx
172.24.32.250:9000 User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36 Edg/92.0.902.84
172.24.32.250:9000 Accept-Encoding: gzip, deflate
172.24.32.250:9000 Accept-Language: ja,en;q=0.9,en-GB;q=0.8,en-US;q=0.7
172.24.32.250:9000 Content-Length: 0
172.24.32.250:9000 Dnt: 1
172.24.32.250:9000 Pragma: no-cache
172.24.32.250:9000 Referer: http://172.24.32.250:9000/share/test.txt?xxxxxxxxxxxxxx
172.24.32.250:9000
172.24.32.250:9000 [RESPONSE] [2021-09-10T00:34:26:000] [ Duration 184µs ↑ 121 B ↓ 596 B ]
172.24.32.250:9000 403 Forbidden
172.24.32.250:9000 X-Xss-Protection: 1; mode=block
172.24.32.250:9000 Content-Length: 273
172.24.32.250:9000 Content-Type: application/xml
172.24.32.250:9000 Strict-Transport-Security: max-age=31536000; includeSubDomains
172.24.32.250:9000 X-Amz-Request-Id: 16A34EBDAA2A2630
172.24.32.250:9000 X-Content-Type-Options: nosniff
172.24.32.250:9000 Accept-Ranges: bytes
172.24.32.250:9000 Content-Security-Policy: block-all-mixed-content
172.24.32.250:9000 Server: MinIO
172.24.32.250:9000 Vary: OriginAccept-Encoding
172.24.32.250:9000 <?xml version="1.0" encoding="UTF-8"?>
<Error><Code>AccessDenied</Code><Message>Access Denied.</Message><BucketName>favicon.ico</BucketName><Resource>/favicon.ico</Resource><RequestId>16A34EBDAA2A2630</RequestId><HostId>baec9ee7-bfb0-441b-a70a-493bfd80d745</HostId></Error>
172.24.32.250:9000

あまり追加情報はなさそう。ひとまず、いずれにせよ403 Forbidenエラーであることがわかる。

で、結論としては、PythonでURLを発行する際に指定したポリシが間違っていた。

以下のようにすべきところを、

1
2
3
"Resource": [
"arn:aws:s3:::share/*"
],

以下のように * が抜けていた。

1
2
3
"Resource": [
"arn:aws:s3:::share/"
],

結果として指定するスコープが小さすぎたということか。

共有