Skip to content

Commit

Permalink
feat: add support for Hazelcast ReplicatedMap
Browse files Browse the repository at this point in the history
  • Loading branch information
halber committed Oct 9, 2024
1 parent 1567b39 commit 22c7070
Show file tree
Hide file tree
Showing 19 changed files with 931 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, NeonBee
obj.setTrackingDataHandlingStrategy((String) member.getValue());
}
break;
case "useReplicatedMaps":
if (member.getValue() instanceof Boolean) {
obj.setUseReplicatedMaps((Boolean) member.getValue());
}
break;
case "verticleDeploymentTimeout":
if (member.getValue() instanceof Number) {
obj.setVerticleDeploymentTimeout(((Number) member.getValue()).intValue());
Expand Down Expand Up @@ -155,6 +160,7 @@ static void toJson(NeonBeeConfig obj, java.util.Map<String, Object> json) {
if (obj.getTrackingDataHandlingStrategy() != null) {
json.put("trackingDataHandlingStrategy", obj.getTrackingDataHandlingStrategy());
}
json.put("useReplicatedMaps", obj.isUseReplicatedMaps());
if (obj.getVerticleDeploymentTimeout() != null) {
json.put("verticleDeploymentTimeout", obj.getVerticleDeploymentTimeout());
}
Expand Down
12 changes: 10 additions & 2 deletions src/main/java/io/neonbee/NeonBee.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import io.neonbee.internal.Registry;
import io.neonbee.internal.ReplyInboundInterceptor;
import io.neonbee.internal.SharedDataAccessor;
import io.neonbee.internal.SharedDataAccessorFactory;
import io.neonbee.internal.WriteSafeRegistry;
import io.neonbee.internal.buffer.ImmutableBuffer;
import io.neonbee.internal.cluster.ClusterHelper;
Expand All @@ -75,6 +76,7 @@
import io.neonbee.internal.codec.ImmutableJsonObjectMessageCodec;
import io.neonbee.internal.deploy.Deployable;
import io.neonbee.internal.deploy.Deployables;
import io.neonbee.internal.hazelcast.ReplicatedClusterEntityRegistry;
import io.neonbee.internal.helper.ConfigHelper;
import io.neonbee.internal.helper.FileSystemHelper;
import io.neonbee.internal.job.RedeployEntitiesJob;
Expand Down Expand Up @@ -449,7 +451,8 @@ private Future<Void> registerHooks() {
*/
@VisibleForTesting
Future<Void> initializeSharedMaps() {
SharedDataAccessor sharedData = new SharedDataAccessor(vertx, NeonBee.class);
SharedDataAccessor sharedData = new SharedDataAccessorFactory(this)
.getSharedDataAccessor(NeonBee.class);
sharedLocalMap = sharedData.getLocalMap(SHARED_MAP_NAME);
return sharedData.<String, Object>getAsyncMap(SHARED_MAP_NAME).onSuccess(asyncMap -> sharedAsyncMap = asyncMap)
.mapEmpty();
Expand Down Expand Up @@ -677,7 +680,12 @@ private Future<Void> deployModules() {
this.healthRegistry = new HealthCheckRegistry(vertx);
this.modelManager = new EntityModelManager(this);
if (vertx.isClustered()) {
this.entityRegistry = new ClusterEntityRegistry(vertx, EntityVerticle.REGISTRY_NAME);
if (config.isUseReplicatedMaps() && ClusterHelper.getHazelcastClusterManager(vertx).isPresent()) {
this.entityRegistry =
new ReplicatedClusterEntityRegistry(vertx, EntityVerticle.REGISTRY_NAME);
} else {
this.entityRegistry = new ClusterEntityRegistry(vertx, EntityVerticle.REGISTRY_NAME);
}
} else {
this.entityRegistry = new WriteSafeRegistry<>(vertx, EntityVerticle.REGISTRY_NAME);
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/neonbee/cache/CachingDataVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.neonbee.data.DataRequest;
import io.neonbee.data.DataVerticle;
import io.neonbee.internal.SharedDataAccessor;
import io.neonbee.internal.SharedDataAccessorFactory;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -147,7 +148,8 @@ public void init(Vertx vertx, Context context) {

// we will only need to retrieve locks if we coalesce requests
if (coalescingTimeout > 0) {
sharedDataAccessor = new SharedDataAccessor(vertx, getClass());
sharedDataAccessor = new SharedDataAccessorFactory(vertx)
.getSharedDataAccessor(getClass());
}
}

Expand Down
38 changes: 37 additions & 1 deletion src/main/java/io/neonbee/config/NeonBeeConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

/**
* In contrast to the {@link NeonBeeOptions} the {@link NeonBeeConfig} is persistent configuration in a file.
*
* <p>
* Whilst the {@link NeonBeeOptions} contain information which is to specify when NeonBee starts, such as the port of
* the server to start on and the cluster to connect to, which potentially could be different across cluster nodes, the
* {@link NeonBeeConfig} contains information which is mostly shared across different cluster nodes or you would like to
Expand Down Expand Up @@ -100,6 +100,8 @@ public class NeonBeeConfig {

private int jsonMaxStringSize;

private boolean useReplicatedMaps;

/**
* Are the metrics enabled?
*
Expand Down Expand Up @@ -535,4 +537,38 @@ public NeonBeeConfig setJsonMaxStringSize(int jsonMaxStringSize) {
public int getJsonMaxStringSize() {
return jsonMaxStringSize;
}

/**
* Set the value to enable, disable replicated maps.
* <p>
* <b>Currently this feature is only supported for a Hazelcast-Cluster</b>
* <p>
* Replicated maps are a distributed data structure that provides a way to replicate data across multiple nodes in a
* cluster. Replicated maps are useful when you need to replicate data across multiple nodes in a cluster, and you
* don't need to partition the data.
* <p>
*
* @return true if the replicated maps should be used, false otherwise
*/
public boolean isUseReplicatedMaps() {
return useReplicatedMaps;
}

/**
* Set the value to enable, disable replicated maps.
* <p>
* <b>Currently this feature is only supported for a Hazelcast-Cluster</b>
* <p>
* Replicated maps are a distributed data structure that provides a way to replicate data across multiple nodes in a
* cluster. Replicated maps are useful when you need to replicate data across multiple nodes in a cluster, and you
* don't need to partition the data.
* <p>
*
* @param useReplicatedMaps true if the replicated maps should be enabled, false otherwise
* @return the {@linkplain NeonBeeConfig} for fluent use
*/
public NeonBeeConfig setUseReplicatedMaps(boolean useReplicatedMaps) {
this.useReplicatedMaps = useReplicatedMaps;
return this;
}
}
52 changes: 30 additions & 22 deletions src/main/java/io/neonbee/endpoint/odatav4/ODataV4Endpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.neonbee.endpoint.odatav4.internal.olingo.OlingoEndpointHandler;
import io.neonbee.entity.EntityModel;
import io.neonbee.internal.RegexBlockList;
import io.neonbee.internal.SharedDataAccessor;
import io.neonbee.internal.SharedDataAccessorFactory;
import io.neonbee.logging.LoggingFacade;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -201,27 +201,35 @@ public Future<Router> createEndpointRouter(Vertx vertx, String basePath, JsonObj
// when NeonBee is started and / or in case the endpoint is not used.
Route initialRoute = router.route();
initialRoute.handler(
routingContext -> new SharedDataAccessor(vertx, ODataV4Endpoint.class).getLocalLock(asyncLock ->
// immediately initialize the router, this will also "arm" the event bus listener
(!initialized.getAndSet(true)
? refreshRouter(vertx, router, basePath, uriConversion, exposedEntities, models)
: succeededFuture()).onComplete(handler -> {
// wait for the refresh to finish (the result doesn't matter), remove the initial route, as
// this will redirect all requests to the registered service endpoint handlers (if non have
// been registered, e.g. due to a failure in model loading, it'll result in an 404). Could
// have been removed already by refreshRouter, we don't care!
initialRoute.remove();
if (asyncLock.succeeded()) {
// releasing the lock will cause other requests unblock and not call the initial route
asyncLock.result().release();
}

// let the router again handle the context again, now with either all service endpoints
// registered, or none in case there have been a failure while loading the models.
// NOTE: Re-route is the only elegant way I found to restart the current router to take
// the new routes! Might consider checking again with the Vert.x 4.0 release.
routingContext.reroute(routingContext.request().uri());
})));
routingContext -> new SharedDataAccessorFactory(vertx)
.getSharedDataAccessor(ODataV4Endpoint.class)
.getLocalLock(asyncLock ->
// immediately initialize the router, this will also "arm" the event bus listener
(!initialized.getAndSet(true)
? refreshRouter(vertx, router, basePath, uriConversion, exposedEntities, models)
: succeededFuture()).onComplete(handler -> {
// wait for the refresh to finish (the result doesn't matter), remove the initial
// route, as
// this will redirect all requests to the registered service endpoint handlers (if
// non have
// been registered, e.g. due to a failure in model loading, it'll result in an 404).
// Could
// have been removed already by refreshRouter, we don't care!
initialRoute.remove();
if (asyncLock.succeeded()) {
// releasing the lock will cause other requests unblock and not call the initial
// route
asyncLock.result().release();
}

// let the router again handle the context again, now with either all service
// endpoints
// registered, or none in case there have been a failure while loading the models.
// NOTE: Re-route is the only elegant way I found to restart the current router to
// take
// the new routes! Might consider checking again with the Vert.x 4.0 release.
routingContext.reroute(routingContext.request().uri());
})));

return succeededFuture(router);
}
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/io/neonbee/entity/EntityModelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.google.common.base.Functions;

import io.neonbee.NeonBee;
import io.neonbee.internal.SharedDataAccessor;
import io.neonbee.internal.SharedDataAccessorFactory;
import io.neonbee.logging.LoggingFacade;
import io.vertx.core.Future;
import io.vertx.core.eventbus.DeliveryOptions;
Expand Down Expand Up @@ -147,7 +147,10 @@ public Future<Map<String, EntityModel>> getSharedModels() {
}

// if not try to reload the models and return the loaded data model
return new SharedDataAccessor(neonBee.getVertx(), EntityModelManager.class).getLocalLock()

return new SharedDataAccessorFactory(neonBee)
.getSharedDataAccessor(EntityModelManager.class)
.getLocalLock()
.transform(asyncLocalLock -> {
Map<String, EntityModel> retryModels = getBufferedModels();
if (retryModels != null) {
Expand Down
58 changes: 58 additions & 0 deletions src/main/java/io/neonbee/internal/SharedDataAccessorFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.neonbee.internal;

import io.neonbee.NeonBee;
import io.neonbee.config.NeonBeeConfig;
import io.neonbee.internal.cluster.ClusterHelper;
import io.neonbee.internal.hazelcast.ReplicatedDataAccessor;
import io.vertx.core.Vertx;

/**
* Factory to create a {@link SharedDataAccessor} based on the configuration.
*/
public class SharedDataAccessorFactory {
private final NeonBee neonBee;

/**
* Create a new instance of {@link SharedDataAccessorFactory}.
*/
public SharedDataAccessorFactory() {
this.neonBee = NeonBee.get();
}

/**
* Create a new instance of {@link SharedDataAccessorFactory}.
*
* @param vertx the Vert.x instance
*/
public SharedDataAccessorFactory(Vertx vertx) {
this.neonBee = NeonBee.get(vertx);
}

/**
* Create a new instance of {@link SharedDataAccessorFactory}.
*
* @param neonBee the NeonBee instance
*/
public SharedDataAccessorFactory(NeonBee neonBee) {
this.neonBee = neonBee;
}

private boolean useHazelcastReplicatedMaps() {
NeonBeeConfig config = neonBee.getConfig();
return config.isUseReplicatedMaps() && ClusterHelper.getHazelcastClusterManager(neonBee.getVertx()).isPresent();
}

/**
* Get a {@link SharedDataAccessor} based on the configuration.
*
* @param accessClass the class to access the shared data
* @return the shared data accessor
*/
public SharedDataAccessor getSharedDataAccessor(Class<?> accessClass) {
if (useHazelcastReplicatedMaps()) {
return new ReplicatedDataAccessor(neonBee.getVertx(), accessClass);
} else {
return new SharedDataAccessor(neonBee.getVertx(), accessClass);
}
}
}
14 changes: 12 additions & 2 deletions src/main/java/io/neonbee/internal/WriteSafeRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,25 @@ public class WriteSafeRegistry<T> implements Registry<T> {

private final String registryName;

/**
* Create a new {@link WriteSafeRegistry}.
*
* @param registryName the name of the map registry
* @param sharedDataAccessor the shared data accessor
*/
protected WriteSafeRegistry(String registryName, SharedDataAccessor sharedDataAccessor) {
this.registryName = registryName;
this.sharedDataAccessor = sharedDataAccessor;
}

/**
* Create a new {@link WriteSafeRegistry}.
*
* @param vertx the {@link Vertx} instance
* @param registryName the name of the map registry
*/
public WriteSafeRegistry(Vertx vertx, String registryName) {
this.registryName = registryName;
this.sharedDataAccessor = new SharedDataAccessor(vertx, this.getClass());
this(registryName, new SharedDataAccessor(vertx, WriteSafeRegistry.class));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,32 @@ public class ClusterEntityRegistry implements Registry<String> {

private final WriteSafeRegistry<Object> entityRegistry;

/**
* Create a new instance of {@link ClusterEntityRegistry}.
*
* @param vertx the {@link Vertx} instance
* @param entityRegistry the entity registry
* @param clusteringInformation the clustering information registry
*/
protected ClusterEntityRegistry(
Vertx vertx,
WriteSafeRegistry<Object> entityRegistry,
WriteSafeRegistry<JsonObject> clusteringInformation) {
this.entityRegistry = entityRegistry;
this.clusteringInformation = clusteringInformation;
this.vertx = vertx;
}

/**
* Create a new instance of {@link ClusterEntityRegistry}.
*
* @param vertx the {@link Vertx} instance
* @param registryName the name of the map registry
*/
public ClusterEntityRegistry(Vertx vertx, String registryName) {
this.entityRegistry = new WriteSafeRegistry<>(vertx, registryName);
this.clusteringInformation = new WriteSafeRegistry<>(vertx, registryName + "#ClusteringInformation");
this.vertx = vertx;
this(vertx,
new WriteSafeRegistry<>(vertx, registryName),
new WriteSafeRegistry<>(vertx, registryName + "#ClusteringInformation"));
}

@VisibleForTesting
Expand Down
Loading

0 comments on commit 22c7070

Please sign in to comment.