From bdfc5153c71bd922a9c359bf79f11753a9ceda64 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Thu, 6 Jul 2017 03:34:58 +0530 Subject: [PATCH 1/6] bump up the version to 0.15.0 --- ConsumingEvents.md | 2 +- PublishingEvents.md | 2 +- azure-eventhubs-eph/pom.xml | 6 +++--- azure-eventhubs-extensions/pom.xml | 2 +- azure-eventhubs/pom.xml | 2 +- .../com/microsoft/azure/servicebus/ClientConstants.java | 2 +- pom.xml | 2 +- readme.md | 2 +- 8 files changed, 10 insertions(+), 10 deletions(-) diff --git a/ConsumingEvents.md b/ConsumingEvents.md index 66f88c126..f9ebb3886 100644 --- a/ConsumingEvents.md +++ b/ConsumingEvents.md @@ -30,7 +30,7 @@ following dependency declaration inside of your Maven project file: com.microsoft.azure azure-eventhubs-clients - 0.14.2 + 0.15.0 ``` diff --git a/PublishingEvents.md b/PublishingEvents.md index a61427981..e2aa18626 100644 --- a/PublishingEvents.md +++ b/PublishingEvents.md @@ -12,7 +12,7 @@ following dependency declaration inside of your Maven project file: com.microsoft.azure azure-eventhubs-clients - 0.14.2 + 0.15.0 ``` diff --git a/azure-eventhubs-eph/pom.xml b/azure-eventhubs-eph/pom.xml index b93957284..06334a919 100644 --- a/azure-eventhubs-eph/pom.xml +++ b/azure-eventhubs-eph/pom.xml @@ -3,7 +3,7 @@ com.microsoft.azure azure-eventhubs-clients - 0.14.2 + 0.15.0 4.0.0 @@ -24,12 +24,12 @@ com.microsoft.azure azure-storage - 5.0.0 + 5.3.1 com.google.code.gson gson - 2.8.0 + 2.8.1 diff --git a/azure-eventhubs-extensions/pom.xml b/azure-eventhubs-extensions/pom.xml index 94eaf9feb..a5b8f2543 100644 --- a/azure-eventhubs-extensions/pom.xml +++ b/azure-eventhubs-extensions/pom.xml @@ -7,7 +7,7 @@ com.microsoft.azure azure-eventhubs-clients - 0.14.2 + 0.15.0 4.0.0 diff --git a/azure-eventhubs/pom.xml b/azure-eventhubs/pom.xml index f5dec6969..5b9e3ae0b 100755 --- a/azure-eventhubs/pom.xml +++ b/azure-eventhubs/pom.xml @@ -3,7 +3,7 @@ com.microsoft.azure azure-eventhubs-clients - 0.14.2 + 0.15.0 4.0.0 diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java index f56b3353b..4b821f9dc 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java @@ -52,7 +52,7 @@ private ClientConstants() { public final static String DEFAULT_RETRY = "Default"; public final static String PRODUCT_NAME = "MSJavaClient"; - public final static String CURRENT_JAVACLIENT_VERSION = "0.14.2"; + public final static String CURRENT_JAVACLIENT_VERSION = "0.15.0"; public static final String PLATFORM_INFO = getPlatformInfo(); public static final String FRAMEWORK_INFO = getFrameworkInfo(); diff --git a/pom.xml b/pom.xml index 2fb98a066..f72c044c0 100755 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.microsoft.azure azure-eventhubs-clients - 0.14.2 + 0.15.0 pom https://github.com/Azure/azure-event-hubs diff --git a/readme.md b/readme.md index c465dc27c..fd01a0b5d 100644 --- a/readme.md +++ b/readme.md @@ -148,7 +148,7 @@ the required version of Apache Qpid Proton-J, and the crytography library BCPKIX com.microsoft.azure azure-eventhubs - 0.14.2 + 0.15.0 ``` From 346d1b4e50d802427f6ecfc058666611ced02b29 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Thu, 6 Jul 2017 05:07:37 +0530 Subject: [PATCH 2/6] add receiver identifier to ReceiverOptions --- .../azure/eventhubs/PartitionReceiver.java | 17 +++- .../azure/eventhubs/ReceiverOptions.java | 22 ++++++ .../azure/servicebus/ClientConstants.java | 2 + .../azure/servicebus/amqp/AmqpConstants.java | 1 + .../sendrecv/ReceiverIdentifierTest.java | 77 +++++++++++++++++++ 5 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiverIdentifierTest.java diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java index 8ec356ce3..a53b65888 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java @@ -454,7 +454,22 @@ public Map getFilter(final Message lastReceivedMes @Override public Map getProperties() { - return this.isEpochReceiver ? Collections.singletonMap(AmqpConstants.EPOCH, (Object) this.epoch) : null; + if (!this.isEpochReceiver && + (this.receiverOptions == null || this.receiverOptions.getIdentifier() == null)) { + return null; + } + + final Map properties = new HashMap<>(); + + if (this.isEpochReceiver) { + properties.put(AmqpConstants.EPOCH, (Object) this.epoch); + } + + if (this.receiverOptions != null && this.receiverOptions.getIdentifier() != null) { + properties.put(AmqpConstants.RECEIVER_IDENTIFIER_NAME, (Object) this.receiverOptions.getIdentifier()); + } + + return properties; } @Override diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java index 414613aa7..bdf9f8ffe 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java @@ -4,12 +4,15 @@ */ package com.microsoft.azure.eventhubs; +import com.microsoft.azure.servicebus.ClientConstants; + /** * Represents various optional behaviors which can be turned on or off during the creation of a {@link PartitionReceiver}. */ public final class ReceiverOptions { private boolean receiverRuntimeMetricEnabled; + private String identifier; /** * Knob to enable/disable runtime metric of the receiver. If this is set to true and is passed to {@link EventHubClient#createReceiver}, @@ -36,4 +39,23 @@ public void setReceiverRuntimeMetricEnabled(boolean value) { this.receiverRuntimeMetricEnabled = value; } + + public String getIdentifier() { + + return this.identifier; + } + + public void setIdentifier(final String value) { + + ReceiverOptions.validateReceiverIdentifier(value); + this.identifier = value; + } + + private static void validateReceiverIdentifier(final String receiverName) { + + if (receiverName != null && + receiverName.length() > ClientConstants.MAX_RECEIVER_NAME_LENGTH) { + throw new IllegalArgumentException("receiverIdentifier length cannot exceed 64"); + } + } } diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java index 4b821f9dc..e0a34ebfa 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java @@ -94,6 +94,8 @@ private ClientConstants() { public static final String AMQP_PUT_TOKEN_FAILED_ERROR = "Put token failed. status-code: %s, status-description: %s"; public static final String TOKEN_AUDIENCE_FORMAT = "amqp://%s/%s"; + public static final int MAX_RECEIVER_NAME_LENGTH = 64; + private static String getPlatformInfo() { final StringBuilder platformInfo = new StringBuilder(); platformInfo.append("arch:"); diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java index 8fc1f181e..f356b6bc8 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java @@ -74,4 +74,5 @@ private AmqpConstants() { public static final String AMQP_PROPERTY_REPLY_TO_GROUP_ID = "reply-to-group-id"; public static final Symbol ENABLE_RECEIVER_RUNTIME_METRIC_NAME = Symbol.valueOf(VENDOR + ":enable-receiver-runtime-metric"); + public static final Symbol RECEIVER_IDENTIFIER_NAME = Symbol.valueOf(AmqpConstants.VENDOR + ":receiver-name"); } diff --git a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiverIdentifierTest.java b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiverIdentifierTest.java new file mode 100644 index 000000000..dd92bdfd3 --- /dev/null +++ b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiverIdentifierTest.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ +package com.microsoft.azure.eventhubs.sendrecv; + +import java.time.Instant; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; + +import com.microsoft.azure.servicebus.QuotaExceededException; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.PartitionReceiver; +import com.microsoft.azure.eventhubs.ReceiverOptions; +import com.microsoft.azure.eventhubs.lib.ApiTestBase; +import com.microsoft.azure.eventhubs.lib.TestBase; +import com.microsoft.azure.eventhubs.lib.TestContext; +import com.microsoft.azure.servicebus.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.ServiceBusException; + +public class ReceiverIdentifierTest extends ApiTestBase { + + static final String cgName = TestContext.getConsumerGroupName(); + static final String partitionId = "0"; + static final Instant beforeTestStart = Instant.now(); + static final int sentEvents = 25; + static final List receivers = new LinkedList<>(); + + static EventHubClient ehClient; + + @BeforeClass + public static void initializeEventHub() throws Exception { + + final ConnectionStringBuilder connectionString = TestContext.getConnectionString(); + ehClient = EventHubClient.createFromConnectionStringSync(connectionString.toString()); + + TestBase.pushEventsToPartition(ehClient, partitionId, sentEvents).get(); + } + + @Test() + public void testReceiverIdentierShowsUpInQuotaErrors() throws ServiceBusException { + + final String receiverIdentifierPrefix = UUID.randomUUID().toString(); + for (int receiverCount = 0; receiverCount < 5; receiverCount ++) { + final ReceiverOptions options = new ReceiverOptions(); + options.setIdentifier(receiverIdentifierPrefix + receiverCount); + ehClient.createReceiverSync(cgName, partitionId, PartitionReceiver.START_OF_STREAM, options); + } + + try { + ehClient.createReceiverSync(cgName, partitionId, PartitionReceiver.START_OF_STREAM); + Assert.assertTrue(false); + } + catch (QuotaExceededException quotaError) { + final String errorMsg = quotaError.getMessage(); + for (int receiverCount=0; receiverCount < 5; receiverCount++) { + Assert.assertTrue(errorMsg.contains(receiverIdentifierPrefix + receiverCount)); + } + } + } + + @AfterClass() + public static void cleanup() throws ServiceBusException { + + for (PartitionReceiver receiver : receivers) + receiver.closeSync(); + + if (ehClient != null) + ehClient.closeSync(); + } +} From 5a08f58c698f20a7e38676157945f725dcdce6a9 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Thu, 6 Jul 2017 05:08:31 +0530 Subject: [PATCH 3/6] minor refactor --- .../java/com/microsoft/azure/eventhubs/PartitionReceiver.java | 1 + 1 file changed, 1 insertion(+) diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java index a53b65888..30f1df0d9 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java @@ -9,6 +9,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.concurrent.CompletableFuture; From 1739ad8ebc415cd5a8dc26065982ac7c7d9e685a Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Thu, 6 Jul 2017 05:22:52 +0530 Subject: [PATCH 4/6] client version to 0.14.3 --- ConsumingEvents.md | 2 +- PublishingEvents.md | 2 +- azure-eventhubs-eph/pom.xml | 2 +- azure-eventhubs-extensions/pom.xml | 2 +- azure-eventhubs/pom.xml | 2 +- .../java/com/microsoft/azure/servicebus/ClientConstants.java | 2 +- pom.xml | 2 +- readme.md | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ConsumingEvents.md b/ConsumingEvents.md index f9ebb3886..706478592 100644 --- a/ConsumingEvents.md +++ b/ConsumingEvents.md @@ -30,7 +30,7 @@ following dependency declaration inside of your Maven project file: com.microsoft.azure azure-eventhubs-clients - 0.15.0 + 0.14.3 ``` diff --git a/PublishingEvents.md b/PublishingEvents.md index e2aa18626..697063db9 100644 --- a/PublishingEvents.md +++ b/PublishingEvents.md @@ -12,7 +12,7 @@ following dependency declaration inside of your Maven project file: com.microsoft.azure azure-eventhubs-clients - 0.15.0 + 0.14.3 ``` diff --git a/azure-eventhubs-eph/pom.xml b/azure-eventhubs-eph/pom.xml index 06334a919..2462554d6 100644 --- a/azure-eventhubs-eph/pom.xml +++ b/azure-eventhubs-eph/pom.xml @@ -3,7 +3,7 @@ com.microsoft.azure azure-eventhubs-clients - 0.15.0 + 0.14.3 4.0.0 diff --git a/azure-eventhubs-extensions/pom.xml b/azure-eventhubs-extensions/pom.xml index a5b8f2543..24e0e649e 100644 --- a/azure-eventhubs-extensions/pom.xml +++ b/azure-eventhubs-extensions/pom.xml @@ -7,7 +7,7 @@ com.microsoft.azure azure-eventhubs-clients - 0.15.0 + 0.14.3 4.0.0 diff --git a/azure-eventhubs/pom.xml b/azure-eventhubs/pom.xml index 5b9e3ae0b..6e56b82bb 100755 --- a/azure-eventhubs/pom.xml +++ b/azure-eventhubs/pom.xml @@ -3,7 +3,7 @@ com.microsoft.azure azure-eventhubs-clients - 0.15.0 + 0.14.3 4.0.0 diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java index e0a34ebfa..5e856b493 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java @@ -52,7 +52,7 @@ private ClientConstants() { public final static String DEFAULT_RETRY = "Default"; public final static String PRODUCT_NAME = "MSJavaClient"; - public final static String CURRENT_JAVACLIENT_VERSION = "0.15.0"; + public final static String CURRENT_JAVACLIENT_VERSION = "0.14.3"; public static final String PLATFORM_INFO = getPlatformInfo(); public static final String FRAMEWORK_INFO = getFrameworkInfo(); diff --git a/pom.xml b/pom.xml index f72c044c0..c879ed040 100755 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.microsoft.azure azure-eventhubs-clients - 0.15.0 + 0.14.3 pom https://github.com/Azure/azure-event-hubs diff --git a/readme.md b/readme.md index fd01a0b5d..a0753025d 100644 --- a/readme.md +++ b/readme.md @@ -148,7 +148,7 @@ the required version of Apache Qpid Proton-J, and the crytography library BCPKIX com.microsoft.azure azure-eventhubs - 0.15.0 + 0.14.3 ``` From 8b7750926f3cab9af7da6531c1cc0f988d6106ec Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Thu, 6 Jul 2017 23:56:57 +0530 Subject: [PATCH 5/6] Refresh token every 10 mins - to help with error situations --- .../java/com/microsoft/azure/servicebus/ClientConstants.java | 3 ++- .../java/com/microsoft/azure/servicebus/MessageReceiver.java | 4 ++-- .../java/com/microsoft/azure/servicebus/MessageSender.java | 4 ++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java index 5e856b493..6ed86b4cd 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java @@ -35,7 +35,8 @@ private ClientConstants() { public final static Duration DEFAULT_RERTRY_MIN_BACKOFF = Duration.ofSeconds(0); public final static Duration DEFAULT_RERTRY_MAX_BACKOFF = Duration.ofSeconds(30); - public final static Duration TOKEN_REFRESH_INTERVAL = Duration.ofMinutes(20); + public final static Duration TOKEN_REFRESH_INTERVAL = Duration.ofMinutes(10); // renew every 10 mins, which expires 20 mins + public final static Duration TOKEN_VALIDITY = Duration.ofMinutes(20); public final static int DEFAULT_MAX_RETRY_COUNT = 10; diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java index 20c4a40c2..c2ae7dfb2 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java @@ -130,7 +130,7 @@ public void run() { try { underlyingFactory.getCBSChannel().sendToken( underlyingFactory.getReactorScheduler(), - underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_REFRESH_INTERVAL), + underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_VALIDITY), tokenAudience, new IOperationResult() { @Override @@ -474,7 +474,7 @@ else if (u != null) try { this.underlyingFactory.getCBSChannel().sendToken( this.underlyingFactory.getReactorScheduler(), - this.underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_REFRESH_INTERVAL), + this.underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_VALIDITY), tokenAudience, new IOperationResult() { @Override diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java index 40ad0f45a..7a787c26e 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageSender.java @@ -143,7 +143,7 @@ public void run() { try { underlyingFactory.getCBSChannel().sendToken( underlyingFactory.getReactorScheduler(), - underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_REFRESH_INTERVAL), + underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_VALIDITY), tokenAudience, new IOperationResult() { @Override @@ -566,7 +566,7 @@ else if (u != null) try { this.underlyingFactory.getCBSChannel().sendToken( this.underlyingFactory.getReactorScheduler(), - this.underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_REFRESH_INTERVAL), + this.underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_VALIDITY), tokenAudience, new IOperationResult() { @Override From f58900513c3fd7d3424425113c72d9265acd8413 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Sat, 8 Jul 2017 03:01:37 +0530 Subject: [PATCH 6/6] javadoc for receiveridentifier (#138) --- .../azure/eventhubs/ReceiverOptions.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java index bdf9f8ffe..f88184375 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java @@ -40,11 +40,27 @@ public void setReceiverRuntimeMetricEnabled(boolean value) { this.receiverRuntimeMetricEnabled = value; } + /** + * Gets the identifier of the {@link PartitionReceiver} + * + * @return identifier of the {@link PartitionReceiver}; null if nothing was set + */ public String getIdentifier() { return this.identifier; } + /** + * Set an identifier to {@link PartitionReceiver}. + *

+ * This identifier will be used by EventHubs service when reporting any errors across receivers, and is caused by this receiver. + * For example, when receiver quota limit is hit, while a user is trying to create New receiver, + * EventHubs service will throw {@link com.microsoft.azure.servicebus.QuotaExceededException} and will include this identifier. + * So, its very critical to choose a value, which can uniquely identify the whereabouts of {@link PartitionReceiver}. + * + *

+ * @param value string to identify {@link PartitionReceiver} + */ public void setIdentifier(final String value) { ReceiverOptions.validateReceiverIdentifier(value);