Management Domain of EDC

メモ

概要

2024/6あたりから、 EDC/management-domains のドキュメントが追加された。

EDC/management-domains/Introduction の通り、Management Domainを利用することで、EDCコンポーネント群を組織的に管理することができる。 例えば、管理組織と下部組織にわけ、下部組織は管理組織に管理を委譲できる。

ここで言うEDCコンポーネントとは、例えば以下のようなものが挙げられている。

  • Catalog Server
  • Control Plane
  • Data Plane
  • Identity Hub

2024/7/14現在では、対象は以下のようになっている。

1
The current document will outline how management domains operate for three EDC components: the Catalog Server, Control Plane, and Data Plane.

Topology

デプロイメトには種類がある。 大まかに分けて、単一構成(Single Management Domain)と分散型構成(Distributed Management Domains)である。

分散型の構成は、親Management Domainがどこまでを担うかによります。

Architecture

EDCはDCAT3のCatalog型を利用する。Catalog型の親はDataset型である。 またCatalog型はほかの複数のCatalogを含むことができる。その際、Service型で定義する。 したがって、以下のような例になる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
{
"@context": "https://w3id.org/dspace/v0.8/context.json",
"@id": "urn:uuid:3afeadd8-ed2d-569e-d634-8394a8836d57",
"@type": "dcat:Catalog",
"dct:title": "Data Provider Root Catalog",
"dct:description": [
"A catalog of catalogs"
],
"dct:publisher": "Data Provider A",
"dcat:catalog": {
"@type": "dcat:Catalog",
"dct:publisher": "Data Provider A",
"dcat:distribution": {
"@type": "dcat:Distribution",
"dcat:accessService": "urn:uuid:4aa2dcc8-4d2d-569e-d634-8394a8834d77"
},
"dcat:service": [
{
"@id": "urn:uuid:4aa2dcc8-4d2d-569e-d634-8394a8834d77",
"@type": "dcat:DataService",
"dcat:endpointURL": "https://provder-a.com/subcatalog"
}
]
}
}

Sub CatalogはDCATのService型で関連付けされる。 この例は、Distributed Management Domainの2aに相当する。

アクセス管理についても考える。 2bのパターンのでは、親のCatalog Serverが下部組織のManagement Domainを管理する。そのため、ContractDefinitionにアクセス権を定義することで下部組織へのアクセス管理を実現する。

また、さらに集中型のControl Planeを設ける場合は、親のControl Planeが子のData Planeを管理する。

また、2024/7時点での設計では、複製を避けるようになっている。これは性能と単純さのためである。複合Catalogは上記の通り、ハイパーリンクを使用するようにしており、非同期のクローラで遅延ナビゲートされることも可能。 また、Catalogもそうだが、Contract NegotiationやTransfer Processのメタデータも複製されないことにも重きが置かれている。各Management Domainはそれぞれ責任を持って管理するべき、としている。 もし組織間で連携が必要な場合は読み取り専用の複製として渡すEDC拡張機能として実現可能である。 なお、個人的な所感だが、これは2aや2bであれば実現しうるが、2cだとControl Planeが親側に存在するため、Contract Negotiationの管理主体が親になってしまうのではないか、と思うがどうだろうか。Data Plane側はもしかしたら、下部組織側に残せるかもしれない。

実装

以上の内容を実現するため、いくつかEDCに変更が行われる。

  • Asset にCatalogを示す真理値を追加。もし @typeedc:CatalogAsset の場合は真になる。
  • Management APIが更新される。 Asset@type を持てるように。
  • Dataset の拡張として Catalog を追加
  • DatasetResolverImpl を更新。 Asset のCatalogサブタイプを扱えるように。
  • JsonObjectFromDatasetTransformer をリファンクたリング。 Catalog サブタイプを扱えるように。

合わせて、Fedrated Catalog Crawler (FCC) をリファクタリング。複合カタログをナビゲートし、キャッシュできるように。 参加者ごとにCatalogの更新をアトミックに実行できるようにする必要がある。

Management APIはCatalogを要求する際、ローカルのFCCキャッシュを参照するようにリファクタリングされる。

Catalog Serverもリファクタリングされる。

参考

共有

MVD_of_EDC

メモ

2024/7/12時点でプロジェクトのドキュメントが更新され、旧来Developer Documentとされていたものがなくなった。 その代わり、README に色々な記述が追加されている。今回はこれを使って試す。

Identity & Trustについての説明

README#Introduction にある通り、Eclipse Dataspace Working Groupの下で、IdentityやTrustの仕様検討が行われている。 ただ、当該章に記載のリンクをたどると、Tractus-Xのサイトにたどり着く。 -> Eclipse Tractus-X/identity-trust 当該レポジトリのREADMEでも記載されているが、この状態は暫定のようだ。

1
Until the first version of the specification from the working group is released and implemented, this repository contains the current specification implemented in the Catena-X data space. Maintenance is limited to urgent issues concerning the operation of this data space.

何はともあれ、2024/7現在は、Verifiable Credentialを用いたDecentralized Claims Protocolは絶賛仕様決定・実装中である。

目的

README#Purpose には、このデモの目的が記載されている。 このデモでは、2者のデータスペース参加者が、クレデンシャルを交換した後、DSPメッセージをやり取りする例を示す。

もちろん、旧来からの通り、このデモは商用化品質のものではなく、いくつかの shortcut が存在する。

シナリオ

management-domain を用いたFederated Catalogsを用いている。

シナリオの概要をいかに示す。

MVDシナリオの概要

図の通り、Provider CorpとConsumer Corpの2種類の企業があり、 Provdier CorpにはQ&AとManufacturingという組織が存在する。Q&AとManufacturingにはそれぞれEDCが起動している。Provdier Copの親組織にはCatalogとIdentityHubが起動している。このIdentityHubはCatalog、EDC(provider-qna)、EDC(provider-manufacuturing)に共通である。また、participantIdも共通のものを使う。Consumer CorpにはEDCとIdentityHubがある。

Data setup

図の通り、実際のデータ(assets)は各下部組織にある。そのカタログは直接外部に晒されない。その代わり、親組織であるProvider CorpのCatalog Server内のroot catalogにそのポインタ(catalog assets)が保持される。これにより、Consumer Corpはroot catalogを用いて、実際のassetの情報を解決できる。

(wip)

参考

共有

EDCSample

メモ

EDC/Samples の内容を紹介する。

動作確認環境

本プロジェクトに沿って動作確認した際の環境情報を以下に載せる。

1
2
3
4
5
6
7
8
9
10
$ cat /etc/lsb-release
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=22.04
DISTRIB_CODENAME=jammy
DISTRIB_DESCRIPTION="Ubuntu 22.04.4 LTS"

$ java -version
openjdk version "17.0.12" 2024-07-16
OpenJDK Runtime Environment (build 17.0.12+7-Ubuntu-1ubuntu222.04)
OpenJDK 64-Bit Server VM (build 17.0.12+7-Ubuntu-1ubuntu222.04, mixed mode, sharing)

EDCで利用しているJDKに合わせてJDK17を用いている。

またdobachi/EDCSampleAnsibleに環境準備などのAnsible Playbookを置く。

README

EDC/Samples/README には目的、必要事項(準備)、スコープの説明がある。

本プロジェクトの目的はオンボーディングの支援。 初心者をナビゲートしやすいような順番でサンプルが構成されており、ステップバイステップで学べる。

必要事項(準備)

このプロジェクトでは、基本的にEclipse Dataspace Componentで用いられる用語は理解している前提で説明が進められる。 そのため、EDC/documentation は読んでいることが望ましい。 また、EDC/YTの動画も参考になる。

また、関連ツールとして、Git、Gradle、Java、HTTPあたりは押さえておきたい、とされている。

動作確認する場合は、Java11+がインストールされていることが必要。

スコープ(Scopes)

サンプル群は、Scopeと呼ばれるグループで区分されている。 何を学びたいか、で分かれている。 ひとまず、初心者であればbasicが良いとのこと。

ざっと以下に解説する。

Scope名 説明
Basic EDC Frameworkを使い始めるための基礎。セットアップ、起動、拡張の作成
Transfer データ転送について
Advanced 高度な例

Basicスコープの概要

EDC/Samples/Basic/READMEには以下のサンプルが載っている。

サンプル名 説明
Basic Sample 1 コネクタを起動するのに必要なことを学ぶ
Basic Sample 2 拡張機能の作成、拡張機能の使い方を学ぶ
Basic Sample 3 プロパティファイルを用いて設定値を扱う方法を学ぶ

Basic/Sample1

EDC/Samples/Basic/Basic1/READMEには以下の通り説明されている。

Runtimeとビルドファイルで構成されている。 ビルドファイルは、build.gradle.ktsである。

このサンプルでは、EDCのBaseRuntimeを用いている。

また、EDC/Samples/Basic/Basic1/build.gradle.ktsを見ることでプロジェクトが依存するものがわかる。

basic/basic-01-basic-connector/build.gradle.kts:22

1
2
3
4
dependencies {
implementation(libs.edc.boot)
implementation(libs.edc.connector.core)
}

READMEの通り、Sampleプロジェクトのrootでコンパイル実行すれば良い。

1
2
./gradlew clean basic:basic-01-basic-connector:build
java -jar basic/basic-01-basic-connector/build/libs/basic-connector.jar

特に何があるわけではない。コンソールに起動のメッセージが出力されるはず。

Basic/Sample2

EDC/Samples/Basic/Basic2/README には以下の通り説明されている。

ServiceExtensionsrc/main/resources/META-INF/services ディレクトリ以下のプラグインファイルで構成される。 この例では HealthEndpointExtension というServiceExtensionを実装。

1
2
3
4
5
6
7
8
9
10
public class HealthEndpointExtension implements ServiceExtension {

@Inject
WebService webService;

@Override
public void initialize(ServiceExtensionContext context) {
webService.registerResource(new HealthApiController(context.getMonitor()));
}
}

上記の通り、 ServiceExtension を拡張。また、中ではWebServiceをインジェクとしている。(DIのインジェクトについてはインターネットの情報を参考) initialize メソッド内で、先にインジェクとした webService にリソースを登録している。

org.eclipse.edc.web.spi.WebService インターフェースは、例えば org.eclipse.edc.web.jersey.JerseyRestService として実装されている。 Jersey はシンプルなRESTfulサービスのフレームワークである。

上記で登録されているコントローラ HealthApiController は以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Consumes({MediaType.APPLICATION_JSON})
@Produces({MediaType.APPLICATION_JSON})
@Path("/")
public class HealthApiController {

private final Monitor monitor;

public HealthApiController(Monitor monitor) {
this.monitor = monitor;
}

@GET
@Path("health")
public String checkHealth() {
monitor.info("Received a health request");
return "{\"response\":\"I'm alive!\"}";
}
}

特別なことはないが、上記の通り、 checkHealth メソッドを叩かれるとメッセージを残す。 実行の仕方は以下の通り。

1
2
./gradlew clean basic:basic-02-health-endpoint:build
java -jar basic/basic-02-health-endpoint/build/libs/connector-health.jar

http://localhost:8181/api/health にアクセスすればブラウザ上でメッセージが表示されることに加え、コンソールにもメッセージが生じる。

