Skip to content

Commit

Permalink
Merge pull request #139 from Azure/0.14.3_rc
Browse files Browse the repository at this point in the history
0.14.3
  • Loading branch information
sjkwak authored Jul 7, 2017
2 parents cbfddf2 + f589005 commit 81165cd
Show file tree
Hide file tree
Showing 14 changed files with 151 additions and 16 deletions.
2 changes: 1 addition & 1 deletion ConsumingEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ following dependency declaration inside of your Maven project file:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>0.14.2</version>
<version>0.14.3</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion PublishingEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ following dependency declaration inside of your Maven project file:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>0.14.2</version>
<version>0.14.3</version>
</dependency>
```

Expand Down
6 changes: 3 additions & 3 deletions azure-eventhubs-eph/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>0.14.2</version>
<version>0.14.3</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand All @@ -24,12 +24,12 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
<version>5.0.0</version>
<version>5.3.1</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.0</version>
<version>2.8.1</version>
</dependency>
</dependencies>

Expand Down
2 changes: 1 addition & 1 deletion azure-eventhubs-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>0.14.2</version>
<version>0.14.3</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion azure-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>0.14.2</version>
<version>0.14.3</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -454,7 +455,22 @@ public Map<Symbol, UnknownDescribedType> getFilter(final Message lastReceivedMes
@Override
public Map<Symbol, Object> getProperties() {

return this.isEpochReceiver ? Collections.singletonMap(AmqpConstants.EPOCH, (Object) this.epoch) : null;
if (!this.isEpochReceiver &&
(this.receiverOptions == null || this.receiverOptions.getIdentifier() == null)) {
return null;
}

final Map<Symbol, Object> properties = new HashMap<>();

if (this.isEpochReceiver) {
properties.put(AmqpConstants.EPOCH, (Object) this.epoch);
}

if (this.receiverOptions != null && this.receiverOptions.getIdentifier() != null) {
properties.put(AmqpConstants.RECEIVER_IDENTIFIER_NAME, (Object) this.receiverOptions.getIdentifier());
}

return properties;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
*/
package com.microsoft.azure.eventhubs;

import com.microsoft.azure.servicebus.ClientConstants;

/**
* Represents various optional behaviors which can be turned on or off during the creation of a {@link PartitionReceiver}.
*/
public final class ReceiverOptions {

private boolean receiverRuntimeMetricEnabled;
private String identifier;

/**
* Knob to enable/disable runtime metric of the receiver. If this is set to true and is passed to {@link EventHubClient#createReceiver},
Expand All @@ -36,4 +39,39 @@ public void setReceiverRuntimeMetricEnabled(boolean value) {

this.receiverRuntimeMetricEnabled = value;
}

/**
* Gets the identifier of the {@link PartitionReceiver}
*
* @return identifier of the {@link PartitionReceiver}; null if nothing was set
*/
public String getIdentifier() {

return this.identifier;
}

/**
* Set an identifier to {@link PartitionReceiver}.
* <p>
* This identifier will be used by EventHubs service when reporting any errors across receivers, and is caused by this receiver.
* For example, when receiver quota limit is hit, while a user is trying to create New receiver,
* EventHubs service will throw {@link com.microsoft.azure.servicebus.QuotaExceededException} and will include this identifier.
* So, its very critical to choose a value, which can uniquely identify the whereabouts of {@link PartitionReceiver}.
*
* </p>
* @param value string to identify {@link PartitionReceiver}
*/
public void setIdentifier(final String value) {

ReceiverOptions.validateReceiverIdentifier(value);
this.identifier = value;
}

private static void validateReceiverIdentifier(final String receiverName) {

if (receiverName != null &&
receiverName.length() > ClientConstants.MAX_RECEIVER_NAME_LENGTH) {
throw new IllegalArgumentException("receiverIdentifier length cannot exceed 64");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ private ClientConstants() {

public final static Duration DEFAULT_RERTRY_MIN_BACKOFF = Duration.ofSeconds(0);
public final static Duration DEFAULT_RERTRY_MAX_BACKOFF = Duration.ofSeconds(30);
public final static Duration TOKEN_REFRESH_INTERVAL = Duration.ofMinutes(20);
public final static Duration TOKEN_REFRESH_INTERVAL = Duration.ofMinutes(10); // renew every 10 mins, which expires 20 mins
public final static Duration TOKEN_VALIDITY = Duration.ofMinutes(20);

public final static int DEFAULT_MAX_RETRY_COUNT = 10;

Expand All @@ -52,7 +53,7 @@ private ClientConstants() {
public final static String DEFAULT_RETRY = "Default";

public final static String PRODUCT_NAME = "MSJavaClient";
public final static String CURRENT_JAVACLIENT_VERSION = "0.14.2";
public final static String CURRENT_JAVACLIENT_VERSION = "0.14.3";

public static final String PLATFORM_INFO = getPlatformInfo();
public static final String FRAMEWORK_INFO = getFrameworkInfo();
Expand Down Expand Up @@ -94,6 +95,8 @@ private ClientConstants() {
public static final String AMQP_PUT_TOKEN_FAILED_ERROR = "Put token failed. status-code: %s, status-description: %s";
public static final String TOKEN_AUDIENCE_FORMAT = "amqp://%s/%s";

public static final int MAX_RECEIVER_NAME_LENGTH = 64;

private static String getPlatformInfo() {
final StringBuilder platformInfo = new StringBuilder();
platformInfo.append("arch:");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void run() {
try {
underlyingFactory.getCBSChannel().sendToken(
underlyingFactory.getReactorScheduler(),
underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_REFRESH_INTERVAL),
underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_VALIDITY),
tokenAudience,
new IOperationResult<Void, Exception>() {
@Override
Expand Down Expand Up @@ -474,7 +474,7 @@ else if (u != null)
try {
this.underlyingFactory.getCBSChannel().sendToken(
this.underlyingFactory.getReactorScheduler(),
this.underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_REFRESH_INTERVAL),
this.underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_VALIDITY),
tokenAudience,
new IOperationResult<Void, Exception>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void run() {
try {
underlyingFactory.getCBSChannel().sendToken(
underlyingFactory.getReactorScheduler(),
underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_REFRESH_INTERVAL),
underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_VALIDITY),
tokenAudience,
new IOperationResult<Void, Exception>() {
@Override
Expand Down Expand Up @@ -566,7 +566,7 @@ else if (u != null)
try {
this.underlyingFactory.getCBSChannel().sendToken(
this.underlyingFactory.getReactorScheduler(),
this.underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_REFRESH_INTERVAL),
this.underlyingFactory.getTokenProvider().getToken(tokenAudience, ClientConstants.TOKEN_VALIDITY),
tokenAudience,
new IOperationResult<Void, Exception>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,5 @@ private AmqpConstants() {
public static final String AMQP_PROPERTY_REPLY_TO_GROUP_ID = "reply-to-group-id";

public static final Symbol ENABLE_RECEIVER_RUNTIME_METRIC_NAME = Symbol.valueOf(VENDOR + ":enable-receiver-runtime-metric");
public static final Symbol RECEIVER_IDENTIFIER_NAME = Symbol.valueOf(AmqpConstants.VENDOR + ":receiver-name");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package com.microsoft.azure.eventhubs.sendrecv;

import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;

import com.microsoft.azure.servicebus.QuotaExceededException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.ReceiverOptions;
import com.microsoft.azure.eventhubs.lib.ApiTestBase;
import com.microsoft.azure.eventhubs.lib.TestBase;
import com.microsoft.azure.eventhubs.lib.TestContext;
import com.microsoft.azure.servicebus.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.ServiceBusException;

public class ReceiverIdentifierTest extends ApiTestBase {

static final String cgName = TestContext.getConsumerGroupName();
static final String partitionId = "0";
static final Instant beforeTestStart = Instant.now();
static final int sentEvents = 25;
static final List<PartitionReceiver> receivers = new LinkedList<>();

static EventHubClient ehClient;

@BeforeClass
public static void initializeEventHub() throws Exception {

final ConnectionStringBuilder connectionString = TestContext.getConnectionString();
ehClient = EventHubClient.createFromConnectionStringSync(connectionString.toString());

TestBase.pushEventsToPartition(ehClient, partitionId, sentEvents).get();
}

@Test()
public void testReceiverIdentierShowsUpInQuotaErrors() throws ServiceBusException {

final String receiverIdentifierPrefix = UUID.randomUUID().toString();
for (int receiverCount = 0; receiverCount < 5; receiverCount ++) {
final ReceiverOptions options = new ReceiverOptions();
options.setIdentifier(receiverIdentifierPrefix + receiverCount);
ehClient.createReceiverSync(cgName, partitionId, PartitionReceiver.START_OF_STREAM, options);
}

try {
ehClient.createReceiverSync(cgName, partitionId, PartitionReceiver.START_OF_STREAM);
Assert.assertTrue(false);
}
catch (QuotaExceededException quotaError) {
final String errorMsg = quotaError.getMessage();
for (int receiverCount=0; receiverCount < 5; receiverCount++) {
Assert.assertTrue(errorMsg.contains(receiverIdentifierPrefix + receiverCount));
}
}
}

@AfterClass()
public static void cleanup() throws ServiceBusException {

for (PartitionReceiver receiver : receivers)
receiver.closeSync();

if (ehClient != null)
ehClient.closeSync();
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>0.14.2</version>
<version>0.14.3</version>
<packaging>pom</packaging>

<url>https://github.com/Azure/azure-event-hubs</url>
Expand Down
2 changes: 1 addition & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ the required version of Apache Qpid Proton-J, and the crytography library BCPKIX
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>0.14.2</version>
<version>0.14.3</version>
</dependency>
```

Expand Down

0 comments on commit 81165cd

Please sign in to comment.