Skip to content

Commit

Permalink
Close Terracotta-OSS#191: Failover tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mathieucarbou committed Dec 20, 2016
1 parent d29ac19 commit b17bc75
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public void answerManagementCall(ClientDescriptor caller, String managementCallI

@Override
public void onBecomeActive() {
LOGGER.trace("[{}] onBecomeActive()", this.consumerId);
clear();
}

@Override
Expand All @@ -119,13 +121,16 @@ public void onUnfetch(long consumerId, ClientDescriptor clientDescriptor) {
public void onEntityDestroyed(long consumerId) {
if (consumerId == this.consumerId) {
LOGGER.trace("[{}] onEntityDestroyed()", this.consumerId);
manageableClients.clear();
clear();
}
}

@Override
public void onEntityFailover(long consumerId) {
onEntityDestroyed(consumerId);
if (consumerId == this.consumerId) {
LOGGER.trace("[{}] onEntityFailover()", this.consumerId);
clear();
}
}

void fireMessage(Message message) {
Expand Down Expand Up @@ -155,4 +160,8 @@ private void send(ClientDescriptor client, Message message) {
}
}

private void clear() {
manageableClients.clear();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public boolean pushServerEntityNotification(Object managedObjectSource, String t

@Override
public void onBecomeActive() {
LOGGER.trace("[{}] onBecomeActive()", consumerId);
clear();
}

@Override
Expand All @@ -106,14 +108,21 @@ public void onUnfetch(long consumerId, ClientDescriptor clientDescriptor) {
public void onEntityDestroyed(long consumerId) {
if (consumerId == this.consumerId) {
LOGGER.trace("[{}] onEntityDestroyed()", consumerId);
managementProviders.forEach(ManagementProvider::close);
managementProviders.clear();
clear();
}
}

@Override
public void onEntityFailover(long consumerId) {
onEntityDestroyed(consumerId);
if (consumerId == this.consumerId) {
LOGGER.trace("[{}] onEntityFailover()", consumerId);
clear();
}
}

private void clear() {
managementProviders.forEach(ManagementProvider::close);
managementProviders.clear();
previouslyExposed.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ public String sendManagementCallRequest(ClientDescriptor caller, final Context c

@Override
public void onBecomeActive() {
LOGGER.trace("[{}] onBecomeActive()", this.consumerId);
clear();
}

@Override
Expand All @@ -140,14 +142,16 @@ public void onUnfetch(long consumerId, ClientDescriptor clientDescriptor) {
public void onEntityDestroyed(long consumerId) {
if (consumerId == this.consumerId) {
LOGGER.trace("[{}] onEntityDestroyed()", this.consumerId);
managementCallRequests.clear();
buffer = null;
clear();
}
}

@Override
public void onEntityFailover(long consumerId) {
onEntityDestroyed(consumerId);
if (consumerId == this.consumerId) {
LOGGER.trace("[{}] onEntityFailover()", this.consumerId);
clear();
}
}

void fireMessage(Message message) {
Expand Down Expand Up @@ -203,4 +207,12 @@ private void track(ClientDescriptor caller, String managementCallIdentifier) {
.add(managementCallIdentifier);
}

private void clear() {
managementCallRequests.clear();
full = null;
if (buffer != null) {
buffer.clear();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static java.util.stream.Stream.concat;

/**
* @author Mathieu Carbou
Expand All @@ -66,8 +61,8 @@ public class MonitoringServiceProvider implements ServiceProvider, Closeable {
private final Map<Long, DefaultManagementService> managementServices = new ConcurrentHashMap<>();
private final Map<Long, DefaultClientMonitoringService> clientMonitoringServices = new ConcurrentHashMap<>();
private final Map<Long, DefaultConsumerManagementRegistry> consumerManagementRegistries = new ConcurrentHashMap<>();
private final Map<Long, DefaultActiveEntityMonitoringService> activeEntityMonitoringService = new ConcurrentHashMap<>();
private final Map<Long, DefaultPassiveEntityMonitoringService> passiveEntityMonitoringService = new ConcurrentHashMap<>();
private final Map<Long, DefaultActiveEntityMonitoringService> activeEntityMonitoringServices = new ConcurrentHashMap<>();
private final Map<Long, DefaultPassiveEntityMonitoringService> passiveEntityMonitoringServices = new ConcurrentHashMap<>();

private final TimeSource timeSource = TimeSource.BEST;
private final DefaultSharedManagementRegistry sharedManagementRegistry = new DefaultSharedManagementRegistry(consumerManagementRegistries);
Expand Down Expand Up @@ -98,32 +93,12 @@ public boolean initialize(ServiceProviderConfiguration configuration, PlatformCo
this.topologyService.addTopologyEventListener(new TopologyEventListenerAdapter() {
@Override
public void onEntityDestroyed(long consumerId) {
LOGGER.trace("[0] onEntityDestroyed({})", consumerId);
LOGGER.trace("[{}] onEntityDestroyed()", consumerId);
topologyService.removeTopologyEventListener(managementServices.remove(consumerId));
topologyService.removeTopologyEventListener(clientMonitoringServices.remove(consumerId));
topologyService.removeTopologyEventListener(consumerManagementRegistries.remove(consumerId));
passiveEntityMonitoringService.remove(consumerId);
activeEntityMonitoringService.remove(consumerId);
}

@Override
public void onEntityFailover(long consumerId) {
onEntityDestroyed(consumerId);
}

@Override
public void onBecomeActive() {
// clear some states that can have been created by placeholders entities with restartability on before they become active
// platform does not send us any event about that so we do not know when these placeholder entities get destroyed
Set<Long> consumerIds = concat(concat(concat(concat(
managementServices.keySet().stream(),
clientMonitoringServices.keySet().stream()),
consumerManagementRegistries.keySet().stream()),
activeEntityMonitoringService.keySet().stream()),
passiveEntityMonitoringService.keySet().stream()
).collect(Collectors.toCollection(TreeSet::new));
LOGGER.trace("[0] onBecomeActive({})", consumerIds);
consumerIds.forEach(this::onEntityDestroyed);
passiveEntityMonitoringServices.remove(consumerId);
activeEntityMonitoringServices.remove(consumerId);
}
});
return true;
Expand All @@ -142,6 +117,7 @@ public <T> T getService(long consumerID, ServiceConfiguration<T> configuration)

// for platform, which requests either a IStripeMonitoring to send platform events or a IStripeMonitoring to send callbacks from passive entities
if (IStripeMonitoring.class == serviceType) {
LOGGER.trace("[{}] getService({})", consumerID, IStripeMonitoring.class.getSimpleName());
if (consumerID == PLATFORM_CONSUMER_ID) {
return serviceType.cast(platformListenerAdapter);
} else {
Expand All @@ -152,25 +128,35 @@ public <T> T getService(long consumerID, ServiceConfiguration<T> configuration)

// get or create a shared registry used to do aggregated operations on all consumer registries (i.e. management calls)
if (SharedManagementRegistry.class == serviceType) {
LOGGER.trace("[{}] getService({})", consumerID, SharedManagementRegistry.class.getSimpleName());
return serviceType.cast(sharedManagementRegistry);
}

// get or creates a registry specific to this entity to handle stats and management calls
if (ConsumerManagementRegistry.class == serviceType) {
if (configuration instanceof ConsumerManagementRegistryConfiguration) {
return serviceType.cast(consumerManagementRegistries.computeIfAbsent(consumerID, cid -> {
ConsumerManagementRegistryConfiguration consumerManagementRegistryConfiguration = (ConsumerManagementRegistryConfiguration) configuration;
StatisticsService statisticsService = statisticsServiceFactory.createStatisticsService(consumerManagementRegistryConfiguration.getStatisticConfiguration());
DefaultConsumerManagementRegistry consumerManagementRegistry = new DefaultConsumerManagementRegistry(
consumerID,
consumerManagementRegistryConfiguration.getEntityMonitoringService(),
statisticsService);
if (consumerManagementRegistryConfiguration.wantsServerManagementProviders()) {
addServerManagementProviders(consumerID, consumerManagementRegistry);
}
topologyService.addTopologyEventListener(consumerManagementRegistry);
return consumerManagementRegistry;
}));
ConsumerManagementRegistryConfiguration consumerManagementRegistryConfiguration = (ConsumerManagementRegistryConfiguration) configuration;
// in a failover, we are not aware of passive entity destruction so if we find a previous service with the same consumer id, we clean it
// this is true for this service specifically
DefaultConsumerManagementRegistry managementRegistry = consumerManagementRegistries.remove(consumerID);
if (managementRegistry != null) {
LOGGER.trace("[{}] getService({}): clearing previous instance", consumerID, ConsumerManagementRegistry.class.getSimpleName());
topologyService.removeTopologyEventListener(managementRegistry);
managementRegistry.onEntityDestroyed(consumerID);
}
// create a new registry
LOGGER.trace("[{}] getService({})", consumerID, ConsumerManagementRegistry.class.getSimpleName());
StatisticsService statisticsService = statisticsServiceFactory.createStatisticsService(consumerManagementRegistryConfiguration.getStatisticConfiguration());
managementRegistry = new DefaultConsumerManagementRegistry(
consumerID,
consumerManagementRegistryConfiguration.getEntityMonitoringService(),
statisticsService);
if (consumerManagementRegistryConfiguration.wantsServerManagementProviders()) {
addServerManagementProviders(consumerID, managementRegistry);
}
topologyService.addTopologyEventListener(managementRegistry);
consumerManagementRegistries.put(consumerID, managementRegistry);
return serviceType.cast(managementRegistry);
} else {
throw new IllegalArgumentException("Missing configuration " + ConsumerManagementRegistryConfiguration.class.getSimpleName() + " when requesting service " + serviceType.getName());
}
Expand All @@ -179,19 +165,21 @@ public <T> T getService(long consumerID, ServiceConfiguration<T> configuration)
// get or creates a client-side monitoring service
if (ClientMonitoringService.class == serviceType) {
if (configuration instanceof ClientMonitoringServiceConfiguration) {
if (!topologyService.isCurrentServerActive()) {
throw new IllegalStateException("Server " + platformConfiguration.getServerName() + " is not active!");
}
return serviceType.cast(clientMonitoringServices.computeIfAbsent(consumerID, cid -> {
DefaultClientMonitoringService clientMonitoringService = clientMonitoringServices.get(consumerID);
if (clientMonitoringService == null) {
LOGGER.trace("[{}] getService({})", consumerID, ClientMonitoringService.class.getSimpleName());
ClientMonitoringServiceConfiguration clientMonitoringServiceConfiguration = (ClientMonitoringServiceConfiguration) configuration;
DefaultClientMonitoringService clientMonitoringService = new DefaultClientMonitoringService(
clientMonitoringService = new DefaultClientMonitoringService(
consumerID,
topologyService,
firingService,
clientMonitoringServiceConfiguration.getClientCommunicator());
topologyService.addTopologyEventListener(clientMonitoringService);
return clientMonitoringService;
}));
clientMonitoringServices.put(consumerID, clientMonitoringService);
} else {
LOGGER.trace("[{}] getService({}): re-using.", consumerID, ClientMonitoringService.class.getSimpleName());
}
return serviceType.cast(clientMonitoringService);
} else {
throw new IllegalArgumentException("Missing configuration " + ClientMonitoringServiceConfiguration.class.getSimpleName() + " when requesting service " + serviceType.getName());
}
Expand All @@ -200,20 +188,22 @@ public <T> T getService(long consumerID, ServiceConfiguration<T> configuration)
// get or creates a monitoring accessor service (for tms)
if (ManagementService.class == serviceType) {
if (configuration instanceof ManagementServiceConfiguration) {
if (!topologyService.isCurrentServerActive()) {
throw new IllegalStateException("Server " + platformConfiguration.getServerName() + " is not active!");
}
return serviceType.cast(managementServices.computeIfAbsent(consumerID, cid -> {
DefaultManagementService managementService = managementServices.get(consumerID);
if (managementService == null) {
LOGGER.trace("[{}] getService({})", consumerID, ManagementService.class.getSimpleName());
ManagementServiceConfiguration managementServiceConfiguration = (ManagementServiceConfiguration) configuration;
DefaultManagementService managementService = new DefaultManagementService(
managementService = new DefaultManagementService(
consumerID,
topologyService,
firingService,
managementServiceConfiguration.getClientCommunicator(),
sequenceGenerator);
topologyService.addTopologyEventListener(managementService);
return managementService;
}));
managementServices.put(consumerID, managementService);
} else {
LOGGER.trace("[{}] getService({}): re-using.", consumerID, ManagementService.class.getSimpleName());
}
return serviceType.cast(managementService);
} else {
throw new IllegalArgumentException("Missing configuration " + ManagementServiceConfiguration.class.getSimpleName() + " when requesting service " + serviceType.getName());
}
Expand All @@ -222,17 +212,19 @@ public <T> T getService(long consumerID, ServiceConfiguration<T> configuration)
// get or creates a monitoring service for an active entity
if (ActiveEntityMonitoringService.class == serviceType) {
if (configuration instanceof ActiveEntityMonitoringServiceConfiguration) {
if (!topologyService.isCurrentServerActive()) {
throw new IllegalStateException("Server " + platformConfiguration.getServerName() + " is not active!");
}
return serviceType.cast(activeEntityMonitoringService.computeIfAbsent(consumerID, cid -> {
DefaultActiveEntityMonitoringService activeEntityMonitoringService = this.activeEntityMonitoringServices.get(consumerID);
if (activeEntityMonitoringService == null) {
LOGGER.trace("[{}] getService({})", consumerID, ActiveEntityMonitoringService.class.getSimpleName());
ActiveEntityMonitoringServiceConfiguration activeEntityMonitoringServiceConfiguration = (ActiveEntityMonitoringServiceConfiguration) configuration;
DefaultActiveEntityMonitoringService activeEntityMonitoringService = new DefaultActiveEntityMonitoringService(
activeEntityMonitoringService = new DefaultActiveEntityMonitoringService(
consumerID,
topologyService,
firingService);
return activeEntityMonitoringService;
}));
activeEntityMonitoringServices.put(consumerID, activeEntityMonitoringService);
} else {
LOGGER.trace("[{}] getService({}): re-using.", consumerID, ActiveEntityMonitoringService.class.getSimpleName());
}
return serviceType.cast(activeEntityMonitoringService);
} else {
throw new IllegalArgumentException("Missing configuration " + ActiveEntityMonitoringServiceConfiguration.class.getSimpleName() + " when requesting service " + serviceType.getName());
}
Expand All @@ -241,19 +233,21 @@ public <T> T getService(long consumerID, ServiceConfiguration<T> configuration)
// get or creates a monitoring service for a passive entity, bridging calls to IMonitoringProducer
if (PassiveEntityMonitoringService.class == serviceType) {
if (configuration instanceof PassiveEntityMonitoringServiceConfiguration) {
if (topologyService.isCurrentServerActive()) {
throw new IllegalStateException("Server " + platformConfiguration.getServerName() + " is not passive!");
}
return serviceType.cast(passiveEntityMonitoringService.computeIfAbsent(consumerID, cid -> {
DefaultPassiveEntityMonitoringService passiveEntityMonitoringService = passiveEntityMonitoringServices.get(consumerID);
if (passiveEntityMonitoringService == null) {
LOGGER.trace("[{}] getService({})", consumerID, PassiveEntityMonitoringService.class.getSimpleName());
PassiveEntityMonitoringServiceConfiguration passiveEntityMonitoringServiceConfiguration = (PassiveEntityMonitoringServiceConfiguration) configuration;
IMonitoringProducer monitoringProducer = passiveEntityMonitoringServiceConfiguration.getMonitoringProducer();
if (monitoringProducer == null) {
LOGGER.warn("Platform service " + IMonitoringProducer.class.getSimpleName() + " is not accessible.");
return null;
}
DefaultPassiveEntityMonitoringService passiveEntityMonitoringService = new DefaultPassiveEntityMonitoringService(consumerID, monitoringProducer);
return passiveEntityMonitoringService;
}));
passiveEntityMonitoringService = new DefaultPassiveEntityMonitoringService(consumerID, monitoringProducer);
passiveEntityMonitoringServices.put(consumerID, passiveEntityMonitoringService);
} else {
LOGGER.trace("[{}] getService({}): re-using.", consumerID, PassiveEntityMonitoringService.class.getSimpleName());
}
return serviceType.cast(passiveEntityMonitoringService);
} else {
throw new IllegalArgumentException("Missing configuration " + PassiveEntityMonitoringServiceConfiguration.class.getSimpleName() + " when requesting service " + serviceType.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,27 @@ class TopologyService implements PlatformListener {
public synchronized void serverDidBecomeActive(PlatformServer self) {
LOGGER.trace("[0] serverDidBecomeActive({})", self.getServerName());

serverDidJoinStripe(self);
Server server = Server.create(self.getServerName())
.setBindAddress(self.getBindAddress())
.setBindPort(self.getBindPort())
.setBuildId(self.getBuild())
.setGroupPort(self.getGroupPort())
.setHostName(self.getHostName())
.setStartTime(self.getStartTime())
.setHostAddress(self.getHostAddress())
.setVersion(self.getVersion())
.computeUpTime();

stripe.addServer(server);

currentActive = stripe.getServerByName(self.getServerName()).get();
currentActive.setState(Server.State.ACTIVE);
currentActive.setActivateTime(timeSource.getTimestamp());

topologyEventListeners.forEach(TopologyEventListener::onBecomeActive);

firingService.fireNotification(new ContextualNotification(server.getContext(), SERVER_JOINED.name()));

long time = timeSource.getTimestamp();
serverStateChanged(self, new ServerState("ACTIVE", time, time));
}

@Override
Expand Down
Loading

0 comments on commit b17bc75

Please sign in to comment.