diff --git a/pom.xml b/pom.xml index 7249b7d..47e213b 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ io.reactiverse - consul-cluster-manager + consul-clust 1.2.1 Vert.x Consul Cluster Manager https://github.com/reactiverse/consul-cluster-manager diff --git a/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulAsyncMultiMap.java b/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulAsyncMultiMap.java index e7ab4d1..6b75ff6 100644 --- a/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulAsyncMultiMap.java +++ b/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulAsyncMultiMap.java @@ -22,7 +22,6 @@ import io.vertx.core.Promise; import io.vertx.core.impl.TaskQueue; import io.vertx.core.impl.VertxInternal; -import io.vertx.core.json.Json; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; import io.vertx.core.spi.cluster.AsyncMultiMap; @@ -35,8 +34,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -70,28 +67,12 @@ public class ConsulAsyncMultiMap extends ConsulMap implements AsyncM private final TaskQueue taskQueue = new TaskQueue(); private final KeyValueOptions kvOpts; - private final boolean preferConsistency; - /* - * Implementation of local IN-MEMORY multimap cache which is essentially concurrent hash map under the hood. - * Cache is enabled ONLY when {@code preferConsistency} is set to false i.e. availability (better latency) is preferred. - * If cache is enabled: - * Cache read operations happen synchronously by simply reading from {@link java.util.concurrent.ConcurrentHashMap}. - * Cache WRITE operations happen either: - * - through consul watch that monitors the consul kv store for updates (see https://www.consul.io/docs/agent/watches.html). - * - when consul agent acknowledges the success of write operation (local node's data gets immediately cached without even waiting for a watch to take place.) - * Note: local cache updates still might kick in through consul watch in case update succeeded in consul agent but wasn't yet acknowledged back to node. Eventually last write wins. - */ - private ConcurrentMap> cache; - public ConsulAsyncMultiMap(String name, boolean preferConsistency, ClusterManagerInternalContext appContext) { + + public ConsulAsyncMultiMap(String name, ClusterManagerInternalContext appContext) { super(name, appContext); - this.preferConsistency = preferConsistency; // options to make entries of this map ephemeral. this.kvOpts = new KeyValueOptions().setAcquireSession(appContext.getEphemeralSessionId()); - if (!preferConsistency) { // if cp is disabled then disable caching. - cache = new ConcurrentHashMap<>(); - startListening(); - } } private static String getRidOfNodeId(String consulKeyPath) { @@ -112,7 +93,7 @@ public void remove(K k, V v, Handler> completionHandler) { .compose(aVoid -> getAll(keyPathForAllByAddress(k))) .compose(consulEntries -> { List futures = new ArrayList<>(); - consulEntries.forEach(consulEntry -> futures.add(delete(consulEntry.getKey(), v, toChoosableSet(consulEntry.getValue()), consulEntry.getNodeId()))); + consulEntries.forEach(consulEntry -> futures.add(doDelete(consulEntry.getKey(), v, toChoosableSet(consulEntry.getValue()), consulEntry.getNodeId()))); return CompositeFuture.all(futures).map(compositeFuture -> { for (int i = 0; i < compositeFuture.size(); i++) { boolean resAt = compositeFuture.resultAt(i); @@ -137,7 +118,7 @@ public void removeAllMatching(Predicate p, Handler> complet consulEntries.forEach(consulEntry -> consulEntry.getValue().forEach(v -> { if (p.test(v)) { - futures.add(delete(consulEntry.getKey(), v, toChoosableSet(consulEntry.getValue()), consulEntry.getNodeId())); + futures.add(doDelete(consulEntry.getKey(), v, toChoosableSet(consulEntry.getValue()), consulEntry.getNodeId())); } })); return CompositeFuture.all(futures).compose(compositeFuture -> Future.succeededFuture()); @@ -160,63 +141,37 @@ public void get(K k, Handler>> resultHandler) { * @param entries holds entries to which the new entries will be added, these entries have to be queried first. * @return {@link Future} */ - private Future doAdd(K k, V v, Set entries) { - return preferConsistency ? nonCacheableAdd(k, entries, v) : cacheableAdd(k, entries, v); - } - - private Future cacheableAdd(K k, Set entries, V sub) { - return nonCacheableAdd(k, entries, sub) - .compose(aVoid -> { - addEntryToCache(k, sub); - return succeededFuture(); - }); - - } - - private Future nonCacheableAdd(K k, Set subs, V sub) { - Set newOne = new HashSet<>(subs); - newOne.add(sub); + Future doAdd(K k, V v, Set entries) { + Set newOne = new HashSet<>(entries); + newOne.add(v); return addToConsulKv(k, newOne, appContext.getNodeId()) - .compose(aBoolean -> aBoolean ? succeededFuture() : failedFuture(sub.toString() + ": wasn't added to: " + name)); + .compose(aBoolean -> aBoolean ? succeededFuture() : failedFuture(v.toString() + ": wasn't added to: " + name)); } - private Future addToConsulKv(K key, Set vs, String nodeId) { - return asFutureString(key, vs, nodeId) - .compose(encodedValue -> putPlainValue(keyPathForAllByAddressAndByNodeId(key, nodeId), encodedValue, kvOpts)); - } - - /* - * We are wrapping async call into sync and execute it on the taskQueue. This way we maintain the order - * in which "get" tasks are executed. - * If we simply implement this method as : return preferConsistency ? nonCacheableGet(key) : cacheableGet(key); - * then {@link ClusteredEventBusTest.sendNoContext} will fail due to the fact async calls to get subs by key are unordered. - * TODO: Is there any way in vert.x ecosystem to execute tasks on the event loop by not giving up an order ? + /** + * Gets an entry from consul kv store + * @param key represents key of the entry (i.e. event bus address). + * @return set of subscribers. */ - private Future> doGet(K key) { + + Future> doGet(K key) { + /* + * We are wrapping async call into sync and execute it on the taskQueue. This way we maintain the order + * in which "get" tasks are executed. + * If we simply implement this method as : return preferConsistency ? nonCacheableGet(key) : cacheableGet(key); + * then {@link ClusteredEventBusTest.sendNoContext} will fail due to the fact async calls to get subs by key are unordered. + * TODO: Is there any way in vert.x ecosystem to execute tasks on the event loop by not giving up an order ? + */ Promise> out = Promise.promise(); VertxInternal vertxInternal = (VertxInternal) appContext.getVertx(); vertxInternal.getOrCreateContext().>executeBlocking(event -> { - Future> future = preferConsistency - ? nonCacheableGet(key) : cacheableGet(key); + Future> future = getAllByKey(keyPathForAllByAddress(key)).compose(vs -> succeededFuture(toChoosableSet(vs))); ChoosableSet choosableSet = completeAndGet(future, 5000); event.complete(choosableSet); }, taskQueue, res -> out.complete(res.result())); return out.future(); } - private Future> cacheableGet(K key) { - if (cache.containsKey(key)) return succeededFuture(cache.get(key)); - else return nonCacheableGet(key) - .compose(vs -> { - addEntriesToCache(key, vs); - return succeededFuture(vs); - }); - } - - private Future> nonCacheableGet(K key) { - return getAllByKey(keyPathForAllByAddress(key)).compose(vs -> succeededFuture(toChoosableSet(vs))); - } - /** * Deletes then entry from consul kv store. * @@ -226,21 +181,7 @@ private Future> nonCacheableGet(K key) { * @param nodeId represents node id that the entry belongs to. * @return {@link Future} */ - private Future delete(K key, V value, ChoosableSet from, String nodeId) { - return preferConsistency ? nonCacheableDelete(key, value, from, nodeId) : cacheableDelete(key, value, from, nodeId); - } - - private Future cacheableDelete(K key, V value, ChoosableSet from, String nodeId) { - return nonCacheableDelete(key, value, from, nodeId) - .compose(aBoolean -> { - if (aBoolean) { - removeEntryFromCache(key, value); - } - return succeededFuture(aBoolean); - }); - } - - private Future nonCacheableDelete(K key, V value, ChoosableSet from, String nodeId) { + Future doDelete(K key, V value, ChoosableSet from, String nodeId) { if (from.remove(value)) { if (from.isEmpty()) return deleteValueByKeyPath(keyPathForAllByAddressAndByNodeId(key, nodeId)); else return addToConsulKv(key, toHashSet(from), nodeId); @@ -249,6 +190,11 @@ private Future nonCacheableDelete(K key, V value, ChoosableSet from, } } + private Future addToConsulKv(K key, Set vs, String nodeId) { + return asFutureString(key, vs, nodeId) + .compose(encodedValue -> putPlainValue(keyPathForAllByAddressAndByNodeId(key, nodeId), encodedValue, kvOpts)); + } + /** * Returns a consul key path used to fetch all entries (all subscribers of all event buses that are registered). */ @@ -322,53 +268,4 @@ private Set toHashSet(ChoosableSet set) { set.forEach(hashSet::add); return hashSet; } - - private void addEntryToCache(K key, V value) { - ChoosableSet choosableSet = cache.get(key); - if (choosableSet == null) choosableSet = new ChoosableSet<>(1); - choosableSet.add(value); - cache.put(key, choosableSet); - if (log.isTraceEnabled()) { - log.trace("[" + appContext.getNodeId() + "]" + " Cache: " + name + " after put of " + key + " -> " + value + ": " + Json.encode(cache)); - } - } - - private void removeEntryFromCache(K key, V value) { - ChoosableSet choosableSet = cache.get(key); - if (choosableSet == null) return; - choosableSet.remove(value); - if (choosableSet.isEmpty()) cache.remove(key); - else cache.put(key, choosableSet); - if (log.isTraceEnabled()) { - log.trace("[" + appContext.getNodeId() + "]" + " Cache: " + name + " after remove of " + key + " -> " + value + ": " + Json.encode(cache)); - } - } - - private void addEntriesToCache(K key, ChoosableSet values) { - cache.put(key, values); - } - - @Override - protected synchronized void entryUpdated(EntryEvent event) { - if (log.isTraceEnabled()) { - log.trace("[" + appContext.getNodeId() + "]" + " Entry: " + event.getEntry().getKey() + " is for " + event.getEventType()); - } - ConsulEntry> entry; - try { - entry = asConsulEntry(event.getEntry().getValue()); - } catch (Exception e) { - log.warn("Failed to decode: " + event.getEntry().getKey() + " -> " + event.getEntry().getValue(), e); - return; - } - switch (event.getEventType()) { - case WRITE: - entry.getValue().forEach(v -> addEntryToCache(entry.getKey(), v)); - break; - case REMOVE: - entry.getValue().forEach(v -> removeEntryFromCache(entry.getKey(), v)); - break; - default: - break; - } - } } diff --git a/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulCacheableAsyncMultiMap.java b/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulCacheableAsyncMultiMap.java new file mode 100644 index 0000000..b0678aa --- /dev/null +++ b/src/main/java/io/vertx/spi/cluster/consul/impl/ConsulCacheableAsyncMultiMap.java @@ -0,0 +1,136 @@ +/* + * Copyright (C) 2019 Roman Levytskyi + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.vertx.spi.cluster.consul.impl; + +import io.vertx.core.Future; +import io.vertx.core.json.Json; +import io.vertx.core.logging.Logger; +import io.vertx.core.logging.LoggerFactory; + +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static io.vertx.core.Future.succeededFuture; +import static io.vertx.spi.cluster.consul.impl.ConversationUtils.asConsulEntry; + +/** + * {@link ConsulAsyncMultiMap} implementation with caching capabilities. + *

