Skip to content

Commit

Permalink
Add publish qos max options for mqtt-kafka binding (#1065)
Browse files Browse the repository at this point in the history
  • Loading branch information
bmaidics authored May 30, 2024
1 parent da470d6 commit 566bcfb
Show file tree
Hide file tree
Showing 11 changed files with 206 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaConditionKind;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaOptionsConfig;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaWithConfig;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttQoS;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.MetricRefConfig;
Expand Down Expand Up @@ -93,6 +94,9 @@ public NamespaceConfig generateProxy(
.messages(messages)
.retained(retained)
.build()
.publish()
.qosMax(MqttQoS.EXACTLY_ONCE.name().toLowerCase())
.build()
.clients(Collections.emptyList())
.build()
.inject(b -> this.injectMqttKafkaRoutes(b, routes))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class MqttKafkaOptionsConfig extends OptionsConfig
public final MqttKafkaTopicsConfig topics;
public final String serverRef;
public final List<String> clients;
public final MqttKafkaPublishConfig publish;

public static MqttKafkaOptionsConfigBuilder<MqttKafkaOptionsConfig> builder()
{
Expand All @@ -39,10 +40,12 @@ public static <T> MqttKafkaOptionsConfigBuilder<T> builder(
MqttKafkaOptionsConfig(
MqttKafkaTopicsConfig topics,
String serverRef,
List<String> clients)
List<String> clients,
MqttKafkaPublishConfig publish)
{
this.topics = topics;
this.serverRef = serverRef;
this.clients = clients;
this.publish = publish;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class MqttKafkaOptionsConfigBuilder<T> extends ConfigBuilder<T, MqttKafka
private MqttKafkaTopicsConfig topics;
private String serverRef;
private List<String> clients;
private MqttKafkaPublishConfig publish;

MqttKafkaOptionsConfigBuilder(
Function<OptionsConfig, T> mapper)
Expand Down Expand Up @@ -68,10 +69,22 @@ public MqttKafkaOptionsConfigBuilder<T> clients(
return this;
}

public MqttKafkaOptionsConfigBuilder<T> publish(
MqttKafkaPublishConfig publish)
{
this.publish = publish;
return this;
}

public MqttKafkaPublishConfigBuilder<MqttKafkaOptionsConfigBuilder<T>> publish()
{
return new MqttKafkaPublishConfigBuilder<>(this::publish);
}


@Override
public T build()
{
return mapper.apply(new MqttKafkaOptionsConfig(topics, serverRef, clients));
return mapper.apply(new MqttKafkaOptionsConfig(topics, serverRef, clients, publish));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.mqtt.kafka.config;

import java.util.function.Function;

import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttQoS;

public class MqttKafkaPublishConfig
{
public final MqttQoS qosMax;

public static MqttKafkaPublishConfigBuilder<MqttKafkaPublishConfig> builder()
{
return new MqttKafkaPublishConfigBuilder<>(MqttKafkaPublishConfig.class::cast);
}

public static <T> MqttKafkaPublishConfigBuilder<T> builder(
Function<MqttKafkaPublishConfig, T> mapper)
{
return new MqttKafkaPublishConfigBuilder<>(mapper);
}

MqttKafkaPublishConfig(
MqttQoS qosMax)
{
this.qosMax = qosMax;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.mqtt.kafka.config;

import java.util.function.Function;

import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttQoS;
import io.aklivity.zilla.runtime.engine.config.ConfigBuilder;

public class MqttKafkaPublishConfigBuilder<T> extends ConfigBuilder<T, MqttKafkaPublishConfigBuilder<T>>
{
private final Function<MqttKafkaPublishConfig, T> mapper;

private MqttQoS qosMax;

MqttKafkaPublishConfigBuilder(
Function<MqttKafkaPublishConfig, T> mapper)
{
this.mapper = mapper;
}

@Override
@SuppressWarnings("unchecked")
protected Class<MqttKafkaPublishConfigBuilder<T>> thisType()
{
return (Class<MqttKafkaPublishConfigBuilder<T>>) getClass();
}


public MqttKafkaPublishConfigBuilder<T> qosMax(
String qosMax)
{
this.qosMax = MqttQoS.valueOf(qosMax.toUpperCase());
return this;
}


@Override
public T build()
{
return mapper.apply(new MqttKafkaPublishConfig(qosMax));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.config;

import static io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttQoS.AT_LEAST_ONCE;
import static java.util.stream.Collectors.toList;

import java.util.ArrayList;
Expand Down Expand Up @@ -99,7 +100,7 @@ public List<MqttKafkaRouteConfig> resolveAll(
public MqttQoS publishQosMax()
{
return routes.stream().noneMatch(r -> r.with != null && r.with.containsParams()) ?
MqttQoS.EXACTLY_ONCE : MqttQoS.AT_LEAST_ONCE;
options.publish.qosMax : AT_LEAST_ONCE;
}

public String16FW messagesTopic()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@

import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaOptionsConfig;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaOptionsConfigBuilder;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaPublishConfig;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.config.MqttKafkaTopicsConfig;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.MqttKafkaBinding;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.MqttQoS;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;
import io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi;
Expand All @@ -41,6 +43,8 @@ public class MqttKafkaOptionsConfigAdapter implements OptionsConfigAdapterSpi, J
private static final String SESSIONS_NAME = "sessions";
private static final String MESSAGES_NAME = "messages";
private static final String RETAINED_NAME = "retained";
private static final String PUBLISH_NAME = "publish";
private static final String QOS_MAX_NAME = "qosMax";

@Override
public Kind kind()
Expand Down Expand Up @@ -111,6 +115,7 @@ public OptionsConfig adaptFromJson(
JsonObject topics = object.getJsonObject(TOPICS_NAME);
options.serverRef(object.getString(SERVER_NAME, null));
JsonArray clientsJson = object.getJsonArray(CLIENTS_NAME);
JsonObject publish = object.getJsonObject(PUBLISH_NAME);

List<String> clients = new ArrayList<>();
if (clientsJson != null)
Expand All @@ -128,6 +133,17 @@ public OptionsConfig adaptFromJson(
.retained(topics.getString(RETAINED_NAME))
.build());

if (publish != null)
{
options.publish(MqttKafkaPublishConfig.builder()
.qosMax(publish.getString(QOS_MAX_NAME)).build());
}
else
{
options.publish(MqttKafkaPublishConfig.builder()
.qosMax(MqttQoS.EXACTLY_ONCE.name()).build());
}

return options.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,17 @@ public void shouldRejectPublishWhenTopicSpaceWithParams() throws Exception
k3po.finish();
}

@Test
@Configuration("proxy.publish.qos.max.yaml")
@Configure(name = WILL_AVAILABLE_NAME, value = "false")
@Specification({
"${mqtt}/publish.reject.qos2/client",
"${kafka}/publish.reject.qos2/server"})
public void shouldRejectPublishWhenQosMaxSet() throws Exception
{
k3po.finish();
}

@Test
@Configuration("proxy.when.client.topic.space.yaml")
@Configure(name = WILL_AVAILABLE_NAME, value = "false")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Copyright 2021-2023 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

---
name: test
bindings:
mqtt0:
type: mqtt-kafka
kind: proxy
options:
topics:
sessions: mqtt-sessions
messages: mqtt-messages
retained: mqtt-retained
publish:
qosMax: at_least_once
exit: kafka0
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,27 @@
{
"type": "string"
}
},
"publish":
{
"title": "Publish",
"type": "object",
"properties":
{
"qosMax":
{
"title": "Maximum QoS",
"type": "string",
"enum": [ "at_most_once", "at_least_once", "exactly_once" ],
"default": "exactly_once"
},
"additionalProperties": false
},
"required":
[
"qosMax"
],
"additionalProperties": false
}
},
"required":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ public void shouldValidateProxyWithOptions()
assertThat(config, not(nullValue()));
}

@Test
public void shouldValidateProxyWithPublishQosMax()
{
JsonObject config = schema.validate("proxy.publish.qos.max.yaml");

assertThat(config, not(nullValue()));
}

@Test
public void shouldValidateProxyWhenPublishTopicWithMessages()
{
Expand Down

0 comments on commit 566bcfb

Please sign in to comment.