Skip to content

Commit

Permalink
Merge pull request #429 from awills96/feature/global-parameter-mqtt-v…
Browse files Browse the repository at this point in the history
…host-mapping

Add global parameter mqtt-vhost mapping functions
  • Loading branch information
acogoluegnes authored Jan 17, 2024
2 parents 1b64339 + 2dc5232 commit 47bb6b9
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 1 deletion.
3 changes: 3 additions & 0 deletions ci/before-build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ $PLUGINS enable rabbitmq_shovel_management
$PLUGINS enable rabbitmq_federation
$PLUGINS enable rabbitmq_federation_management

# Enable mqtt plugin
$PLUGINS enable rabbitmq_mqtt

# So that virtual host descriptions are available
$CTL enable_feature_flag all

Expand Down
2 changes: 1 addition & 1 deletion ci/start-broker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ wait_for_message() {

mkdir -p rabbitmq-configuration

echo "[rabbitmq_management, rabbitmq_shovel,rabbitmq_shovel_management,rabbitmq_federation,rabbitmq_federation_management]." \
echo "[rabbitmq_management, rabbitmq_shovel,rabbitmq_shovel_management,rabbitmq_federation,rabbitmq_federation_management,rabbitmq_mqtt]." \
> rabbitmq-configuration/enabled_plugins

echo "Running RabbitMQ ${RABBITMQ_IMAGE}"
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/com/rabbitmq/http/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,29 @@ public void clearMaxQueuesLimit(String vhost) {
this.deleteIgnoring404(uri);
}

public MqttVhostPortInfo getMqttPortToVhostMapping(){
return getGlobalParameters("mqtt_port_to_vhost_mapping", new ParameterizedTypeReference<>() {});
}

public void deleteMqttPortToVhostMapping() {
this.deleteIgnoring404(uri().withEncodedPath("./global-parameters/mqtt_port_to_vhost_mapping").get());
}

public void setMqttPortToVhostMapping(Map<Integer, String> portMappings) {
for (String vhost : portMappings.values()){
if (vhost.isBlank()) {
throw new IllegalArgumentException("Map with undefined vhosts provided!");
}
}

final URI uri = uri().withEncodedPath("./global-parameters/mqtt_port_to_vhost_mapping").get();

MqttVhostPortInfo body = new MqttVhostPortInfo();
body.setValue(portMappings);

this.httpLayer.put(uri, body);
}

private <T> List<T> getParameters(String component, final ParameterizedTypeReference<List<T>> responseType) {
final URI uri = uri().withEncodedPath("./parameters").withEncodedPath(component).withPathSeparator().get();
return this.httpLayer.get(uri, responseType);
Expand All @@ -1271,6 +1294,11 @@ private <T> List<T> getParameters(String vhost, String component, final Paramete
return getForObjectReturningNullOn404(uri, responseType);
}

private <T> T getGlobalParameters(String name, final ParameterizedTypeReference<T> responseType) {
final URI uri = uri().withEncodedPath("./global-parameters").withEncodedPath(name).withPathSeparator().get();
return getForObjectReturningNullOn404(uri, responseType);
}

//
// Implementation
//
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,27 @@ public Flux<UpstreamSetInfo> getUpstreamSets(String vhost) {
return doGetFlux(UpstreamSetInfo.class, "parameters", "federation-upstream-set", encodePath(vhost));
}

public Mono<MqttVhostPortInfo> getMqttPortToVhostMapping(){
return doGetMono(MqttVhostPortInfo.class, "global-parameters", "mqtt_port_to_vhost_mapping");
}

public Mono<HttpResponse> deleteMqttPortToVhostMapping(){
return doDelete( "global-parameters", "mqtt_port_to_vhost_mapping");
}

public Mono<HttpResponse> setMqttPortToVhostMapping(Map<Integer, String> portMappings){
for (String vhost : portMappings.values()){
if (vhost.isBlank()) {
throw new IllegalArgumentException("Map with undefined vhosts provided!");
}
}

MqttVhostPortInfo body = new MqttVhostPortInfo();
body.setValue(portMappings);
return doPut(body, "global-parameters", "mqtt_port_to_vhost_mapping");
}


/**
* Returns the limits (max queues and connections) for all virtual hosts.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.rabbitmq.http.client.domain;

import com.fasterxml.jackson.annotation.JsonInclude;

@JsonInclude(JsonInclude.Include.NON_NULL)
public class GlobalRuntimeParameter<T> {
private String name;
private T value;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public T getValue() {
return value;
}

public void setValue(T value) {
this.value = value;
}

@Override
public String toString() {
return "RuntimeParameter{" +
"name='" + name + '\'' +
", value=" + value +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.rabbitmq.http.client.domain;

import java.util.Map;

public class MqttVhostPortInfo extends GlobalRuntimeParameter<Map<Integer, String>> {}
38 changes: 38 additions & 0 deletions src/test/groovy/com/rabbitmq/http/client/ClientSpec.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -2843,6 +2843,44 @@ class ClientSpec extends Specification {

}

def "GET /api/global-parameters/mqtt_port_to_vhost_mapping without mqtt vhost port mapping"() {
given: "rabbitmq deployment without mqtt port mappings defined"
client.deleteMqttPortToVhostMapping()

when: "client tries to look up mqtt vhost port mappings"
def mqttPorts = client.getMqttPortToVhostMapping()

then: "mqtt vhost port mappings are null"
mqttPorts == null
}

def "GET /api/global-parameters/mqtt_port_to_vhost_mapping with a sample mapping"() {
given: "a mqtt mapping with 2 vhosts defined"
def mqttInputMap = Map.of(2024, "vhost1", 2025, "vhost2")
client.setMqttPortToVhostMapping(mqttInputMap)

when: "client tries to get mqtt port mappings"
def mqttInfo = client.getMqttPortToVhostMapping()
def mqttReturnValues = mqttInfo.getValue()

then: "a map with 2 mqtt ports and vhosts is returned"
mqttReturnValues == mqttInputMap

cleanup:
client.deleteMqttPortToVhostMapping()
}

def "PUT /api/global-parameters/mqtt_port_to_vhost_mapping with a blank vhost value"(){
given: "a mqtt mapping with blank vhost"
def mqttInputMap = Map.of(2024, " ", 2025, "vhost2")

when: "client tries to set mqtt port mappings"
client.setMqttPortToVhostMapping(mqttInputMap)

then: "an illegal argument exception is thrown"
thrown(IllegalArgumentException)
}


def "GET /api/vhost-limits/{vhost}"() {
given: "a virtual host with limits"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2431,6 +2431,45 @@ class ReactorNettyClientSpec extends Specification {
client.clearMaxConnectionsLimit("/").block()
}


def "GET /api/global-parameters/mqtt_port_to_vhost_mapping without mqtt vhost port mapping"() {
given: "rabbitmq deployment without mqtt port mappings defined"
client.deleteMqttPortToVhostMapping().block();

when: "client tries to look up mqtt vhost port mappings"
def mqttPorts = client.getMqttPortToVhostMapping().block();
then: "mono throws exception"
def exception = thrown(HttpClientException.class)
exception.status() == 404
}

def "GET /api/global-parameters/mqtt_port_to_vhost_mapping with a sample mapping"() {
given: "a mqtt mapping with 2 vhosts defined"
def mqttInputMap = Map.of(2024, "vhost1", 2025, "vhost2")
client.setMqttPortToVhostMapping(mqttInputMap).block()

when: "client tries to get mqtt port mappings"
def mqttInfo = client.getMqttPortToVhostMapping().block()
def mqttReturnValues = mqttInfo.getValue()

then: "a map with 2 mqtt ports and vhosts is returned"
mqttReturnValues == mqttInputMap

cleanup:
client.deleteMqttPortToVhostMapping().block()
}

def "PUT /api/global-parameters/mqtt_port_to_vhost_mapping with a blank vhost value"(){
given: "a mqtt mapping with blank vhost"
def mqttInputMap = Map.of(2024, " ", 2025, "vhost2")

when: "client tries to set mqtt port mappings"
client.setMqttPortToVhostMapping(mqttInputMap).block()

then: "an illegal argument exception is thrown"
thrown(IllegalArgumentException)
}

def "GET /api/vhost-limits without limits on any host"() {
given: "the default configuration"

Expand Down

0 comments on commit 47bb6b9

Please sign in to comment.