Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop-v2' into opentelemetry-c…
Browse files Browse the repository at this point in the history
…ontextmanager

# Conflicts:
#	context-propagation-core/src/main/java/nl/talsmasoftware/context/core/ContextManagers.java
  • Loading branch information
sjoerdtalsma committed Nov 8, 2024
2 parents e9a3389 + 55e9d7a commit 8f3ea2f
Show file tree
Hide file tree
Showing 12 changed files with 165 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
*
* @param <T> Type of the context value.
* @author Sjoerd Talsma
* @since 1.1.0
* @since 2.0.0
*/
public interface Context<T> extends Closeable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
*
* @param <T> type of the context value
* @author Sjoerd Talsma
* @since 2.0.0
*/
public interface ContextManager<T> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
* {@link Reactivation#close()}.
*
* @author Sjoerd Talsma
* @since 1.1.0
* @since 2.0.0
*/
public interface ContextSnapshot {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,30 @@
/**
* Minimal Service Provider Interface for services that wish to get informed of context switching metrics.
* <p>
* Currently the following timed operations are updated:
* Currently, the following timed operations are updated:
* <ul>
* <li>{@code ContextManagers.createContextSnapshot}</li>
* <li>{@code ContextSnapshot.reactivate}</li>
* <li>{@code ContextManager.initializeNewContext}(*)</li>
* <li>{@code ContextManager.getActiveContext}(*)</li>
* <li>{@linkplain ContextSnapshot#reactivate()}</li>
* <li>{@linkplain ContextManager#getActiveContextValue()} (*)</li>
* <li>{@linkplain ContextManager#initializeNewContext(Object)} (*)</li>
* </ul>
* <p>
* (*) <em>Timing is updated for each concrete {@code ContextManager} implementation class</em>
*
* @author Sjoerd Talsma
* @since 1.1.0
* @since 2.0.0
*/
public interface ContextTimer {

/**
* Provides a new update for the context timer.
*
* @param type The class being called
* @param method The method being called
* @param duration The duration of the method
* @param unit The unit of the duration
* @param type Class that was called
* @param method Method that was called
* @param duration Duration of the call
* @param unit Unit of the duration
* @param error Error that was thrown in the call (optional, normally {@code null})
*/
void update(Class<?> type, String method, long duration, TimeUnit unit);
void update(Class<?> type, String method, long duration, TimeUnit unit, Throwable error);

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,7 @@ public static ContextSnapshot createContextSnapshot() {
final Object[] values = new Object[managers.size()];

for (int i = 0; i < values.length; i++) {
final ContextManager manager = managers.get(i);
long managerStart = System.nanoTime();
try {
values[i] = getActiveContextValue(manager);
Timers.timed(System.nanoTime() - managerStart, manager.getClass(), "getActiveContextValue");
} catch (RuntimeException rte) {
LOGGER.log(Level.WARNING, "Error obtaining active context from " + manager + " (in thread " + Thread.currentThread().getName() + ").", rte);
Timers.timed(System.nanoTime() - managerStart, manager.getClass(), "getActiveContext.exception");
}
values[i] = getActiveContextValue(managers.get(i));
}

final ContextSnapshotImpl result = new ContextSnapshotImpl(managers, values);
Expand All @@ -82,7 +74,7 @@ public static ContextSnapshot createContextSnapshot() {
+ " Thead=" + Thread.currentThread()
+ ", ContextClassLoader=" + Thread.currentThread().getContextClassLoader());
}
Timers.timed(System.nanoTime() - start, ContextManagers.class, "createContextSnapshot");
Timers.timed(System.nanoTime() - start, ContextManagers.class, "createContextSnapshot", null);
return result;
}

Expand All @@ -105,24 +97,10 @@ public static ContextSnapshot createContextSnapshot() {
*/
public static void clearActiveContexts() {
final long start = System.nanoTime();
Long managerStart = null;
for (ContextManager<?> manager : ServiceCache.cached(ContextManager.class)) {
managerStart = System.nanoTime();
try {
clear(manager);
Timers.timed(System.nanoTime() - managerStart, manager.getClass(), "clear");
} catch (RuntimeException rte) {
LOGGER.log(Level.WARNING, "Error clearing active context from " + manager + ".", rte);
ServiceCache.clear();
Timers.timed(System.nanoTime() - managerStart, manager.getClass(), "clear.exception");
}
clear(manager);
}
if (managerStart == null && LOGGER.isLoggable(Level.FINER)) {
LOGGER.finer("No ContextManagers were cleared because none were found! "
+ " Thead=" + Thread.currentThread()
+ ", ContextClassLoader=" + Thread.currentThread().getContextClassLoader());
}
Timers.timed(System.nanoTime() - start, ContextManagers.class, "clearActiveContexts");
Timers.timed(System.nanoTime() - start, ContextManagers.class, "clearActiveContexts", null);
}

/**
Expand Down Expand Up @@ -151,16 +129,39 @@ public static synchronized void useClassLoader(ClassLoader classLoader) {
}

private static Object getActiveContextValue(ContextManager<?> manager) {
final Object activeContextValue = manager.getActiveContextValue();
LOGGER.finest(() -> activeContextValue == null
? "There is no active context value for " + manager + " (in thread " + Thread.currentThread().getName() + ")."
: "Active context value of " + manager + " in " + Thread.currentThread().getName() + ": " + activeContextValue);
return activeContextValue;
final long start = System.nanoTime();
RuntimeException error = null;
try {

final Object activeContextValue = manager.getActiveContextValue();
LOGGER.finest(() -> activeContextValue == null
? "There is no active context value for " + manager + " (in thread " + Thread.currentThread().getName() + ")."
: "Active context value of " + manager + " in " + Thread.currentThread().getName() + ": " + activeContextValue);
return activeContextValue;

} catch (RuntimeException e) {
LOGGER.log(Level.WARNING, e, () -> "Error obtaining active context from " + manager + " (in thread " + Thread.currentThread().getName() + ").");
error = e;
return null;
} finally {
Timers.timed(System.nanoTime() - start, manager.getClass(), "getActiveContextValue", error);
}
}

private static void clear(ContextManager<?> manager) {
manager.clear();
LOGGER.finest(() -> "Active context of " + manager + " was cleared.");
final long start = System.nanoTime();
RuntimeException error = null;
try {

manager.clear();
LOGGER.finest(() -> "Active context of " + manager + " was cleared.");

} catch (RuntimeException e) {
LOGGER.log(Level.WARNING, e, () -> "Error clearing active context from " + manager + "(in thread " + Thread.currentThread().getName() + ").");
error = e;
} finally {
Timers.timed(System.nanoTime() - start, manager.getClass(), "clear", error);
}
}

/**
Expand All @@ -169,7 +170,6 @@ private static void clear(ContextManager<?> manager) {
*/
@SuppressWarnings("rawtypes")
private static final class ContextSnapshotImpl implements ContextSnapshot {
// TODO extract this inner class?
private final List<ContextManager> managers;
private final Object[] values;

Expand All @@ -180,49 +180,74 @@ private ContextSnapshotImpl(List<ContextManager> managers, Object[] values) {

public Reactivation reactivate() {
final long start = System.nanoTime();
RuntimeException error = null;
final Context[] reactivatedContexts = new Context[managers.size()];

try {

for (int i = 0; i < values.length; i++) {
reactivatedContexts[i] = reactivate(managers.get(i), values[i]);
}
return new ReactivationImpl(reactivatedContexts);

ReactivationImpl reactivation = new ReactivationImpl(reactivatedContexts);
Timers.timed(System.nanoTime() - start, ContextSnapshot.class, "reactivate");
return reactivation;
} catch (RuntimeException reactivationException) {
// TODO think about simplifying by catching & handling in reactivate(manager, value) method
for (Context alreadyReactivated : reactivatedContexts) {
if (alreadyReactivated != null) try {
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("Snapshot reactivation failed! " +
"Closing already reactivated context: " + alreadyReactivated + ".");
}
alreadyReactivated.close();
} catch (RuntimeException rte) {
reactivationException.addSuppressed(rte);
}
}
tryClose(reactivatedContexts, reactivationException);
ServiceCache.clear();
throw reactivationException;
throw error = reactivationException;
} finally {
Timers.timed(System.nanoTime() - start, ContextSnapshot.class, "reactivate", error);
}
}

@SuppressWarnings("unchecked") // As we got the values from the managers themselves, they must also accept them!
private Context reactivate(ContextManager contextManager, Object snapshotValue) {
long start = System.nanoTime();
@Override
public String toString() {
return "ContextSnapshot{size=" + managers.size() + '}';
}

/**
* Reactivates a snapshot value for a single context manager.
*
* <p>
* This initializes a new context with the context manager
* (normally on another thread the snapshot value was captured from).
*
* @param contextManager The context manager to reactivate the snapshot value for.
* @param snapshotValue The snapshot value to be reactivated.
* @return The context to be included in the reactivation object.
*/
@SuppressWarnings("unchecked") // We got the value from the managers itself.
private static Context reactivate(ContextManager contextManager, Object snapshotValue) {
if (snapshotValue == null) return null;
Context reactivated = contextManager.initializeNewContext(snapshotValue);
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("Context reactivated from snapshot by " + contextManager + ": " + reactivated + ".");
long start = System.nanoTime();
RuntimeException error = null;
try {

Context reactivated = contextManager.initializeNewContext(snapshotValue);
LOGGER.finest(() -> "Context reactivated from snapshot by " + contextManager + ": " + reactivated + ".");
return reactivated;

} catch (RuntimeException e) {
throw error = e;
} finally {
Timers.timed(System.nanoTime() - start, contextManager.getClass(), "initializeNewContext", error);
}
Timers.timed(System.nanoTime() - start, contextManager.getClass(), "initializeNewContext");
return reactivated;
}

@Override
public String toString() {
return "ContextSnapshot{size=" + managers.size() + '}';
/**
* Try to close already-reactivated contexts when a later context manager threw an exception.
*
* @param reactivatedContexts The contexts that were already reactivated when the error happened.
* @param reason The error that happened.
*/
private static void tryClose(Context[] reactivatedContexts, Throwable reason) {
for (Context alreadyReactivated : reactivatedContexts) {
if (alreadyReactivated != null) {
try {
alreadyReactivated.close();
} catch (RuntimeException rte) {
reason.addSuppressed(rte);
}
}
}
}
}

Expand All @@ -233,7 +258,6 @@ public String toString() {
*/
@SuppressWarnings("rawtypes")
private static final class ReactivationImpl implements Reactivation {
// TODO extract this inner class?
private final Context[] reactivated;

private ReactivationImpl(Context[] reactivated) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
final class Timers {
private static final Logger TIMING_LOGGER = Logger.getLogger(Timers.class.getName());

static void timed(long durationNanos, Class<?> type, String method) {
static void timed(long durationNanos, Class<?> type, String method, Throwable error) {
for (ContextTimer delegate : ServiceCache.cached(ContextTimer.class)) {
delegate.update(type, method, durationNanos, TimeUnit.NANOSECONDS);
delegate.update(type, method, durationNanos, TimeUnit.NANOSECONDS, error);
}
if (TIMING_LOGGER.isLoggable(Level.FINEST)) {
TIMING_LOGGER.log(Level.FINEST, "{0}.{1}: {2,number}ns", new Object[]{type.getName(), method, durationNanos});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class TimersTest {

@Test
public void testTimingDelegation() {
Timers.timed(TimeUnit.MILLISECONDS.toNanos(150), getClass(), "testTimingDelegation");
Timers.timed(TimeUnit.MILLISECONDS.toNanos(150), getClass(), "testTimingDelegation", null);
assertThat(TestContextTimer.getLastTimedMillis(getClass(), "testTimingDelegation"), is(150L));
}

Expand All @@ -45,7 +45,7 @@ public static Long getLastTimedMillis(Class<?> type, String method) {
return LAST_TIMED.get(type.getName() + "." + method);
}

public void update(Class<?> type, String method, long duration, TimeUnit unit) {
public void update(Class<?> type, String method, long duration, TimeUnit unit, Throwable error) {
LAST_TIMED.put(type.getName() + "." + method, unit.toMillis(duration));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package nl.talsmasoftware.context.timers.metrics;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
Expand Down Expand Up @@ -68,32 +69,45 @@ public class MetricsContextTimer implements ContextTimer {
private static final String SYS_REGISTRY_NAME = "contextpropagation.metrics.registry";
private static final String ENV_REGISTRY_NAME = SYS_REGISTRY_NAME.toUpperCase().replace('.', '_');

private static final ConcurrentMap<String, Timer> TIMERS = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, Timer> CACHED_TIMERS = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, Meter> CACHED_ERRORS = new ConcurrentHashMap<>();

@Override
public void update(Class<?> type, String method, long duration, TimeUnit unit) {
locateTimer(type, method).update(duration, unit);
public void update(Class<?> type, String method, long duration, TimeUnit unit, Throwable error) {
final String name = MetricRegistry.name(type, method);
final String errorsName = MetricRegistry.name(name, "errors");
CACHED_TIMERS.computeIfAbsent(name, this::registerTimer).update(duration, unit);
CACHED_ERRORS.computeIfAbsent(errorsName, this::registerMeter).mark(error == null ? 0L : 1L);
}

private static Timer locateTimer(Class<?> type, String method) {
final String name = MetricRegistry.name(type, method);
Timer timer = TIMERS.get(name);
if (timer == null) {
final Collection<MetricRegistry> sharedRegistries = locateSharedRegistries();
for (MetricRegistry registry : sharedRegistries) {
timer = registry.getTimers().get(name);
if (timer != null) break;
}
if (timer == null) timer = new Timer();
TIMERS.putIfAbsent(name, timer);
timer = TIMERS.get(name); // In case of race conditions
for (MetricRegistry registry : sharedRegistries) {
if (!registry.getTimers().containsKey(name)) registry.register(name, timer);
private Timer registerTimer(String name) {
final Collection<MetricRegistry> sharedRegistries = locateSharedRegistries();
for (MetricRegistry registry : sharedRegistries) {
Timer timer = registry.getTimers().get(name);
if (timer != null) {
return timer;
}
}

final Timer timer = new Timer();
sharedRegistries.forEach(registry -> registry.register(name, timer));
return timer;
}

private Meter registerMeter(String name) {
final Collection<MetricRegistry> sharedRegistries = locateSharedRegistries();
for (MetricRegistry registry : sharedRegistries) {
Meter meter = registry.getMeters().get(name);
if (meter != null) {
return meter;
}
}

final Meter meter = new Meter();
sharedRegistries.forEach(registry -> registry.register(name, meter));
return meter;
}

private static Collection<MetricRegistry> locateSharedRegistries() {
String registryName = System.getProperty(SYS_REGISTRY_NAME, System.getenv(ENV_REGISTRY_NAME));
if (registryName == null) {
Expand Down Expand Up @@ -123,6 +137,6 @@ private static Collection<MetricRegistry> locateSharedRegistries() {

@Override
public String toString() {
return getClass().getSimpleName() + "{timers=" + TIMERS.keySet() + "}";
return getClass().getSimpleName() + "{timers=" + CACHED_TIMERS.keySet() + "}";
}
}
Loading

0 comments on commit 8f3ea2f

Please sign in to comment.