Skip to content

Commit

Permalink
Merge pull request #248 from mathieucarbou/failover-test
Browse files Browse the repository at this point in the history
#191: Some failover tests now pass + Version upgrade to latest tc-core
  • Loading branch information
anthonydahanne authored Dec 21, 2016
2 parents 148bb26 + e042afc commit c911838
Show file tree
Hide file tree
Showing 10 changed files with 602 additions and 327 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ public synchronized void serverDidBecomeActive(PlatformServer self) {

firingService.fireNotification(new ContextualNotification(server.getContext(), SERVER_JOINED.name()));

long time = timeSource.getTimestamp();
serverStateChanged(self, new ServerState("ACTIVE", time, time));
// we assume server.getStartTime() == activate time; the platform will correct this afterwards through its own serverStateChanged() call
serverStateChanged(self, new ServerState("ACTIVE", server.getStartTime(), server.getStartTime()));
}

@Override
Expand Down Expand Up @@ -320,14 +320,26 @@ public synchronized void serverStateChanged(PlatformServer sender, ServerState s
Server server = stripe.getServerByName(sender.getServerName())
.<IllegalStateException>orElseThrow(() -> newIllegalTopologyState("Missing server: " + sender.getServerName()));

Server.State oldState = server.getState();

if (oldState == Server.State.ACTIVE && currentActive != null && currentActive.getServerName().equals(server.getServerName())) {
// In case of a failover, the server state change is replayed: the server is already active, but would become passive and then active again.
// We filter this replay out.
return;
}

server.setState(Server.State.parse(serverState.getState()));
server.setActivateTime(serverState.getActivate());

Map<String, String> attrs = new HashMap<>();
attrs.put("state", serverState.getState());
attrs.put("activateTime", serverState.getActivate() > 0 ? String.valueOf(serverState.getActivate()) : "0");
if (oldState != server.getState()) {
// avoid sending another event to report the same state as before, to avoid duplicates

firingService.fireNotification(new ContextualNotification(server.getContext(), SERVER_STATE_CHANGED.name(), attrs));
Map<String, String> attrs = new HashMap<>();
attrs.put("state", serverState.getState());
attrs.put("activateTime", serverState.getActivate() > 0 ? String.valueOf(serverState.getActivate()) : "0");

firingService.fireNotification(new ContextualNotification(server.getContext(), SERVER_STATE_CHANGED.name(), attrs));
}
}