Basic/Sample3

EDC/Samples/Basic/Basic3/README に以下のような説明がある。

設定をくくりだした ConfigurationExtension がある。 org.eclipse.edc.configuration.filesystem.FsConfigurationExtension がデフォルトの設定ファイル取り扱いのクラス。 このサンプルでは、Gradleの依存関係記述にて、 以下を指定することでJarファイルを含めるようにする。

basic/basic-03-configuration/build.gradle.kts:29

1
2
3
(snip)
implementation(libs.edc.configuration.filesystem)
(snip)

ここでは /etc/eclipse/EDC/config.properties という設定ファイルを作り利用する例を示す。 ポート番号を9191に変更する例。

1
2
sudo mkdir -p /etc/eclipse/EDC
sudo sh -c 'echo "web.http.port=9191" > /etc/eclipse/EDC/config.properties'

プロパティファイルを作ったので、それを利用するよう引数で指定して実行する。

1
2
./gradlew clean basic:basic-03-configuration:build
java -Dedc.fs.config=/etc/eclipse/EDC/config.properties -jar basic/basic-03-configuration/build/libs/filesystem-config-connector.jar

起動時に以下のようなログが見えるはず。

1
2
INFO 2024-08-26T16:16:38.946828558 HTTP context 'default' listening on port 9191
DEBUG 2024-08-26T16:16:38.986329567 Port mappings: {alias='default', port=9191, path='/api'}

ちなみに、設定ファイルのデフォルトは以下の通り。

/home/dobachi/.gradle/caches/modules-2/files-2.1/org.eclipse.edc/configuration-filesystem/0.8.1/7741c1f790be0f02a2da844cd05edb469a5d095b/configuration-filesystem-0.8.1-sources.jar!/org/eclipse/edc/configuration/filesystem/FsConfigurationExtension.java:67

1
var configLocation = propOrEnv(FS_CONFIG, "dataspaceconnector-configuration.properties");

独自のプロパティの利用

本サンプルのREADMEに書いてある通り、先程のプロパティファイルに以下を追記。

1
edc.samples.basic.03.logprefix=MyLogPrefix

ログのプリフィックス文言を決めるためのプロパティを定義。

続いて、Sample2でも使用した org.eclipse.edc.extension.health.HealthEndpointExtension を改変。 プロパティ名を定義し、 org.eclipse.edc.spi.system.SettingResolver#getSetting(java.lang.String, java.lang.String) メソッドを用いてプロパティの値を取れるようにした。

org/eclipse/edc/extension/health/HealthEndpointExtension.java:22

1
2
3
4
5
6
7
8
9
10
11
12
13
public class HealthEndpointExtension implements ServiceExtension {

@Inject
WebService webService;

private static final String LOG_PREFIX_SETTING = "edc.samples.basic.03.logprefix"; // this constant is new

@Override
public void initialize(ServiceExtensionContext context) {
var logPrefix = context.getSetting(LOG_PREFIX_SETTING, "health"); //this line is new
webService.registerResource(new HealthApiController(context.getMonitor(), logPrefix));
}
}

http://localhsot:9191/health にアクセスすると、コンソールに以下のログが出る。

1
INFO 2024-08-26T16:25:55.233519477 health :: Received a health request

Transferスコープの概要

2個のコネクタ間でデータをやり取りする。プロバイダとコンシューマ。

Transfer/Sample0(準備)

EDC/Samples/Transfer/Transfer0/READMEの通り、準備をする。 なお、このサンプルでは簡単化のために同一マシン上で、プロバイダとコンシューマを起動するが、本来は別々のところで起動するものである。

プロジェクトrootで以下を実行。

1
./gradlew transfer:transfer-00-prerequisites:connector:build

結果、ビルドされたJARファイルがここに配置される。 transfer/transfer-00-prerequisites/connector/build/libs/connector.jar

プロバイダとコンシューマは設定が異なるのみであり、JARは共通。 PATH配下の通り。

transfer/transfer-00-prerequisites/resources/configuration/consumer-configuration.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
edc.participant.id=consumer
edc.dsp.callback.address=http://localhost:29194/protocol
web.http.port=29191
web.http.path=/api
web.http.management.port=29193
web.http.management.path=/management
web.http.protocol.port=29194
web.http.protocol.path=/protocol
edc.transfer.proxy.token.signer.privatekey.alias=private-key
edc.transfer.proxy.token.verifier.publickey.alias=public-key
web.http.public.port=29291
web.http.public.path=/public
web.http.control.port=29192
web.http.control.path=/control

transfer/transfer-00-prerequisites/resources/configuration/provider-configuration.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
edc.participant.id=provider
edc.dsp.callback.address=http://localhost:19194/protocol
web.http.port=19191
web.http.path=/api
web.http.management.port=19193
web.http.management.path=/management
web.http.protocol.port=19194
web.http.protocol.path=/protocol
edc.transfer.proxy.token.signer.privatekey.alias=private-key
edc.transfer.proxy.token.verifier.publickey.alias=public-key
web.http.public.port=19291
web.http.public.path=/public
web.http.control.port=19192
web.http.control.path=/control
edc.dataplane.api.public.baseurl=http://localhost:19291/public

用いるポートを変えているのと、プロバイダ側にはデータプレーンのプロパティがあることがわかる。

プロバイダ実行:

1
java -Dedc.keystore=transfer/transfer-00-prerequisites/resources/certs/cert.pfx -Dedc.keystore.password=123456 -Dedc.fs.config=transfer/transfer-00-prerequisites/resources/configuration/provider-configuration.properties -jar transfer/transfer-00-prerequisites/connector/build/libs/connector.jar

キーやキーストアの設定、プロパティファイルのPATHを渡すのみ。

コンシューマ起動:

1
java -Dedc.keystore=transfer/transfer-00-prerequisites/resources/certs/cert.pfx -Dedc.keystore.password=123456 -Dedc.fs.config=transfer/transfer-00-prerequisites/resources/configuration/consumer-configuration.properties -jar transfer/transfer-00-prerequisites/connector/build/libs/connector.jar

プロバイダと同等。

Transfer/Sample1(コントラクト・ネゴシエーション)

EDC/Samples/Transfer/Transfer1/README の通り以下のステップで進める。

  • プロバイダのアセット(共有される対象のデータ)を作成
  • プロバイダでアクセスポリシーを作成
  • プロバイダでコントラクト定義を作成
  • その後、コンシューマからコントラクト・ネゴシエーション

まずプロバイダでアセットを作成。

1
2
3
curl -d @transfer/transfer-01-negotiation/resources/create-asset.json \
-H 'content-type: application/json' http://localhost:19193/management/v3/assets \
-s | jq

curlコマンドの-dataオプションに @ でファイルを渡しをしている。中身は以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/"
},
"@id": "assetId",
"properties": {
"name": "product description",
"contenttype": "application/json"
},
"dataAddress": {
"type": "HttpData",
"name": "Test asset",
"baseUrl": "https://jsonplaceholder.typicode.com/users",
"proxyPath": "true"
}
}

ポリシ定義。

1
2
3
curl -d @transfer/transfer-01-negotiation/resources/create-policy.json \
-H 'content-type: application/json' http://localhost:19193/management/v3/policydefinitions \
-s | jq

渡しているファイルの中身は以下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
},
"@id": "aPolicy",
"policy": {
"@context": "http://www.w3.org/ns/odrl.jsonld",
"@type": "Set",
"permission": [],
"prohibition": [],
"obligation": []
}
}

ポリシの中身は空に見える。 ちなみにレスポンスは以下のような感じ。

1
2
3
4
5
6
7
8
9
10
{
"@type": "IdResponse",
"@id": "aPolicy",
"createdAt": 1724662296875,
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

コントラクト定義を作成。

1
2
3
curl -d @transfer/transfer-01-negotiation/resources/create-contract-definition.json \
-H 'content-type: application/json' http://localhost:19193/management/v3/contractdefinitions \
-s | jq

渡しているファイルの中身は以下。

1
2
3
4
5
6
7
8
9
{
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/"
},
"@id": "1",
"accessPolicyId": "aPolicy",
"contractPolicyId": "aPolicy",
"assetsSelector": []
}

アクセスポリシやコントラクトポリシのIDを渡している。

コンシューマからカタログ情報を取得する。

1
2
3
curl -X POST "http://localhost:29193/management/v3/catalog/request" \
-H 'Content-Type: application/json' \
-d @transfer/transfer-01-negotiation/resources/fetch-catalog.json -s | jq

得られるカタログ。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
{
"@id": "4ce1bc3d-ce1e-4308-b03e-b6596958a69d",
"@type": "dcat:Catalog",
"dcat:dataset": {
"@id": "assetId",
"@type": "dcat:Dataset",
"odrl:hasPolicy": {
"@id": "MQ==:YXNzZXRJZA==:Y2ZmODc1NmYtYTRjMC00NzMxLWJlNTItM2M2ZTVlMGI2YzA3",
"@type": "odrl:Offer",
"odrl:permission": [],
"odrl:prohibition": [],
"odrl:obligation": []
},
"dcat:distribution": [
{
"@type": "dcat:Distribution",
"dct:format": {
"@id": "HttpData-PULL"
},
"dcat:accessService": {
"@id": "bb73ab52-bd72-448d-9629-8a21624b0577",
"@type": "dcat:DataService",
"dcat:endpointDescription": "dspace:connector",
"dcat:endpointUrl": "http://localhost:19194/protocol",
"dct:terms": "dspace:connector",
"dct:endpointUrl": "http://localhost:19194/protocol"
}
},
{
"@type": "dcat:Distribution",
"dct:format": {
"@id": "HttpData-PUSH"
},
"dcat:accessService": {
"@id": "bb73ab52-bd72-448d-9629-8a21624b0577",
"@type": "dcat:DataService",
"dcat:endpointDescription": "dspace:connector",
"dcat:endpointUrl": "http://localhost:19194/protocol",
"dct:terms": "dspace:connector",
"dct:endpointUrl": "http://localhost:19194/protocol"
}
}
],
"name": "product description",
"id": "assetId",
"contenttype": "application/json"
},
"dcat:distribution": [],
"dcat:service": {
"@id": "bb73ab52-bd72-448d-9629-8a21624b0577",
"@type": "dcat:DataService",
"dcat:endpointDescription": "dspace:connector",
"dcat:endpointUrl": "http://localhost:19194/protocol",
"dct:terms": "dspace:connector",
"dct:endpointUrl": "http://localhost:19194/protocol"
},
"dspace:participantId": "provider",
"participantId": "provider",
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"dcat": "http://www.w3.org/ns/dcat#",
"dct": "http://purl.org/dc/terms/",
"odrl": "http://www.w3.org/ns/odrl/2/",
"dspace": "https://w3id.org/dspace/v0.8/"
}
}

上記の通り、DCATで定義されている。ODRLで定義されたポリシのIDが示される、など。 他にもHttpDataでデータ共有されることや、サービス情報が載っている。

