Skip to content

Commit

Permalink
Issue #76 Passive support for clustered map
Browse files Browse the repository at this point in the history
  • Loading branch information
albinsuresh committed Jul 26, 2016
1 parent 0bf87e7 commit 5607c6d
Show file tree
Hide file tree
Showing 9 changed files with 325 additions and 179 deletions.
3 changes: 0 additions & 3 deletions concurrent-map-entity/integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,14 @@
<dependency>
<groupId>org.terracotta</groupId>
<artifactId>passthrough-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright Terracotta, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.terracotta.connection.Connection;
import org.terracotta.connection.ConnectionException;
import org.terracotta.connection.ConnectionFactory;
import org.terracotta.connection.entity.EntityRef;
import org.terracotta.entity.map.TerracottaClusteredMapClientService;
import org.terracotta.entity.map.common.ConcurrentClusteredMap;
import org.terracotta.entity.map.server.TerracottaClusteredMapService;
import org.terracotta.exception.EntityAlreadyExistsException;
import org.terracotta.exception.EntityNotFoundException;
import org.terracotta.exception.EntityNotProvidedException;
import org.terracotta.exception.EntityVersionMismatchException;
import org.terracotta.passthrough.PassthroughClusterControl;
import org.terracotta.passthrough.PassthroughServer;
import org.terracotta.passthrough.PassthroughTestHelpers;

import java.net.URI;
import java.util.Properties;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.*;

