Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support remote zilla configuration with change detection #1071

Merged
merged 47 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
a370a7b
WIP reconfigure only changed namespaces
attilakreiner May 31, 2024
3757300
WIP watch resources
attilakreiner Jun 5, 2024
c02e5c4
WIP refactoring watch resources
attilakreiner Jun 5, 2024
8fcae70
WIP refactoring 2
attilakreiner Jun 5, 2024
96f070e
WIP fix
attilakreiner Jun 6, 2024
df0b998
WIP EngineConfigWatcher
attilakreiner Jun 6, 2024
c0a554f
WIP ResourceResolver
attilakreiner Jun 6, 2024
b726267
WIP fix ResourceResolver
attilakreiner Jun 6, 2024
9c2cbbe
fix indentation
attilakreiner Jun 10, 2024
abb923f
fix EngineManager method names
attilakreiner Jun 10, 2024
b9355c6
rm ResourceResolver
attilakreiner Jun 10, 2024
c58ebc9
WIP EngineManager addResources
attilakreiner Jun 10, 2024
2285123
Revert "WIP reconfigure only changed namespaces"
attilakreiner Jun 11, 2024
39f34d0
fix EngineConfigWatcher addResources
attilakreiner Jun 11, 2024
b482fac
Merge branch 'develop' into config-reload
attilakreiner Jun 11, 2024
fd17488
Merge branch 'develop' into config-reload
attilakreiner Jun 13, 2024
b6c2ff2
Merge branch 'develop' into config-reload
attilakreiner Jun 14, 2024
2da3603
WIP readPath resolvePath readLocation
attilakreiner Jun 11, 2024
c85bdc1
WIP filesystem-http
attilakreiner Jun 5, 2024
ab94dac
WIP
attilakreiner Jun 18, 2024
22a2d03
fix
attilakreiner Jun 18, 2024
dfe9610
Merge branch 'develop' into config-reload
attilakreiner Jun 18, 2024
e4cf407
fix
attilakreiner Jun 18, 2024
df172d8
fix EngineConfiguration configPath
attilakreiner Jun 19, 2024
bef04ad
WIP fix
attilakreiner Jun 19, 2024
3177c80
WIP hfs 1
attilakreiner Jun 20, 2024
2a217f8
WIP hfs uri
attilakreiner Jun 20, 2024
4f5737a
WIP hfs resolveSibling
attilakreiner Jun 20, 2024
6395780
WIP hfs rm timeout
attilakreiner Jun 20, 2024
54a2b87
WIP hfs revert send
attilakreiner Jun 20, 2024
a5d0a39
WIP hfs HP readBody resolveBody
attilakreiner Jun 20, 2024
f19e623
WIP hfs HP readBody etag
attilakreiner Jun 20, 2024
a8a4015
WIP hfs HP
attilakreiner Jun 20, 2024
c2efe21
WIP hfs ignore AppIT
attilakreiner Jun 20, 2024
873b5e1
WIP hfs fix AppIT
attilakreiner Jun 20, 2024
c166431
WIP hfs watchBody 1
attilakreiner Jun 20, 2024
0c0f7f4
WIP hfs watchBody 2
attilakreiner Jun 20, 2024
31b420c
WIP hfs watchBody 3
attilakreiner Jun 21, 2024
d3109f7
WIP hfs watchBody 4
attilakreiner Jun 21, 2024
3198b10
WIP hfs watchBody 5
attilakreiner Jun 21, 2024
ef2d377
WIP hfs watchBody 6
attilakreiner Jun 21, 2024
8f982fe
WIP hfs getPath
attilakreiner Jun 21, 2024
50c8792
WIP hfs watchBody 7
attilakreiner Jun 21, 2024
71231a9
WIP fix
attilakreiner Jun 21, 2024
7535666
Remove BindingConfig.readLocation
jfallows Jun 26, 2024
e7c6c56
No watch event needed for identical response body
jfallows Jun 26, 2024
5423718
Handle status 204 with null body and infer delay for optional prefer …
jfallows Jun 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package io.aklivity.zilla.runtime.catalog.filesystem.internal.config;

