Skip to content

Commit

Permalink
Merge pull request #507 from splitio/SDKS-8456
Browse files Browse the repository at this point in the history
Update PushManager and SyncManager
  • Loading branch information
nmayorsplit authored Jun 10, 2024
2 parents 426ea5a + 950303b commit 23074ab
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 62 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
4.12.1 (Jun 10, 2024)
- Fixed deadlock for virtual thread in Push Manager and SSE Client.

4.12.0 (May 15, 2024)
- Added support for targeting rules based on semantic versions (https://semver.org/).
- Added the logic to handle correctly when the SDK receives an unsupported Matcher type.
Expand Down
2 changes: 1 addition & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.12.0</version>
<version>4.12.1</version>
</parent>
<artifactId>java-client</artifactId>
<packaging>jar</packaging>
Expand Down
67 changes: 42 additions & 25 deletions client/src/main/java/io/split/engine/common/PushManagerImp.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.split.client.utils.SplitExecutorFactory.buildSingleThreadScheduledExecutor;
Expand All @@ -42,6 +44,7 @@ public class PushManagerImp implements PushManager {
private final FeatureFlagsWorker _featureFlagsWorker;
private final Worker<SegmentQueueDto> _segmentWorker;
private final PushStatusTracker _pushStatusTracker;
private static final Lock lock = new ReentrantLock();

private Future<?> _nextTokenRefreshTask;
private final ScheduledExecutorService _scheduledExecutorService;
Expand Down Expand Up @@ -92,37 +95,42 @@ public static PushManagerImp build(Synchronizer synchronizer,
}

@Override
public synchronized void start() {
AuthenticationResponse response = _authApiClient.Authenticate();
_log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled()));
if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())) {
_expirationTime.set(response.getExpiration());
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.TOKEN_REFRESH.getType(),
response.getExpiration(), System.currentTimeMillis()));
return;
}

stop();
if (response.isRetry()) {
_pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
} else {
_pushStatusTracker.forcePushDisable();
public void start() {
try {
lock.lock();
AuthenticationResponse response = _authApiClient.Authenticate();
_log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled()));
if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())) {
_expirationTime.set(response.getExpiration());
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.TOKEN_REFRESH.getType(),
response.getExpiration(), System.currentTimeMillis()));
return;
}

cleanUpResources();
if (response.isRetry()) {
_pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
} else {
_pushStatusTracker.forcePushDisable();
}
} finally {
lock.unlock();
}
}

