ComfyUI

メモ

導入方法

ComfyUIとは?ノードベースGUIでStable Diffusionを最大活用する機能・特徴・導入方法徹底解説! に導入方法などの基礎が書いてある。 ComfyUIの導入方法が2種類紹介されているが、他のツールなどの管理も兼ねるため、ここではStability Matrixを用いた。

StabilityMatrix のリリースから各環境用のインストーラをダウンロード。 執筆時点では、 v2.12.0が最新だった。

Windwows用は以下。

https://github.com/LykosAI/StabilityMatrix/releases/download/v2.12.0/StabilityMatrix-win-x64.zip

使い方

画像生成AI「Stable Diffusion」を使い倒す! モジュラーシンセみたいな「ComfyUI」をインストール が参考になる。 先に StabilityMatrix のModel Browserで適当なモデルをダウンロードしておくこと。 StabilityMatrix で ComfyUI をインストールした場合、 ComfyUIのコンフィグが自動で編集され、ダウンロードしたモデルのパスが追加されるようになっている。ただし、新しいモデルをダウンロードした場合は ComfyUIの再起動が必要。

参考

共有

Jan

メモ

LLMプラットフォームの「JAN」、様々なモデルでチャットAIを作れる を参考に、 Janを使ってみる。

Janの公式サイト に掲載の通り、パッケージをダウンロードする。 ここではUbuntu環境を前提にAppImageを用いる。 このとき最新版だった、 jan-linux-x86_64-0.5.4.AppImage をダウンロードした。自分の環境の場合は、 ~/Applications/ 以下にダウンロードした。

Ansibleプレイブックとしてはいかが参考になる。

  • https://github.com/dobachi/ansible-miscs/blob/master/playbooks/conf/linux/jan.yml
  • https://github.com/dobachi/ansible-miscs/blob/master/roles/jan/tasks/main.yml

環境が整ったら実行。

1
$ ~/Applications/jan/default/jan-linux-x86_64-0.5.4.AppImage

初回起動時はインストールするかどうかを聞かれるのでYesとする。

GUIからひとまず lama3.1-8b-instruct を試すことにした。

続いて、 LLMプラットフォームの「JAN」、様々なモデルでチャットAIを作れる の記事でも用いられている、huggingfaceからモデルを探して試す。 Huggingfaceについては、 Hugging Faceとは?Hugging Face Hubの機能や使い方・ライブラリをわかりやすく解説! を参照。

参考

共有

US keyboard layout on Ubuntu22

メモ

VMWare WorkstationでUbuntu 22系をインストールしたところ、物理キーボードがUS配列なのに、日本語配列で認識されてしまったので直した。

Ubuntu 22.04 でキーボードレイアウトおかしくなる問題。 にも記載されているのだが、/usr/share/ibus/component/mozc.xmlには以下のような記載がある。

1
2
3
4
5
<!-- Settings of <engines> and <layout> are stored in ibus_config.textproto -->
<!-- under the user configuration directory, which is either of: -->
<!-- * $XDG_CONFIG_HOME/mozc/ibus_config.textproto -->
<!-- * $HOME/.config/mozc/ibus_config.textproto -->
<!-- * $HOME/.mozc/ibus_config.textproto -->

自分の環境だと、以下のような場所にあった。

1
2
3
4
5
$ sudo find / -name *ibus_config.textproto*

(snip)

/home/dobachi/.config/mozc/ibus_config.textproto

以下のようにlayoutを変更する。もともとdefaultである。

1
$ cat /home/dobachi/.config/mozc/ibus_config.textproto
1
2
3
4
5
engines {
name : "mozc-jp"
longname : "Mozc"
layout : "us"
}

参考

共有

Building Block of Data Spaces

メモ

各イニシアティブのビルディングブロックのまとめかたを軽くメモしたもの。

メモのボード

参考

共有

Data Sovereignty

メモ

世における基本的な考え方

データ主権(Data Sovereignty)については色々な解釈をされているが、 一つの考え方は特定地域で収集・保存されたデータがその地域の法律にしたがって運用されること、という考え方が挙げられる。

この考え方を示している記事が多い。

あまり量が多くなっても混乱するので、主要な話だけさらってみるようにする。

簡単なまとめメモ

セーフハーバー法

関連することとして、セーフハーバー法が挙げられる。つまり、特定の条件を満たす限り、違法行為とみなされない範囲を定めた規定である。 総務省の平成29年版白書では、 第1部 特集 データ主導経済と社会変革 にEUと米国のセーフハーバーにちうて記載あり。 米国とEUの間では2000年に個人データ移転についての原則を記したセールハーバー協定を締結したのだが、いわゆるSnowden事件により欧州司法裁判所による無効の判断。そして、その後、2016/2にEU-USプライバシーシールが新たに制定。

Snowden事件とデータローカライゼーション

データ保護については、Snowden事件や米国政府によるマイクロソフトのメール検閲を挙げるケースがある。 2013年ころのことである。参考: 世界を揺るがしたスノーデン事件から5年--変わったこと、変わらなかったこと

ここから、欧州のGDPR(2016)やカナダのデータ主権措置に関する戦略(2016-2020)などのデータ越境移転に関する規定、 つまりデータローカライゼーション規定の流れが考えられる。 参考: データローカライゼーション規制とは?必要となる背景や現状を徹底解説 2022年当時の欧州、米国などのデータローカライゼーション規定については、経産省の「データの越境移転に関する研究会」資料がわかりやすい。 参考: 各国のデータガバナンスにおけるデータ越境流通に関連する制度調査

著名なところだと、中国サイバーセキュリティ法(2017/6施行)の第37条が挙げられる。参考: China’s data localization 他にも、ベトナムのインターネットサービスとオンライン情報コンテンツの管理、提供、仕様に関する法令(2013/9制定)、インドネシアの法令では国内データセンタへのデータ配置が求められている(2014/1)、ロシアのデータローカライゼーション法(2015/9発行)など。

一方で、このようなデータローカライゼーションに関する制約はビジネスや技術発展において足かせとなるという主旨の論調もあった。 参考: The Cloud’s Biggest Threat Are Data Sovereignty Laws

日本からは、2019/1のスイス・ジュネーブにおける、いわゆるダボス会議にて、当時の首相だった安倍晋三からDFFTの提言があった。

GDPRとData Act

データローカライゼーションの議論をする際には、EUにおけるGDPR(一般データ保護規則)は外せない。 個人データの移転と処理について規定したものである。2018年適用開始。参考: GDPR(General Data Protection Regulation:一般データ保護規則)

一方で、個人データだけではなく、非個人データを含むデータの利用促進、データへの公平なアクセスおよびその利用を目的として2020年の欧州データ戦略の一環としてData Actが成立。2024/1発効、2025/9施行。 IoT機器が生成するデータであり、非個人データも対象となる。自動車、スマート家電、などなど影響大。 参考: EUデータ法の解説 - 適用場面ごとのルールと日本企業が講ずべき実務対応を整理Data Act

両者は補完関係と考えることもできる。

また、Data ActはData Governance Actとも補完関係と考えられる。

日本とEU・英国の間のデータ越境移転に関するセーフハーバー

欧州から日本に対しては、2019/1/23に十分性認定がされており、EU域内と同等の個人データ保護水準を持つ国と認定されている。 参考: 日EU間・日英間のデータ越境移転について 日本は個人情報保護法第28条でEUを指定、EUはGDPR第45条に基づき日本を十分性認定。

