diff --git a/README.md b/README.md
index ad250c2..fcb8838 100644
--- a/README.md
+++ b/README.md
@@ -13,20 +13,17 @@ Prior to building or installing this JMeter plugin, ensure that the RabbitMQ cli
Build Dependencies
------------------
-Build dependencies are managed by Ivy. JARs should automagically be downloaded by Ivy as part of the build process.
-
-In addition, you'll need to copy or symlink the following from JMeter's lib/ext directory:
-* ApacheJMeter_core.jar
+Build dependencies are managed by Maven. JARs should automagically be downloaded by Maven as part of the build process.
Building
--------
-The project is built using Ant. To execute the build script, just execute:
- ant
+The project is built using Maven. To execute the build script, just execute:
+ mvn clean compile assembly:single
Installing
----------
-To install the plugin, build the project and copy the generated JMeterAMQP.jar file from target/dist to JMeter's lib/ext/ directory.
+To install the plugin, build the project and copy the generated jmeter-rabbit-amqp-1.0.0-jar-with-dependencies.jar file from target to JMeter's lib/ext/ directory.
diff --git a/build.xml b/build.xml
index b61b1f1..23e4881 100644
--- a/build.xml
+++ b/build.xml
@@ -52,6 +52,11 @@
Resolving properties
+
+
+
+
+
diff --git a/ivy.xml b/ivy.xml
index 44498f6..30ecd5f 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -17,5 +17,6 @@
+
diff --git a/module.pom b/module.pom
new file mode 100644
index 0000000..9aab241
--- /dev/null
+++ b/module.pom
@@ -0,0 +1,89 @@
+
+
+
+
+ 4.0.0
+ com.zeroclue.jmeter.protocol.amqp
+ jmeter-rabbit-amqp
+ jar
+ working@sgouveia-VirtualBox
+
+
+ org.apache.commons
+ commons-lang3
+ 3.0
+ true
+
+
+ commons-io
+ commons-io
+ 1.4
+ true
+
+
+ commons-jexl
+ commons-jexl
+ 1.1
+ true
+
+
+ commons-codec
+ commons-codec
+ 1.4
+ true
+
+
+ commons-collections
+ commons-collections
+ 3.2.1
+ true
+
+
+ commons-httpclient
+ commons-httpclient
+ 3.1
+ true
+
+
+ commons-logging
+ commons-logging
+ 1.1.1
+ true
+
+
+ commons-net
+ commons-net
+ 1.4.1
+ true
+
+
+ org.apache.jmeter
+ jorphan
+ 2.6
+ true
+
+
+ avalon-logkit
+ avalon-logkit
+ 2.0
+ true
+
+
+ com.rabbitmq
+ amqp-client
+ 3.5.1
+ true
+
+
+ org.apache.jmeter
+ ApacheJMeter_core
+ 2.11
+ true
+
+
+
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..8956ecd
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,156 @@
+
+ 4.0.0
+
+ com.zeroclue.jmeter.protocol.amqp
+ jmeter-rabbit-amqp
+ jar
+ 1.0.0
+
+
+
+ svn-repo
+ https://svn.apache.org/repos/asf/excalibur/repository
+
+ true
+
+
+
+
+
+
+
+
+ org.json
+ json
+ 20090211
+
+
+ org.apache.commons
+ commons-lang3
+ 3.4
+ true
+ provided
+
+
+ commons-io
+ commons-io
+ 2.4
+ true
+ provided
+
+
+ commons-jexl
+ commons-jexl
+ 1.1
+ true
+ provided
+
+
+ commons-codec
+ commons-codec
+ 1.10
+ true
+ provided
+
+
+ commons-collections
+ commons-collections
+ 3.2.2
+ true
+ provided
+
+
+ commons-httpclient
+ commons-httpclient
+ 3.1
+ true
+ provided
+
+
+ commons-logging
+ commons-logging
+ 1.2
+ true
+ provided
+
+
+ commons-net
+ commons-net
+ 3.4
+ true
+ provided
+
+
+
+ avalon-logkit
+ avalon-logkit
+ 2.1
+ true
+ provided
+
+
+ com.rabbitmq
+ amqp-client
+ 3.6.0
+ true
+ provided
+
+
+ org.apache.jmeter
+ ApacheJMeter_core
+ 2.13
+ provided
+
+
+ commons-math3
+ commons-math3
+
+
+ commons-pool2
+ commons-pool2
+
+
+
+
+ org.apache.commons
+ commons-math3
+ 3.4.1
+ provided
+
+
+ org.apache.commons
+ commons-pool2
+ 2.3
+ provided
+
+
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.3
+
+
+ 1.5
+
+
+
+ maven-assembly-plugin
+
+
+ jar-with-dependencies
+
+
+
+
+
+
diff --git a/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPConsumer.java b/src/main/java/com/zeroclue/jmeter/protocol/amqp/AMQPConsumer.java
old mode 100644
new mode 100755
similarity index 69%
rename from src/main/com/zeroclue/jmeter/protocol/amqp/AMQPConsumer.java
rename to src/main/java/com/zeroclue/jmeter/protocol/amqp/AMQPConsumer.java
index 7dd6b32..c3804dd
--- a/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPConsumer.java
+++ b/src/main/java/com/zeroclue/jmeter/protocol/amqp/AMQPConsumer.java
@@ -1,19 +1,26 @@
package com.zeroclue.jmeter.protocol.amqp;
-import java.io.IOException;
-import java.security.*;
-
+import com.rabbitmq.client.AMQP.BasicProperties;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.ConsumerCancelledException;
+import com.rabbitmq.client.QueueingConsumer;
+import com.rabbitmq.client.ShutdownSignalException;
+import com.zeroclue.jmeter.protocol.amqp.util.AMQPMessageContentEncodingUtil;
+import com.zeroclue.jmeter.protocol.amqp.util.MessageContentEncodingUtilFactory;
import org.apache.jmeter.samplers.Entry;
import org.apache.jmeter.samplers.Interruptible;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.jmeter.testelement.TestStateListener;
import org.apache.jorphan.logging.LoggingManager;
import org.apache.log.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.ConsumerCancelledException;
-import com.rabbitmq.client.QueueingConsumer;
-import com.rabbitmq.client.ShutdownSignalException;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
public class AMQPConsumer extends AMQPSampler implements Interruptible, TestStateListener {
private static final int DEFAULT_PREFETCH_COUNT = 0; // unlimited
@@ -36,14 +43,13 @@ public class AMQPConsumer extends AMQPSampler implements Interruptible, TestStat
private transient QueueingConsumer consumer;
private transient String consumerTag;
- public AMQPConsumer(){
+ public AMQPConsumer() {
super();
}
/**
* {@inheritDoc}
*/
- @Override
public SampleResult sample(Entry entry) {
SampleResult result = new SampleResult();
result.setSampleLabel(getName());
@@ -55,7 +61,7 @@ public SampleResult sample(Entry entry) {
try {
initChannel();
- // only do this once per thread. Otherwise it slows down the consumption by appx 50%
+ // only do this once per thread. Otherwise it slows down the consumption by appx 50%
if (consumer == null) {
log.info("Creating consumer");
consumer = new QueueingConsumer(channel);
@@ -83,7 +89,7 @@ public SampleResult sample(Entry entry) {
for (int idx = 0; idx < loop; idx++) {
delivery = consumer.nextDelivery(getReceiveTimeoutAsInt());
- if(delivery == null){
+ if (delivery == null) {
result.setResponseMessage("timed out");
return result;
}
@@ -92,15 +98,20 @@ public SampleResult sample(Entry entry) {
* Set up the sample result details
*/
if (getReadResponseAsBoolean()) {
- String response = new String(delivery.getBody());
- result.setSamplerData(response);
- result.setResponseMessage(response);
- }
- else {
+ String messageBody = decodeMessage(delivery);//new String(delivery.getBody());
+ //result.setSamplerData(messageBody);
+ result.setResponseMessage(messageBody);
+
+ BasicProperties messageProperties = delivery.getProperties();
+ JSONObject message = toJson(messageProperties, messageBody);
+ result.setSamplerData(message.toString());
+
+ //result.setResponseData(messageBody, null);
+ } else {
result.setSamplerData("Read response is false.");
}
- if(!autoAck())
+ if (!autoAck())
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
@@ -137,6 +148,13 @@ public SampleResult sample(Entry entry) {
log.warn("AMQP consumer failed to consume", e);
result.setResponseCode("100");
result.setResponseMessage(e.getMessage());
+ } catch (Exception e) {
+ e.printStackTrace();
+ consumer = null;
+ consumerTag = null;
+ log.warn("AMQP consumer failed to consume", e);
+ result.setResponseCode("100");
+ result.setResponseMessage(e.getMessage());
} finally {
result.sampleEnd(); // End timimg
}
@@ -171,7 +189,7 @@ public void setPurgeQueue(Boolean purgeQueue) {
setProperty(PURGE_QUEUE, purgeQueue.toString());
}
- public boolean purgeQueue(){
+ public boolean purgeQueue() {
return Boolean.parseBoolean(getPurgeQueue());
}
@@ -190,7 +208,7 @@ public void setAutoAck(Boolean autoAck) {
setProperty(AUTO_ACK, autoAck.toString());
}
- public boolean autoAck(){
+ public boolean autoAck() {
return getPropertyAsBoolean(AUTO_ACK);
}
@@ -215,7 +233,7 @@ public String getPrefetchCount() {
}
public void setPrefetchCount(String prefetchCount) {
- setProperty(PREFETCH_COUNT, prefetchCount);
+ setProperty(PREFETCH_COUNT, prefetchCount);
}
public int getPrefetchCountAsInt() {
@@ -249,9 +267,6 @@ public boolean getReadResponseAsBoolean() {
return getPropertyAsBoolean(READ_RESPONSE);
}
-
-
- @Override
public boolean interrupt() {
testEnded();
return true;
@@ -260,10 +275,9 @@ public boolean interrupt() {
/**
* {@inheritDoc}
*/
- @Override
public void testEnded() {
- if(purgeQueue()){
+ if (purgeQueue()) {
log.info("Purging queue " + getQueue());
try {
channel.queuePurge(getQueue());
@@ -273,17 +287,14 @@ public void testEnded() {
}
}
- @Override
public void testEnded(String arg0) {
}
- @Override
public void testStarted() {
}
- @Override
public void testStarted(String arg0) {
}
@@ -292,9 +303,9 @@ public void cleanup() {
try {
if (consumerTag != null) {
- channel.basicCancel(consumerTag);
+ channel.basicCancel(consumerTag);
}
- } catch(IOException e) {
+ } catch (IOException e) {
log.error("Couldn't safely cancel the sample " + consumerTag, e);
}
@@ -302,6 +313,31 @@ public void cleanup() {
}
+ private JSONObject toJson(BasicProperties messageProperties, String messageBody) throws JSONException {
+ JSONObject headers = new JSONObject();
+ for (Map.Entry header : messageProperties.getHeaders().entrySet()) {
+ headers.put(header.getKey(), header.getValue().toString());
+ }
+
+ return new JSONObject()
+ .put("appId", messageProperties.getAppId())
+ .put("classId", messageProperties.getClassId())
+ .put("clusterId", messageProperties.getClusterId())
+ .put("content", new JSONObject(messageBody))
+ .put("contentEncoding", messageProperties.getContentEncoding())
+ .put("contentType", messageProperties.getContentEncoding())
+ .put("correlationId", messageProperties.getCorrelationId())
+ .put("deliveryMode", messageProperties.getDeliveryMode())
+ .put("expiration", messageProperties.getExpiration())
+ .put("headers", headers)
+ .put("messageId", messageProperties.getMessageId())
+ .put("priority", messageProperties.getPriority())
+ .put("replyTo", messageProperties.getReplyTo())
+ .put("timestamp", messageProperties.getTimestamp())
+ .put("type", messageProperties.getType())
+ .put("userId", messageProperties.getUserId());
+ }
+
/*
* Helper method
*/
@@ -312,9 +348,17 @@ private void trace(String s) {
log.debug(tn + " " + tl + " " + s + " " + th);
}
- protected boolean initChannel() throws IOException, NoSuchAlgorithmException, KeyManagementException {
+ protected boolean initChannel() throws IOException, NoSuchAlgorithmException, KeyManagementException, TimeoutException {
boolean ret = super.initChannel();
channel.basicQos(getPrefetchCountAsInt());
return ret;
}
+
+ public static String decodeMessage(QueueingConsumer.Delivery delivery) throws Exception {
+ // Gets an encoding util to decode the message to basic UTF-8
+ AMQPMessageContentEncodingUtil messageEncodingUtil = MessageContentEncodingUtilFactory.create(delivery);
+ QueueingConsumer.Delivery receivedMessage = messageEncodingUtil.decode(delivery);
+ String consumedMessage = new String(receivedMessage.getBody());
+ return consumedMessage;
+ }
}
diff --git a/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java b/src/main/java/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java
old mode 100644
new mode 100755
similarity index 90%
rename from src/main/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java
rename to src/main/java/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java
index 242de14..c081e76
--- a/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java
+++ b/src/main/java/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java
@@ -1,11 +1,13 @@
package com.zeroclue.jmeter.protocol.amqp;
import com.rabbitmq.client.AMQP;
-
+import com.rabbitmq.client.Channel;
import java.io.IOException;
-import java.security.*;
-import java.util.*;
-
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.samplers.Entry;
@@ -15,7 +17,6 @@
import org.apache.jorphan.logging.LoggingManager;
import org.apache.log.Logger;
-import com.rabbitmq.client.Channel;
/**
* JMeter creates an instance of a sampler class for every occurrence of the
@@ -58,7 +59,6 @@ public AMQPPublisher() {
/**
* {@inheritDoc}
*/
- @Override
public SampleResult sample(Entry e) {
SampleResult result = new SampleResult();
result.setSampleLabel(getName());
@@ -115,8 +115,7 @@ public SampleResult sample(Entry e) {
log.debug(ex.getMessage(), ex);
result.setResponseCode("000");
result.setResponseMessage(ex.toString());
- }
- finally {
+ } finally {
result.sampleEnd(); // End timimg
}
@@ -173,13 +172,13 @@ public void setReplyToQueue(String content) {
}
public String getContentType() {
- return getPropertyAsString(CONTENT_TYPE);
+ return getPropertyAsString(CONTENT_TYPE);
}
-
+
public void setContentType(String contentType) {
- setProperty(CONTENT_TYPE, contentType);
+ setProperty(CONTENT_TYPE, contentType);
}
-
+
/**
* @return the correlation identifier for the sample
*/
@@ -215,7 +214,7 @@ public Boolean getPersistent() {
}
public void setPersistent(Boolean persistent) {
- setProperty(PERSISTENT, persistent);
+ setProperty(PERSISTENT, persistent);
}
public Boolean getUseTx() {
@@ -223,31 +222,28 @@ public Boolean getUseTx() {
}
public void setUseTx(Boolean tx) {
- setProperty(USE_TX, tx);
+ setProperty(USE_TX, tx);
}
- @Override
public boolean interrupt() {
cleanup();
return true;
}
- @Override
protected Channel getChannel() {
return channel;
}
- @Override
protected void setChannel(Channel channel) {
this.channel = channel;
}
- protected AMQP.BasicProperties getProperties() {
+ private AMQP.BasicProperties getProperties() {
final AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
final int deliveryMode = getPersistent() ? 2 : 1;
final String contentType = StringUtils.defaultIfEmpty(getContentType(), "text/plain");
-
+
builder.contentType(contentType)
.deliveryMode(deliveryMode)
.priority(0)
@@ -256,13 +252,13 @@ protected AMQP.BasicProperties getProperties() {
.type(getMessageType())
.headers(prepareHeaders())
.build();
- if (getMessageId() != null && getMessageId().isEmpty()) {
+ if (getMessageId() != null && getMessageId().equals("")) {
builder.messageId(getMessageId());
}
return builder.build();
}
- protected boolean initChannel() throws IOException, NoSuchAlgorithmException, KeyManagementException {
+ protected boolean initChannel() throws IOException, NoSuchAlgorithmException, KeyManagementException, TimeoutException {
boolean ret = super.initChannel();
if (getUseTx()) {
channel.txSelect();
@@ -274,7 +270,12 @@ private Map prepareHeaders() {
Map result = new HashMap();
Map source = getHeaders().getArgumentsAsMap();
for (Map.Entry item : source.entrySet()) {
- result.put(item.getKey(), item.getValue());
+ if (item.getValue().contains("int")) {
+ String val = (item.getValue().split(":"))[1];
+ result.put(item.getKey(), Integer.parseInt(val));
+ } else {
+ result.put(item.getKey(), item.getValue());
+ }
}
return result;
}
diff --git a/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPSampler.java b/src/main/java/com/zeroclue/jmeter/protocol/amqp/AMQPSampler.java
old mode 100644
new mode 100755
similarity index 93%
rename from src/main/com/zeroclue/jmeter/protocol/amqp/AMQPSampler.java
rename to src/main/java/com/zeroclue/jmeter/protocol/amqp/AMQPSampler.java
index c39cf9b..2d9388f
--- a/src/main/com/zeroclue/jmeter/protocol/amqp/AMQPSampler.java
+++ b/src/main/java/com/zeroclue/jmeter/protocol/amqp/AMQPSampler.java
@@ -1,17 +1,19 @@
package com.zeroclue.jmeter.protocol.amqp;
-import java.io.IOException;
-import java.util.*;
-import java.security.*;
-
import com.rabbitmq.client.*;
+import org.apache.commons.lang3.StringUtils;
import org.apache.jmeter.samplers.AbstractSampler;
import org.apache.jmeter.testelement.ThreadListener;
import org.apache.jorphan.logging.LoggingManager;
import org.apache.log.Logger;
-import com.rabbitmq.client.AMQP.BasicProperties;
-import org.apache.commons.lang3.StringUtils;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
public abstract class AMQPSampler extends AbstractSampler implements ThreadListener {
@@ -48,6 +50,7 @@ public abstract class AMQPSampler extends AbstractSampler implements ThreadListe
private static final String ITERATIONS = "AMQPSampler.Iterations";
private static final String MESSAGE_TTL = "AMQPSampler.MessageTTL";
private static final String MESSAGE_EXPIRES = "AMQPSampler.MessageExpires";
+ private static final String X_DEAD_LETTER_EXCHANGE = "AMQPSampler.XDeadLetterExchange";
private static final String QUEUE_DURABLE = "AMQPSampler.QueueDurable";
private static final String QUEUE_REDECLARE = "AMQPSampler.Redeclare";
private static final String QUEUE_EXCLUSIVE = "AMQPSampler.QueueExclusive";
@@ -62,7 +65,7 @@ protected AMQPSampler(){
factory.setRequestedHeartbeat(DEFAULT_HEARTBEAT);
}
- protected boolean initChannel() throws IOException, NoSuchAlgorithmException, KeyManagementException {
+ protected boolean initChannel() throws IOException, NoSuchAlgorithmException, KeyManagementException, TimeoutException {
Channel channel = getChannel();
if(channel != null && !channel.isOpen()){
@@ -96,7 +99,7 @@ protected boolean initChannel() throws IOException, NoSuchAlgorithmException, Ke
channel.queueBind(getQueue(), getExchange(), getRoutingKey());
}
}
-
+ //channel.queueBind(getQueue(), getExchange(), getRoutingKey());
log.info("bound to:"
+"\n\t queue: " + getQueue()
+"\n\t exchange: " + getExchange()
@@ -118,6 +121,9 @@ private Map getQueueArguments() {
if(getMessageExpires() != null && !getMessageExpires().isEmpty())
arguments.put("x-expires", getMessageExpiresAsInt());
+ if(getXDeadLetterExchange() != null && !getXDeadLetterExchange().isEmpty())
+ arguments.put("x-dead-letter-exchange", getXDeadLetterExchange());
+
return arguments;
}
@@ -236,15 +242,22 @@ protected Integer getMessageTTLAsInt() {
return getPropertyAsInt(MESSAGE_TTL);
}
-
public String getMessageExpires() {
return getPropertyAsString(MESSAGE_EXPIRES);
}
+ public String getXDeadLetterExchange() {
+ return getPropertyAsString(X_DEAD_LETTER_EXCHANGE);
+ }
+
public void setMessageExpires(String name) {
setProperty(MESSAGE_EXPIRES, name);
}
+ public void setXDeadLetterExchange(String name) {
+ setProperty(X_DEAD_LETTER_EXCHANGE, name);
+ }
+
protected Integer getMessageExpiresAsInt() {
if (getPropertyAsInt(MESSAGE_EXPIRES) < 1) {
return null;
@@ -252,7 +265,6 @@ protected Integer getMessageExpiresAsInt() {
return getPropertyAsInt(MESSAGE_EXPIRES);
}
-
public String getHost() {
return getPropertyAsString(HOST);
}
@@ -364,7 +376,6 @@ public boolean queueAutoDelete(){
return getPropertyAsBoolean(QUEUE_AUTO_DELETE);
}
-
public Boolean getQueueRedeclare() {
return getPropertyAsBoolean(QUEUE_REDECLARE);
}
@@ -383,18 +394,16 @@ protected void cleanup() {
}
}
- @Override
public void threadFinished() {
log.info("AMQPSampler.threadFinished called");
cleanup();
}
- @Override
public void threadStarted() {
}
- protected Channel createChannel() throws IOException, NoSuchAlgorithmException, KeyManagementException {
+ protected Channel createChannel() throws IOException, NoSuchAlgorithmException, KeyManagementException, TimeoutException {
log.info("Creating channel " + getVirtualHost()+":"+getPortAsInt());
if (connection == null || !connection.isOpen()) {
@@ -433,7 +442,7 @@ protected Channel createChannel() throws IOException, NoSuchAlgorithmException,
return channel;
}
- protected void deleteQueue() throws IOException, NoSuchAlgorithmException, KeyManagementException {
+ protected void deleteQueue() throws IOException, NoSuchAlgorithmException, KeyManagementException, TimeoutException {
// use a different channel since channel closes on exception.
Channel channel = createChannel();
try {
@@ -451,7 +460,7 @@ protected void deleteQueue() throws IOException, NoSuchAlgorithmException, KeyMa
}
}
- protected void deleteExchange() throws IOException, NoSuchAlgorithmException, KeyManagementException {
+ protected void deleteExchange() throws IOException, NoSuchAlgorithmException, KeyManagementException, TimeoutException {
// use a different channel since channel closes on exception.
Channel channel = createChannel();
try {
diff --git a/src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPConsumerGui.java b/src/main/java/com/zeroclue/jmeter/protocol/amqp/gui/AMQPConsumerGui.java
old mode 100644
new mode 100755
similarity index 95%
rename from src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPConsumerGui.java
rename to src/main/java/com/zeroclue/jmeter/protocol/amqp/gui/AMQPConsumerGui.java
index a9056dc..9d4b346
--- a/src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPConsumerGui.java
+++ b/src/main/java/com/zeroclue/jmeter/protocol/amqp/gui/AMQPConsumerGui.java
@@ -1,13 +1,10 @@
package com.zeroclue.jmeter.protocol.amqp.gui;
-import javax.swing.JCheckBox;
-import javax.swing.JPanel;
-
+import com.zeroclue.jmeter.protocol.amqp.AMQPConsumer;
import org.apache.jmeter.testelement.TestElement;
import org.apache.jorphan.gui.JLabeledTextField;
-import com.zeroclue.jmeter.protocol.amqp.AMQPConsumer;
-
+import javax.swing.*;
import java.awt.*;
@@ -43,7 +40,6 @@ protected void init() {
mainPanel.add(autoAck);
}
- @Override
public String getStaticLabel() {
return "AMQP Consumer";
}
@@ -51,7 +47,6 @@ public String getStaticLabel() {
/**
* {@inheritDoc}
*/
- @Override
public void configure(TestElement element) {
super.configure(element);
if (!(element instanceof AMQPConsumer)) return;
@@ -67,7 +62,6 @@ public void configure(TestElement element) {
/**
* {@inheritDoc}
*/
- @Override
public void clearGui() {
super.clearGui();
readResponse.setSelected(AMQPConsumer.DEFAULT_READ_RESPONSE);
@@ -80,7 +74,6 @@ public void clearGui() {
/**
* {@inheritDoc}
*/
- @Override
public TestElement createTestElement() {
AMQPConsumer sampler = new AMQPConsumer();
modifyTestElement(sampler);
@@ -90,7 +83,6 @@ public TestElement createTestElement() {
/**
* {@inheritDoc}
*/
- @Override
public void modifyTestElement(TestElement te) {
AMQPConsumer sampler = (AMQPConsumer) te;
sampler.clear();
@@ -110,15 +102,12 @@ public void modifyTestElement(TestElement te) {
/**
* {@inheritDoc}
*/
- @Override
public String getLabelResource() {
return this.getClass().getSimpleName();
}
- @Override
protected void setMainPanel(JPanel panel) {
mainPanel = panel;
}
-
}
diff --git a/src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPPublisherGui.java b/src/main/java/com/zeroclue/jmeter/protocol/amqp/gui/AMQPPublisherGui.java
old mode 100644
new mode 100755
similarity index 97%
rename from src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPPublisherGui.java
rename to src/main/java/com/zeroclue/jmeter/protocol/amqp/gui/AMQPPublisherGui.java
index bde4d00..6e63c97
--- a/src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPPublisherGui.java
+++ b/src/main/java/com/zeroclue/jmeter/protocol/amqp/gui/AMQPPublisherGui.java
@@ -1,16 +1,14 @@
package com.zeroclue.jmeter.protocol.amqp.gui;
-import java.awt.Dimension;
-
-import javax.swing.*;
-
+import com.zeroclue.jmeter.protocol.amqp.AMQPPublisher;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.config.gui.ArgumentsPanel;
import org.apache.jmeter.testelement.TestElement;
import org.apache.jorphan.gui.JLabeledTextArea;
import org.apache.jorphan.gui.JLabeledTextField;
-import com.zeroclue.jmeter.protocol.amqp.AMQPPublisher;
+import javax.swing.*;
+import java.awt.*;
/**
* AMQP Sampler
@@ -44,7 +42,7 @@ public class AMQPPublisherGui extends AMQPSamplerGui {
private JCheckBox persistent = new JCheckBox("Persistent?", AMQPPublisher.DEFAULT_PERSISTENT);
private JCheckBox useTx = new JCheckBox("Use Transactions?", AMQPPublisher.DEFAULT_USE_TX);
- private ArgumentsPanel headers = new ArgumentsPanel("Headers");
+ private ArgumentsPanel headers = new ArgumentsPanel("Headers New2");
public AMQPPublisherGui(){
init();
@@ -53,12 +51,10 @@ public AMQPPublisherGui(){
/**
* {@inheritDoc}
*/
- @Override
public String getLabelResource() {
return this.getClass().getSimpleName();
}
- @Override
public String getStaticLabel() {
return "AMQP Publisher";
}
@@ -66,7 +62,6 @@ public String getStaticLabel() {
/**
* {@inheritDoc}
*/
- @Override
public void configure(TestElement element) {
super.configure(element);
if (!(element instanceof AMQPPublisher)) return;
@@ -88,7 +83,6 @@ public void configure(TestElement element) {
/**
* {@inheritDoc}
*/
- @Override
public TestElement createTestElement() {
AMQPPublisher sampler = new AMQPPublisher();
modifyTestElement(sampler);
@@ -98,7 +92,6 @@ public TestElement createTestElement() {
/**
* {@inheritDoc}
*/
- @Override
public void modifyTestElement(TestElement te) {
AMQPPublisher sampler = (AMQPPublisher) te;
sampler.clear();
@@ -119,7 +112,6 @@ public void modifyTestElement(TestElement te) {
sampler.setHeaders((Arguments) headers.createTestElement());
}
- @Override
protected void setMainPanel(JPanel panel){
mainPanel = panel;
}
@@ -127,7 +119,6 @@ protected void setMainPanel(JPanel panel){
/*
* Helper method to set up the GUI screen
*/
- @Override
protected final void init() {
super.init();
persistent.setPreferredSize(new Dimension(100, 25));
@@ -155,7 +146,6 @@ protected final void init() {
/**
* {@inheritDoc}
*/
- @Override
public void clearGui() {
super.clearGui();
persistent.setSelected(AMQPPublisher.DEFAULT_PERSISTENT);
diff --git a/src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPSamplerGui.java b/src/main/java/com/zeroclue/jmeter/protocol/amqp/gui/AMQPSamplerGui.java
old mode 100644
new mode 100755
similarity index 96%
rename from src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPSamplerGui.java
rename to src/main/java/com/zeroclue/jmeter/protocol/amqp/gui/AMQPSamplerGui.java
index 108b8ed..c3e48c7
--- a/src/main/com/zeroclue/jmeter/protocol/amqp/gui/AMQPSamplerGui.java
+++ b/src/main/java/com/zeroclue/jmeter/protocol/amqp/gui/AMQPSamplerGui.java
@@ -1,21 +1,16 @@
package com.zeroclue.jmeter.protocol.amqp.gui;
-import java.awt.*;
-
-import javax.swing.BorderFactory;
-import javax.swing.JCheckBox;
-import javax.swing.JPanel;
-
+import com.zeroclue.jmeter.protocol.amqp.AMQPSampler;
import org.apache.jmeter.gui.util.VerticalPanel;
import org.apache.jmeter.samplers.gui.AbstractSamplerGui;
import org.apache.jmeter.testelement.TestElement;
-import org.apache.jmeter.util.JMeterUtils;
import org.apache.jorphan.gui.JLabeledChoice;
import org.apache.jorphan.gui.JLabeledTextField;
import org.apache.jorphan.logging.LoggingManager;
import org.apache.log.Logger;
-import com.zeroclue.jmeter.protocol.amqp.AMQPSampler;
+import javax.swing.*;
+import java.awt.*;
public abstract class AMQPSamplerGui extends AbstractSamplerGui {
@@ -30,6 +25,7 @@ public abstract class AMQPSamplerGui extends AbstractSamplerGui {
protected JLabeledTextField virtualHost = new JLabeledTextField("Virtual Host");
protected JLabeledTextField messageTTL = new JLabeledTextField("Message TTL");
protected JLabeledTextField messageExpires = new JLabeledTextField("Expires");
+ protected JLabeledTextField xDeadLetterExchange = new JLabeledTextField("x-dead-letter-exchange");
protected JLabeledChoice exchangeType = new JLabeledChoice("Exchange Type", new String[]{ "direct", "topic", "headers", "fanout"});
private final JCheckBox exchangeDurable = new JCheckBox("Durable?", AMQPSampler.DEFAULT_EXCHANGE_DURABLE);
private final JCheckBox queueDurable = new JCheckBox("Durable?", true);
@@ -53,7 +49,6 @@ public abstract class AMQPSamplerGui extends AbstractSamplerGui {
/**
* {@inheritDoc}
*/
- @Override
public void configure(TestElement element) {
super.configure(element);
if (!(element instanceof AMQPSampler)) return;
@@ -68,6 +63,7 @@ public void configure(TestElement element) {
virtualHost.setText(sampler.getVirtualHost());
messageTTL.setText(sampler.getMessageTTL());
messageExpires.setText(sampler.getMessageExpires());
+ xDeadLetterExchange.setText(sampler.getXDeadLetterExchange());
queueDurable.setSelected(sampler.queueDurable());
queueExclusive.setSelected(sampler.queueExclusive());
queueAutoDelete.setSelected(sampler.queueAutoDelete());
@@ -87,7 +83,6 @@ public void configure(TestElement element) {
/**
* {@inheritDoc}
*/
- @Override
public void clearGui() {
exchange.setText("jmeterExchange");
queue.setText("jmeterQueue");
@@ -97,6 +92,7 @@ public void clearGui() {
virtualHost.setText("/");
messageTTL.setText("");
messageExpires.setText("");
+ xDeadLetterExchange.setText("");
exchangeType.setText("direct");
queueDurable.setSelected(true);
queueExclusive.setSelected(false);
@@ -117,7 +113,6 @@ public void clearGui() {
/**
* {@inheritDoc}
*/
- @Override
public void modifyTestElement(TestElement element) {
AMQPSampler sampler = (AMQPSampler) element;
sampler.clear();
@@ -131,6 +126,7 @@ public void modifyTestElement(TestElement element) {
sampler.setVirtualHost(virtualHost.getText());
sampler.setMessageTTL(messageTTL.getText());
sampler.setMessageExpires(messageExpires.getText());
+ sampler.setXDeadLetterExchange(xDeadLetterExchange.getText());
sampler.setExchangeType(exchangeType.getText());
sampler.setQueueDurable(queueDurable.isSelected());
sampler.setQueueExclusive(queueExclusive.isSelected());
@@ -219,6 +215,10 @@ private Component makeCommonPanel() {
gridBagConstraints.gridy = 3;
queueSettings.add(messageExpires, gridBagConstraints);
+ gridBagConstraints.gridx = 0;
+ gridBagConstraints.gridy = 4;
+ queueSettings.add(xDeadLetterExchange, gridBagConstraints);
+
gridBagConstraints.gridx = 1;
gridBagConstraints.gridy = 1;
queueSettings.add(queueDurable, gridBagConstraints);
diff --git a/src/main/java/com/zeroclue/jmeter/protocol/amqp/util/AMQPMessageContentEncodingEnum.java b/src/main/java/com/zeroclue/jmeter/protocol/amqp/util/AMQPMessageContentEncodingEnum.java
new file mode 100644
index 0000000..d0231e1
--- /dev/null
+++ b/src/main/java/com/zeroclue/jmeter/protocol/amqp/util/AMQPMessageContentEncodingEnum.java
@@ -0,0 +1,18 @@
+package com.zeroclue.jmeter.protocol.amqp.util;
+
+/**
+ * Created by ricardotaboada on 5/14/15.
+ */
+public enum AMQPMessageContentEncodingEnum {
+ UTF_8("UTF-8"), GZIP_UTF_8("gzip:UTF-8");
+
+ private String encodingString;
+
+ AMQPMessageContentEncodingEnum(String encodingString) {
+ this.encodingString = encodingString;
+ }
+
+ public String getEncodingString() {
+ return encodingString;
+ }
+}
diff --git a/src/main/java/com/zeroclue/jmeter/protocol/amqp/util/AMQPMessageContentEncodingUtil.java b/src/main/java/com/zeroclue/jmeter/protocol/amqp/util/AMQPMessageContentEncodingUtil.java
new file mode 100644
index 0000000..8c941f9
--- /dev/null
+++ b/src/main/java/com/zeroclue/jmeter/protocol/amqp/util/AMQPMessageContentEncodingUtil.java
@@ -0,0 +1,11 @@
+package com.zeroclue.jmeter.protocol.amqp.util;
+
+import com.rabbitmq.client.QueueingConsumer.Delivery;
+
+/**
+ * Created by ricardotaboada on 5/14/15.
+ */
+public interface AMQPMessageContentEncodingUtil {
+
+ Delivery decode(Delivery message);
+}
diff --git a/src/main/java/com/zeroclue/jmeter/protocol/amqp/util/DefaultMessageEncodingUtil.java b/src/main/java/com/zeroclue/jmeter/protocol/amqp/util/DefaultMessageEncodingUtil.java
new file mode 100644
index 0000000..da8c8ac
--- /dev/null
+++ b/src/main/java/com/zeroclue/jmeter/protocol/amqp/util/DefaultMessageEncodingUtil.java
@@ -0,0 +1,20 @@
+package com.zeroclue.jmeter.protocol.amqp.util;
+
+import com.rabbitmq.client.QueueingConsumer.Delivery;
+
+/**
+ * Created by ricardotaboada on 5/14/15.
+ */
+public class DefaultMessageEncodingUtil implements AMQPMessageContentEncodingUtil {
+
+ /**
+ * As this is the default, it simply returns the received message
+ *
+ * @param message
+ *
+ * @return
+ */
+ public Delivery decode(Delivery message) {
+ return message;
+ }
+}
diff --git a/src/main/java/com/zeroclue/jmeter/protocol/amqp/util/EncodingUtils.java b/src/main/java/com/zeroclue/jmeter/protocol/amqp/util/EncodingUtils.java
new file mode 100644
index 0000000..211b6f3
--- /dev/null
+++ b/src/main/java/com/zeroclue/jmeter/protocol/amqp/util/EncodingUtils.java
@@ -0,0 +1,23 @@
+/**
+ *
+ */
+package com.zeroclue.jmeter.protocol.amqp.util;
+
+import java.nio.charset.Charset;
+import java.util.Locale;
+
+/**
+ * @author jorge.matos
+ *
+ */
+public class EncodingUtils {
+
+ public static Locale getDefaultLocale() {
+ return Locale.US;
+ }
+
+ public static Charset getDefaultCharset() {
+ return Charset.forName("UTF-8");
+ }
+
+}
diff --git a/src/main/java/com/zeroclue/jmeter/protocol/amqp/util/GzipMessageEncodingUtil.java b/src/main/java/com/zeroclue/jmeter/protocol/amqp/util/GzipMessageEncodingUtil.java
new file mode 100644
index 0000000..1720478
--- /dev/null
+++ b/src/main/java/com/zeroclue/jmeter/protocol/amqp/util/GzipMessageEncodingUtil.java
@@ -0,0 +1,49 @@
+package com.zeroclue.jmeter.protocol.amqp.util;
+
+import com.rabbitmq.client.QueueingConsumer.Delivery;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.InputStreamReader;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * Created by ricardotaboada on 5/14/15.
+ */
+public class GzipMessageEncodingUtil implements AMQPMessageContentEncodingUtil {
+
+ /**
+ * Decodes a Gzip message
+ *
+ * @param message
+ *
+ * @return
+ */
+
+ public Delivery decode(Delivery message) {
+
+ try {
+ return postProcessMessage(message);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ private Delivery postProcessMessage(Delivery message) throws Exception {
+
+ GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(message.getBody()));
+ BufferedReader bf = new BufferedReader(new InputStreamReader(gis, "UTF-8"));
+ String outStr = "";
+ String line;
+ while ((line = bf.readLine()) != null) {
+ outStr += line;
+ }
+
+ Delivery delivery = new Delivery(message.getEnvelope(), message.getProperties(), outStr.getBytes());
+
+ return delivery;
+
+ }
+}
diff --git a/src/main/java/com/zeroclue/jmeter/protocol/amqp/util/MessageContentEncodingUtilFactory.java b/src/main/java/com/zeroclue/jmeter/protocol/amqp/util/MessageContentEncodingUtilFactory.java
new file mode 100644
index 0000000..2f30452
--- /dev/null
+++ b/src/main/java/com/zeroclue/jmeter/protocol/amqp/util/MessageContentEncodingUtilFactory.java
@@ -0,0 +1,84 @@
+package com.zeroclue.jmeter.protocol.amqp.util;
+
+import com.rabbitmq.client.QueueingConsumer.Delivery;
+
+/**
+ * Created by ricardotaboada on 5/14/15.
+ */
+public class MessageContentEncodingUtilFactory {
+
+ /**
+ * Creates and Returns a new AMPQMessageContentEncodingUtil implementation for the contentEncoding provided in
+ * the message.
+ *
+ * @param message
+ *
+ * @return
+ *
+ * @throws SkyWalkerException
+ */
+ public static AMQPMessageContentEncodingUtil create(Delivery message) throws Exception {
+
+ if (message.getProperties().getContentEncoding() == null) {
+ return new DefaultMessageEncodingUtil();
+ }
+
+ // Get the messageContentEncoding type
+ AMQPMessageContentEncodingEnum amqpMessageContentEncodingEnum = getAmqpMessageContentEncodingEnum(message);
+
+ // If no enum matched the encoding on the message then throw an exception
+ if (amqpMessageContentEncodingEnum == null) {
+ throw new Exception("Unsupported message encoding");
+ }
+
+ // Get the AMQPMessageContentEncodingUtil implementation for the encoding read from the message
+ AMQPMessageContentEncodingUtil amqpMessageContentEncodingUtil = getAMQPMessageContentEncodingUtilImpl(amqpMessageContentEncodingEnum);
+
+ if (amqpMessageContentEncodingUtil != null) {
+ return amqpMessageContentEncodingUtil;
+ }
+
+ // If it reaches this statement, then the message encoding is unsupported. Throw an exception
+ throw new Exception("Unsupported message encoding");
+ }
+
+ /**
+ * Gets the Enum corresponding to the content Enconding of the message
+ *
+ * @param message
+ *
+ * @return
+ */
+ private static AMQPMessageContentEncodingEnum getAmqpMessageContentEncodingEnum(Delivery message) {
+
+ AMQPMessageContentEncodingEnum amqpMessageContentEncodingEnum = null;
+
+ for (AMQPMessageContentEncodingEnum amce : AMQPMessageContentEncodingEnum.values()) {
+ if (amce.getEncodingString().equals(message.getProperties().getContentEncoding())) {
+ amqpMessageContentEncodingEnum = amce;
+ }
+ }
+
+ return amqpMessageContentEncodingEnum;
+ }
+
+ /**
+ * Returns a implementation of the AMQPMessageContentEncodingUtil depending from the enum passed to the method
+ *
+ * @param amqpMessageContentEncodingEnum
+ *
+ * @return
+ */
+ private static AMQPMessageContentEncodingUtil getAMQPMessageContentEncodingUtilImpl(AMQPMessageContentEncodingEnum amqpMessageContentEncodingEnum) {
+
+ switch (amqpMessageContentEncodingEnum) {
+ case UTF_8:
+ return new DefaultMessageEncodingUtil();
+ case GZIP_UTF_8:
+ return new GzipMessageEncodingUtil();
+ }
+
+ // if it reaches this statement then the message encoding is unsupported. Return null;
+ return null;
+ }
+}