Skip to content

Commit

Permalink
Merge pull request #2289 from ozangunalp/replace_some_synchronized_bl…
Browse files Browse the repository at this point in the history
…ocks

Replace some synchronized blocks
  • Loading branch information
ozangunalp authored Sep 21, 2023
2 parents 8bfcb7f + cadb166 commit 4364cea
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.UnaryOperator;

Expand Down Expand Up @@ -64,6 +65,8 @@ public class KafkaRecordStreamSubscription<K, V, T> implements Flow.Subscription
private final RecordQueue<T> queue;
private final long retries;

private final ReentrantLock queueLock = new ReentrantLock();

public KafkaRecordStreamSubscription(
ReactiveKafkaConsumer<K, V> client,
RuntimeKafkaSourceConfiguration config,
Expand Down Expand Up @@ -255,8 +258,9 @@ boolean isCancelled() {
* @param mapFunction
*/
void rewriteQueue(UnaryOperator<T> mapFunction) {
ArrayDeque<T> replacementQueue = new ArrayDeque<>();
synchronized (queue) {
queueLock.lock();
try {
ArrayDeque<T> replacementQueue = new ArrayDeque<>();
queue
.stream()
.map(mapFunction)
Expand All @@ -265,6 +269,8 @@ void rewriteQueue(UnaryOperator<T> mapFunction) {

queue.clear();
queue.addAll((Iterable<T>) replacementQueue);
} finally {
queueLock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.concurrent.locks.ReentrantLock;

/**
* Stores the records coming from Kafka.
Expand All @@ -12,6 +13,8 @@
*/
public class RecordQueue<T> extends ArrayDeque<T> {

private final ReentrantLock lock = new ReentrantLock();

public RecordQueue(int capacityHint) {
super(capacityHint);
}
Expand All @@ -22,10 +25,13 @@ public boolean addAll(Collection<? extends T> c) {
}

public void addAll(Iterable<T> iterable) {
synchronized (this) {
lock.lock();
try {
for (T record : iterable) {
super.offer(record);
}
} finally {
lock.unlock();
}
}

Expand All @@ -36,16 +42,22 @@ public boolean add(T item) {

@Override
public boolean offer(T item) {
synchronized (this) {
lock.lock();
try {
return super.offer(item);
} finally {
lock.unlock();
}
}

@Override
public T poll() {
T record;
synchronized (this) {
lock.lock();
try {
record = super.poll();
} finally {
lock.unlock();
}
return record;
}
Expand All @@ -62,15 +74,21 @@ public boolean remove(Object o) {

@Override
public int size() {
synchronized (this) {
lock.lock();
try {
return super.size();
} finally {
lock.unlock();
}
}

@Override
public void clear() {
synchronized (this) {
lock.lock();
try {
super.clear();
} finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -35,70 +36,88 @@ public class KafkaTransactionsImpl<T> extends MutinyEmitterImpl<T> implements Ka

private volatile Transaction<?> currentTransaction;

private final ReentrantLock lock = new ReentrantLock();

public KafkaTransactionsImpl(EmitterConfiguration config, long defaultBufferSize, KafkaClientService clientService) {
super(config, defaultBufferSize);
this.clientService = clientService;
this.producer = clientService.getProducer(config.name());
}

@Override
public synchronized boolean isTransactionInProgress() {
return currentTransaction != null;
public boolean isTransactionInProgress() {
lock.lock();
try {
return currentTransaction != null;
} finally {
lock.unlock();
}
}

@Override
@CheckReturnValue
public synchronized <R> Uni<R> withTransaction(Function<TransactionalEmitter<T>, Uni<R>> work) {
if (currentTransaction == null) {
return new Transaction<R>().execute(work);
public <R> Uni<R> withTransaction(Function<TransactionalEmitter<T>, Uni<R>> work) {
lock.lock();
try {
if (currentTransaction == null) {
return new Transaction<R>().execute(work);
}
throw KafkaExceptions.ex.transactionInProgress(name);
} finally {
lock.unlock();
}
throw KafkaExceptions.ex.transactionInProgress(name);
}

@SuppressWarnings("rawtypes")
@Override
@CheckReturnValue
public synchronized <R> Uni<R> withTransaction(Message<?> message, Function<TransactionalEmitter<T>, Uni<R>> work) {
String channel;
Map<TopicPartition, OffsetAndMetadata> offsets;

Optional<IncomingKafkaRecordBatchMetadata> batchMetadata = message.getMetadata(IncomingKafkaRecordBatchMetadata.class);
Optional<IncomingKafkaRecordMetadata> recordMetadata = message.getMetadata(IncomingKafkaRecordMetadata.class);
if (batchMetadata.isPresent()) {
IncomingKafkaRecordBatchMetadata<?, ?> metadata = batchMetadata.get();
channel = metadata.getChannel();
offsets = metadata.getOffsets().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue().offset() + 1)));
} else if (recordMetadata.isPresent()) {
IncomingKafkaRecordMetadata<?, ?> metadata = recordMetadata.get();
channel = metadata.getChannel();
offsets = new HashMap<>();
offsets.put(TopicPartitions.getTopicPartition(metadata.getTopic(), metadata.getPartition()),
new OffsetAndMetadata(metadata.getOffset() + 1));
} else {
throw KafkaExceptions.ex.noKafkaMetadataFound(message);
}
public <R> Uni<R> withTransaction(Message<?> message, Function<TransactionalEmitter<T>, Uni<R>> work) {
lock.lock();
try {
String channel;
Map<TopicPartition, OffsetAndMetadata> offsets;

Optional<IncomingKafkaRecordBatchMetadata> batchMetadata = message
.getMetadata(IncomingKafkaRecordBatchMetadata.class);
Optional<IncomingKafkaRecordMetadata> recordMetadata = message.getMetadata(IncomingKafkaRecordMetadata.class);
if (batchMetadata.isPresent()) {
IncomingKafkaRecordBatchMetadata<?, ?> metadata = batchMetadata.get();
channel = metadata.getChannel();
offsets = metadata.getOffsets().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue().offset() + 1)));
} else if (recordMetadata.isPresent()) {
IncomingKafkaRecordMetadata<?, ?> metadata = recordMetadata.get();
channel = metadata.getChannel();
offsets = new HashMap<>();
offsets.put(TopicPartitions.getTopicPartition(metadata.getTopic(), metadata.getPartition()),
new OffsetAndMetadata(metadata.getOffset() + 1));
} else {
throw KafkaExceptions.ex.noKafkaMetadataFound(message);
}

List<KafkaConsumer<Object, Object>> consumers = clientService.getConsumers(channel);
if (consumers.isEmpty()) {
throw KafkaExceptions.ex.unableToFindConsumerForChannel(channel);
} else if (consumers.size() > 1) {
throw KafkaExceptions.ex.exactlyOnceProcessingNotSupported(channel);
}
KafkaConsumer<Object, Object> consumer = consumers.get(0);
if (currentTransaction == null) {
return new Transaction<R>(
/* before commit */
consumer.consumerGroupMetadata()
.chain(groupMetadata -> producer.sendOffsetsToTransaction(offsets, groupMetadata)),
r -> Uni.createFrom().item(r),
VOID_UNI,
/* after abort */
t -> consumer.resetToLastCommittedPositions()
.chain(() -> Uni.createFrom().failure(t)))
.execute(work);
List<KafkaConsumer<Object, Object>> consumers = clientService.getConsumers(channel);
if (consumers.isEmpty()) {
throw KafkaExceptions.ex.unableToFindConsumerForChannel(channel);
} else if (consumers.size() > 1) {
throw KafkaExceptions.ex.exactlyOnceProcessingNotSupported(channel);
}
KafkaConsumer<Object, Object> consumer = consumers.get(0);
if (currentTransaction == null) {
return new Transaction<R>(
/* before commit */
consumer.consumerGroupMetadata()
.chain(groupMetadata -> producer.sendOffsetsToTransaction(offsets, groupMetadata)),
r -> Uni.createFrom().item(r),
VOID_UNI,
/* after abort */
t -> consumer.resetToLastCommittedPositions()
.chain(() -> Uni.createFrom().failure(t)))
.execute(work);
}
throw KafkaExceptions.ex.transactionInProgress(name);
} finally {
lock.unlock();
}
throw KafkaExceptions.ex.transactionInProgress(name);
}

private static final Uni<Void> VOID_UNI = Uni.createFrom().voidItem();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import org.eclipse.microprofile.reactive.messaging.Message;
Expand All @@ -28,6 +29,8 @@ public abstract class AbstractEmitter<T> implements MessagePublisherProvider<T>
protected final AtomicReference<Throwable> synchronousFailure = new AtomicReference<>();
private final OnOverflow.Strategy overflow;

private final ReentrantLock lock = new ReentrantLock();

@SuppressWarnings("unchecked")
public AbstractEmitter(EmitterConfiguration config, long defaultBufferSize) {
this.name = config.name();
Expand Down Expand Up @@ -55,24 +58,34 @@ public AbstractEmitter(EmitterConfiguration config, long defaultBufferSize) {
}
}

public synchronized void complete() {
MultiEmitter<? super Message<? extends T>> emitter = verify();
if (emitter != null) {
emitter.complete();
public void complete() {
lock.lock();
try {
MultiEmitter<? super Message<? extends T>> emitter = verify();
if (emitter != null) {
emitter.complete();
}
} finally {
lock.unlock();
}
}

public synchronized void error(Exception e) {
public void error(Exception e) {
if (e == null) {
throw ex.illegalArgumentForException("null");
}
MultiEmitter<? super Message<? extends T>> emitter = verify();
if (emitter != null) {
emitter.fail(e);
lock.lock();
try {
MultiEmitter<? super Message<? extends T>> emitter = verify();
if (emitter != null) {
emitter.fail(e);
}
} finally {
lock.unlock();
}
}

public synchronized boolean isCancelled() {
public boolean isCancelled() {
MultiEmitter<? super Message<? extends T>> emitter = internal.get();
return emitter == null || emitter.isCancelled();
}
Expand Down Expand Up @@ -139,29 +152,33 @@ public Publisher<Message<? extends T>> getPublisher() {
return publisher;
}

protected synchronized void emit(Message<? extends T> message) {
protected void emit(Message<? extends T> message) {
if (message == null) {
throw ex.illegalArgumentForNullValue();
}

MultiEmitter<? super Message<? extends T>> emitter = verify();
if (emitter == null) {
if (overflow == OnOverflow.Strategy.DROP) {
// There are no subscribers, but because we use the DROP strategy, just ignore the event.
// However, nack the message, so the sender can be aware of the rejection.
message.nack(NO_SUBSCRIBER_EXCEPTION);
lock.lock();
try {
MultiEmitter<? super Message<? extends T>> emitter = verify();
if (emitter == null) {
if (overflow == OnOverflow.Strategy.DROP) {
// There are no subscribers, but because we use the DROP strategy, just ignore the event.
// However, nack the message, so the sender can be aware of the rejection.
message.nack(NO_SUBSCRIBER_EXCEPTION);
}
return;
}
return;
}
if (synchronousFailure.get() != null) {
throw ex.incomingNotFoundForEmitter(synchronousFailure.get());
}
if (emitter.isCancelled()) {
throw ex.illegalStateForDownstreamCancel();
}
emitter.emit(message);
if (synchronousFailure.get() != null) {
throw ex.illegalStateForEmitterWhileEmitting(synchronousFailure.get());
if (synchronousFailure.get() != null) {
throw ex.incomingNotFoundForEmitter(synchronousFailure.get());
}
if (emitter.isCancelled()) {
throw ex.illegalStateForDownstreamCancel();
}
emitter.emit(message);
if (synchronousFailure.get() != null) {
throw ex.illegalStateForEmitterWhileEmitting(synchronousFailure.get());
}
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public EmitterImpl(EmitterConfiguration config, long defaultBufferSize) {
}

@Override
public synchronized CompletionStage<Void> send(T payload) {
public CompletionStage<Void> send(T payload) {
if (payload == null) {
throw ex.illegalArgumentForNullValue();
}
Expand All @@ -40,7 +40,7 @@ public synchronized CompletionStage<Void> send(T payload) {
}

@Override
public synchronized <M extends Message<? extends T>> void send(M msg) {
public <M extends Message<? extends T>> void send(M msg) {
if (msg == null) {
throw ex.illegalArgumentForNullValue();
}
Expand Down
Loading

0 comments on commit 4364cea

Please sign in to comment.