ただし、個人情報保護法に基づき補完的ルール適用が必要。参考: 補完的ルール 補完的ルールでは以下のような規定。

  • 要配慮個人情報の範囲の拡張
  • 保有個人データの範囲の拡張
  • データ主体の権利保護
  • データの安全管理措置

まとめ

データ主権を議論する際には、基本的事項としてデータを収集・保存した地域における法律遵守が主旨になり、 さらにいわゆるデータローカライゼーションの原則が挙げられる。個人情報保護の観点がひとつは挙げられるが、それに限らない。 一方で経済や技術発展のためには、バランスをとったデータ越境移転が必要であり、DFFTのような提唱も行われている。

参考

共有

Use HOME in case of using sudo [ansible]

メモ

ansible で sudo 時に実行ユーザの HOME 環境変数を取得する を参考にした。

以下のようなロールを作っておき、プレイブックの最初の方に含めておくと良い。 以降のロール実行時に、変数 ansible_home に格納された値を利用できる。

roles/regsiter_home/tasks/main.yml

1
2
3
4
5
6
7
8
9
10
11
- block:
- name: Get ansible_user home directory
shell: 'getent passwd "{{ansible_env.SUDO_USER}}" | cut -d: -f6'
register: ansible_home_result

- name: Set the fact for the other scripts to use
set_fact:
ansible_home: '{{ansible_home_result.stdout}}'
cacheable: true
tags:
- register_home

元記事と変えているのは、SUDO実行ユーザ名を取るところ。環境変数から取るようにしている。

プレイブックでは以下のようにする。

playbooks/something.yml

1
2
3
4
- hosts: "{{ server | default('mypc') }}"
roles:
- register_home
- something

参考

共有

Management Domain of EDC

メモ

概要

2024/6あたりから、 EDC/management-domains のドキュメントが追加された。

EDC/management-domains/Introduction の通り、Management Domainを利用することで、EDCコンポーネント群を組織的に管理することができる。 例えば、管理組織と下部組織にわけ、下部組織は管理組織に管理を委譲できる。

ここで言うEDCコンポーネントとは、例えば以下のようなものが挙げられている。

  • Catalog Server
  • Control Plane
  • Data Plane
  • Identity Hub

2024/7/14現在では、対象は以下のようになっている。

1
The current document will outline how management domains operate for three EDC components: the Catalog Server, Control Plane, and Data Plane.

Topology

デプロイメトには種類がある。 大まかに分けて、単一構成(Single Management Domain)と分散型構成(Distributed Management Domains)である。

分散型の構成は、親Management Domainがどこまでを担うかによります。

Architecture

EDCはDCAT3のCatalog型を利用する。Catalog型の親はDataset型である。 またCatalog型はほかの複数のCatalogを含むことができる。その際、Service型で定義する。 したがって、以下のような例になる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
{
"@context": "https://w3id.org/dspace/v0.8/context.json",
"@id": "urn:uuid:3afeadd8-ed2d-569e-d634-8394a8836d57",
"@type": "dcat:Catalog",
"dct:title": "Data Provider Root Catalog",
"dct:description": [
"A catalog of catalogs"
],
"dct:publisher": "Data Provider A",
"dcat:catalog": {
"@type": "dcat:Catalog",
"dct:publisher": "Data Provider A",
"dcat:distribution": {
"@type": "dcat:Distribution",
"dcat:accessService": "urn:uuid:4aa2dcc8-4d2d-569e-d634-8394a8834d77"
},
"dcat:service": [
{
"@id": "urn:uuid:4aa2dcc8-4d2d-569e-d634-8394a8834d77",
"@type": "dcat:DataService",
"dcat:endpointURL": "https://provder-a.com/subcatalog"
}
]
}
}

Sub CatalogはDCATのService型で関連付けされる。 この例は、Distributed Management Domainの2aに相当する。

アクセス管理についても考える。 2bのパターンのでは、親のCatalog Serverが下部組織のManagement Domainを管理する。そのため、ContractDefinitionにアクセス権を定義することで下部組織へのアクセス管理を実現する。

また、さらに集中型のControl Planeを設ける場合は、親のControl Planeが子のData Planeを管理する。

また、2024/7時点での設計では、複製を避けるようになっている。これは性能と単純さのためである。複合Catalogは上記の通り、ハイパーリンクを使用するようにしており、非同期のクローラで遅延ナビゲートされることも可能。 また、Catalogもそうだが、Contract NegotiationやTransfer Processのメタデータも複製されないことにも重きが置かれている。各Management Domainはそれぞれ責任を持って管理するべき、としている。 もし組織間で連携が必要な場合は読み取り専用の複製として渡すEDC拡張機能として実現可能である。 なお、個人的な所感だが、これは2aや2bであれば実現しうるが、2cだとControl Planeが親側に存在するため、Contract Negotiationの管理主体が親になってしまうのではないか、と思うがどうだろうか。Data Plane側はもしかしたら、下部組織側に残せるかもしれない。

実装

以上の内容を実現するため、いくつかEDCに変更が行われる。

  • Asset にCatalogを示す真理値を追加。もし @typeedc:CatalogAsset の場合は真になる。
  • Management APIが更新される。 Asset@type を持てるように。
  • Dataset の拡張として Catalog を追加
  • DatasetResolverImpl を更新。 Asset のCatalogサブタイプを扱えるように。
  • JsonObjectFromDatasetTransformer をリファンクたリング。 Catalog サブタイプを扱えるように。

合わせて、Fedrated Catalog Crawler (FCC) をリファクタリング。複合カタログをナビゲートし、キャッシュできるように。 参加者ごとにCatalogの更新をアトミックに実行できるようにする必要がある。

Management APIはCatalogを要求する際、ローカルのFCCキャッシュを参照するようにリファクタリングされる。

Catalog Serverもリファクタリングされる。

参考

共有

MVD_of_EDC

メモ

2024/7/12時点でプロジェクトのドキュメントが更新され、旧来Developer Documentとされていたものがなくなった。 その代わり、README に色々な記述が追加されている。今回はこれを使って試す。

Identity & Trustについての説明

README#Introduction にある通り、Eclipse Dataspace Working Groupの下で、IdentityやTrustの仕様検討が行われている。 ただ、当該章に記載のリンクをたどると、Tractus-Xのサイトにたどり着く。 -> Eclipse Tractus-X/identity-trust 当該レポジトリのREADMEでも記載されているが、この状態は暫定のようだ。

1
Until the first version of the specification from the working group is released and implemented, this repository contains the current specification implemented in the Catena-X data space. Maintenance is limited to urgent issues concerning the operation of this data space.

何はともあれ、2024/7現在は、Verifiable Credentialを用いたDecentralized Claims Protocolは絶賛仕様決定・実装中である。

目的

README#Purpose には、このデモの目的が記載されている。 このデモでは、2者のデータスペース参加者が、クレデンシャルを交換した後、DSPメッセージをやり取りする例を示す。

もちろん、旧来からの通り、このデモは商用化品質のものではなく、いくつかの shortcut が存在する。

シナリオ

management-domain を用いたFederated Catalogsを用いている。

シナリオの概要をいかに示す。

MVDシナリオの概要

図の通り、Provider CorpとConsumer Corpの2種類の企業があり、 Provdier CorpにはQ&AとManufacturingという組織が存在する。Q&AとManufacturingにはそれぞれEDCが起動している。Provdier Copの親組織にはCatalogとIdentityHubが起動している。このIdentityHubはCatalog、EDC(provider-qna)、EDC(provider-manufacuturing)に共通である。また、participantIdも共通のものを使う。Consumer CorpにはEDCとIdentityHubがある。

