Skip to content

Commit fdd1c5e

Browse files
chore: move conditional logic to SDK flag
1 parent ce648de commit fdd1c5e

File tree

3 files changed

+78
-4
lines changed

3 files changed

+78
-4
lines changed

packages/test/src/test-flags.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import test from 'ava';
2+
import { SdkFlags } from '@temporalio/workflow/lib/flags';
3+
import type { WorkflowInfo } from '@temporalio/workflow';
4+
5+
test('OpenTelemetryHandleSignalInterceptorInsertYield enabled by version', (t) => {
6+
const cases = [
7+
{ version: '1.0.0', expected: false },
8+
{ version: '1.11.3', expected: false },
9+
{ version: '1.11.5', expected: true },
10+
{ version: '1.11.6', expected: true },
11+
{ version: '1.12.0', expected: true },
12+
{ version: '1.13.1', expected: true },
13+
{ version: '1.13.2', expected: false },
14+
{ version: '1.14.0', expected: false },
15+
];
16+
for (const { version, expected } of cases) {
17+
const actual = SdkFlags.OpenTelemetryHandleSignalInterceptorInsertYield.alternativeConditions![0]!({
18+
info: {} as WorkflowInfo,
19+
sdkVersion: version,
20+
});
21+
t.is(
22+
actual,
23+
expected,
24+
`Expected OpenTelemetryHandleSignalInterceptorInsertYield on ${version} to evaluate as ${expected}`
25+
);
26+
}
27+
});

packages/workflow/src/flags.ts

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,19 @@ export const SdkFlags = {
4444
* to implicitely have this flag on.
4545
*/
4646
ProcessWorkflowActivationJobsAsSingleBatch: defineFlag(2, true, [buildIdSdkVersionMatches(/1\.11\.[01]/)]),
47+
/**
48+
* In 1.11.3 and previous versions, the interceptor for `handleSignal` provided
49+
* by @temporalio/interceptors-opentelemetry did not have a yield point in it.
50+
* A yield point was accidentally added in later versions. This added yield point
51+
* can cause NDE if there was a signal handler and the workflow was started with a signal.
52+
*
53+
* This yield point was removed in 1.13.2, but in order to prevent workflows from the
54+
* affected versions resulting in NDE, we have to inject the yield point on replay.
55+
* This flag should be enabled for SDK versions newer than 1.11.3 or older than 1.13.2.
56+
*
57+
* @since Introduced in 1.13.2.
58+
*/
59+
OpenTelemetryHandleSignalInterceptorInsertYield: defineFlag(3, false, [affectedOtelInterceptorVersion]),
4760
} as const;
4861

4962
function defineFlag(id: number, def: boolean, alternativeConditions?: AltConditionFn[]): SdkFlag {
@@ -68,9 +81,43 @@ export function assertValidFlag(id: number): void {
6881
* condition no longer holds. This is so to avoid incorrect behaviors in case where a Workflow
6982
* Execution has gone through a newer SDK version then again through an older one.
7083
*/
71-
type AltConditionFn = (ctx: { info: WorkflowInfo }) => boolean;
84+
type AltConditionFn = (ctx: { info: WorkflowInfo; sdkVersion?: string }) => boolean;
7285

7386
function buildIdSdkVersionMatches(version: RegExp): AltConditionFn {
7487
const regex = new RegExp(`^@temporalio/worker@(${version.source})[+]`);
7588
return ({ info }) => info.currentBuildId != null && regex.test(info.currentBuildId); // eslint-disable-line deprecation/deprecation
7689
}
90+
91+
function affectedOtelInterceptorVersion({ sdkVersion }: { sdkVersion?: string }): boolean {
92+
if (!sdkVersion) {
93+
return false;
94+
}
95+
const [major, minor, patchAndTags] = sdkVersion.split('.', 3);
96+
if (major !== '1') return false;
97+
98+
// Semver allows for additional tags to be appended to the version
99+
let patch;
100+
try {
101+
const patchDigits = /[0-9]+/.exec(patchAndTags)?.[0];
102+
patch = patchDigits ? Number.parseInt(patchDigits) : null;
103+
} catch {
104+
// This shouldn't ever happen, but we are conservative here and avoid throwing when checking a flag.
105+
patch = null;
106+
}
107+
108+
switch (minor) {
109+
case '11':
110+
// 1.11.3 was the last release that didn't inject a yield point
111+
// If for some reason we are unable to parse the patch version, assume it isn't affected
112+
return Boolean(patch && patch > 3);
113+
case '12':
114+
// Every 1.12 release requires a yield
115+
return true;
116+
case '13':
117+
// 1.13.2 will be the first release since 1.11.3 that doesn't have a yield point in `handleSignal`
118+
// If for some reason we are unable to parse the patch version, assume it isn't affected
119+
return Boolean(patch && patch < 2);
120+
default:
121+
return false;
122+
}
123+
}

packages/workflow/src/internals.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ import {
6565
import { type SinkCall } from './sinks';
6666
import { untrackPromise } from './stack-helpers';
6767
import pkg from './pkg';
68-
import { SdkFlag, assertValidFlag } from './flags';
68+
import { SdkFlag, SdkFlags, assertValidFlag } from './flags';
6969
import { executeWithLifecycleLogging, log } from './logs';
7070

7171
const StartChildWorkflowExecutionFailedCause = {
@@ -1129,7 +1129,7 @@ export class Activator implements ActivationHandler {
11291129
// through an older one.
11301130
if (this.info.unsafe.isReplaying && flag.alternativeConditions) {
11311131
for (const cond of flag.alternativeConditions) {
1132-
if (cond({ info: this.info })) return true;
1132+
if (cond({ info: this.info, sdkVersion: this.sdkVersion })) return true;
11331133
}
11341134
}
11351135

@@ -1265,7 +1265,7 @@ export class Activator implements ActivationHandler {
12651265
private maybeInjectYieldForOtelHandler(
12661266
interceptors: NonNullable<WorkflowInterceptors['inbound']>
12671267
): NonNullable<WorkflowInterceptors['inbound']> {
1268-
if (!this.info.unsafe.isReplaying || !shouldInjectYield(this.sdkVersion)) {
1268+
if (!this.hasFlag(SdkFlags.OpenTelemetryHandleSignalInterceptorInsertYield)) {
12691269
return [...interceptors];
12701270
}
12711271
const otelInboundInterceptorIndex = findOpenTelemetryInboundInterceptor(interceptors);

0 commit comments

Comments
 (0)