From 0a1d2de7b8ed0f16cae0b6497a450569e6d8a2c3 Mon Sep 17 00:00:00 2001 From: "epieffe.eth" Date: Thu, 26 Dec 2024 23:49:09 +0100 Subject: [PATCH] Support identity placeholder in MQTT binding topics --- .../internal/config/MqttBindingConfig.java | 4 +- .../internal/config/MqttConditionMatcher.java | 73 +++--- .../mqtt/internal/config/MqttRouteConfig.java | 12 +- .../config/MqttConditionMatcherTest.java | 209 ++++++++++++++++++ .../mqtt/schema/mqtt.schema.patch.json | 4 +- 5 files changed, 262 insertions(+), 40 deletions(-) create mode 100644 runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcherTest.java diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttBindingConfig.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttBindingConfig.java index f6dd199e19..d577748cd0 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttBindingConfig.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttBindingConfig.java @@ -118,7 +118,7 @@ public MqttRouteConfig resolveSubscribe( String topic) { return routes.stream() - .filter(r -> r.authorized(authorization) && r.matchesSubscribe(topic)) + .filter(r -> r.authorized(authorization) && r.matchesSubscribe(topic, authorization)) .findFirst() .orElse(null); } @@ -128,7 +128,7 @@ public MqttRouteConfig resolvePublish( String topic) { return routes.stream() - .filter(r -> r.authorized(authorization) && r.matchesPublish(topic)) + .filter(r -> r.authorized(authorization) && r.matchesPublish(topic, authorization)) .findFirst() .orElse(null); } diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcher.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcher.java index 7d524e2b34..4c0f368a82 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcher.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcher.java @@ -22,25 +22,29 @@ import java.util.stream.Collectors; import io.aklivity.zilla.runtime.binding.mqtt.config.MqttConditionConfig; +import io.aklivity.zilla.runtime.engine.config.GuardedConfig; public final class MqttConditionMatcher { private final List sessionMatchers; - private final List subscribeMatchers; - private final List publishMatchers; + private final List subscribeTopics; + private final List publishTopics; + private final List guarded; public MqttConditionMatcher( - MqttConditionConfig condition) + MqttConditionConfig condition, + List guarded) { this.sessionMatchers = condition.sessions != null && !condition.sessions.isEmpty() ? asWildcardMatcher(condition.sessions.stream().map(s -> s.clientId).collect(Collectors.toList())) : null; - this.subscribeMatchers = + this.subscribeTopics = condition.subscribes != null && !condition.subscribes.isEmpty() ? - asTopicMatcher(condition.subscribes.stream().map(s -> s.topic).collect(Collectors.toList())) : null; - this.publishMatchers = + condition.subscribes.stream().map(s -> s.topic).collect(Collectors.toList()) : null; + this.publishTopics = condition.publishes != null && !condition.publishes.isEmpty() ? - asTopicMatcher(condition.publishes.stream().map(s -> s.topic).collect(Collectors.toList())) : null; + condition.publishes.stream().map(s -> s.topic).collect(Collectors.toList()) : null; + this.guarded = guarded; } public boolean matchesSession( @@ -62,14 +66,15 @@ public boolean matchesSession( } public boolean matchesSubscribe( - String topic) + String topic, + long authorization) { boolean match = false; - if (subscribeMatchers != null) + if (subscribeTopics != null) { - for (Matcher matcher : subscribeMatchers) + for (String subscribeTopic : subscribeTopics) { - match = matcher.reset(topic).matches(); + match = topicMatches(topic, subscribeTopic, authorization); if (match) { break; @@ -80,14 +85,15 @@ public boolean matchesSubscribe( } public boolean matchesPublish( - String topic) + String topic, + long authorization) { boolean match = false; - if (publishMatchers != null) + if (publishTopics != null) { - for (Matcher matcher : publishMatchers) + for (String publishTopic : publishTopics) { - match = matcher.reset(topic).matches(); + match = topicMatches(topic, publishTopic, authorization); if (match) { break; @@ -97,6 +103,27 @@ public boolean matchesPublish( return match; } + private boolean topicMatches(String topic, String pattern, long authorization) + { + for (GuardedConfig g : guarded) + { + String identity = g.identity.apply(authorization); + if (identity != null) + { + pattern = pattern.replace(String.format("{guarded[%s].identity}", g.name), identity); + } + } + return topic.matches(pattern + .replace("{", "\\{") + .replace("}", "\\}") + .replace("[", "\\[") + .replace("]", "\\]") + .replace(".", "\\.") + .replace("$", "\\$") + .replace("+", "[^/]*") + .replace("#", ".*")); + } + private static List asWildcardMatcher( List wildcards) { @@ -115,20 +142,4 @@ private static List asWildcardMatcher( return matchers; } - - private static List asTopicMatcher( - List wildcards) - { - List matchers = new ArrayList<>(); - for (String wildcard : wildcards) - { - matchers.add(Pattern.compile(wildcard - .replace(".", "\\.") - .replace("$", "\\$") - .replace("+", "[^/]*") - .replace("#", ".*")).matcher("")); - - } - return matchers; - } } diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttRouteConfig.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttRouteConfig.java index 983760993f..c77cdfaaee 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttRouteConfig.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttRouteConfig.java @@ -39,7 +39,7 @@ public MqttRouteConfig( this.id = route.id; this.when = route.when.stream() .map(MqttConditionConfig.class::cast) - .map(MqttConditionMatcher::new) + .map(conf -> new MqttConditionMatcher(conf, route.guarded)) .collect(toList()); this.with = (MqttWithConfig) route.with; this.authorized = route.authorized; @@ -63,14 +63,16 @@ boolean matchesSession( } boolean matchesSubscribe( - String topic) + String topic, + long authorization) { - return when.isEmpty() || when.stream().anyMatch(m -> m.matchesSubscribe(topic)); + return when.isEmpty() || when.stream().anyMatch(m -> m.matchesSubscribe(topic, authorization)); } boolean matchesPublish( - String topic) + String topic, + long authorization) { - return when.isEmpty() || when.stream().anyMatch(m -> m.matchesPublish(topic)); + return when.isEmpty() || when.stream().anyMatch(m -> m.matchesPublish(topic, authorization)); } } diff --git a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcherTest.java b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcherTest.java new file mode 100644 index 0000000000..e6e3f60a70 --- /dev/null +++ b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttConditionMatcherTest.java @@ -0,0 +1,209 @@ +/* + * Copyright 2021-2024 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.binding.mqtt.internal.config; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.Map; + +import org.junit.Test; + +import io.aklivity.zilla.runtime.binding.mqtt.config.MqttConditionConfig; +import io.aklivity.zilla.runtime.binding.mqtt.config.MqttPublishConfig; +import io.aklivity.zilla.runtime.binding.mqtt.config.MqttSubscribeConfig; +import io.aklivity.zilla.runtime.engine.config.GuardedConfig; + +public class MqttConditionMatcherTest +{ + + @Test + public void shouldMatchIsolatedMultiLevelWildcard() + { + MqttConditionMatcher matcher = buildMatcher( + "#", + "#"); + assertTrue(matcher.matchesPublish("#", 1L)); + assertTrue(matcher.matchesSubscribe("#", 1L)); + assertTrue(matcher.matchesPublish("topic", 1L)); + assertTrue(matcher.matchesSubscribe("topic", 1L)); + assertTrue(matcher.matchesPublish("topic/pub", 1L)); + assertTrue(matcher.matchesSubscribe("topic/sub", 1L)); + assertTrue(matcher.matchesPublish("topic/+/pub", 1L)); + assertTrue(matcher.matchesSubscribe("topic/+/sub", 1L)); + assertTrue(matcher.matchesPublish("topic/pub/#", 1L)); + assertTrue(matcher.matchesSubscribe("topic/sub/#", 1L)); + } + + @Test + public void shouldMatchMultipleTopicNames() + { + MqttConditionMatcher matcher = buildMatcher( + "topic/pub", + "topic/sub"); + assertTrue(matcher.matchesPublish("topic/pub", 1L)); + assertTrue(matcher.matchesSubscribe("topic/sub", 1L)); + assertFalse(matcher.matchesPublish("topic/#", 1L)); + assertFalse(matcher.matchesSubscribe("topic/#", 1L)); + assertFalse(matcher.matchesPublish("topic/+", 1L)); + assertFalse(matcher.matchesSubscribe("topic/+", 1L)); + assertFalse(matcher.matchesPublish("topic/sub", 1L)); + assertFalse(matcher.matchesSubscribe("topic/pub", 1L)); + assertFalse(matcher.matchesPublish("topic/pu", 1L)); + assertFalse(matcher.matchesSubscribe("topic/su", 1L)); + assertFalse(matcher.matchesPublish("topic/put", 1L)); + assertFalse(matcher.matchesSubscribe("topic/sup", 1L)); + assertFalse(matcher.matchesPublish("topic/publ", 1L)); + assertFalse(matcher.matchesSubscribe("topic/subs", 1L)); + assertFalse(matcher.matchesPublish("topicpub", 1L)); + assertFalse(matcher.matchesSubscribe("topicsub", 1L)); + assertFalse(matcher.matchesPublish("opic/pub", 1L)); + assertFalse(matcher.matchesSubscribe("opic/sub", 1L)); + assertFalse(matcher.matchesPublish("popic/pub", 1L)); + assertFalse(matcher.matchesSubscribe("zopic/sub", 1L)); + } + + @Test + public void shouldMatchMultipleTopicNamesWithSingleLevelWildcard() + { + MqttConditionMatcher matcher = buildMatcher( + "topic/pub/+", + "topic/sub/+"); + assertTrue(matcher.matchesPublish("topic/pub/aa", 1L)); + assertTrue(matcher.matchesSubscribe("topic/sub/bbb", 1L)); + assertTrue(matcher.matchesPublish("topic/pub/+", 1L)); + assertTrue(matcher.matchesSubscribe("topic/sub/+", 1L)); + assertFalse(matcher.matchesPublish("topic/sub/aa", 1L)); + assertFalse(matcher.matchesSubscribe("topic/pub/bbb", 1L)); + assertFalse(matcher.matchesPublish("topic/pub/aa/one", 1L)); + assertFalse(matcher.matchesSubscribe("topic/sub/bbb/two", 1L)); + } + + @Test + public void shouldMatchMultipleTopicNamesWithSingleAndMultiLevelWildcard() + { + MqttConditionMatcher matcher = buildMatcher( + "topic/+/pub/#", + "topic/+/sub/#"); + assertTrue(matcher.matchesPublish("topic/x/pub/aa", 1L)); + assertTrue(matcher.matchesSubscribe("topic/y/sub/b", 1L)); + assertTrue(matcher.matchesPublish("topic/x/pub/test/cc", 1L)); + assertTrue(matcher.matchesSubscribe("topic/y/sub/test/bb", 1L)); + assertFalse(matcher.matchesPublish("topic/pub/aa", 1L)); + assertFalse(matcher.matchesSubscribe("topic/sub/b", 1L)); + } + + @Test + public void shouldMatchTopicNameWithIdentityPlaceholder() + { + MqttConditionMatcher matcher = buildMatcher( + "pub/{guarded[gname].identity}", + "sub/{guarded[gname].identity}", + "gname", + Map.of( + 1L, "myuser", + 2L, "otheruser")); + assertTrue(matcher.matchesPublish("pub/myuser", 1L)); + assertTrue(matcher.matchesSubscribe("sub/myuser", 1L)); + assertTrue(matcher.matchesPublish("pub/otheruser", 2L)); + assertTrue(matcher.matchesSubscribe("sub/otheruser", 2L)); + assertFalse(matcher.matchesPublish("pub/myuser", 2L)); + assertFalse(matcher.matchesSubscribe("sub/myuser", 2L)); + assertFalse(matcher.matchesPublish("pub/otheruser", 1L)); + assertFalse(matcher.matchesSubscribe("sub/otheruser", 1L)); + assertFalse(matcher.matchesPublish("pub/myuset", 1L)); + assertFalse(matcher.matchesSubscribe("sub/myuset", 1L)); + assertFalse(matcher.matchesPublish("pub/myusert", 1L)); + assertFalse(matcher.matchesSubscribe("sub/myusert", 1L)); + assertFalse(matcher.matchesPublish("pub/myuser/a", 1L)); + assertFalse(matcher.matchesSubscribe("sub/myuser/a", 1L)); + assertFalse(matcher.matchesPublish("pub/myuser", 3L)); + assertFalse(matcher.matchesSubscribe("sub/myuser", 3L)); + assertFalse(matcher.matchesPublish("pub/null", 3L)); + assertFalse(matcher.matchesSubscribe("sub/null", 3L)); + } + + @Test + public void shouldMatchTopicNameWithIdentityPlaceholderAndMultiLevelWildcard() + { + MqttConditionMatcher matcher = buildMatcher( + "pub/{guarded[gname].identity}/#", + "sub/{guarded[gname].identity}/#", + "gname", + Map.of( + 1L, "myuser", + 2L, "otheruser")); + assertTrue(matcher.matchesPublish("pub/myuser/pubtest", 1L)); + assertTrue(matcher.matchesSubscribe("sub/myuser/subtest", 1L)); + assertTrue(matcher.matchesPublish("pub/myuser/pubtest/aaa", 1L)); + assertTrue(matcher.matchesSubscribe("sub/myuser/subtest/aa", 1L)); + assertTrue(matcher.matchesPublish("pub/otheruser/pubtest", 2L)); + assertTrue(matcher.matchesSubscribe("sub/otheruser/subtest", 2L)); + assertTrue(matcher.matchesPublish("pub/otheruser/pubtest/aa", 2L)); + assertTrue(matcher.matchesSubscribe("sub/otheruser/subtest/aa", 2L)); + assertFalse(matcher.matchesPublish("pub/myuser/pubtest", 2L)); + assertFalse(matcher.matchesSubscribe("sub/myuser/subtest", 2L)); + assertFalse(matcher.matchesPublish("pub/myuser/pubtest/aaa", 2L)); + assertFalse(matcher.matchesSubscribe("sub/myuser/subtest/aa", 2L)); + assertFalse(matcher.matchesPublish("pub/otheruser/pubtest", 1L)); + assertFalse(matcher.matchesSubscribe("sub/otheruser/subtest", 1L)); + assertFalse(matcher.matchesPublish("pub/otheruser/pubtest/aa", 1L)); + assertFalse(matcher.matchesSubscribe("sub/otheruser/subtest/aa", 1L)); + } + + @Test + public void shouldNotMatchTopicNameWithInvalidIdentityPlaceholder() + { + MqttConditionMatcher matcher = buildMatcher( + "pub/{guarded[invalid].identity}", + "sub/{guarded[invalid].identity}", + "gname", + Map.of( + 1L, "myuser", + 2L, "otheruser")); + assertTrue(matcher.matchesPublish("pub/{guarded[invalid].identity}", 1L)); + assertTrue(matcher.matchesSubscribe("sub/{guarded[invalid].identity}", 1L)); + assertFalse(matcher.matchesPublish("pub/myuser", 1L)); + assertFalse(matcher.matchesSubscribe("sub/myuser", 1L)); + assertFalse(matcher.matchesPublish("pub/otheruser", 2L)); + assertFalse(matcher.matchesSubscribe("sub/otheruser", 2L)); + } + + private static MqttConditionMatcher buildMatcher( + String publishTopic, + String subscribeTopic) + { + return buildMatcher(publishTopic, subscribeTopic, "", Map.of()); + } + + private static MqttConditionMatcher buildMatcher( + String publishTopic, + String subscribeTopic, + String guardName, + Map identities) + { + MqttConditionConfig condition = MqttConditionConfig.builder() + .publish(MqttPublishConfig.builder().topic(publishTopic).build()) + .subscribe(MqttSubscribeConfig.builder().topic(subscribeTopic).build()) + .build(); + GuardedConfig guarded = GuardedConfig.builder() + .name(guardName) + .build(); + guarded.identity = identities::get; + return new MqttConditionMatcher(condition, List.of(guarded)); + } +} diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/schema/mqtt.schema.patch.json b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/schema/mqtt.schema.patch.json index 7303b0ad33..81eb318869 100644 --- a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/schema/mqtt.schema.patch.json +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/schema/mqtt.schema.patch.json @@ -84,7 +84,7 @@ { "title": "Topic", "type": "string", - "pattern": "^(\\/?([\\w{}-]*|\\+)(\\/((?![-_])[\\w{}-]*|\\+))*(\\/#)?|#|\\/|\\$SYS(\\/((?![-_])[\\w{}-]*|\\+))*(\\/#)?)$" + "pattern": "^(\\/?([\\w{}\\[\\]\\.-]*|\\+)(\\/((?![-_])[\\w{}\\[\\]\\.-]*|\\+))*(\\/#)?|#|\\/|\\$SYS(\\/((?![-_])[\\w{}\\[\\]\\.-]*|\\+))*(\\/#)?)$" } } } @@ -101,7 +101,7 @@ "topic": { "title": "Topic", "type": "string", - "pattern": "^(\\/?([\\w{}-]*|\\+)(\\/((?![-_])[\\w{}-]*|\\+))*(\\/#)?|#|\\/|\\$SYS(\\/((?![-_])[\\w{}-]*|\\+))*(\\/#)?)$" + "pattern": "^(\\/?([\\w{}\\[\\]\\.-]*|\\+)(\\/((?![-_])[\\w{}\\[\\]\\.-]*|\\+))*(\\/#)?|#|\\/|\\$SYS(\\/((?![-_])[\\w{}\\[\\]\\.-]*|\\+))*(\\/#)?)$" } } }