Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure ActorRuntime can be instantiated using Properties #1203

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 2 additions & 23 deletions sdk-actors/src/main/java/io/dapr/actors/client/ActorClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@

import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.utils.Version;
import io.dapr.utils.NetworkUtils;
import io.dapr.v1.DaprGrpc;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import reactor.core.publisher.Mono;

import java.util.Collections;
Expand Down Expand Up @@ -83,7 +82,7 @@ public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOp
* @param resiliencyOptions Client resiliency options.
*/
public ActorClient(Properties overrideProperties, Map<String, String> metadata, ResiliencyOptions resiliencyOptions) {
this(buildManagedChannel(overrideProperties),
this(NetworkUtils.buildGrpcManagedChannel(overrideProperties),
metadata,
resiliencyOptions,
overrideProperties.getValue(Properties.API_TOKEN));
Expand Down Expand Up @@ -129,26 +128,6 @@ public void close() {
}
}

/**
* Creates a GRPC managed channel (or null, if not applicable).
*
* @param overrideProperties Overrides
* @return GRPC managed channel or null.
*/
private static ManagedChannel buildManagedChannel(Properties overrideProperties) {
int port = overrideProperties.getValue(Properties.GRPC_PORT);
if (port <= 0) {
throw new IllegalArgumentException("Invalid port.");
}

var sidecarHost = overrideProperties.getValue(Properties.SIDECAR_IP);

return ManagedChannelBuilder.forAddress(sidecarHost, port)
.usePlaintext()
.userAgent(Version.getSdkVersion())
.build();
}

/**
* Build an instance of the Client based on the provided setup.
*
Expand Down
61 changes: 35 additions & 26 deletions sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
import io.dapr.config.Properties;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.Version;
import io.dapr.utils.NetworkUtils;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import reactor.core.publisher.Mono;

import java.io.Closeable;
Expand Down Expand Up @@ -77,20 +76,30 @@ public class ActorRuntime implements Closeable {
/**
* The default constructor. This should not be called directly.
*
* @throws IllegalStateException If cannot instantiate Runtime.
* @throws IllegalStateException If you cannot instantiate Runtime.
*/
private ActorRuntime() throws IllegalStateException {
this(buildManagedChannel());
this(new Properties());
}

/**
* Constructor once channel is available. This should not be called directly.
*
* @param properties Properties to use.
* @throws IllegalStateException If you cannot instantiate Runtime.
*/
private ActorRuntime(Properties properties) throws IllegalStateException {
this(NetworkUtils.buildGrpcManagedChannel(properties));
}

/**
* Constructor once channel is available. This should not be called directly.
*
* @param channel GRPC managed channel to be closed (or null).
* @throws IllegalStateException If cannot instantiate Runtime.
* @throws IllegalStateException If you cannot instantiate Runtime.
*/
private ActorRuntime(ManagedChannel channel) throws IllegalStateException {
this(channel, buildDaprClient(channel));
this(channel, new DaprClientImpl(channel));
}

/**
Expand All @@ -112,7 +121,7 @@ private ActorRuntime(ManagedChannel channel, DaprClient daprClient) throws Illeg
}

/**
* Returns an ActorRuntime object.
* Creates or returns an existing ActorRuntime object.
*
* @return An ActorRuntime object.
*/
Expand All @@ -128,6 +137,24 @@ public static ActorRuntime getInstance() {
return instance;
}

/**
* Creates or returns an existing ActorRuntime object.
*
* @param properties Properties to use.
* @return An ActorRuntime object.
*/
public static ActorRuntime getInstance(Properties properties) {
if (instance == null) {
synchronized (ActorRuntime.class) {
if (instance == null) {
instance = new ActorRuntime(properties);
}
}
}

return instance;
}

/**
* Gets the Actor configuration for this runtime.
*
Expand All @@ -149,7 +176,6 @@ public byte[] serializeConfig() throws IOException {

/**
* Registers an actor with the runtime, using {@link DefaultObjectSerializer} and {@link DefaultActorFactory}.
*
* {@link DefaultObjectSerializer} is not recommended for production scenarios.
*
* @param clazz The type of actor.
Expand Down Expand Up @@ -314,27 +340,10 @@ private ActorManager getActorManager(String actorTypeName) {
* @return an instance of the setup Client
* @throws java.lang.IllegalStateException if any required field is missing
*/
private static DaprClient buildDaprClient(ManagedChannel channel) {
private DaprClient buildDaprClient(ManagedChannel channel) {
return new DaprClientImpl(channel);
}

/**
* Creates a GRPC managed channel (or null, if not applicable).
*
* @return GRPC managed channel or null.
*/
private static ManagedChannel buildManagedChannel() {
int port = Properties.GRPC_PORT.get();
if (port <= 0) {
throw new IllegalStateException("Invalid port.");
}

return ManagedChannelBuilder.forAddress(Properties.SIDECAR_IP.get(), port)
.usePlaintext()
.userAgent(Version.getSdkVersion())
.build();
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.dapr.actors.ActorId;
import io.dapr.actors.ActorType;
import io.dapr.serializer.DefaultObjectSerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -27,7 +26,11 @@
import java.util.Arrays;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;

public class ActorRuntimeTest {
Expand Down Expand Up @@ -97,8 +100,6 @@ public int count() {

private static Constructor<ActorRuntime> constructor;

private DaprClient mockDaprClient;

private ActorRuntime runtime;

@BeforeAll
Expand All @@ -113,8 +114,7 @@ public static void beforeAll() throws Exception {

@BeforeEach
public void setup() throws Exception {
this.mockDaprClient = mock(DaprClient.class);
this.runtime = constructor.newInstance(null, this.mockDaprClient);
this.runtime = constructor.newInstance(null, mock(DaprClient.class));
}

@Test
Expand Down Expand Up @@ -143,21 +143,21 @@ public void registerActorNullStateSerializer() {
@Test
public void setActorIdleTimeout() throws Exception {
this.runtime.getConfig().setActorIdleTimeout(Duration.ofSeconds(123));
Assertions.assertEquals("{\"entities\":[],\"actorIdleTimeout\":\"0h2m3s0ms\"}",
assertEquals("{\"entities\":[],\"actorIdleTimeout\":\"0h2m3s0ms\"}",
new String(this.runtime.serializeConfig()));
}

@Test
public void setActorScanInterval() throws Exception {
this.runtime.getConfig().setActorScanInterval(Duration.ofSeconds(123));
Assertions.assertEquals("{\"entities\":[],\"actorScanInterval\":\"0h2m3s0ms\"}",
assertEquals("{\"entities\":[],\"actorScanInterval\":\"0h2m3s0ms\"}",
new String(this.runtime.serializeConfig()));
}

@Test
public void setDrainBalancedActors() throws Exception {
this.runtime.getConfig().setDrainBalancedActors(true);
Assertions.assertEquals("{\"entities\":[],\"drainBalancedActors\":true}",
assertEquals("{\"entities\":[],\"drainBalancedActors\":true}",
new String(this.runtime.serializeConfig()));
}

Expand All @@ -183,7 +183,7 @@ public void addActorTypeConfig() throws Exception {
this.runtime.getConfig().addActorTypeConfig(actorTypeConfig2);
this.runtime.getConfig().addRegisteredActorType("actor2");

Assertions.assertEquals(
assertEquals(
"{\"entities\":[\"actor1\",\"actor2\"],\"entitiesConfig\":[{\"entities\":[\"actor1\"],\"actorIdleTimeout\":\"0h2m3s0ms\",\"actorScanInterval\":\"0h2m3s0ms\",\"drainOngoingCallTimeout\":\"0h2m3s0ms\",\"drainBalancedActors\":true,\"remindersStoragePartitions\":1},{\"entities\":[\"actor2\"],\"actorIdleTimeout\":\"0h2m3s0ms\",\"actorScanInterval\":\"0h2m3s0ms\",\"drainOngoingCallTimeout\":\"0h2m3s0ms\",\"drainBalancedActors\":false,\"remindersStoragePartitions\":2}]}",
new String(this.runtime.serializeConfig())
);
Expand All @@ -194,28 +194,28 @@ public void addNullActorTypeConfig() throws Exception {
try {
this.runtime.getConfig().addActorTypeConfig(null);
} catch (Exception ex) {
Assertions.assertTrue(ex instanceof IllegalArgumentException);
Assertions.assertTrue(ex.getMessage().contains("Add actor type config failed."));
assertInstanceOf(IllegalArgumentException.class, ex);
assertTrue(ex.getMessage().contains("Add actor type config failed."));
}
try {
this.runtime.getConfig().addRegisteredActorType(null);
} catch (Exception ex) {
Assertions.assertTrue(ex instanceof IllegalArgumentException);
Assertions.assertTrue(ex.getMessage().contains("Registered actor must have a type name."));
assertInstanceOf(IllegalArgumentException.class, ex);
assertTrue(ex.getMessage().contains("Registered actor must have a type name."));
}
}

@Test
public void setDrainOngoingCallTimeout() throws Exception {
this.runtime.getConfig().setDrainOngoingCallTimeout(Duration.ofSeconds(123));
Assertions.assertEquals("{\"entities\":[],\"drainOngoingCallTimeout\":\"0h2m3s0ms\"}",
assertEquals("{\"entities\":[],\"drainOngoingCallTimeout\":\"0h2m3s0ms\"}",
new String(this.runtime.serializeConfig()));
}

@Test
public void setRemindersStoragePartitions() throws Exception {
this.runtime.getConfig().setRemindersStoragePartitions(12);
Assertions.assertEquals("{\"entities\":[],\"remindersStoragePartitions\":12}",
assertEquals("{\"entities\":[],\"remindersStoragePartitions\":12}",
new String(this.runtime.serializeConfig()));
}

Expand All @@ -226,7 +226,7 @@ public void invokeActor() throws Exception {

byte[] response = this.runtime.invoke(ACTOR_NAME, actorId, "say", null).block();
String message = ACTOR_STATE_SERIALIZER.deserialize(response, String.class);
Assertions.assertEquals("Nothing to say.", message);
assertEquals("Nothing to say.", message);
}

@Test
Expand Down Expand Up @@ -256,8 +256,8 @@ public void lazyDeactivate() throws Exception {
deactivateCall.block();

this.runtime.invoke(ACTOR_NAME, actorId, "say", null)
.doOnError(e -> Assertions.assertTrue(e.getMessage().contains("Could not find actor")))
.doOnSuccess(s -> Assertions.fail()).onErrorReturn("".getBytes()).block();
.doOnError(e -> assertTrue(e.getMessage().contains("Could not find actor")))
.doOnSuccess(s -> fail()).onErrorReturn("".getBytes()).block();
}

@Test
Expand All @@ -269,13 +269,13 @@ public void lazyInvoke() throws Exception {

byte[] response = this.runtime.invoke(ACTOR_NAME, actorId, "count", null).block();
int count = ACTOR_STATE_SERIALIZER.deserialize(response, Integer.class);
Assertions.assertEquals(0, count);
assertEquals(0, count);

invokeCall.block();

response = this.runtime.invoke(ACTOR_NAME, actorId, "count", null).block();
count = ACTOR_STATE_SERIALIZER.deserialize(response, Integer.class);
Assertions.assertEquals(1, count);
assertEquals(1, count);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
import io.dapr.config.Properties;
import io.dapr.it.BaseIT;
import io.dapr.it.actors.app.MyActorService;
import io.dapr.utils.Version;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.dapr.utils.NetworkUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand All @@ -36,8 +34,8 @@

import static io.dapr.it.actors.MyActorTestUtils.fetchMethodCallLogs;
import static io.dapr.it.actors.MyActorTestUtils.validateMethodCalls;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ActorTurnBasedConcurrencyIT extends BaseIT {

Expand All @@ -56,7 +54,7 @@ public class ActorTurnBasedConcurrencyIT extends BaseIT {
@AfterEach
public void cleanUpTestCase() {
// Delete the reminder in case the test failed, otherwise it may interfere with future tests since it is persisted.
var channel = buildManagedChannel();
var channel = NetworkUtils.buildGrpcManagedChannel(new Properties());
try {
System.out.println("Invoking during cleanup");
DaprClientHttpUtils.unregisterActorReminder(channel, ACTOR_TYPE, ACTOR_ID, REMINDER_NAME);
Expand Down Expand Up @@ -120,7 +118,7 @@ public void invokeOneActorMethodReminderAndTimer() throws Exception {
String msg = "message" + i;
String reversedString = new StringBuilder(msg).reverse().toString();
String output = proxy.invokeMethod("say", "message" + i, String.class).block();
assertTrue(reversedString.equals(output));
assertEquals(reversedString, output);
expectedSayMethodInvocations.incrementAndGet();
});

Expand Down Expand Up @@ -166,7 +164,7 @@ public void invokeOneActorMethodReminderAndTimer() throws Exception {
* @param logs logs with info about method entries and exits returned from the app
*/
void validateTurnBasedConcurrency(List<MethodEntryTracker> logs) {
if (logs.size() == 0) {
if (logs.isEmpty()) {
logger.warn("No logs");
return;
}
Expand All @@ -176,7 +174,7 @@ void validateTurnBasedConcurrency(List<MethodEntryTracker> logs) {
if (s.getIsEnter()) {
currentMethodName = s.getMethodName();
} else {
assertTrue(currentMethodName.equals(s.getMethodName()));
assertEquals(currentMethodName, s.getMethodName());
}
}

Expand Down Expand Up @@ -228,16 +226,4 @@ void validateEventNotObserved(List<MethodEntryTracker> logs, String startingPoin
}
}
}

private static ManagedChannel buildManagedChannel() {
int port = Properties.GRPC_PORT.get();
if (port <= 0) {
throw new IllegalStateException("Invalid port.");
}

return ManagedChannelBuilder.forAddress(Properties.SIDECAR_IP.get(), port)
.usePlaintext()
.userAgent(Version.getSdkVersion())
.build();
}
}
Loading