Skip to content

Commit

Permalink
WIP refactoring watch resources
Browse files Browse the repository at this point in the history
  • Loading branch information
attilakreiner committed Jun 5, 2024
1 parent 5fd5e5b commit e1d5642
Show file tree
Hide file tree
Showing 14 changed files with 294 additions and 168 deletions.
2 changes: 1 addition & 1 deletion runtime/engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<jacoco.coverage.ratio>0.77</jacoco.coverage.ratio>
<jacoco.coverage.ratio>0.76</jacoco.coverage.ratio>
<jacoco.missed.count>5</jacoco.missed.count>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@
import io.aklivity.zilla.runtime.engine.internal.layouts.EventsLayout;
import io.aklivity.zilla.runtime.engine.internal.registry.EngineManager;
import io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker;
import io.aklivity.zilla.runtime.engine.internal.registry.FileWatcherTask;
import io.aklivity.zilla.runtime.engine.internal.registry.HttpWatcherTask;
import io.aklivity.zilla.runtime.engine.internal.registry.ResourceWatcher;
import io.aklivity.zilla.runtime.engine.internal.registry.WatcherTask;
import io.aklivity.zilla.runtime.engine.internal.types.event.EventFW;
import io.aklivity.zilla.runtime.engine.internal.watcher.ConfigFileWatcherTask;
import io.aklivity.zilla.runtime.engine.internal.watcher.ConfigHttpWatcherTask;
import io.aklivity.zilla.runtime.engine.internal.watcher.ConfigWatcher;
import io.aklivity.zilla.runtime.engine.internal.watcher.ResourceWatchManager;
import io.aklivity.zilla.runtime.engine.metrics.Collector;
import io.aklivity.zilla.runtime.engine.metrics.MetricGroup;
import io.aklivity.zilla.runtime.engine.model.Model;
Expand All @@ -93,15 +93,15 @@ public final class Engine implements Collector, AutoCloseable
private final AtomicInteger nextTaskId;
private final ThreadFactory factory;

private final WatcherTask watcherTask;
private final ConfigWatcher configWatcherTask;
private final URL configURL;
private final List<EngineWorker> workers;
private final boolean readonly;
private final EngineConfiguration config;
private final EngineManager manager;
private final ResourceWatcher resourceWatcher;
private final ResourceWatchManager resourceWatchManager;

private Future<Void> watcherTaskRef;
private Future<Void> configWatcherTaskRef;

Engine(
EngineConfiguration config,
Expand Down Expand Up @@ -162,14 +162,14 @@ public final class Engine implements Collector, AutoCloseable
}
this.tuning = tuning;

this.resourceWatcher = new ResourceWatcher();
this.resourceWatchManager = new ResourceWatchManager();
List<EngineWorker> workers = new ArrayList<>(workerCount);
for (int workerIndex = 0; workerIndex < workerCount; workerIndex++)
{
EngineWorker worker =
new EngineWorker(config, tasks, labels, errorHandler, tuning::affinity, bindings, exporters,
guards, vaults, catalogs, models, metricGroups, this, this::supplyEventReader,
eventFormatterFactory, resourceWatcher, workerIndex, readonly, this::process);
eventFormatterFactory, resourceWatchManager, workerIndex, readonly, this::process);
workers.add(worker);
}
this.workers = workers;
Expand Down Expand Up @@ -210,20 +210,20 @@ public final class Engine implements Collector, AutoCloseable
config,
extensions,
this::readURL,
resourceWatcher);
resourceWatchManager);

this.configURL = config.configURL();
String protocol = configURL.getProtocol();
if ("file".equals(protocol) || "jar".equals(protocol))
{
Function<String, String> watcherReadURL = l -> readURL(configURL, l);
this.watcherTask = new FileWatcherTask(manager::reconfigure, null, watcherReadURL);
this.resourceWatcher.initialize(manager::reloadNamespacesWithChangedResources, watcherReadURL);
this.configWatcherTask = new ConfigFileWatcherTask(manager::reconfigure, watcherReadURL);
this.resourceWatchManager.initialize(manager::reloadNamespacesWithChangedResources, watcherReadURL);
}
else if ("http".equals(protocol) || "https".equals(protocol))
{
this.watcherTask = new HttpWatcherTask(manager::reconfigure, manager::reloadNamespacesWithChangedResources,
config.configPollIntervalSeconds());
this.configWatcherTask = new ConfigHttpWatcherTask(manager::reconfigure, config.configPollIntervalSeconds());
// TODO: Ati - implement http
}
else
{
Expand Down Expand Up @@ -261,11 +261,11 @@ public void start() throws Exception
worker.doStart();
}

