Dataspace Protocol of EDC

メモ

EDCは現在IDSが提唱する、Dataspace Protocolにしたがって、コネクタ間でやりとりする。

DSP Data Planeの実装を確認する

data-protocols/dspdsp)以下に、Dataspace Protocolに対応したモジュールが含まれている。

例えば、org.eclipse.edc.protocol.dsp.dispatcher.PostDspHttpRequestFactoryorg.eclipse.edc.protocol.dsp.dispatcher.GetDspHttpRequestFactoryなどのファクトリが定義されている。 これは、前述のPOST、GETオペレーションに対応するリクエストを生成するためのファクトリである。

以下は、カタログのリクエストを送るための実装である。

org/eclipse/edc/protocol/dsp/catalog/dispatcher/DspCatalogHttpDispatcherExtension.java:54

1
2
3
4
5
6
7
8
9
10
11
12
public void initialize(ServiceExtensionContext context) {
messageDispatcher.registerMessage(
CatalogRequestMessage.class,
new PostDspHttpRequestFactory<>(remoteMessageSerializer, m -> BASE_PATH + CATALOG_REQUEST),
new CatalogRequestHttpRawDelegate()
);
messageDispatcher.registerMessage(
DatasetRequestMessage.class,
new GetDspHttpRequestFactory<>(m -> BASE_PATH + DATASET_REQUEST + "/" + m.getDatasetId()),
new DatasetRequestHttpRawDelegate()
);
}

他にも、org.eclipse.edc.protocol.dsp.transferprocess.dispatcher.DspTransferProcessDispatcherExtensionなどが挙げられる。 これは以下のように、org.eclipse.edc.connector.transfer.spi.types.protocol.TransferRequestMessageが含まれており、ConsumerがProviderにデータ転送プロセスをリクエストする際のメッセージのディスパッチャが登録されていることがわかる。

org/eclipse/edc/protocol/dsp/transferprocess/dispatcher/DspTransferProcessDispatcherExtension.java:60

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void initialize(ServiceExtensionContext context) {
messageDispatcher.registerMessage(
TransferRequestMessage.class,
new PostDspHttpRequestFactory<>(remoteMessageSerializer, m -> BASE_PATH + TRANSFER_INITIAL_REQUEST),
new TransferRequestDelegate(remoteMessageSerializer)
);
messageDispatcher.registerMessage(
TransferCompletionMessage.class,
new PostDspHttpRequestFactory<>(remoteMessageSerializer, m -> BASE_PATH + m.getProcessId() + TRANSFER_COMPLETION),
new TransferCompletionDelegate(remoteMessageSerializer)
);
messageDispatcher.registerMessage(
TransferStartMessage.class,
new PostDspHttpRequestFactory<>(remoteMessageSerializer, m -> BASE_PATH + m.getProcessId() + TRANSFER_START),
new TransferStartDelegate(remoteMessageSerializer)
);
messageDispatcher.registerMessage(
TransferTerminationMessage.class,
new PostDspHttpRequestFactory<>(remoteMessageSerializer, m -> BASE_PATH + m.getProcessId() + TRANSFER_TERMINATION),
new TransferTerminationDelegate(remoteMessageSerializer)
);
}

◆参考情報はじめ

このファクトリは、ディスパッチャの org.eclipse.edc.protocol.dsp.dispatcher.DspHttpRemoteMessageDispatcherImpl#dispatch メソッドから、間接的に呼び出されて利用される。 このメソッドはorg.eclipse.edc.spi.message.RemoteMessageDispatcher#dispatchメソッドを実装したものである。ディスパッチャとして、リモートへ送信するメッセージ生成をディスパッチするための。メソッドである。 さらに、これは org.eclipse.edc.connector.core.base.RemoteMessageDispatcherRegistryImpl 内で使われている。ディスパッチャのレジストリ内で、ディスパッチ処理が起動、管理されるようだ。 なお、これはorg.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry#dispatch を実装したものである。このメソッドは、色々なところから呼び出される。

例えば、TransferCoreExtensionクラスではサービス起動時に、転送プロセスを管理するorg.eclipse.edc.connector.transfer.process.TransferProcessManagerImplを起動する。

org/eclipse/edc/connector/transfer/TransferCoreExtension.java:205

1
2
3
4
@Override
public void start() {
processManager.start();
}

これにより、以下のようにステートマシンがビルド、起動され、各プロセッサが登録される。

org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java:143

1
2
3
4
5
6
7
8
9
10
11
12
stateMachineManager = StateMachineManager.Builder.newInstance("transfer-process", monitor, executorInstrumentation, waitStrategy)
.processor(processTransfersInState(INITIAL, this::processInitial))
.processor(processTransfersInState(PROVISIONING, this::processProvisioning))
.processor(processTransfersInState(PROVISIONED, this::processProvisioned))
.processor(processTransfersInState(REQUESTING, this::processRequesting))
.processor(processTransfersInState(STARTING, this::processStarting))
.processor(processTransfersInState(STARTED, this::processStarted))
.processor(processTransfersInState(COMPLETING, this::processCompleting))
.processor(processTransfersInState(TERMINATING, this::processTerminating))
.processor(processTransfersInState(DEPROVISIONING, this::processDeprovisioning))
.build();
stateMachineManager.start();

