properties = new HashMap<>();
+
+ if (this.isEpochReceiver) {
+ properties.put(AmqpConstants.EPOCH, (Object) this.epoch);
+ }
+
+ if (this.receiverOptions != null && this.receiverOptions.getIdentifier() != null) {
+ properties.put(AmqpConstants.RECEIVER_IDENTIFIER_NAME, (Object) this.receiverOptions.getIdentifier());
+ }
+
+ return properties;
}
@Override
diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java
index 414613aa7..f88184375 100644
--- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java
+++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java
@@ -4,12 +4,15 @@
*/
package com.microsoft.azure.eventhubs;
+import com.microsoft.azure.servicebus.ClientConstants;
+
/**
* Represents various optional behaviors which can be turned on or off during the creation of a {@link PartitionReceiver}.
*/
public final class ReceiverOptions {
private boolean receiverRuntimeMetricEnabled;
+ private String identifier;
/**
* Knob to enable/disable runtime metric of the receiver. If this is set to true and is passed to {@link EventHubClient#createReceiver},
@@ -36,4 +39,39 @@ public void setReceiverRuntimeMetricEnabled(boolean value) {
this.receiverRuntimeMetricEnabled = value;
}
+
+ /**
+ * Gets the identifier of the {@link PartitionReceiver}
+ *
+ * @return identifier of the {@link PartitionReceiver}; null if nothing was set
+ */
+ public String getIdentifier() {
+
+ return this.identifier;
+ }
+
+ /**
+ * Set an identifier to {@link PartitionReceiver}.
+ *
+ * This identifier will be used by EventHubs service when reporting any errors across receivers, and is caused by this receiver.
+ * For example, when receiver quota limit is hit, while a user is trying to create New receiver,
+ * EventHubs service will throw {@link com.microsoft.azure.servicebus.QuotaExceededException} and will include this identifier.
+ * So, its very critical to choose a value, which can uniquely identify the whereabouts of {@link PartitionReceiver}.
+ *
+ *
+ * @param value string to identify {@link PartitionReceiver}
+ */
+ public void setIdentifier(final String value) {
+
+ ReceiverOptions.validateReceiverIdentifier(value);
+ this.identifier = value;
+ }
+
+ private static void validateReceiverIdentifier(final String receiverName) {
+
+ if (receiverName != null &&
+ receiverName.length() > ClientConstants.MAX_RECEIVER_NAME_LENGTH) {
+ throw new IllegalArgumentException("receiverIdentifier length cannot exceed 64");
+ }
+ }
}
diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java
index f56b3353b..6ed86b4cd 100644
--- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java
+++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java
@@ -35,7 +35,8 @@ private ClientConstants() {
public final static Duration DEFAULT_RERTRY_MIN_BACKOFF = Duration.ofSeconds(0);
public final static Duration DEFAULT_RERTRY_MAX_BACKOFF = Duration.ofSeconds(30);
- public final static Duration TOKEN_REFRESH_INTERVAL = Duration.ofMinutes(20);
+ public final static Duration TOKEN_REFRESH_INTERVAL = Duration.ofMinutes(10); // renew every 10 mins, which expires 20 mins
+ public final static Duration TOKEN_VALIDITY = Duration.ofMinutes(20);
public final static int DEFAULT_MAX_RETRY_COUNT = 10;
@@ -52,7 +53,7 @@ private ClientConstants() {
public final static String DEFAULT_RETRY = "Default";
public final static String PRODUCT_NAME = "MSJavaClient";
- public final static String CURRENT_JAVACLIENT_VERSION = "0.14.2";
+ public final static String CURRENT_JAVACLIENT_VERSION = "0.14.3";
public static final String PLATFORM_INFO = getPlatformInfo();
public static final String FRAMEWORK_INFO = getFrameworkInfo();
@@ -94,6 +95,8 @@ private ClientConstants() {
public static final String AMQP_PUT_TOKEN_FAILED_ERROR = "Put token failed. status-code: %s, status-description: %s";
public static final String TOKEN_AUDIENCE_FORMAT = "amqp://%s/%s";
+ public static final int MAX_RECEIVER_NAME_LENGTH = 64;
+
private static String getPlatformInfo() {
final StringBuilder platformInfo = new StringBuilder();
platformInfo.append("arch:");
diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java
index 20c4a40c2..c2ae7dfb2 100644
--- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java
+++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java
@@ -130,7 +130,7 @@ public void run() {
try {
underlyingFactory.getCBSChannel().sendToken(
underlyingFactory.getReactorScheduler(),
- underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_REFRESH_INTERVAL),
+ underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_VALIDITY),
tokenAudience,
new IOperationResult() {
@Override
@@ -474,7 +474,7 @@ else if (u != null)
try {
this.underlyingFactory.getCBSChannel().sendToken(
this.underlyingFactory.getReactorScheduler(),
- this.underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_REFRESH_INTERVAL),
+ this.underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_VALIDITY),
tokenAudience,
new IOperationResult() {
@Override
diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java
index 40ad0f45a..7a787c26e 100644
--- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java
+++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java
@@ -143,7 +143,7 @@ public void run() {
try {
underlyingFactory.getCBSChannel().sendToken(
underlyingFactory.getReactorScheduler(),
- underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_REFRESH_INTERVAL),
+ underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_VALIDITY),
tokenAudience,
new IOperationResult() {
@Override
@@ -566,7 +566,7 @@ else if (u != null)
try {
this.underlyingFactory.getCBSChannel().sendToken(
this.underlyingFactory.getReactorScheduler(),
- this.underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_REFRESH_INTERVAL),
+ this.underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_VALIDITY),
tokenAudience,
new IOperationResult() {
@Override
diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java
index 8fc1f181e..f356b6bc8 100644
--- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java
+++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java
@@ -74,4 +74,5 @@ private AmqpConstants() {
public static final String AMQP_PROPERTY_REPLY_TO_GROUP_ID = "reply-to-group-id";
public static final Symbol ENABLE_RECEIVER_RUNTIME_METRIC_NAME = Symbol.valueOf(VENDOR + ":enable-receiver-runtime-metric");
+ public static final Symbol RECEIVER_IDENTIFIER_NAME = Symbol.valueOf(AmqpConstants.VENDOR + ":receiver-name");
}
diff --git a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiverIdentifierTest.java b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiverIdentifierTest.java
new file mode 100644
index 000000000..dd92bdfd3
--- /dev/null
+++ b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiverIdentifierTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) Microsoft. All rights reserved.
+ * Licensed under the MIT license. See LICENSE file in the project root for full license information.
+ */
+package com.microsoft.azure.eventhubs.sendrecv;
+
+import java.time.Instant;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+
+import com.microsoft.azure.servicebus.QuotaExceededException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.microsoft.azure.eventhubs.EventHubClient;
+import com.microsoft.azure.eventhubs.PartitionReceiver;
+import com.microsoft.azure.eventhubs.ReceiverOptions;
+import com.microsoft.azure.eventhubs.lib.ApiTestBase;
+import com.microsoft.azure.eventhubs.lib.TestBase;
+import com.microsoft.azure.eventhubs.lib.TestContext;
+import com.microsoft.azure.servicebus.ConnectionStringBuilder;
+import com.microsoft.azure.servicebus.ServiceBusException;
+
+public class ReceiverIdentifierTest extends ApiTestBase {
+
+ static final String cgName = TestContext.getConsumerGroupName();
+ static final String partitionId = "0";
+ static final Instant beforeTestStart = Instant.now();
+ static final int sentEvents = 25;
+ static final List receivers = new LinkedList<>();
+
+ static EventHubClient ehClient;
+
+ @BeforeClass
+ public static void initializeEventHub() throws Exception {
+
+ final ConnectionStringBuilder connectionString = TestContext.getConnectionString();
+ ehClient = EventHubClient.createFromConnectionStringSync(connectionString.toString());
+
+ TestBase.pushEventsToPartition(ehClient, partitionId, sentEvents).get();
+ }
+
+ @Test()
+ public void testReceiverIdentierShowsUpInQuotaErrors() throws ServiceBusException {
+
+ final String receiverIdentifierPrefix = UUID.randomUUID().toString();
+ for (int receiverCount = 0; receiverCount < 5; receiverCount ++) {
+ final ReceiverOptions options = new ReceiverOptions();
+ options.setIdentifier(receiverIdentifierPrefix + receiverCount);
+ ehClient.createReceiverSync(cgName, partitionId, PartitionReceiver.START_OF_STREAM, options);
+ }
+
+ try {
+ ehClient.createReceiverSync(cgName, partitionId, PartitionReceiver.START_OF_STREAM);
+ Assert.assertTrue(false);
+ }
+ catch (QuotaExceededException quotaError) {
+ final String errorMsg = quotaError.getMessage();
+ for (int receiverCount=0; receiverCount < 5; receiverCount++) {
+ Assert.assertTrue(errorMsg.contains(receiverIdentifierPrefix + receiverCount));
+ }
+ }
+ }
+
+ @AfterClass()
+ public static void cleanup() throws ServiceBusException {
+
+ for (PartitionReceiver receiver : receivers)
+ receiver.closeSync();
+
+ if (ehClient != null)
+ ehClient.closeSync();
+ }
+}
diff --git a/pom.xml b/pom.xml
index 2fb98a066..c879ed040 100755
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.microsoft.azure
azure-eventhubs-clients
- 0.14.2
+ 0.14.3
pom
https://github.com/Azure/azure-event-hubs
diff --git a/readme.md b/readme.md
index c465dc27c..a0753025d 100644
--- a/readme.md
+++ b/readme.md
@@ -148,7 +148,7 @@ the required version of Apache Qpid Proton-J, and the crytography library BCPKIX
com.microsoft.azure
azure-eventhubs
- 0.14.2
+ 0.14.3
```