Skip to content

Commit 717716c

Browse files
chore: move flag checking logic to interceptor
1 parent fdd1c5e commit 717716c

File tree

2 files changed

+5
-76
lines changed

2 files changed

+5
-76
lines changed

packages/interceptors-opentelemetry/src/workflow/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import { instrument, extractContextFromHeaders, headersWithContext } from '../in
2424
import { ContextManager } from './context-manager';
2525
import { SpanName, SPAN_DELIMITER } from './definitions';
2626
import { SpanExporter } from './span-exporter';
27+
import { getActivator } from '@temporalio/workflow/lib/global-attributes';
28+
import { SdkFlags } from '@temporalio/workflow/lib/flags';
2729

2830
export * from './definitions';
2931

@@ -71,12 +73,13 @@ export class OpenTelemetryInboundInterceptor implements WorkflowInboundCallsInte
7173
input: SignalInput,
7274
next: Next<WorkflowInboundCallsInterceptor, 'handleSignal'>
7375
): Promise<void> {
76+
const shouldInjectYield = getActivator().hasFlag(SdkFlags.OpenTelemetryHandleSignalInterceptorInsertYield);
7477
const context = extractContextFromHeaders(input.headers);
7578
return await instrument({
7679
tracer: this.tracer,
7780
spanName: `${SpanName.WORKFLOW_SIGNAL}${SPAN_DELIMITER}${input.signalName}`,
7881
fn: () => next(input),
79-
context,
82+
context: shouldInjectYield ? await Promise.resolve(context) : context,
8083
});
8184
}
8285
}

packages/workflow/src/internals.ts

Lines changed: 1 addition & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -980,11 +980,7 @@ export class Activator implements ActivationHandler {
980980

981981
const signalExecutionNum = this.signalHandlerExecutionSeq++;
982982
this.inProgressSignals.set(signalExecutionNum, { name: signalName, unfinishedPolicy });
983-
const execute = composeInterceptors(
984-
this.maybeInjectYieldForOtelHandler(interceptors),
985-
'handleSignal',
986-
this.signalWorkflowNextHandler.bind(this)
987-
);
983+
const execute = composeInterceptors(interceptors, 'handleSignal', this.signalWorkflowNextHandler.bind(this));
988984
execute({
989985
args: arrayFromPayloads(this.payloadConverter, activation.input),
990986
signalName,
@@ -1261,31 +1257,6 @@ export class Activator implements ActivationHandler {
12611257
failureToError(failure: ProtoFailure): Error {
12621258
return this.failureConverter.failureToError(failure, this.payloadConverter);
12631259
}
1264-
1265-
private maybeInjectYieldForOtelHandler(
1266-
interceptors: NonNullable<WorkflowInterceptors['inbound']>
1267-
): NonNullable<WorkflowInterceptors['inbound']> {
1268-
if (!this.hasFlag(SdkFlags.OpenTelemetryHandleSignalInterceptorInsertYield)) {
1269-
return [...interceptors];
1270-
}
1271-
const otelInboundInterceptorIndex = findOpenTelemetryInboundInterceptor(interceptors);
1272-
if (otelInboundInterceptorIndex === null) {
1273-
return [...interceptors];
1274-
}
1275-
// A handler that only serves the insert a yield point in the interceptor handlers
1276-
const yieldHandleSignalInterceptor: NonNullable<WorkflowInterceptors['inbound']>[number] = {
1277-
handleSignal: async (input, next) => {
1278-
await Promise.resolve();
1279-
return next(input);
1280-
},
1281-
};
1282-
// Insert the yield handler before the OTEL one to synthesize the yield point added in the affected versions of the handler
1283-
return [
1284-
...interceptors.slice(0, otelInboundInterceptorIndex),
1285-
yieldHandleSignalInterceptor,
1286-
...interceptors.slice(otelInboundInterceptorIndex),
1287-
];
1288-
}
12891260
}
12901261

12911262
function getSeq<T extends { seq?: number | null }>(activation: T): number {
@@ -1336,48 +1307,3 @@ then you can disable this warning by passing an option when setting the handler:
13361307
Array.from(names.entries()).map(([name, count]) => ({ name, count }))
13371308
)}`;
13381309
}
1339-
1340-
// Should only get run on replay
1341-
function shouldInjectYield(version?: string): boolean {
1342-
if (!version) {
1343-
return false;
1344-
}
1345-
const [major, minor, patchAndTags] = version.split('.', 3);
1346-
// 1.11.5 - 1.13.1: need to inject
1347-
if (major !== '1') return false;
1348-
1349-
// patch might have some extra stuff that needs cleaning
1350-
// basically "takeWhile digit"
1351-
let patch;
1352-
try {
1353-
const patchDigits = /[0-9]+/.exec(patchAndTags)?.[0];
1354-
patch = patchDigits ? Number.parseInt(patchDigits) : null;
1355-
} catch {
1356-
patch = null;
1357-
}
1358-
1359-
switch (minor) {
1360-
case '11':
1361-
// 1.11.3 was the last release that didn't inject a yield point
1362-
return Boolean(patch && patch > 3);
1363-
case '12':
1364-
// Every 1.12 release requires a yield
1365-
return true;
1366-
case '13':
1367-
// 1.13.2 will be the first release since 1.11.3 that doesn't have a yield point in `handleSignal`
1368-
return Boolean(patch && patch < 2);
1369-
default:
1370-
return false;
1371-
}
1372-
}
1373-
1374-
function findOpenTelemetryInboundInterceptor(
1375-
interceptors: NonNullable<WorkflowInterceptors['inbound']>
1376-
): number | null {
1377-
const index = interceptors.findIndex(
1378-
(interceptor) =>
1379-
// We use a marker instead of `instanceof` to avoid taking a dependency on @temporalio/interceptors-opentelemetry
1380-
(interceptor as NonNullable<WorkflowInterceptors['inbound']> & { maybeInjectYield: boolean }).maybeInjectYield
1381-
);
1382-
return index !== -1 ? index : null;
1383-
}

0 commit comments

Comments
 (0)