/*
* Copyright Terracotta, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class ClusteredMapActivePassiveTestPT {

private PassthroughClusterControl clusterControl;
private Connection connection;

@Before
public void setUp() throws Exception {
String stripeName = "stripe";
this.clusterControl = PassthroughTestHelpers.createActivePassive(stripeName,
new PassthroughTestHelpers.ServerInitializer() {
@Override
public void registerServicesForServer(PassthroughServer server) {
server.registerClientEntityService(new TerracottaClusteredMapClientService());
server.registerServerEntityService(new TerracottaClusteredMapService());
}
}
);
this.connection = connect(stripeName);
}

public static Connection connect(String stripeName) throws ConnectionException {
URI uri = URI.create("passthrough://" + stripeName);
return ConnectionFactory.connect(uri, new Properties());
}

@After
public void tearDown() throws Exception {
this.connection.close();
clusterControl.tearDown();
}

private <K, V> ConcurrentClusteredMap<K, V> createClusteredMap(String name, Class<K> keyClass, Class<V> valueClass) throws EntityNotProvidedException, EntityAlreadyExistsException, EntityVersionMismatchException, EntityNotFoundException {
EntityRef<ConcurrentClusteredMap, Object> entityRef = connection.getEntityRef(ConcurrentClusteredMap.class, ConcurrentClusteredMap.VERSION, name);
entityRef.create(null);
ConcurrentClusteredMap<K, V> clusteredMap = entityRef.fetchEntity();
clusteredMap.setTypes(keyClass, valueClass);
return clusteredMap;
}

@Test
public void testFailover() throws Exception {
clusterControl.waitForActive();
clusterControl.waitForRunningPassivesInStandby();
ConcurrentClusteredMap<Long, String> map = createClusteredMap("foo", Long.class, String.class);
map.put(1L, "one");
assertThat(map.get(1L), is("one"));
clusterControl.terminateActive();
clusterControl.waitForActive();
assertThat(map.get(1L), is("one"));
}

@Test
public void testPassiveSync() throws Exception {
clusterControl.waitForActive();
clusterControl.terminateOnePassive();
ConcurrentClusteredMap<Long, String> map = createClusteredMap("foo", Long.class, String.class);
for (long i = 0; i < 10; i++) {
map.put(i, "value"+i);
}
clusterControl.startOneServer();
clusterControl.waitForRunningPassivesInStandby();
clusterControl.terminateActive();
clusterControl.waitForActive();
for (long i = 0; i < 10; i++) {
assertThat(map.get(i), is("value"+i));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.terracotta.passthrough.PassthroughConnectionService
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright Terracotta, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.terracotta.entity.map.server;

import org.terracotta.entity.BasicServiceConfiguration;
import org.terracotta.entity.CommonServerEntity;
import org.terracotta.entity.ServiceRegistry;
import org.terracotta.entity.map.common.BooleanResponse;
import org.terracotta.entity.map.common.ConditionalRemoveOperation;
import org.terracotta.entity.map.common.ConditionalReplaceOperation;
import org.terracotta.entity.map.common.ContainsKeyOperation;
import org.terracotta.entity.map.common.ContainsValueOperation;
import org.terracotta.entity.map.common.EntrySetResponse;
import org.terracotta.entity.map.common.GetOperation;
import org.terracotta.entity.map.common.KeySetResponse;
import org.terracotta.entity.map.common.MapOperation;
import org.terracotta.entity.map.common.MapResponse;
import org.terracotta.entity.map.common.MapValueResponse;
import org.terracotta.entity.map.common.NullResponse;
import org.terracotta.entity.map.common.PutAllOperation;
import org.terracotta.entity.map.common.PutIfAbsentOperation;
import org.terracotta.entity.map.common.PutIfPresentOperation;
import org.terracotta.entity.map.common.PutOperation;
import org.terracotta.entity.map.common.RemoveOperation;
import org.terracotta.entity.map.common.SizeResponse;
import org.terracotta.entity.map.common.ValueCollectionResponse;
import org.terracotta.service.reference.holder.ReferenceHolderService;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class AbstractClusteredMap implements CommonServerEntity<MapOperation, MapResponse> {

private static final String DATA_MAP_REF = "dataMap";

protected final ReferenceHolderService referenceHolderService;

protected volatile ConcurrentMap<Object, Object> dataMap;

public AbstractClusteredMap(ServiceRegistry services) {
this.referenceHolderService = services.getService(new BasicServiceConfiguration<ReferenceHolderService>(ReferenceHolderService.class));
}

@Override
public void createNew() {
this.dataMap = referenceHolderService.storeReference(DATA_MAP_REF, new ConcurrentHashMap());
}

@Override
public void loadExisting() {
this.dataMap = referenceHolderService.retrieveReference(DATA_MAP_REF, ConcurrentMap.class);
}

@Override
public void destroy() {
dataMap.clear();
}

MapResponse invokeInternal(MapOperation input) {
MapResponse response;

switch (input.operationType()) {
case PUT: {
PutOperation putOperation = (PutOperation) input;
Object key = putOperation.getKey();
Object old = dataMap.get(key);
dataMap.put(key, putOperation.getValue());
response = new MapValueResponse(old);
break;
}
case GET: {
Object key = ((GetOperation) input).getKey();
response = new MapValueResponse(dataMap.get(key));
break;
}
case REMOVE: {
Object key = ((RemoveOperation) input).getKey();
response = new MapValueResponse(dataMap.remove(key));
break;
}
case CONTAINS_KEY: {
Object key = ((ContainsKeyOperation) input).getKey();
response = new BooleanResponse(dataMap.containsKey(key));
break;
}
case CONTAINS_VALUE: {
Object value = ((ContainsValueOperation) input).getValue();
response = new BooleanResponse(dataMap.containsValue(value));
break;
}
case CLEAR: {
dataMap.clear();
// There is no response from the clear.
response = new NullResponse();
break;
}
case PUT_ALL: {
@SuppressWarnings("unchecked")
Map<Object, Object> newValues = (Map<Object, Object>) ((PutAllOperation)input).getMap();
dataMap.putAll(newValues);
// There is no response from a put all.
response = new NullResponse();
break;
}
case KEY_SET: {
Set<Object> keySet = new HashSet<Object>();
keySet.addAll(dataMap.keySet());
response = new KeySetResponse(keySet);
break;
}
case VALUES: {
Collection<Object> values = new ArrayList<Object>();
values.addAll(dataMap.values());
response = new ValueCollectionResponse(values);
break;
}
case ENTRY_SET: {
Set<Map.Entry<Object, Object>> entrySet = new HashSet<Map.Entry<Object, Object>>();
for (Map.Entry<Object, Object> entry : dataMap.entrySet()) {
entrySet.add(new AbstractMap.SimpleEntry<Object, Object > (entry.getKey(), entry.getValue()));
}
response = new EntrySetResponse(entrySet);
break;
}
case SIZE: {
response = new SizeResponse(dataMap.size());
break;
}
case PUT_IF_ABSENT: {
PutIfAbsentOperation operation = (PutIfAbsentOperation) input;
response = new MapValueResponse(dataMap.putIfAbsent(operation.getKey(), operation.getValue()));
break;
}
case PUT_IF_PRESENT: {
PutIfPresentOperation operation = (PutIfPresentOperation) input;
response = new MapValueResponse(dataMap.replace(operation.getKey(), operation.getValue()));
break;
}
case CONDITIONAL_REMOVE: {
ConditionalRemoveOperation operation = (ConditionalRemoveOperation) input;
response = new BooleanResponse(dataMap.remove(operation.getKey(), operation.getValue()));
break;
}
case CONDITIONAL_REPLACE: {
ConditionalReplaceOperation operation = (ConditionalReplaceOperation) input;
response = new BooleanResponse(dataMap.replace(operation.getKey(), operation.getOldValue(), operation.getNewValue()));
break;
}
default:
// Unknown message type.
throw new AssertionError("Unsupported message type: " + input.operationType());
}
return response;
}

}
Loading

0 comments on commit 5607c6d

Please sign in to comment.