From f705bf59a94e65699e86a4c1b92c56b42d655f29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jiri=20Dan=C4=9Bk?= Date: Wed, 1 Sep 2021 15:02:54 +0200 Subject: [PATCH] Plug cli-protonj2 into selftests (#287) --- cli-protonj2/pom.xml | 5 + .../com/redhat/mqe/CliProtonJ2Receiver.java | 251 +++++++++ .../com/redhat/mqe/CliProtonJ2Sender.java | 279 ++++++++++ .../main/java/com/redhat/mqe/ContentType.java | 2 +- .../src/main/java/com/redhat/mqe/LogMsgs.java | 24 + .../src/main/java/com/redhat/mqe/Main.java | 489 +----------------- .../java/com/redhat/mqe/PropertyType.java | 35 ++ .../redhat/mqe/ProtonJ2MessageFormatter.java | 97 ++++ .../test/java/com/redhat/mqe/MainTest.java | 6 +- .../com/redhat/mqe/MessageLoggingTest.java | 119 +++++ cli-protonj2/src/test/kotlin/MainTest.kt | 235 +++++++++ .../src/test/kotlin/ClientFormatterSpy.kt | 51 +- interop-tests/src/test/kotlin/InteropTest.kt | 2 +- .../test/kotlin/ProtonJ2ClientFormatterSpy.kt | 39 ++ .../main/java/com/redhat/mqe/lib/Content.java | 9 +- .../com/redhat/mqe/lib/MessageFormatter.java | 14 +- .../main/java/com/redhat/mqe/lib/Utils.java | 0 17 files changed, 1154 insertions(+), 503 deletions(-) create mode 100644 cli-protonj2/src/main/java/com/redhat/mqe/CliProtonJ2Receiver.java create mode 100644 cli-protonj2/src/main/java/com/redhat/mqe/CliProtonJ2Sender.java create mode 100644 cli-protonj2/src/main/java/com/redhat/mqe/LogMsgs.java create mode 100644 cli-protonj2/src/main/java/com/redhat/mqe/PropertyType.java create mode 100644 cli-protonj2/src/main/java/com/redhat/mqe/ProtonJ2MessageFormatter.java create mode 100644 cli-protonj2/src/test/java/com/redhat/mqe/MessageLoggingTest.java create mode 100644 cli-protonj2/src/test/kotlin/MainTest.kt create mode 100644 interop-tests/src/test/kotlin/ProtonJ2ClientFormatterSpy.kt rename {jmslib => lib}/src/main/java/com/redhat/mqe/lib/Content.java (93%) rename {jmslib => lib}/src/main/java/com/redhat/mqe/lib/Utils.java (100%) diff --git a/cli-protonj2/pom.xml b/cli-protonj2/pom.xml index 32202c59..ae038343 100644 --- a/cli-protonj2/pom.xml +++ b/cli-protonj2/pom.xml @@ -90,6 +90,11 @@ 4.6.1 + + com.redhat.cli-java + cli + test + com.redhat.cli-java tests diff --git a/cli-protonj2/src/main/java/com/redhat/mqe/CliProtonJ2Receiver.java b/cli-protonj2/src/main/java/com/redhat/mqe/CliProtonJ2Receiver.java new file mode 100644 index 00000000..ab78e939 --- /dev/null +++ b/cli-protonj2/src/main/java/com/redhat/mqe/CliProtonJ2Receiver.java @@ -0,0 +1,251 @@ +/* + * Copyright (c) 2021 Red Hat, Inc. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.redhat.mqe; + +import com.redhat.mqe.lib.Utils; +import org.apache.qpid.protonj2.client.Client; +import org.apache.qpid.protonj2.client.ClientOptions; +import org.apache.qpid.protonj2.client.Connection; +import org.apache.qpid.protonj2.client.ConnectionOptions; +import org.apache.qpid.protonj2.client.Delivery; +import org.apache.qpid.protonj2.client.DistributionMode; +import org.apache.qpid.protonj2.client.Message; +import org.apache.qpid.protonj2.client.Receiver; +import org.apache.qpid.protonj2.client.ReceiverOptions; +import org.apache.qpid.protonj2.client.Sender; +import picocli.CommandLine; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import static com.redhat.mqe.lib.ClientOptionManager.QUEUE_PREFIX; +import static com.redhat.mqe.lib.ClientOptionManager.TOPIC_PREFIX; + +@CommandLine.Command( + name = "receiver", + mixinStandardHelpOptions = true, + version = "1.0.0", + description = "Opens AMQP connections" +) +public class CliProtonJ2Receiver extends CliProtonJ2SenderReceiver implements Callable { + + private final ProtonJ2MessageFormatter messageFormatter; + + @CommandLine.Option(names = {"--log-msgs"}, description = "MD5, SHA-1, SHA-256, ...") + private LogMsgs logMsgs = LogMsgs.dict; + + @CommandLine.Option(names = {"--msg-content-hashed"}) + private String msgContentHashedString = "false"; + + @CommandLine.Option(names = {"-b", "--broker"}, description = "MD5, SHA-1, SHA-256, ...") + private String broker = "MD5"; + + @CommandLine.Option(names = {"--conn-username"}, description = "MD5, SHA-1, SHA-256, ...") + private String connUsername = "MD5"; + + @CommandLine.Option(names = {"--conn-password"}, description = "MD5, SHA-1, SHA-256, ...") + private String connPassword = "MD5"; + + @CommandLine.Option(names = {"--conn-clientid"}) + private String connClientId; + + @CommandLine.Option(names = {"--durable-subscriber"}) + private String durableSubscriberString = "false"; + + @CommandLine.Option(names = {"--durable-subscriber-name"}) + private String durableSubscriberName; + + // TODO not implemented + @CommandLine.Option(names = {"--subscriber-unsubscribe"}) + private String subscriberUnsubscribeString; + + @CommandLine.Option(names = {"-a", "--address"}, description = "MD5, SHA-1, SHA-256, ...") + private String address = "MD5"; + + @CommandLine.Option(names = {"--recv-browse"}, description = "browse queued messages instead of receiving them") + private String recvBrowseString = "false"; + + @CommandLine.Option(names = {"--count"}, description = "MD5, SHA-1, SHA-256, ...") + private int count = 1; + + @CommandLine.Option(names = {"--timeout"}, description = "MD5, SHA-1, SHA-256, ...") + private int timeout; + + @CommandLine.Option(names = {"--conn-auth-mechanisms"}, description = "MD5, SHA-1, SHA-256, ...") + // todo, want to accept comma-separated lists; there is https://picocli.info/#_split_regex + private List connAuthMechanisms = new ArrayList<>(); + + @CommandLine.Option(names = {"--process-reply-to"}) + private boolean processReplyTo = false; + + @CommandLine.Option(names = {"--duration"}) // todo + private Integer duration; + + @CommandLine.Option(names = {"--duration-mode"}) // todo + private DurationMode durationMode; + + @CommandLine.Option(names = {"--ssn-ack-mode"}) + private SsnAckMode ssnAckMode; + + public CliProtonJ2Receiver() { + this.messageFormatter = new ProtonJ2MessageFormatter(); + } + + public CliProtonJ2Receiver(ProtonJ2MessageFormatter messageFormatter) { + this.messageFormatter = messageFormatter; + } + + @Override + public Integer call() throws Exception { // your business logic goes here... + String prefix = ""; + if (!broker.startsWith("amqp://") && !broker.startsWith("amqps://")) { + prefix = "amqp://"; + } + final URI url = new URI(prefix + broker); + final String serverHost = url.getHost(); + int serverPort = url.getPort(); + serverPort = (serverPort == -1) ? 5672 : serverPort; + + String destinationCapability = "queue"; + if (address.startsWith(TOPIC_PREFIX)) { + address = address.substring((TOPIC_PREFIX.length())); + destinationCapability = "topic"; + } + if (address.startsWith(QUEUE_PREFIX)) { + address = address.substring((QUEUE_PREFIX.length())); + } + + ClientOptions clientOptions = new ClientOptions(); + // TODO api usability I had to hunt for this a bit; the idea is to have durable subscription: need specify connection id and subscriber name + if (connClientId != null) { + clientOptions.id(connClientId); + } + + // TODO api usability; If I use the w/ clientOptions variant of Client.create, then .id defaults to null, and I get exception; + // ok, that just cannot be true ^^^; but it looks to be true; what!?! + // aha, right; constructor does not check, factory method does check for null + // proposed solution: allow null there, and let it mean autoassign; or tell us method to generate ID ourselves if we don't care + Client client; + if (clientOptions.id() != null) { + client = Client.create(clientOptions); + } else { + client = Client.create(); + } + + final ConnectionOptions options = new ConnectionOptions(); + options.user(connUsername); + options.password(connPassword); + for (AuthMechanism mech : connAuthMechanisms) { + options.saslOptions().addAllowedMechanism(mech.name()); + } + + /* + TODO API usability, hard to ask for queue when dealing with broker that likes to autocreate topics + */ + ReceiverOptions receiverOptions = new ReceiverOptions(); + // is it target or source? target. + receiverOptions.sourceOptions().capabilities(destinationCapability); + + // todo: another usability, little hard to figure out this is analogue of jms to browse queues + if (stringToBool(recvBrowseString)) { + receiverOptions.sourceOptions().distributionMode(DistributionMode.COPY); + } + + // TODO: API question: what is difference between autoSettle and autoAccept? why I want one but not the other? + if (ssnAckMode != null) { + if (ssnAckMode == SsnAckMode.client) { + receiverOptions.autoAccept(false); + receiverOptions.autoSettle(false); + } + } + + try (Connection connection = client.connect(serverHost, serverPort, options)) { + Receiver receiver; + if (stringToBool(durableSubscriberString)) { + receiver = connection.openDurableReceiver(address, durableSubscriberName, receiverOptions); + } else { + receiver = connection.openReceiver(address, receiverOptions); + } + + double initialTimestamp = Utils.getTime(); + for (int i = 0; i < count; i++) { + +// if (durationMode == DurationMode.sleepBeforeReceive) { +// LOG.trace("Sleeping before receive"); +// Utils.sleepUntilNextIteration(initialTimestamp, msgCount, duration, i + 1); +// } + + final Delivery delivery; + if (timeout == 0) { + delivery = receiver.receive(); // todo: can default it to -1 + } else { + delivery = receiver.receive(timeout, TimeUnit.SECONDS); + } + + if (delivery == null) { + break; + } + + if (durationMode == DurationMode.afterReceive) { +// LOG.trace("Sleeping after receive"); + Utils.sleepUntilNextIteration(initialTimestamp, count, duration, i + 1); // todo possibly it is i, different loop here + } + + if (processReplyTo && delivery.message().replyTo() != null) { + String replyTo = delivery.message().replyTo(); + Message message = delivery.message(); + message.replyTo(null); + try (Sender sender = connection.openSender(replyTo)) { + sender.send(message); + } + } + + int messageFormat = delivery.messageFormat(); + Message message = delivery.message(); + + // todo, is this what we mean? + if (ssnAckMode != null && ssnAckMode == SsnAckMode.client) { + delivery.accept(); + } + + Map messageDict = messageFormatter.formatMessage(address, message, stringToBool(msgContentHashedString)); + switch (logMsgs) { + case dict: + messageFormatter.printMessageAsPython(messageDict); + break; + case interop: + messageFormatter.printMessageAsJson(messageDict); + break; + } + } + + // TODO API usability, how do I do durable subscription with detach, resume, etc; no mention of unsubscribe in the client anywhere + receiver.close(); // TODO want to do autoclosable, need helper func, that's all +// receiver.detach(); + } + + return 0; + } + +} diff --git a/cli-protonj2/src/main/java/com/redhat/mqe/CliProtonJ2Sender.java b/cli-protonj2/src/main/java/com/redhat/mqe/CliProtonJ2Sender.java new file mode 100644 index 00000000..6c160f61 --- /dev/null +++ b/cli-protonj2/src/main/java/com/redhat/mqe/CliProtonJ2Sender.java @@ -0,0 +1,279 @@ +/* + * Copyright (c) 2021 Red Hat, Inc. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.redhat.mqe; + +import com.redhat.mqe.lib.Content; +import com.redhat.mqe.lib.MessageFormatter; +import org.apache.qpid.protonj2.client.Client; +import org.apache.qpid.protonj2.client.Connection; +import org.apache.qpid.protonj2.client.ConnectionOptions; +import org.apache.qpid.protonj2.client.Message; +import org.apache.qpid.protonj2.client.Sender; +import org.apache.qpid.protonj2.client.SenderOptions; +import picocli.CommandLine; + +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; + +import static com.redhat.mqe.lib.ClientOptionManager.QUEUE_PREFIX; +import static com.redhat.mqe.lib.ClientOptionManager.TOPIC_PREFIX; + +@CommandLine.Command( + name = "sender", + mixinStandardHelpOptions = true, + version = "1.0.0", + description = "Opens AMQP connections" +) +public class CliProtonJ2Sender extends CliProtonJ2SenderReceiver implements Callable { + + private final ProtonJ2MessageFormatter messageFormatter; + + @CommandLine.Option(names = {"--log-msgs"}, description = "message reporting style") + private LogMsgs logMsgs = LogMsgs.dict; + + @CommandLine.Option(names = {"--msg-content-hashed"}) + private String msgContentHashedString = "false"; + + @CommandLine.Option(names = {"-b", "--broker"}, description = "MD5, SHA-1, SHA-256, ...") + private String broker = "MD5"; + + @CommandLine.Option(names = {"--conn-username"}, description = "MD5, SHA-1, SHA-256, ...") + private String connUsername = "MD5"; + + @CommandLine.Option(names = {"--conn-password"}, description = "MD5, SHA-1, SHA-256, ...") + private String connPassword = "MD5"; + + @CommandLine.Option(names = {"-a", "--address"}, description = "MD5, SHA-1, SHA-256, ...") + private String address = "MD5"; + + @CommandLine.Option(names = {"--count"}, description = "MD5, SHA-1, SHA-256, ...") + private int count = 1; + + @CommandLine.Option(names = {"--timeout"}, description = "MD5, SHA-1, SHA-256, ...") + private int timeout; + + @CommandLine.Option(names = {"--duration"}) + private Float duration; // TODO do something with it + + @CommandLine.Option(names = {"--conn-auth-mechanisms"}, description = "MD5, SHA-1, SHA-256, ...") + // todo, want to accept comma-separated lists; there is https://picocli.info/#_split_regex + private List connAuthMechanisms = new ArrayList<>(); + + @CommandLine.Option(names = {"--msg-property"}) // picocli Map options works for this, sounds like + private List msgProperties = new ArrayList<>(); + + @CommandLine.Option(names = {"--msg-content"}) + private String msgContent; + + @CommandLine.Option(names = {"--msg-content-from-file"}) + private String msgContentFromFile; + + @CommandLine.Option(names = {"--content-type"}) + private ContentType contentType = ContentType.STRING; + + @CommandLine.Option(names = {"--property-type"}) + private PropertyType propertyType = PropertyType.String; + + @CommandLine.Option(names = {"--msg-durable"}) + private String msgDurableString = "false"; + + @CommandLine.Option(names = {"--msg-ttl"}) + private Long msgTtl; + + @CommandLine.Option(names = {"--msg-content-list-item"}) + private List msgContentListItem; + + @CommandLine.Option(names = {"--msg-content-map-item"}) + private List msgContentMapItems; + + @CommandLine.Option(names = {"--msg-content-binary"}) + private String msgContentBinaryString = "false"; + + @CommandLine.Option(names = {"--msg-correlation-id"}) + private String msgCorrelationId; + + @CommandLine.Option(names = {"--msg-group-id"}) + private String msgGroupId; + + @CommandLine.Option(names = {"--msg-id"}) + private String msgId; // todo, not just string is an option + + @CommandLine.Option(names = {"--msg-reply-to"}) + private String msgReplyTo; + + @CommandLine.Option(names = {"--msg-subject"}) + private String msgSubject; + + @CommandLine.Option(names = {"--msg-user-id"}) + private String msgUserId; + + @CommandLine.Option(names = {"--msg-priority"}) + private Byte msgPriority; + + // jms.populateJMSXUserID opt in qpid-jms + // TODO: does not seem to have equivalent; what is the threat model for "prevent spoofing" in JMS docs? + @CommandLine.Option(names = {"--conn-populate-user-id"}) + private String connPopulateUserIdString = "false"; + + @CommandLine.Option(names = {"--msg-group-seq"}) + private Integer msgGroupSeq; + + @CommandLine.Option(names = {"--msg-reply-to-group-id"}) + private String msgReplyToGroupId; + + @CommandLine.Option(names = {"--ssn-ack-mode"}) + private SsnAckMode ssnAckMode; + + public CliProtonJ2Sender() { + this.messageFormatter = new ProtonJ2MessageFormatter(); + } + + public CliProtonJ2Sender(ProtonJ2MessageFormatter messageFormatter) { + this.messageFormatter = messageFormatter; + } + + @Override + public Integer call() throws Exception { // your business logic goes here... + + String prefix = ""; + if (!broker.startsWith("amqp://") && !broker.startsWith("amqps://")) { + prefix = "amqp://"; + } + final URI url = new URI(prefix + broker); + final String serverHost = url.getHost(); + int serverPort = url.getPort(); + serverPort = (serverPort == -1) ? 5672 : serverPort; + + String destinationCapability = "queue"; + if (address.startsWith(TOPIC_PREFIX)) { + address = address.substring((TOPIC_PREFIX.length())); + destinationCapability = "topic"; + } + if (address.startsWith(QUEUE_PREFIX)) { + address = address.substring((QUEUE_PREFIX.length())); + } + + final Client client = Client.create(); + + final ConnectionOptions options = new ConnectionOptions(); + options.user(connUsername); + options.password(connPassword); + for (AuthMechanism mech : connAuthMechanisms) { + options.saslOptions().addAllowedMechanism(mech.name()); + } + + /* + TODO API usablility, hard to ask for queue when dealing with broker that likes to autocreate topics + */ + SenderOptions senderOptions = new SenderOptions(); + // is it target or source? target. + senderOptions.targetOptions().capabilities(destinationCapability); + try (Connection connection = client.connect(serverHost, serverPort, options); + Sender sender = connection.openSender(address, senderOptions)) { + + for (int i = 0; i < count; i++) { + Message message; + if (msgContentListItem != null && !msgContentListItem.isEmpty()) { // TODO check only one of these is specified + List list = new ArrayList<>(); + for (String item : msgContentListItem) { + Content content = new Content(contentType.toString(), item, false); // TODO do this in args parsing? + list.add(content.getValue()); + } + message = Message.create((Object) list); + } else if (msgContentMapItems != null) { + Map map = new HashMap<>(); + for (String item : msgContentMapItems) { + Content content = new Content(contentType.toString(), item, true); // TODO do this in args parsing? + map.put(content.getKey(), content.getValue()); + } + message = Message.create((Object) map); + } else if (msgContentFromFile != null) { + if (stringToBool(msgContentBinaryString)) { + message = Message.create(Files.readAllBytes(Paths.get(msgContentFromFile))); // todo maybe param type as Path? check exists + } else { + message = Message.create(Files.readString(Paths.get(msgContentFromFile))); // todo maybe param type as Path? check exists + } + } else { + message = Message.create(msgContent); + } + if (msgProperties != null) { + for (String item : msgProperties) { + Content content = new Content(propertyType.toString(), item, true); // TODO do this in args parsing? + message.property(content.getKey(), content.getValue()); + } + } + if (msgId != null) { + message.messageId(msgId); + } + if (msgCorrelationId != null) { + message.correlationId(msgCorrelationId); + } + if (msgTtl != null) { + message.timeToLive(msgTtl); + } + if (stringToBool(msgDurableString)) { + message.durable(true); + } + if (msgGroupId != null) { + message.groupId(msgGroupId); + } + if (msgGroupSeq != null) { + message.groupSequence(msgGroupSeq); + } + if (msgReplyTo != null) { + message.replyTo(msgReplyTo); + } + if (msgReplyToGroupId != null) { + message.replyToGroupId(msgReplyToGroupId); + } + if (contentType != null) { + message.contentType(contentType.toString()); // TODO: maybe should do more with it? don't bother with enum? + } + if (stringToBool(connPopulateUserIdString)) { + message.userId(msgUserId.getBytes()); + } + if (msgSubject != null) { + message.subject(msgSubject); + } + if (msgPriority != null) { + message.priority(msgPriority); + } + sender.send(message); // TODO what's timeout for in a sender? + + Map messageDict = messageFormatter.formatMessage(address, (Message) message, stringToBool(msgContentHashedString)); + switch (logMsgs) { + case dict: + messageFormatter.printMessageAsPython(messageDict); + break; + case interop: + messageFormatter.printMessageAsJson(messageDict); + break; + } + } + } + return 0; + } +} diff --git a/cli-protonj2/src/main/java/com/redhat/mqe/ContentType.java b/cli-protonj2/src/main/java/com/redhat/mqe/ContentType.java index f2105971..e5d5dd35 100644 --- a/cli-protonj2/src/main/java/com/redhat/mqe/ContentType.java +++ b/cli-protonj2/src/main/java/com/redhat/mqe/ContentType.java @@ -20,7 +20,7 @@ package com.redhat.mqe; public enum ContentType { - INT("int"); + INT("int"), STRING("string"); private final String value; diff --git a/cli-protonj2/src/main/java/com/redhat/mqe/LogMsgs.java b/cli-protonj2/src/main/java/com/redhat/mqe/LogMsgs.java new file mode 100644 index 00000000..47f5a0bd --- /dev/null +++ b/cli-protonj2/src/main/java/com/redhat/mqe/LogMsgs.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2021 Red Hat, Inc. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.redhat.mqe; + +public enum LogMsgs { + dict, interop +} diff --git a/cli-protonj2/src/main/java/com/redhat/mqe/Main.java b/cli-protonj2/src/main/java/com/redhat/mqe/Main.java index c01d5122..25a9def8 100644 --- a/cli-protonj2/src/main/java/com/redhat/mqe/Main.java +++ b/cli-protonj2/src/main/java/com/redhat/mqe/Main.java @@ -1,5 +1,6 @@ package com.redhat.mqe; +import com.redhat.mqe.lib.Content; import com.redhat.mqe.lib.MessageFormatter; import com.redhat.mqe.lib.Utils; import org.apache.qpid.protonj2.client.ClientOptions; @@ -64,92 +65,7 @@ public static void main(String... args) { } class CliProtonJ2SenderReceiver { - void logMessage(String address, Message message, boolean msgContentHashed) throws ClientException { - StringBuilder sb = new StringBuilder(); - sb.append("{"); - - addKeyValue(sb, "address", address); - addKeyValue(sb, "group-id", message.groupId()); - addKeyValue(sb, "subject", message.subject()); - addKeyValue(sb, "user-id", message.userId()); - addKeyValue(sb, "correlation-id", message.correlationId()); - addKeyValue(sb, "content-encoding", message.contentEncoding()); - addKeyValue(sb, "priority", message.priority()); - addKeyValue(sb, "type", "string"); // ??? - addKeyValue(sb, "ttl", message.timeToLive()); - addKeyValue(sb, "absolute-expiry-time", message.absoluteExpiryTime()); - if (msgContentHashed) { - // this is inlined addKeyValue, TODO do it nicer - sb.append("'"); - sb.append("content"); - sb.append("': "); - sb.append("'"); // extra quotes to format - sb.append(MessageFormatter.hash(formatPython(message.body()))); - sb.append("'"); - sb.append(", "); - } else { - addKeyValue(sb, "content", message.body()); - } - addKeyValue(sb, "redelivered", message.deliveryCount() > 1); - addKeyValue(sb, "reply-to-group-id", message.replyToGroupId()); - addKeyValue(sb, "durable", message.durable()); - addKeyValue(sb, "group-sequence", message.groupSequence()); - addKeyValue(sb, "creation-time", message.creationTime()); - addKeyValue(sb, "content-type", message.contentType()); - addKeyValue(sb, "id", message.messageId()); - addKeyValue(sb, "reply-to", message.replyTo()); - - // getPropertyNames? from JMS missing? - StringBuilder sbb = new StringBuilder(); - sbb.append('{'); -// AtomicBoolean first = new AtomicBoolean(true); - message.forEachProperty((s, o) -> { -// if (!first.get()) { -// sbb.append(", "); -// first.set(false); -// } - addKeyValue(sbb, (String) s, o); // this wanted to cast to string when I removed message generic type; what??? TODO - }); - if (message.hasProperties()) { - sbb.delete(sbb.length() - 2, sbb.length()); // remove last ", " - } - sbb.append('}'); - addKeyValue(sb, "properties", sbb); // ??? - - sb.delete(sb.length() - 2, sb.length()); // remove last ", " - - sb.append("}"); - - System.out.println(sb); - } - - void addKeyValue(StringBuilder sb, String key, Object value) { - sb.append("'"); - sb.append(key); - sb.append("': "); - sb.append(formatPython(value)); - sb.append(", "); - } - - String formatPython(Object parameter) { - if (parameter == null) { - return "None"; - } - if (parameter instanceof String) { - return "'" + parameter + "'"; - } - if (parameter instanceof Boolean) { - return ((boolean)parameter) ? "True" : "False"; - } - if (parameter instanceof StringBuilder) { - return parameter.toString(); - } - if (parameter instanceof List) { - return "[" + ((List) parameter).stream().map(this::formatPython).collect(Collectors.joining(", ")) + "]"; - } - return "'" + parameter + "'"; - } protected boolean stringToBool(String string) { boolean bool = string.equalsIgnoreCase("true") || string.equalsIgnoreCase("yes"); @@ -208,404 +124,7 @@ enum AuthMechanism { PLAIN, } -@Command( - name = "sender", - mixinStandardHelpOptions = true, - version = "1.0.0", - description = "Opens AMQP connections" -) -class CliProtonJ2Sender extends CliProtonJ2SenderReceiver implements Callable { - - @Option(names = {"--log-msgs"}, description = "MD5, SHA-1, SHA-256, ...") - private String logMsgs = "MD5"; - - @Option(names = {"--msg-content-hashed"}) - private String msgContentHashedString = "false"; - - @Option(names = {"--broker"}, description = "MD5, SHA-1, SHA-256, ...") - private String broker = "MD5"; - - @Option(names = {"--conn-username"}, description = "MD5, SHA-1, SHA-256, ...") - private String connUsername = "MD5"; - - @Option(names = {"--conn-password"}, description = "MD5, SHA-1, SHA-256, ...") - private String connPassword = "MD5"; - - @Option(names = {"--address"}, description = "MD5, SHA-1, SHA-256, ...") - private String address = "MD5"; - - @Option(names = {"--count"}, description = "MD5, SHA-1, SHA-256, ...") - private int count = 1; - - @Option(names = {"--timeout"}, description = "MD5, SHA-1, SHA-256, ...") - private int timeout; - - @Option(names = {"--duration"}) - private int duration; // TODO do something with it - - @Option(names = {"--conn-auth-mechanisms"}, description = "MD5, SHA-1, SHA-256, ...") - // todo, want to accept comma-separated lists; there is https://picocli.info/#_split_regex - private List connAuthMechanisms = new ArrayList<>(); - - @Option(names = {"--msg-property"}) // picocli Map options works for this, sounds like - private List msgProperties = new ArrayList<>(); - - @Option(names = {"--msg-content"}) - private String msgContent; - - @Option(names = {"--msg-content-from-file"}) - private String msgContentFromFile; - - @Option(names = {"--content-type"}) - private ContentType contentType; - - @Option(names = {"--msg-durable"}) - private String msgDurableString = "false"; - - @Option(names = {"--msg-ttl"}) - private Long msgTtl; - - @Option(names = {"--msg-content-list-item"}) - private List msgContentListItem; - - @Option(names = {"--msg-content-map-item"}) - private List msgContentMapItems; - - @Option(names = {"--msg-correlation-id"}) - private String msgCorrelationId; - - @Option(names = {"--msg-group-id"}) - private String msgGroupId; - - @Option(names = {"--msg-id"}) - private String msgId; // todo, not just string is an option - - @Option(names = {"--msg-reply-to"}) - private String msgReplyTo; - - @Option(names = {"--msg-subject"}) - private String msgSubject; - - @Option(names = {"--msg-user-id"}) - private String msgUserId; - - @Option(names = {"--msg-priority"}) - private Byte msgPriority; - - // jms.populateJMSXUserID opt in qpid-jms - // TODO: does not seem to have equivalent; what is the threat model for "prevent spoofing" in JMS docs? - @Option(names = {"--conn-populate-user-id"}) - private String connPopulateUserIdString = "false"; - - @Option(names = {"--msg-group-seq"}) - private Integer msgGroupSeq; - - @Option(names = {"--msg-reply-to-group-id"}) - private String msgReplyToGroupId; - - @Option(names = {"--ssn-ack-mode"}) - private SsnAckMode ssnAckMode; - - @Override - public Integer call() throws Exception { // your business logic goes here... - - String prefix = ""; - if (!broker.startsWith("amqp://") || !broker.startsWith("amqps://")) { - prefix = "amqp://"; - } - final URI url = new URI(prefix + broker); - final String serverHost = url.getHost(); - int serverPort = url.getPort(); - serverPort = (serverPort == -1) ? 5672 : serverPort; - - String destinationCapability = "queue"; - if (address.startsWith(TOPIC_PREFIX)) { - address = address.substring((TOPIC_PREFIX.length())); - destinationCapability = "topic"; - } - if (address.startsWith(QUEUE_PREFIX)) { - address = address.substring((QUEUE_PREFIX.length())); - } - - final Client client = Client.create(); - - final ConnectionOptions options = new ConnectionOptions(); - options.user(connUsername); - options.password(connPassword); - for (AuthMechanism mech : connAuthMechanisms) { - options.saslOptions().addAllowedMechanism(mech.name()); - } - - /* - TODO API usablility, hard to ask for queue when dealing with broker that likes to autocreate topics - */ - SenderOptions senderOptions = new SenderOptions(); - // is it target or source? target. - senderOptions.targetOptions().capabilities(destinationCapability); - try (Connection connection = client.connect(serverHost, serverPort, options); - Sender sender = connection.openSender(address, senderOptions)) { +// todo list of features in general; supports kerberos, io-uring, epoll, websockets, +// does it support opening listening sockets? listening websocket? +// does support all JMS 2.0 capabilities? (in some way, assuming broker cooperates?) - for (int i = 0; i < count; i++) { - Message message; - if (msgContentListItem != null && !msgContentListItem.isEmpty()) { // TODO check only one of these is specified - // TODO have to cast strings to objects of correct types - message = Message.create(msgContentListItem); - } else if (msgContentMapItems != null) { - Map map = new HashMap<>(); - for (String item : msgContentMapItems) { - String[] fields = item.split("[=~]", 2); - if (fields.length != 2) { - throw new RuntimeException("Wrong format " + Arrays.toString(fields)); // TODO do this in args parsing? - } - map.put(fields[0], fields[1]); // todo retype value - } - message = Message.create(map); - } else if (msgContentFromFile != null) { - message = Message.create(Files.readString(Paths.get(msgContentFromFile))); // todo maybe param type as Path? check exists - } else { - message = Message.create(msgContent); - } - for (String property : msgProperties) { - String[] fields = property.split("[=~]", 2); // todo do something with ~ - if (fields.length != 2) { - throw new RuntimeException("Wrong format " + Arrays.toString(fields)); // TODO do this in args parsing - } - String key = fields[0]; - String value = fields[1]; // more types - message.property(key, value); - } - if (msgId != null) { - message.messageId(msgId); - } - if (msgCorrelationId != null) { - message.correlationId(msgCorrelationId); - } - if (msgTtl != null) { - message.timeToLive(msgTtl); - } - if (stringToBool(msgDurableString)) { - message.durable(true); - } - if (msgGroupId != null) { - message.groupId(msgGroupId); - } - if (msgGroupSeq != null) { - message.groupSequence(msgGroupSeq); - } - if (msgReplyTo != null) { - message.replyTo(msgReplyTo); - } - if (msgReplyToGroupId != null) { - message.replyToGroupId(msgReplyToGroupId); - } - if (contentType != null) { - message.contentType(contentType.toString()); // TODO: maybe should do more with it? don't bother with enum? - } - // todo, not sure what to do here; should I use authenticated userid instead? - if (stringToBool(connPopulateUserIdString)) { - message.userId(msgUserId.getBytes()); - } - if (msgSubject != null) { - message.subject(msgSubject); - } - if (msgPriority != null) { - message.priority(msgPriority); - } - sender.send(message); // TODO what's timeout for in a sender? - logMessage(address, message, stringToBool(msgContentHashedString)); - } - } - - return 0; - } -} - -@Command( - name = "receiver", - mixinStandardHelpOptions = true, - version = "1.0.0", - description = "Opens AMQP connections" -) -class CliProtonJ2Receiver extends CliProtonJ2SenderReceiver implements Callable { - - @Option(names = {"--log-msgs"}, description = "MD5, SHA-1, SHA-256, ...") - private String logMsgs = "MD5"; - - @Option(names = {"--msg-content-hashed"}) - private String msgContentHashedString = "false"; - - @Option(names = {"--broker"}, description = "MD5, SHA-1, SHA-256, ...") - private String broker = "MD5"; - - @Option(names = {"--conn-username"}, description = "MD5, SHA-1, SHA-256, ...") - private String connUsername = "MD5"; - - @Option(names = {"--conn-password"}, description = "MD5, SHA-1, SHA-256, ...") - private String connPassword = "MD5"; - - @Option(names = {"--conn-clientid"}) - private String connClientId; - - @Option(names = {"--durable-subscriber"}) - private String durableSubscriberString = "false"; - - @Option(names = {"--durable-subscriber-name"}) - private String durableSubscriberName; - - // TODO not implemented - @Option(names = {"--subscriber-unsubscribe"}) - private String subscriberUnsubscribeString; - - @Option(names = {"--address"}, description = "MD5, SHA-1, SHA-256, ...") - private String address = "MD5"; - - @Option(names = {"--recv-browse"}, description = "browse queued messages instead of receiving them") - private String recvBrowseString = "false"; - - @Option(names = {"--count"}, description = "MD5, SHA-1, SHA-256, ...") - private int count = 1; - - @Option(names = {"--timeout"}, description = "MD5, SHA-1, SHA-256, ...") - private int timeout; - - @Option(names = {"--conn-auth-mechanisms"}, description = "MD5, SHA-1, SHA-256, ...") - // todo, want to accept comma-separated lists; there is https://picocli.info/#_split_regex - private List connAuthMechanisms = new ArrayList<>(); - - @Option(names = {"--process-reply-to"}) - private boolean processReplyTo = false; - - @Option(names = {"--duration"}) // todo - private Integer duration; - - @Option(names = {"--duration-mode"}) // todo - private DurationMode durationMode; - - @Option(names = {"--ssn-ack-mode"}) - private SsnAckMode ssnAckMode; - - @Override - public Integer call() throws Exception { // your business logic goes here... - String prefix = ""; - if (!broker.startsWith("amqp://") || !broker.startsWith("amqps://")) { - prefix = "amqp://"; - } - final URI url = new URI(prefix + broker); - final String serverHost = url.getHost(); - int serverPort = url.getPort(); - serverPort = (serverPort == -1) ? 5672 : serverPort; - - String destinationCapability = "queue"; - if (address.startsWith(TOPIC_PREFIX)) { - address = address.substring((TOPIC_PREFIX.length())); - destinationCapability = "topic"; - } - if (address.startsWith(QUEUE_PREFIX)) { - address = address.substring((QUEUE_PREFIX.length())); - } - - ClientOptions clientOptions = new ClientOptions(); - // TODO api usability I had to hunt for this a bit; the idea is to have durable subscription: need specify connection id and subscriber name - if (connClientId != null) { - clientOptions.id(connClientId); - } - - // TODO api usability; If I use the w/ clientOptions variant of Client.create, then .id defaults to null, and I get exception; - // ok, that just cannot be true ^^^; but it looks to be true; what!?! - // aha, right; constructor does not check, factory method does check for null - // proposed solution: allow null there, and let it mean autoassign; or tell us method to generate ID ourselves if we don't care - Client client; - if (clientOptions.id() != null) { - client = Client.create(clientOptions); - } else { - client = Client.create(); - } - - final ConnectionOptions options = new ConnectionOptions(); - options.user(connUsername); - options.password(connPassword); - for (AuthMechanism mech : connAuthMechanisms) { - options.saslOptions().addAllowedMechanism(mech.name()); - } - - /* - TODO API usability, hard to ask for queue when dealing with broker that likes to autocreate topics - */ - ReceiverOptions receiverOptions = new ReceiverOptions(); - // is it target or source? target. - receiverOptions.sourceOptions().capabilities(destinationCapability); - - // todo: another usability, little hard to figure out this is analogue of jms to browse queues - if (stringToBool(recvBrowseString)) { - receiverOptions.sourceOptions().distributionMode(DistributionMode.COPY); - } - - // TODO: API question: what is difference between autoSettle and autoAccept? why I want one but not the other? - if(ssnAckMode != null) { - if (ssnAckMode == SsnAckMode.client) { - receiverOptions.autoAccept(false); - receiverOptions.autoSettle(false); - } - } - - try (Connection connection = client.connect(serverHost, serverPort, options)) { - Receiver receiver; - if (stringToBool(durableSubscriberString)) { - receiver = connection.openDurableReceiver(address, durableSubscriberName, receiverOptions); - } else { - receiver = connection.openReceiver(address, receiverOptions); - } - - double initialTimestamp = Utils.getTime(); - for (int i = 0; i < count; i++) { - -// if (durationMode == DurationMode.sleepBeforeReceive) { -// LOG.trace("Sleeping before receive"); -// Utils.sleepUntilNextIteration(initialTimestamp, msgCount, duration, i + 1); -// } - - final Delivery delivery; - if (timeout == 0) { - delivery = receiver.receive(); // todo: can default it to -1 - } else { - delivery = receiver.receive(timeout, TimeUnit.SECONDS); - } - - if (delivery == null) { - break; - } - - if (durationMode == DurationMode.afterReceive) { -// LOG.trace("Sleeping after receive"); - Utils.sleepUntilNextIteration(initialTimestamp, count, duration, i + 1); // todo possibly it is i, different loop here - } - - if (processReplyTo && delivery.message().replyTo() != null) { - String replyTo = delivery.message().replyTo(); - Message message = delivery.message(); - message.replyTo(null); - try (Sender sender = connection.openSender(replyTo)) { - sender.send(message); - } - } - - int messageFormat = delivery.messageFormat(); - Message message = delivery.message(); - - // todo, is this what we mean? - if (ssnAckMode != null && ssnAckMode == SsnAckMode.client) { - delivery.accept(); - } - - logMessage(address, message, stringToBool(msgContentHashedString)); - } - - // TODO API usability, how do I do durable subscription with detach, resume, etc; no mention of unsubscribe in the client anywhere - receiver.close(); // TODO want to do autoclosable, need helper func, that's all -// receiver.detach(); - } - - return 0; - } - -} diff --git a/cli-protonj2/src/main/java/com/redhat/mqe/PropertyType.java b/cli-protonj2/src/main/java/com/redhat/mqe/PropertyType.java new file mode 100644 index 00000000..06b439f7 --- /dev/null +++ b/cli-protonj2/src/main/java/com/redhat/mqe/PropertyType.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2021 Red Hat, Inc. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.redhat.mqe; + +public enum PropertyType { + String("string"); + + private final String value; + + PropertyType(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } +} diff --git a/cli-protonj2/src/main/java/com/redhat/mqe/ProtonJ2MessageFormatter.java b/cli-protonj2/src/main/java/com/redhat/mqe/ProtonJ2MessageFormatter.java new file mode 100644 index 00000000..5bbf2da7 --- /dev/null +++ b/cli-protonj2/src/main/java/com/redhat/mqe/ProtonJ2MessageFormatter.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2021 Red Hat, Inc. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.redhat.mqe; + +import com.redhat.mqe.lib.MessageFormatter; +import org.apache.qpid.protonj2.client.Message; +import org.apache.qpid.protonj2.client.exceptions.ClientException; + +import java.util.HashMap; +import java.util.Map; + +public class ProtonJ2MessageFormatter extends MessageFormatter { + + public Map formatMessage(String address, Message message, boolean msgContentHashed) throws ClientException { + Map map = new HashMap<>(); + map.put("address", address); + map.put("group-id", message.groupId()); + map.put("subject", message.subject()); + map.put("user-id", message.userId()); + map.put("correlation-id", message.correlationId()); + map.put("content-encoding", message.contentEncoding()); + map.put("priority", (int) message.priority()); // TODO(ENTMQCL-2973) remove cast + map.put("type", message.contentType()); +// map.put("ttl", message.timeToLive()); // todo, why do we do the weird thing below instead of this? + map.put("ttl", getTtl(message)); + map.put("absolute-expiry-time", message.absoluteExpiryTime()); + if (msgContentHashed) { + map.put("content", MessageFormatter.hash(formatObject(message.body()))); + } else { + map.put("content", message.body()); + } + map.put("redelivered", message.deliveryCount() > 1); + map.put("reply-to-group-id", message.replyToGroupId()); + map.put("durable", message.durable()); + map.put("group-sequence", (long) message.groupSequence()); // TODO(ENTMQCL-2973) remove cast + map.put("creation-time", message.creationTime()); + map.put("content-type", message.contentType()); + map.put("id", message.messageId()); + map.put("reply-to", message.replyTo()); + + // getPropertyNames? from JMS missing? + Map propertyMap = new HashMap<>(); + message.forEachProperty((s, o) -> { + propertyMap.put((String) s, o); // this wanted to cast to string when I removed message generic type; what??? TODO + }); + map.put("properties", propertyMap); + + return map; + } + + + /** + * Calculate TTL of given message from message + * expiration time and message timestamp. + *

+ * Returns the time the message expires, which is the sum of the time-to-live value + * specified by the client and the GMT at the time of the send + * EXP_TIME = CLIENT_SEND+TTL (CLIENT_SEND??) + * CLIENT_SEND time is approximately getJMSTimestamp() (time value between send()/publish() and return) + * TODO - check for correctness + * + * @param message calculate TTL for this message + * @return positive long number if TTL was calculated. Long.MIN_VALUE if error. + */ + public static long getTtl(Message message) { + long ttl = 0; + try { + long expiration = message.absoluteExpiryTime(); + long timestamp = message.creationTime(); + if (expiration != 0 && timestamp != 0) { + ttl = expiration - timestamp; + } + } catch (ClientException jmse) { +// LOG.error("Error while calculating TTL value.\n" + jmse.getMessage()); + jmse.printStackTrace(); + System.exit(1); + } + return ttl; + } +} diff --git a/cli-protonj2/src/test/java/com/redhat/mqe/MainTest.java b/cli-protonj2/src/test/java/com/redhat/mqe/MainTest.java index 219d031c..02a9db1c 100644 --- a/cli-protonj2/src/test/java/com/redhat/mqe/MainTest.java +++ b/cli-protonj2/src/test/java/com/redhat/mqe/MainTest.java @@ -226,7 +226,7 @@ void test2(@BrokerFixture.TempBroker Broker broker) throws Throwable { checkMainInvocation("sender --log-msgs dict --broker " + brokerUrl + " --conn-auth-mechanisms PLAIN --conn-username admin --conn-password admin --address test_amqp_bare_message_consistency --count 1 --msg-subject amqp_bare_message_test --msg-reply-to ExpiryQueue --msg-property PI=~3.141592 --msg-property color=red --msg-property mapKey=mapValue --msg-content amqp_bare_msg-CBJJIY --msg-durable True --msg-ttl 300000 --msg-correlation-id amqp_bare_msg-CBJJIY --msg-user-id admin --msg-priority 7 --conn-populate-user-id True --msg-group-id group-a --msg-group-seq 1 --msg-reply-to-group-id group-a"); // tests.JAMQMessage000Tests.JAMQMessageTests.test_populate_validated_user_option - // FAIL dtestlib.Test:levels.py:61 Checking properties keys for validated user with expected '_AMQ_VALIDATED_USER' or 'JMSXUserID': dict_keys([]) # result:False (exp. True), dur.:-1.00 err_cnt:1 + // TODO FAIL dtestlib.Test:levels.py:61 Checking properties keys for validated user with expected '_AMQ_VALIDATED_USER' or 'JMSXUserID': dict_keys([]) # result:False (exp. True), dur.:-1.00 err_cnt:1 // tests.JAMQMessage000Tests.JAMQMessageTests.test_scheduled_message_zero_timestamp // Unknown options: '--msg-id' @@ -238,7 +238,9 @@ void test2(@BrokerFixture.TempBroker Broker broker) throws Throwable { checkMainInvocation("sender --log-msgs dict --broker " + brokerUrl + " --conn-auth-mechanisms PLAIN --conn-username admin --conn-password admin --address test_client_acknowledge_inactivity_exception --count 20 --msg-durable True --ssn-ack-mode client"); checkMainInvocation("receiver --timeout 10 --log-msgs dict --broker " + brokerUrl + " --conn-auth-mechanisms PLAIN --conn-username admin --conn-password admin --address test_client_acknowledge_inactivity_exception --count 20 --duration 100 --duration-mode after-receive --ssn-ack-mode client"); - //checkMainInvocation("sender --log-msgs dict --broker " + brokerUrl + " --conn-auth-mechanisms PLAIN --conn-username admin --conn-password admin --address test_direct_transient_text_message --count 1 --msg-content SimpleTextMessage --msg-correlation-id corr-id-eqa9vp"); + +// +// checkMainInvocation("sender --log-msgs dict --broker " + brokerUrl + " --conn-auth-mechanisms PLAIN --conn-username admin --conn-password admin --address test_direct_transient_text_message --count 1 --msg-content SimpleTextMessage --msg-correlation-id corr-id-eqa9vp"); } // void testMessageContentListItem() { diff --git a/cli-protonj2/src/test/java/com/redhat/mqe/MessageLoggingTest.java b/cli-protonj2/src/test/java/com/redhat/mqe/MessageLoggingTest.java new file mode 100644 index 00000000..c0338139 --- /dev/null +++ b/cli-protonj2/src/test/java/com/redhat/mqe/MessageLoggingTest.java @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2021 Red Hat, Inc. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.redhat.mqe; + +import com.redhat.mqe.lib.MessageFormatter; +import org.apache.qpid.protonj2.client.Message; +import org.apache.qpid.protonj2.client.exceptions.ClientException; + +import java.util.List; +import java.util.stream.Collectors; + +class PreviousImplementation { + void logMessage(String address, Message message, boolean msgContentHashed) throws ClientException { + StringBuilder sb = new StringBuilder(); + + sb.append("{"); + + addKeyValue(sb, "address", address); + addKeyValue(sb, "group-id", message.groupId()); + addKeyValue(sb, "subject", message.subject()); + addKeyValue(sb, "user-id", message.userId()); + addKeyValue(sb, "correlation-id", message.correlationId()); + addKeyValue(sb, "content-encoding", message.contentEncoding()); + addKeyValue(sb, "priority", message.priority()); + addKeyValue(sb, "type", "string"); // ??? + addKeyValue(sb, "ttl", message.timeToLive()); + addKeyValue(sb, "absolute-expiry-time", message.absoluteExpiryTime()); + if (msgContentHashed) { + // this is inlined addKeyValue, TODO do it nicer + sb.append("'"); + sb.append("content"); + sb.append("': "); + sb.append("'"); // extra quotes to format + sb.append(MessageFormatter.hash(formatPython(message.body()))); + sb.append("'"); + sb.append(", "); + } else { + addKeyValue(sb, "content", message.body()); + } + addKeyValue(sb, "redelivered", message.deliveryCount() > 1); + addKeyValue(sb, "reply-to-group-id", message.replyToGroupId()); + addKeyValue(sb, "durable", message.durable()); + addKeyValue(sb, "group-sequence", message.groupSequence()); + addKeyValue(sb, "creation-time", message.creationTime()); + addKeyValue(sb, "content-type", message.contentType()); + addKeyValue(sb, "id", message.messageId()); + addKeyValue(sb, "reply-to", message.replyTo()); + + // getPropertyNames? from JMS missing? + StringBuilder sbb = new StringBuilder(); + sbb.append('{'); +// AtomicBoolean first = new AtomicBoolean(true); + message.forEachProperty((s, o) -> { +// if (!first.get()) { +// sbb.append(", "); +// first.set(false); +// } + addKeyValue(sbb, (String) s, o); // this wanted to cast to string when I removed message generic type; what??? TODO + }); + if (message.hasProperties()) { + sbb.delete(sbb.length() - 2, sbb.length()); // remove last ", " + } + sbb.append('}'); + addKeyValue(sb, "properties", sbb); // ??? + + sb.delete(sb.length() - 2, sb.length()); // remove last ", " + + sb.append("}"); + + System.out.println(sb); + } + + void addKeyValue(StringBuilder sb, String key, Object value) { + sb.append("'"); + sb.append(key); + sb.append("': "); + sb.append(formatPython(value)); + sb.append(", "); + } + + String formatPython(Object parameter) { + if (parameter == null) { + return "None"; + } + if (parameter instanceof String) { + return "'" + parameter + "'"; + } + if (parameter instanceof Boolean) { + return ((boolean)parameter) ? "True" : "False"; + } + if (parameter instanceof StringBuilder) { + return parameter.toString(); + } + if (parameter instanceof List) { + return "[" + ((List) parameter).stream().map(this::formatPython).collect(Collectors.joining(", ")) + "]"; + } + return "'" + parameter + "'"; + } +} + +public class MessageLoggingTest { +} diff --git a/cli-protonj2/src/test/kotlin/MainTest.kt b/cli-protonj2/src/test/kotlin/MainTest.kt new file mode 100644 index 00000000..f4ddfde9 --- /dev/null +++ b/cli-protonj2/src/test/kotlin/MainTest.kt @@ -0,0 +1,235 @@ +/* + * Copyright (c) 2021 Red Hat, Inc. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.redhat.mqe + +import AbstractMainTest +import com.google.common.truth.Truth.assertThat +import com.redhat.mqe.lib.MessageFormatter +import org.junit.jupiter.api.Disabled +import org.junit.jupiter.api.Tag +import org.junit.jupiter.api.Test +import picocli.CommandLine +import java.io.File +import java.nio.file.Files + +class ProtonJ2ClientListener(private val clientListener: ClientListener) : ProtonJ2MessageFormatter() { + override fun printMessageAsPython(format: MutableMap?) { + clientListener.onMessage(format) + super.printMessageAsPython(format) + } +} + +@Disabled("fails") +@Tag("external") +class ProtonJ2MainTest : AbstractMainTest() { + + override val brokerUrl = "amqp://127.0.0.1:61616" + override val sslBrokerUrl = "amqps://127.0.0.1:61617" + + override val senderAdditionalOptions = """ +--capacity 1 +--conn-async-acks true +--conn-async-send true +--conn-auth-mechanisms anonymous +--conn-auth-sasl false +--conn-cache-ena false +--conn-cache-size 1 +--conn-clientid aClientId +--conn-clientid-prefix aClientIdPrefix +--conn-close-timeout 1000 +--conn-conn-timeout 1000 +--conn-connid-prefix aConnIdPrefix +--conn-heartbeat 1000 +--conn-local-msg-priority true +--conn-max-frame-size 4096 +--conn-prefetch 1 +--conn-prefetch-browser 1 +--conn-prefetch-queue 1 +--conn-prefetch-topic 1 +--conn-prefetch-topic-dur 1 +--conn-prefix-packet-size-ena false +--conn-queue-prefix aQueuePrefix +--conn-reconnect true +--conn-reconnect-backoff false +--conn-reconnect-backoff-multiplier 1 +--conn-reconnect-initial-delay 1 +--conn-reconnect-interval 1000 +--conn-reconnect-limit 1000 +--conn-reconnect-start-limit 1000 +--conn-reconnect-timeout 1000 +--conn-reconnect-warn-attempts 1 +--conn-redeliveries-max 1 +--conn-server-stack-trace-ena false +--conn-sync-send true +--conn-tcp-buf-size-recv 1 +--conn-tcp-buf-size-send 1 +--conn-tcp-conn-timeout 1000 +--conn-tcp-keep-alive true +--conn-tcp-no-delay false +--conn-tcp-sock-linger 1000 +--conn-tcp-sock-timeout 1000 +--conn-tcp-traffic-class 1 +--conn-tight-encoding-ena false +--conn-topic-prefix aTopicPrefix +--conn-valid-prop-names false +--msg-content-type aMsgContentType +--msg-correlation-id aCorrelationId +--msg-durable false +--msg-group-id aMsgGroupId +--msg-group-seq -1 +--msg-no-timestamp true +--msg-priority 1 +--timeout 2 +--tx-size 1 +--msg-reply-to aReplyToQueue +--msg-reply-to-group-id aReplyToGroupId +--msg-subject aMsgSubject +--msg-ttl 10000 +--msg-user-id aMsgUserId +--property-type String +""".split(" ", "\n").toTypedArray() + + // cannot set Client ID, because more than one connection is created, and these would clash +//--conn-clientid aClientId + override val connectorAdditionalOptions = """ +--conn-async-acks true +--conn-async-send true +--conn-auth-mechanisms anonymous +--conn-auth-sasl false +--conn-cache-ena false +--conn-cache-size 1 +--conn-clientid-prefix aClientIdPrefix +--conn-close-timeout 1000 +--conn-conn-timeout 1000 +--conn-connid-prefix aConnIdPrefix +--conn-heartbeat 1000 +--conn-local-msg-priority true +--conn-max-frame-size 4096 +--conn-prefetch 1 +--conn-prefetch-browser 1 +--conn-prefetch-queue 1 +--conn-prefetch-topic 1 +--conn-prefetch-topic-dur 1 +--conn-prefix-packet-size-ena false +--conn-queue-prefix aQueuePrefix +--conn-reconnect true +--conn-reconnect-backoff false +--conn-reconnect-backoff-multiplier 1 +--conn-reconnect-initial-delay 1 +--conn-reconnect-interval 1000 +--conn-reconnect-limit 1000 +--conn-reconnect-start-limit 1000 +--conn-reconnect-timeout 1000 +--conn-reconnect-warn-attempts 1 +--conn-redeliveries-max 1 +--conn-server-stack-trace-ena false +--conn-sync-send true +--conn-tcp-buf-size-recv 1 +--conn-tcp-buf-size-send 1 +--conn-tcp-conn-timeout 1000 +--conn-tcp-keep-alive true +--conn-tcp-no-delay false +--conn-tcp-sock-linger 1000 +--conn-tcp-sock-timeout 1000 +--conn-tcp-traffic-class 1 +--conn-tight-encoding-ena false +--conn-topic-prefix aTopicPrefix +--conn-valid-prop-names false +""".split(" ", "\n").toTypedArray() + + override fun main_(listener: ClientListener, args: Array) { + val protonJ2ClientListener = ProtonJ2ClientListener(listener) + val main = when (args[0]) { + "sender" -> CommandLine(CliProtonJ2Sender(protonJ2ClientListener)) + "receiver" -> CommandLine(CliProtonJ2Receiver(protonJ2ClientListener)) + "connector" -> CommandLine(CliProtonJ2Connector()) + else -> throw NotImplementedError(args[0]) + } + val returnCode = main.execute(*(args.drop(1).toTypedArray())) + } + + override val prefix: String + get() = "ProtonJ2MainTest" + + /** + * Large message streaming from/to java.io.{Input,Output}Stream is artemis-jms-client only + */ + @Test + fun sendLargeMessageStreamFile() { + val file = File.createTempFile(address, null) + val outputDirectory = Files.createTempDirectory(address) + val output = outputDirectory.resolve("message") + val output0 = outputDirectory.resolve("message_0") + try { + file.writeText("aContent") + val senderParameters = + "sender --log-msgs dict --broker $brokerUrl --address $address --count 1 --msg-content-from-file $file --msg-content-binary true --msg-content-stream true".split( + " " + ).toTypedArray() + val receiverParameters = + "receiver --log-msgs dict --broker $brokerUrl --address $address --count 1 --msg-binary-content-to-file $output".split( + " " + ).toTypedArray() + + print("Sending: ") + main(senderParameters) + print("Receiving: ") + main(receiverParameters) + + assertThat(output0.toFile().readBytes()).isEqualTo(file.readBytes()) + } finally { + file.delete() + outputDirectory.toFile().deleteRecursively() + } + } + + /** + * Large message streaming from/to java.io.{Input,Output}Stream is artemis-jms-client only + */ + @Test + @Disabled("https://github.com/rh-messaging/cli-java/issues/50") + fun sendAndReceiveLargeMessageStreamFile() { + val file = File.createTempFile(address, "input") + val outputDirectory = Files.createTempDirectory(address) + val output = outputDirectory.resolve("message") + val output0 = outputDirectory.resolve("message_0") + try { + file.writeText("aContent") + val senderParameters = + "sender --log-msgs dict --broker $brokerUrl --address $address --count 1 --msg-content-from-file $file --msg-content-binary true --msg-content-stream true".split( + " " + ).toTypedArray() + val receiverParameters = + "receiver --log-msgs dict --broker $brokerUrl --address $address --count 1 --msg-binary-content-to-file $output --msg-content-stream true".split( + " " + ).toTypedArray() + + print("Sending: ") + main(senderParameters) + print("Receiving: ") + main(receiverParameters) + + assertThat(output0.toFile().readBytes()).isEqualTo(file.readBytes()) + } finally { + file.delete() + outputDirectory.toFile().deleteRecursively() + } + } +} diff --git a/interop-tests/src/test/kotlin/ClientFormatterSpy.kt b/interop-tests/src/test/kotlin/ClientFormatterSpy.kt index c369bb1c..f449e9e4 100644 --- a/interop-tests/src/test/kotlin/ClientFormatterSpy.kt +++ b/interop-tests/src/test/kotlin/ClientFormatterSpy.kt @@ -17,6 +17,9 @@ * limitations under the License. */ +import com.redhat.mqe.CliProtonJ2Receiver +import com.redhat.mqe.CliProtonJ2Sender +import com.redhat.mqe.ProtonJ2MessageFormatter import com.redhat.mqe.acc.AccClientOptionManager import com.redhat.mqe.acc.AccConnectionManagerFactory import com.redhat.mqe.acc.AccCoreJmsMessageFormatter @@ -27,11 +30,19 @@ import com.redhat.mqe.jms.AacConnectionManagerFactory import com.redhat.mqe.jms.AacReceiverOptions import com.redhat.mqe.jms.AacSenderOptions import com.redhat.mqe.lib.* +import picocli.CommandLine import javax.jms.Message -class ClientFormatterSpy(private val formatter: JmsMessageFormatter) : JmsMessageFormatter() { +interface IClientFormatterSpy { + val messages: MutableList> + fun printMessageAsPython(format: MutableMap?) + fun run() +} + +class ClientFormatterSpy(private val formatter: JmsMessageFormatter) : JmsMessageFormatter(), IClientFormatterSpy { lateinit var client: CoreClient - val messages: MutableList> = ArrayList() + + override val messages: MutableList> = ArrayList() override fun formatMessage(msg: Message?, hashContent: Boolean): MutableMap = formatter.formatMessage(msg, hashContent) @@ -41,11 +52,13 @@ class ClientFormatterSpy(private val formatter: JmsMessageFormatter) : JmsMessag super.printMessageAsPython(format) } - fun run() { + override fun run() { client.startClient() } companion object { + //region Qpid Jms + fun makeAacSenderClient(args: Array): ClientFormatterSpy { val connectionManagerFactory = AacConnectionManagerFactory() val messageFormatter: JmsMessageFormatter = AMQPJmsMessageFormatter() @@ -82,7 +95,9 @@ class ClientFormatterSpy(private val formatter: JmsMessageFormatter) : JmsMessag spy.client = MessageBrowser(options, connectionManagerFactory, spy) return spy } + //endregion + //region Artemis Core fun makeAccSenderClient(args: Array): ClientFormatterSpy { val connectionManagerFactory = AccConnectionManagerFactory() @@ -120,7 +135,9 @@ class ClientFormatterSpy(private val formatter: JmsMessageFormatter) : JmsMessag spy.client = MessageBrowser(options, connectionManagerFactory, spy) return spy } + //endregion + //region ActiveMQ fun makeAocSenderClient(args: Array): ClientFormatterSpy { val connectionManagerFactory = AocConnectionManagerFactory() @@ -158,5 +175,33 @@ class ClientFormatterSpy(private val formatter: JmsMessageFormatter) : JmsMessag spy.client = MessageBrowser(options, connectionManagerFactory, spy) return spy } + //endregion + + //region ProtonJ2 + + fun makeProtonj2SenderClient(args: Array): ProtonJ2ClientFormatterSpy { + val messageFormatter = ProtonJ2MessageFormatter() + val messageFormatterSpy = ProtonJ2ClientFormatterSpy() + + messageFormatterSpy.client = CommandLine(CliProtonJ2Sender(messageFormatterSpy)) + messageFormatterSpy.args = args + + return messageFormatterSpy; + } + + fun makeProtonj2ReceiverClient(args: Array): ProtonJ2ClientFormatterSpy { + val messageFormatter = ProtonJ2MessageFormatter() + val messageFormatterSpy = ProtonJ2ClientFormatterSpy() + + messageFormatterSpy.client = CommandLine(CliProtonJ2Receiver(messageFormatterSpy)) + messageFormatterSpy.args = args + + return messageFormatterSpy + } + + fun makeProtonj2BrowserClient(args: Array): ProtonJ2ClientFormatterSpy { + return makeProtonj2ReceiverClient(args + "--recv-browse" + "true") + } + //endregion } } diff --git a/interop-tests/src/test/kotlin/InteropTest.kt b/interop-tests/src/test/kotlin/InteropTest.kt index 675d3cf8..cdc5cd39 100644 --- a/interop-tests/src/test/kotlin/InteropTest.kt +++ b/interop-tests/src/test/kotlin/InteropTest.kt @@ -148,7 +148,7 @@ class InteropTest : AbstractTest() { companion object { @JvmStatic fun clientCombinationsProvider(): Stream { - val clients = listOf("aac", "acc", "aoc") + val clients = listOf("aac", "acc", "aoc", "protonj2") return clients.flatMap { s -> clients.map { r -> Arguments.of(s, r) } }.stream() } } diff --git a/interop-tests/src/test/kotlin/ProtonJ2ClientFormatterSpy.kt b/interop-tests/src/test/kotlin/ProtonJ2ClientFormatterSpy.kt new file mode 100644 index 00000000..646dbd83 --- /dev/null +++ b/interop-tests/src/test/kotlin/ProtonJ2ClientFormatterSpy.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2017 Red Hat, Inc. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import com.google.common.truth.Truth +import com.redhat.mqe.ProtonJ2MessageFormatter +import picocli.CommandLine + +class ProtonJ2ClientFormatterSpy : ProtonJ2MessageFormatter(), IClientFormatterSpy { + lateinit var args: Array + lateinit var client: CommandLine + + override val messages: MutableList> = ArrayList() + + override fun printMessageAsPython(format: MutableMap?) { + messages.add(format!!.toMap()) + super.printMessageAsPython(format) + } + + override fun run() { + val exitCode = client.execute(*args) + Truth.assertThat(exitCode).isEqualTo(0) + } +} diff --git a/jmslib/src/main/java/com/redhat/mqe/lib/Content.java b/lib/src/main/java/com/redhat/mqe/lib/Content.java similarity index 93% rename from jmslib/src/main/java/com/redhat/mqe/lib/Content.java rename to lib/src/main/java/com/redhat/mqe/lib/Content.java index d7140dbe..a997f3f7 100644 --- a/jmslib/src/main/java/com/redhat/mqe/lib/Content.java +++ b/lib/src/main/java/com/redhat/mqe/lib/Content.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017 Red Hat, Inc. + * Copyright (c) 2021 Red Hat, Inc. * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -36,7 +36,7 @@ public class Content { private String key; private Object value; private Class type; - private boolean isMap; + private final boolean isMap; /** * Create content from defined rules. @@ -65,7 +65,7 @@ public Content(String contentType, String parsedValue, boolean isMap) { splitValue = parsedValue.substring(parsedValue.indexOf(splitter) + 1); } } else if (parsedValue.contains("=")) { - // last argument 'allowExplicityRetype' can be omitted as parsedValue will not by autotypecasted - no '~' + // last argument 'allowExplicitlyRetype' can be omitted as parsedValue will not by autotypecasted - no '~' splitValue = parsedValue.substring(parsedValue.indexOf(splitter) + 1); } else { splitter = "~"; @@ -93,6 +93,9 @@ public Content(String contentType, String parsedValue, boolean isMap) { } public String getKey() { + if (!isMap) { + throw new IllegalStateException("Only maps have keys"); + } return key; } diff --git a/lib/src/main/java/com/redhat/mqe/lib/MessageFormatter.java b/lib/src/main/java/com/redhat/mqe/lib/MessageFormatter.java index 9e816e6f..a7004b64 100644 --- a/lib/src/main/java/com/redhat/mqe/lib/MessageFormatter.java +++ b/lib/src/main/java/com/redhat/mqe/lib/MessageFormatter.java @@ -26,6 +26,7 @@ import java.io.UnsupportedEncodingException; import java.math.BigInteger; +import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Hashtable; @@ -116,6 +117,9 @@ protected StringBuilder formatDouble(double in_data) { return int_res; } + /** + Formats object as Python + */ @SuppressWarnings("unchecked") protected StringBuilder formatObject(Object in_data) { StringBuilder int_res = new StringBuilder(); @@ -138,14 +142,8 @@ protected StringBuilder formatObject(Object in_data) { } else if (in_data instanceof UUID) { int_res.append(formatString(in_data.toString())); } else if (in_data instanceof byte[]) { - try { - String value = new String((byte[]) in_data, "UTF-8"); - int_res.append(formatString(value)); - } catch (UnsupportedEncodingException uee) { - LOG.error("Error while getting message properties!", uee.getMessage()); - uee.printStackTrace(); - System.exit(1); - } + String value = new String((byte[]) in_data, StandardCharsets.UTF_8); + int_res.append(formatString(value)); } else { handleUnsupportedObjectMessagePayloadType(int_res, in_data); } diff --git a/jmslib/src/main/java/com/redhat/mqe/lib/Utils.java b/lib/src/main/java/com/redhat/mqe/lib/Utils.java similarity index 100% rename from jmslib/src/main/java/com/redhat/mqe/lib/Utils.java rename to lib/src/main/java/com/redhat/mqe/lib/Utils.java