ここからコンシューマからプロバイダに向けてコントラクト・ネゴシエーションを行う。大まかな流れは以下。

  • コンシューマがコントラクトオファを送る。
  • プロバイダが自身のオファと照らし合わせ、届いたオファを検証する。
  • プロバイダがアグリメントか、リジェクションを送る。
  • 検証が成功していれば、アグリメントを保存する。

negotiate-contract.json のうち、NaN となっている箇所を、先程のカタログ情報から拾って埋める。 具体的には、以下のIDで埋める。

1
2
"odrl:hasPolicy": {
"@id": "MQ==:YXNzZXRJZA==:Y2ZmODc1NmYtYTRjMC00NzMxLWJlNTItM2M2ZTVlMGI2YzA3",

コントラクト・ネゴシエーション実行。

1
2
3
curl -d @transfer/transfer-01-negotiation/resources/negotiate-contract.json \
-X POST -H 'content-type: application/json' http://localhost:29193/management/v3/contractnegotiations \
-s | jq

以下のようなレスポンスが得られる。コントラクト・ネゴシエーション中のIDである。

1
2
3
4
5
6
7
8
9
10
{
"@type": "IdResponse",
"@id": "4515e87b-5bd0-4c19-aa92-1968b65005e5",
"createdAt": 1724664162792,
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

コントラクト・ネゴシエーションはプロバイダとコンシューマのそれぞれで非同期的に進む。 最終的に両者がconfirmedかdeclinedになったら終了。

コンシューマでの状態確認。先程得られたIDを使う。

1
2
3
curl -X GET "http://localhost:29193/management/v3/contractnegotiations/4515e87b-5bd0-4c19-aa92-1968b65005e5" \
--header 'Content-Type: application/json' \
-s | jq

以下のように、Finalized担ったことがわかる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"@type": "ContractNegotiation",
"@id": "4515e87b-5bd0-4c19-aa92-1968b65005e5",
"type": "CONSUMER",
"protocol": "dataspace-protocol-http",
"state": "FINALIZED",
"counterPartyId": "provider",
"counterPartyAddress": "http://localhost:19194/protocol",
"callbackAddresses": [],
"createdAt": 1724664162792,
"contractAgreementId": "74ed3714-3380-4888-93f9-71b948d09553",
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

Transfer/Sample2(コンシューマPull)

EDC/Samples/Transfer/Transfer2/README に記載の通り、以下のステップで進める。

  • コンシューマからファイル転送(の手続き)を始める。
  • プロバイダが EndpointDataReference をコンシューマに送る。
  • コンシューマがエンドポイントを使ってデータをフェッチする。

転送開始。 前の章で得られた、コントラクト・アグリメントのIDを transfer/transfer-02-consumer-pull/resources/start-transfer.json に埋め込む。

1
2
3
4
curl -X POST "http://localhost:29193/management/v3/transferprocesses" \
-H "Content-Type: application/json" \
-d @transfer/transfer-02-consumer-pull/resources/start-transfer.json \
-s | jq

結果は以下の通り。 TransferProcess が生成された。

1
2
3
4
5
6
7
8
9
10
{
"@type": "IdResponse",
"@id": "0778ef5a-0999-42fb-ad16-6572ca1c04d6",
"createdAt": 1724665640700,
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

状態を確認する。先程得られたTransferProcessのIDを利用。

1
2
curl http://localhost:29193/management/v3/transferprocesses/0778ef5a-0999-42fb-ad16-6572ca1c04d6 \
-s | jq

結果の例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"@id": "0778ef5a-0999-42fb-ad16-6572ca1c04d6",
"@type": "TransferProcess",
"state": "STARTED",
"stateTimestamp": 1724665641835,
"type": "CONSUMER",
"callbackAddresses": [],
"correlationId": "f0837ef5-999c-41dc-89b9-74d9337ecb5f",
"assetId": "assetId",
"contractId": "74ed3714-3380-4888-93f9-71b948d09553",
"transferType": "HttpData-PULL",
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

statestarted 担っていることがわかる。

DTRがコンシューマに送られているはず、さらにキャッシュされているはずなので、それを取得する。

1
2
curl http://localhost:29193/management/v3/edrs/0778ef5a-0999-42fb-ad16-6572ca1c04d6/dataaddress \
-s | jq

結果の例。

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"@type": "DataAddress",
"type": "https://w3id.org/idsa/v4.1/HTTP",
"endpoint": "http://localhost:19291/public",
"authType": "bearer",
"endpointType": "https://w3id.org/idsa/v4.1/HTTP",
"authorization": "eyJraWQiOiJwdWJsaWMta2V5IiwiYWxnIjoiUlMyNTYifQ.eyJpc3MiOiJwcm92aWRlciIsImF1ZCI6ImNvbnN1bWVyIiwic3ViIjoicHJvdmlkZXIiLCJpYXQiOjE3MjQ2NjU2NDE3NzEsImp0aSI6ImJjNmQ1YzIzLTExMWUtNGZkZi1hMjMwLTJkZjk3NmE3NTFlOSJ9.dYBBx2z2XCjg1TgbEPiDgb2KYdUlNMd7702P4t0NgwYKhDemAKF64qGpLsU63huQ2WMb9Co4sC1euopyY-48F3UvfjK0ulKODlrfVGO-7xvSNs4qX2HVC9JyLGGbdks0wQOkv9oA7AcKxD11yTjcfbtLc-DUoOF4w4RTpI2MATSa-ETvKo_22FxMrIHgnsLOCHtjVLazJchFm4bAhVa7mRfHykkTpIIEaPuqOJpdtKcX1YUAZloFaI1ZinfXNtHjvollVC9Mjb4H12Gh1B7tLxgZ0AbP0izFeOHnQleQ09ZThajwF-xpnPQ5P5fsNiFqthW2A7Gu5-PLGT6tp2lYIg",
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

上記の通り、エンドポイントURLと認証キーがわかる。 データを取得する。

1
2
3
curl --location --request GET 'http://localhost:19291/public/' \
--header 'Authorization: eyJraWQiOiJwdWJsaWMta2V5IiwiYWxnIjoiUlMyNTYifQ.eyJpc3MiOiJwcm92aWRlciIsImF1ZCI6ImNvbnN1bWVyIiwic3ViIjoicHJvdmlkZXIiLCJpYXQiOjE3MjQ2NjU2NDE3NzEsImp0aSI6ImJjNmQ1YzIzLTExMWUtNGZkZi1hMjMwLTJkZjk3NmE3NTFlOSJ9.dYBBx2z2XCjg1TgbEPiDgb2KYdUlNMd7702P4t0NgwYKhDemAKF64qGpLsU63huQ2WMb9Co4sC1euopyY-48F3UvfjK0ulKODlrfVGO-7xvSNs4qX2HVC9JyLGGbdks0wQOkv9oA7AcKxD11yTjcfbtLc-DUoOF4w4RTpI2MATSa-ETvKo_22FxMrIHgnsLOCHtjVLazJchFm4bAhVa7mRfHykkTpIIEaPuqOJpdtKcX1YUAZloFaI1ZinfXNtHjvollVC9Mjb4H12Gh1B7tLxgZ0AbP0izFeOHnQleQ09ZThajwF-xpnPQ5P5fsNiFqthW2A7Gu5-PLGT6tp2lYIg' \
-s | jq

以下のような結果が得られる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[                                                               
{
"id": 1,
"name": "Leanne Graham",
"username": "Bret",
"email": "Sincere@april.biz",
"address": {
"street": "Kulas Light",
"suite": "Apt. 556",
"city": "Gwenborough",
"zipcode": "92998-3874",
"geo": {
"lat": "-37.3159",
"lng": "81.1496"
}
},
(snip)

https://jsonplaceholder.typicode.com/users にあるデータをコネクタのコントラクトを通じて得られた。

Pullするときに、以下のようにするとユーザIDを指定できる。

http://localhost:19291/public/1

Transfer/Sample3(プロバイダPush)

EDC/Samples/Transfer/Transfer3/README に記載の通り、今度はプロバイダからPushする例である。 大まかな流れは以下の通り。

  • ファイル転送
    • コンシューマからファイル転送(の手続き)を始める。
    • プロバイダのコントロールプレーンが実際のデータのアドレスを取得し、DataRequest をもとに DataFlowRequest を作成する。
  • プロバイダデータプレーンが実際のデータソースからデータを取得。
  • プロバイダデータプレーンがコンシューマサービスにデータを送る。

まずコンシューマ側のバックエンドのロガーを起動する。これがPush先になると見られる。今回はDockerで起動する。

1
2
docker build -t http-request-logger util/http-request-logger
docker run -p 4000:4000 http-request-logger

続いて以下のファイルを編集した上でTransferProcessを開始する。 transfer/transfer-03-provider-push/resources/start-transfer.json なお、変更するのはコントラクト・アグリメントIDである。上記のケースでは、 74ed3714-3380-4888-93f9-71b948d09553 である。

1
2
3
4
curl -X POST "http://localhost:29193/management/v3/transferprocesses" \
-H "Content-Type: application/json" \
-d @transfer/transfer-03-provider-push/resources/start-transfer.json \
-s | jq

なお、 start-transfer.json で指定している dataDestination にHttpProxy以外のものを指定しているのがポイントのようだ。

transfer/transfer-03-provider-push/resources/start-transfer.json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/"
},
"@type": "TransferRequestDto",
"connectorId": "provider",
"counterPartyAddress": "http://localhost:19194/protocol",
"contractId": "74ed3714-3380-4888-93f9-71b948d09553",
"assetId": "assetId",
"protocol": "dataspace-protocol-http",
"transferType": "HttpData-PUSH",
"dataDestination": {
"type": "HttpData",
"baseUrl": "http://localhost:4000/api/consumer/store"
}
}

結果の例。

1
2
3
4
5
6
7
8
9
10
{
"@type": "IdResponse",
"@id": "105477c9-dbff-4577-adb5-2e860f9be585",
"createdAt": 1724678897615,
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

結果確認。

1
2
curl http://localhost:29193/management/v3/transferprocesses/105477c9-dbff-4577-adb5-2e860f9be585 \
-s | jq

結果の例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  "@id": "105477c9-dbff-4577-adb5-2e860f9be585",
"@type": "TransferProcess",
"state": "COMPLETED",
"stateTimestamp": 1724678900583,
"type": "CONSUMER",
"callbackAddresses": [],
"correlationId": "657b63b0-920d-412c-9c09-edb48057e717",
"assetId": "assetId",
"contractId": "74ed3714-3380-4888-93f9-71b948d09553",
"transferType": "HttpData-PUSH",
"dataDestination": {
"@type": "DataAddress",
"type": "HttpData",
"baseUrl": "http://localhost:4000/api/consumer/store"
},
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

Completed である。

ロガーのコンソールに以下の表示。

1
2
3
4
5
6
7
8
9
10
11
12
13
Incoming request                                                                         
Method: POST
Path: /api/consumer/store
Body:
[
{
"id": 1,
"name": "Leanne Graham",
"username": "Bret",
"email": "Sincere@april.biz",
"address": {
"street": "Kulas Light",
(snip)

これ、Push先のエンドポイントはどうやって知るのか。カタログ?

Transfer/Sample4

EDC/Samples/Transfer/Transfer4/README の通り、コンシューマで転送完了に反応する機能を追加する。

モジュール構成は以下。

  • consumer-with-listener: コンシューマ(コネクタ)のイベント・コンシューマ拡張
  • listener: イベントを処理する TransferProcessListener の実装

ドキュメントでは、 TransferProcessListener の実装として org.eclipse.edc.sample.extension.listener.TransferProcessStartedListener を紹介。

エントリポイントは org.eclipse.edc.sample.extension.listener.TransferProcessStartedListenerExtension である。

org/eclipse/edc/sample/extension/listener/TransferProcessStartedListenerExtension.java:21

1
2
3
4
5
6
7
8
9
10
public class TransferProcessStartedListenerExtension implements ServiceExtension {

@Override
public void initialize(ServiceExtensionContext context) {
var transferProcessObservable = context.getService(TransferProcessObservable.class);
var monitor = context.getMonitor();
transferProcessObservable.registerListener(new TransferProcessStartedListener(monitor));
}

}

上記の通り、 org.eclipse.edc.sample.extension.listener.TransferProcessStartedListener をリスナとして登録している。

ポイントは以下。

org/eclipse/edc/sample/extension/listener/TransferProcessStartedListener.java:34

1
2
3
4
5
@Override
public void preStarted(final TransferProcess process) {
monitor.debug("TransferProcessStartedListener received STARTED event");
// do something meaningful before transfer start
}

START イベントを受け取ったときに、ログに出力する。

このサンプルのビルドと実行をするのだが、その前に前の章まで使用していたコンシューマを止める。

その上で、以下を実行して、Sample4のコンシューマを起動する。

1
2
./gradlew transfer:transfer-04-event-consumer:consumer-with-listener:build
java -Dedc.keystore=transfer/transfer-00-prerequisites/resources/certs/cert.pfx -Dedc.keystore.password=123456 -Dedc.fs.config=transfer/transfer-00-prerequisites/resources/configuration/consumer-configuration.properties -jar transfer/transfer-04-event-consumer/consumer-with-listener/build/libs/connector.jar

続いて、新しいコントラクト・ネゴシエーション。

1
2
3
curl -d @transfer/transfer-01-negotiation/resources/negotiate-contract.json \
-X POST -H 'content-type: application/json' http://localhost:29193/management/v3/contractnegotiations \
-s | jq

結果例。

1
2
3
4
5
6
7
8
9
10
{
"@type": "IdResponse",
"@id": "fe8c4f16-283c-4d74-a440-715e8371d228",
"createdAt": 1724766448685,
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

先の例と同様にコントラクト・アグリメントIDを取得する。 直前の結果から、コントラクト・ネゴシエーションIDを抜き出して下記のコマンドに入れるのを忘れずに。

1
2
3
curl -X GET "http://localhost:29193/management/v3/contractnegotiations/fe8c4f16-283c-4d74-a440-715e8371d228" \
--header 'Content-Type: application/json' \
-s | jq

結果例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"@type": "ContractNegotiation",
"@id": "fe8c4f16-283c-4d74-a440-715e8371d228",
"type": "CONSUMER",
"protocol": "dataspace-protocol-http",
"state": "FINALIZED",
"counterPartyId": "provider",
"counterPartyAddress": "http://localhost:19194/protocol",
"callbackAddresses": [],
"createdAt": 1724766448685,
"contractAgreementId": "8e96f6c5-cb0e-4f06-beb0-ac553ddb160e",
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

コントラクト・アグリメントIDは 8e96f6c5-cb0e-4f06-beb0-ac553ddb160e である。

つづいて、 transfer/transfer-02-consumer-pull/resources/start-transfer.json の今コントラクト・アグリメントIDを書き換えつつ、ファイル転送を開始する。 該当箇所は以下。

1
"contractId": "8e96f6c5-cb0e-4f06-beb0-ac553ddb160e",

実行コマンドは以下。

1
2
3
4
curl -X POST "http://localhost:29193/management/v3/transferprocesses" \
-H "Content-Type: application/json" \
-d @transfer/transfer-02-consumer-pull/resources/start-transfer.json \
-s | jq

実行結果例。

1
2
3
4
5
6
7
8
9
10
{
"@type": "IdResponse",
"@id": "b4dff4b1-dfb2-470e-a765-7bd549a1e2b6",
"createdAt": 1724766768403,
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

先ほど起動したコンシューマのコンソールログを見ると

1
DEBUG 2024-08-27T22:52:49.930105146 TransferProcessStartedListener received STARTED event

のようなメッセージが見つかるはず。

Transfer/Sample5

(このサンプルはAzureとAWSの環境が必要なのと手違いの被害が大きいことからメモだけとする)

EDC/Samples/Transfer/Transfer5/README に記載の通り、これまで実行したサンプルに対し、通常使うであろう機能を足していく。 この例では、具体的には「Azureのストレージから読んで、AWSのストレージに書く」というのを実現する。

なお、環境構成には、Terraformが使われている。 transfer/transfer-05-file-transfer-cloud/terraform にもろもろ格納されている。 中身を見ると、割といろいろとセットアップしているのがわかる。念の為、潰しても良いクリーンな環境で試すのが良い。 一応6章にて環境をクリーンナップする手順が記載されている。

READMEにある通り、クライアントIDなどはVault内に保持するようになっている。 依存関係にもVaultがある。

transfer/transfer-05-file-transfer-cloud/cloud-transfer-consumer/build.gradle.kts:31

1
2
3
(snip)
implementation(libs.edc.vault.azure)
(snip)

今サンプルのメインの一つは、 org.eclipse.edc.sample.extension.transfer.CloudTransferExtension である。

org.eclipse.edc.sample.extension.transfer.CloudTransferExtension#registerDataEntries メソッドがアセットを定義する。

1
2
3
4
5
6
7
8
9
10
public void registerDataEntries() {
var dataAddress = DataAddress.Builder.newInstance()
.type("AzureStorage")
.property("account", "<storage-account-name>")
.property("container", "src-container")
.property("blobname", "test-document.txt")
.keyName("<storage-account-name>-key1")
.build();
var asset = Asset.Builder.newInstance().id("1").dataAddress(dataAddress).build();
assetIndex.create(asset);

なお、org.eclipse.edc.connector.controlplane.policy.spi.store.PolicyDefinitionStoreorg.eclipse.edc.connector.controlplane.contract.spi.offer.store.ContractDefinitionStore があり、それぞれポリシとコントラクトを保持する。今回のサンプルではインメモリで保持する実装を利用。

このあとはいつもどおり、コントラクト・ネゴシエーションしてコントラクト・アグリメントIDを取得し、実際のデータ転送。 ほぼこれまで通りだが、dataDestinationでAWS S3を指定しているところがポイント。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
curl --location --request POST 'http://localhost:9192/management/v3/transferprocesses' \
--header 'X-API-Key: password' \
--header 'Content-Type: application/json' \
--data-raw '
{
"counterPartyAddress": "http://localhost:8282/protocol",
"protocol": "dataspace-protocol-http",
"connectorId": "consumer",
"assetId": "1",
"contractId": "<ContractAgreementId>",
"dataDestination": {
"type": "AmazonS3",
"region": "us-east-1",
"bucketName": "<Unique bucket name>"
},
"transferType": {
"contentType": "application/octet-stream",
"isFinite": true
}
}'

参考

レポジトリ内

EDCレポジトリ

外部

共有

Dataspace Protocol of EDC

メモ

注意:このメモはかなり初期のものなので、今となっては怪しい内容が含まれてる・・・。

EDCは現在IDSが提唱する、Dataspace Protocolにしたがって、コネクタ間でやりとりする。

DSP Data Planeの実装を確認する

data-protocols/dspdsp)以下に、Dataspace Protocolに対応したモジュールが含まれている。

例えば、org.eclipse.edc.protocol.dsp.dispatcher.PostDspHttpRequestFactoryorg.eclipse.edc.protocol.dsp.dispatcher.GetDspHttpRequestFactoryなどのファクトリが定義されている。 これは、前述のPOST、GETオペレーションに対応するリクエストを生成するためのファクトリである。

以下は、カタログのリクエストを送るための実装である。

org/eclipse/edc/protocol/dsp/catalog/dispatcher/DspCatalogHttpDispatcherExtension.java:54

1
2
3
4
5
6
7
8
9
10
11
12
public void initialize(ServiceExtensionContext context) {
messageDispatcher.registerMessage(
CatalogRequestMessage.class,
new PostDspHttpRequestFactory<>(remoteMessageSerializer, m -> BASE_PATH + CATALOG_REQUEST),
new CatalogRequestHttpRawDelegate()
);
messageDispatcher.registerMessage(
DatasetRequestMessage.class,
new GetDspHttpRequestFactory<>(m -> BASE_PATH + DATASET_REQUEST + "/" + m.getDatasetId()),
new DatasetRequestHttpRawDelegate()
);
}

他にも、org.eclipse.edc.protocol.dsp.transferprocess.dispatcher.DspTransferProcessDispatcherExtensionなどが挙げられる。 これは以下のように、org.eclipse.edc.connector.transfer.spi.types.protocol.TransferRequestMessageが含まれており、ConsumerがProviderにデータ転送プロセスをリクエストする際のメッセージのディスパッチャが登録されていることがわかる。

org/eclipse/edc/protocol/dsp/transferprocess/dispatcher/DspTransferProcessDispatcherExtension.java:60

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void initialize(ServiceExtensionContext context) {
messageDispatcher.registerMessage(
TransferRequestMessage.class,
new PostDspHttpRequestFactory<>(remoteMessageSerializer, m -> BASE_PATH + TRANSFER_INITIAL_REQUEST),
new TransferRequestDelegate(remoteMessageSerializer)
);
messageDispatcher.registerMessage(
TransferCompletionMessage.class,
new PostDspHttpRequestFactory<>(remoteMessageSerializer, m -> BASE_PATH + m.getProcessId() + TRANSFER_COMPLETION),
new TransferCompletionDelegate(remoteMessageSerializer)
);
messageDispatcher.registerMessage(
TransferStartMessage.class,
new PostDspHttpRequestFactory<>(remoteMessageSerializer, m -> BASE_PATH + m.getProcessId() + TRANSFER_START),
new TransferStartDelegate(remoteMessageSerializer)
);
messageDispatcher.registerMessage(
TransferTerminationMessage.class,
new PostDspHttpRequestFactory<>(remoteMessageSerializer, m -> BASE_PATH + m.getProcessId() + TRANSFER_TERMINATION),
new TransferTerminationDelegate(remoteMessageSerializer)
);
}

◆参考情報はじめ

このファクトリは、ディスパッチャの org.eclipse.edc.protocol.dsp.dispatcher.DspHttpRemoteMessageDispatcherImpl#dispatch メソッドから、間接的に呼び出されて利用される。 このメソッドはorg.eclipse.edc.spi.message.RemoteMessageDispatcher#dispatchメソッドを実装したものである。ディスパッチャとして、リモートへ送信するメッセージ生成をディスパッチするための。メソッドである。 さらに、これは org.eclipse.edc.connector.core.base.RemoteMessageDispatcherRegistryImpl 内で使われている。ディスパッチャのレジストリ内で、ディスパッチ処理が起動、管理されるようだ。 なお、これはorg.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry#dispatch を実装したものである。このメソッドは、色々なところから呼び出される。

例えば、TransferCoreExtensionクラスではサービス起動時に、転送プロセスを管理するorg.eclipse.edc.connector.transfer.process.TransferProcessManagerImplを起動する。

org/eclipse/edc/connector/transfer/TransferCoreExtension.java:205

1
2
3
4
@Override
public void start() {
processManager.start();
}

これにより、以下のようにステートマシンがビルド、起動され、各プロセッサが登録される。

org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java:143

1
2
3
4
5
6
7
8
9
10
11
12
stateMachineManager = StateMachineManager.Builder.newInstance("transfer-process", monitor, executorInstrumentation, waitStrategy)
.processor(processTransfersInState(INITIAL, this::processInitial))
.processor(processTransfersInState(PROVISIONING, this::processProvisioning))
.processor(processTransfersInState(PROVISIONED, this::processProvisioned))
.processor(processTransfersInState(REQUESTING, this::processRequesting))
.processor(processTransfersInState(STARTING, this::processStarting))
.processor(processTransfersInState(STARTED, this::processStarted))
.processor(processTransfersInState(COMPLETING, this::processCompleting))
.processor(processTransfersInState(TERMINATING, this::processTerminating))
.processor(processTransfersInState(DEPROVISIONING, this::processDeprovisioning))
.build();
stateMachineManager.start();

上記のプロセッサとして登録されているorg.eclipse.edc.connector.transfer.process.TransferProcessManagerImpl#processStartingの中では org.eclipse.edc.connector.transfer.process.TransferProcessManagerImpl#sendTransferStartMessage が呼び出されている。

org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java:376

1
2
3
4
5
6
return entityRetryProcessFactory.doSyncProcess(process, () -> dataFlowManager.initiate(process.getDataRequest(), contentAddress, policy))
.onSuccess((p, dataFlowResponse) -> sendTransferStartMessage(p, dataFlowResponse, policy))
.onFatalError((p, failure) -> transitionToTerminating(p, failure.getFailureDetail()))
.onFailure((t, failure) -> transitionToStarting(t))
.onRetryExhausted((p, failure) -> transitionToTerminating(p, failure.getFailureDetail()))
.execute(description);

org.eclipse.edc.connector.transfer.process.TransferProcessManagerImpl#sendTransferStartMessage メソッド内では、 org.eclipse.edc.connector.transfer.spi.types.protocol.TransferStartMessageのメッセージがビルドされ、 ディスパッチャにメッセージとして渡される。

org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java:386

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var message = TransferStartMessage.Builder.newInstance()
.processId(process.getCorrelationId())
.protocol(process.getProtocol())
.dataAddress(dataFlowResponse.getDataAddress())
.counterPartyAddress(process.getConnectorAddress())
.policy(policy)
.build();

var description = format("Send %s to %s", message.getClass().getSimpleName(), process.getConnectorAddress());

entityRetryProcessFactory.doAsyncStatusResultProcess(process, () -> dispatcherRegistry.dispatch(Object.class, message))
.entityRetrieve(id -> transferProcessStore.findById(id))
.onSuccess((t, content) -> transitionToStarted(t))
.onFailure((t, throwable) -> transitionToStarting(t))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable))
.execute(description);

◆参考情報おわり

ということで、org.eclipse.edc.protocol.dsp.spi.dispatcher.DspHttpRemoteMessageDispatcherというディスパッチャは、Dataspace Protocolに基づくリモートメッセージを生成する際に用いられるディスパッチャである。

おまけ)古い(?)Data Planeの実装を確認する(HTTPの例)

Dataspace Protocol以前の実装か?

extensions/data-plane 以下にData Planeの実装が拡張として含まれている。

例えば、 extensions/data-plane/data-plane-http には、HTTPを用いてデータ共有するための拡張の実装が含まれている。 当該拡張のREADMEの通り、 (transfer APIの)DataFlowRequestHttpDataだった場合に、

  • HttpDataSourceFactory
  • HttpDataSinkFactory
  • HttpDataSource
  • HttpDataSink

の実装が用いられる。パラメータもREADMEに(data-plane-httpのデザイン指針)記載されている。 基本的には、バックエンドがHTTPなのでそれにアクセスするためのパラメータが定義されている。

当該ファクトリは、 org.eclipse.edc.connector.dataplane.http.DataPlaneHttpExtension#initialize 内で用いられている。

org/eclipse/edc/connector/dataplane/http/DataPlaneHttpExtension.java:75

1
2
3
4
5
6
7
var httpRequestFactory = new HttpRequestFactory();

var sourceFactory = new HttpDataSourceFactory(httpClient, paramsProvider, monitor, httpRequestFactory);
pipelineService.registerFactory(sourceFactory);

var sinkFactory = new HttpDataSinkFactory(httpClient, executorContainer.getExecutorService(), sinkPartitionSize, monitor, paramsProvider, httpRequestFactory);
pipelineService.registerFactory(sinkFactory);

ここでは、試しにData Source側を確認してみる。

org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSourceFactory.java:63

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public DataSource createSource(DataFlowRequest request) {
var dataAddress = HttpDataAddress.Builder.newInstance()
.copyFrom(request.getSourceDataAddress())
.build();
return HttpDataSource.Builder.newInstance()
.httpClient(httpClient)
.monitor(monitor)
.requestId(request.getId())
.name(dataAddress.getName())
.params(requestParamsProvider.provideSourceParams(request))
.requestFactory(requestFactory)
.build();
}

上記の通り、まずデータのアドレスを格納するインスタンスが生成され、 つづいて、HTTPのデータソースがビルドされる。

HTTPのData Sourceの実体は org.eclipse.edc.connector.dataplane.http.pipeline.HttpDataSource である。 このクラスはSPIの org.eclipse.edc.connector.dataplane.spi.pipeline.DataSourceインタフェースを実装したものである。

org.eclipse.edc.connector.dataplane.http.pipeline.HttpDataSource#openPartStream がオーバライドされて実装されている。 詳しくは、openPartStream参照。

参考

ドキュメント

ソースコード

共有

Generate OpenAPI Spec of EDC Connector

メモ

EDCのConnectorのOpenAPIスペックを出力するための手順がGenerating the OpenApi Spec (*.yaml)に記載されている。 これに従い、試しに出力してみることにする。

ただ、このSpecはいわゆる現在EDCが採用している、Dataspace Protocol仕様ではないものが含まれている可能性が高い。 pathが/v2となっているのは、Dataspace Protocol準拠か? → 実際に調べてみると、v2が必ずしも、Dataspace Protocol向けというわけではなさそうである。

ちなみに、参考までに、IDSA Dataspace ConnectorのOpenAPI Specは Dataspace ConnectorのOpenAPI Spec にある。 このコネクタは昨年からあまり更新されていないので注意。

準備

もしまだソースコードを取得していなければ取得しておく。

1
2
git pull git@github.com:eclipse-edc/Connector.git
cd Connector

生成

ビルド環境にはJDK17を利用したいので、今回はDockerで簡単に用意する。

そのまま実行する場合:

1
docker run --rm -v ${PWD}:/local --workdir /local openjdk:17-alpine  ./gradlew clean resolve

いったんシェル立ち上げる場合:

1
2
docker run -it --rm -v ${PWD}:/local --workdir /local openjdk:17-alpine sh
./gradlew clean resolve

BUILD SUCCESSFULとなったらOK。

ちなみに、このYAMLファイル生成は自前のビルドツールを用いているようだ。参考:SwaggerGeneratorExtension

Data Planeの中身を軽く確認

resources/openapi/yaml/control-api/data-plane-api.yaml にある、Data Planeを試しに見てみる。

概要

description部分を機械翻訳したのが以下である。

1
2
Data PlaneのパブリックAPIはデータプロキシであり、データコンシューマがData Planeインスタンスを通じて、プロバイダのデータソース(バックエンドのRest APIや内部データベースなど)から能動的にデータを問い合わせることを可能にします。
Data PlaneのパブリックAPIはプロキシであるため、すべての動詞(GET、POST、PUT、PATCH、DELETEなど)をサポートしており、データソースが必要になるまでデータを転送することができます。これは、実際のデータソースがRest APIそのものである場合に特に便利です。同じように、任意のクエリパラメータ、パスパラメータ、リクエストボディのセットも(HTTPサーバによって固定された範囲内で)サポートされ、実際のデータソースに伝えることができます。

企業が持つデータストアをデータソースとしてデータ連携する際、そのプロキシとして働く。

paths

APIのパスを確認する。

transfer

データ転送をリクエストする。 リクエストボディには、データ転送のリクエスト情報が含まれる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/transfer:
post:
description: Initiates a data transfer for the given request. The transfer will
be performed asynchronously.
operationId: initiateTransfer
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/DataFlowRequest'
responses:
"200":
description: Data transfer initiated
"400":
description: Failed to validate request
tags:
- Data Plane control API

transfer/{processId}

パラメータprocessIdで与えられたIDのデータ転送処理の状態を確認する。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/transfer/{processId}:
get:
description: Get the current state of a data transfer.
operationId: getTransferState
parameters:
- in: path
name: processId
required: true
schema:
type: string
responses:
"200":
description: Missing access token
tags:
- Data Plane control API

/{any}

/{any}以下にはDELETE、GET、PATCH、POST、PUTのOperationが定義されている。

1
2
3
4
5
6
7
8
9
10
/{any}:
delete:
(snip)
get:
(snip)
patch:
(snip)
post:
(snip)
put:

単純にデータを取得するだけではない。

Transfer Data Plane

resources/openapi/yaml/control-api/transfer-data-plane.yaml に含まれるのは以下のSpecだった。 トークンを受け取り検証するAPIのようだ。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
openapi: 3.0.1
paths:
/token:
get:
description: "Checks that the provided token has been signed by the present\
\ entity and asserts its validity. If token is valid, then the data address\
\ contained in its claims is decrypted and returned back to the caller."
operationId: validate
parameters:
- in: header
name: Authorization
schema:
type: string
responses:
"200":
description: Token is valid
"400":
description: Request was malformed
"403":
description: Token is invalid
tags:
- Consumer Pull Token Validation
components:
schemas:
DataAddress:
type: object
properties:
properties:
type: object
additionalProperties:
type: object

control-plane-api

resources/openapi/yaml/control-api/control-plane-api.yaml にコントロールプレーンのSpecが含まれている。

/transferprocess/{processId}/complete

転送プロセスの完了をリクエストする。 転送が非同期、処理なので、受付成功が返る。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/transferprocess/{processId}/complete:
post:
description: "Requests completion of the transfer process. Due to the asynchronous\
\ nature of transfers, a successful response only indicates that the request\
\ was successfully received"
operationId: complete
parameters:
- in: path
name: processId
required: true
schema:
type: string
responses:
"400":
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/ApiErrorDetail'
description: "Request was malformed, e.g. id was null"
tags:
- Transfer Process Control Api

/transferprocess/{processId}/fail

転送プロセスを失敗で完了させるリクエストを送る。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
post:
description: "Requests completion of the transfer process. Due to the asynchronous\
\ nature of transfers, a successful response only indicates that the request\
\ was successfully received"
operationId: fail
parameters:
- in: path
name: processId
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/TransferProcessFailStateDto'
responses:
"400":
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/ApiErrorDetail'
description: "Request was malformed, e.g. id was null"
tags:
- Transfer Process Control Api

マネージメントAPIの類

resources/openapi/yaml/management-api 以下には、マネージメント系のAPIのSpecが。含まれている。

例えば、

  • カタログ: おそらくDataspace Protocolに対応している。DCATカタログのやり取り。
    • /v2/catalog/dataset/request
    • /v2/catalog/request
  • データアセット: データアドレスの情報と合わせて、データアセットを登録する
    • /v2/assets
      • post: 登録
      • put: 更新
    • /v2/assets/request: クエリに従ってアセット群を取得する
    • /v2/assets/{assetId}/dataaddress: データアドレスの更新
    • /v2/assets/{id}
      • delete: 消す
      • get: アセット取得
    • /v2/assets/{id}/dataaddress: アドレス取得
    • /v3/assets ... v3とは?
      • v2とおおよそ同じ
    • /v3/assets/request
      • v2とおおよそ同じ

など。ただ、/v2としていながら、DSPではなかったりするものがある(例:/v2/contractnegotiations)など注意が必要。

Dataspace Protocol Architecture

IDS Dataspace Protocolのドキュメント にIDSプロトコル対応の概要が記載されている。

後方互換性

当該ドキュメントに記載の通り、後方互換性を保証するものではない。 新しいプロトコルに対応次第、古い実装は破棄される。

ゴール

  • (将来リリースされる?)IDS-TCK(IDS Test Compatibility Kit)の必須項目をパスすること
  • Dataspace Protocol仕様を満たす他のコネクタと相互運用可能であること
  • Dataspace Protocolよりも前のバージョンのIDSには対応しない。
  • Usage Policyは実装しない。他のプロジェクトで実装される。

アプローチ

Dataspace ProtocolはJSON-LD、DCAT、ODRLで実現されている。 このプロトコルの対応で、Contract NegotiationとTransfer Processステートが新たに実装されることになる。 ただし、新しいプロトコルの対応が完了するまで、テストが通るようにする。

  1. JSON-LD Processing Architecture に基きJSON-LD対応する。
  2. Dataspace Protocol Endpoints and Services Architecture に基きエンドポイントとサービスの拡張を実装する。
  3. Dataspace Protocol Contract Negotiation Architecture に基きContract Negotiationマネージャのステートマシンを更新する。
  4. The Dataspace Protocol Transfer Process Architecture に基きTransfer Processのステートマシンを更新する。
  5. この1から4項目が安定すると、古いモジュールとサービスが削除される。
  6. Management APIを更新する。

JSON-LD Processing Architecture

JSON-LD Processing Architecture にJSON-LDを処理するアーキテクチャに関するコンセプトとアプローチが記載されている。

冒頭に記載あるとおり、結果として、JDS InfoModel Java Libraryを用いるのをやめ、JSON-LDメッセージをやり取りすることになる。

既存の TypeManagerに機能付加する。 JSONP対応する。

文書上は、以下のようなコンセプトが例として載っていた。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var mapper = new ObjectMapper();

mapper.registerModule(new JSONPModule());

var module = new SimpleModule() {

@Override
public void setupModule(SetupContext context){
super.setupModule(context);
}

};

mapper.registerModule(module);

typeManager.registerContext("json-ld",mapper)

実際に、2023/9/24時点での実装においても、以下のようにTypeManagerに登録されたJSON_JDのマッパーを利用していることが見られます。

org/eclipse/edc/protocol/dsp/api/configuration/DspApiConfigurationExtension.java:128

1
2
3
var jsonLdMapper = typeManager.getMapper(JSON_LD);
webService.registerResource(config.getContextAlias(), new ObjectMapperProvider(jsonLdMapper));
webService.registerResource(config.getContextAlias(), new JerseyJsonLdInterceptor(jsonLd, jsonLdMapper));

org/eclipse/edc/protocol/dsp/api/configuration/DspApiConfigurationExtension.java:135

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void registerTransformers() {
var mapper = typeManager.getMapper(JSON_LD);
mapper.registerSubtypes(AtomicConstraint.class, LiteralExpression.class);

var jsonBuilderFactory = Json.createBuilderFactory(Map.of());

// EDC model to JSON-LD transformers
transformerRegistry.register(new JsonObjectFromCatalogTransformer(jsonBuilderFactory, mapper));
transformerRegistry.register(new JsonObjectFromDatasetTransformer(jsonBuilderFactory, mapper));
transformerRegistry.register(new JsonObjectFromPolicyTransformer(jsonBuilderFactory));
transformerRegistry.register(new JsonObjectFromDistributionTransformer(jsonBuilderFactory));
transformerRegistry.register(new JsonObjectFromDataServiceTransformer(jsonBuilderFactory));
transformerRegistry.register(new JsonObjectFromAssetTransformer(jsonBuilderFactory, mapper));
transformerRegistry.register(new JsonObjectFromDataAddressTransformer(jsonBuilderFactory));
transformerRegistry.register(new JsonObjectFromQuerySpecTransformer(jsonBuilderFactory));
transformerRegistry.register(new JsonObjectFromCriterionTransformer(jsonBuilderFactory, mapper));

// JSON-LD to EDC model transformers
// DCAT transformers
transformerRegistry.register(new JsonObjectToCatalogTransformer());
transformerRegistry.register(new JsonObjectToDataServiceTransformer());
transformerRegistry.register(new JsonObjectToDatasetTransformer());
transformerRegistry.register(new JsonObjectToDistributionTransformer());

// ODRL Transformers
OdrlTransformersFactory.jsonObjectToOdrlTransformers().forEach(transformerRegistry::register);

transformerRegistry.register(new JsonValueToGenericTypeTransformer(mapper));
transformerRegistry.register(new JsonObjectToAssetTransformer());
transformerRegistry.register(new JsonObjectToQuerySpecTransformer());
transformerRegistry.register(new JsonObjectToCriterionTransformer());
transformerRegistry.register(new JsonObjectToDataAddressTransformer());
}

特に後者の実装は、各種情報をJSONのオブジェクトに変換するトランスフォーマー(や、その逆)を登録している。 JSON-LDにてメッセージをやりとりしている様子の一端をみられる。

また、ドキュメントの方ではコンセプトとして、以下のような変換の流れが例として挙げられていた。

1
2
3
4
5
6
7
8
9
10
11
12
13
// message is de-serialized as Map<String, Object> by Jersey 
var document = JsonDocument.of(mapper.convertValue(message, JsonObject.class));

try {

var compacted = JsonLd.compact(document,EMPTY_CONTEXT).get();
var convertedDocument = mapper.convertValue(compacted,Map.class);

// process converted document

} catch(JsonLdError e) {
throw new RuntimeException(e);
}

もし実際の実装をみるのであれば、 org.eclipse.edc.core.transform.transformer.from.JsonObjectFromCatalogTransformer#transform メソッドのようなものを確認すると良い。

なお、ドキュメントローダとしては、titanium-json-ldが使われているようだ。 参考→ org.eclipse.edc.jsonld.TitaniumJsonLd Dataspace Protocol Endpoints and Services Architecture にもその旨記載されている。

Dataspace Protocol Endpoints and Services Architecture

Dataspace Protocol Endpoints and Services Architecture にIDS Controller Endpoint実装のアプローチが記載されている。

また当該ドキュメントには、以下のように拡張との対応関係が示されている。

Description Repository Extension
Contract Negotiation Connector control-plane-ids
Transfer Process Connector control-plane-ids
Catalog requests Connector catalog-ids

また前述の通り、Dataspace ProtocolではJSON-LDにてメッセージがやりとりされる。 それらを「(デ)マーシャル」(シリアライズ、デシリアライズ)する必要がある。

デシリアライズは以下のように行われると例示されている。

1
2
var document = JsonDocument.of(jsonObject);
var expanded = JsonLd.expand(document).get();

シリアライズは以下のように行われると例示されている。

1
2
3
var document = JsonDocument.of(jsonObject);
var compacted = JsonLd.compact(document,EMPTY_CONTEXT).get();
var compacted = mapper.convertValue(compacted,Map.class);

マイグレーションのポイント

大きなポイントの例は、

  • アセット(DCATにおけるデータセット)にODRLポリシーであるofferを含むようになること
  • データセットはカタログに含まれる。

(もともとEDCが採用していたIDS Infomodelでは、offerにアセットが含まれる)

Contract Definition、Asset、Dataset、ODRL Offerの関係は以下のように表現されていた。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CD = Contract Definition
A = Asset
DS = Dataset
O = ODRL Offer

If the Contract Definitions are:

CD 1 --selects--> [A1, A2]
CD 2 --selects--> [A1, A3]

the resulting Catalog containing Datasets is:

DS 1 -> A1 [O:CD1, O:CD2]
DS 2 -> A2 [O:CD1]
DS 3 -> A3 [O:CD2]

上記は包含関係を表している。

また、ProviderにContract Negotiationyや転送タイプをリクエストするためのエンドポイントは、DCAT Distributionである。 Distributionは、コネクタエンドポイントのメタデータとDataAdress属性で示される転送タイプの組み合わせで示される。

なお、現状のEDCではまだ未実装の部分があり、フューチャーワークとされていた。

また、DCAT CatalogやDatasetは名前空間プロパティを使用して拡張可能である必要がある。CatalogDecoratorが必要。

型変換

もともとあったIdsTypeTransfomerを実装し直す必要がある。 これは先に上げていたJsonObjectFromCatalogTransformerのようなTransformerである。 本ドキュメントには、その実装コンセプト/アプローチが記載されている。

その他

Identificationの取り扱い方についても変更あり。

RemoteMessageDispatcherも変更あり。 以下のようなクラス設計になっている。

1
2
3
4
5
RemoteMessageDispatcher (org.eclipse.edc.spi.message)
GenericHttpRemoteDispatcher (org.eclipse.edc.connector.callback.dispatcher.http)
GenericHttpRemoteDispatcherImpl (org.eclipse.edc.connector.callback.dispatcher.http)
DspHttpRemoteMessageDispatcher (org.eclipse.edc.protocol.dsp.spi.dispatcher)
DspHttpRemoteMessageDispatcherImpl (org.eclipse.edc.protocol.dsp.dispatcher)

Dataspace Protocol対応は、org.eclipse.edc.protocol.dsp.dispatcher.DspHttpRemoteMessageDispatcherImplと考えておくとよい。

Dataspace Protocol Contract Negotiation Architecture

Dataspace Protocol Contract Negotiation Architecture にContract Negotiationの変更アプローチが記載されている。

ステートマシンの変化内容を一覧化した表が載っていた。 表の通り、Dataspace Protocolに対応したのちも、IDSにはもともと無いステートが一部残っている。 Contract Negotiationでは、その状態が重要であるから、より厳密に扱っている印象がある。

EDC Existing EDC New IDS Transition Function Notes
UNSAVED (remove) N/A This state is not needed
INITIAL INITIAL N/A
REQUESTING REQUESTING N/A
REQUESTED REQUESTED REQUESTED Provider (new & counter)
PROVIDER_OFFERING OFFERING N/A
PROVIDER_OFFERED OFFERED OFFERED Consumer
CONSUMER_OFFERING (REQUESTING)
CONSUMER_OFFERED (REQUESTED)
CONSUMER_APPROVING ACCEPTING N/A
CONSUMER_APPROVED ACCEPTED ACCEPTED Provider
DECLINING (TERMINATING)
DECLINED (TERMINATED)
CONFIRMING AGREEING N/A
CONFIRMED AGREED AGREED Consumer
VERIFYING N/A
VERIFIED VERIFIED Provider
FINALIZING N/A
FINALIZED FINALIZED Consumer
TERMINATING N/A
TERMINATED TERMINATED P & C
ERROR (TERMINATED)

参考

プロジェクト

ドキュメント

ソースコード

共有

OpenAPI Generator for Flask

メモ

簡単な動作確認

ひとまず分かりやすかった OpenAPI GeneratorでPython Web API構築 をそのまま試す。

1
2
3
4
5
6
7
$ mkdir -p  ~/Sources/OpenAPIGenFlaskSample/original
$ cd ~/Sources/OpenAPIGenFlaskSample/original
$ cat << EOF > openapi.yaml

(snip)

EOF

openapi.yamlの中身は OpenAPI GeneratorでPython Web API構築 に記載されている。

1
2
3
$ mkdir -p server client
$ docker run --rm -v ${PWD}:/local openapitools/openapi-generator-cli generate -i /local/openapi.yaml -g python-flask -o /local/server
$ docker run --rm -v ${PWD}:/local openapitools/openapi-generator-cli generate -i /local/openapi.yaml -g go -o /local/client

今回用があるのはサーバ側のPython実装(Flask)の方なので、そちらを確認する。 記事にもあるが以下のようなファイルが生成されているはず。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
$ tree
.
├── Dockerfile
├── README.md
├── git_push.sh
├── openapi_server
│   ├── __init__.py
│   ├── __main__.py
│   ├── controllers
│   │   ├── __init__.py
│   │   ├── security_controller.py
│   │   └── stock_price_controller.py
│   ├── encoder.py
│   ├── models
│   │   ├── __init__.py
│   │   ├── base_model.py
│   │   ├── error.py
│   │   ├── ok.py
│   │   └── stock_price.py
│   ├── openapi
│   │   └── openapi.yaml
│   ├── test
│   │   ├── __init__.py
│   │   └── test_stock_price_controller.py
│   ├── typing_utils.py
│   └── util.py
├── requirements.txt
├── setup.py
├── test-requirements.txt
└── tox.ini

まずは、 __main__.py を見てみる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python3

import connexion

from openapi_server import encoder


def main():
app = connexion.App(__name__, specification_dir='./openapi/')
app.app.json_encoder = encoder.JSONEncoder
app.add_api('openapi.yaml',
arguments={'title': 'Stock API'},
pythonic_params=True)

app.run(port=8080)


if __name__ == '__main__':
main()

上記の通り、 connexionを用いていることがわかる。 connexionはFlaskで動作する、APIとpython関数をマッピングするためのパッケージである。 connexionについては、 connexionを使ってPython APIサーバのAPI定義と実装を関連付ける のような記事を参考にされたし。

openapi_server/controllers/stock_price_controller.py を確認する。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import connexion
from typing import Dict
from typing import Tuple
from typing import Union

from openapi_server.models.error import Error # noqa: E501
from openapi_server.models.stock_price import StockPrice # noqa: E501
from openapi_server import util


def stock_price(security_cd): # noqa: E501
"""株価取得

現在の株価を取得する # noqa: E501

:param security_cd: 証券コードを指定する
:type security_cd: str

:rtype: Union[StockPrice, Tuple[StockPrice, int], Tuple[StockPrice, int, Dict[str, str]]
"""
return 'do some magic!'

operationId にて指定した名称がコントローラのメソッド名に反映されている。

parameters にて指定したパラメータがコントローラの引数になっていることが確認できる。

components にて指定したスキーマに基づき、openapi_server/models 以下に反映されていることが分かる。 今回の例だと、戻り値用の StockPrice やOK、Errorが定義されている。 なお、これはOpenAPIにて生成されたものであり、それを編集して使うことはあまり想定されていないようだ。

openapi_server/util.py にはデシリアライザなどが含まれている。

さて、記事通り、Dockerで動かしてみる。

1
2
$ docker build -t openapi_server .
$ docker run -p 8080:8080 openapi_server

試しに、適当な引数を与えて動かすと、実装通り戻り値を得られる。

1
2
$ curl http://localhost:8080/v1/sc/4721/stockPrice
"do some magic!"

なおDockerfileはこんな感じである。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
FROM python:3-alpine

RUN mkdir -p /usr/src/app
WORKDIR /usr/src/app

COPY requirements.txt /usr/src/app/

RUN pip3 install --no-cache-dir -r requirements.txt

COPY . /usr/src/app

EXPOSE 8080

ENTRYPOINT ["python3"]

CMD ["-m", "openapi_server"]

Docker化しなくてもそのままでも動く。 ここでは一応venvを使って仮想環境を構築しておく。

1
2
3
4
$ python -m venv venv
$ . venv/bin/activate
$ pip install -r requirements.txt
$ python -m openapi_server

参考

記事

共有

IDS Dataspace Protocol

メモ

最近のEDCでは、IDSが提唱しているDataspace Protocolが使用されている。 International-Data-Spaces-Association/ids-specification を眺めてコンテンツをざっくり書き下す。

v0.8の位置づけ

以下のような記載あり。あくまでドラフト扱い。

Working Draft 1 February 2023

概要

スキーマとプロトコルの仕様を示すものである。

  • データのPublish
  • 利用合意形成
  • データアクセス

自律エンティティ(要はコネクタか)間でデータ共有するにはメタデータの提供が必要である。 以下、メタデータ管理として挙げられていた項目。

  • data assetをDCAT Catalogに入れる方法
  • Usage ControlをODRLポリシーとして表現する方法
  • 契約合意を構文的に電子記録する方法
  • データ転送プロトコルを用いてデータアセットにアクセスする方法

仕様として挙げられていたのは以下。

  • Dataspace ModelとDataspace Terminologyドキュメント
  • Catalog ProtocolとCatalog HTTPS Bindings(DCATカタログの公開、アクセス方法)
  • Contract Negotiation ProtocolとContract Negotiation HTTPS Bindingドキュメント
  • Transfer Process ProtocolとTransfer Process HTTPS BIndingsドキュメント

概ね、モデルを明らかにし、そのうえでカタログ、契約合意、データ転送の一連の流れに沿って仕様が示されている、と言える。

注意点として、データ転送プロセス自体は言及せずプロトコルのみ示されていることである。

Protocol Overviewに各種情報へのリンクが載っている。

インターオペラビリティの確保

Dataspace Protocolはインターオペラビリティを確保するために用いられる、とされている。 ただし、本プロトコルによりテクニカルな側面は担保されるが、セマンティックな側面はData Space参加者により担保されるべきとしている。

なお、異なるData Spaceを跨ぐインターオペラビリティは本ドキュメントの対象外である。

全体概要は、Protocol Overviewの図を参照されたし。

以下は本ドキュメントではスコープ外だが、Data Space Protocolにとって必要。

  • Identity Provider
    • Trust Frameworkを実現するための情報を提供する
    • 参加者(のエージェント、コネクタ)のバリデーション、請求内容のバリデーションが基本的な機能であるが、 その請求の構造・内容はData Spaceごと、もっといえばIdentity Provider毎に異なる。
  • モニタリング
  • Policy Engine

Terminology

Terminologyに記載されているが、量は多くない。

特にポリシー周りの用語には注意したい。

  • Assest: Participantによって共有されるデータやテクニカルサービス
  • Policy: Asset利用のためのルール、義務、制限
  • Offer: とあるAssetに結びつけられたPolicy
  • Agreement: とあるAssetに結びつけられた具体的なPolicy。Provier、Consumerの両Participantにより署名されている

なお、関係性という意味では、Information ModelやParticipantAgent周りのクラス設計の方がわかりやすい。

Information Model

Information Modelに記載されている。 Information Modelの関係図 に関係図が載っている。

以下、ポイントのみ記載。

Dataspace Authority

ひとつ or 複数のDataspaceを管理する。 Participantの登録も含む。Participantにビジネスサーティフィケーションの取得(提出?)を求める、など。

Participant、Participant Agent

ParticipantがDataspaceへの参加者であり、Participant Agentが実際のタスクを担う。 Participant Agentはcredentialから生成されたverifiable presentationを用いる。credentialは第三者のissuerから発行されたものを用いる。 また第三者のIdentity providerか提供されたID tokenも用いる。

Identity Provider

トラストアンカー。 ID tokenを払い出す。ID tokenはParticipant Agentのアイデンティティの検証を行う。

複数のIdentity ProviderがひとつのDataspaceに存在することも許容される。

ID tokenのセマンティクスは本仕様書の対象外。

Identity Providerは外部でもよいし、Participant自身(e.g. DID)でもよい。

Credential Issuer

Verifiable Credentialを発行する。

ParticipantAgent周りのクラス設計

ParticipantAgent周りのクラス設計の図に大まかな設計が書かれている。

これによると、CatalogServiceとConnectorは同じParticipantAgentの一種である。

DCAT CatalogにはAsest EntryとDCAT DataServiceが含まれる。 ちなみに、DCAT DataServiceはAssetの提供元となるConnectorへのReferenceである。

Asset EntryはODRL Offerを保持する。当該Assetに紐づけられたUsage Control Policyである。

ConnectorもParticipant Agentの一種である。 Contract NegotiationとData Transferを担う。 Contract Negotiation結果、ODRL Agreementが生成される。ODRL Agreementは、合意された当該Assetに関するUsage Control Policyと言える。

実態的なクラス

Dataspace AuthorityやParticipant Agentのように、実際のフローには登場しないエンティティもあるようだ。 Classesドキュメント に実際のフローに登場するクラスの説明が記載されている。

Catalog Protocol仕様

Catalog Protocolの仕様に以下のような要素の仕様が記載されている。

  • message type: メッセージの構造
  • message: message typeのインスタンス
  • catalog: データ提供者によりオファーされたDCAT Catalog
  • catalog service: 提供されたasset(の情報)を公表するParticipant Agent
  • Consumer: 提供されたassetを要望するParticipant Agent

Message Type

JSON-LDでシリアライズされたメッセージ。なお、将来的にはシリアライズ方式が追加される可能性がるようだ。

  • CatalogRequestMessage: ConsumerからCatalog Serviceに送られる要望メッセージ。filterプロパティあり。
  • Catalog: Providerから送られるAsset Entiry
  • CatalogEror: ConsumerもしくはProviderから送られるエラーメッセージ
  • DatasetRequestMessage: ConsumerからCatalog Serviceに送られる要望メッセージ。Catalog ServiceはDataset(DCAT Dataset)を答える。それにはデータセットのIDが含まれる。

DCATとIDS Informationモデルの対応関係

  • Asset Entity: DCAT Dataset
  • Distributions: DCAT Distributions
  • DataService: ConnectorのようなIDSサービスエンドポイント。なお、dataServiceTypeが定義されている。現状ではdspace:connectorのみか。

Technical Consideration

Technical Considerationsとしてはクエリやフィルタ、レプリケーションプロトコル、セキュリティ、ブローカについて言及されている。

Catalog HTTPS Binding

Contract Negotiation仕様

ProviderとConsumerの間のContract Negotiation(CN)。 CNは、 https://www.w3.org/International/articles/idn-and-iri/ にてユニークに識別できる。 CNは状態遷移を経る。それらはProviderとConsumerにトラックされる。

ステート変化

Contract Negotiationのステート変化の図 にステート変化の概要が描かれている。

ステート 説明
REQUESTED ConsumerからOfferに基づいてリクエストが送られた状態。ProviderがAckを返した後。
OFFERED ProviderがConsumerにOfferを送った状態。ConsumerがAckを返した後。
ACCEPTED Consumerが最新のContract Offerを承諾した状態。ProviderがAckを返した後。
AGREED Providerが最新のContract Offerに承諾し、Consumerに合意を送った状態。ConsumerがAckを返した後。
VERIFIED ConsumerがProviderに合意検証結果を返した状態。ProviderがAckを返した後。
FINALIZED Providerが自身の合意検証結果を含むファイナライズのメッセージをConsumerに送った状態。ConsumerがAckを返した後。データがConsumerに利用可能な状態になっている。
TERMINATED ProviderかConsumerがCNを終了させた状態。どちらからか終了メッセージが送られ、Ackが返る。

メッセージタイプ

Contract NegotiationのMessage Types に上記ステート変化における各種メッセージの説明が記載されている。 ほぼ名称通りの意味。

Contract Negotiation HTTPS Bindings

Transfer Process仕様

CNと同じく、Transfer Process (TP) もProviderとConsumerの両方に関係する。 またCNと同じくステート変化が定義されている。

Control and Data Plane

TPはConnectorにより管理される。Connectorは2個のロジカルコンポーネントで成り立つ。

  • Control Plane: 対抗のメッセージを受領し、TPステートを管理する
  • Data Plane: 実際の転送を担う

これらはロジカルなものであり、実装では単独プロセスとしてもよい。

アセット転送のタイプ

  • push / pull
  • finite / non-finite

Push Transferの流れPull Transferの流れ にそれぞれの流れのイメージが載っている。 基本的な流れは同じだが、実際のデータやり取りの部分だけ異なる。

finiteデータの場合、データの転送が終わったらTPが終わる。 一方、non-finiteデータの場合は明示的に終了させる必要がある。 non-finiteデータはストリームデータ、APIエンドポイントが例として挙げられている。

ステート変化

Trasnfer Processのステート変化 にステート変化の様子が描かれている。

ステートはREQUESTED、STARTED、COMPLETED、SUSPENDED、TERMINATED。

Message Type

Transfer ProcessのMessage Type にメッセージタイプが記載されている。 特筆すべきものはない。

参考

共有

How to create SDK (WIP)

メモ

注意

まだ書き始めの殴り書きなので、中身の完成度や信ぴょう性がかなり低い文章である。

世の中でSDKと呼ばれているものには何が含まれているのか

  • 開発者の強い味方!「SDK(ソフトウェア開発キット)」とは?
    • KDDIによるブログ記事
    • 「ソフトウェアを開発する際に必要なプログラムやAPI・文書・サンプルなどをまとめてパッケージ化したもの」
  • e-WordsのSDK 【Software Development Kit】 ソフトウェア開発キット
    • 「SDKとは、あるシステムに対応したソフトウェアを開発するために必要なプログラムや文書などをひとまとめにしたパッケージのこと。システムの開発元や販売元が希望する開発者に配布あるいは販売する。インターネットを通じて公開されているものもある。」
  • AWS SDKとは?
    • 「特定のプラットフォーム、オペレーティングシステム、またはプログラミング言語で実行されるコードを作成するには、デバッガー、コンパイラー、ライブラリなどのコンポーネントが必要です。SDK は、ソフトウェアの開発と実行に必要なすべてを 1 か所にまとめます。」

製品により異なるのが前提ではあるが、おおむね含まれるとされるのは以下。

  • 開発ツール
    • コンパイラ、デバッガ、プロファイラー、デプロイツール、IDE
  • 既成のプログラム
    • クラスファイル、(APIなど)ライブラリ、モジュール、ドライバ
  • 文書ファイル
    • API、通信プロトコル
  • プログラムファイル
    • サンプルプログラム

SDKの具体例

工夫点

開発ツールの提供形態

  1. 開発に必要な環境ごと丸ごと提供する方法、2. 特定のIDEを前提としたプラグインを提供する方法、などがある。
方法例 メリット デメリット
丸ごと提供 すぐに開発始められる 自由度が低い
プラグイン提供 自由度が高い 環境準備が手間大きめ

ライセンス

SDK自体のライセンスだけではなく、SDKに含まれるライセンスにも注意を払う必要がある。

各言語向けのバインディング

サービスを使うためのSDKの場合は、様々な開発言語向けのバインディングを提供することも考慮したい。

先人の知恵

SDK の開発と維持の難しさ にはAndroidやiOS向けのSDKを開発した際の経緯が書かれていた。

参考

SDKとは?

SDKの具体例

ばいんでぃば

先人の知恵

共有

check windows cpu resources

メモ

マシンが暴走気味になることがあり、原因追跡するためにLinuxなどでいうpsコマンド相当の内容をログ取りたいと思った。 ので、軽く調べたところ、以下のウェブサイトがヒットした。

Windows がなんか重いときにコマンドで調べる(WMIC PROCESS)

上記サイトが丁寧に解説してくれている。 その中から、今回は CPUの利用率でフィルタ あたりを参考にした。

コマンド例

1
WMIC PATH Win32_PerfFormattedData_PerfProc_Process WHERE "PercentUserTime > 1" GET Name,IDProcess,PercentUserTime,CommandLine /FORMAT:CSV

ユーザタイムが1%以上使われたものをリストしている。 もし行数制限したい場合は、 PowerShellでhead,tail相当の処理を行う あたりを参考にする。

コマンド例

1
(WMIC PATH Win32_PerfFormattedData_PerfProc_Process WHERE "PercentUserTime > 1" GET Name,IDProcess,PercentUserTime /FORMAT:CSV)[0..10]

上記では、コマンドの引数が分からない。 そこで、それを詳細に調べようとすると、 コマンドラインからプロセスを特定 を参考にするとよい。 おいおい、自動的に要素を結合できるようにしよう。

参考

共有

Open project of EDC Connector with Intellij

1. メモ

EDC Connector 公式GitHub のプロジェクトをIntellijで開くための手順メモ。 いろいろなやり方があるが一例として。

1.1. 準備

gitクローンしておく。 ここでは、gitプロトコルを用いているが環境に合わせて適宜変更してクローンする。

1
2
$ git clone git@github.com:eclipse-edc/Connector.git
$ cd Connector

なお、必要に応じて特定のタグをチェックアウトしてもよい。 ここでは、v0.2.0をチェックアウトした。

1
$ git checkout -b v0.2.0 refs/tags/v0.2.0

当該プロジェクトでは、ビルドツールにGradleを用いている。プロジェクトに gradlew も含まれている。 本環境では、以下のようにGradle8.0を利用した。

◇参考情報

1
2
3
4
5
6
7
$ cat gradle/wrapper/gradle-wrapper.properties
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.0-bin.zip
networkTimeout=10000
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

◇参考情報おわり

さて、ビルドに利用するOpenJDKをインストールしておく。 ここでは17系を用いた。(もともと手元の環境にあった8系を用いようとしたら互換性エラーが生じたため、17系を用いることとした)

1
$ sudo apt install openjdk-17-jdk

なお、 Gradle Compatibility に互換性のあるGradleとJDKのバージョンが記載されているので参考にされたし。

この状態でIntellijを使わずにビルドするには、以下のように実行する。 ここでは念のために、コマンドでビルドできることを確かめるため、あらかじめ以下を実行しておいた。

1
$ ./gradlew clean build

特に問題なければ、success表示が出て終わるはず。

1.2. Intellijで開く

ひとまず、プロジェクトトップでInteliljを開く。

1
$ <Intellijのホームディレクトリ>/bin/idea.sh . &

なお、IntellijにGradle拡張機能がインストールされていなければ、インストールしておく。「Files」→「Settings」→「Plugins」→「Gradleで検索」。

また使用するJDKを先にインストールしたJDK17を用いるようにする。「Files」→「Project Structure」→「Project」を開く。 「SDK:」のところで先ほどインストールしたJDKを設定する。 「Language Lavel:」も合わせて変更しておく。

開いたら、右側の「Gradle」ペインを開き、設定ボタンを押して「Gradle settings ...」を選択する。

「Build and run」章のところは、「Gradle」が選択されていることを確認する。

「Gradle」章のところは、「Use Gradle from:」でgradle wrapperの情報を用いるようになっていること、「Gradle JVM」でProject JDKを用いるようになっていることを確認する。

特に問題なければ、 BUILD SUCCESSFUL となるはずである。

1.3. トラブルシュート

1.3.1. Intellijのメモリ不足

ビルド中にヒープが不足することがある。 その場合は、 Intellijのメモリを増やす設定 を参考に、ヒープサイズを増やす。

「Help」→「Memory Setting」を開き、「2024」(MB)あたりにしておく。

1.3.2. Gradleのメモリ不足

ビルド中にヒープが不足することがある。 プロジェクト内にある、 gradle.properties 内に以下を追記する。ここでは最大4GBとした。

◇diff

1
2
3
4
5
6
7
8
9
10
11
diff --git a/gradle.properties b/gradle.properties
index 376da414a..d8da811a9 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -7,3 +7,6 @@ edcGradlePluginsVersion=0.1.4-SNAPSHOT
metaModelVersion=0.1.4-SNAPSHOT
edcScmUrl=https://github.com/eclipse-edc/Connector.git
edcScmConnection=scm:git:git@github.com:eclipse-edc/Connector.git
+
+# Increase Gradle JVM Heap size
+org.gradle.jvmargs=-Xmx4096M

1.3.3. JDKバージョンの不一致

当初、環境にあったJDK8系を用いていたが、ビルド時に構文エラー(互換性のエラー)が生じたため、JDK17を用いるようにした。

1.3.4. Gradleで使うJDKの選択

環境によっては、独自の証明書などをJDKのcacertsに導入して用いているかもしれない。 その場合は、用いているGradle、JDKに注意が必要。

例えば、/etc/ssl/certs/java/cacertsに独自の証明書を追加し、それを各JDKで用いるようにすると良い。

1
2
3
4
5
6
7
cd /etc/ssl/certs/java
cp cacerts cacerts.org
keytool -import -alias CA -keystore cacerts -file ~/any.cer
# passwordはchnageit
cd ~/.jdks/jbr-xxxxxx/lib/security
mv cacerts cacerts.org
ln -s /etc/ssl/certs/java/cacerts cacerts

2. 参考

共有