diff --git a/CHANGES.txt b/CHANGES.txt index 0d1f612ea..072fab1f9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,6 @@ +4.12.1 (Jun 10, 2024) +- Fixed deadlock for virtual thread in Push Manager and SSE Client. + 4.12.0 (May 15, 2024) - Added support for targeting rules based on semantic versions (https://semver.org/). - Added the logic to handle correctly when the SDK receives an unsupported Matcher type. diff --git a/client/pom.xml b/client/pom.xml index b48aa1100..b8d94bba7 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -5,7 +5,7 @@ io.split.client java-client-parent - 4.12.0 + 4.12.1 java-client jar diff --git a/client/src/main/java/io/split/engine/common/PushManagerImp.java b/client/src/main/java/io/split/engine/common/PushManagerImp.java index b6118efb6..3c15481fd 100644 --- a/client/src/main/java/io/split/engine/common/PushManagerImp.java +++ b/client/src/main/java/io/split/engine/common/PushManagerImp.java @@ -30,6 +30,8 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import static com.google.common.base.Preconditions.checkNotNull; import static io.split.client.utils.SplitExecutorFactory.buildSingleThreadScheduledExecutor; @@ -42,6 +44,7 @@ public class PushManagerImp implements PushManager { private final FeatureFlagsWorker _featureFlagsWorker; private final Worker _segmentWorker; private final PushStatusTracker _pushStatusTracker; + private static final Lock lock = new ReentrantLock(); private Future _nextTokenRefreshTask; private final ScheduledExecutorService _scheduledExecutorService; @@ -92,37 +95,42 @@ public static PushManagerImp build(Synchronizer synchronizer, } @Override - public synchronized void start() { - AuthenticationResponse response = _authApiClient.Authenticate(); - _log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled())); - if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())) { - _expirationTime.set(response.getExpiration()); - _telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.TOKEN_REFRESH.getType(), - response.getExpiration(), System.currentTimeMillis())); - return; - } - - stop(); - if (response.isRetry()) { - _pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR); - } else { - _pushStatusTracker.forcePushDisable(); + public void start() { + try { + lock.lock(); + AuthenticationResponse response = _authApiClient.Authenticate(); + _log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled())); + if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())) { + _expirationTime.set(response.getExpiration()); + _telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.TOKEN_REFRESH.getType(), + response.getExpiration(), System.currentTimeMillis())); + return; + } + + cleanUpResources(); + if (response.isRetry()) { + _pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR); + } else { + _pushStatusTracker.forcePushDisable(); + } + } finally { + lock.unlock(); } } @Override - public synchronized void stop() { - _log.debug("Stopping PushManagerImp"); - _eventSourceClient.stop(); - stopWorkers(); - if (_nextTokenRefreshTask != null) { - _log.debug("Cancel nextTokenRefreshTask"); - _nextTokenRefreshTask.cancel(false); + public void stop() { + try { + lock.lock(); + _log.debug("Stopping PushManagerImp"); + cleanUpResources(); + } finally { + lock.unlock(); } } @Override - public synchronized void scheduleConnectionReset() { + public void scheduleConnectionReset() { _log.debug(String.format("scheduleNextTokenRefresh in %s SECONDS", _expirationTime)); _nextTokenRefreshTask = _scheduledExecutorService.schedule(() -> { _log.debug("Starting scheduleNextTokenRefresh ..."); @@ -142,14 +150,23 @@ private boolean startSse(String token, String channels) { } @Override - public synchronized void startWorkers() { + public void startWorkers() { _featureFlagsWorker.start(); _segmentWorker.start(); } @Override - public synchronized void stopWorkers() { + public void stopWorkers() { _featureFlagsWorker.stop(); _segmentWorker.stop(); } + + private void cleanUpResources() { + _eventSourceClient.stop(); + stopWorkers(); + if (_nextTokenRefreshTask != null) { + _log.debug("Cancel nextTokenRefreshTask"); + _nextTokenRefreshTask.cancel(false); + } + } } \ No newline at end of file diff --git a/client/src/main/java/io/split/engine/common/SyncManagerImp.java b/client/src/main/java/io/split/engine/common/SyncManagerImp.java index 4f5ce3714..691b6def4 100644 --- a/client/src/main/java/io/split/engine/common/SyncManagerImp.java +++ b/client/src/main/java/io/split/engine/common/SyncManagerImp.java @@ -218,7 +218,6 @@ private void startPollingMode() { long howLong = _backoff.interval(); _log.info(String.format("Retryable error in streaming subsystem. Switching to polling and retrying in %d seconds", howLong)); _synchronizer.startPeriodicFetching(); - _pushManager.stopWorkers(); _pushManager.stop(); Thread.sleep(howLong * 1000); _incomingPushStatus.clear(); diff --git a/client/src/main/java/io/split/engine/sse/client/SSEClient.java b/client/src/main/java/io/split/engine/sse/client/SSEClient.java index 9c2024d99..37cc6dac9 100644 --- a/client/src/main/java/io/split/engine/sse/client/SSEClient.java +++ b/client/src/main/java/io/split/engine/sse/client/SSEClient.java @@ -6,7 +6,6 @@ import io.split.telemetry.domain.enums.StreamEventsEnum; import io.split.telemetry.storage.TelemetryRuntimeProducer; import org.apache.hc.client5.http.classic.methods.HttpGet; -import org.apache.hc.client5.http.classic.methods.HttpPost; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; import org.slf4j.Logger; @@ -25,6 +24,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import static com.google.common.base.Preconditions.checkNotNull; @@ -49,6 +50,7 @@ private enum ConnectionState { private final static String SOCKET_CLOSED_MESSAGE = "Socket closed"; private final static String KEEP_ALIVE_PAYLOAD = ":keepalive\n"; private final static long CONNECT_TIMEOUT = 30000; + private static final Lock lock = new ReentrantLock(); private static final Logger _log = LoggerFactory.getLogger(SSEClient.class); private final ExecutorService _connectionExecutor; private final CloseableHttpClient _client; @@ -59,7 +61,6 @@ private enum ConnectionState { private final AtomicReference _ongoingRequest = new AtomicReference<>(); private AtomicBoolean _forcedStop; private final RequestDecorator _requestDecorator; - private final TelemetryRuntimeProducer _telemetryRuntimeProducer; public SSEClient(Function eventCallback, @@ -77,47 +78,57 @@ public SSEClient(Function eventCallback, _requestDecorator = requestDecorator; } - public synchronized boolean open(URI uri) { - if (isOpen()) { - _log.info("SSEClient already open."); - return false; - } - - _statusCallback.apply(StatusMessage.INITIALIZATION_IN_PROGRESS); - - CountDownLatch signal = new CountDownLatch(1); - _connectionExecutor.submit(() -> connectAndLoop(uri, signal)); + public boolean open(URI uri) { try { - if (!signal.await(CONNECT_TIMEOUT, TimeUnit.SECONDS)) { + lock.lock(); + if (isOpen()) { + _log.info("SSEClient already open."); return false; } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - if(e.getMessage() == null){ - _log.info("The thread was interrupted while opening SSEClient"); + + _statusCallback.apply(StatusMessage.INITIALIZATION_IN_PROGRESS); + + CountDownLatch signal = new CountDownLatch(1); + _connectionExecutor.submit(() -> connectAndLoop(uri, signal)); + try { + if (!signal.await(CONNECT_TIMEOUT, TimeUnit.SECONDS)) { + return false; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if(e.getMessage() == null){ + _log.info("The thread was interrupted while opening SSEClient"); + return false; + } + _log.info(e.getMessage()); return false; } - _log.info(e.getMessage()); - return false; + return isOpen(); + } finally { + lock.unlock(); } - return isOpen(); } public boolean isOpen() { return (ConnectionState.OPEN.equals(_state.get())); } - public synchronized void close() { - _forcedStop.set(true); - if (_state.compareAndSet(ConnectionState.OPEN, ConnectionState.CLOSED)) { - if (_ongoingResponse.get() != null) { - try { - _ongoingRequest.get().abort(); - _ongoingResponse.get().close(); - } catch (IOException e) { - _log.debug(String.format("SSEClient close forced: %s", e.getMessage())); + public void close() { + try { + lock.lock(); + _forcedStop.set(true); + if (_state.compareAndSet(ConnectionState.OPEN, ConnectionState.CLOSED)) { + if (_ongoingResponse.get() != null) { + try { + _ongoingRequest.get().abort(); + _ongoingResponse.get().close(); + } catch (IOException e) { + _log.debug(String.format("SSEClient close forced: %s", e.getMessage())); + } } } + } finally { + lock.unlock(); } } diff --git a/client/src/test/java/io/split/engine/common/PushManagerTest.java b/client/src/test/java/io/split/engine/common/PushManagerTest.java index 12664ad1d..33ce13416 100644 --- a/client/src/test/java/io/split/engine/common/PushManagerTest.java +++ b/client/src/test/java/io/split/engine/common/PushManagerTest.java @@ -22,9 +22,13 @@ public class PushManagerTest { private PushManager _pushManager; private PushStatusTracker _pushStatusTracker; private TelemetryStorage _telemetryStorage; + private FeatureFlagsWorker _featureFlagsWorker; + private SegmentsWorkerImp _segmentsWorkerImp; @Before public void setUp() { + _featureFlagsWorker = Mockito.mock(FeatureFlagsWorker.class); + _segmentsWorkerImp = Mockito.mock(SegmentsWorkerImp.class); _authApiClient = Mockito.mock(AuthApiClient.class); _eventSourceClient = Mockito.mock(EventSourceClient.class); _backoff = Mockito.mock(Backoff.class); @@ -32,8 +36,8 @@ public void setUp() { _telemetryStorage = new InMemoryTelemetryStorage(); _pushManager = new PushManagerImp(_authApiClient, _eventSourceClient, - Mockito.mock(FeatureFlagsWorker.class), - Mockito.mock(SegmentsWorkerImp.class), + _featureFlagsWorker, + _segmentsWorkerImp, _pushStatusTracker, _telemetryStorage, null); @@ -107,4 +111,60 @@ public void startWithPushDisabledAndRetryTrueShouldConnect() throws InterruptedE Thread.sleep(1500); Mockito.verify(_pushStatusTracker, Mockito.times(1)).handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR); } + + + @Test + public void startAndStop() throws InterruptedException { + AuthenticationResponse response = new AuthenticationResponse(true, "token-test", "channels-test", 1, false); + + Mockito.when(_authApiClient.Authenticate()) + .thenReturn(response); + + Mockito.when(_eventSourceClient.start(response.getChannels(), response.getToken())) + .thenReturn(true); + + _pushManager.start(); + + Mockito.verify(_authApiClient, Mockito.times(1)).Authenticate(); + Mockito.verify(_eventSourceClient, Mockito.times(1)).start(response.getChannels(), response.getToken()); + + Thread.sleep(1500); + + Mockito.verify(_pushStatusTracker, Mockito.times(0)).handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR); + Mockito.verify(_pushStatusTracker, Mockito.times(0)).forcePushDisable(); + Assert.assertEquals(1, _telemetryStorage.popStreamingEvents().size()); + + _pushManager.stop(); + + Mockito.verify(_eventSourceClient, Mockito.times(1)).stop(); + Mockito.verify(_featureFlagsWorker, Mockito.times(1)).stop(); + Mockito.verify(_segmentsWorkerImp, Mockito.times(1)).stop(); + } + + @Test + public void validateStartWorkers() { + _pushManager.startWorkers(); + Mockito.verify(_featureFlagsWorker, Mockito.times(1)).start(); + Mockito.verify(_segmentsWorkerImp, Mockito.times(1)).start(); + } + + @Test + public void validateScheduleConnectionReset() throws InterruptedException { + AuthenticationResponse response = new AuthenticationResponse(false, "token-test", "channels-test", 3, false); + + Mockito.when(_authApiClient.Authenticate()) + .thenReturn(response); + + Mockito.when(_eventSourceClient.start(response.getChannels(), response.getToken())) + .thenReturn(true); + + _pushManager.start(); + + _pushManager.scheduleConnectionReset(); + Thread.sleep(1000); + + Mockito.verify(_eventSourceClient, Mockito.times(3)).stop(); + Mockito.verify(_featureFlagsWorker, Mockito.times(3)).stop(); + Mockito.verify(_segmentsWorkerImp, Mockito.times(3)).stop(); + } } \ No newline at end of file diff --git a/pluggable-storage/pom.xml b/pluggable-storage/pom.xml index a6f21cd7c..f643564f9 100644 --- a/pluggable-storage/pom.xml +++ b/pluggable-storage/pom.xml @@ -6,7 +6,7 @@ java-client-parent io.split.client - 4.12.0 + 4.12.1 2.1.0 diff --git a/pom.xml b/pom.xml index fc4f01fa5..3dc7d33f9 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 io.split.client java-client-parent - 4.12.0 + 4.12.1 diff --git a/redis-wrapper/pom.xml b/redis-wrapper/pom.xml index 6f5231911..a8ce195f5 100644 --- a/redis-wrapper/pom.xml +++ b/redis-wrapper/pom.xml @@ -6,7 +6,7 @@ java-client-parent io.split.client - 4.12.0 + 4.12.1 redis-wrapper 3.1.0 diff --git a/testing/pom.xml b/testing/pom.xml index 0faeed8c3..2240c94db 100644 --- a/testing/pom.xml +++ b/testing/pom.xml @@ -5,7 +5,7 @@ io.split.client java-client-parent - 4.12.0 + 4.12.1 java-client-testing jar