Skip to content

Commit ce648de

Browse files
fix: no longer yield during OTEL handleSignal
1 parent ffa0bdc commit ce648de

File tree

7 files changed

+60
-62
lines changed

7 files changed

+60
-62
lines changed

packages/interceptors-opentelemetry/src/worker/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export class OpenTelemetryActivityInboundInterceptor implements ActivityInboundC
3434
}
3535

3636
async execute(input: ActivityExecuteInput, next: Next<ActivityInboundCallsInterceptor, 'execute'>): Promise<unknown> {
37-
const context = extractContextFromHeaders(input.headers);
37+
const context = await Promise.resolve(extractContextFromHeaders(input.headers));
3838
const spanName = `${SpanName.ACTIVITY_EXECUTE}${SPAN_DELIMITER}${this.ctx.info.activityType}`;
3939
return await instrument({ tracer: this.tracer, spanName, fn: () => next(input), context });
4040
}

packages/interceptors-opentelemetry/src/workflow/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ function getTracer(): otel.Tracer {
5151
*/
5252
export class OpenTelemetryInboundInterceptor implements WorkflowInboundCallsInterceptor {
5353
protected readonly tracer = getTracer();
54+
protected readonly maybeInjectYield = true;
5455

5556
public async execute(
5657
input: WorkflowExecuteInput,

packages/test/src/helpers-integration.ts

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@ import {
66
WorkflowFailedError,
77
WorkflowHandle,
88
WorkflowHandleWithFirstExecutionRunId,
9-
WorkflowHandleWithSignaledRunId,
109
WorkflowStartOptions,
1110
WorkflowUpdateFailedError,
12-
WorkflowSignalWithStartOptionsWithArgs,
1311
} from '@temporalio/client';
1412
import {
1513
LocalTestWorkflowEnvironmentOptions,
@@ -197,16 +195,6 @@ export interface Helpers {
197195
fn: T,
198196
opts: Omit<WorkflowStartOptions, 'taskQueue' | 'workflowId'> & Partial<Pick<WorkflowStartOptions, 'workflowId'>>
199197
): Promise<WorkflowHandleWithFirstExecutionRunId<T>>;
200-
signalWithStart<T extends workflow.Workflow, U extends any[]>(
201-
fn: T,
202-
signal: workflow.SignalDefinition<U>
203-
): Promise<WorkflowHandleWithSignaledRunId<T>>;
204-
signalWithStart<T extends workflow.Workflow, U extends any[]>(
205-
fn: T,
206-
signal: workflow.SignalDefinition<U>,
207-
opts: Omit<WorkflowSignalWithStartOptionsWithArgs<U>, 'taskQueue' | 'workflowId'> &
208-
Partial<Pick<WorkflowStartOptions, 'workflowId'>>
209-
): Promise<WorkflowHandleWithSignaledRunId<T>>;
210198
assertWorkflowUpdateFailed(p: Promise<any>, causeConstructor: ErrorConstructor, message?: string): Promise<void>;
211199
assertWorkflowFailedError(p: Promise<any>, causeConstructor: ErrorConstructor, message?: string): Promise<void>;
212200
updateHasBeenAdmitted(handle: WorkflowHandle<workflow.Workflow>, updateId: string): Promise<boolean>;
@@ -262,21 +250,6 @@ export function configurableHelpers<T>(
262250
...opts,
263251
});
264252
},
265-
266-
async signalWithStart(
267-
fn: workflow.Workflow,
268-
signal: workflow.SignalDefinition<any[]>,
269-
opts?: Omit<WorkflowSignalWithStartOptionsWithArgs<any[]>, 'taskQueue' | 'workflowId'> &
270-
Partial<Pick<WorkflowStartOptions, 'workflowId'>>
271-
): Promise<WorkflowHandleWithSignaledRunId<workflow.Workflow>> {
272-
return await testEnv.client.workflow.signalWithStart(fn, {
273-
signal,
274-
taskQueue,
275-
workflowId: randomUUID(),
276-
...opts,
277-
});
278-
},
279-
280253
async assertWorkflowUpdateFailed(
281254
p: Promise<any>,
282255
causeConstructor: ErrorConstructor,

packages/test/src/test-integration-workflows.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
import * as opentelemetry from '@opentelemetry/sdk-node';
2-
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
3-
import { ConsoleSpanExporter } from '@opentelemetry/sdk-trace-node';
41
import { setTimeout as setTimeoutPromise } from 'timers/promises';
52
import { randomUUID } from 'crypto';
63
import asyncRetry from 'async-retry';
@@ -52,7 +49,7 @@ import {
5249
} from './helpers-integration';
5350
import { overrideSdkInternalFlag } from './mock-internal-flags';
5451
import { heartbeatCancellationDetailsActivity } from './activities/heartbeat-cancellation-details';
55-
import { loadHistory, RUN_TIME_SKIPPING_TESTS, saveHistory, waitUntil } from './helpers';
52+
import { loadHistory, RUN_TIME_SKIPPING_TESTS, waitUntil } from './helpers';
5653

5754
const test = makeTestFunction({
5855
workflowsPath: __filename,
@@ -516,7 +513,6 @@ test("Worker doesn't request Eager Activity Dispatch if no activities are regist
516513

517514
const unblockSignal = defineSignal('unblock');
518515
const getBuildIdQuery = defineQuery<string>('getBuildId');
519-
const startSignal = defineSignal('startSignal');
520516

521517
export async function buildIdTester(): Promise<void> {
522518
let blocked = true;

packages/test/src/test-otel.ts

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,7 @@ import { SpanStatusCode } from '@opentelemetry/api';
88
import { ExportResultCode } from '@opentelemetry/core';
99
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc';
1010
import * as opentelemetry from '@opentelemetry/sdk-node';
11-
import {
12-
BasicTracerProvider,
13-
ConsoleSpanExporter,
14-
InMemorySpanExporter,
15-
SimpleSpanProcessor,
16-
} from '@opentelemetry/sdk-trace-base';
11+
import { BasicTracerProvider, InMemorySpanExporter, SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base';
1712
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
1813
import test from 'ava';
1914
import { v4 as uuid4 } from 'uuid';
@@ -518,6 +513,7 @@ if (RUN_INTEGRATION_TESTS) {
518513
}
519514

520515
test('Can replay otel history from 1.11.3', async (t) => {
516+
/*
521517
const staticResource = new opentelemetry.resources.Resource({
522518
[SemanticResourceAttributes.SERVICE_NAME]: 'ts-test-otel-worker',
523519
});
@@ -543,7 +539,6 @@ test('Can replay otel history from 1.11.3', async (t) => {
543539
});
544540
const client = new WorkflowClient();
545541
546-
/*
547542
const result = await worker.runUntil(async () => {
548543
const handle = await client.signalWithStart(workflows.signalStartOtel, {
549544
signal: workflows.startSignal,
@@ -577,11 +572,10 @@ test('Can replay otel history from 1.11.3', async (t) => {
577572
hist
578573
);
579574
});
580-
// t.is('abc', result);
581-
t.pass();
582575
});
583576

584577
test('Can replay otel history from 1.13.1', async (t) => {
578+
/*
585579
const staticResource = new opentelemetry.resources.Resource({
586580
[SemanticResourceAttributes.SERVICE_NAME]: 'ts-test-otel-worker',
587581
});
@@ -607,7 +601,6 @@ test('Can replay otel history from 1.13.1', async (t) => {
607601
});
608602
const client = new WorkflowClient();
609603
610-
/*
611604
const result = await worker.runUntil(async () => {
612605
const handle = await client.signalWithStart(workflows.signalStartOtel, {
613606
signal: workflows.startSignal,
@@ -641,6 +634,4 @@ test('Can replay otel history from 1.13.1', async (t) => {
641634
hist
642635
);
643636
});
644-
// t.is('ac', result);
645-
t.pass();
646637
});

packages/workflow/src/flags.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@ export const SdkFlags = {
4444
* to implicitely have this flag on.
4545
*/
4646
ProcessWorkflowActivationJobsAsSingleBatch: defineFlag(2, true, [buildIdSdkVersionMatches(/1\.11\.[01]/)]),
47-
48-
OpenTelemetryInterceptorInsertYieldPoint: defineFlag(3, false, [({ info }) => false]),
4947
} as const;
5048

5149
function defineFlag(id: number, def: boolean, alternativeConditions?: AltConditionFn[]): SdkFlag {

packages/workflow/src/internals.ts

Lines changed: 54 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -980,19 +980,8 @@ export class Activator implements ActivationHandler {
980980

981981
const signalExecutionNum = this.signalHandlerExecutionSeq++;
982982
this.inProgressSignals.set(signalExecutionNum, { name: signalName, unfinishedPolicy });
983-
const injectYield = shouldInjectYield(this.sdkVersion);
984-
const addedInterceptor: WorkflowInterceptors['inbound'] = injectYield
985-
? [
986-
{
987-
handleSignal: async (input, next) => {
988-
await Promise.resolve();
989-
return next(input);
990-
},
991-
},
992-
]
993-
: [];
994983
const execute = composeInterceptors(
995-
[...addedInterceptor, ...interceptors],
984+
this.maybeInjectYieldForOtelHandler(interceptors),
996985
'handleSignal',
997986
this.signalWorkflowNextHandler.bind(this)
998987
);
@@ -1272,6 +1261,31 @@ export class Activator implements ActivationHandler {
12721261
failureToError(failure: ProtoFailure): Error {
12731262
return this.failureConverter.failureToError(failure, this.payloadConverter);
12741263
}
1264+
1265+
private maybeInjectYieldForOtelHandler(
1266+
interceptors: NonNullable<WorkflowInterceptors['inbound']>
1267+
): NonNullable<WorkflowInterceptors['inbound']> {
1268+
if (!this.info.unsafe.isReplaying || !shouldInjectYield(this.sdkVersion)) {
1269+
return [...interceptors];
1270+
}
1271+
const otelInboundInterceptorIndex = findOpenTelemetryInboundInterceptor(interceptors);
1272+
if (otelInboundInterceptorIndex === null) {
1273+
return [...interceptors];
1274+
}
1275+
// A handler that only serves the insert a yield point in the interceptor handlers
1276+
const yieldHandleSignalInterceptor: NonNullable<WorkflowInterceptors['inbound']>[number] = {
1277+
handleSignal: async (input, next) => {
1278+
await Promise.resolve();
1279+
return next(input);
1280+
},
1281+
};
1282+
// Insert the yield handler before the OTEL one to synthesize the yield point added in the affected versions of the handler
1283+
return [
1284+
...interceptors.slice(0, otelInboundInterceptorIndex),
1285+
yieldHandleSignalInterceptor,
1286+
...interceptors.slice(otelInboundInterceptorIndex),
1287+
];
1288+
}
12751289
}
12761290

12771291
function getSeq<T extends { seq?: number | null }>(activation: T): number {
@@ -1323,22 +1337,47 @@ then you can disable this warning by passing an option when setting the handler:
13231337
)}`;
13241338
}
13251339

1340+
// Should only get run on replay
13261341
function shouldInjectYield(version?: string): boolean {
13271342
if (!version) {
13281343
return false;
13291344
}
1330-
const [major, minor, patch] = version.split('.');
1345+
const [major, minor, patchAndTags] = version.split('.', 3);
13311346
// 1.11.5 - 1.13.1: need to inject
13321347
if (major !== '1') return false;
13331348

1349+
// patch might have some extra stuff that needs cleaning
1350+
// basically "takeWhile digit"
1351+
let patch;
1352+
try {
1353+
const patchDigits = /[0-9]+/.exec(patchAndTags)?.[0];
1354+
patch = patchDigits ? Number.parseInt(patchDigits) : null;
1355+
} catch {
1356+
patch = null;
1357+
}
1358+
13341359
switch (minor) {
13351360
case '11':
1336-
return patch === '5';
1361+
// 1.11.3 was the last release that didn't inject a yield point
1362+
return Boolean(patch && patch > 3);
13371363
case '12':
1364+
// Every 1.12 release requires a yield
13381365
return true;
13391366
case '13':
1340-
return patch === '1';
1367+
// 1.13.2 will be the first release since 1.11.3 that doesn't have a yield point in `handleSignal`
1368+
return Boolean(patch && patch < 2);
13411369
default:
13421370
return false;
13431371
}
13441372
}
1373+
1374+
function findOpenTelemetryInboundInterceptor(
1375+
interceptors: NonNullable<WorkflowInterceptors['inbound']>
1376+
): number | null {
1377+
const index = interceptors.findIndex(
1378+
(interceptor) =>
1379+
// We use a marker instead of `instanceof` to avoid taking a dependency on @temporalio/interceptors-opentelemetry
1380+
(interceptor as NonNullable<WorkflowInterceptors['inbound']> & { maybeInjectYield: boolean }).maybeInjectYield
1381+
);
1382+
return index !== -1 ? index : null;
1383+
}

0 commit comments

Comments
 (0)