Skip to content

Commit

Permalink
feat(dsp): catalog pagination through continuation token and Link hea…
Browse files Browse the repository at this point in the history
…der (#4103)

feat(dsp): catalog pagination through Link header
  • Loading branch information
ndr-brt authored Apr 15, 2024
1 parent b303638 commit 949a7ed
Show file tree
Hide file tree
Showing 21 changed files with 824 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {

testImplementation(testFixtures(project(":extensions:common:http:jersey-core")))
testImplementation(project(":core:common:junit"))
testImplementation(project(":core:common:lib:transform-lib"))
testImplementation(project(":data-protocols:dsp:dsp-catalog:dsp-catalog-transform"))
testImplementation(libs.restAssured)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
import org.eclipse.edc.connector.controlplane.catalog.spi.DataServiceRegistry;
import org.eclipse.edc.connector.controlplane.services.spi.catalog.CatalogProtocolService;
import org.eclipse.edc.connector.controlplane.services.spi.protocol.ProtocolVersionRegistry;
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.protocol.dsp.catalog.http.api.controller.DspCatalogApiController;
import org.eclipse.edc.protocol.dsp.catalog.http.api.controller.DspCatalogApiController20241;
import org.eclipse.edc.protocol.dsp.catalog.http.api.decorator.Base64continuationTokenSerDes;
import org.eclipse.edc.protocol.dsp.catalog.http.api.decorator.ContinuationTokenManagerImpl;
import org.eclipse.edc.protocol.dsp.catalog.http.api.validation.CatalogRequestMessageValidator;
import org.eclipse.edc.protocol.dsp.http.spi.configuration.DspApiConfiguration;
import org.eclipse.edc.protocol.dsp.http.spi.message.DspRequestHandler;
Expand All @@ -28,6 +31,7 @@
import org.eclipse.edc.spi.query.CriterionOperatorRegistry;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.validator.spi.JsonObjectValidatorRegistry;
import org.eclipse.edc.web.spi.WebService;

Expand Down Expand Up @@ -58,6 +62,10 @@ public class DspCatalogApiExtension implements ServiceExtension {
private CriterionOperatorRegistry criterionOperatorRegistry;
@Inject
private ProtocolVersionRegistry versionRegistry;
@Inject
private TypeTransformerRegistry typeTransformerRegistry;
@Inject
private JsonLd jsonLd;

@Override
public String name() {
Expand All @@ -68,8 +76,10 @@ public String name() {
public void initialize(ServiceExtensionContext context) {
validatorRegistry.register(DSPACE_TYPE_CATALOG_REQUEST_MESSAGE, CatalogRequestMessageValidator.instance(criterionOperatorRegistry));

webService.registerResource(apiConfiguration.getContextAlias(), new DspCatalogApiController(service, dspRequestHandler));
webService.registerResource(apiConfiguration.getContextAlias(), new DspCatalogApiController20241(service, dspRequestHandler));
var continuationTokenSerDes = new Base64continuationTokenSerDes(typeTransformerRegistry.forContext("dsp-api"), jsonLd);
var catalogPaginationResponseDecoratorFactory = new ContinuationTokenManagerImpl(continuationTokenSerDes, context.getMonitor());
webService.registerResource(apiConfiguration.getContextAlias(), new DspCatalogApiController(service, dspRequestHandler, catalogPaginationResponseDecoratorFactory));
webService.registerResource(apiConfiguration.getContextAlias(), new DspCatalogApiController20241(service, dspRequestHandler, catalogPaginationResponseDecoratorFactory));

dataServiceRegistry.register(DataService.Builder.newInstance()
.terms("connector")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,23 @@
package org.eclipse.edc.protocol.dsp.catalog.http.api.controller;

import jakarta.json.JsonObject;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.HeaderParam;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriInfo;
import org.eclipse.edc.connector.controlplane.catalog.spi.Catalog;
import org.eclipse.edc.connector.controlplane.catalog.spi.CatalogRequestMessage;
import org.eclipse.edc.connector.controlplane.catalog.spi.Dataset;
import org.eclipse.edc.connector.controlplane.services.spi.catalog.CatalogProtocolService;
import org.eclipse.edc.protocol.dsp.http.spi.message.ContinuationTokenManager;
import org.eclipse.edc.protocol.dsp.http.spi.message.DspRequestHandler;
import org.eclipse.edc.protocol.dsp.http.spi.message.GetDspRequest;
import org.eclipse.edc.protocol.dsp.http.spi.message.PostDspRequest;
Expand All @@ -49,24 +54,36 @@ public class DspCatalogApiController {

private final CatalogProtocolService service;
private final DspRequestHandler dspRequestHandler;
private final ContinuationTokenManager continuationTokenManager;

public DspCatalogApiController(CatalogProtocolService service, DspRequestHandler dspRequestHandler) {
public DspCatalogApiController(CatalogProtocolService service, DspRequestHandler dspRequestHandler, ContinuationTokenManager continuationTokenManager) {
this.service = service;
this.dspRequestHandler = dspRequestHandler;
this.continuationTokenManager = continuationTokenManager;
}

@POST
@Path(CATALOG_REQUEST)
public Response requestCatalog(JsonObject jsonObject, @HeaderParam(AUTHORIZATION) String token) {
public Response requestCatalog(JsonObject jsonObject, @HeaderParam(AUTHORIZATION) String token, @Context UriInfo uriInfo,
@QueryParam("continuationToken") String continuationToken) {
JsonObject messageJson;
if (continuationToken == null) {
messageJson = jsonObject;
} else {
messageJson = continuationTokenManager.applyQueryFromToken(jsonObject, continuationToken)
.orElseThrow(f -> new BadRequestException(f.getFailureDetail()));
}

var request = PostDspRequest.Builder.newInstance(CatalogRequestMessage.class, Catalog.class)
.token(token)
.expectedMessageType(DSPACE_TYPE_CATALOG_REQUEST_MESSAGE)
.message(jsonObject)
.message(messageJson)
.serviceCall(service::getCatalog)
.errorType(DSPACE_TYPE_CATALOG_ERROR)
.build();

return dspRequestHandler.createResource(request);
var responseDecorator = continuationTokenManager.createResponseDecorator(uriInfo.getAbsolutePath().toString());
return dspRequestHandler.createResource(request, responseDecorator);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import org.eclipse.edc.connector.controlplane.services.spi.catalog.CatalogProtocolService;
import org.eclipse.edc.protocol.dsp.http.spi.message.ContinuationTokenManager;
import org.eclipse.edc.protocol.dsp.http.spi.message.DspRequestHandler;
import org.eclipse.edc.protocol.dsp.spi.version.DspVersions;

Expand All @@ -32,7 +33,8 @@
@Path(DspVersions.V_2024_1_PATH + BASE_PATH)
public class DspCatalogApiController20241 extends DspCatalogApiController {

public DspCatalogApiController20241(CatalogProtocolService service, DspRequestHandler dspRequestHandler) {
super(service, dspRequestHandler);
public DspCatalogApiController20241(CatalogProtocolService service, DspRequestHandler dspRequestHandler,
ContinuationTokenManager responseDecorator) {
super(service, dspRequestHandler, responseDecorator);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.protocol.dsp.catalog.http.api.decorator;

import jakarta.json.Json;
import jakarta.json.JsonObject;
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.protocol.dsp.http.spi.message.ContinuationTokenSerDes;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;

import java.io.ByteArrayInputStream;
import java.util.Base64;

public class Base64continuationTokenSerDes implements ContinuationTokenSerDes {

private final TypeTransformerRegistry typeTransformerRegistry;
private final JsonLd jsonLd;

public Base64continuationTokenSerDes(TypeTransformerRegistry typeTransformerRegistry, JsonLd jsonLd) {
this.typeTransformerRegistry = typeTransformerRegistry;
this.jsonLd = jsonLd;
}

@Override
public Result<String> serialize(QuerySpec querySpec) {
return typeTransformerRegistry.transform(querySpec, JsonObject.class)
.map(Object::toString)
.map(String::getBytes)
.map(Base64.getEncoder()::encodeToString);
}

@Override
public Result<JsonObject> deserialize(String serialized) {
try {
var decode = Base64.getDecoder().decode(serialized);
var jsonObject = Json.createReader(new ByteArrayInputStream(decode)).readObject();
return jsonLd.expand(jsonObject);
} catch (Exception e) {
return Result.failure("Cannot deserialize continuationToken: " + e.getMessage());
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.protocol.dsp.catalog.http.api.decorator;

import jakarta.ws.rs.core.Response;
import org.eclipse.edc.connector.controlplane.catalog.spi.Catalog;
import org.eclipse.edc.connector.controlplane.catalog.spi.CatalogRequestMessage;
import org.eclipse.edc.protocol.dsp.http.spi.message.ContinuationTokenSerDes;
import org.eclipse.edc.protocol.dsp.http.spi.message.ResponseDecorator;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.query.QuerySpec;

import java.net.URI;
import java.util.function.IntBinaryOperator;

public class CatalogPaginationResponseDecorator implements ResponseDecorator<CatalogRequestMessage, Catalog> {

private static final String NEXT = "next";
private static final String PREV = "prev";

private final String requestUrl;
private final ContinuationTokenSerDes continuationTokenSerDes;
private final Monitor monitor;

public CatalogPaginationResponseDecorator(String requestUrl, ContinuationTokenSerDes continuationTokenSerDes, Monitor monitor) {
this.requestUrl = requestUrl;
this.continuationTokenSerDes = continuationTokenSerDes;
this.monitor = monitor;
}

@Override
public Response.ResponseBuilder decorate(Response.ResponseBuilder responseBuilder, CatalogRequestMessage requestBody, Catalog responseBody) {
var currentQuerySpec = requestBody.getQuerySpec();
if (responseBody.getDatasets().size() == currentQuerySpec.getLimit()) {
addLink(NEXT, responseBuilder, currentQuerySpec, (offset, limit) -> offset + limit);
}

if (currentQuerySpec.getOffset() >= currentQuerySpec.getLimit()) {
addLink(PREV, responseBuilder, currentQuerySpec, (offset, limit) -> offset - limit);
}

return responseBuilder;
}

private void addLink(String rel, Response.ResponseBuilder responseBuilder, QuerySpec currentQuerySpec, IntBinaryOperator newOffsetOperator) {
var newOffset = newOffsetOperator.applyAsInt(currentQuerySpec.getOffset(), currentQuerySpec.getLimit());
continuationTokenSerDes.serialize(currentQuerySpec.toBuilder().offset(newOffset).build())
.onSuccess(token -> responseBuilder.link(URI.create(requestUrl + "?continuationToken=" + token), rel))
.onFailure(failure -> monitor.warning("Cannot serialize continuationToken for catalog pagination: " + failure.getFailureDetail()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.protocol.dsp.catalog.http.api.decorator;

import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonObjectBuilder;
import org.eclipse.edc.connector.controlplane.catalog.spi.Catalog;
import org.eclipse.edc.connector.controlplane.catalog.spi.CatalogRequestMessage;
import org.eclipse.edc.protocol.dsp.http.spi.message.ContinuationTokenManager;
import org.eclipse.edc.protocol.dsp.http.spi.message.ResponseDecorator;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.Result;

import static org.eclipse.edc.protocol.dsp.spi.type.DspCatalogPropertyAndTypeNames.DSPACE_PROPERTY_FILTER;

public class ContinuationTokenManagerImpl implements ContinuationTokenManager {

private final Base64continuationTokenSerDes continuationTokenSerDes;
private final Monitor monitor;

public ContinuationTokenManagerImpl(Base64continuationTokenSerDes continuationTokenSerDes, Monitor monitor) {
this.continuationTokenSerDes = continuationTokenSerDes;
this.monitor = monitor;
}

@Override
public ResponseDecorator<CatalogRequestMessage, Catalog> createResponseDecorator(String requestUrl) {
return new CatalogPaginationResponseDecorator(requestUrl, continuationTokenSerDes, monitor);
}

@Override
public Result<JsonObject> applyQueryFromToken(JsonObject requestMessage, String continuationToken) {
return continuationTokenSerDes.deserialize(continuationToken)
.map(query -> Json.createArrayBuilder().add(query))
.map(filter -> Json.createObjectBuilder(requestMessage).add(DSPACE_PROPERTY_FILTER, filter))
.map(JsonObjectBuilder::build);
}
}
Loading

0 comments on commit 949a7ed

Please sign in to comment.