Skip to content

Commit

Permalink
Set load balancing strategy config when present
Browse files Browse the repository at this point in the history
Whenever somebody provides load balancing settings, these should be
taken into account through the AdminChannel. This requires a little
effort to map the strategies to the processor-to-context naming scheme,
requiring a slight duplication compared to the approach taken in the
EventProcessorControlService

#112
  • Loading branch information
smcvb committed Oct 27, 2023
1 parent dcfbd68 commit 39bf9f0
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.axonframework.extensions.multitenancy.autoconfig;

import io.axoniq.axonserver.connector.AxonServerConnection;
import io.axoniq.axonserver.connector.admin.AdminChannel;
import io.axoniq.axonserver.connector.control.ControlChannel;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
Expand All @@ -28,8 +29,17 @@
import org.axonframework.extensions.multitenancy.components.eventhandeling.MultiTenantEventProcessor;
import org.axonframework.lifecycle.Phase;
import org.axonframework.lifecycle.StartHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toMap;

/**
* Multi-tenant implementation of {@link EventProcessorControlService}.
Expand All @@ -43,6 +53,8 @@ public class MultiTenantEventProcessorControlService
extends EventProcessorControlService
implements MultiTenantAwareComponent {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

/**
* Initialize a {@link MultiTenantEventProcessorControlService}.
* <p>
Expand Down Expand Up @@ -101,19 +113,107 @@ public void start() {
return;
}

Map<String, AxonServerConnection> contextToConnection = new HashMap<>();
Map<String, EventProcessor> eventProcessors = eventProcessingConfiguration.eventProcessors();
eventProcessors.forEach((name, processor) -> {
Map<String, String> strategiesPerProcessor = strategiesPerProcessor(eventProcessors);

eventProcessors.forEach((processorAndContext, processor) -> {
if (processor instanceof MultiTenantEventProcessor) {
return;
}
String context = name.substring(name.indexOf("@") + 1);
ControlChannel controlChannel = axonServerConnectionManager.getConnection(context)
.controlChannel();
AxonProcessorInstructionHandler instructionHandler = new AxonProcessorInstructionHandler(processor, name);
controlChannel.registerEventProcessor(name, infoSupplier(processor), instructionHandler);

String processorName = processorNameFromCombination(processorAndContext);
String context = contextFromCombination(processorAndContext);
AxonServerConnection connection =
contextToConnection.computeIfAbsent(context, axonServerConnectionManager::getConnection);

registerInstructionHandler(connection.controlChannel(), processorAndContext, processor);
String strategyForProcessor = strategiesPerProcessor.get(processorName);
if (strategyForProcessor != null) {
setLoadBalancingStrategy(connection.adminChannel(), processorName, strategyForProcessor);
}
});
}

private Map<String, String> strategiesPerProcessor(Map<String, EventProcessor> eventProcessors) {
List<String> processorNames =
eventProcessors.entrySet()
.stream()
// Filter out MultiTenantEventProcessors as those aren't registered with Axon Server anyhow.
.filter(entry -> !(entry.getValue() instanceof MultiTenantEventProcessor))
.map(Map.Entry::getKey)
.map(MultiTenantEventProcessorControlService::processorNameFromCombination)
.collect(Collectors.toList());
return processorConfig.entrySet()
.stream()
.filter(entry -> {
if (!processorNames.contains(entry.getKey())) {
logger.info("Event Processor [{}] is not a registered. "
+ "Please check the name or register the Event Processor",
entry.getKey());
return false;
}
return true;
})
.collect(toMap(Map.Entry::getKey, entry -> entry.getValue().getLoadBalancingStrategy()));
}

private static String contextFromCombination(String processorAndContext) {
return processorAndContext.substring(processorAndContext.indexOf("@") + 1);
}

private void registerInstructionHandler(ControlChannel controlChannel,
String processorAndContext,
EventProcessor processor) {
controlChannel.registerEventProcessor(processorAndContext,
infoSupplier(processor),
new AxonProcessorInstructionHandler(processor, processorAndContext));
}

private void setLoadBalancingStrategy(AdminChannel adminChannel, String processorName, String strategy) {
Optional<String> optionalIdentifier = tokenStoreIdentifierFor(processorName);
if (!optionalIdentifier.isPresent()) {
logger.warn("Cannot find token store identifier for processor [{}]. "
+ "Load balancing cannot be configured without this identifier.", processorName);
return;
}
String tokenStoreIdentifier = optionalIdentifier.get();

adminChannel.loadBalanceEventProcessor(processorName, tokenStoreIdentifier, strategy)
.whenComplete((r, e) -> {
if (e == null) {
logger.debug("Successfully requested to load balance processor [{}]"
+ " with strategy [{}].", processorName, strategy);
return;
}
logger.warn("Requesting to load balance processor [{}] with strategy [{}] failed.",
processorName, strategy, e);
});
if (processorConfig.get(processorName).isAutomaticBalancing()) {
adminChannel.setAutoLoadBalanceStrategy(processorName, tokenStoreIdentifier, strategy)
.whenComplete((r, e) -> {
if (e == null) {
logger.debug("Successfully requested to automatically balance processor [{}]"
+ " with strategy [{}].", processorName, strategy);
return;
}
logger.warn(
"Requesting to automatically balance processor [{}] with strategy [{}] failed.",
processorName, strategy, e
);
});
}
}

private Optional<String> tokenStoreIdentifierFor(String processorName) {
return eventProcessingConfiguration.tokenStore(processorName)
.retrieveStorageIdentifier();
}

private static String processorNameFromCombination(String processorAndContext) {
return processorAndContext.substring(0, processorAndContext.indexOf("@"));
}

@Override
public Registration registerTenant(TenantDescriptor tenantDescriptor) {
//Already registered
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,23 @@

import com.google.common.collect.ImmutableMap;
import io.axoniq.axonserver.connector.AxonServerConnection;
import io.axoniq.axonserver.connector.admin.AdminChannel;
import io.axoniq.axonserver.connector.control.ControlChannel;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.config.EventProcessingConfiguration;
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.extensions.multitenancy.components.TenantDescriptor;
import org.axonframework.extensions.multitenancy.components.eventhandeling.MultiTenantEventProcessor;
import org.junit.jupiter.api.*;
import org.mockito.*;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static org.mockito.Mockito.*;

/**
Expand All @@ -37,96 +44,143 @@
*/
class MultiTenantEventProcessorControlServiceTest {

private static final String PROCESSOR_NAME = "some-processor";
private static final String TOKEN_STORE_IDENTIFIER = "token-store-identifier";
private static final String LOAD_BALANCING_STRATEGY = "some-strategy";

private AxonServerConnectionManager axonServerConnectionManager;
private EventProcessingConfiguration eventProcessingConfiguration;

private ControlChannel controlTenant1;
private AdminChannel adminTenant1;
private ControlChannel controlTenant2;
private AdminChannel adminTenant2;

private MultiTenantEventProcessorControlService testSubject;

@BeforeEach
void setUp() {
axonServerConnectionManager = mock(AxonServerConnectionManager.class);
mockConnectionManager();

eventProcessingConfiguration = mock(EventProcessingConfiguration.class);
AxonServerConfiguration axonServerConfiguration = mock(AxonServerConfiguration.class);
when(axonServerConfiguration.getEventhandling())
.thenReturn(new AxonServerConfiguration.Eventhandling());
TokenStore tokenStore = mock(TokenStore.class);
when(tokenStore.retrieveStorageIdentifier()).thenReturn(Optional.of(TOKEN_STORE_IDENTIFIER));
when(eventProcessingConfiguration.tokenStore(PROCESSOR_NAME)).thenReturn(tokenStore);

AxonServerConfiguration axonServerConfig = mock(AxonServerConfiguration.class);
mockAxonServerConfig(axonServerConfig);

testSubject = new MultiTenantEventProcessorControlService(axonServerConnectionManager,
eventProcessingConfiguration,
axonServerConfiguration);
axonServerConfig);
}

