Skip to content

Commit

Permalink
initial invokeBindingList implementation
Browse files Browse the repository at this point in the history
Signed-off-by: salaboy <[email protected]>
  • Loading branch information
salaboy committed Jul 8, 2024
1 parent 3dadc0b commit 338f286
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@

package io.dapr.actors.runtime;

import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.utils.TypeRef;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;

import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.utils.TypeRef;

/**
* Class used to test different serializer implementations.
Expand Down Expand Up @@ -57,6 +58,22 @@ public <T> T deserialize(byte[] data, TypeRef<T> type) throws IOException {
}
}

/**
* {@inheritDoc}
*/
@Override
public <T> List<T> deserializeList(byte[] data, TypeRef<T> type) throws IOException {
try (ByteArrayInputStream bis = new ByteArrayInputStream(data)) {
try (ObjectInputStream ois = new ObjectInputStream(bis)) {
try {
return (List<T>) ois.readObject();
} catch (Exception e) {
throw new IOException("Could not deserialize Java object.", e);
}
}
}
}

/**
* {@inheritDoc}
*/
Expand Down
27 changes: 20 additions & 7 deletions sdk/src/main/java/io/dapr/client/AbstractDaprClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@

package io.dapr.client;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.ObjectMapper;

import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
Expand Down Expand Up @@ -48,13 +56,6 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Abstract class with convenient methods common between client implementations.
*
Expand Down Expand Up @@ -270,6 +271,18 @@ public <T> Mono<T> invokeBinding(
return this.invokeBinding(bindingName, operation, data, metadata, TypeRef.get(clazz));
}

/**
* {@inheritDoc}
*/
@Override
public <T> Mono<List<T>> invokeBindingList(
String bindingName, String operation, Object data, Map<String, String> metadata, TypeRef<T> type) {
InvokeBindingRequest request = new InvokeBindingRequest(bindingName, operation)
.setData(data)
.setMetadata(metadata);
return this.invokeBindingList(request, type);
}

/**
* {@inheritDoc}
*/
Expand Down
34 changes: 29 additions & 5 deletions sdk/src/main/java/io/dapr/client/DaprClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

package io.dapr.client;

import java.util.List;
import java.util.Map;
import java.util.function.Function;

import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.DaprMetadata;
import io.dapr.client.domain.DeleteStateRequest;
Expand Down Expand Up @@ -40,10 +44,6 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
* Generic Client Adapter to be used regardless of the GRPC or the HTTP Client implementation required.
*
Expand Down Expand Up @@ -299,7 +299,7 @@ Mono<byte[]> invokeMethod(String appId, String methodName, byte[] request, HttpE
*/
<T> Mono<T> invokeBinding(String bindingName, String operation, Object data, Map<String, String> metadata,
TypeRef<T> type);

/**
* Invokes a Binding operation.
*
Expand All @@ -324,6 +324,30 @@ <T> Mono<T> invokeBinding(String bindingName, String operation, Object data, Map
*/
<T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type);

/**
* Invokes a Binding operation that returns a List.
*
* @param request The binding invocation request.
* @param type The type being returned.
* @param <T> The type of the return
* @return a Mono plan of type List.
*/
<T> Mono<List<T>> invokeBindingList(InvokeBindingRequest request, TypeRef<T> type);

/**
* Invokes a Binding operation and expect a list of objects as return.
*
* @param bindingName The name of the biding to call.
* @param operation The operation to be performed by the binding request processor.
* @param data The data to be processed, use byte[] to skip serialization.
* @param metadata The metadata map.
* @param type The type being returned.
* @param <T> The type of the return
* @return a Mono plan of type List.
*/
<T> Mono<List<T>> invokeBindingList(String bindingName, String operation, Object data, Map<String, String> metadata,
TypeRef<T> type);

/**
* Retrieve a State based on their key.
*
Expand Down
48 changes: 48 additions & 0 deletions sdk/src/main/java/io/dapr/client/DaprClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,54 @@ private <T> Mono<T> getMonoForHttpResponse(TypeRef<T> type, DaprHttp.Response r)
}
}


/**
* {@inheritDoc}
*/
@Override
public <T> Mono<List<T>> invokeBindingList(InvokeBindingRequest request, TypeRef<T> type) {
try {
final String name = request.getName();
final String operation = request.getOperation();
final Object data = request.getData();
final Map<String, String> metadata = request.getMetadata();
if (name == null || name.trim().isEmpty()) {
throw new IllegalArgumentException("Binding name cannot be null or empty.");
}

if (operation == null || operation.trim().isEmpty()) {
throw new IllegalArgumentException("Binding operation cannot be null or empty.");
}

byte[] byteData = objectSerializer.serialize(data);
DaprProtos.InvokeBindingRequest.Builder builder = DaprProtos.InvokeBindingRequest.newBuilder()
.setName(name).setOperation(operation);
if (byteData != null) {
builder.setData(ByteString.copyFrom(byteData));
}
if (metadata != null) {
builder.putAllMetadata(metadata);
}
DaprProtos.InvokeBindingRequest envelope = builder.build();

return Mono.deferContextual(
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
)
).flatMap(
it -> {
try {
return Mono.justOrEmpty(objectSerializer.deserializeList(it.getData().toByteArray(), type));
} catch (IOException e) {
throw DaprException.propagate(e);
}
}
);
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}

/**
* {@inheritDoc}
*/
Expand Down
46 changes: 42 additions & 4 deletions sdk/src/main/java/io/dapr/client/ObjectSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,21 @@

