From c02e5c4df80d9739ceb0519986942e02422f8783 Mon Sep 17 00:00:00 2001 From: Attila Kreiner Date: Wed, 5 Jun 2024 17:33:11 +0200 Subject: [PATCH] WIP refactoring watch resources --- runtime/engine/pom.xml | 2 +- .../aklivity/zilla/runtime/engine/Engine.java | 38 ++--- .../internal/registry/EngineManager.java | 9 +- .../internal/registry/EngineRegistry.java | 9 +- .../internal/registry/EngineWorker.java | 5 +- .../internal/registry/NamespaceRegistry.java | 11 +- .../ConfigFileWatcherTask.java} | 61 +++----- .../ConfigHttpWatcherTask.java} | 19 +-- .../internal/watcher/ConfigWatcher.java | 30 ++++ .../watcher/ResourceFileWatcherTask.java | 140 ++++++++++++++++++ .../ResourceWatchManager.java} | 14 +- .../internal/watcher/ResourceWatcher.java | 31 ++++ .../WatchedItem.java} | 45 ++---- .../{registry => watcher}/WatcherTask.java | 48 +----- 14 files changed, 294 insertions(+), 168 deletions(-) rename runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/{registry/FileWatcherTask.java => watcher/ConfigFileWatcherTask.java} (59%) rename runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/{registry/HttpWatcherTask.java => watcher/ConfigHttpWatcherTask.java} (94%) create mode 100644 runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigWatcher.java create mode 100644 runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceFileWatcherTask.java rename runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/{registry/ResourceWatcher.java => watcher/ResourceWatchManager.java} (90%) create mode 100644 runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceWatcher.java rename runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/{registry/WatchedConfig.java => watcher/WatchedItem.java} (81%) rename runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/{registry => watcher}/WatcherTask.java (51%) diff --git a/runtime/engine/pom.xml b/runtime/engine/pom.xml index 0c55bb67f6..219c430e54 100644 --- a/runtime/engine/pom.xml +++ b/runtime/engine/pom.xml @@ -26,7 +26,7 @@ 11 11 - 0.77 + 0.76 5 diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/Engine.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/Engine.java index e3365a15ba..0de5794dc1 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/Engine.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/Engine.java @@ -71,11 +71,11 @@ import io.aklivity.zilla.runtime.engine.internal.layouts.EventsLayout; import io.aklivity.zilla.runtime.engine.internal.registry.EngineManager; import io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker; -import io.aklivity.zilla.runtime.engine.internal.registry.FileWatcherTask; -import io.aklivity.zilla.runtime.engine.internal.registry.HttpWatcherTask; -import io.aklivity.zilla.runtime.engine.internal.registry.ResourceWatcher; -import io.aklivity.zilla.runtime.engine.internal.registry.WatcherTask; import io.aklivity.zilla.runtime.engine.internal.types.event.EventFW; +import io.aklivity.zilla.runtime.engine.internal.watcher.ConfigFileWatcherTask; +import io.aklivity.zilla.runtime.engine.internal.watcher.ConfigHttpWatcherTask; +import io.aklivity.zilla.runtime.engine.internal.watcher.ConfigWatcher; +import io.aklivity.zilla.runtime.engine.internal.watcher.ResourceWatchManager; import io.aklivity.zilla.runtime.engine.metrics.Collector; import io.aklivity.zilla.runtime.engine.metrics.MetricGroup; import io.aklivity.zilla.runtime.engine.model.Model; @@ -93,15 +93,15 @@ public final class Engine implements Collector, AutoCloseable private final AtomicInteger nextTaskId; private final ThreadFactory factory; - private final WatcherTask watcherTask; + private final ConfigWatcher configWatcherTask; private final URL configURL; private final List workers; private final boolean readonly; private final EngineConfiguration config; private final EngineManager manager; - private final ResourceWatcher resourceWatcher; + private final ResourceWatchManager resourceWatchManager; - private Future watcherTaskRef; + private Future configWatcherTaskRef; Engine( EngineConfiguration config, @@ -162,14 +162,14 @@ public final class Engine implements Collector, AutoCloseable } this.tuning = tuning; - this.resourceWatcher = new ResourceWatcher(); + this.resourceWatchManager = new ResourceWatchManager(); List workers = new ArrayList<>(workerCount); for (int workerIndex = 0; workerIndex < workerCount; workerIndex++) { EngineWorker worker = new EngineWorker(config, tasks, labels, errorHandler, tuning::affinity, bindings, exporters, guards, vaults, catalogs, models, metricGroups, this, this::supplyEventReader, - eventFormatterFactory, resourceWatcher, workerIndex, readonly, this::process); + eventFormatterFactory, resourceWatchManager, workerIndex, readonly, this::process); workers.add(worker); } this.workers = workers; @@ -210,20 +210,20 @@ public final class Engine implements Collector, AutoCloseable config, extensions, this::readURL, - resourceWatcher); + resourceWatchManager); this.configURL = config.configURL(); String protocol = configURL.getProtocol(); if ("file".equals(protocol) || "jar".equals(protocol)) { Function watcherReadURL = l -> readURL(configURL, l); - this.watcherTask = new FileWatcherTask(manager::reconfigure, null, watcherReadURL); - this.resourceWatcher.initialize(manager::reloadNamespacesWithChangedResources, watcherReadURL); + this.configWatcherTask = new ConfigFileWatcherTask(manager::reconfigure, watcherReadURL); + this.resourceWatchManager.initialize(manager::reloadNamespacesWithChangedResources, watcherReadURL); } else if ("http".equals(protocol) || "https".equals(protocol)) { - this.watcherTask = new HttpWatcherTask(manager::reconfigure, manager::reloadNamespacesWithChangedResources, - config.configPollIntervalSeconds()); + this.configWatcherTask = new ConfigHttpWatcherTask(manager::reconfigure, config.configPollIntervalSeconds()); + // TODO: Ati - implement http } else { @@ -261,11 +261,11 @@ public void start() throws Exception worker.doStart(); } - watcherTaskRef = watcherTask.submit(); + configWatcherTaskRef = configWatcherTask.submit(); if (!readonly) { // ignore the config file in read-only mode; no config will be read so no namespaces, bindings, etc will be attached - watcherTask.watchConfig(configURL).get(); + configWatcherTask.watchConfig(configURL).get(); } } @@ -279,9 +279,9 @@ public void close() throws Exception final List errors = new ArrayList<>(); - resourceWatcher.close(); - watcherTask.close(); - watcherTaskRef.get(); + resourceWatchManager.close(); + configWatcherTask.close(); + configWatcherTaskRef.get(); for (EngineWorker worker : workers) { diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java index 9bda92919b..5459f142f4 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineManager.java @@ -63,6 +63,7 @@ import io.aklivity.zilla.runtime.engine.guard.Guard; import io.aklivity.zilla.runtime.engine.internal.Tuning; import io.aklivity.zilla.runtime.engine.internal.config.NamespaceAdapter; +import io.aklivity.zilla.runtime.engine.internal.watcher.ResourceWatchManager; import io.aklivity.zilla.runtime.engine.namespace.NamespacedId; import io.aklivity.zilla.runtime.engine.resolver.Resolver; @@ -84,7 +85,7 @@ public class EngineManager private final List extensions; private final BiFunction readURL; private final Resolver expressions; - private final ResourceWatcher resourceWatcher; + private final ResourceWatchManager resourceWatchManager; private EngineConfig current; @@ -102,7 +103,7 @@ public EngineManager( EngineConfiguration config, List extensions, BiFunction readURL, - ResourceWatcher resourceWatcher) + ResourceWatchManager resourceWatchManager) { this.schemaTypes = schemaTypes; this.bindingByType = bindingByType; @@ -118,7 +119,7 @@ public EngineManager( this.extensions = extensions; this.readURL = readURL; this.expressions = Resolver.instantiate(config); - this.resourceWatcher = resourceWatcher; + this.resourceWatchManager = resourceWatchManager; } public EngineConfig reconfigure( @@ -423,7 +424,7 @@ private void unregister( { System.out.println("unregister: " + namespace.name); // TODO: Ati unregister(namespace); - resourceWatcher.removeNamespace(namespace.name); + resourceWatchManager.removeNamespace(namespace.name); } } } diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineRegistry.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineRegistry.java index fd6216ea2d..036dce2778 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineRegistry.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineRegistry.java @@ -28,6 +28,7 @@ import io.aklivity.zilla.runtime.engine.config.NamespaceConfig; import io.aklivity.zilla.runtime.engine.exporter.ExporterContext; import io.aklivity.zilla.runtime.engine.guard.GuardContext; +import io.aklivity.zilla.runtime.engine.internal.watcher.ResourceWatchManager; import io.aklivity.zilla.runtime.engine.metrics.Collector; import io.aklivity.zilla.runtime.engine.metrics.Metric; import io.aklivity.zilla.runtime.engine.metrics.MetricContext; @@ -51,7 +52,7 @@ public class EngineRegistry private final LongConsumer detachBinding; private final Collector collector; private final Consumer process; - private final ResourceWatcher resourceWatcher; + private final ResourceWatchManager resourceWatchManager; public EngineRegistry( Function bindingsByType, @@ -67,7 +68,7 @@ public EngineRegistry( LongConsumer detachBinding, Collector collector, Consumer process, - ResourceWatcher resourceWatcher) + ResourceWatchManager resourceWatchManager) { this.bindingsByType = bindingsByType; this.guardsByType = guardsByType; @@ -83,7 +84,7 @@ public EngineRegistry( this.detachBinding = detachBinding; this.collector = collector; this.process = process; - this.resourceWatcher = resourceWatcher; + this.resourceWatchManager = resourceWatchManager; } public void process( @@ -199,7 +200,7 @@ private void attachNamespace( NamespaceRegistry registry = new NamespaceRegistry(namespace, bindingsByType, guardsByType, vaultsByType, catalogsByType, metricsByName, exportersByType, supplyLabelId, this::resolveMetric, exporterAttached, exporterDetached, - supplyMetricRecorder, detachBinding, collector, resourceWatcher); + supplyMetricRecorder, detachBinding, collector, resourceWatchManager); namespacesById.put(registry.namespaceId(), registry); registry.attach(); } diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java index d082bd4dd8..f1acac9221 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/EngineWorker.java @@ -137,6 +137,7 @@ import io.aklivity.zilla.runtime.engine.internal.types.stream.ResetFW; import io.aklivity.zilla.runtime.engine.internal.types.stream.SignalFW; import io.aklivity.zilla.runtime.engine.internal.types.stream.WindowFW; +import io.aklivity.zilla.runtime.engine.internal.watcher.ResourceWatchManager; import io.aklivity.zilla.runtime.engine.metrics.Collector; import io.aklivity.zilla.runtime.engine.metrics.Metric; import io.aklivity.zilla.runtime.engine.metrics.MetricContext; @@ -253,7 +254,7 @@ public EngineWorker( Collector collector, Supplier supplyEventReader, EventFormatterFactory eventFormatterFactory, - ResourceWatcher resourceWatcher, + ResourceWatchManager resourceWatchManager, int index, boolean readonly, Consumer process) @@ -428,7 +429,7 @@ public EngineWorker( this.registry = new EngineRegistry( bindingsByType::get, guardsByType::get, vaultsByType::get, catalogsByType::get, metricsByName::get, exportersByType::get, labels::supplyLabelId, this::onExporterAttached, this::onExporterDetached, - this::supplyMetricWriter, this::detachStreams, collector, process, resourceWatcher); + this::supplyMetricWriter, this::detachStreams, collector, process, resourceWatchManager); this.taskQueue = new ConcurrentLinkedDeque<>(); this.correlations = new Long2ObjectHashMap<>(); diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/NamespaceRegistry.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/NamespaceRegistry.java index 5937785181..2d90e82f2d 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/NamespaceRegistry.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/NamespaceRegistry.java @@ -44,6 +44,7 @@ import io.aklivity.zilla.runtime.engine.exporter.ExporterContext; import io.aklivity.zilla.runtime.engine.exporter.ExporterHandler; import io.aklivity.zilla.runtime.engine.guard.GuardContext; +import io.aklivity.zilla.runtime.engine.internal.watcher.ResourceWatchManager; import io.aklivity.zilla.runtime.engine.metrics.Collector; import io.aklivity.zilla.runtime.engine.metrics.Metric; import io.aklivity.zilla.runtime.engine.metrics.MetricContext; @@ -74,7 +75,7 @@ public class NamespaceRegistry private final ObjectLongLongFunction supplyMetricRecorder; private final LongConsumer detachBinding; private final Collector collector; - private final ResourceWatcher resourceWatcher; + private final ResourceWatchManager resourceWatchManager; public NamespaceRegistry( NamespaceConfig namespace, @@ -91,7 +92,7 @@ public NamespaceRegistry( ObjectLongLongFunction supplyMetricRecorder, LongConsumer detachBinding, Collector collector, - ResourceWatcher resourceWatcher) + ResourceWatchManager resourceWatchManager) { this.namespace = namespace; this.bindingsByType = bindingsByType; @@ -114,7 +115,7 @@ public NamespaceRegistry( this.metricsById = new Int2ObjectHashMap<>(); this.exportersById = new Int2ObjectHashMap<>(); this.collector = collector; - this.resourceWatcher = resourceWatcher; + this.resourceWatchManager = resourceWatchManager; } public int namespaceId() @@ -268,7 +269,7 @@ private void attachVault( VaultRegistry registry = new VaultRegistry(config, context); vaultsById.put(vaultId, registry); registry.attach(); - resourceWatcher.addResources(registry.handler().resources(), config.namespace); + resourceWatchManager.addResources(registry.handler().resources(), config.namespace); } private void detachVault( @@ -315,7 +316,7 @@ private void attachCatalog( CatalogRegistry registry = new CatalogRegistry(config, context); catalogsById.put(catalogId, registry); registry.attach(); - resourceWatcher.addResources(registry.handler().resources(), config.namespace); + resourceWatchManager.addResources(registry.handler().resources(), config.namespace); } private void detachCatalog( diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/FileWatcherTask.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigFileWatcherTask.java similarity index 59% rename from runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/FileWatcherTask.java rename to runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigFileWatcherTask.java index 9e7a5e8382..7fda5d39da 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/FileWatcherTask.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigFileWatcherTask.java @@ -13,7 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.aklivity.zilla.runtime.engine.internal.registry; +package io.aklivity.zilla.runtime.engine.internal.watcher; import static org.agrona.LangUtil.rethrowUnchecked; @@ -25,29 +25,27 @@ import java.nio.file.WatchService; import java.util.IdentityHashMap; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.function.BiFunction; -import java.util.function.Consumer; import java.util.function.Function; import io.aklivity.zilla.runtime.engine.config.EngineConfig; -public class FileWatcherTask extends WatcherTask +public class ConfigFileWatcherTask extends WatcherTask implements ConfigWatcher { - private final Map watchedConfigs; + private final Map watchedItems; private final WatchService watchService; + private final BiFunction configChangeListener; private final Function readURL; - public FileWatcherTask( + public ConfigFileWatcherTask( BiFunction configChangeListener, - Consumer> resourceChangeListener, Function readURL) { - super(configChangeListener, resourceChangeListener); + this.configChangeListener = configChangeListener; this.readURL = readURL; - this.watchedConfigs = new IdentityHashMap<>(); + this.watchedItems = new IdentityHashMap<>(); WatchService watchService = null; try @@ -78,27 +76,23 @@ public Void call() { final WatchKey key = watchService.take(); - WatchedConfig watchedConfig = watchedConfigs.get(key); + WatchedItem watchedItem = watchedItems.get(key); - if (watchedConfig != null && watchedConfig.isWatchedKey(key)) + if (watchedItem != null && watchedItem.isWatchedKey(key)) { // Even if no reconfigure needed, recalculation is necessary, since symlinks might have changed. - watchedConfig.keys().forEach(watchedConfigs::remove); - watchedConfig.unregister(); - watchedConfig.register(); - watchedConfig.keys().forEach(k -> watchedConfigs.put(k, watchedConfig)); - String newConfigText = readURL.apply(watchedConfig.getURL().toString()); - byte[] newConfigHash = computeHash(newConfigText); - if (watchedConfig.isReconfigureNeeded(newConfigHash)) + watchedItem.keys().forEach(watchedItems::remove); + watchedItem.unregister(); + watchedItem.register(); + watchedItem.keys().forEach(k -> watchedItems.put(k, watchedItem)); + String newText = readURL.apply(watchedItem.getURL().toString()); + byte[] newHash = computeHash(newText); + if (watchedItem.isReconfigureNeeded(newHash)) { - watchedConfig.setConfigHash(newConfigHash); + watchedItem.setHash(newHash); if (configChangeListener != null) { - configChangeListener.apply(watchedConfig.getURL(), newConfigText); - } - if (resourceChangeListener != null) - { - resourceChangeListener.accept(namespaces); + configChangeListener.apply(watchedItem.getURL(), newText); } } } @@ -116,11 +110,11 @@ public Void call() public CompletableFuture watchConfig( URL configURL) { - WatchedConfig watchedConfig = new WatchedConfig(configURL, watchService); - watchedConfig.register(); - watchedConfig.keys().forEach(k -> watchedConfigs.put(k, watchedConfig)); + WatchedItem watchedItem = new WatchedItem(configURL, watchService); + watchedItem.register(); + watchedItem.keys().forEach(k -> watchedItems.put(k, watchedItem)); String configText = readURL.apply(configURL.toString()); - watchedConfig.setConfigHash(computeHash(configText)); + watchedItem.setHash(computeHash(configText)); CompletableFuture configFuture; try @@ -136,17 +130,6 @@ public CompletableFuture watchConfig( return configFuture; } - @Override - public void watchResource( - URL resourceURL) - { - WatchedConfig watchedConfig = new WatchedConfig(resourceURL, watchService); - watchedConfig.register(); - watchedConfig.keys().forEach(k -> watchedConfigs.put(k, watchedConfig)); - String resource = readURL.apply(resourceURL.toString()); - watchedConfig.setConfigHash(computeHash(resource)); - } - @Override public void close() throws IOException { diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/HttpWatcherTask.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigHttpWatcherTask.java similarity index 94% rename from runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/HttpWatcherTask.java rename to runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigHttpWatcherTask.java index 2ccadb83da..b616436981 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/HttpWatcherTask.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigHttpWatcherTask.java @@ -13,7 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.aklivity.zilla.runtime.engine.internal.registry; +package io.aklivity.zilla.runtime.engine.internal.watcher; import static java.net.http.HttpClient.Redirect.NORMAL; import static java.net.http.HttpClient.Version.HTTP_2; @@ -29,7 +29,6 @@ import java.util.Arrays; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -37,26 +36,25 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; -import java.util.function.Consumer; import io.aklivity.zilla.runtime.engine.config.EngineConfig; -public class HttpWatcherTask extends WatcherTask +public class ConfigHttpWatcherTask extends WatcherTask implements ConfigWatcher { private static final URI CLOSE_REQUESTED = URI.create("http://localhost:12345"); + private final BiFunction configChangeListener; private final Map etags; private final Map configHashes; private final Map> futures; private final BlockingQueue configQueue; private final int pollSeconds; - public HttpWatcherTask( + public ConfigHttpWatcherTask( BiFunction configChangeListener, - Consumer> resourceChangeListener, int pollSeconds) { - super(configChangeListener, resourceChangeListener); + this.configChangeListener = configChangeListener; this.etags = new ConcurrentHashMap<>(); this.configHashes = new ConcurrentHashMap<>(); this.futures = new ConcurrentHashMap<>(); @@ -106,13 +104,6 @@ public CompletableFuture watchConfig( return configFuture; } - @Override - public void watchResource( - URL resourceURL) - { - // TODO: Ati - } - @Override public void close() { diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigWatcher.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigWatcher.java new file mode 100644 index 0000000000..1f15cae833 --- /dev/null +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ConfigWatcher.java @@ -0,0 +1,30 @@ +/* + * Copyright 2021-2023 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.engine.internal.watcher; + +import java.io.Closeable; +import java.net.URL; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; + +import io.aklivity.zilla.runtime.engine.config.EngineConfig; + +public interface ConfigWatcher extends Closeable +{ + Future submit(); + + CompletableFuture watchConfig(URL configURL); +} diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceFileWatcherTask.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceFileWatcherTask.java new file mode 100644 index 0000000000..f2c8910517 --- /dev/null +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceFileWatcherTask.java @@ -0,0 +1,140 @@ +/* + * Copyright 2021-2023 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.engine.internal.watcher; + +import static org.agrona.LangUtil.rethrowUnchecked; + +import java.io.IOException; +import java.net.URL; +import java.nio.file.ClosedWatchServiceException; +import java.nio.file.FileSystems; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.function.Consumer; +import java.util.function.Function; + +public class ResourceFileWatcherTask extends WatcherTask implements ResourceWatcher +{ + private final Map watchedItems; + private final WatchService watchService; + private final Consumer> resourceChangeListener; + private final Function readURL; + private final Set namespaces; + + public ResourceFileWatcherTask( + Consumer> resourceChangeListener, + Function readURL) + { + this.resourceChangeListener = resourceChangeListener; + this.readURL = readURL; + this.watchedItems = new IdentityHashMap<>(); + this.namespaces = new HashSet<>(); + WatchService watchService = null; + + try + { + watchService = FileSystems.getDefault().newWatchService(); + } + catch (IOException ex) + { + rethrowUnchecked(ex); + } + + this.watchService = watchService; + + } + + @Override + public Future submit() + { + return executor.submit(this); + } + + @Override + public Void call() + { + while (true) + { + try + { + final WatchKey key = watchService.take(); + + WatchedItem watchedItem = watchedItems.get(key); + + if (watchedItem != null && watchedItem.isWatchedKey(key)) + { + // Even if no reconfigure needed, recalculation is necessary, since symlinks might have changed. + watchedItem.keys().forEach(watchedItems::remove); + watchedItem.unregister(); + watchedItem.register(); + watchedItem.keys().forEach(k -> watchedItems.put(k, watchedItem)); + String newText = readURL.apply(watchedItem.getURL().toString()); + byte[] newHash = computeHash(newText); + if (watchedItem.isReconfigureNeeded(newHash)) + { + watchedItem.setHash(newHash); + if (resourceChangeListener != null) + { + resourceChangeListener.accept(namespaces); + } + } + } + } + catch (InterruptedException | ClosedWatchServiceException ex) + { + break; + } + } + + return null; + } + + @Override + public void watchResource( + URL resourceURL) + { + WatchedItem watchedItem = new WatchedItem(resourceURL, watchService); + watchedItem.register(); + watchedItem.keys().forEach(k -> watchedItems.put(k, watchedItem)); + String resource = readURL.apply(resourceURL.toString()); + watchedItem.setHash(computeHash(resource)); + } + + @Override + public void addNamespace( + String namespace) + { + namespaces.add(namespace); + } + + @Override + public void removeNamespace( + String namespace) + { + namespaces.remove(namespace); + } + + @Override + public void close() throws IOException + { + watchService.close(); + } +} diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/ResourceWatcher.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceWatchManager.java similarity index 90% rename from runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/ResourceWatcher.java rename to runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceWatchManager.java index d709bd2394..7b7f1b2a80 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/ResourceWatcher.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceWatchManager.java @@ -13,7 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.aklivity.zilla.runtime.engine.internal.registry; +package io.aklivity.zilla.runtime.engine.internal.watcher; import static org.agrona.LangUtil.rethrowUnchecked; @@ -26,15 +26,15 @@ import java.util.function.Consumer; import java.util.function.Function; -public class ResourceWatcher +public class ResourceWatchManager { private final Map> resources; - private final Map resourceTasks; + private final Map resourceTasks; private Consumer> resourceChangeListener; private Function readURL; - public ResourceWatcher() + public ResourceWatchManager() { this.resources = new ConcurrentHashMap<>(); this.resourceTasks = new ConcurrentHashMap<>(); @@ -60,7 +60,7 @@ public void addResources( return ConcurrentHashMap.newKeySet(); } ).add(namespace); - resourceTasks.get(resource).addNamespaces(namespace); + resourceTasks.get(resource).addNamespace(namespace); } ); } @@ -93,8 +93,8 @@ private void startWatchingResource( { try { - FileWatcherTask watcherTask = new FileWatcherTask(null, resourceChangeListener, readURL); - watcherTask.addNamespaces(namespace); + ResourceWatcher watcherTask = new ResourceFileWatcherTask(resourceChangeListener, readURL); + watcherTask.addNamespace(namespace); watcherTask.submit(); URL resourceURL = Path.of(resource).toUri().toURL(); watcherTask.watchResource(resourceURL); diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceWatcher.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceWatcher.java new file mode 100644 index 0000000000..20b0dcc9d1 --- /dev/null +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/ResourceWatcher.java @@ -0,0 +1,31 @@ +/* + * Copyright 2021-2023 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.engine.internal.watcher; + +import java.io.Closeable; +import java.net.URL; +import java.util.concurrent.Future; + +public interface ResourceWatcher extends Closeable +{ + Future submit(); + + void watchResource(URL configURL); + + void addNamespace(String namespace); + + void removeNamespace(String namespace); +} diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/WatchedConfig.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/WatchedItem.java similarity index 81% rename from runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/WatchedConfig.java rename to runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/WatchedItem.java index a6ee866978..5bbacf7ef3 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/WatchedConfig.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/WatchedItem.java @@ -13,7 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.aklivity.zilla.runtime.engine.internal.registry; +package io.aklivity.zilla.runtime.engine.internal.watcher; import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE; @@ -33,20 +33,20 @@ import java.util.LinkedList; import java.util.Set; -public class WatchedConfig +public class WatchedItem { private final WatchService watchService; private final Set watchKeys; - private final URL configURL; - private byte[] configHash; + private final URL watchedURL; + private byte[] hash; - public WatchedConfig( - URL configURL, + public WatchedItem( + URL watchedURL, WatchService watchService) { this.watchService = watchService; this.watchKeys = new HashSet<>(); - this.configURL = configURL; + this.watchedURL = watchedURL; } public Set keys() @@ -56,7 +56,7 @@ public Set keys() public void register() { - Path configPath = Paths.get(configURL.getPath()).toAbsolutePath(); + Path configPath = Paths.get(watchedURL.getPath()).toAbsolutePath(); try { Set watchedPaths = new HashSet<>(); @@ -123,27 +123,27 @@ public boolean isWatchedKey( public boolean isReconfigureNeeded( byte[] newConfigHash) { - return !Arrays.equals(configHash, newConfigHash); + return !Arrays.equals(hash, newConfigHash); } - public void setConfigHash( - byte[] newConfigHash) + public void setHash( + byte[] newHash) { - configHash = newConfigHash; + hash = newHash; } public URL getURL() { - return configURL; + return watchedURL; } private WatchKey registerPath( - Path configPath) + Path path) { WatchKey key = null; try { - key = configPath.register(watchService, ENTRY_MODIFY, ENTRY_CREATE, ENTRY_DELETE); + key = path.register(watchService, ENTRY_MODIFY, ENTRY_CREATE, ENTRY_DELETE); } catch (IOException ex) { @@ -151,19 +151,4 @@ private WatchKey registerPath( } return key; } - - private Path toRealPath( - Path configPath) - { - try - { - configPath = configPath.toRealPath(); - } - catch (IOException ex) - { - rethrowUnchecked(ex); - } - return configPath; - } - } diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/WatcherTask.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/WatcherTask.java similarity index 51% rename from runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/WatcherTask.java rename to runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/WatcherTask.java index 173f7a1d0e..77c532efd7 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/registry/WatcherTask.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/internal/watcher/WatcherTask.java @@ -13,71 +13,33 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.aklivity.zilla.runtime.engine.internal.registry; +package io.aklivity.zilla.runtime.engine.internal.watcher; import static java.nio.charset.StandardCharsets.UTF_8; import static org.agrona.LangUtil.rethrowUnchecked; -import java.io.Closeable; -import java.net.URL; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import io.aklivity.zilla.runtime.engine.config.EngineConfig; - -public abstract class WatcherTask implements Callable, Closeable +public abstract class WatcherTask implements Callable { private final MessageDigest md5; protected final ScheduledExecutorService executor; - protected final BiFunction configChangeListener; - protected final Consumer> resourceChangeListener; - protected final Set namespaces; - protected WatcherTask( - BiFunction configChangeListener, - Consumer> resourceChangeListener) + protected WatcherTask() { - this.configChangeListener = configChangeListener; - this.resourceChangeListener = resourceChangeListener; this.md5 = initMessageDigest("MD5"); this.executor = Executors.newScheduledThreadPool(2); - this.namespaces = new HashSet<>(); - } - - public void addNamespaces( - String namespace) - { - namespaces.add(namespace); - } - - public void removeNamespace( - String namespace) - { - namespaces.remove(namespace); } - public abstract Future submit(); - - public abstract CompletableFuture watchConfig( - URL configURL); - - public abstract void watchResource( - URL resourceURL); - protected byte[] computeHash( - String configText) + String text) { - return md5.digest(configText.getBytes(UTF_8)); + return md5.digest(text.getBytes(UTF_8)); } private MessageDigest initMessageDigest(