@Test
void start() {
private void mockConnectionManager() {
AxonServerConnection connectionTenant1 = mock(AxonServerConnection.class);
ControlChannel controlTenant1 = mock(ControlChannel.class);
controlTenant1 = mock(ControlChannel.class);
when(connectionTenant1.controlChannel()).thenReturn(controlTenant1);
adminTenant1 = mock(AdminChannel.class);
when(adminTenant1.loadBalanceEventProcessor(any(), any(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
when(adminTenant1.setAutoLoadBalanceStrategy(any(), any(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
when(connectionTenant1.adminChannel()).thenReturn(adminTenant1);
AxonServerConnection connectionTenant2 = mock(AxonServerConnection.class);
ControlChannel controlTenant2 = mock(ControlChannel.class);
controlTenant2 = mock(ControlChannel.class);
when(connectionTenant2.controlChannel()).thenReturn(controlTenant2);

adminTenant2 = mock(AdminChannel.class);
when(adminTenant2.loadBalanceEventProcessor(any(), any(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
when(adminTenant2.setAutoLoadBalanceStrategy(any(), any(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
when(connectionTenant2.adminChannel()).thenReturn(adminTenant2);
ArgumentCaptor<String> contextCapture = ArgumentCaptor.forClass(String.class);
when(axonServerConnectionManager.getConnection(contextCapture.capture())).thenAnswer(a -> {
if (contextCapture.getValue().equals("tenant-1")) {
return connectionTenant1;
} else {
return connectionTenant2;
}
});
when(axonServerConnectionManager.getConnection(contextCapture.capture()))
.thenAnswer(a -> contextCapture.getValue().equals("tenant-1") ? connectionTenant1 : connectionTenant2);
}

private static void mockAxonServerConfig(AxonServerConfiguration axonServerConfig) {
Map<String, AxonServerConfiguration.Eventhandling.ProcessorSettings> processorSettings = new HashMap<>();
AxonServerConfiguration.Eventhandling eventHandling = mock(AxonServerConfiguration.Eventhandling.class);
AxonServerConfiguration.Eventhandling.ProcessorSettings tepSettings =
new AxonServerConfiguration.Eventhandling.ProcessorSettings();
tepSettings.setLoadBalancingStrategy(LOAD_BALANCING_STRATEGY);
tepSettings.setAutomaticBalancing(true);
processorSettings.put(PROCESSOR_NAME, tepSettings);
when(eventHandling.getProcessors()).thenReturn(processorSettings);
when(axonServerConfig.getEventhandling()).thenReturn(eventHandling);
}

@Test
void registersInstructionHandlersWithEachContextControlChannelOnStart() {
when(eventProcessingConfiguration.eventProcessors()).thenReturn(ImmutableMap.of(
"tep@tenant-1",
mock(EventProcessor.class),
"tep@tenant-2",
mock(EventProcessor.class),
"proxy-ep",
mock(MultiTenantEventProcessor.class)
PROCESSOR_NAME + "@tenant-1", mock(EventProcessor.class),
PROCESSOR_NAME + "@tenant-2", mock(EventProcessor.class),
"proxy-ep", mock(MultiTenantEventProcessor.class)
));

testSubject.start();

verify(controlTenant1).registerEventProcessor(eq("tep@tenant-1"), any(), any());
verify(controlTenant2).registerEventProcessor(eq("tep@tenant-2"), any(), any());
verify(controlTenant1).registerEventProcessor(eq(PROCESSOR_NAME + "@tenant-1"), any(), any());
verify(controlTenant2).registerEventProcessor(eq(PROCESSOR_NAME + "@tenant-2"), any(), any());
}

@Test
void addingNewTenantAfterStart() {
AxonServerConnection connectionTenant1 = mock(AxonServerConnection.class);
ControlChannel controlTenant1 = mock(ControlChannel.class);
when(connectionTenant1.controlChannel()).thenReturn(controlTenant1);
AxonServerConnection connectionTenant2 = mock(AxonServerConnection.class);
ControlChannel controlTenant2 = mock(ControlChannel.class);
when(connectionTenant2.controlChannel()).thenReturn(controlTenant2);

ArgumentCaptor<String> contextCapture = ArgumentCaptor.forClass(String.class);
when(axonServerConnectionManager.getConnection(contextCapture.capture())).thenAnswer(a -> {
if (contextCapture.getValue().equals("tenant-1")) {
return connectionTenant1;
} else {
return connectionTenant2;
}
});

when(eventProcessingConfiguration.eventProcessors()).thenReturn(ImmutableMap.of(
"tep@tenant-1",
mock(EventProcessor.class),
"proxy-ep",
mock(MultiTenantEventProcessor.class)
PROCESSOR_NAME + "@tenant-1", mock(EventProcessor.class),
"proxy-ep", mock(MultiTenantEventProcessor.class)
));

testSubject.start();

verify(controlTenant1).registerEventProcessor(eq("tep@tenant-1"), any(), any());
verify(controlTenant2, times(0)).registerEventProcessor(eq("tep@tenant-2"), any(), any());
verify(controlTenant1).registerEventProcessor(eq(PROCESSOR_NAME + "@tenant-1"), any(), any());
verify(controlTenant2, times(0)).registerEventProcessor(eq(PROCESSOR_NAME + "@tenant-2"), any(), any());

when(eventProcessingConfiguration.eventProcessors()).thenReturn(ImmutableMap.of(
"tep@tenant-1",
mock(EventProcessor.class),
"tep@tenant-2",
mock(EventProcessor.class),
"proxy-ep",
mock(MultiTenantEventProcessor.class)
PROCESSOR_NAME + "@tenant-1", mock(EventProcessor.class),
PROCESSOR_NAME + "@tenant-2", mock(EventProcessor.class),
"proxy-ep", mock(MultiTenantEventProcessor.class)
));

testSubject.registerAndStartTenant(TenantDescriptor.tenantWithId("tenant-2"));
verify(controlTenant2).registerEventProcessor(eq("tep@tenant-2"), any(), any());
verify(controlTenant2).registerEventProcessor(eq(PROCESSOR_NAME + "@tenant-2"), any(), any());
}

@Test
void willSetLoadBalancingStrategyForProcessorsWithPropertiesOnStart() {
String processorNameWithoutSettings = "processor-without-load-balancing";
String expectedStrategy = "some-strategy";

// Given
// Mock Event Processor Configuration
when(eventProcessingConfiguration.eventProcessors()).thenReturn(ImmutableMap.of(
PROCESSOR_NAME + "@tenant-1", mock(EventProcessor.class),
PROCESSOR_NAME + "@tenant-2", mock(EventProcessor.class),
processorNameWithoutSettings + "@tenant-1", mock(EventProcessor.class),
processorNameWithoutSettings + "@tenant-2", mock(EventProcessor.class),
"proxy-ep", mock(MultiTenantEventProcessor.class)
));

// When
testSubject.start();

// Then
// Registers instruction handlers
verify(controlTenant1).registerEventProcessor(eq(PROCESSOR_NAME + "@tenant-1"), any(), any());
verify(controlTenant2).registerEventProcessor(eq(PROCESSOR_NAME + "@tenant-2"), any(), any());
verify(controlTenant1).registerEventProcessor(eq(processorNameWithoutSettings + "@tenant-1"), any(), any());
verify(controlTenant2).registerEventProcessor(eq(processorNameWithoutSettings + "@tenant-2"), any(), any());
// Load balances Processors
verify(adminTenant1).loadBalanceEventProcessor(PROCESSOR_NAME, TOKEN_STORE_IDENTIFIER, expectedStrategy);
verify(adminTenant2).loadBalanceEventProcessor(PROCESSOR_NAME, TOKEN_STORE_IDENTIFIER, expectedStrategy);
verify(adminTenant1, never()).loadBalanceEventProcessor(eq(processorNameWithoutSettings), any(), any());
verify(adminTenant2, never()).loadBalanceEventProcessor(eq(processorNameWithoutSettings), any(), any());
// Enables automatic load balancing
verify(adminTenant1).setAutoLoadBalanceStrategy(PROCESSOR_NAME, TOKEN_STORE_IDENTIFIER, expectedStrategy);
verify(adminTenant2).setAutoLoadBalanceStrategy(PROCESSOR_NAME, TOKEN_STORE_IDENTIFIER, expectedStrategy);
verify(adminTenant1, never()).setAutoLoadBalanceStrategy(eq(processorNameWithoutSettings), any(), any());
verify(adminTenant2, never()).setAutoLoadBalanceStrategy(eq(processorNameWithoutSettings), any(), any());
}
}

0 comments on commit 39bf9f0

Please sign in to comment.