watcherTaskRef = watcherTask.submit();
configWatcherTaskRef = configWatcherTask.submit();
if (!readonly)
{
// ignore the config file in read-only mode; no config will be read so no namespaces, bindings, etc will be attached
watcherTask.watchConfig(configURL).get();
configWatcherTask.watchConfig(configURL).get();
}
}

Expand All @@ -279,9 +279,9 @@ public void close() throws Exception

final List<Throwable> errors = new ArrayList<>();

resourceWatcher.close();
watcherTask.close();
watcherTaskRef.get();
resourceWatchManager.close();
configWatcherTask.close();
configWatcherTaskRef.get();

for (EngineWorker worker : workers)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import io.aklivity.zilla.runtime.engine.guard.Guard;
import io.aklivity.zilla.runtime.engine.internal.Tuning;
import io.aklivity.zilla.runtime.engine.internal.config.NamespaceAdapter;
import io.aklivity.zilla.runtime.engine.internal.watcher.ResourceWatchManager;
import io.aklivity.zilla.runtime.engine.namespace.NamespacedId;
import io.aklivity.zilla.runtime.engine.resolver.Resolver;

Expand All @@ -84,7 +85,7 @@ public class EngineManager
private final List<EngineExtSpi> extensions;
private final BiFunction<URL, String, String> readURL;
private final Resolver expressions;
private final ResourceWatcher resourceWatcher;
private final ResourceWatchManager resourceWatchManager;

private EngineConfig current;