Data setup

図の通り、実際のデータ(assets)は各下部組織にある。そのカタログは直接外部に晒されない。その代わり、親組織であるProvider CorpのCatalog Server内のroot catalogにそのポインタ(catalog assets)が保持される。これにより、Consumer Corpはroot catalogを用いて、実際のassetの情報を解決できる。

(wip)

参考

共有

EDCSample

メモ

EDC/Samples の内容を紹介する。

動作確認環境

本プロジェクトに沿って動作確認した際の環境情報を以下に載せる。

1
2
3
4
5
6
7
8
9
10
$ cat /etc/lsb-release
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=22.04
DISTRIB_CODENAME=jammy
DISTRIB_DESCRIPTION="Ubuntu 22.04.4 LTS"

$ java -version
openjdk version "17.0.12" 2024-07-16
OpenJDK Runtime Environment (build 17.0.12+7-Ubuntu-1ubuntu222.04)
OpenJDK 64-Bit Server VM (build 17.0.12+7-Ubuntu-1ubuntu222.04, mixed mode, sharing)

EDCで利用しているJDKに合わせてJDK17を用いている。

またdobachi/EDCSampleAnsibleに環境準備などのAnsible Playbookを置く。

README

EDC/Samples/README には目的、必要事項(準備)、スコープの説明がある。

本プロジェクトの目的はオンボーディングの支援。 初心者をナビゲートしやすいような順番でサンプルが構成されており、ステップバイステップで学べる。

必要事項(準備)

このプロジェクトでは、基本的にEclipse Dataspace Componentで用いられる用語は理解している前提で説明が進められる。 そのため、EDC/documentation は読んでいることが望ましい。 また、EDC/YTの動画も参考になる。

また、関連ツールとして、Git、Gradle、Java、HTTPあたりは押さえておきたい、とされている。

動作確認する場合は、Java11+がインストールされていることが必要。

スコープ(Scopes)

サンプル群は、Scopeと呼ばれるグループで区分されている。 何を学びたいか、で分かれている。 ひとまず、初心者であればbasicが良いとのこと。

ざっと以下に解説する。

Scope名 説明
Basic EDC Frameworkを使い始めるための基礎。セットアップ、起動、拡張の作成
Transfer データ転送について
Advanced 高度な例

Basicスコープの概要

EDC/Samples/Basic/READMEには以下のサンプルが載っている。

サンプル名 説明
Basic Sample 1 コネクタを起動するのに必要なことを学ぶ
Basic Sample 2 拡張機能の作成、拡張機能の使い方を学ぶ
Basic Sample 3 プロパティファイルを用いて設定値を扱う方法を学ぶ

Basic/Sample1

EDC/Samples/Basic/Basic1/READMEには以下の通り説明されている。

Runtimeとビルドファイルで構成されている。 ビルドファイルは、build.gradle.ktsである。

このサンプルでは、EDCのBaseRuntimeを用いている。

また、EDC/Samples/Basic/Basic1/build.gradle.ktsを見ることでプロジェクトが依存するものがわかる。

basic/basic-01-basic-connector/build.gradle.kts:22

1
2
3
4
dependencies {
implementation(libs.edc.boot)
implementation(libs.edc.connector.core)
}

READMEの通り、Sampleプロジェクトのrootでコンパイル実行すれば良い。

1
2
./gradlew clean basic:basic-01-basic-connector:build
java -jar basic/basic-01-basic-connector/build/libs/basic-connector.jar

特に何があるわけではない。コンソールに起動のメッセージが出力されるはず。

Basic/Sample2

EDC/Samples/Basic/Basic2/README には以下の通り説明されている。

ServiceExtensionsrc/main/resources/META-INF/services ディレクトリ以下のプラグインファイルで構成される。 この例では HealthEndpointExtension というServiceExtensionを実装。

1
2
3
4
5
6
7
8
9
10
public class HealthEndpointExtension implements ServiceExtension {

@Inject
WebService webService;

@Override
public void initialize(ServiceExtensionContext context) {
webService.registerResource(new HealthApiController(context.getMonitor()));
}
}

上記の通り、 ServiceExtension を拡張。また、中ではWebServiceをインジェクとしている。(DIのインジェクトについてはインターネットの情報を参考) initialize メソッド内で、先にインジェクとした webService にリソースを登録している。

org.eclipse.edc.web.spi.WebService インターフェースは、例えば org.eclipse.edc.web.jersey.JerseyRestService として実装されている。 Jersey はシンプルなRESTfulサービスのフレームワークである。

上記で登録されているコントローラ HealthApiController は以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Consumes({MediaType.APPLICATION_JSON})
@Produces({MediaType.APPLICATION_JSON})
@Path("/")
public class HealthApiController {

private final Monitor monitor;

public HealthApiController(Monitor monitor) {
this.monitor = monitor;
}

@GET
@Path("health")
public String checkHealth() {
monitor.info("Received a health request");
return "{\"response\":\"I'm alive!\"}";
}
}

特別なことはないが、上記の通り、 checkHealth メソッドを叩かれるとメッセージを残す。 実行の仕方は以下の通り。

1
2
./gradlew clean basic:basic-02-health-endpoint:build
java -jar basic/basic-02-health-endpoint/build/libs/connector-health.jar

http://localhost:8181/api/health にアクセスすればブラウザ上でメッセージが表示されることに加え、コンソールにもメッセージが生じる。

Basic/Sample3

EDC/Samples/Basic/Basic3/README に以下のような説明がある。

設定をくくりだした ConfigurationExtension がある。 org.eclipse.edc.configuration.filesystem.FsConfigurationExtension がデフォルトの設定ファイル取り扱いのクラス。 このサンプルでは、Gradleの依存関係記述にて、 以下を指定することでJarファイルを含めるようにする。

basic/basic-03-configuration/build.gradle.kts:29

1
2
3
(snip)
implementation(libs.edc.configuration.filesystem)
(snip)

ここでは /etc/eclipse/EDC/config.properties という設定ファイルを作り利用する例を示す。 ポート番号を9191に変更する例。

1
2
sudo mkdir -p /etc/eclipse/EDC
sudo sh -c 'echo "web.http.port=9191" > /etc/eclipse/EDC/config.properties'

プロパティファイルを作ったので、それを利用するよう引数で指定して実行する。

1
2
./gradlew clean basic:basic-03-configuration:build
java -Dedc.fs.config=/etc/eclipse/EDC/config.properties -jar basic/basic-03-configuration/build/libs/filesystem-config-connector.jar

起動時に以下のようなログが見えるはず。

1
2
INFO 2024-08-26T16:16:38.946828558 HTTP context 'default' listening on port 9191
DEBUG 2024-08-26T16:16:38.986329567 Port mappings: {alias='default', port=9191, path='/api'}

ちなみに、設定ファイルのデフォルトは以下の通り。

/home/dobachi/.gradle/caches/modules-2/files-2.1/org.eclipse.edc/configuration-filesystem/0.8.1/7741c1f790be0f02a2da844cd05edb469a5d095b/configuration-filesystem-0.8.1-sources.jar!/org/eclipse/edc/configuration/filesystem/FsConfigurationExtension.java:67

1
var configLocation = propOrEnv(FS_CONFIG, "dataspaceconnector-configuration.properties");

独自のプロパティの利用

本サンプルのREADMEに書いてある通り、先程のプロパティファイルに以下を追記。

1
edc.samples.basic.03.logprefix=MyLogPrefix

ログのプリフィックス文言を決めるためのプロパティを定義。

