Skip to content

Commit

Permalink
Cache CD dynamic properties (#222)
Browse files Browse the repository at this point in the history
* Cache CD dynamic properties

* Store the DynamicProperties directly instead of wrapping in Optional

* Verify the same instance is always returned
  • Loading branch information
mauhiz authored Dec 18, 2023
1 parent 642a81d commit 2724000
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -72,6 +74,8 @@ public class CentralDogmaPropertySupplier implements PropertySupplier, AutoClose

private final Watcher<JsonNode> rootWatcher;

private final ConcurrentMap<String, DynamicProperty<?>> cachedProperties = new ConcurrentHashMap<>();

/**
* Creates a new {@link CentralDogmaPropertySupplier}.
* @param centralDogma a {@link CentralDogma} instance to use to access Central Dogma server.
Expand Down Expand Up @@ -118,32 +122,43 @@ public <T> Optional<Property<T>> getProperty(PropertyDefinition<T> definition) {
return Optional.empty();
}

DynamicProperty<T> prop = new DynamicProperty<>(definition);
Watcher<JsonNode> child = rootWatcher.newChild(jsonNode -> jsonNode.path(definition.name()));
child.watch(node -> {
// note: cache DynamicProperties to avoid using too many child watchers if getProperty is called repeatedly.
// for most use cases though, this cache is only filled/read once.
final DynamicProperty<?> cachedProp = cachedProperties.computeIfAbsent(definition.name(), name -> {
DynamicProperty<T> prop = new DynamicProperty<>(definition);
Watcher<JsonNode> child = rootWatcher.newChild(jsonNode -> jsonNode.path(definition.name()));
child.watch(node -> {
try {
setValue(prop, node);
} catch (Exception e) {
// Catching Exception instead of RuntimeException, since
// Kotlin-implemented DynamicProperty would throw checked exceptions
logger.warn("Failed to set value updated from CentralDogma for {}", definition.name(), e);
}
});
try {
JsonNode node = child.initialValueFuture().join().value(); //doesn't fail since it's a child watcher
setValue(prop, node);
} catch (Exception e) {
// Catching Exception instead of RuntimeException, since
// Kotlin-implemented DynamicProperty would throw checked exceptions
logger.warn("Failed to set value updated from CentralDogma for {}", definition.name(), e);
logger.warn("Failed to set initial value from CentralDogma for {}", definition.name(), e);
}

return prop;
});
try {
JsonNode node = child.initialValueFuture().join().value(); //doesn't fail since it's a child watcher
setValue(prop, node);
} catch (Exception e) {
// Catching Exception instead of RuntimeException, since
// Kotlin-implemented DynamicProperty would throw checked exceptions
logger.warn("Failed to set initial value from CentralDogma for {}", definition.name(), e);
}

return Optional.of(prop);
if (cachedProp.definition().runtimeType() != definition.runtimeType()) {
throw new IllegalStateException("Several different properties have the same name: " + definition.name());
}
//noinspection unchecked
return Optional.of((Property<T>) cachedProp);
}

@Override
public void close() {
rootWatcher.close();
cachedProperties.clear();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.linecorp.decaton.processor.runtime.ProcessorProperties.CONFIG_PARTITION_CONCURRENCY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
Expand All @@ -31,6 +32,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -119,6 +121,15 @@ public void testCDIntegration() throws InterruptedException {

latch.await();
assertEquals(20, prop.value().intValue());

assertEquals(20, IntStream
.range(0, 10000)
.mapToObj(i -> CONFIG_PARTITION_CONCURRENCY)
.map(supplier::getProperty)
.reduce((l, r) -> {
assertSame(l.get(), r.get());
return l;
}).get().get().value().intValue());
}

@Test
Expand Down

0 comments on commit 2724000

Please sign in to comment.