@Override
public synchronized void stop() {
_log.debug("Stopping PushManagerImp");
_eventSourceClient.stop();
stopWorkers();
if (_nextTokenRefreshTask != null) {
_log.debug("Cancel nextTokenRefreshTask");
_nextTokenRefreshTask.cancel(false);
public void stop() {
try {
lock.lock();
_log.debug("Stopping PushManagerImp");
cleanUpResources();
} finally {
lock.unlock();
}
}

@Override
public synchronized void scheduleConnectionReset() {
public void scheduleConnectionReset() {
_log.debug(String.format("scheduleNextTokenRefresh in %s SECONDS", _expirationTime));
_nextTokenRefreshTask = _scheduledExecutorService.schedule(() -> {
_log.debug("Starting scheduleNextTokenRefresh ...");
Expand All @@ -142,14 +150,23 @@ private boolean startSse(String token, String channels) {
}

@Override
public synchronized void startWorkers() {
public void startWorkers() {
_featureFlagsWorker.start();
_segmentWorker.start();
}

@Override
public synchronized void stopWorkers() {
public void stopWorkers() {
_featureFlagsWorker.stop();
_segmentWorker.stop();
}

private void cleanUpResources() {
_eventSourceClient.stop();
stopWorkers();
if (_nextTokenRefreshTask != null) {
_log.debug("Cancel nextTokenRefreshTask");
_nextTokenRefreshTask.cancel(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ private void startPollingMode() {
long howLong = _backoff.interval();
_log.info(String.format("Retryable error in streaming subsystem. Switching to polling and retrying in %d seconds", howLong));
_synchronizer.startPeriodicFetching();
_pushManager.stopWorkers();
_pushManager.stop();
Thread.sleep(howLong * 1000);
_incomingPushStatus.clear();
Expand Down
69 changes: 40 additions & 29 deletions client/src/main/java/io/split/engine/sse/client/SSEClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import io.split.telemetry.domain.enums.StreamEventsEnum;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.slf4j.Logger;
Expand All @@ -25,6 +24,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkNotNull;
Expand All @@ -49,6 +50,7 @@ private enum ConnectionState {
private final static String SOCKET_CLOSED_MESSAGE = "Socket closed";
private final static String KEEP_ALIVE_PAYLOAD = ":keepalive\n";
private final static long CONNECT_TIMEOUT = 30000;
private static final Lock lock = new ReentrantLock();
private static final Logger _log = LoggerFactory.getLogger(SSEClient.class);
private final ExecutorService _connectionExecutor;
private final CloseableHttpClient _client;
Expand All @@ -59,7 +61,6 @@ private enum ConnectionState {
private final AtomicReference<HttpGet> _ongoingRequest = new AtomicReference<>();
private AtomicBoolean _forcedStop;
private final RequestDecorator _requestDecorator;

private final TelemetryRuntimeProducer _telemetryRuntimeProducer;

public SSEClient(Function<RawEvent, Void> eventCallback,
Expand All @@ -77,47 +78,57 @@ public SSEClient(Function<RawEvent, Void> eventCallback,
_requestDecorator = requestDecorator;
}

public synchronized boolean open(URI uri) {
if (isOpen()) {
_log.info("SSEClient already open.");
return false;
}

_statusCallback.apply(StatusMessage.INITIALIZATION_IN_PROGRESS);

CountDownLatch signal = new CountDownLatch(1);
_connectionExecutor.submit(() -> connectAndLoop(uri, signal));
public boolean open(URI uri) {
try {
if (!signal.await(CONNECT_TIMEOUT, TimeUnit.SECONDS)) {
lock.lock();
if (isOpen()) {
_log.info("SSEClient already open.");
return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if(e.getMessage() == null){
_log.info("The thread was interrupted while opening SSEClient");

_statusCallback.apply(StatusMessage.INITIALIZATION_IN_PROGRESS);

CountDownLatch signal = new CountDownLatch(1);
_connectionExecutor.submit(() -> connectAndLoop(uri, signal));
try {
if (!signal.await(CONNECT_TIMEOUT, TimeUnit.SECONDS)) {
return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if(e.getMessage() == null){
_log.info("The thread was interrupted while opening SSEClient");
return false;
}
_log.info(e.getMessage());
return false;
}
_log.info(e.getMessage());
return false;
return isOpen();
} finally {
lock.unlock();
}
return isOpen();
}

public boolean isOpen() {
return (ConnectionState.OPEN.equals(_state.get()));
}

public synchronized void close() {
_forcedStop.set(true);
if (_state.compareAndSet(ConnectionState.OPEN, ConnectionState.CLOSED)) {
if (_ongoingResponse.get() != null) {
try {
_ongoingRequest.get().abort();
_ongoingResponse.get().close();
} catch (IOException e) {
_log.debug(String.format("SSEClient close forced: %s", e.getMessage()));
public void close() {
try {
lock.lock();
_forcedStop.set(true);
if (_state.compareAndSet(ConnectionState.OPEN, ConnectionState.CLOSED)) {
if (_ongoingResponse.get() != null) {
try {
_ongoingRequest.get().abort();
_ongoingResponse.get().close();
} catch (IOException e) {
_log.debug(String.format("SSEClient close forced: %s", e.getMessage()));
}
}
}
} finally {
lock.unlock();
}
}

Expand Down
64 changes: 62 additions & 2 deletions client/src/test/java/io/split/engine/common/PushManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,22 @@ public class PushManagerTest {
private PushManager _pushManager;
private PushStatusTracker _pushStatusTracker;
private TelemetryStorage _telemetryStorage;
private FeatureFlagsWorker _featureFlagsWorker;
private SegmentsWorkerImp _segmentsWorkerImp;

@Before
public void setUp() {
_featureFlagsWorker = Mockito.mock(FeatureFlagsWorker.class);
_segmentsWorkerImp = Mockito.mock(SegmentsWorkerImp.class);
_authApiClient = Mockito.mock(AuthApiClient.class);
_eventSourceClient = Mockito.mock(EventSourceClient.class);
_backoff = Mockito.mock(Backoff.class);
_pushStatusTracker = Mockito.mock(PushStatusTrackerImp.class);
_telemetryStorage = new InMemoryTelemetryStorage();
_pushManager = new PushManagerImp(_authApiClient,
_eventSourceClient,
Mockito.mock(FeatureFlagsWorker.class),
Mockito.mock(SegmentsWorkerImp.class),
_featureFlagsWorker,
_segmentsWorkerImp,
_pushStatusTracker,
_telemetryStorage,
null);
Expand Down Expand Up @@ -107,4 +111,60 @@ public void startWithPushDisabledAndRetryTrueShouldConnect() throws InterruptedE
Thread.sleep(1500);
Mockito.verify(_pushStatusTracker, Mockito.times(1)).handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
}


@Test
public void startAndStop() throws InterruptedException {
AuthenticationResponse response = new AuthenticationResponse(true, "token-test", "channels-test", 1, false);

Mockito.when(_authApiClient.Authenticate())
.thenReturn(response);

Mockito.when(_eventSourceClient.start(response.getChannels(), response.getToken()))
.thenReturn(true);

_pushManager.start();

Mockito.verify(_authApiClient, Mockito.times(1)).Authenticate();
Mockito.verify(_eventSourceClient, Mockito.times(1)).start(response.getChannels(), response.getToken());

Thread.sleep(1500);

Mockito.verify(_pushStatusTracker, Mockito.times(0)).handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
Mockito.verify(_pushStatusTracker, Mockito.times(0)).forcePushDisable();
Assert.assertEquals(1, _telemetryStorage.popStreamingEvents().size());

_pushManager.stop();

Mockito.verify(_eventSourceClient, Mockito.times(1)).stop();
Mockito.verify(_featureFlagsWorker, Mockito.times(1)).stop();
Mockito.verify(_segmentsWorkerImp, Mockito.times(1)).stop();
}

@Test
public void validateStartWorkers() {
_pushManager.startWorkers();
Mockito.verify(_featureFlagsWorker, Mockito.times(1)).start();
Mockito.verify(_segmentsWorkerImp, Mockito.times(1)).start();
}

@Test
public void validateScheduleConnectionReset() throws InterruptedException {
AuthenticationResponse response = new AuthenticationResponse(false, "token-test", "channels-test", 3, false);

Mockito.when(_authApiClient.Authenticate())
.thenReturn(response);

Mockito.when(_eventSourceClient.start(response.getChannels(), response.getToken()))
.thenReturn(true);

_pushManager.start();

_pushManager.scheduleConnectionReset();
Thread.sleep(1000);

Mockito.verify(_eventSourceClient, Mockito.times(3)).stop();
Mockito.verify(_featureFlagsWorker, Mockito.times(3)).stop();
Mockito.verify(_segmentsWorkerImp, Mockito.times(3)).stop();
}
}
2 changes: 1 addition & 1 deletion pluggable-storage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<artifactId>java-client-parent</artifactId>
<groupId>io.split.client</groupId>
<version>4.12.0</version>
<version>4.12.1</version>
</parent>

<version>2.1.0</version>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.12.0</version>
<version>4.12.1</version>
<dependencyManagement>
<dependencies>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion redis-wrapper/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<artifactId>java-client-parent</artifactId>
<groupId>io.split.client</groupId>
<version>4.12.0</version>
<version>4.12.1</version>
</parent>
<artifactId>redis-wrapper</artifactId>
<version>3.1.0</version>
Expand Down
2 changes: 1 addition & 1 deletion testing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.12.0</version>
<version>4.12.1</version>
</parent>
<artifactId>java-client-testing</artifactId>
<packaging>jar</packaging>
Expand Down

0 comments on commit 23074ab

Please sign in to comment.