続いて、Sample2でも使用した org.eclipse.edc.extension.health.HealthEndpointExtension を改変。 プロパティ名を定義し、 org.eclipse.edc.spi.system.SettingResolver#getSetting(java.lang.String, java.lang.String) メソッドを用いてプロパティの値を取れるようにした。

org/eclipse/edc/extension/health/HealthEndpointExtension.java:22

1
2
3
4
5
6
7
8
9
10
11
12
13
public class HealthEndpointExtension implements ServiceExtension {

@Inject
WebService webService;

private static final String LOG_PREFIX_SETTING = "edc.samples.basic.03.logprefix"; // this constant is new

@Override
public void initialize(ServiceExtensionContext context) {
var logPrefix = context.getSetting(LOG_PREFIX_SETTING, "health"); //this line is new
webService.registerResource(new HealthApiController(context.getMonitor(), logPrefix));
}
}

http://localhsot:9191/health にアクセスすると、コンソールに以下のログが出る。

1
INFO 2024-08-26T16:25:55.233519477 health :: Received a health request

Transferスコープの概要

2個のコネクタ間でデータをやり取りする。プロバイダとコンシューマ。

Transfer/Sample0(準備)

EDC/Samples/Transfer/Transfer0/READMEの通り、準備をする。 なお、このサンプルでは簡単化のために同一マシン上で、プロバイダとコンシューマを起動するが、本来は別々のところで起動するものである。

プロジェクトrootで以下を実行。

1
./gradlew transfer:transfer-00-prerequisites:connector:build

結果、ビルドされたJARファイルがここに配置される。 transfer/transfer-00-prerequisites/connector/build/libs/connector.jar

プロバイダとコンシューマは設定が異なるのみであり、JARは共通。 PATH配下の通り。

transfer/transfer-00-prerequisites/resources/configuration/consumer-configuration.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
edc.participant.id=consumer
edc.dsp.callback.address=http://localhost:29194/protocol
web.http.port=29191
web.http.path=/api
web.http.management.port=29193
web.http.management.path=/management
web.http.protocol.port=29194
web.http.protocol.path=/protocol
edc.transfer.proxy.token.signer.privatekey.alias=private-key
edc.transfer.proxy.token.verifier.publickey.alias=public-key
web.http.public.port=29291
web.http.public.path=/public
web.http.control.port=29192
web.http.control.path=/control

transfer/transfer-00-prerequisites/resources/configuration/provider-configuration.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
edc.participant.id=provider
edc.dsp.callback.address=http://localhost:19194/protocol
web.http.port=19191
web.http.path=/api
web.http.management.port=19193
web.http.management.path=/management
web.http.protocol.port=19194
web.http.protocol.path=/protocol
edc.transfer.proxy.token.signer.privatekey.alias=private-key
edc.transfer.proxy.token.verifier.publickey.alias=public-key
web.http.public.port=19291
web.http.public.path=/public
web.http.control.port=19192
web.http.control.path=/control
edc.dataplane.api.public.baseurl=http://localhost:19291/public

用いるポートを変えているのと、プロバイダ側にはデータプレーンのプロパティがあることがわかる。

プロバイダ実行:

1
java -Dedc.keystore=transfer/transfer-00-prerequisites/resources/certs/cert.pfx -Dedc.keystore.password=123456 -Dedc.fs.config=transfer/transfer-00-prerequisites/resources/configuration/provider-configuration.properties -jar transfer/transfer-00-prerequisites/connector/build/libs/connector.jar

キーやキーストアの設定、プロパティファイルのPATHを渡すのみ。

コンシューマ起動:

1
java -Dedc.keystore=transfer/transfer-00-prerequisites/resources/certs/cert.pfx -Dedc.keystore.password=123456 -Dedc.fs.config=transfer/transfer-00-prerequisites/resources/configuration/consumer-configuration.properties -jar transfer/transfer-00-prerequisites/connector/build/libs/connector.jar

プロバイダと同等。

Transfer/Sample1(コントラクト・ネゴシエーション)

EDC/Samples/Transfer/Transfer1/README の通り以下のステップで進める。

  • プロバイダのアセット(共有される対象のデータ)を作成
  • プロバイダでアクセスポリシーを作成
  • プロバイダでコントラクト定義を作成
  • その後、コンシューマからコントラクト・ネゴシエーション

まずプロバイダでアセットを作成。

1
2
3
curl -d @transfer/transfer-01-negotiation/resources/create-asset.json \
-H 'content-type: application/json' http://localhost:19193/management/v3/assets \
-s | jq

curlコマンドの-dataオプションに @ でファイルを渡しをしている。中身は以下の通り。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/"
},
"@id": "assetId",
"properties": {
"name": "product description",
"contenttype": "application/json"
},
"dataAddress": {
"type": "HttpData",
"name": "Test asset",
"baseUrl": "https://jsonplaceholder.typicode.com/users",
"proxyPath": "true"
}
}

ポリシ定義。

1
2
3
curl -d @transfer/transfer-01-negotiation/resources/create-policy.json \
-H 'content-type: application/json' http://localhost:19193/management/v3/policydefinitions \
-s | jq

渡しているファイルの中身は以下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
},
"@id": "aPolicy",
"policy": {
"@context": "http://www.w3.org/ns/odrl.jsonld",
"@type": "Set",
"permission": [],
"prohibition": [],
"obligation": []
}
}

ポリシの中身は空に見える。 ちなみにレスポンスは以下のような感じ。

1
2
3
4
5
6
7
8
9
10
{
"@type": "IdResponse",
"@id": "aPolicy",
"createdAt": 1724662296875,
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

コントラクト定義を作成。

1
2
3
curl -d @transfer/transfer-01-negotiation/resources/create-contract-definition.json \
-H 'content-type: application/json' http://localhost:19193/management/v3/contractdefinitions \
-s | jq

渡しているファイルの中身は以下。

1
2
3
4
5
6
7
8
9
{
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/"
},
"@id": "1",
"accessPolicyId": "aPolicy",
"contractPolicyId": "aPolicy",
"assetsSelector": []
}

アクセスポリシやコントラクトポリシのIDを渡している。

コンシューマからカタログ情報を取得する。

1
2
3
curl -X POST "http://localhost:29193/management/v3/catalog/request" \
-H 'Content-Type: application/json' \
-d @transfer/transfer-01-negotiation/resources/fetch-catalog.json -s | jq

得られるカタログ。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
{
"@id": "4ce1bc3d-ce1e-4308-b03e-b6596958a69d",
"@type": "dcat:Catalog",
"dcat:dataset": {
"@id": "assetId",
"@type": "dcat:Dataset",
"odrl:hasPolicy": {
"@id": "MQ==:YXNzZXRJZA==:Y2ZmODc1NmYtYTRjMC00NzMxLWJlNTItM2M2ZTVlMGI2YzA3",
"@type": "odrl:Offer",
"odrl:permission": [],
"odrl:prohibition": [],
"odrl:obligation": []
},
"dcat:distribution": [
{
"@type": "dcat:Distribution",
"dct:format": {
"@id": "HttpData-PULL"
},
"dcat:accessService": {
"@id": "bb73ab52-bd72-448d-9629-8a21624b0577",
"@type": "dcat:DataService",
"dcat:endpointDescription": "dspace:connector",
"dcat:endpointUrl": "http://localhost:19194/protocol",
"dct:terms": "dspace:connector",
"dct:endpointUrl": "http://localhost:19194/protocol"
}
},
{
"@type": "dcat:Distribution",
"dct:format": {
"@id": "HttpData-PUSH"
},
"dcat:accessService": {
"@id": "bb73ab52-bd72-448d-9629-8a21624b0577",
"@type": "dcat:DataService",
"dcat:endpointDescription": "dspace:connector",
"dcat:endpointUrl": "http://localhost:19194/protocol",
"dct:terms": "dspace:connector",
"dct:endpointUrl": "http://localhost:19194/protocol"
}
}
],
"name": "product description",
"id": "assetId",
"contenttype": "application/json"
},
"dcat:distribution": [],
"dcat:service": {
"@id": "bb73ab52-bd72-448d-9629-8a21624b0577",
"@type": "dcat:DataService",
"dcat:endpointDescription": "dspace:connector",
"dcat:endpointUrl": "http://localhost:19194/protocol",
"dct:terms": "dspace:connector",
"dct:endpointUrl": "http://localhost:19194/protocol"
},
"dspace:participantId": "provider",
"participantId": "provider",
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"dcat": "http://www.w3.org/ns/dcat#",
"dct": "http://purl.org/dc/terms/",
"odrl": "http://www.w3.org/ns/odrl/2/",
"dspace": "https://w3id.org/dspace/v0.8/"
}
}