Expand All @@ -102,7 +103,7 @@ public EngineManager(
EngineConfiguration config,
List<EngineExtSpi> extensions,
BiFunction<URL, String, String> readURL,
ResourceWatcher resourceWatcher)
ResourceWatchManager resourceWatchManager)
{
this.schemaTypes = schemaTypes;
this.bindingByType = bindingByType;
Expand All @@ -118,7 +119,7 @@ public EngineManager(
this.extensions = extensions;
this.readURL = readURL;
this.expressions = Resolver.instantiate(config);
this.resourceWatcher = resourceWatcher;
this.resourceWatchManager = resourceWatchManager;
}

public EngineConfig reconfigure(
Expand Down Expand Up @@ -423,7 +424,7 @@ private void unregister(
{
System.out.println("unregister: " + namespace.name); // TODO: Ati
unregister(namespace);
resourceWatcher.removeNamespace(namespace.name);
resourceWatchManager.removeNamespace(namespace.name);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.aklivity.zilla.runtime.engine.config.NamespaceConfig;
import io.aklivity.zilla.runtime.engine.exporter.ExporterContext;
import io.aklivity.zilla.runtime.engine.guard.GuardContext;
import io.aklivity.zilla.runtime.engine.internal.watcher.ResourceWatchManager;
import io.aklivity.zilla.runtime.engine.metrics.Collector;
import io.aklivity.zilla.runtime.engine.metrics.Metric;
import io.aklivity.zilla.runtime.engine.metrics.MetricContext;
Expand All @@ -51,7 +52,7 @@ public class EngineRegistry
private final LongConsumer detachBinding;
private final Collector collector;
private final Consumer<NamespaceConfig> process;
private final ResourceWatcher resourceWatcher;
private final ResourceWatchManager resourceWatchManager;

public EngineRegistry(
Function<String, BindingContext> bindingsByType,
Expand All @@ -67,7 +68,7 @@ public EngineRegistry(
LongConsumer detachBinding,
Collector collector,
Consumer<NamespaceConfig> process,
ResourceWatcher resourceWatcher)
ResourceWatchManager resourceWatchManager)
{
this.bindingsByType = bindingsByType;
this.guardsByType = guardsByType;
Expand All @@ -83,7 +84,7 @@ public EngineRegistry(
this.detachBinding = detachBinding;
this.collector = collector;
this.process = process;
this.resourceWatcher = resourceWatcher;
this.resourceWatchManager = resourceWatchManager;
}

public void process(
Expand Down Expand Up @@ -199,7 +200,7 @@ private void attachNamespace(
NamespaceRegistry registry =
new NamespaceRegistry(namespace, bindingsByType, guardsByType, vaultsByType, catalogsByType,
metricsByName, exportersByType, supplyLabelId, this::resolveMetric, exporterAttached, exporterDetached,
supplyMetricRecorder, detachBinding, collector, resourceWatcher);
supplyMetricRecorder, detachBinding, collector, resourceWatchManager);
namespacesById.put(registry.namespaceId(), registry);
registry.attach();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
import io.aklivity.zilla.runtime.engine.internal.types.stream.ResetFW;
import io.aklivity.zilla.runtime.engine.internal.types.stream.SignalFW;
import io.aklivity.zilla.runtime.engine.internal.types.stream.WindowFW;
import io.aklivity.zilla.runtime.engine.internal.watcher.ResourceWatchManager;
import io.aklivity.zilla.runtime.engine.metrics.Collector;
import io.aklivity.zilla.runtime.engine.metrics.Metric;
import io.aklivity.zilla.runtime.engine.metrics.MetricContext;
Expand Down Expand Up @@ -253,7 +254,7 @@ public EngineWorker(
Collector collector,
Supplier<MessageReader> supplyEventReader,
EventFormatterFactory eventFormatterFactory,
ResourceWatcher resourceWatcher,
ResourceWatchManager resourceWatchManager,
int index,
boolean readonly,
Consumer<NamespaceConfig> process)
Expand Down Expand Up @@ -428,7 +429,7 @@ public EngineWorker(
this.registry = new EngineRegistry(
bindingsByType::get, guardsByType::get, vaultsByType::get, catalogsByType::get, metricsByName::get,
exportersByType::get, labels::supplyLabelId, this::onExporterAttached, this::onExporterDetached,
this::supplyMetricWriter, this::detachStreams, collector, process, resourceWatcher);
this::supplyMetricWriter, this::detachStreams, collector, process, resourceWatchManager);

this.taskQueue = new ConcurrentLinkedDeque<>();
this.correlations = new Long2ObjectHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.aklivity.zilla.runtime.engine.exporter.ExporterContext;
import io.aklivity.zilla.runtime.engine.exporter.ExporterHandler;
import io.aklivity.zilla.runtime.engine.guard.GuardContext;
import io.aklivity.zilla.runtime.engine.internal.watcher.ResourceWatchManager;
import io.aklivity.zilla.runtime.engine.metrics.Collector;
import io.aklivity.zilla.runtime.engine.metrics.Metric;
import io.aklivity.zilla.runtime.engine.metrics.MetricContext;
Expand Down Expand Up @@ -74,7 +75,7 @@ public class NamespaceRegistry
private final ObjectLongLongFunction<Metric.Kind, LongConsumer> supplyMetricRecorder;
private final LongConsumer detachBinding;
private final Collector collector;
private final ResourceWatcher resourceWatcher;
private final ResourceWatchManager resourceWatchManager;

public NamespaceRegistry(
NamespaceConfig namespace,
Expand All @@ -91,7 +92,7 @@ public NamespaceRegistry(
ObjectLongLongFunction<Metric.Kind, LongConsumer> supplyMetricRecorder,
LongConsumer detachBinding,
Collector collector,
ResourceWatcher resourceWatcher)
ResourceWatchManager resourceWatchManager)
{
this.namespace = namespace;
this.bindingsByType = bindingsByType;
Expand All @@ -114,7 +115,7 @@ public NamespaceRegistry(
this.metricsById = new Int2ObjectHashMap<>();
this.exportersById = new Int2ObjectHashMap<>();
this.collector = collector;
this.resourceWatcher = resourceWatcher;
this.resourceWatchManager = resourceWatchManager;
}

public int namespaceId()
Expand Down Expand Up @@ -268,7 +269,7 @@ private void attachVault(
VaultRegistry registry = new VaultRegistry(config, context);
vaultsById.put(vaultId, registry);
registry.attach();
resourceWatcher.addResources(registry.handler().resources(), config.namespace);
resourceWatchManager.addResources(registry.handler().resources(), config.namespace);
}

private void detachVault(
Expand Down Expand Up @@ -315,7 +316,7 @@ private void attachCatalog(
CatalogRegistry registry = new CatalogRegistry(config, context);
catalogsById.put(catalogId, registry);
registry.attach();
resourceWatcher.addResources(registry.handler().resources(), config.namespace);
resourceWatchManager.addResources(registry.handler().resources(), config.namespace);
}

private void detachCatalog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.engine.internal.registry;
package io.aklivity.zilla.runtime.engine.internal.watcher;

import static org.agrona.LangUtil.rethrowUnchecked;

Expand All @@ -25,29 +25,27 @@
import java.nio.file.WatchService;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

import io.aklivity.zilla.runtime.engine.config.EngineConfig;

public class FileWatcherTask extends WatcherTask
public class ConfigFileWatcherTask extends WatcherTask implements ConfigWatcher
{
private final Map<WatchKey, WatchedConfig> watchedConfigs;
private final Map<WatchKey, WatchedItem> watchedItems;
private final WatchService watchService;
private final BiFunction<URL, String, EngineConfig> configChangeListener;
private final Function<String, String> readURL;

public FileWatcherTask(
public ConfigFileWatcherTask(
BiFunction<URL, String, EngineConfig> configChangeListener,
Consumer<Set<String>> resourceChangeListener,
Function<String, String> readURL)
{
super(configChangeListener, resourceChangeListener);
this.configChangeListener = configChangeListener;
this.readURL = readURL;
this.watchedConfigs = new IdentityHashMap<>();
this.watchedItems = new IdentityHashMap<>();
WatchService watchService = null;

try
Expand Down Expand Up @@ -78,27 +76,23 @@ public Void call()
{
final WatchKey key = watchService.take();

WatchedConfig watchedConfig = watchedConfigs.get(key);
WatchedItem watchedItem = watchedItems.get(key);

if (watchedConfig != null && watchedConfig.isWatchedKey(key))
if (watchedItem != null && watchedItem.isWatchedKey(key))
{
// Even if no reconfigure needed, recalculation is necessary, since symlinks might have changed.
watchedConfig.keys().forEach(watchedConfigs::remove);
watchedConfig.unregister();
watchedConfig.register();
watchedConfig.keys().forEach(k -> watchedConfigs.put(k, watchedConfig));
String newConfigText = readURL.apply(watchedConfig.getURL().toString());
byte[] newConfigHash = computeHash(newConfigText);
if (watchedConfig.isReconfigureNeeded(newConfigHash))
watchedItem.keys().forEach(watchedItems::remove);
watchedItem.unregister();
watchedItem.register();
watchedItem.keys().forEach(k -> watchedItems.put(k, watchedItem));
String newText = readURL.apply(watchedItem.getURL().toString());
byte[] newHash = computeHash(newText);
if (watchedItem.isReconfigureNeeded(newHash))
{
watchedConfig.setConfigHash(newConfigHash);
watchedItem.setHash(newHash);
if (configChangeListener != null)
{
configChangeListener.apply(watchedConfig.getURL(), newConfigText);
}
if (resourceChangeListener != null)
{
resourceChangeListener.accept(namespaces);
configChangeListener.apply(watchedItem.getURL(), newText);
}
}
}
Expand All @@ -116,11 +110,11 @@ public Void call()
public CompletableFuture<EngineConfig> watchConfig(
URL configURL)
{
WatchedConfig watchedConfig = new WatchedConfig(configURL, watchService);
watchedConfig.register();
watchedConfig.keys().forEach(k -> watchedConfigs.put(k, watchedConfig));
WatchedItem watchedItem = new WatchedItem(configURL, watchService);
watchedItem.register();
watchedItem.keys().forEach(k -> watchedItems.put(k, watchedItem));
String configText = readURL.apply(configURL.toString());
watchedConfig.setConfigHash(computeHash(configText));
watchedItem.setHash(computeHash(configText));

CompletableFuture<EngineConfig> configFuture;
try
Expand All @@ -136,17 +130,6 @@ public CompletableFuture<EngineConfig> watchConfig(
return configFuture;
}

@Override
public void watchResource(
URL resourceURL)
{
WatchedConfig watchedConfig = new WatchedConfig(resourceURL, watchService);
watchedConfig.register();
watchedConfig.keys().forEach(k -> watchedConfigs.put(k, watchedConfig));
String resource = readURL.apply(resourceURL.toString());
watchedConfig.setConfigHash(computeHash(resource));
}

@Override
public void close() throws IOException
{
Expand Down
Loading

0 comments on commit e1d5642

Please sign in to comment.