[fix] Make ProducerIdManagerImpl thread safe (#1603)
### Motivation

The `ProducerIdManagerImpl` updates `currentProducerIdBlock` and
`nextProducerId` from callbacks that are likely run on separate threads.
When many producers connect, that can lead to data races and to
incorrect incrementing of producer ids. This PR ensures that getting the
next `currentProducerIdBlock` is thread safe and never runs
concurrently, and that updates to `nextProducerId` are always thread
safe. My model relies on synchronizing on the object, which is already
partially in place through synchronized methods.
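
The kind of interleaving described here is easy to reproduce with plain `CompletableFuture` callbacks. The snippet below is illustrative only; the class and counter names are invented for this example and are not code from this repository. Many callbacks bump an unsynchronized counter, so some ids collide.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UnsafeIdRaceDemo {
    // Shared, unsynchronized counter, analogous to the fields described above.
    private static long nextProducerId = 0L;

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        CompletableFuture<?>[] callbacks = new CompletableFuture<?>[10_000];
        for (int i = 0; i < callbacks.length; i++) {
            // Each "producer connection" gets its id from a callback on some pool thread.
            callbacks[i] = CompletableFuture.runAsync(() -> seen.add(nextProducerId++), pool);
        }
        CompletableFuture.allOf(callbacks).join();
        // With no synchronization, the read-increment-write interleaves and ids can repeat,
        // so the number of unique ids handed out is often less than the number of callbacks.
        System.out.println("unique ids handed out: " + seen.size() + " of " + callbacks.length);
        pool.shutdown();
    }
}
```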

### Modifications

* Update the callback in `generateProducerId` to ensure that
`nextProducerId` is updated safely and `currentProducerIdBlock` is read
safely
* Update `getNewProducerIdBlock` so that the `currentProducerIdBlock` is
only updated after a successful write to the metadata store.
* Introduce `newProducerIdBlockFuture` to prevent duplicate attempts to
update the metadata store's state for `currentProducerIdBlock`.
Duplicate attempts would fail, so this change prevents races that would
otherwise fail all but one of the competing futures (a minimal sketch of
this pattern follows the list).
* Update the `nextProducerId` from within the `getNewProducerIdBlock`
method to simplify the callback logic in the `generateProducerId`
method.
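
The thread-safety model above boils down to one pattern: a synchronized method hands every concurrent caller the same in-flight future, and shared state is only mutated inside the completion callback while the object lock is held. Below is a simplified, self-contained sketch of that pattern in plain Java; the names (`BlockFetcher`, `fetchFromStore`, `blockStartId`) are stand-ins invented for illustration and are not the KoP API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;

public class BlockFetcher {
    private long blockStartId = -1L;                      // stands in for currentProducerIdBlock
    private CompletableFuture<Void> newBlockFuture;       // stands in for newProducerIdBlockFuture

    // Only one store round trip is in flight at a time; concurrent callers share it.
    public synchronized CompletableFuture<Void> getNewBlock() {
        if (newBlockFuture != null && !newBlockFuture.isDone()) {
            return newBlockFuture;                        // a fetch is already running, piggyback on it
        }
        newBlockFuture = new CompletableFuture<>();
        CompletableFuture<Void> result = newBlockFuture;  // capture so a later call cannot swap it out
        fetchFromStore().whenComplete((startId, err) -> {
            synchronized (this) {
                if (err != null) {
                    result.completeExceptionally(err);    // failed write: local state stays untouched
                } else {
                    blockStartId = startId;               // update local state only after the store accepted it
                    result.complete(null);
                }
            }
        });
        return result;
    }

    // Placeholder for the conditional metadata-store update; here it just returns a random block start.
    private CompletableFuture<Long> fetchFromStore() {
        return CompletableFuture.supplyAsync(() -> ThreadLocalRandom.current().nextLong(0, 1_000_000));
    }
}
```

With this shape, a caller that arrives while a fetch is already outstanding simply waits on the same future instead of issuing a competing store update, and local state is only copied in after the write succeeds.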
michaeljmarshall authored Dec 11, 2022
1 parent 50eb5fd commit 6653115
Showing 2 changed files with 239 additions and 48 deletions.

ProducerIdManagerImpl.java

```diff
@@ -54,6 +54,8 @@ public class ProducerIdManagerImpl implements ProducerIdManager {
     private ProducerIdBlock currentProducerIdBlock;
     private Long nextProducerId = -1L;
 
+    private CompletableFuture<Void> newProducerIdBlockFuture;
+
     public ProducerIdManagerImpl(int brokerId, MetadataStoreExtended metadataStore) {
         this.brokerId = brokerId;
         this.metadataStore = metadataStore;
@@ -78,70 +80,88 @@ public static ProducerIdBlock parseProducerIdBlockData(byte[] bytes) throws IOException {
                 .build();
     }
 
-    public CompletableFuture<Void> getNewProducerIdBlock() {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        getCurrentDataAndVersion().thenAccept(currentDataAndVersionOpt -> {
-            if (currentDataAndVersionOpt.isPresent() && currentDataAndVersionOpt.get().getData() != null) {
-                DataAndVersion dataAndVersion = currentDataAndVersionOpt.get();
-                try {
-                    ProducerIdBlock currProducerIdBlock =
-                            ProducerIdManagerImpl.parseProducerIdBlockData(dataAndVersion.getData());
-                    if (currProducerIdBlock.blockEndId > Long.MAX_VALUE - ProducerIdManagerImpl.PID_BLOCK_SIZE) {
-                        // We have exhausted all producerIds (wow!), treat it as a fatal error
-                        log.error("Exhausted all producerIds as the next block's end producerId is will "
-                                        + "has exceeded long type limit (current block end producerId is {})",
-                                currProducerIdBlock.blockEndId);
-                        future.completeExceptionally(new KafkaException("Have exhausted all producerIds."));
-                        return;
-                    }
-                    currentProducerIdBlock = ProducerIdBlock
-                            .builder()
-                            .brokerId(brokerId)
-                            .blockStartId(currProducerIdBlock.blockEndId + 1L)
-                            .blockEndId(currProducerIdBlock.blockEndId + ProducerIdManagerImpl.PID_BLOCK_SIZE)
-                            .build();
-                } catch (IOException e) {
-                    future.completeExceptionally(new KafkaException("Get producerId failed.", e));
-                    return;
-                }
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("There is no producerId block yet, creating the first block");
-                }
-                currentProducerIdBlock = ProducerIdBlock
-                        .builder()
-                        .brokerId(brokerId)
-                        .blockStartId(0L)
-                        .blockEndId(ProducerIdManagerImpl.PID_BLOCK_SIZE - 1)
-                        .build();
-            }
-            try {
-                byte[] newProducerIdBlockData = ProducerIdManagerImpl
-                        .generateProducerIdBlockJson(currentProducerIdBlock);
-                conditionalUpdateData(newProducerIdBlockData,
-                        currentDataAndVersionOpt.orElse(DataAndVersion.DEFAULT_VERSION).getVersion())
-                        .thenAccept(version -> {
-                            future.complete(null);
-                        }).exceptionally(ex -> {
-                            future.completeExceptionally(ex);
-                            return null;
-                        });
-            } catch (JsonProcessingException e) {
-                future.completeExceptionally(e);
-            }
-        }).exceptionally(ex -> {
-            future.completeExceptionally(ex);
-            return null;
-        });
-        return future;
+    public synchronized CompletableFuture<Void> getNewProducerIdBlock() {
+        if (newProducerIdBlockFuture != null && !newProducerIdBlockFuture.isDone()) {
+            // In this case, the class is already getting the new producer id block.
+            // Returning this future ensures that callbacks work correctly
+            return newProducerIdBlockFuture;
+        }
+        newProducerIdBlockFuture = new CompletableFuture<>();
+        getCurrentDataAndVersion().thenAccept(currentDataAndVersionOpt -> {
+            synchronized (this) {
+                final ProducerIdBlock nextProducerIdBlock;
+                if (currentDataAndVersionOpt.isPresent() && currentDataAndVersionOpt.get().getData() != null) {
+                    DataAndVersion dataAndVersion = currentDataAndVersionOpt.get();
+                    try {
+                        ProducerIdBlock currProducerIdBlock =
+                                ProducerIdManagerImpl.parseProducerIdBlockData(dataAndVersion.getData());
+                        if (currProducerIdBlock.blockEndId > Long.MAX_VALUE - ProducerIdManagerImpl.PID_BLOCK_SIZE) {
+                            // We have exhausted all producerIds (wow!), treat it as a fatal error
+                            log.error("Exhausted all producerIds as the next block's end producerId is will "
+                                            + "has exceeded long type limit (current block end producerId is {})",
+                                    currProducerIdBlock.blockEndId);
+                            newProducerIdBlockFuture
+                                    .completeExceptionally(new KafkaException("Have exhausted all producerIds."));
+                            return;
+                        }
+                        nextProducerIdBlock = ProducerIdBlock
+                                .builder()
+                                .brokerId(brokerId)
+                                .blockStartId(currProducerIdBlock.blockEndId + 1L)
+                                .blockEndId(currProducerIdBlock.blockEndId + ProducerIdManagerImpl.PID_BLOCK_SIZE)
+                                .build();
+                    } catch (IOException e) {
+                        newProducerIdBlockFuture.completeExceptionally(new KafkaException("Get producerId failed.", e));
+                        return;
+                    }
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("There is no producerId block yet, creating the first block");
+                    }
+                    nextProducerIdBlock = ProducerIdBlock
+                            .builder()
+                            .brokerId(brokerId)
+                            .blockStartId(0L)
+                            .blockEndId(ProducerIdManagerImpl.PID_BLOCK_SIZE - 1)
+                            .build();
+                }
+                try {
+                    byte[] newProducerIdBlockData = ProducerIdManagerImpl
+                            .generateProducerIdBlockJson(nextProducerIdBlock);
+                    conditionalUpdateData(newProducerIdBlockData,
+                            currentDataAndVersionOpt.orElse(DataAndVersion.DEFAULT_VERSION).getVersion())
+                            .thenAccept(version -> {
+                                synchronized (this) {
+                                    currentProducerIdBlock = nextProducerIdBlock;
+                                    nextProducerId = nextProducerIdBlock.blockStartId;
+                                    newProducerIdBlockFuture.complete(null);
+                                }
+                            }).exceptionally(ex -> {
+                                synchronized (this) {
+                                    newProducerIdBlockFuture.completeExceptionally(ex);
+                                }
+                                return null;
+                            });
+                } catch (JsonProcessingException e) {
+                    newProducerIdBlockFuture.completeExceptionally(e);
+                }
+            }}).exceptionally(ex -> {
+            synchronized (this) {
+                newProducerIdBlockFuture.completeExceptionally(ex);
+            }
+            return null;
+        });
+        return newProducerIdBlockFuture;
     }
 
     @Override
     public CompletableFuture<Void> initialize() {
         CompletableFuture<Void> future = new CompletableFuture<>();
         getNewProducerIdBlock()
                 .thenAccept(__ -> {
-                    nextProducerId = currentProducerIdBlock.blockStartId;
+                    synchronized (this) {
+                        nextProducerId = currentProducerIdBlock.blockStartId;
+                    }
                     future.complete(null);
                 }).exceptionally(throwable -> {
                     future.completeExceptionally(throwable);
@@ -156,8 +176,17 @@ public synchronized CompletableFuture<Long> generateProducerId() {
         // grab a new block of producerIds if this block has been exhausted
         if (nextProducerId > currentProducerIdBlock.blockEndId) {
             getNewProducerIdBlock().thenAccept(__ -> {
-                nextProducerId = currentProducerIdBlock.blockStartId + 1;
-                nextProducerIdFuture.complete(nextProducerId - 1);
+                synchronized (this) {
+                    if (nextProducerId > currentProducerIdBlock.blockEndId) {
+                        // This can only happen if more than blockSize producers attempt to connect
+                        // while the getNewProducerIdBlock() is processing
+                        Exception ex = new IllegalStateException("New ProducerIdBlock exhausted. Try again.");
+                        nextProducerIdFuture.completeExceptionally(ex);
+                    } else {
+                        nextProducerId += 1;
+                        nextProducerIdFuture.complete(nextProducerId - 1);
+                    }
+                }
             }).exceptionally(ex -> {
                 nextProducerIdFuture.completeExceptionally(ex);
                 return null;
```

ProducerIdManagerImplTest.java (new file, +162 lines)

```java
/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore;
import org.testng.Assert;
import org.testng.annotations.Test;

public class ProducerIdManagerImplTest {

    @Test
    public void verifyThreadSafetyForTwoConcurrentNewProducerIdBlockCalls() throws Exception {
        // Initialize a fake metadata store such that the futures are not completed. This will allow
        // for low level control during this test.
        CompletableFuture<Optional<GetResult>> getFuture = new CompletableFuture<>();
        CompletableFuture<Stat> completedPutFuture = new CompletableFuture<>();
        // The value is not used, so mock with all "zero" values
        completedPutFuture.complete(new Stat("", 0, 0, 0, false, false));

        MetadataStoreExtended mockedMetadataStore = mock(MetadataStoreExtended.class);
        when(mockedMetadataStore.get(anyString())).thenReturn(getFuture);
        when(mockedMetadataStore.put(anyString(), any(), any())).thenReturn(completedPutFuture);

        ProducerIdManagerImpl producerIdManager = new ProducerIdManagerImpl(1, mockedMetadataStore);
        // Trigger two calls to increase the producer id block.
        CompletableFuture<Void> firstNewBlock = producerIdManager.getNewProducerIdBlock();
        CompletableFuture<Void> secondNewBlock = producerIdManager.getNewProducerIdBlock();

        Assert.assertFalse(firstNewBlock.isDone());
        Assert.assertFalse(secondNewBlock.isDone());

        // Relies on the fact that completing the future also triggers the callbacks to run in same thread
        getFuture.complete(Optional.empty());

        // Ensure that both calls completed
        Assert.assertTrue(firstNewBlock.isDone());
        Assert.assertTrue(secondNewBlock.isDone());
        Assert.assertFalse(firstNewBlock.isCompletedExceptionally());
        Assert.assertFalse(secondNewBlock.isCompletedExceptionally());

        // Ensure that the next producer id is the first value
        Assert.assertEquals(producerIdManager.generateProducerId().get().intValue(), 0, "The first id should be 0.");
    }

    @Test
    public void verifyProducerIdManagerForManyBrokersAndManyNewProducers() throws Exception {
        int expectedNumIds = 1000000;
        int numBrokers = 10;
        LocalMemoryMetadataStore metadataStore =
                new LocalMemoryMetadataStore("memory:localhost", MetadataStoreConfig.builder().build());
        List<ProducerIdManagerImpl> producerIdManagers = new ArrayList<>(numBrokers);
        for (int i = 0; i < numBrokers; i++) {
            ProducerIdManagerImpl producerIdManager = new ProducerIdManagerImpl(i, metadataStore);
            producerIdManagers.add(producerIdManager);
            producerIdManager.initialize();
        }

        List<CompletableFuture<Long>> futureIds = new ArrayList<>(expectedNumIds);

        for (int i = 0; i < expectedNumIds; i++) {
            for (ProducerIdManagerImpl producerIdManager : producerIdManagers) {
                futureIds.add(producerIdManager.generateProducerId());
            }
        }

        CompletableFuture.allOf(futureIds.toArray(new CompletableFuture[0])).get();

        HashSet<Long> ids = new HashSet<>();
        for (CompletableFuture<Long> futureId : futureIds) {
            Assert.assertTrue(ids.add(futureId.get()), String.format("Expected %d to be a unique id", futureId.get()));
        }
        Assert.assertEquals(ids.size(), expectedNumIds * numBrokers);
    }

    @Test
    public void tooManyConcurrentNewProducersShouldFail() throws Exception {
        long blockSize = ProducerIdManagerImpl.PID_BLOCK_SIZE;
        int brokerId = 1;
        // Initialize a fake metadata store such that the futures are not completed. This will allow
        // for low level control during this test.
        CompletableFuture<Optional<GetResult>> firstGetFuture = new CompletableFuture<>();
        CompletableFuture<Optional<GetResult>> secondGetFuture = new CompletableFuture<>();
        CompletableFuture<Stat> firstPutFuture = new CompletableFuture<>();
        // The value is not used, and we mock the get results, so the put is essentially ignored
        firstPutFuture.complete(new Stat("", 0, 0, 0, false, false));

        MetadataStoreExtended mockedMetadataStore = mock(MetadataStoreExtended.class);
        when(mockedMetadataStore.get(anyString())).thenReturn(firstGetFuture).thenReturn(secondGetFuture);
        when(mockedMetadataStore.put(anyString(), any(), any())).thenReturn(firstPutFuture);

        ProducerIdManagerImpl producerIdManager = new ProducerIdManagerImpl(brokerId, mockedMetadataStore);
        producerIdManager.initialize();
        // Relies on the fact that completing the future also triggers the callbacks to run
        firstGetFuture.complete(Optional.empty());
        List<CompletableFuture<Long>> futureIds = new ArrayList<>((int) blockSize + 1);

        // Create one blockSize worth of producer ids
        for (int i = 0; i < blockSize; i++) {
            Assert.assertEquals(producerIdManager.generateProducerId().get().intValue(), i);
        }

        // Now create callbacks for blockSize + 1 producer ids.
        for (int i = 0; i < blockSize + 1; i++) {
            futureIds.add(producerIdManager.generateProducerId());
        }

        // Relies on the fact that completing the future also triggers the callbacks to run
        ProducerIdManagerImpl.ProducerIdBlock zeroBlock = ProducerIdManagerImpl.ProducerIdBlock
                .builder()
                .brokerId(brokerId)
                .blockStartId(0L)
                .blockEndId(ProducerIdManagerImpl.PID_BLOCK_SIZE - 1)
                .build();
        // This stat is not actually used
        Stat stat = new Stat("", 0, 0, 0, false, false);
        GetResult result = new GetResult(ProducerIdManagerImpl.generateProducerIdBlockJson(zeroBlock), stat);
        secondGetFuture.complete(Optional.of(result));

        int countFailed = 0;
        HashSet<Long> set = new HashSet<>();
        for (CompletableFuture<Long> id : futureIds) {
            if (id.isDone()) {
                if (id.isCompletedExceptionally()) {
                    countFailed++;
                } else {
                    set.add(id.get());
                }
            } else {
                Assert.fail();
            }
        }

        Assert.assertEquals(countFailed, 1, "Only one producer id should have failed");
        Assert.assertEquals(set.size(), blockSize, "Ensures all ids are unique and that no extra ids were created.");
    }

}
```
