-
Hi. I need to propagate tenant ID as a header in every Kafka Message that is being sent in the context of a tenant. I have a quarkus project written in kotlin (with coroutines), and I'm sending messages using Emitter with My idea was to propagate the tenant ID into a kafka metadata header in a
it's not accessible in the Multi stream handled in What's interesting though, is that some quarkus-related contexts are propagated to the decorator stream just fine, like:
But others are not, like:
My question is - what's the correct approach to propagate context variables to the publisher interceptor, and how does quarkus implementation of VertxMDC / OTEL achieve this? |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments
-
Inspired by the public class TenantThreadContextProvider implements ThreadContextProvider {
public static final String TENANT_KEY = "tenant";
@NotNull
private static ThreadContextSnapshot emptyContextSnapshot() {
return () -> () -> {
};
}
public static Context getVertxContext() {
Context context = Vertx.currentContext();
if (context != null && VertxContext.isOnDuplicatedContext()) {
return context;
} else if (context != null) {
Context dc = VertxContext.createNewDuplicatedContext(context);
setContextSafe(dc, true);
return dc;
}
return null;
}
private static void setTenantContext(Object tenant) {
final Context vertxContext = getVertxContext();
if (vertxContext != null)
vertxContext.putLocal(TENANT_KEY, tenant);
}
@Override
public ThreadContextSnapshot currentContext(Map<String, String> props) {
final Context context = getVertxContext();
if (context == null)
return emptyContextSnapshot();
final Object tenant = context.getLocal(TENANT_KEY);
if (tenant == null)
return emptyContextSnapshot();
else {
return () -> {
Context context1 = getVertxContext();
Object currentTenant = context1 != null ? context1.getLocal(TENANT_KEY) : null;
setTenantContext(tenant);
return () -> setTenantContext(currentTenant);
};
}
}
@Override
public ThreadContextSnapshot clearedContext(Map<String, String> props) {
return emptyContextSnapshot();
}
@Override
public String getThreadContextType() {
return TENANT_KEY;
}
} |
Beta Was this translation helpful? Give feedback.
-
After many hours of debugging quarkus / vertx / smallrye source code, I realized that |
Beta Was this translation helpful? Give feedback.
After many hours of debugging quarkus / vertx / smallrye source code, I realized that
PublisherDecorator
is used on the receiver side, not the sender side.I should have implemented
OutgoingInterceptor
instead, which I btw did in the beginning but at some point replaced withPublisherDecorator
and forgot about that 🤦I haven't noticed the issue, cause I had both - producer and consumer configured in the same application so it was executing as it should, but obviously the context was missing.