import java.util.LinkedList;
import java.util.List;
import java.util.function.Function;

Expand All @@ -37,6 +38,18 @@ public static <T> FilesystemOptionsConfigBuilder<T> builder(
public FilesystemOptionsConfig(
List<FilesystemSchemaConfig> subjects)
{
super(List.of(), resolveResources(subjects));
this.subjects = subjects;
}

private static List<String> resolveResources(
List<FilesystemSchemaConfig> subjects)
{
List<String> resources = new LinkedList<>();
for (FilesystemSchemaConfig subject : subjects)
{
resources.add(subject.path);
}
return resources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import io.aklivity.zilla.runtime.binding.http.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.http.internal.types.String8FW;
import io.aklivity.zilla.runtime.engine.config.ModelConfig;
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;

public final class HttpOptionsConfig extends OptionsConfig
Expand Down Expand Up @@ -55,41 +56,46 @@ public static <T> HttpOptionsConfigBuilder<T> builder(
HttpAuthorizationConfig authorization,
List<HttpRequestConfig> requests)
{
super(requests != null && !requests.isEmpty()
? requests.stream()
.flatMap(request -> Stream.concat(
Stream.of(request.content),
Stream.concat(
request.headers != null
? request.headers.stream().flatMap(header -> Stream.of(header != null ? header.model : null))
: Stream.empty(),
Stream.concat(
request.pathParams != null
? request.pathParams.stream().flatMap(param -> Stream.of(param != null ? param.model : null))
: Stream.empty(),
Stream.concat(
request.queryParams != null
? request.queryParams.stream().flatMap(param -> Stream.of(param != null ? param.model : null))
: Stream.empty(),
Stream.concat(request.responses != null
? request.responses.stream().flatMap(param -> Stream.of(param != null
? param.content
: null))
: Stream.empty(), request.responses != null
? request.responses.stream()
.flatMap(response -> response.headers != null
? response.headers.stream()
.flatMap(param -> Stream.of(param != null ? param.model : null))
: Stream.empty())
: Stream.empty())
)))).filter(Objects::nonNull))
.collect(Collectors.toList())
: emptyList());

super(resolveModels(requests), List.of());
this.versions = versions;
this.overrides = overrides;
this.access = access;
this.authorization = authorization;
this.requests = requests;
}

private static List<ModelConfig> resolveModels(
List<HttpRequestConfig> requests)
{
return requests != null && !requests.isEmpty()
? requests.stream()
.flatMap(request -> Stream.concat(
Stream.of(request.content),
Stream.concat(
request.headers != null
? request.headers.stream().flatMap(header -> Stream.of(header != null ? header.model : null))
: Stream.empty(),
Stream.concat(
request.pathParams != null
? request.pathParams.stream().flatMap(param -> Stream.of(param != null ? param.model : null))
: Stream.empty(),
Stream.concat(
request.queryParams != null
? request.queryParams.stream().flatMap(param -> Stream.of(param != null ? param.model : null))
: Stream.empty(),
Stream.concat(request.responses != null
? request.responses.stream().flatMap(param -> Stream.of(param != null
? param.content
: null))
: Stream.empty(), request.responses != null
? request.responses.stream()
.flatMap(response -> response.headers != null
? response.headers.stream()
.flatMap(param -> Stream.of(param != null ? param.model : null))
: Stream.empty())
: Stream.empty())
)))).filter(Objects::nonNull))
.collect(Collectors.toList())
: emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.function.Function;
import java.util.stream.Stream;

import io.aklivity.zilla.runtime.engine.config.ModelConfig;
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;

public final class KafkaOptionsConfig extends OptionsConfig
Expand All @@ -49,15 +50,21 @@ public static <T> KafkaOptionsConfigBuilder<T> builder(
List<KafkaServerConfig> servers,
KafkaSaslConfig sasl)
{
super(topics != null && !topics.isEmpty()
? topics.stream()
.flatMap(t -> Stream.of(t.key, t.value))
.filter(Objects::nonNull)
.collect(toList())
: emptyList());
super(resolveModels(topics), List.of());
this.bootstrap = bootstrap;
this.topics = topics;
this.servers = servers;
this.sasl = sasl;
}

private static List<ModelConfig> resolveModels(
List<KafkaTopicConfig> topics)
{
return topics != null && !topics.isEmpty()
? topics.stream()
.flatMap(t -> Stream.of(t.key, t.value))
.filter(Objects::nonNull)
.collect(toList())
attilakreiner marked this conversation as resolved.
Show resolved Hide resolved
: emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.stream.Stream;

import io.aklivity.zilla.runtime.binding.mqtt.internal.config.MqttVersion;
import io.aklivity.zilla.runtime.engine.config.ModelConfig;
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;

public class MqttOptionsConfig extends OptionsConfig
Expand All @@ -50,18 +51,24 @@ public MqttOptionsConfig(
List<MqttTopicConfig> topics,
List<MqttVersion> versions)
{
super(topics != null && !topics.isEmpty()
super(resolveModels(topics), List.of());
this.authorization = authorization;
this.topics = topics;
this.versions = versions;
}

private static List<ModelConfig> resolveModels(
List<MqttTopicConfig> topics)
{
return topics != null && !topics.isEmpty()
? topics.stream()
.flatMap(topic -> Stream.concat(
Stream.of(topic.content),
Stream.of(topic.content),
Optional.ofNullable(topic.userProperties).orElseGet(Collections::emptyList).stream()
.flatMap(p -> Stream.of(p.value))
.filter(Objects::nonNull))
.flatMap(p -> Stream.of(p.value))
.filter(Objects::nonNull))
.filter(Objects::nonNull))
.collect(Collectors.toList())
: emptyList());
this.authorization = authorization;
this.topics = topics;
this.versions = versions;
: emptyList();
}
}
2 changes: 1 addition & 1 deletion runtime/engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<jacoco.coverage.ratio>0.77</jacoco.coverage.ratio>
<jacoco.coverage.ratio>0.74</jacoco.coverage.ratio>
<jacoco.missed.count>5</jacoco.missed.count>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,14 @@
package io.aklivity.zilla.runtime.engine;

import static io.aklivity.zilla.runtime.engine.internal.layouts.metrics.HistogramsLayout.BUCKETS;
import static java.net.http.HttpClient.Redirect.NORMAL;
import static java.net.http.HttpClient.Version.HTTP_2;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.stream.Collectors.toList;
import static org.agrona.LangUtil.rethrowUnchecked;

import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand All @@ -40,11 +33,9 @@
import java.util.ServiceLoader.Provider;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
Expand All @@ -60,6 +51,7 @@
import io.aklivity.zilla.runtime.engine.catalog.Catalog;
import io.aklivity.zilla.runtime.engine.config.KindConfig;
import io.aklivity.zilla.runtime.engine.config.NamespaceConfig;
import io.aklivity.zilla.runtime.engine.config.ResourceResolver;
import io.aklivity.zilla.runtime.engine.event.EventFormatterFactory;
import io.aklivity.zilla.runtime.engine.exporter.Exporter;
import io.aklivity.zilla.runtime.engine.ext.EngineExtContext;
Expand All @@ -71,9 +63,6 @@
import io.aklivity.zilla.runtime.engine.internal.layouts.EventsLayout;
import io.aklivity.zilla.runtime.engine.internal.registry.EngineManager;
import io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker;
import io.aklivity.zilla.runtime.engine.internal.registry.FileWatcherTask;
import io.aklivity.zilla.runtime.engine.internal.registry.HttpWatcherTask;
import io.aklivity.zilla.runtime.engine.internal.registry.WatcherTask;
import io.aklivity.zilla.runtime.engine.internal.types.event.EventFW;
import io.aklivity.zilla.runtime.engine.metrics.Collector;
import io.aklivity.zilla.runtime.engine.metrics.MetricGroup;
Expand All @@ -92,15 +81,12 @@ public final class Engine implements Collector, AutoCloseable
private final AtomicInteger nextTaskId;
private final ThreadFactory factory;

private final WatcherTask watcherTask;
private final URL configURL;
private final List<EngineWorker> workers;
private final boolean readonly;
private final EngineConfiguration config;
private final EngineManager manager;

private Future<Void> watcherTaskRef;

Engine(
EngineConfiguration config,
Collection<Binding> bindings,
Expand Down Expand Up @@ -193,6 +179,8 @@ public final class Engine implements Collector, AutoCloseable
final Map<String, Guard> guardsByType = guards.stream()
.collect(Collectors.toMap(g -> g.name(), g -> g));

ResourceResolver resources = new ResourceResolver(catalogs, vaults);
this.configURL = config.configURL();
EngineManager manager = new EngineManager(
schemaTypes,
bindingsByType::get,
Expand All @@ -206,23 +194,9 @@ public final class Engine implements Collector, AutoCloseable
context,
config,
extensions,
this::readURL);

this.configURL = config.configURL();
String protocol = configURL.getProtocol();
if ("file".equals(protocol) || "jar".equals(protocol))
{
Function<String, String> watcherReadURL = l -> readURL(configURL, l);
this.watcherTask = new FileWatcherTask(manager::reconfigure, watcherReadURL);
}
else if ("http".equals(protocol) || "https".equals(protocol))
{
this.watcherTask = new HttpWatcherTask(manager::reconfigure, config.configPollIntervalSeconds());
}
else
{
throw new UnsupportedOperationException();
}
this.configURL,
attilakreiner marked this conversation as resolved.
Show resolved Hide resolved
this::readURL,
attilakreiner marked this conversation as resolved.
Show resolved Hide resolved
resources);

this.bindings = bindings;
this.tasks = tasks;
Expand Down Expand Up @@ -255,11 +229,10 @@ public void start() throws Exception
worker.doStart();
}

watcherTaskRef = watcherTask.submit();
// ignore the config file in read-only mode; no config will be read so no namespaces, bindings, etc. will be attached
if (!readonly)
{
// ignore the config file in read-only mode; no config will be read so no namespaces, bindings, etc will be attached
watcherTask.watch(configURL).get();
manager.startWatcher();
attilakreiner marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -273,8 +246,7 @@ public void close() throws Exception

final List<Throwable> errors = new ArrayList<>();

watcherTask.close();
watcherTaskRef.get();
manager.closeWatcher();
attilakreiner marked this conversation as resolved.
Show resolved Hide resolved

for (EngineWorker worker : workers)
{
Expand Down Expand Up @@ -317,46 +289,23 @@ public static EngineBuilder builder()
}

private String readURL(
URL configURL,
String location)
{
String output = null;
String result;
try
{
final URL fileURL = new URL(configURL, location);
if ("http".equals(fileURL.getProtocol()) || "https".equals(fileURL.getProtocol()))
URL url = new URL(configURL, location);
URLConnection connection = url.openConnection();
try (InputStream input = connection.getInputStream())
{
HttpClient client = HttpClient.newBuilder()
.version(HTTP_2)
.followRedirects(NORMAL)
.build();

HttpRequest request = HttpRequest.newBuilder()
.GET()
.uri(fileURL.toURI())
.build();

HttpResponse<String> response = client.send(
request,
HttpResponse.BodyHandlers.ofString());

output = response.body();
}
else
{

URLConnection connection = fileURL.openConnection();
try (InputStream input = connection.getInputStream())
{
output = new String(input.readAllBytes(), UTF_8);
}
result = new String(input.readAllBytes(), UTF_8);
}
}
catch (IOException | URISyntaxException | InterruptedException ex)
catch (Exception ex)
{
output = "";
result = "";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't want to default to empty string here.

After moving Files.readString(path) to each call site, handling an IOException will let the caller decide the best remedy instead of assuming empty string is always appropriate.

}
return output;
return result;
}

private Thread newTaskThread(
Expand Down
Loading