package io.dapr.client;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.MessageLite;

import io.dapr.client.domain.CloudEvent;
import io.dapr.utils.TypeRef;

import java.io.IOException;
import java.lang.reflect.Method;

/**
* Serializes and deserializes an internal object.
*/
Expand Down Expand Up @@ -99,6 +102,41 @@ public <T> T deserialize(byte[] content, Class<T> clazz) throws IOException {
return deserialize(content, OBJECT_MAPPER.constructType(clazz));
}

/**
* Deserializes the byte array into the a list of the clazz paramter type object.
*
* @param content Content to be parsed.
* @param clazz Type of the object being deserialized.
* @param <T> Generic type of the object being deserialized.
* @return Object of type T.
* @throws IOException In case content cannot be deserialized.
*/
@SuppressWarnings("unchecked")
public <T> List<T> deserializeList(byte[] content, Class<T> clazz) throws IOException {
Class<? extends Object[]> clazzArray = ((T[]) java.lang.reflect.Array.newInstance(clazz, 1)).getClass();
return Arrays.asList(deserialize(content, OBJECT_MAPPER.constructType(clazzArray)));
}

/**
* Deserializes the byte array into the original object.
*
* @param content Content to be parsed.
* @param type Type of the object being deserialized.
* @param <T> Generic type of the object being deserialized.
* @return Object of type T.
* @throws IOException In case content cannot be deserialized.
*/
@SuppressWarnings("unchecked")
public <T> List<T> deserializeList(byte[] content, TypeRef<T> type) throws IOException {
try {
Class<T> clazz = (Class<T>) Class.forName(type.getType().getTypeName());
return deserializeList(content, clazz);
}catch (Exception e){
e.printStackTrace();
}
return null;
}

private <T> T deserialize(byte[] content, JavaType javaType) throws IOException {
if ((javaType == null) || javaType.isTypeOrSubTypeOf(Void.class)) {
return null;
Expand Down Expand Up @@ -137,7 +175,7 @@ private <T> T deserialize(byte[] content, JavaType javaType) throws IOException
throw new IOException(e);
}
}

return OBJECT_MAPPER.readValue(content, javaType);
}

Expand Down
16 changes: 14 additions & 2 deletions sdk/src/main/java/io/dapr/serializer/DaprObjectSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@

package io.dapr.serializer;

import io.dapr.utils.TypeRef;

import java.io.IOException;
import java.util.List;

import io.dapr.utils.TypeRef;

/**
* Serializes and deserializes application's objects.
Expand All @@ -42,6 +43,17 @@ public interface DaprObjectSerializer {
*/
<T> T deserialize(byte[] data, TypeRef<T> type) throws IOException;

/**
* Deserializes the given byte[] into a list of object.
*
* @param data Data to be deserialized.
* @param type Type of object to be deserialized.
* @param <T> Type of object to be deserialized.
* @return Deserialized List of objects.
* @throws IOException If cannot deserialize object.
*/
<T> List<T> deserializeList(byte[] data, TypeRef<T> type) throws IOException;

/**
* Returns the content type of the request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@

package io.dapr.serializer;

import java.io.IOException;
import java.util.List;

import io.dapr.client.ObjectSerializer;
import io.dapr.utils.TypeRef;

import java.io.IOException;

/**
* Default serializer/deserializer for request/response objects and for state objects too.
Expand All @@ -39,6 +41,14 @@ public <T> T deserialize(byte[] data, TypeRef<T> type) throws IOException {
return super.deserialize(data, type);
}

/**
* {@inheritDoc}
*/
@Override
public <T> List<T> deserializeList(byte[] data, TypeRef<T> type) throws IOException {
return super.deserializeList(data, type);
}

/**
* {@inheritDoc}
*/
Expand Down
21 changes: 21 additions & 0 deletions sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,27 @@ public void invokeBindingResponseObjectTypeRefTest() throws IOException {
assertEquals("OK", result.block());
}

@Test
public void invokeBindingListResponseObjectTypeRefTest() throws IOException {
List<MyObject> list = new ArrayList<>();
MyObject obj1 = new MyObject(1, "Object1");
MyObject obj2 = new MyObject(2, "Objet2");
list.add(obj1);
list.add(obj2);
DaprProtos.InvokeBindingResponse.Builder responseBuilder =
DaprProtos.InvokeBindingResponse.newBuilder().setData(serialize(list));
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.InvokeBindingResponse> observer = (StreamObserver<DaprProtos.InvokeBindingResponse>) invocation.getArguments()[1];
observer.onNext(responseBuilder.build());
observer.onCompleted();
return null;
}).when(daprStub).invokeBinding(any(DaprProtos.InvokeBindingRequest.class), any());

Mono<List<MyObject>> result = client.invokeBindingList("BindingName", "MyOperation", null, null, TypeRef.get(MyObject.class));

assertEquals(list, result.block());
}

@Test
public void invokeBindingObjectNoHotMono() throws IOException {
AtomicBoolean called = new AtomicBoolean(false);
Expand Down
5 changes: 5 additions & 0 deletions sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,11 @@ public <T> T deserialize(byte[] data, TypeRef<T> type) throws IOException {
return XML_MAPPER.readValue(data, new TypeReference<T>() {});
}

@Override
public <T> List<T> deserializeList(byte[] data, TypeRef<T> type) throws IOException {
return XML_MAPPER.readValue(data, new TypeReference<List<T>>() {});
}

@Override
public String getContentType() {
return "application/xml";
Expand Down

0 comments on commit 338f286

Please sign in to comment.