上記の通り、DCATで定義されている。ODRLで定義されたポリシのIDが示される、など。 他にもHttpDataでデータ共有されることや、サービス情報が載っている。

ここからコンシューマからプロバイダに向けてコントラクト・ネゴシエーションを行う。大まかな流れは以下。

  • コンシューマがコントラクトオファを送る。
  • プロバイダが自身のオファと照らし合わせ、届いたオファを検証する。
  • プロバイダがアグリメントか、リジェクションを送る。
  • 検証が成功していれば、アグリメントを保存する。

negotiate-contract.json のうち、NaN となっている箇所を、先程のカタログ情報から拾って埋める。 具体的には、以下のIDで埋める。

1
2
"odrl:hasPolicy": {
"@id": "MQ==:YXNzZXRJZA==:Y2ZmODc1NmYtYTRjMC00NzMxLWJlNTItM2M2ZTVlMGI2YzA3",

コントラクト・ネゴシエーション実行。

1
2
3
curl -d @transfer/transfer-01-negotiation/resources/negotiate-contract.json \
-X POST -H 'content-type: application/json' http://localhost:29193/management/v3/contractnegotiations \
-s | jq

以下のようなレスポンスが得られる。コントラクト・ネゴシエーション中のIDである。

1
2
3
4
5
6
7
8
9
10
{
"@type": "IdResponse",
"@id": "4515e87b-5bd0-4c19-aa92-1968b65005e5",
"createdAt": 1724664162792,
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

コントラクト・ネゴシエーションはプロバイダとコンシューマのそれぞれで非同期的に進む。 最終的に両者がconfirmedかdeclinedになったら終了。

コンシューマでの状態確認。先程得られたIDを使う。

1
2
3
curl -X GET "http://localhost:29193/management/v3/contractnegotiations/4515e87b-5bd0-4c19-aa92-1968b65005e5" \
--header 'Content-Type: application/json' \
-s | jq

以下のように、Finalized担ったことがわかる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"@type": "ContractNegotiation",
"@id": "4515e87b-5bd0-4c19-aa92-1968b65005e5",
"type": "CONSUMER",
"protocol": "dataspace-protocol-http",
"state": "FINALIZED",
"counterPartyId": "provider",
"counterPartyAddress": "http://localhost:19194/protocol",
"callbackAddresses": [],
"createdAt": 1724664162792,
"contractAgreementId": "74ed3714-3380-4888-93f9-71b948d09553",
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

Transfer/Sample2(コンシューマPull)

EDC/Samples/Transfer/Transfer2/README に記載の通り、以下のステップで進める。

  • コンシューマからファイル転送(の手続き)を始める。
  • プロバイダが EndpointDataReference をコンシューマに送る。
  • コンシューマがエンドポイントを使ってデータをフェッチする。

転送開始。 前の章で得られた、コントラクト・アグリメントのIDを transfer/transfer-02-consumer-pull/resources/start-transfer.json に埋め込む。

1
2
3
4
curl -X POST "http://localhost:29193/management/v3/transferprocesses" \
-H "Content-Type: application/json" \
-d @transfer/transfer-02-consumer-pull/resources/start-transfer.json \
-s | jq

結果は以下の通り。 TransferProcess が生成された。

1
2
3
4
5
6
7
8
9
10
{
"@type": "IdResponse",
"@id": "0778ef5a-0999-42fb-ad16-6572ca1c04d6",
"createdAt": 1724665640700,
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

状態を確認する。先程得られたTransferProcessのIDを利用。

1
2
curl http://localhost:29193/management/v3/transferprocesses/0778ef5a-0999-42fb-ad16-6572ca1c04d6 \
-s | jq

結果の例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"@id": "0778ef5a-0999-42fb-ad16-6572ca1c04d6",
"@type": "TransferProcess",
"state": "STARTED",
"stateTimestamp": 1724665641835,
"type": "CONSUMER",
"callbackAddresses": [],
"correlationId": "f0837ef5-999c-41dc-89b9-74d9337ecb5f",
"assetId": "assetId",
"contractId": "74ed3714-3380-4888-93f9-71b948d09553",
"transferType": "HttpData-PULL",
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

statestarted 担っていることがわかる。

DTRがコンシューマに送られているはず、さらにキャッシュされているはずなので、それを取得する。

1
2
curl http://localhost:29193/management/v3/edrs/0778ef5a-0999-42fb-ad16-6572ca1c04d6/dataaddress \
-s | jq

結果の例。

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"@type": "DataAddress",
"type": "https://w3id.org/idsa/v4.1/HTTP",
"endpoint": "http://localhost:19291/public",
"authType": "bearer",
"endpointType": "https://w3id.org/idsa/v4.1/HTTP",
"authorization": "eyJraWQiOiJwdWJsaWMta2V5IiwiYWxnIjoiUlMyNTYifQ.eyJpc3MiOiJwcm92aWRlciIsImF1ZCI6ImNvbnN1bWVyIiwic3ViIjoicHJvdmlkZXIiLCJpYXQiOjE3MjQ2NjU2NDE3NzEsImp0aSI6ImJjNmQ1YzIzLTExMWUtNGZkZi1hMjMwLTJkZjk3NmE3NTFlOSJ9.dYBBx2z2XCjg1TgbEPiDgb2KYdUlNMd7702P4t0NgwYKhDemAKF64qGpLsU63huQ2WMb9Co4sC1euopyY-48F3UvfjK0ulKODlrfVGO-7xvSNs4qX2HVC9JyLGGbdks0wQOkv9oA7AcKxD11yTjcfbtLc-DUoOF4w4RTpI2MATSa-ETvKo_22FxMrIHgnsLOCHtjVLazJchFm4bAhVa7mRfHykkTpIIEaPuqOJpdtKcX1YUAZloFaI1ZinfXNtHjvollVC9Mjb4H12Gh1B7tLxgZ0AbP0izFeOHnQleQ09ZThajwF-xpnPQ5P5fsNiFqthW2A7Gu5-PLGT6tp2lYIg",
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

上記の通り、エンドポイントURLと認証キーがわかる。 データを取得する。

1
2
3
curl --location --request GET 'http://localhost:19291/public/' \
--header 'Authorization: eyJraWQiOiJwdWJsaWMta2V5IiwiYWxnIjoiUlMyNTYifQ.eyJpc3MiOiJwcm92aWRlciIsImF1ZCI6ImNvbnN1bWVyIiwic3ViIjoicHJvdmlkZXIiLCJpYXQiOjE3MjQ2NjU2NDE3NzEsImp0aSI6ImJjNmQ1YzIzLTExMWUtNGZkZi1hMjMwLTJkZjk3NmE3NTFlOSJ9.dYBBx2z2XCjg1TgbEPiDgb2KYdUlNMd7702P4t0NgwYKhDemAKF64qGpLsU63huQ2WMb9Co4sC1euopyY-48F3UvfjK0ulKODlrfVGO-7xvSNs4qX2HVC9JyLGGbdks0wQOkv9oA7AcKxD11yTjcfbtLc-DUoOF4w4RTpI2MATSa-ETvKo_22FxMrIHgnsLOCHtjVLazJchFm4bAhVa7mRfHykkTpIIEaPuqOJpdtKcX1YUAZloFaI1ZinfXNtHjvollVC9Mjb4H12Gh1B7tLxgZ0AbP0izFeOHnQleQ09ZThajwF-xpnPQ5P5fsNiFqthW2A7Gu5-PLGT6tp2lYIg' \
-s | jq

以下のような結果が得られる。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[                                                               
{
"id": 1,
"name": "Leanne Graham",
"username": "Bret",
"email": "Sincere@april.biz",
"address": {
"street": "Kulas Light",
"suite": "Apt. 556",
"city": "Gwenborough",
"zipcode": "92998-3874",
"geo": {
"lat": "-37.3159",
"lng": "81.1496"
}
},
(snip)

https://jsonplaceholder.typicode.com/users にあるデータをコネクタのコントラクトを通じて得られた。

Pullするときに、以下のようにするとユーザIDを指定できる。

http://localhost:19291/public/1

Transfer/Sample3(プロバイダPush)

EDC/Samples/Transfer/Transfer3/README に記載の通り、今度はプロバイダからPushする例である。 大まかな流れは以下の通り。

  • ファイル転送
    • コンシューマからファイル転送(の手続き)を始める。
    • プロバイダのコントロールプレーンが実際のデータのアドレスを取得し、DataRequest をもとに DataFlowRequest を作成する。
  • プロバイダデータプレーンが実際のデータソースからデータを取得。
  • プロバイダデータプレーンがコンシューマサービスにデータを送る。

まずコンシューマ側のバックエンドのロガーを起動する。これがPush先になると見られる。今回はDockerで起動する。

1
2
docker build -t http-request-logger util/http-request-logger
docker run -p 4000:4000 http-request-logger

続いて以下のファイルを編集した上でTransferProcessを開始する。 transfer/transfer-03-provider-push/resources/start-transfer.json なお、変更するのはコントラクト・アグリメントIDである。上記のケースでは、 74ed3714-3380-4888-93f9-71b948d09553 である。

1
2
3
4
curl -X POST "http://localhost:29193/management/v3/transferprocesses" \
-H "Content-Type: application/json" \
-d @transfer/transfer-03-provider-push/resources/start-transfer.json \
-s | jq

なお、 start-transfer.json で指定している dataDestination にHttpProxy以外のものを指定しているのがポイントのようだ。

transfer/transfer-03-provider-push/resources/start-transfer.json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/"
},
"@type": "TransferRequestDto",
"connectorId": "provider",
"counterPartyAddress": "http://localhost:19194/protocol",
"contractId": "74ed3714-3380-4888-93f9-71b948d09553",
"assetId": "assetId",
"protocol": "dataspace-protocol-http",
"transferType": "HttpData-PUSH",
"dataDestination": {
"type": "HttpData",
"baseUrl": "http://localhost:4000/api/consumer/store"
}
}

結果の例。

1
2
3
4
5
6
7
8
9
10
{
"@type": "IdResponse",
"@id": "105477c9-dbff-4577-adb5-2e860f9be585",
"createdAt": 1724678897615,
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

結果確認。

1
2
curl http://localhost:29193/management/v3/transferprocesses/105477c9-dbff-4577-adb5-2e860f9be585 \
-s | jq

結果の例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  "@id": "105477c9-dbff-4577-adb5-2e860f9be585",
"@type": "TransferProcess",
"state": "COMPLETED",
"stateTimestamp": 1724678900583,
"type": "CONSUMER",
"callbackAddresses": [],
"correlationId": "657b63b0-920d-412c-9c09-edb48057e717",
"assetId": "assetId",
"contractId": "74ed3714-3380-4888-93f9-71b948d09553",
"transferType": "HttpData-PUSH",
"dataDestination": {
"@type": "DataAddress",
"type": "HttpData",
"baseUrl": "http://localhost:4000/api/consumer/store"
},
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

Completed である。

ロガーのコンソールに以下の表示。

1
2
3
4
5
6
7
8
9
10
11
12
13
Incoming request                                                                         
Method: POST
Path: /api/consumer/store
Body:
[
{
"id": 1,
"name": "Leanne Graham",
"username": "Bret",
"email": "Sincere@april.biz",
"address": {
"street": "Kulas Light",
(snip)

これ、Push先のエンドポイントはどうやって知るのか。カタログ?

Transfer/Sample4

EDC/Samples/Transfer/Transfer4/README の通り、コンシューマで転送完了に反応する機能を追加する。

モジュール構成は以下。

  • consumer-with-listener: コンシューマ(コネクタ)のイベント・コンシューマ拡張
  • listener: イベントを処理する TransferProcessListener の実装

ドキュメントでは、 TransferProcessListener の実装として org.eclipse.edc.sample.extension.listener.TransferProcessStartedListener を紹介。

エントリポイントは org.eclipse.edc.sample.extension.listener.TransferProcessStartedListenerExtension である。

org/eclipse/edc/sample/extension/listener/TransferProcessStartedListenerExtension.java:21

1
2
3
4
5
6
7
8
9
10
public class TransferProcessStartedListenerExtension implements ServiceExtension {

@Override
public void initialize(ServiceExtensionContext context) {
var transferProcessObservable = context.getService(TransferProcessObservable.class);
var monitor = context.getMonitor();
transferProcessObservable.registerListener(new TransferProcessStartedListener(monitor));
}

}

上記の通り、 org.eclipse.edc.sample.extension.listener.TransferProcessStartedListener をリスナとして登録している。

ポイントは以下。

org/eclipse/edc/sample/extension/listener/TransferProcessStartedListener.java:34

1
2
3
4
5
@Override
public void preStarted(final TransferProcess process) {
monitor.debug("TransferProcessStartedListener received STARTED event");
// do something meaningful before transfer start
}

START イベントを受け取ったときに、ログに出力する。

このサンプルのビルドと実行をするのだが、その前に前の章まで使用していたコンシューマを止める。

その上で、以下を実行して、Sample4のコンシューマを起動する。

1
2
./gradlew transfer:transfer-04-event-consumer:consumer-with-listener:build
java -Dedc.keystore=transfer/transfer-00-prerequisites/resources/certs/cert.pfx -Dedc.keystore.password=123456 -Dedc.fs.config=transfer/transfer-00-prerequisites/resources/configuration/consumer-configuration.properties -jar transfer/transfer-04-event-consumer/consumer-with-listener/build/libs/connector.jar

続いて、新しいコントラクト・ネゴシエーション。

1
2
3
curl -d @transfer/transfer-01-negotiation/resources/negotiate-contract.json \
-X POST -H 'content-type: application/json' http://localhost:29193/management/v3/contractnegotiations \
-s | jq

結果例。

1
2
3
4
5
6
7
8
9
10
{
"@type": "IdResponse",
"@id": "fe8c4f16-283c-4d74-a440-715e8371d228",
"createdAt": 1724766448685,
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

先の例と同様にコントラクト・アグリメントIDを取得する。 直前の結果から、コントラクト・ネゴシエーションIDを抜き出して下記のコマンドに入れるのを忘れずに。

1
2
3
curl -X GET "http://localhost:29193/management/v3/contractnegotiations/fe8c4f16-283c-4d74-a440-715e8371d228" \
--header 'Content-Type: application/json' \
-s | jq

結果例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"@type": "ContractNegotiation",
"@id": "fe8c4f16-283c-4d74-a440-715e8371d228",
"type": "CONSUMER",
"protocol": "dataspace-protocol-http",
"state": "FINALIZED",
"counterPartyId": "provider",
"counterPartyAddress": "http://localhost:19194/protocol",
"callbackAddresses": [],
"createdAt": 1724766448685,
"contractAgreementId": "8e96f6c5-cb0e-4f06-beb0-ac553ddb160e",
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

コントラクト・アグリメントIDは 8e96f6c5-cb0e-4f06-beb0-ac553ddb160e である。

つづいて、 transfer/transfer-02-consumer-pull/resources/start-transfer.json の今コントラクト・アグリメントIDを書き換えつつ、ファイル転送を開始する。 該当箇所は以下。

1
"contractId": "8e96f6c5-cb0e-4f06-beb0-ac553ddb160e",

実行コマンドは以下。

1
2
3
4
curl -X POST "http://localhost:29193/management/v3/transferprocesses" \
-H "Content-Type: application/json" \
-d @transfer/transfer-02-consumer-pull/resources/start-transfer.json \
-s | jq

実行結果例。

1
2
3
4
5
6
7
8
9
10
{
"@type": "IdResponse",
"@id": "b4dff4b1-dfb2-470e-a765-7bd549a1e2b6",
"createdAt": 1724766768403,
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
}
}

先ほど起動したコンシューマのコンソールログを見ると

1
DEBUG 2024-08-27T22:52:49.930105146 TransferProcessStartedListener received STARTED event

のようなメッセージが見つかるはず。

Transfer/Sample5

(このサンプルはAzureとAWSの環境が必要なのと手違いの被害が大きいことからメモだけとする)

EDC/Samples/Transfer/Transfer5/README に記載の通り、これまで実行したサンプルに対し、通常使うであろう機能を足していく。 この例では、具体的には「Azureのストレージから読んで、AWSのストレージに書く」というのを実現する。

なお、環境構成には、Terraformが使われている。 transfer/transfer-05-file-transfer-cloud/terraform にもろもろ格納されている。 中身を見ると、割といろいろとセットアップしているのがわかる。念の為、潰しても良いクリーンな環境で試すのが良い。 一応6章にて環境をクリーンナップする手順が記載されている。

READMEにある通り、クライアントIDなどはVault内に保持するようになっている。 依存関係にもVaultがある。

transfer/transfer-05-file-transfer-cloud/cloud-transfer-consumer/build.gradle.kts:31

1
2
3
(snip)
implementation(libs.edc.vault.azure)
(snip)

今サンプルのメインの一つは、 org.eclipse.edc.sample.extension.transfer.CloudTransferExtension である。

org.eclipse.edc.sample.extension.transfer.CloudTransferExtension#registerDataEntries メソッドがアセットを定義する。

1
2
3
4
5
6
7
8
9
10
public void registerDataEntries() {
var dataAddress = DataAddress.Builder.newInstance()
.type("AzureStorage")
.property("account", "<storage-account-name>")
.property("container", "src-container")
.property("blobname", "test-document.txt")
.keyName("<storage-account-name>-key1")
.build();
var asset = Asset.Builder.newInstance().id("1").dataAddress(dataAddress).build();
assetIndex.create(asset);

なお、org.eclipse.edc.connector.controlplane.policy.spi.store.PolicyDefinitionStoreorg.eclipse.edc.connector.controlplane.contract.spi.offer.store.ContractDefinitionStore があり、それぞれポリシとコントラクトを保持する。今回のサンプルではインメモリで保持する実装を利用。

このあとはいつもどおり、コントラクト・ネゴシエーションしてコントラクト・アグリメントIDを取得し、実際のデータ転送。 ほぼこれまで通りだが、dataDestinationでAWS S3を指定しているところがポイント。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
curl --location --request POST 'http://localhost:9192/management/v3/transferprocesses' \
--header 'X-API-Key: password' \
--header 'Content-Type: application/json' \
--data-raw '
{
"counterPartyAddress": "http://localhost:8282/protocol",
"protocol": "dataspace-protocol-http",
"connectorId": "consumer",
"assetId": "1",
"contractId": "<ContractAgreementId>",
"dataDestination": {
"type": "AmazonS3",
"region": "us-east-1",
"bucketName": "<Unique bucket name>"
},
"transferType": {
"contentType": "application/octet-stream",
"isFinite": true
}
}'

参考

レポジトリ内

EDCレポジトリ

外部

共有

Dataspace Protocol of EDC

メモ

注意:このメモはかなり初期のものなので、今となっては怪しい内容が含まれてる・・・。

EDCは現在IDSが提唱する、Dataspace Protocolにしたがって、コネクタ間でやりとりする。

DSP Data Planeの実装を確認する

data-protocols/dspdsp)以下に、Dataspace Protocolに対応したモジュールが含まれている。

例えば、org.eclipse.edc.protocol.dsp.dispatcher.PostDspHttpRequestFactoryorg.eclipse.edc.protocol.dsp.dispatcher.GetDspHttpRequestFactoryなどのファクトリが定義されている。 これは、前述のPOST、GETオペレーションに対応するリクエストを生成するためのファクトリである。

以下は、カタログのリクエストを送るための実装である。

org/eclipse/edc/protocol/dsp/catalog/dispatcher/DspCatalogHttpDispatcherExtension.java:54

1
2
3
4
5
6
7
8
9
10
11
12
public void initialize(ServiceExtensionContext context) {
messageDispatcher.registerMessage(
CatalogRequestMessage.class,
new PostDspHttpRequestFactory<>(remoteMessageSerializer, m -> BASE_PATH + CATALOG_REQUEST),
new CatalogRequestHttpRawDelegate()
);
messageDispatcher.registerMessage(
DatasetRequestMessage.class,
new GetDspHttpRequestFactory<>(m -> BASE_PATH + DATASET_REQUEST + "/" + m.getDatasetId()),
new DatasetRequestHttpRawDelegate()
);
}

他にも、org.eclipse.edc.protocol.dsp.transferprocess.dispatcher.DspTransferProcessDispatcherExtensionなどが挙げられる。 これは以下のように、org.eclipse.edc.connector.transfer.spi.types.protocol.TransferRequestMessageが含まれており、ConsumerがProviderにデータ転送プロセスをリクエストする際のメッセージのディスパッチャが登録されていることがわかる。

org/eclipse/edc/protocol/dsp/transferprocess/dispatcher/DspTransferProcessDispatcherExtension.java:60

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void initialize(ServiceExtensionContext context) {
messageDispatcher.registerMessage(
TransferRequestMessage.class,
new PostDspHttpRequestFactory<>(remoteMessageSerializer, m -> BASE_PATH + TRANSFER_INITIAL_REQUEST),
new TransferRequestDelegate(remoteMessageSerializer)
);
messageDispatcher.registerMessage(
TransferCompletionMessage.class,
new PostDspHttpRequestFactory<>(remoteMessageSerializer, m -> BASE_PATH + m.getProcessId() + TRANSFER_COMPLETION),
new TransferCompletionDelegate(remoteMessageSerializer)
);
messageDispatcher.registerMessage(
TransferStartMessage.class,
new PostDspHttpRequestFactory<>(remoteMessageSerializer, m -> BASE_PATH + m.getProcessId() + TRANSFER_START),
new TransferStartDelegate(remoteMessageSerializer)
);
messageDispatcher.registerMessage(
TransferTerminationMessage.class,
new PostDspHttpRequestFactory<>(remoteMessageSerializer, m -> BASE_PATH + m.getProcessId() + TRANSFER_TERMINATION),
new TransferTerminationDelegate(remoteMessageSerializer)
);
}

◆参考情報はじめ

このファクトリは、ディスパッチャの org.eclipse.edc.protocol.dsp.dispatcher.DspHttpRemoteMessageDispatcherImpl#dispatch メソッドから、間接的に呼び出されて利用される。 このメソッドはorg.eclipse.edc.spi.message.RemoteMessageDispatcher#dispatchメソッドを実装したものである。ディスパッチャとして、リモートへ送信するメッセージ生成をディスパッチするための。メソッドである。 さらに、これは org.eclipse.edc.connector.core.base.RemoteMessageDispatcherRegistryImpl 内で使われている。ディスパッチャのレジストリ内で、ディスパッチ処理が起動、管理されるようだ。 なお、これはorg.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry#dispatch を実装したものである。このメソッドは、色々なところから呼び出される。

例えば、TransferCoreExtensionクラスではサービス起動時に、転送プロセスを管理するorg.eclipse.edc.connector.transfer.process.TransferProcessManagerImplを起動する。

org/eclipse/edc/connector/transfer/TransferCoreExtension.java:205

1
2
3
4
@Override
public void start() {
processManager.start();
}

これにより、以下のようにステートマシンがビルド、起動され、各プロセッサが登録される。

org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java:143

1
2
3
4
5
6
7
8
9
10
11
12
stateMachineManager = StateMachineManager.Builder.newInstance("transfer-process", monitor, executorInstrumentation, waitStrategy)
.processor(processTransfersInState(INITIAL, this::processInitial))
.processor(processTransfersInState(PROVISIONING, this::processProvisioning))
.processor(processTransfersInState(PROVISIONED, this::processProvisioned))
.processor(processTransfersInState(REQUESTING, this::processRequesting))
.processor(processTransfersInState(STARTING, this::processStarting))
.processor(processTransfersInState(STARTED, this::processStarted))
.processor(processTransfersInState(COMPLETING, this::processCompleting))
.processor(processTransfersInState(TERMINATING, this::processTerminating))
.processor(processTransfersInState(DEPROVISIONING, this::processDeprovisioning))
.build();
stateMachineManager.start();

上記のプロセッサとして登録されているorg.eclipse.edc.connector.transfer.process.TransferProcessManagerImpl#processStartingの中では org.eclipse.edc.connector.transfer.process.TransferProcessManagerImpl#sendTransferStartMessage が呼び出されている。

org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java:376

1
2
3
4
5
6
return entityRetryProcessFactory.doSyncProcess(process, () -> dataFlowManager.initiate(process.getDataRequest(), contentAddress, policy))
.onSuccess((p, dataFlowResponse) -> sendTransferStartMessage(p, dataFlowResponse, policy))
.onFatalError((p, failure) -> transitionToTerminating(p, failure.getFailureDetail()))
.onFailure((t, failure) -> transitionToStarting(t))
.onRetryExhausted((p, failure) -> transitionToTerminating(p, failure.getFailureDetail()))
.execute(description);

org.eclipse.edc.connector.transfer.process.TransferProcessManagerImpl#sendTransferStartMessage メソッド内では、 org.eclipse.edc.connector.transfer.spi.types.protocol.TransferStartMessageのメッセージがビルドされ、 ディスパッチャにメッセージとして渡される。

org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java:386

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var message = TransferStartMessage.Builder.newInstance()
.processId(process.getCorrelationId())
.protocol(process.getProtocol())
.dataAddress(dataFlowResponse.getDataAddress())
.counterPartyAddress(process.getConnectorAddress())
.policy(policy)
.build();

var description = format("Send %s to %s", message.getClass().getSimpleName(), process.getConnectorAddress());

entityRetryProcessFactory.doAsyncStatusResultProcess(process, () -> dispatcherRegistry.dispatch(Object.class, message))
.entityRetrieve(id -> transferProcessStore.findById(id))
.onSuccess((t, content) -> transitionToStarted(t))
.onFailure((t, throwable) -> transitionToStarting(t))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable))
.execute(description);

◆参考情報おわり

ということで、org.eclipse.edc.protocol.dsp.spi.dispatcher.DspHttpRemoteMessageDispatcherというディスパッチャは、Dataspace Protocolに基づくリモートメッセージを生成する際に用いられるディスパッチャである。

おまけ)古い(?)Data Planeの実装を確認する(HTTPの例)

Dataspace Protocol以前の実装か?

extensions/data-plane 以下にData Planeの実装が拡張として含まれている。

例えば、 extensions/data-plane/data-plane-http には、HTTPを用いてデータ共有するための拡張の実装が含まれている。 当該拡張のREADMEの通り、 (transfer APIの)DataFlowRequestHttpDataだった場合に、

  • HttpDataSourceFactory
  • HttpDataSinkFactory
  • HttpDataSource
  • HttpDataSink

の実装が用いられる。パラメータもREADMEに(data-plane-httpのデザイン指針)記載されている。 基本的には、バックエンドがHTTPなのでそれにアクセスするためのパラメータが定義されている。

当該ファクトリは、 org.eclipse.edc.connector.dataplane.http.DataPlaneHttpExtension#initialize 内で用いられている。

org/eclipse/edc/connector/dataplane/http/DataPlaneHttpExtension.java:75

1
2
3
4
5
6
7
var httpRequestFactory = new HttpRequestFactory();

var sourceFactory = new HttpDataSourceFactory(httpClient, paramsProvider, monitor, httpRequestFactory);
pipelineService.registerFactory(sourceFactory);

var sinkFactory = new HttpDataSinkFactory(httpClient, executorContainer.getExecutorService(), sinkPartitionSize, monitor, paramsProvider, httpRequestFactory);
pipelineService.registerFactory(sinkFactory);

ここでは、試しにData Source側を確認してみる。

org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSourceFactory.java:63

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public DataSource createSource(DataFlowRequest request) {
var dataAddress = HttpDataAddress.Builder.newInstance()
.copyFrom(request.getSourceDataAddress())
.build();
return HttpDataSource.Builder.newInstance()
.httpClient(httpClient)
.monitor(monitor)
.requestId(request.getId())
.name(dataAddress.getName())
.params(requestParamsProvider.provideSourceParams(request))
.requestFactory(requestFactory)
.build();
}

上記の通り、まずデータのアドレスを格納するインスタンスが生成され、 つづいて、HTTPのデータソースがビルドされる。

HTTPのData Sourceの実体は org.eclipse.edc.connector.dataplane.http.pipeline.HttpDataSource である。 このクラスはSPIの org.eclipse.edc.connector.dataplane.spi.pipeline.DataSourceインタフェースを実装したものである。

org.eclipse.edc.connector.dataplane.http.pipeline.HttpDataSource#openPartStream がオーバライドされて実装されている。 詳しくは、openPartStream参照。

参考

ドキュメント

ソースコード

共有