Skip to content

Commit

Permalink
CAMEL-20199: Remove synchronized blocks from components T to Z (apach…
Browse files Browse the repository at this point in the history
…e#16848)

## Motivation

For better support of virtual threads, we need to avoid lengthy and frequent pinning by replacing synchronized blocks with ReentrantLocks

## Modifications:

* Replace mutex with locks
* Use locks instead of synchronized blocks
* Leverage ConcurrentMap methods to get rid of synchronized blocks
  • Loading branch information
essobedo authored Jan 17, 2025
1 parent 609db13 commit 8466fd6
Show file tree
Hide file tree
Showing 26 changed files with 795 additions and 514 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

Expand Down Expand Up @@ -63,7 +63,7 @@ public class GooglePubsubConsumer extends DefaultConsumer {
this.endpoint = endpoint;
this.processor = processor;
this.subscribers = Collections.synchronizedList(new LinkedList<>());
this.pendingSynchronousPullResponses = Collections.synchronizedSet(new HashSet<>());
this.pendingSynchronousPullResponses = ConcurrentHashMap.newKeySet();
String loggerId = endpoint.getLoggerId();

if (Strings.isNullOrEmpty(loggerId)) {
Expand Down Expand Up @@ -108,16 +108,14 @@ protected void doStop() throws Exception {
}

private void safeCancelSynchronousPullResponses() {
synchronized (pendingSynchronousPullResponses) {
for (ApiFuture<PullResponse> pullResponseApiFuture : pendingSynchronousPullResponses) {
try {
pullResponseApiFuture.cancel(true);
} catch (Exception e) {
localLog.warn("Exception while cancelling pending synchronous pull response", e);
}
for (ApiFuture<PullResponse> pullResponseApiFuture : pendingSynchronousPullResponses) {
try {
pullResponseApiFuture.cancel(true);
} catch (Exception e) {
localLog.warn("Exception while cancelling pending synchronous pull response", e);
}
pendingSynchronousPullResponses.clear();
}
pendingSynchronousPullResponses.clear();
}

private class SubscriberWrapper implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.camel.component.grpc.client;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import io.grpc.stub.StreamObserver;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
Expand All @@ -33,6 +36,7 @@ class GrpcStreamingExchangeForwarder implements GrpcExchangeForwarder {

private final Object grpcStub;

private final Lock lock = new ReentrantLock();
private volatile StreamObserver<Object> currentStream;

private volatile StreamObserver<Object> currentResponseObserver;
Expand Down Expand Up @@ -75,13 +79,16 @@ public void shutdown() {
private StreamObserver<Object> checkAndRecreateStreamObserver(StreamObserver<Object> responseObserver) {
StreamObserver<Object> curStream = this.currentStream;
if (curStream == null) {
synchronized (this) {
lock.lock();
try {
if (this.currentStream == null) {
this.currentResponseObserver = responseObserver;
this.currentStream = doCreateStream(responseObserver);
}

curStream = this.currentStream;
} finally {
lock.unlock();
}
}

Expand All @@ -93,9 +100,12 @@ private StreamObserver<Object> checkAndRecreateStreamObserver(StreamObserver<Obj
}

private void doCloseStream() {
synchronized (this) {
lock.lock();
try {
this.currentStream = null;
this.currentResponseObserver = null;
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
Expand All @@ -41,6 +43,7 @@ public class AsyncInputStream implements ReadStream<Buffer> {
private static final Logger LOG = LoggerFactory.getLogger(AsyncInputStream.class);
private static final int DEFAULT_BUFFER_SIZE = 4096;

private final Lock lock = new ReentrantLock();
private final ReadableByteChannel channel;
private final Vertx vertx;
private final Context context;
Expand Down Expand Up @@ -68,43 +71,68 @@ public AsyncInputStream(Vertx vertx, Context context, InputStream inputStream) {
}

@Override
public synchronized AsyncInputStream endHandler(Handler<Void> endHandler) {
checkStreamClosed();
this.endHandler = endHandler;
return this;
public AsyncInputStream endHandler(Handler<Void> endHandler) {
lock.lock();
try {
checkStreamClosed();
this.endHandler = endHandler;
return this;
} finally {
lock.unlock();
}
}

@Override
public synchronized AsyncInputStream exceptionHandler(Handler<Throwable> exceptionHandler) {
checkStreamClosed();
this.exceptionHandler = exceptionHandler;
return this;
public AsyncInputStream exceptionHandler(Handler<Throwable> exceptionHandler) {
lock.lock();
try {
checkStreamClosed();
this.exceptionHandler = exceptionHandler;
return this;
} finally {
lock.unlock();
}
}

@Override
public synchronized AsyncInputStream handler(Handler<Buffer> handler) {
checkStreamClosed();
this.dataHandler = handler;
if (this.dataHandler != null && !this.closed) {
this.doRead();
} else {
queue.clear();
public AsyncInputStream handler(Handler<Buffer> handler) {
lock.lock();
try {
checkStreamClosed();
this.dataHandler = handler;
if (this.dataHandler != null && !this.closed) {
this.doRead();
} else {
queue.clear();
}
return this;
} finally {
lock.unlock();
}
return this;
}

@Override
public synchronized AsyncInputStream pause() {
checkStreamClosed();
queue.pause();
return this;
public AsyncInputStream pause() {
lock.lock();
try {
checkStreamClosed();
queue.pause();
return this;
} finally {
lock.unlock();
}
}

@Override
public synchronized AsyncInputStream resume() {
checkStreamClosed();
queue.resume();
return this;
public AsyncInputStream resume() {
lock.lock();
try {
checkStreamClosed();
queue.resume();
return this;
} finally {
lock.unlock();
}
}

@Override
Expand Down Expand Up @@ -133,9 +161,14 @@ private void checkContext() {
}
}

private synchronized void closeInternal(Handler<AsyncResult<Void>> handler) {
closed = true;
doClose(handler);
private void closeInternal(Handler<AsyncResult<Void>> handler) {
lock.lock();
try {
closed = true;
doClose(handler);
} finally {
lock.unlock();
}
}

private void doClose(Handler<AsyncResult<Void>> handler) {
Expand All @@ -156,22 +189,27 @@ private void doRead() {
doRead(ByteBuffer.allocate(DEFAULT_BUFFER_SIZE));
}

private synchronized void doRead(ByteBuffer buffer) {
if (!readInProgress) {
readInProgress = true;
Buffer buff = Buffer.buffer(DEFAULT_BUFFER_SIZE);
doRead(buff, 0, buffer, readPos, result -> {
if (result.succeeded()) {
readInProgress = false;
Buffer updatedBuffer = result.result();
readPos += updatedBuffer.length();
if (queue.write(updatedBuffer) && updatedBuffer.length() > 0) {
doRead(buffer);
private void doRead(ByteBuffer buffer) {
lock.lock();
try {
if (!readInProgress) {
readInProgress = true;
Buffer buff = Buffer.buffer(DEFAULT_BUFFER_SIZE);
doRead(buff, 0, buffer, readPos, result -> {
if (result.succeeded()) {
readInProgress = false;
Buffer updatedBuffer = result.result();
readPos += updatedBuffer.length();
if (queue.write(updatedBuffer) && updatedBuffer.length() > 0) {
doRead(buffer);
}
} else {
handleException(result.cause());
}
} else {
handleException(result.cause());
}
});
});
}
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -210,20 +248,26 @@ private void doRead(Buffer writeBuff, int offset, ByteBuffer buffer, long positi

private void handleData(Buffer buffer) {
Handler<Buffer> handler;
synchronized (this) {
lock.lock();
try {
handler = this.dataHandler;
} finally {
lock.unlock();
}
if (handler != null) {
checkContext();
handler.handle(buffer);
}
}

private synchronized void handleEnd() {
private void handleEnd() {
Handler<Void> endHandler;
synchronized (this) {
lock.lock();
try {
dataHandler = null;
endHandler = this.endHandler;
} finally {
lock.unlock();
}
if (endHandler != null) {
checkContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package org.apache.camel.component.sjms.reply;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import jakarta.jms.Destination;
import jakarta.jms.ExceptionListener;
Expand Down Expand Up @@ -110,8 +112,10 @@ public void onException(JMSException exception) {
}

private final class TemporaryReplyQueueDestinationResolver implements DestinationCreationStrategy {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private TemporaryQueue queue;
private final AtomicBoolean refreshWanted = new AtomicBoolean();
private volatile boolean refreshWanted;

@Override
public Destination createDestination(Session session, String name, boolean topic) throws JMSException {
Expand All @@ -120,32 +124,38 @@ public Destination createDestination(Session session, String name, boolean topic

@Override
public Destination createTemporaryDestination(Session session, boolean topic) throws JMSException {
synchronized (refreshWanted) {
if (queue == null || refreshWanted.get()) {
refreshWanted.set(false);
lock.lock();
try {
if (queue == null || refreshWanted) {
refreshWanted = false;
queue = session.createTemporaryQueue();
setReplyTo(queue);
if (log.isDebugEnabled()) {
log.debug("Refreshed Temporary ReplyTo Queue. New queue: {}", queue.getQueueName());
}
refreshWanted.notifyAll();
condition.signalAll();
}
} finally {
lock.unlock();
}
return queue;
}

public void scheduleRefresh() {
refreshWanted.set(true);
refreshWanted = true;
}

public void destinationReady() throws InterruptedException {
if (refreshWanted.get()) {
synchronized (refreshWanted) {
if (refreshWanted) {
lock.lock();
try {
//check if requestWanted is still true
if (refreshWanted.get()) {
if (refreshWanted) {
log.debug("Waiting for new Temporary ReplyTo queue to be assigned before we can continue");
refreshWanted.wait();
condition.await();
}
} finally {
lock.unlock();
}
}
}
Expand Down
Loading

0 comments on commit 8466fd6

Please sign in to comment.