Skip to content

Commit

Permalink
Merge pull request #523 from splitio/impression-toggle
Browse files Browse the repository at this point in the history
Impression toggle
  • Loading branch information
chillaq authored Dec 16, 2024
2 parents 37d53ad + 18d050c commit 747ca31
Show file tree
Hide file tree
Showing 32 changed files with 998 additions and 276 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
4.14.0 (Dec X, 2024)
- Added support for Impression Toggle in feature flags

4.13.1 (Dec 5, 2024)
- Updated `org.apache.httpcomponents.client5` dependency to 5.4.1 to fix vulnerabilities.
- Updated `redis.clients` dependency to 4.4.8 to fix vulnerabilities.
Expand Down
4 changes: 2 additions & 2 deletions client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
<parent>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.13.1</version>
<version>4.14.0-rc1</version>
</parent>
<version>4.13.1</version>
<version>4.14.0-rc1</version>
<artifactId>java-client</artifactId>
<packaging>jar</packaging>
<name>Java Client</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void updateCache(Map<SplitAndKey, LocalhostSplit> map) {
String treatment = conditions.size() > 0 ? Treatments.CONTROL : localhostSplit.treatment;
configurations.put(localhostSplit.treatment, localhostSplit.config);

split = new ParsedSplit(splitName, 0, false, treatment,conditions, LOCALHOST, 0, 100, 0, 0, configurations, new HashSet<>());
split = new ParsedSplit(splitName, 0, false, treatment,conditions, LOCALHOST, 0, 100, 0, 0, configurations, new HashSet<>(), true);
parsedSplits.removeIf(parsedSplit -> parsedSplit.feature().equals(splitName));
parsedSplits.add(split);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public SplitChange fetch(long since, FetchOptions options) {
String.format("Could not retrieve splitChanges since %s; http return code %s", since, response.statusCode())
);
}

