Skip to content

Commit

Permalink
Support identity placeholder in MQTT binding topics
Browse files Browse the repository at this point in the history
  • Loading branch information
epieffe committed Jan 2, 2025
1 parent 063d7cb commit 0a1d2de
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public MqttRouteConfig resolveSubscribe(
String topic)
{
return routes.stream()
.filter(r -> r.authorized(authorization) && r.matchesSubscribe(topic))
.filter(r -> r.authorized(authorization) && r.matchesSubscribe(topic, authorization))
.findFirst()
.orElse(null);
}
Expand All @@ -128,7 +128,7 @@ public MqttRouteConfig resolvePublish(
String topic)
{
return routes.stream()
.filter(r -> r.authorized(authorization) && r.matchesPublish(topic))
.filter(r -> r.authorized(authorization) && r.matchesPublish(topic, authorization))
.findFirst()
.orElse(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,29 @@
import java.util.stream.Collectors;

import io.aklivity.zilla.runtime.binding.mqtt.config.MqttConditionConfig;
import io.aklivity.zilla.runtime.engine.config.GuardedConfig;

public final class MqttConditionMatcher
{
private final List<Matcher> sessionMatchers;
private final List<Matcher> subscribeMatchers;
private final List<Matcher> publishMatchers;
private final List<String> subscribeTopics;
private final List<String> publishTopics;
private final List<GuardedConfig> guarded;

public MqttConditionMatcher(
MqttConditionConfig condition)
MqttConditionConfig condition,
List<GuardedConfig> guarded)
{
this.sessionMatchers =
condition.sessions != null && !condition.sessions.isEmpty() ?
asWildcardMatcher(condition.sessions.stream().map(s -> s.clientId).collect(Collectors.toList())) : null;
this.subscribeMatchers =
this.subscribeTopics =
condition.subscribes != null && !condition.subscribes.isEmpty() ?
asTopicMatcher(condition.subscribes.stream().map(s -> s.topic).collect(Collectors.toList())) : null;
this.publishMatchers =
condition.subscribes.stream().map(s -> s.topic).collect(Collectors.toList()) : null;
this.publishTopics =
condition.publishes != null && !condition.publishes.isEmpty() ?
asTopicMatcher(condition.publishes.stream().map(s -> s.topic).collect(Collectors.toList())) : null;
condition.publishes.stream().map(s -> s.topic).collect(Collectors.toList()) : null;
this.guarded = guarded;
}

public boolean matchesSession(
Expand All @@ -62,14 +66,15 @@ public boolean matchesSession(
}

public boolean matchesSubscribe(
String topic)
String topic,
long authorization)
{
boolean match = false;
if (subscribeMatchers != null)
if (subscribeTopics != null)
{
for (Matcher matcher : subscribeMatchers)
for (String subscribeTopic : subscribeTopics)
{
match = matcher.reset(topic).matches();
match = topicMatches(topic, subscribeTopic, authorization);
if (match)
{
break;
Expand All @@ -80,14 +85,15 @@ public boolean matchesSubscribe(
}

public boolean matchesPublish(
String topic)
String topic,
long authorization)
{
boolean match = false;
if (publishMatchers != null)
if (publishTopics != null)
{
for (Matcher matcher : publishMatchers)
for (String publishTopic : publishTopics)
{
match = matcher.reset(topic).matches();
match = topicMatches(topic, publishTopic, authorization);
if (match)
{
break;
Expand All @@ -97,6 +103,27 @@ public boolean matchesPublish(
return match;
}

private boolean topicMatches(String topic, String pattern, long authorization)
{
for (GuardedConfig g : guarded)
{
String identity = g.identity.apply(authorization);
if (identity != null)
{
pattern = pattern.replace(String.format("{guarded[%s].identity}", g.name), identity);
}
}
return topic.matches(pattern
.replace("{", "\\{")
.replace("}", "\\}")
.replace("[", "\\[")
.replace("]", "\\]")
.replace(".", "\\.")
.replace("$", "\\$")
.replace("+", "[^/]*")
.replace("#", ".*"));
}

private static List<Matcher> asWildcardMatcher(
List<String> wildcards)
{
Expand All @@ -115,20 +142,4 @@ private static List<Matcher> asWildcardMatcher(

return matchers;
}

private static List<Matcher> asTopicMatcher(
List<String> wildcards)
{
List<Matcher> matchers = new ArrayList<>();
for (String wildcard : wildcards)
{
matchers.add(Pattern.compile(wildcard
.replace(".", "\\.")
.replace("$", "\\$")
.replace("+", "[^/]*")
.replace("#", ".*")).matcher(""));

}
return matchers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public MqttRouteConfig(
this.id = route.id;
this.when = route.when.stream()
.map(MqttConditionConfig.class::cast)
.map(MqttConditionMatcher::new)
.map(conf -> new MqttConditionMatcher(conf, route.guarded))
.collect(toList());
this.with = (MqttWithConfig) route.with;
this.authorized = route.authorized;
Expand All @@ -63,14 +63,16 @@ boolean matchesSession(
}

boolean matchesSubscribe(
String topic)
String topic,
long authorization)
{
return when.isEmpty() || when.stream().anyMatch(m -> m.matchesSubscribe(topic));
return when.isEmpty() || when.stream().anyMatch(m -> m.matchesSubscribe(topic, authorization));
}

boolean matchesPublish(
String topic)
String topic,
long authorization)
{
return when.isEmpty() || when.stream().anyMatch(m -> m.matchesPublish(topic));
return when.isEmpty() || when.stream().anyMatch(m -> m.matchesPublish(topic, authorization));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Copyright 2021-2024 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.mqtt.internal.config;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.List;
import java.util.Map;

import org.junit.Test;

import io.aklivity.zilla.runtime.binding.mqtt.config.MqttConditionConfig;
import io.aklivity.zilla.runtime.binding.mqtt.config.MqttPublishConfig;
import io.aklivity.zilla.runtime.binding.mqtt.config.MqttSubscribeConfig;
import io.aklivity.zilla.runtime.engine.config.GuardedConfig;

public class MqttConditionMatcherTest
{

@Test
public void shouldMatchIsolatedMultiLevelWildcard()
{
MqttConditionMatcher matcher = buildMatcher(
"#",
"#");
assertTrue(matcher.matchesPublish("#", 1L));
assertTrue(matcher.matchesSubscribe("#", 1L));
assertTrue(matcher.matchesPublish("topic", 1L));
assertTrue(matcher.matchesSubscribe("topic", 1L));
assertTrue(matcher.matchesPublish("topic/pub", 1L));
assertTrue(matcher.matchesSubscribe("topic/sub", 1L));
assertTrue(matcher.matchesPublish("topic/+/pub", 1L));
assertTrue(matcher.matchesSubscribe("topic/+/sub", 1L));
assertTrue(matcher.matchesPublish("topic/pub/#", 1L));
assertTrue(matcher.matchesSubscribe("topic/sub/#", 1L));
}

@Test
public void shouldMatchMultipleTopicNames()
{
MqttConditionMatcher matcher = buildMatcher(
"topic/pub",
"topic/sub");
assertTrue(matcher.matchesPublish("topic/pub", 1L));
assertTrue(matcher.matchesSubscribe("topic/sub", 1L));
assertFalse(matcher.matchesPublish("topic/#", 1L));
assertFalse(matcher.matchesSubscribe("topic/#", 1L));
assertFalse(matcher.matchesPublish("topic/+", 1L));
assertFalse(matcher.matchesSubscribe("topic/+", 1L));
assertFalse(matcher.matchesPublish("topic/sub", 1L));
assertFalse(matcher.matchesSubscribe("topic/pub", 1L));
assertFalse(matcher.matchesPublish("topic/pu", 1L));
assertFalse(matcher.matchesSubscribe("topic/su", 1L));
assertFalse(matcher.matchesPublish("topic/put", 1L));
assertFalse(matcher.matchesSubscribe("topic/sup", 1L));
assertFalse(matcher.matchesPublish("topic/publ", 1L));
assertFalse(matcher.matchesSubscribe("topic/subs", 1L));
assertFalse(matcher.matchesPublish("topicpub", 1L));
assertFalse(matcher.matchesSubscribe("topicsub", 1L));
assertFalse(matcher.matchesPublish("opic/pub", 1L));
assertFalse(matcher.matchesSubscribe("opic/sub", 1L));
assertFalse(matcher.matchesPublish("popic/pub", 1L));
assertFalse(matcher.matchesSubscribe("zopic/sub", 1L));
}

@Test
public void shouldMatchMultipleTopicNamesWithSingleLevelWildcard()
{
MqttConditionMatcher matcher = buildMatcher(
"topic/pub/+",
"topic/sub/+");
assertTrue(matcher.matchesPublish("topic/pub/aa", 1L));
assertTrue(matcher.matchesSubscribe("topic/sub/bbb", 1L));
assertTrue(matcher.matchesPublish("topic/pub/+", 1L));
assertTrue(matcher.matchesSubscribe("topic/sub/+", 1L));
assertFalse(matcher.matchesPublish("topic/sub/aa", 1L));
assertFalse(matcher.matchesSubscribe("topic/pub/bbb", 1L));
assertFalse(matcher.matchesPublish("topic/pub/aa/one", 1L));
assertFalse(matcher.matchesSubscribe("topic/sub/bbb/two", 1L));
}

@Test
public void shouldMatchMultipleTopicNamesWithSingleAndMultiLevelWildcard()
{
MqttConditionMatcher matcher = buildMatcher(
"topic/+/pub/#",
"topic/+/sub/#");
assertTrue(matcher.matchesPublish("topic/x/pub/aa", 1L));
assertTrue(matcher.matchesSubscribe("topic/y/sub/b", 1L));
assertTrue(matcher.matchesPublish("topic/x/pub/test/cc", 1L));
assertTrue(matcher.matchesSubscribe("topic/y/sub/test/bb", 1L));
assertFalse(matcher.matchesPublish("topic/pub/aa", 1L));
assertFalse(matcher.matchesSubscribe("topic/sub/b", 1L));
}

@Test
public void shouldMatchTopicNameWithIdentityPlaceholder()
{
MqttConditionMatcher matcher = buildMatcher(
"pub/{guarded[gname].identity}",
"sub/{guarded[gname].identity}",
"gname",
Map.of(
1L, "myuser",
2L, "otheruser"));
assertTrue(matcher.matchesPublish("pub/myuser", 1L));
assertTrue(matcher.matchesSubscribe("sub/myuser", 1L));
assertTrue(matcher.matchesPublish("pub/otheruser", 2L));
assertTrue(matcher.matchesSubscribe("sub/otheruser", 2L));
assertFalse(matcher.matchesPublish("pub/myuser", 2L));
assertFalse(matcher.matchesSubscribe("sub/myuser", 2L));
assertFalse(matcher.matchesPublish("pub/otheruser", 1L));
assertFalse(matcher.matchesSubscribe("sub/otheruser", 1L));
assertFalse(matcher.matchesPublish("pub/myuset", 1L));
assertFalse(matcher.matchesSubscribe("sub/myuset", 1L));
assertFalse(matcher.matchesPublish("pub/myusert", 1L));
assertFalse(matcher.matchesSubscribe("sub/myusert", 1L));
assertFalse(matcher.matchesPublish("pub/myuser/a", 1L));
assertFalse(matcher.matchesSubscribe("sub/myuser/a", 1L));
assertFalse(matcher.matchesPublish("pub/myuser", 3L));
assertFalse(matcher.matchesSubscribe("sub/myuser", 3L));
assertFalse(matcher.matchesPublish("pub/null", 3L));
assertFalse(matcher.matchesSubscribe("sub/null", 3L));
}

@Test
public void shouldMatchTopicNameWithIdentityPlaceholderAndMultiLevelWildcard()
{
MqttConditionMatcher matcher = buildMatcher(
"pub/{guarded[gname].identity}/#",
"sub/{guarded[gname].identity}/#",
"gname",
Map.of(
1L, "myuser",
2L, "otheruser"));
assertTrue(matcher.matchesPublish("pub/myuser/pubtest", 1L));
assertTrue(matcher.matchesSubscribe("sub/myuser/subtest", 1L));
assertTrue(matcher.matchesPublish("pub/myuser/pubtest/aaa", 1L));
assertTrue(matcher.matchesSubscribe("sub/myuser/subtest/aa", 1L));
assertTrue(matcher.matchesPublish("pub/otheruser/pubtest", 2L));
assertTrue(matcher.matchesSubscribe("sub/otheruser/subtest", 2L));
assertTrue(matcher.matchesPublish("pub/otheruser/pubtest/aa", 2L));
assertTrue(matcher.matchesSubscribe("sub/otheruser/subtest/aa", 2L));
assertFalse(matcher.matchesPublish("pub/myuser/pubtest", 2L));
assertFalse(matcher.matchesSubscribe("sub/myuser/subtest", 2L));
assertFalse(matcher.matchesPublish("pub/myuser/pubtest/aaa", 2L));
assertFalse(matcher.matchesSubscribe("sub/myuser/subtest/aa", 2L));
assertFalse(matcher.matchesPublish("pub/otheruser/pubtest", 1L));
assertFalse(matcher.matchesSubscribe("sub/otheruser/subtest", 1L));
assertFalse(matcher.matchesPublish("pub/otheruser/pubtest/aa", 1L));
assertFalse(matcher.matchesSubscribe("sub/otheruser/subtest/aa", 1L));
}

@Test
public void shouldNotMatchTopicNameWithInvalidIdentityPlaceholder()
{
MqttConditionMatcher matcher = buildMatcher(
"pub/{guarded[invalid].identity}",
"sub/{guarded[invalid].identity}",
"gname",
Map.of(
1L, "myuser",
2L, "otheruser"));
assertTrue(matcher.matchesPublish("pub/{guarded[invalid].identity}", 1L));
assertTrue(matcher.matchesSubscribe("sub/{guarded[invalid].identity}", 1L));
assertFalse(matcher.matchesPublish("pub/myuser", 1L));
assertFalse(matcher.matchesSubscribe("sub/myuser", 1L));
assertFalse(matcher.matchesPublish("pub/otheruser", 2L));
assertFalse(matcher.matchesSubscribe("sub/otheruser", 2L));
}

private static MqttConditionMatcher buildMatcher(
String publishTopic,
String subscribeTopic)
{
return buildMatcher(publishTopic, subscribeTopic, "", Map.of());
}

private static MqttConditionMatcher buildMatcher(
String publishTopic,
String subscribeTopic,
String guardName,
Map<Long, String> identities)
{
MqttConditionConfig condition = MqttConditionConfig.builder()
.publish(MqttPublishConfig.builder().topic(publishTopic).build())
.subscribe(MqttSubscribeConfig.builder().topic(subscribeTopic).build())
.build();
GuardedConfig guarded = GuardedConfig.builder()
.name(guardName)
.build();
guarded.identity = identities::get;
return new MqttConditionMatcher(condition, List.of(guarded));
}
}
Loading

0 comments on commit 0a1d2de

Please sign in to comment.