diff --git a/runtime/engine/pom.xml b/runtime/engine/pom.xml
index 0c55bb67f6..219c430e54 100644
--- a/runtime/engine/pom.xml
+++ b/runtime/engine/pom.xml
@@ -26,7 +26,7 @@
11
11
- 0.77
+ 0.76
5
diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/Engine.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/Engine.java
index e3365a15ba..0de5794dc1 100644
--- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/Engine.java
+++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/Engine.java
@@ -71,11 +71,11 @@
import io.aklivity.zilla.runtime.engine.internal.layouts.EventsLayout;
import io.aklivity.zilla.runtime.engine.internal.registry.EngineManager;
import io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker;
-import io.aklivity.zilla.runtime.engine.internal.registry.FileWatcherTask;
-import io.aklivity.zilla.runtime.engine.internal.registry.HttpWatcherTask;
-import io.aklivity.zilla.runtime.engine.internal.registry.ResourceWatcher;
-import io.aklivity.zilla.runtime.engine.internal.registry.WatcherTask;
import io.aklivity.zilla.runtime.engine.internal.types.event.EventFW;
+import io.aklivity.zilla.runtime.engine.internal.watcher.ConfigFileWatcherTask;
+import io.aklivity.zilla.runtime.engine.internal.watcher.ConfigHttpWatcherTask;
+import io.aklivity.zilla.runtime.engine.internal.watcher.ConfigWatcher;
+import io.aklivity.zilla.runtime.engine.internal.watcher.ResourceWatchManager;
import io.aklivity.zilla.runtime.engine.metrics.Collector;
import io.aklivity.zilla.runtime.engine.metrics.MetricGroup;
import io.aklivity.zilla.runtime.engine.model.Model;
@@ -93,15 +93,15 @@ public final class Engine implements Collector, AutoCloseable
private final AtomicInteger nextTaskId;
private final ThreadFactory factory;
- private final WatcherTask watcherTask;
+ private final ConfigWatcher configWatcherTask;
private final URL configURL;
private final List workers;
private final boolean readonly;
private final EngineConfiguration config;
private final EngineManager manager;
- private final ResourceWatcher resourceWatcher;
+ private final ResourceWatchManager resourceWatchManager;
- private Future watcherTaskRef;
+ private Future configWatcherTaskRef;
Engine(
EngineConfiguration config,
@@ -162,14 +162,14 @@ public final class Engine implements Collector, AutoCloseable
}
this.tuning = tuning;
- this.resourceWatcher = new ResourceWatcher();
+ this.resourceWatchManager = new ResourceWatchManager();
List workers = new ArrayList<>(workerCount);
for (int workerIndex = 0; workerIndex < workerCount; workerIndex++)
{
EngineWorker worker =
new EngineWorker(config, tasks, labels, errorHandler, tuning::affinity, bindings, exporters,
guards, vaults, catalogs, models, metricGroups, this, this::supplyEventReader,
- eventFormatterFactory, resourceWatcher, workerIndex, readonly, this::process);
+ eventFormatterFactory, resourceWatchManager, workerIndex, readonly, this::process);
workers.add(worker);
}
this.workers = workers;
@@ -210,20 +210,20 @@ public final class Engine implements Collector, AutoCloseable
config,
extensions,
this::readURL,
- resourceWatcher);
+ resourceWatchManager);
this.configURL = config.configURL();
String protocol = configURL.getProtocol();
if ("file".equals(protocol) || "jar".equals(protocol))
{
Function watcherReadURL = l -> readURL(configURL, l);
- this.watcherTask = new FileWatcherTask(manager::reconfigure, null, watcherReadURL);
- this.resourceWatcher.initialize(manager::reloadNamespacesWithChangedResources, watcherReadURL);
+ this.configWatcherTask = new ConfigFileWatcherTask(manager::reconfigure, watcherReadURL);
+ this.resourceWatchManager.initialize(manager::reloadNamespacesWithChangedResources, watcherReadURL);
}
else if ("http".equals(protocol) || "https".equals(protocol))
{
- this.watcherTask = new HttpWatcherTask(manager::reconfigure, manager::reloadNamespacesWithChangedResources,
- config.configPollIntervalSeconds());
+ this.configWatcherTask = new ConfigHttpWatcherTask(manager::reconfigure, config.configPollIntervalSeconds());
+ // TODO: Ati - implement http
}
else
{
@@ -261,11 +261,11 @@ public void start() throws Exception
worker.doStart();
}
- watcherTaskRef = watcherTask.submit();
+ configWatcherTaskRef = configWatcherTask.submit();
if (!readonly)
{
// ignore the config file in read-only mode; no config will be read so no namespaces, bindings, etc will be attached
- watcherTask.watchConfig(configURL).get();
+ configWatcherTask.watchConfig(configURL).get();
}
}
@@ -279,9 +279,9 @@ public void close() throws Exception
final List errors = new ArrayList<>();
- resourceWatcher.close();
- watcherTask.close();
- watcherTaskRef.get();
+ resourceWatchManager.close();
+ configWatcherTask.close();
+ configWatcherTaskRef.get();
for (EngineWorker worker : workers)
{
diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java
index 9bda92919b..5459f142f4 100644
--- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java
+++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java
@@ -63,6 +63,7 @@
import io.aklivity.zilla.runtime.engine.guard.Guard;
import io.aklivity.zilla.runtime.engine.internal.Tuning;
import io.aklivity.zilla.runtime.engine.internal.config.NamespaceAdapter;
+import io.aklivity.zilla.runtime.engine.internal.watcher.ResourceWatchManager;
import io.aklivity.zilla.runtime.engine.namespace.NamespacedId;
import io.aklivity.zilla.runtime.engine.resolver.Resolver;
@@ -84,7 +85,7 @@ public class EngineManager
private final List extensions;
private final BiFunction readURL;
private final Resolver expressions;
- private final ResourceWatcher resourceWatcher;
+ private final ResourceWatchManager resourceWatchManager;
private EngineConfig current;
@@ -102,7 +103,7 @@ public EngineManager(
EngineConfiguration config,
List extensions,
BiFunction readURL,
- ResourceWatcher resourceWatcher)
+ ResourceWatchManager resourceWatchManager)
{
this.schemaTypes = schemaTypes;
this.bindingByType = bindingByType;
@@ -118,7 +119,7 @@ public EngineManager(
this.extensions = extensions;
this.readURL = readURL;
this.expressions = Resolver.instantiate(config);
- this.resourceWatcher = resourceWatcher;
+ this.resourceWatchManager = resourceWatchManager;
}
public EngineConfig reconfigure(
@@ -423,7 +424,7 @@ private void unregister(
{
System.out.println("unregister: " + namespace.name); // TODO: Ati
unregister(namespace);
- resourceWatcher.removeNamespace(namespace.name);
+ resourceWatchManager.removeNamespace(namespace.name);
}
}
}
diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineRegistry.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineRegistry.java
index fd6216ea2d..036dce2778 100644
--- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineRegistry.java
+++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineRegistry.java
@@ -28,6 +28,7 @@
import io.aklivity.zilla.runtime.engine.config.NamespaceConfig;
import io.aklivity.zilla.runtime.engine.exporter.ExporterContext;
import io.aklivity.zilla.runtime.engine.guard.GuardContext;
+import io.aklivity.zilla.runtime.engine.internal.watcher.ResourceWatchManager;
import io.aklivity.zilla.runtime.engine.metrics.Collector;
import io.aklivity.zilla.runtime.engine.metrics.Metric;
import io.aklivity.zilla.runtime.engine.metrics.MetricContext;
@@ -51,7 +52,7 @@ public class EngineRegistry
private final LongConsumer detachBinding;
private final Collector collector;
private final Consumer process;
- private final ResourceWatcher resourceWatcher;
+ private final ResourceWatchManager resourceWatchManager;
public EngineRegistry(
Function bindingsByType,
@@ -67,7 +68,7 @@ public EngineRegistry(
LongConsumer detachBinding,
Collector collector,
Consumer process,
- ResourceWatcher resourceWatcher)
+ ResourceWatchManager resourceWatchManager)
{
this.bindingsByType = bindingsByType;
this.guardsByType = guardsByType;
@@ -83,7 +84,7 @@ public EngineRegistry(
this.detachBinding = detachBinding;
this.collector = collector;
this.process = process;
- this.resourceWatcher = resourceWatcher;
+ this.resourceWatchManager = resourceWatchManager;
}
public void process(
@@ -199,7 +200,7 @@ private void attachNamespace(
NamespaceRegistry registry =
new NamespaceRegistry(namespace, bindingsByType, guardsByType, vaultsByType, catalogsByType,
metricsByName, exportersByType, supplyLabelId, this::resolveMetric, exporterAttached, exporterDetached,
- supplyMetricRecorder, detachBinding, collector, resourceWatcher);
+ supplyMetricRecorder, detachBinding, collector, resourceWatchManager);
namespacesById.put(registry.namespaceId(), registry);
registry.attach();
}
diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java
index d082bd4dd8..f1acac9221 100644
--- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java
+++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java
@@ -137,6 +137,7 @@
import io.aklivity.zilla.runtime.engine.internal.types.stream.ResetFW;
import io.aklivity.zilla.runtime.engine.internal.types.stream.SignalFW;
import io.aklivity.zilla.runtime.engine.internal.types.stream.WindowFW;
+import io.aklivity.zilla.runtime.engine.internal.watcher.ResourceWatchManager;
import io.aklivity.zilla.runtime.engine.metrics.Collector;
import io.aklivity.zilla.runtime.engine.metrics.Metric;
import io.aklivity.zilla.runtime.engine.metrics.MetricContext;
@@ -253,7 +254,7 @@ public EngineWorker(
Collector collector,
Supplier supplyEventReader,
EventFormatterFactory eventFormatterFactory,
- ResourceWatcher resourceWatcher,
+ ResourceWatchManager resourceWatchManager,
int index,
boolean readonly,
Consumer process)
@@ -428,7 +429,7 @@ public EngineWorker(
this.registry = new EngineRegistry(
bindingsByType::get, guardsByType::get, vaultsByType::get, catalogsByType::get, metricsByName::get,
exportersByType::get, labels::supplyLabelId, this::onExporterAttached, this::onExporterDetached,
- this::supplyMetricWriter, this::detachStreams, collector, process, resourceWatcher);
+ this::supplyMetricWriter, this::detachStreams, collector, process, resourceWatchManager);
this.taskQueue = new ConcurrentLinkedDeque<>();
this.correlations = new Long2ObjectHashMap<>();
diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/NamespaceRegistry.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/NamespaceRegistry.java
index 5937785181..2d90e82f2d 100644
--- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/NamespaceRegistry.java
+++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/NamespaceRegistry.java
@@ -44,6 +44,7 @@
import io.aklivity.zilla.runtime.engine.exporter.ExporterContext;
import io.aklivity.zilla.runtime.engine.exporter.ExporterHandler;
import io.aklivity.zilla.runtime.engine.guard.GuardContext;
+import io.aklivity.zilla.runtime.engine.internal.watcher.ResourceWatchManager;
import io.aklivity.zilla.runtime.engine.metrics.Collector;
import io.aklivity.zilla.runtime.engine.metrics.Metric;
import io.aklivity.zilla.runtime.engine.metrics.MetricContext;
@@ -74,7 +75,7 @@ public class NamespaceRegistry
private final ObjectLongLongFunction supplyMetricRecorder;
private final LongConsumer detachBinding;
private final Collector collector;
- private final ResourceWatcher resourceWatcher;
+ private final ResourceWatchManager resourceWatchManager;
public NamespaceRegistry(
NamespaceConfig namespace,
@@ -91,7 +92,7 @@ public NamespaceRegistry(
ObjectLongLongFunction supplyMetricRecorder,
LongConsumer detachBinding,
Collector collector,
- ResourceWatcher resourceWatcher)
+ ResourceWatchManager resourceWatchManager)
{
this.namespace = namespace;
this.bindingsByType = bindingsByType;
@@ -114,7 +115,7 @@ public NamespaceRegistry(
this.metricsById = new Int2ObjectHashMap<>();
this.exportersById = new Int2ObjectHashMap<>();
this.collector = collector;
- this.resourceWatcher = resourceWatcher;
+ this.resourceWatchManager = resourceWatchManager;
}
public int namespaceId()
@@ -268,7 +269,7 @@ private void attachVault(
VaultRegistry registry = new VaultRegistry(config, context);
vaultsById.put(vaultId, registry);
registry.attach();
- resourceWatcher.addResources(registry.handler().resources(), config.namespace);
+ resourceWatchManager.addResources(registry.handler().resources(), config.namespace);
}
private void detachVault(
@@ -315,7 +316,7 @@ private void attachCatalog(
CatalogRegistry registry = new CatalogRegistry(config, context);
catalogsById.put(catalogId, registry);
registry.attach();
- resourceWatcher.addResources(registry.handler().resources(), config.namespace);
+ resourceWatchManager.addResources(registry.handler().resources(), config.namespace);
}
private void detachCatalog(
diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/FileWatcherTask.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigFileWatcherTask.java
similarity index 59%
rename from runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/FileWatcherTask.java
rename to runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigFileWatcherTask.java
index 9e7a5e8382..7fda5d39da 100644
--- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/FileWatcherTask.java
+++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigFileWatcherTask.java
@@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
-package io.aklivity.zilla.runtime.engine.internal.registry;
+package io.aklivity.zilla.runtime.engine.internal.watcher;
import static org.agrona.LangUtil.rethrowUnchecked;
@@ -25,29 +25,27 @@
import java.nio.file.WatchService;
import java.util.IdentityHashMap;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
-import java.util.function.Consumer;
import java.util.function.Function;
import io.aklivity.zilla.runtime.engine.config.EngineConfig;
-public class FileWatcherTask extends WatcherTask
+public class ConfigFileWatcherTask extends WatcherTask implements ConfigWatcher
{
- private final Map watchedConfigs;
+ private final Map watchedItems;
private final WatchService watchService;
+ private final BiFunction configChangeListener;
private final Function readURL;
- public FileWatcherTask(
+ public ConfigFileWatcherTask(
BiFunction configChangeListener,
- Consumer> resourceChangeListener,
Function readURL)
{
- super(configChangeListener, resourceChangeListener);
+ this.configChangeListener = configChangeListener;
this.readURL = readURL;
- this.watchedConfigs = new IdentityHashMap<>();
+ this.watchedItems = new IdentityHashMap<>();
WatchService watchService = null;
try
@@ -78,27 +76,23 @@ public Void call()
{
final WatchKey key = watchService.take();
- WatchedConfig watchedConfig = watchedConfigs.get(key);
+ WatchedItem watchedItem = watchedItems.get(key);
- if (watchedConfig != null && watchedConfig.isWatchedKey(key))
+ if (watchedItem != null && watchedItem.isWatchedKey(key))
{
// Even if no reconfigure needed, recalculation is necessary, since symlinks might have changed.
- watchedConfig.keys().forEach(watchedConfigs::remove);
- watchedConfig.unregister();
- watchedConfig.register();
- watchedConfig.keys().forEach(k -> watchedConfigs.put(k, watchedConfig));
- String newConfigText = readURL.apply(watchedConfig.getURL().toString());
- byte[] newConfigHash = computeHash(newConfigText);
- if (watchedConfig.isReconfigureNeeded(newConfigHash))
+ watchedItem.keys().forEach(watchedItems::remove);
+ watchedItem.unregister();
+ watchedItem.register();
+ watchedItem.keys().forEach(k -> watchedItems.put(k, watchedItem));
+ String newText = readURL.apply(watchedItem.getURL().toString());
+ byte[] newHash = computeHash(newText);
+ if (watchedItem.isReconfigureNeeded(newHash))
{
- watchedConfig.setConfigHash(newConfigHash);
+ watchedItem.setHash(newHash);
if (configChangeListener != null)
{
- configChangeListener.apply(watchedConfig.getURL(), newConfigText);
- }
- if (resourceChangeListener != null)
- {
- resourceChangeListener.accept(namespaces);
+ configChangeListener.apply(watchedItem.getURL(), newText);
}
}
}
@@ -116,11 +110,11 @@ public Void call()
public CompletableFuture watchConfig(
URL configURL)
{
- WatchedConfig watchedConfig = new WatchedConfig(configURL, watchService);
- watchedConfig.register();
- watchedConfig.keys().forEach(k -> watchedConfigs.put(k, watchedConfig));
+ WatchedItem watchedItem = new WatchedItem(configURL, watchService);
+ watchedItem.register();
+ watchedItem.keys().forEach(k -> watchedItems.put(k, watchedItem));
String configText = readURL.apply(configURL.toString());
- watchedConfig.setConfigHash(computeHash(configText));
+ watchedItem.setHash(computeHash(configText));
CompletableFuture configFuture;
try
@@ -136,17 +130,6 @@ public CompletableFuture watchConfig(
return configFuture;
}
- @Override
- public void watchResource(
- URL resourceURL)
- {
- WatchedConfig watchedConfig = new WatchedConfig(resourceURL, watchService);
- watchedConfig.register();
- watchedConfig.keys().forEach(k -> watchedConfigs.put(k, watchedConfig));
- String resource = readURL.apply(resourceURL.toString());
- watchedConfig.setConfigHash(computeHash(resource));
- }
-
@Override
public void close() throws IOException
{
diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/HttpWatcherTask.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigHttpWatcherTask.java
similarity index 94%
rename from runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/HttpWatcherTask.java
rename to runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigHttpWatcherTask.java
index 2ccadb83da..b616436981 100644
--- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/HttpWatcherTask.java
+++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigHttpWatcherTask.java
@@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
-package io.aklivity.zilla.runtime.engine.internal.registry;
+package io.aklivity.zilla.runtime.engine.internal.watcher;
import static java.net.http.HttpClient.Redirect.NORMAL;
import static java.net.http.HttpClient.Version.HTTP_2;
@@ -29,7 +29,6 @@
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -37,26 +36,25 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
-import java.util.function.Consumer;
import io.aklivity.zilla.runtime.engine.config.EngineConfig;
-public class HttpWatcherTask extends WatcherTask
+public class ConfigHttpWatcherTask extends WatcherTask implements ConfigWatcher
{
private static final URI CLOSE_REQUESTED = URI.create("http://localhost:12345");
+ private final BiFunction configChangeListener;
private final Map etags;
private final Map configHashes;
private final Map> futures;
private final BlockingQueue configQueue;
private final int pollSeconds;
- public HttpWatcherTask(
+ public ConfigHttpWatcherTask(
BiFunction configChangeListener,
- Consumer> resourceChangeListener,
int pollSeconds)
{
- super(configChangeListener, resourceChangeListener);
+ this.configChangeListener = configChangeListener;
this.etags = new ConcurrentHashMap<>();
this.configHashes = new ConcurrentHashMap<>();
this.futures = new ConcurrentHashMap<>();
@@ -106,13 +104,6 @@ public CompletableFuture watchConfig(
return configFuture;
}
- @Override
- public void watchResource(
- URL resourceURL)
- {
- // TODO: Ati
- }
-
@Override
public void close()
{
diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigWatcher.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigWatcher.java
new file mode 100644
index 0000000000..1f15cae833
--- /dev/null
+++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigWatcher.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc.
+ *
+ * Aklivity licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.aklivity.zilla.runtime.engine.internal.watcher;
+
+import java.io.Closeable;
+import java.net.URL;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
+import io.aklivity.zilla.runtime.engine.config.EngineConfig;
+
+public interface ConfigWatcher extends Closeable
+{
+ Future submit();
+
+ CompletableFuture watchConfig(URL configURL);
+}
diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceFileWatcherTask.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceFileWatcherTask.java
new file mode 100644
index 0000000000..f2c8910517
--- /dev/null
+++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceFileWatcherTask.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc.
+ *
+ * Aklivity licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.aklivity.zilla.runtime.engine.internal.watcher;
+
+import static org.agrona.LangUtil.rethrowUnchecked;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.ClosedWatchServiceException;
+import java.nio.file.FileSystems;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public class ResourceFileWatcherTask extends WatcherTask implements ResourceWatcher
+{
+ private final Map watchedItems;
+ private final WatchService watchService;
+ private final Consumer> resourceChangeListener;
+ private final Function readURL;
+ private final Set namespaces;
+
+ public ResourceFileWatcherTask(
+ Consumer> resourceChangeListener,
+ Function readURL)
+ {
+ this.resourceChangeListener = resourceChangeListener;
+ this.readURL = readURL;
+ this.watchedItems = new IdentityHashMap<>();
+ this.namespaces = new HashSet<>();
+ WatchService watchService = null;
+
+ try
+ {
+ watchService = FileSystems.getDefault().newWatchService();
+ }
+ catch (IOException ex)
+ {
+ rethrowUnchecked(ex);
+ }
+
+ this.watchService = watchService;
+
+ }
+
+ @Override
+ public Future submit()
+ {
+ return executor.submit(this);
+ }
+
+ @Override
+ public Void call()
+ {
+ while (true)
+ {
+ try
+ {
+ final WatchKey key = watchService.take();
+
+ WatchedItem watchedItem = watchedItems.get(key);
+
+ if (watchedItem != null && watchedItem.isWatchedKey(key))
+ {
+ // Even if no reconfigure needed, recalculation is necessary, since symlinks might have changed.
+ watchedItem.keys().forEach(watchedItems::remove);
+ watchedItem.unregister();
+ watchedItem.register();
+ watchedItem.keys().forEach(k -> watchedItems.put(k, watchedItem));
+ String newText = readURL.apply(watchedItem.getURL().toString());
+ byte[] newHash = computeHash(newText);
+ if (watchedItem.isReconfigureNeeded(newHash))
+ {
+ watchedItem.setHash(newHash);
+ if (resourceChangeListener != null)
+ {
+ resourceChangeListener.accept(namespaces);
+ }
+ }
+ }
+ }
+ catch (InterruptedException | ClosedWatchServiceException ex)
+ {
+ break;
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public void watchResource(
+ URL resourceURL)
+ {
+ WatchedItem watchedItem = new WatchedItem(resourceURL, watchService);
+ watchedItem.register();
+ watchedItem.keys().forEach(k -> watchedItems.put(k, watchedItem));
+ String resource = readURL.apply(resourceURL.toString());
+ watchedItem.setHash(computeHash(resource));
+ }
+
+ @Override
+ public void addNamespace(
+ String namespace)
+ {
+ namespaces.add(namespace);
+ }
+
+ @Override
+ public void removeNamespace(
+ String namespace)
+ {
+ namespaces.remove(namespace);
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ watchService.close();
+ }
+}
diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/ResourceWatcher.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceWatchManager.java
similarity index 90%
rename from runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/ResourceWatcher.java
rename to runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceWatchManager.java
index d709bd2394..7b7f1b2a80 100644
--- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/ResourceWatcher.java
+++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceWatchManager.java
@@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
-package io.aklivity.zilla.runtime.engine.internal.registry;
+package io.aklivity.zilla.runtime.engine.internal.watcher;
import static org.agrona.LangUtil.rethrowUnchecked;
@@ -26,15 +26,15 @@
import java.util.function.Consumer;
import java.util.function.Function;
-public class ResourceWatcher
+public class ResourceWatchManager
{
private final Map> resources;
- private final Map resourceTasks;
+ private final Map resourceTasks;
private Consumer> resourceChangeListener;
private Function readURL;
- public ResourceWatcher()
+ public ResourceWatchManager()
{
this.resources = new ConcurrentHashMap<>();
this.resourceTasks = new ConcurrentHashMap<>();
@@ -60,7 +60,7 @@ public void addResources(
return ConcurrentHashMap.newKeySet();
}
).add(namespace);
- resourceTasks.get(resource).addNamespaces(namespace);
+ resourceTasks.get(resource).addNamespace(namespace);
}
);
}
@@ -93,8 +93,8 @@ private void startWatchingResource(
{
try
{
- FileWatcherTask watcherTask = new FileWatcherTask(null, resourceChangeListener, readURL);
- watcherTask.addNamespaces(namespace);
+ ResourceWatcher watcherTask = new ResourceFileWatcherTask(resourceChangeListener, readURL);
+ watcherTask.addNamespace(namespace);
watcherTask.submit();
URL resourceURL = Path.of(resource).toUri().toURL();
watcherTask.watchResource(resourceURL);
diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceWatcher.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceWatcher.java
new file mode 100644
index 0000000000..20b0dcc9d1
--- /dev/null
+++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceWatcher.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc.
+ *
+ * Aklivity licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.aklivity.zilla.runtime.engine.internal.watcher;
+
+import java.io.Closeable;
+import java.net.URL;
+import java.util.concurrent.Future;
+
+public interface ResourceWatcher extends Closeable
+{
+ Future submit();
+
+ void watchResource(URL configURL);
+
+ void addNamespace(String namespace);
+
+ void removeNamespace(String namespace);
+}
diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/WatchedConfig.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/WatchedItem.java
similarity index 81%
rename from runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/WatchedConfig.java
rename to runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/WatchedItem.java
index a6ee866978..5bbacf7ef3 100644
--- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/WatchedConfig.java
+++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/WatchedItem.java
@@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
-package io.aklivity.zilla.runtime.engine.internal.registry;
+package io.aklivity.zilla.runtime.engine.internal.watcher;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
@@ -33,20 +33,20 @@
import java.util.LinkedList;
import java.util.Set;
-public class WatchedConfig
+public class WatchedItem
{
private final WatchService watchService;
private final Set watchKeys;
- private final URL configURL;
- private byte[] configHash;
+ private final URL watchedURL;
+ private byte[] hash;
- public WatchedConfig(
- URL configURL,
+ public WatchedItem(
+ URL watchedURL,
WatchService watchService)
{
this.watchService = watchService;
this.watchKeys = new HashSet<>();
- this.configURL = configURL;
+ this.watchedURL = watchedURL;
}
public Set keys()
@@ -56,7 +56,7 @@ public Set keys()
public void register()
{
- Path configPath = Paths.get(configURL.getPath()).toAbsolutePath();
+ Path configPath = Paths.get(watchedURL.getPath()).toAbsolutePath();
try
{
Set watchedPaths = new HashSet<>();
@@ -123,27 +123,27 @@ public boolean isWatchedKey(
public boolean isReconfigureNeeded(
byte[] newConfigHash)
{
- return !Arrays.equals(configHash, newConfigHash);
+ return !Arrays.equals(hash, newConfigHash);
}
- public void setConfigHash(
- byte[] newConfigHash)
+ public void setHash(
+ byte[] newHash)
{
- configHash = newConfigHash;
+ hash = newHash;
}
public URL getURL()
{
- return configURL;
+ return watchedURL;
}
private WatchKey registerPath(
- Path configPath)
+ Path path)
{
WatchKey key = null;
try
{
- key = configPath.register(watchService, ENTRY_MODIFY, ENTRY_CREATE, ENTRY_DELETE);
+ key = path.register(watchService, ENTRY_MODIFY, ENTRY_CREATE, ENTRY_DELETE);
}
catch (IOException ex)
{
@@ -151,19 +151,4 @@ private WatchKey registerPath(
}
return key;
}
-
- private Path toRealPath(
- Path configPath)
- {
- try
- {
- configPath = configPath.toRealPath();
- }
- catch (IOException ex)
- {
- rethrowUnchecked(ex);
- }
- return configPath;
- }
-
}
diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/WatcherTask.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/WatcherTask.java
similarity index 51%
rename from runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/WatcherTask.java
rename to runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/WatcherTask.java
index 173f7a1d0e..77c532efd7 100644
--- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/WatcherTask.java
+++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/WatcherTask.java
@@ -13,71 +13,33 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
-package io.aklivity.zilla.runtime.engine.internal.registry;
+package io.aklivity.zilla.runtime.engine.internal.watcher;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.agrona.LangUtil.rethrowUnchecked;
-import java.io.Closeable;
-import java.net.URL;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
-import java.util.HashSet;
-import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import io.aklivity.zilla.runtime.engine.config.EngineConfig;
-
-public abstract class WatcherTask implements Callable, Closeable
+public abstract class WatcherTask implements Callable
{
private final MessageDigest md5;
protected final ScheduledExecutorService executor;
- protected final BiFunction configChangeListener;
- protected final Consumer> resourceChangeListener;
- protected final Set namespaces;
- protected WatcherTask(
- BiFunction configChangeListener,
- Consumer> resourceChangeListener)
+ protected WatcherTask()
{
- this.configChangeListener = configChangeListener;
- this.resourceChangeListener = resourceChangeListener;
this.md5 = initMessageDigest("MD5");
this.executor = Executors.newScheduledThreadPool(2);
- this.namespaces = new HashSet<>();
- }
-
- public void addNamespaces(
- String namespace)
- {
- namespaces.add(namespace);
- }
-
- public void removeNamespace(
- String namespace)
- {
- namespaces.remove(namespace);
}
- public abstract Future submit();
-
- public abstract CompletableFuture watchConfig(
- URL configURL);
-
- public abstract void watchResource(
- URL resourceURL);
-
protected byte[] computeHash(
- String configText)
+ String text)
{
- return md5.digest(configText.getBytes(UTF_8));
+ return md5.digest(text.getBytes(UTF_8));
}
private MessageDigest initMessageDigest(