+ * Concurrent hash map is used to implement the local IN-MEMORY multimap cache. + * Cache is enabled ONLY when {@code preferConsistency} is set to false i.e. availability (better latency) is preferred. + * If cache is enabled: + * Cache read operations happen synchronously by simply reading from {@link java.util.concurrent.ConcurrentHashMap}. + * Cache WRITE operations happen either: + * - through consul watch that monitors the consul kv store for updates (see https://www.consul.io/docs/agent/watches.html). + * - when consul agent acknowledges the success of write operation (local node's data gets immediately cached without even waiting for a watch to take place.) + * Note: local cache updates still might kick in through consul watch in case update succeeded in consul agent but wasn't yet acknowledged back to node. Eventually last write wins. + * + * @author Roman Levytskyi + */ +public class ConsulCacheableAsyncMultiMap extends ConsulAsyncMultiMap { + + private final static Logger log = LoggerFactory.getLogger(ConsulCacheableAsyncMultiMap.class); + + private ConcurrentMap> cache; + + public ConsulCacheableAsyncMultiMap(String name, ClusterManagerInternalContext appContext) { + super(name, appContext); + cache = new ConcurrentHashMap<>(); + startListening(); + } + + @Override + Future doAdd(K k, V v, Set entries) { + return super.doAdd(k, v, entries).compose(aVoid -> { + addEntryToCache(k, v); + return succeededFuture(); + }); + } + + @Override + Future> doGet(K key) { + ChoosableSet cachedEntries = cache.get(key); + if (Objects.nonNull(cachedEntries)) { + return succeededFuture(cachedEntries); + } else { + return super.doGet(key).compose(vs -> { + addEntriesToCache(key, vs); + return succeededFuture(vs); + }); + } + } + + @Override + Future doDelete(K key, V value, ChoosableSet from, String nodeId) { + return super.doDelete(key, value, from, nodeId).compose(isDeleted -> { + if (isDeleted) { + removeEntryFromCache(key, value); + } + return succeededFuture(isDeleted); + }); + } + + private void addEntryToCache(K key, V value) { + ChoosableSet choosableSet = cache.get(key); + if (choosableSet == null) choosableSet = new ChoosableSet<>(1); + choosableSet.add(value); + cache.put(key, choosableSet); + if (log.isTraceEnabled()) { + log.trace("[" + appContext.getNodeId() + "]" + " Cache: " + name + " after put of " + key + " -> " + value + ": " + Json.encode(cache)); + } + } + + private void removeEntryFromCache(K key, V value) { + ChoosableSet choosableSet = cache.get(key); + if (choosableSet == null) return; + choosableSet.remove(value); + if (choosableSet.isEmpty()) cache.remove(key); + else cache.put(key, choosableSet); + if (log.isTraceEnabled()) { + log.trace("[" + appContext.getNodeId() + "]" + " Cache: " + name + " after remove of " + key + " -> " + value + ": " + Json.encode(cache)); + } + } + + private void addEntriesToCache(K key, ChoosableSet values) { + cache.put(key, values); + } + + @Override + protected synchronized void entryUpdated(EntryEvent event) { + if (log.isTraceEnabled()) { + log.trace("[" + appContext.getNodeId() + "]" + " Entry: " + event.getEntry().getKey() + " is for " + event.getEventType()); + } + ConsulEntry> entry; + try { + entry = asConsulEntry(event.getEntry().getValue()); + } catch (Exception e) { + log.warn("Failed to decode: " + event.getEntry().getKey() + " -> " + event.getEntry().getValue(), e); + return; + } + switch (event.getEventType()) { + case WRITE: + entry.getValue().forEach(v -> addEntryToCache(entry.getKey(), v)); + break; + case REMOVE: + entry.getValue().forEach(v -> removeEntryFromCache(entry.getKey(), v)); + break; + default: + break; + } + } +}