上記のプロセッサとして登録されているorg.eclipse.edc.connector.transfer.process.TransferProcessManagerImpl#processStartingの中では org.eclipse.edc.connector.transfer.process.TransferProcessManagerImpl#sendTransferStartMessage が呼び出されている。

org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java:376

1
2
3
4
5
6
return entityRetryProcessFactory.doSyncProcess(process, () -> dataFlowManager.initiate(process.getDataRequest(), contentAddress, policy))
.onSuccess((p, dataFlowResponse) -> sendTransferStartMessage(p, dataFlowResponse, policy))
.onFatalError((p, failure) -> transitionToTerminating(p, failure.getFailureDetail()))
.onFailure((t, failure) -> transitionToStarting(t))
.onRetryExhausted((p, failure) -> transitionToTerminating(p, failure.getFailureDetail()))
.execute(description);

org.eclipse.edc.connector.transfer.process.TransferProcessManagerImpl#sendTransferStartMessage メソッド内では、 org.eclipse.edc.connector.transfer.spi.types.protocol.TransferStartMessageのメッセージがビルドされ、 ディスパッチャにメッセージとして渡される。

org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java:386

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var message = TransferStartMessage.Builder.newInstance()
.processId(process.getCorrelationId())
.protocol(process.getProtocol())
.dataAddress(dataFlowResponse.getDataAddress())
.counterPartyAddress(process.getConnectorAddress())
.policy(policy)
.build();

var description = format("Send %s to %s", message.getClass().getSimpleName(), process.getConnectorAddress());

entityRetryProcessFactory.doAsyncStatusResultProcess(process, () -> dispatcherRegistry.dispatch(Object.class, message))
.entityRetrieve(id -> transferProcessStore.findById(id))
.onSuccess((t, content) -> transitionToStarted(t))
.onFailure((t, throwable) -> transitionToStarting(t))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable))
.execute(description);

◆参考情報おわり

ということで、org.eclipse.edc.protocol.dsp.spi.dispatcher.DspHttpRemoteMessageDispatcherというディスパッチャは、Dataspace Protocolに基づくリモートメッセージを生成する際に用いられるディスパッチャである。

おまけ)古い(?)Data Planeの実装を確認する(HTTPの例)

Dataspace Protocol以前の実装か?

extensions/data-plane 以下にData Planeの実装が拡張として含まれている。

例えば、 extensions/data-plane/data-plane-http には、HTTPを用いてデータ共有するための拡張の実装が含まれている。 当該拡張のREADMEの通り、 (transfer APIの)DataFlowRequestHttpDataだった場合に、

  • HttpDataSourceFactory
  • HttpDataSinkFactory
  • HttpDataSource
  • HttpDataSink

の実装が用いられる。パラメータもREADMEに(data-plane-httpのデザイン指針)記載されている。 基本的には、バックエンドがHTTPなのでそれにアクセスするためのパラメータが定義されている。

当該ファクトリは、 org.eclipse.edc.connector.dataplane.http.DataPlaneHttpExtension#initialize 内で用いられている。

org/eclipse/edc/connector/dataplane/http/DataPlaneHttpExtension.java:75

1
2
3
4
5
6
7
var httpRequestFactory = new HttpRequestFactory();

var sourceFactory = new HttpDataSourceFactory(httpClient, paramsProvider, monitor, httpRequestFactory);
pipelineService.registerFactory(sourceFactory);

var sinkFactory = new HttpDataSinkFactory(httpClient, executorContainer.getExecutorService(), sinkPartitionSize, monitor, paramsProvider, httpRequestFactory);
pipelineService.registerFactory(sinkFactory);

ここでは、試しにData Source側を確認してみる。

org/eclipse/edc/connector/dataplane/http/pipeline/HttpDataSourceFactory.java:63

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public DataSource createSource(DataFlowRequest request) {
var dataAddress = HttpDataAddress.Builder.newInstance()
.copyFrom(request.getSourceDataAddress())
.build();
return HttpDataSource.Builder.newInstance()
.httpClient(httpClient)
.monitor(monitor)
.requestId(request.getId())
.name(dataAddress.getName())
.params(requestParamsProvider.provideSourceParams(request))
.requestFactory(requestFactory)
.build();
}

上記の通り、まずデータのアドレスを格納するインスタンスが生成され、 つづいて、HTTPのデータソースがビルドされる。

HTTPのData Sourceの実体は org.eclipse.edc.connector.dataplane.http.pipeline.HttpDataSource である。 このクラスはSPIの org.eclipse.edc.connector.dataplane.spi.pipeline.DataSourceインタフェースを実装したものである。

org.eclipse.edc.connector.dataplane.http.pipeline.HttpDataSource#openPartStream がオーバライドされて実装されている。 詳しくは、openPartStream参照。

参考

ドキュメント

ソースコード

共有