return Json.fromJson(response.body(), SplitChange.class);
} catch (Exception e) {
throw new IllegalStateException(String.format("Problem fetching splitChanges since %s: %s", since, e), e);
Expand Down
26 changes: 17 additions & 9 deletions client/src/main/java/io/split/client/SplitClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.split.client.api.Key;
import io.split.client.api.SplitResult;
import io.split.client.dtos.DecoratedImpression;
import io.split.client.dtos.Event;
import io.split.client.events.EventsStorageProducer;
import io.split.client.impressions.Impression;
Expand Down Expand Up @@ -356,7 +357,8 @@ private SplitResult getTreatmentWithConfigInternal(String matchingKey, String bu
String.format("sdk.%s", methodEnum.getMethod()),
_config.labelsEnabled() ? result.label : null,
result.changeNumber,
attributes
attributes,
result.track
);
_telemetryEvaluationProducer.recordLatency(methodEnum, System.currentTimeMillis() - initTime);
return new SplitResult(result.treatment, result.configurations);
Expand Down Expand Up @@ -435,7 +437,7 @@ private Map<String, SplitResult> getTreatmentsBySetsWithConfigInternal(String ma
private Map<String, SplitResult> processEvaluatorResult(Map<String, EvaluatorImp.TreatmentLabelAndChangeNumber> evaluatorResult,
MethodEnum methodEnum, String matchingKey, String bucketingKey, Map<String,
Object> attributes, long initTime){
List<Impression> impressions = new ArrayList<>();
List<DecoratedImpression> decoratedImpressions = new ArrayList<>();
Map<String, SplitResult> result = new HashMap<>();
evaluatorResult.keySet().forEach(t -> {
if (evaluatorResult.get(t).treatment.equals(Treatments.CONTROL) && evaluatorResult.get(t).label.
Expand All @@ -445,13 +447,16 @@ private Map<String, SplitResult> processEvaluatorResult(Map<String, EvaluatorImp
result.put(t, SPLIT_RESULT_CONTROL);
} else {
result.put(t, new SplitResult(evaluatorResult.get(t).treatment, evaluatorResult.get(t).configurations));
impressions.add(new Impression(matchingKey, bucketingKey, t, evaluatorResult.get(t).treatment, System.currentTimeMillis(),
evaluatorResult.get(t).label, evaluatorResult.get(t).changeNumber, attributes));
decoratedImpressions.add(
new DecoratedImpression(
new Impression(matchingKey, bucketingKey, t, evaluatorResult.get(t).treatment, System.currentTimeMillis(),
evaluatorResult.get(t).label, evaluatorResult.get(t).changeNumber, attributes),
evaluatorResult.get(t).track));
}
});
_telemetryEvaluationProducer.recordLatency(methodEnum, System.currentTimeMillis() - initTime);
if (impressions.size() > 0) {
_impressionManager.track(impressions);
if (!decoratedImpressions.isEmpty()) {
_impressionManager.track(decoratedImpressions);
}
return result;
}
Expand Down Expand Up @@ -501,10 +506,13 @@ private Set<String> filterSetsAreInConfig(Set<String> sets, MethodEnum methodEnu
return setsToReturn;
}
private void recordStats(String matchingKey, String bucketingKey, String featureFlagName, long start, String result,
String operation, String label, Long changeNumber, Map<String, Object> attributes) {
String operation, String label, Long changeNumber, Map<String, Object> attributes, boolean track) {
try {
_impressionManager.track(Stream.of(new Impression(matchingKey, bucketingKey, featureFlagName, result, System.currentTimeMillis(),
label, changeNumber, attributes)).collect(Collectors.toList()));
_impressionManager.track(Stream.of(
new DecoratedImpression(
new Impression(matchingKey, bucketingKey, featureFlagName, result, System.currentTimeMillis(),
label, changeNumber, attributes),
track)).collect(Collectors.toList()));
} catch (Throwable t) {
_log.error("Exception", t);
}
Expand Down
25 changes: 11 additions & 14 deletions client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -630,13 +630,14 @@ private ImpressionsManagerImpl buildImpressionsManager(SplitClientConfig config,
.collect(Collectors.toCollection(() -> impressionListeners));
}
ProcessImpressionStrategy processImpressionStrategy = null;
ImpressionCounter counter = null;
ImpressionCounter counter = new ImpressionCounter();
ImpressionListener listener = !impressionListeners.isEmpty()
? new ImpressionListener.FederatedImpressionListener(impressionListeners)
: null;
ProcessImpressionNone processImpressionNone = new ProcessImpressionNone(listener != null, _uniqueKeysTracker, counter);

switch (config.impressionsMode()) {
case OPTIMIZED:
counter = new ImpressionCounter();
ImpressionObserver impressionObserver = new ImpressionObserver(config.getLastSeenCacheSize());
processImpressionStrategy = new ProcessImpressionOptimized(listener != null, impressionObserver,
counter, _telemetryStorageProducer);
Expand All @@ -646,13 +647,12 @@ private ImpressionsManagerImpl buildImpressionsManager(SplitClientConfig config,
processImpressionStrategy = new ProcessImpressionDebug(listener != null, impressionObserver);
break;
case NONE:
counter = new ImpressionCounter();
processImpressionStrategy = new ProcessImpressionNone(listener != null, _uniqueKeysTracker, counter);
processImpressionStrategy = processImpressionNone;
break;
}
return ImpressionsManagerImpl.instance(config, _telemetryStorageProducer, impressionsStorageConsumer,
impressionsStorageProducer,
_impressionsSender, processImpressionStrategy, counter, listener);
_impressionsSender, processImpressionNone, processImpressionStrategy, counter, listener);
}

private SDKMetadata createSdkMetadata(boolean ipAddressEnabled, String splitSdkVersion) {
Expand Down Expand Up @@ -690,15 +690,12 @@ private void manageSdkReady(SplitClientConfig config) {
}

private UniqueKeysTracker createUniqueKeysTracker(SplitClientConfig config) {
if (config.impressionsMode().equals(ImpressionsManager.Mode.NONE)) {
int uniqueKeysRefreshRate = config.operationMode().equals(OperationMode.STANDALONE)
? config.uniqueKeysRefreshRateInMemory()
: config.uniqueKeysRefreshRateRedis();
return new UniqueKeysTrackerImp(_telemetrySynchronizer, uniqueKeysRefreshRate,
config.filterUniqueKeysRefreshRate(),
config.getThreadFactory());
}
return null;
int uniqueKeysRefreshRate = config.operationMode().equals(OperationMode.STANDALONE)
? config.uniqueKeysRefreshRateInMemory()
: config.uniqueKeysRefreshRateRedis();
return new UniqueKeysTrackerImp(_telemetrySynchronizer, uniqueKeysRefreshRate,
config.filterUniqueKeysRefreshRate(),
config.getThreadFactory());
}

private SplitChangeFetcher createSplitChangeFetcher(SplitClientConfig splitClientConfig) {
Expand Down
2 changes: 2 additions & 0 deletions client/src/main/java/io/split/client/api/SplitView.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class SplitView {
public Map<String, String> configs;
public List<String> sets;
public String defaultTreatment;
public boolean trackImpression;

public static SplitView fromParsedSplit(ParsedSplit parsedSplit) {
SplitView splitView = new SplitView();
Expand All @@ -46,6 +47,7 @@ public static SplitView fromParsedSplit(ParsedSplit parsedSplit) {

splitView.treatments = new ArrayList<String>(treatments);
splitView.configs = parsedSplit.configurations() == null? Collections.<String, String>emptyMap() : parsedSplit.configurations() ;
splitView.trackImpression = parsedSplit.trackImpression();

return splitView;
}
Expand Down
18 changes: 18 additions & 0 deletions client/src/main/java/io/split/client/dtos/DecoratedImpression.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.split.client.dtos;

import io.split.client.impressions.Impression;

public class DecoratedImpression {
private Impression impression;
private boolean track;

public DecoratedImpression(Impression impression, boolean track) {
this.impression = impression;
this.track = track;
}

public Impression impression() { return this.impression;}

public boolean track() { return this.track;}
}

1 change: 1 addition & 0 deletions client/src/main/java/io/split/client/dtos/Split.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class Split {
public int algo;
public Map<String, String> configurations;
public HashSet<String> sets;
public Boolean trackImpression = null;

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,6 @@ public void postImpressionsBulk(List<TestImpressions> impressions) {
@Override
public void postCounters(HashMap<ImpressionCounter.Key, Integer> raw) {
long initTime = System.currentTimeMillis();
if (_mode.equals(ImpressionsManager.Mode.DEBUG)) {
_logger.warn("Attempted to submit counters in impressions debugging mode. Ignoring");
return;
}

try {

Map<String, List<String>> additionalHeaders = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.split.client.impressions;

import io.split.client.dtos.DecoratedImpression;

import java.util.List;

public interface ImpressionsManager {
Expand All @@ -10,14 +12,14 @@ public enum Mode {
NONE
}

void track(List<Impression> impressions);
void track(List<DecoratedImpression> decoratedImpressions);
void start();
void close();

final class NoOpImpressionsManager implements ImpressionsManager {

@Override
public void track(List<Impression> impressions) { /* do nothing */ }
public void track(List<DecoratedImpression> decoratedImpressions) { /* do nothing */ }

@Override
public void start(){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import com.google.common.annotations.VisibleForTesting;
import io.split.client.SplitClientConfig;
import io.split.client.dtos.DecoratedImpression;
import io.split.client.dtos.KeyImpression;
import io.split.client.dtos.TestImpressions;
import io.split.client.impressions.strategy.ProcessImpressionNone;
import io.split.client.impressions.strategy.ProcessImpressionStrategy;
import io.split.client.utils.SplitExecutorFactory;
import io.split.telemetry.domain.enums.ImpressionsDataTypeEnum;
Expand All @@ -13,10 +15,13 @@

import java.io.Closeable;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkNotNull;

Expand All @@ -40,37 +45,42 @@ public class ImpressionsManagerImpl implements ImpressionsManager, Closeable {
private TelemetryRuntimeProducer _telemetryRuntimeProducer;
private ImpressionCounter _counter;
private ProcessImpressionStrategy _processImpressionStrategy;
private ProcessImpressionNone _processImpressionNone;

private final int _impressionsRefreshRate;

public static ImpressionsManagerImpl instance(SplitClientConfig config,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ImpressionsStorageConsumer impressionsStorageConsumer,
ImpressionsStorageProducer impressionsStorageProducer,
ImpressionsSender impressionsSender,
ProcessImpressionNone processImpressionNone,
ProcessImpressionStrategy processImpressionStrategy,
ImpressionCounter counter,
ImpressionListener listener) throws URISyntaxException {
return new ImpressionsManagerImpl(config, impressionsSender, telemetryRuntimeProducer, impressionsStorageConsumer,
impressionsStorageProducer, processImpressionStrategy, counter, listener);
impressionsStorageProducer, processImpressionNone, processImpressionStrategy, counter, listener);
}

public static ImpressionsManagerImpl instanceForTest(SplitClientConfig config,
ImpressionsSender impressionsSender,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ImpressionsStorageConsumer impressionsStorageConsumer,
ImpressionsStorageProducer impressionsStorageProducer,
ProcessImpressionNone processImpressionNone,
ProcessImpressionStrategy processImpressionStrategy,
ImpressionCounter counter,
ImpressionListener listener) {
return new ImpressionsManagerImpl(config, impressionsSender, telemetryRuntimeProducer, impressionsStorageConsumer,
impressionsStorageProducer, processImpressionStrategy, counter, listener);
impressionsStorageProducer, processImpressionNone, processImpressionStrategy, counter, listener);
}

private ImpressionsManagerImpl(SplitClientConfig config,
ImpressionsSender impressionsSender,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ImpressionsStorageConsumer impressionsStorageConsumer,
ImpressionsStorageProducer impressionsStorageProducer,
ProcessImpressionNone processImpressionNone,
ProcessImpressionStrategy processImpressionStrategy,
ImpressionCounter impressionCounter,
ImpressionListener impressionListener) {
Expand All @@ -81,6 +91,7 @@ private ImpressionsManagerImpl(SplitClientConfig config,
_impressionsStorageConsumer = checkNotNull(impressionsStorageConsumer);
_impressionsStorageProducer = checkNotNull(impressionsStorageProducer);
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
_processImpressionNone = checkNotNull(processImpressionNone);
_processImpressionStrategy = checkNotNull(processImpressionStrategy);
_impressionsSender = impressionsSender;
_counter = impressionCounter;
Expand All @@ -101,6 +112,8 @@ public void start(){
break;
case DEBUG:
_scheduler.scheduleAtFixedRate(this::sendImpressions, BULK_INITIAL_DELAY_SECONDS, _impressionsRefreshRate, TimeUnit.SECONDS);
_scheduler.scheduleAtFixedRate(this::sendImpressionCounters, COUNT_INITIAL_DELAY_SECONDS, COUNT_REFRESH_RATE_SECONDS,
TimeUnit.SECONDS);
break;
case NONE:
_scheduler.scheduleAtFixedRate(this::sendImpressionCounters, COUNT_INITIAL_DELAY_SECONDS, COUNT_REFRESH_RATE_SECONDS,
Expand All @@ -110,15 +123,28 @@ public void start(){
}

@Override
public void track(List<Impression> impressions) {
if (null == impressions) {
public void track(List<DecoratedImpression> decoratedImpressions) {
if (null == decoratedImpressions) {
return;
}

ImpressionsResult impressionsResult = _processImpressionStrategy.process(impressions);
List<Impression> impressionsForLogs = impressionsResult.getImpressionsToQueue();
List<Impression> impressionsToListener = impressionsResult.getImpressionsToListener();

List<Impression> impressionsForLogs = new ArrayList<>();
List<Impression> impressionsToListener = new ArrayList<>();

for (int i = 0; i < decoratedImpressions.size(); i++) {
ImpressionsResult impressionsResult;
if (decoratedImpressions.get(i).track()) {
impressionsResult = _processImpressionStrategy.process(Stream.of(
decoratedImpressions.get(i).impression()).collect(Collectors.toList()));
} else {
impressionsResult = _processImpressionNone.process(Stream.of(
decoratedImpressions.get(i).impression()).collect(Collectors.toList()));
}
if (!Objects.isNull(impressionsResult.getImpressionsToQueue())) {
impressionsForLogs.addAll(impressionsResult.getImpressionsToQueue());
}
if (!Objects.isNull(impressionsResult.getImpressionsToListener()))
impressionsToListener.addAll(impressionsResult.getImpressionsToListener());
}
int totalImpressions = impressionsForLogs.size();
long queued = _impressionsStorageProducer.put(impressionsForLogs.stream().map(KeyImpression::fromImpression).collect(Collectors.toList()));
if (queued < totalImpressions) {
Expand Down
Loading

0 comments on commit 747ca31

Please sign in to comment.