synchronized void setEntityManagementRegistry(long consumerId, String serverName, ManagementRegistry newRegistry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,9 @@ public void test_client_identifiers() throws Exception {

/**
 * Asserts that the current cluster topology serializes to the JSON stored in the given resource file.
 *
 * @param file name of the expected-topology file, resolved against {@code src/test/resources/}
 * @throws Exception if reading the topology, loading the file or serializing the cluster fails
 */
private void assertTopologyEquals(String file) throws Exception {
Cluster cluster = managementService.readTopology();
// Zero every server's uptime before comparing (presumably because it differs on each run -- TODO confirm).
cluster.serverStream().forEach(server -> server.setUpTimeSec(0));
cluster.serverStream().forEach(server -> {
server.setUpTimeSec(0);
});
// Expected: the file content decoded as UTF-8. Actual: the string written by mapper for cluster.toMap().
assertEquals(new String(Files.readAllBytes(new File("src/test/resources/" + file).toPath()), "UTF-8"), mapper.writeValueAsString(cluster.toMap()));
}

Expand Down
4 changes: 2 additions & 2 deletions management/testing/integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@
</dependency>
<!-- testing libs -->
<dependency>
<groupId>org.terracotta</groupId>
<groupId>org.terracotta.internal</groupId>
<artifactId>galvan-support</artifactId>
<version>${galvan.version}</version>
<version>${terracotta-core.version}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ protected JsonNode readJson(String file) {
}
}

/**
 * Parses a JSON document held in memory into a tree, using the shared {@code mapper}.
 *
 * @param json the JSON text to parse
 * @return the tree returned by {@code mapper.readTree(json)}
 * @throws RuntimeException wrapping any exception raised while parsing
 */
protected JsonNode readJsonStr(String json) {
  final JsonNode tree;
  try {
    tree = mapper.readTree(json);
  } catch (Exception parseFailure) {
    // Rethrow unchecked, keeping the original failure as the cause.
    throw new RuntimeException(parseFailure);
  }
  return tree;
}

protected JsonNode toJson(Object o) {
try {
return mapper.readTree(mapper.writeValueAsString(o));
Expand Down Expand Up @@ -217,7 +225,8 @@ protected String removeRandomValues(String currentTopo) {
.replaceAll("\"clientId\":\"[0-9]+@[^:]*:([^:]*):[^\"]*\"", "\"clientId\":\"[email protected]:$1:<uuid>\"")
.replaceAll("\"logicalConnectionUid\":\"[^\"]*\"", "\"logicalConnectionUid\":\"<uuid>\"")
.replaceAll("\"id\":\"[^\"]*\",\"logicalConnectionUid\":\"[^\"]*\"", "\"id\":\"<uuid>:SINGLE:testServer0:127.0.0.1:0\",\"logicalConnectionUid\":\"<uuid>\"")
.replaceAll("\"vmId\":\"[^\"]*\"", "\"vmId\":\"[email protected]\"");
.replaceAll("\"vmId\":\"[^\"]*\"", "\"vmId\":\"[email protected]\"")
.replaceAll("testServer1", "testServer0");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.terracotta.management.model.capabilities.descriptors.Settings;
import org.terracotta.management.model.cluster.Cluster;
import org.terracotta.management.model.cluster.Server;
import org.terracotta.management.model.cluster.ServerEntity;
import org.terracotta.management.model.message.Message;
import org.terracotta.management.model.notification.ContextualNotification;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

Expand All @@ -56,68 +55,53 @@ public void setUp() throws Exception {
// clear buffer
tmsAgentService.readMessages();

// add some data from client 0
put(0, "clients", "client1", "Mathieu");

// kill active - passive should take the active role
voltron.getClusterControl().terminateActive();
voltron.getClusterControl().waitForActive();
}

@Test
@Ignore
public void topology_recovery_after_failover() throws Exception {
@Ignore("See https://github.com/Terracotta-OSS/terracotta-core/issues/412")
public void all_registries_reexposed_after_failover() throws Exception {
Cluster cluster = tmsAgentService.readTopology();

// verify new server
Server newActive = cluster.serverStream().filter(Server::isActive).findFirst().get();
assertThat(newActive.getState(), equalTo(Server.State.ACTIVE));
assertThat(newActive.getServerName(), equalTo(oldPassive.getServerName()));

// removes all random values
String currentTopo = toJson(cluster.toMap()).toString();
String actual = removeRandomValues(currentTopo);

String expected = readJson("topology.json").toString();
assertEquals(expected, actual);
System.out.println(actual);

assertEquals(readJson("topology-after-failover.json"), readJsonStr(actual));
}

@Test
@Ignore
public void notifications_after_failover() throws Exception {
// read messages
List<Message> messages = tmsAgentService.readMessages();

messages.forEach(System.out::println);

assertThat(messages.size(), equalTo(3));

List<ContextualNotification> notifs = messages.stream()
.filter(message -> message.getType().equals("NOTIFICATION"))
.flatMap(message -> message.unwrap(ContextualNotification.class).stream())
.filter(notif -> notif.getType().equals("SERVER_STATE_CHANGED"))
.collect(Collectors.toList());

assertThat(
notifs.stream().map(notif -> notif.getContext().get(Server.NAME_KEY)).collect(Collectors.toList()),
equalTo(Arrays.asList(oldPassive.getServerName(), oldPassive.getServerName())));

assertThat(
notifs.stream().map(notif -> notif.getAttributes().get("state")).collect(Collectors.toList()),
equalTo(Arrays.asList("ACTIVE", "ACTIVE")));

//TODO: complete with Galvan
//- test topology (like topology_includes_passives), client should have re-exposed their management metadata
//- check notifications: server states
//- check notification that might be there: CLIENT_RECONNECTED and SERVER_ENTITY_FAILOVER_COMPLETE
notifs.stream().map(ContextualNotification::getType).collect(Collectors.toList()),
hasItems(
"SERVER_JOINED", "SERVER_STATE_CHANGED",
"SERVER_ENTITY_CREATED", "SERVER_ENTITY_CREATED", "SERVER_ENTITY_CREATED", "SERVER_ENTITY_CREATED",
"ENTITY_REGISTRY_AVAILABLE", "ENTITY_REGISTRY_AVAILABLE", "ENTITY_REGISTRY_AVAILABLE",
"CLIENT_CONNECTED", "CLIENT_CONNECTED", "CLIENT_CONNECTED",
"SERVER_ENTITY_FETCHED", "SERVER_ENTITY_FETCHED", "SERVER_ENTITY_FETCHED", "SERVER_ENTITY_FETCHED"));

assertThat(notifs.get(1).getContext().get(Server.NAME_KEY), equalTo(oldPassive.getServerName()));
assertThat(notifs.get(1).getAttributes().get("state"), equalTo("ACTIVE"));
}

/**
 * Writes an entry through client 0, terminates the active server, waits for an active server to be
 * available again, then checks that the same entry can be read back through client 1.
 */
@Test
@Ignore
public void puts_can_be_seen_on_other_clients_after_failover() throws Exception {
// store "Mathieu" under key "client1" in the "clients" cache, from client 0
put(0, "clients", "client1", "Mathieu");

// kill active - passive should take the active role
voltron.getClusterControl().terminateActive();
voltron.getClusterControl().waitForActive();

// the value written before the failover must still be readable from client 1
assertThat(get(1, "clients", "client1"), equalTo("Mathieu"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,15 @@ public void get_notifications_when_passive_joins() throws Exception {
.flatMap(message -> message.unwrap(ContextualNotification.class).stream())
.map(ContextualNotification::getType)
.collect(Collectors.toList()),
equalTo(Arrays.asList("SYNC_END", "SYNC_END")));
hasItem("SYNC_END"));

assertThat(messages.stream()
.filter(message -> message.getType().equals("NOTIFICATION"))
.flatMap(message -> message.unwrap(ContextualNotification.class).stream())
.filter(contextualNotification -> contextualNotification.getType().endsWith("SYNC_END"))
.map(contextualNotification -> contextualNotification.getContext().get(Server.NAME_KEY))
.collect(Collectors.toList()),
equalTo(Arrays.asList(passive.getServerName(), passive.getServerName())));
.collect(Collectors.toSet()),
equalTo(new HashSet<>(Arrays.asList(passive.getServerName(), passive.getServerName()))));
}

}
Loading

0 comments on commit c911838

Please sign in to comment.