Skip to content

Commit

Permalink
fix(client): Simplify WorkflowClient interceptors (#956)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjameswh authored Nov 9, 2022
1 parent 64761ea commit aa5bba2
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 82 deletions.
27 changes: 18 additions & 9 deletions packages/client/src/interceptors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,23 @@ import { CompiledWorkflowOptions } from './workflow-options';

export { Next, Headers };

/** Input for WorkflowClientCallsInterceptor.start */
/** Input for WorkflowClientInterceptor.start */
export interface WorkflowStartInput {
/** Name of Workflow to start */
readonly workflowType: string;
readonly headers: Headers;
readonly options: CompiledWorkflowOptions;
}

/** Input for WorkflowClientCallsInterceptor.signal */
/** Input for WorkflowClientInterceptor.signal */
export interface WorkflowSignalInput {
readonly signalName: string;
readonly args: unknown[];
readonly workflowExecution: WorkflowExecution;
readonly headers: Headers;
}

/** Input for WorkflowClientCallsInterceptor.signalWithStart */
/** Input for WorkflowClientInterceptor.signalWithStart */
export interface WorkflowSignalWithStartInput {
readonly workflowType: string;
readonly signalName: string;
Expand All @@ -42,7 +42,7 @@ export interface WorkflowSignalWithStartInput {
readonly options: CompiledWorkflowOptions;
}

/** Input for WorkflowClientCallsInterceptor.query */
/** Input for WorkflowClientInterceptor.query */
export interface WorkflowQueryInput {
readonly queryType: string;
readonly args: unknown[];
Expand All @@ -51,29 +51,29 @@ export interface WorkflowQueryInput {
readonly headers: Headers;
}

/** Input for WorkflowClientCallsInterceptor.terminate */
/** Input for WorkflowClientInterceptor.terminate */
export interface WorkflowTerminateInput {
readonly workflowExecution: WorkflowExecution;
readonly reason?: string;
readonly details?: unknown[];
readonly firstExecutionRunId?: string;
}

/** Input for WorkflowClientCallsInterceptor.cancel */
/** Input for WorkflowClientInterceptor.cancel */
export interface WorkflowCancelInput {
readonly workflowExecution: WorkflowExecution;
readonly firstExecutionRunId?: string;
}

/** Input for WorkflowClientCallsInterceptor.describe */
/** Input for WorkflowClientInterceptor.describe */
export interface WorkflowDescribeInput {
readonly workflowExecution: WorkflowExecution;
}

/**
* Implement any of these methods to intercept WorkflowClient outbound calls
*/
export interface WorkflowClientCallsInterceptor {
export interface WorkflowClientInterceptor {
/**
* Intercept a service call to startWorkflowExecution
*
Expand Down Expand Up @@ -113,22 +113,31 @@ export interface WorkflowClientCallsInterceptor {
describe?: (input: WorkflowDescribeInput, next: Next<this, 'describe'>) => Promise<DescribeWorkflowExecutionResponse>;
}

/** @deprecated: Use WorkflowClientInterceptor instead */
export type WorkflowClientCallsInterceptor = WorkflowClientInterceptor;

/** @deprecated */
export interface WorkflowClientCallsInterceptorFactoryInput {
workflowId: string;
runId?: string;
}

/**
* A function that takes {@link CompiledWorkflowOptions} and returns an interceptor
*
* @deprecated: Please define interceptors directly, without factory
*/
export interface WorkflowClientCallsInterceptorFactory {
(input: WorkflowClientCallsInterceptorFactoryInput): WorkflowClientCallsInterceptor;
}

/**
* A mapping of interceptor type of a list of factory functions
*
* @deprecated: Please define interceptors directly, without factory
*/
export interface WorkflowClientInterceptors {
/** @deprecated */
calls?: WorkflowClientCallsInterceptorFactory[];
}

Expand Down Expand Up @@ -164,7 +173,7 @@ export type CreateScheduleOutput = {
* NOTE: Currently only for {@link WorkflowClient} and {@link ScheduleClient}. More will be added later as needed.
*/
export interface ClientInterceptors {
workflow?: WorkflowClientInterceptors;
workflow?: WorkflowClientInterceptors | WorkflowClientInterceptor[];

/**
* @experimental
Expand Down
31 changes: 20 additions & 11 deletions packages/client/src/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import { Connection } from './connection';
import { isServerErrorResponse, ServiceError, WorkflowContinuedAsNewError, WorkflowFailedError } from './errors';
import {
WorkflowCancelInput,
WorkflowClientCallsInterceptor,
WorkflowClientInterceptor,
WorkflowClientInterceptors,
WorkflowDescribeInput,
WorkflowQueryInput,
Expand Down Expand Up @@ -175,7 +175,8 @@ export interface WorkflowClientOptions {
*
* Useful for injecting auth headers and tracing Workflow executions
*/
interceptors?: WorkflowClientInterceptors;
// eslint-disable-next-line deprecation/deprecation
interceptors?: WorkflowClientInterceptors | WorkflowClientInterceptor[];

/**
* Identity to report to the server
Expand Down Expand Up @@ -223,7 +224,7 @@ export function defaultWorkflowClientOptions(): WorkflowClientOptionsWithDefault
dataConverter: {},
// The equivalent in Java is ManagementFactory.getRuntimeMXBean().getName()
identity: `${process.pid}@${os.hostname()}`,
interceptors: {},
interceptors: [],
namespace: 'default',
queryRejectCondition: temporal.api.enums.v1.QueryRejectCondition.QUERY_REJECT_CONDITION_UNSPECIFIED,
};
Expand Down Expand Up @@ -274,7 +275,7 @@ export interface GetWorkflowHandleOptions extends WorkflowResultOptions {
interface WorkflowHandleOptions extends GetWorkflowHandleOptions {
workflowId: string;
runId?: string;
interceptors: WorkflowClientCallsInterceptor[];
interceptors: WorkflowClientInterceptor[];
/**
* A runId to use for getting the workflow's result.
*
Expand Down Expand Up @@ -362,7 +363,7 @@ export class WorkflowClient {
protected async _start<T extends Workflow>(
workflowTypeOrFunc: string | T,
options: WithWorkflowArgs<T, WorkflowOptions>,
interceptors: WorkflowClientCallsInterceptor[]
interceptors: WorkflowClientInterceptor[]
): Promise<string> {
const workflowType = typeof workflowTypeOrFunc === 'string' ? workflowTypeOrFunc : workflowTypeOrFunc.name;
assertRequiredWorkflowOptions(options);
Expand All @@ -386,7 +387,7 @@ export class WorkflowClient {
protected async _signalWithStart<T extends Workflow, SA extends any[]>(
workflowTypeOrFunc: string | T,
options: WithWorkflowArgs<T, WorkflowSignalWithStartOptions<SA>>,
interceptors: WorkflowClientCallsInterceptor[]
interceptors: WorkflowClientInterceptor[]
): Promise<string> {
const workflowType = typeof workflowTypeOrFunc === 'string' ? workflowTypeOrFunc : workflowTypeOrFunc.name;
const { signal, signalArgs, ...rest } = options;
Expand Down Expand Up @@ -418,8 +419,7 @@ export class WorkflowClient {
options: WorkflowStartOptions<T>
): Promise<WorkflowHandleWithFirstExecutionRunId<T>> {
const { workflowId } = options;
// Cast is needed because it's impossible to deduce the type in this situation
const interceptors = (this.options.interceptors.calls ?? []).map((ctor) => ctor({ workflowId }));
const interceptors = this.getOrMakeInterceptors(workflowId);
const runId = await this._start(workflowTypeOrFunc, { ...options, workflowId }, interceptors);
// runId is not used in handles created with `start*` calls because these
// handles should allow interacting with the workflow if it continues as new.
Expand All @@ -446,7 +446,7 @@ export class WorkflowClient {
options: WithWorkflowArgs<WorkflowFn, WorkflowSignalWithStartOptions<SignalArgs>>
): Promise<WorkflowHandleWithSignaledRunId<WorkflowFn>> {
const { workflowId } = options;
const interceptors = (this.options.interceptors.calls ?? []).map((ctor) => ctor({ workflowId }));
const interceptors = this.getOrMakeInterceptors(workflowId);
const runId = await this._signalWithStart(workflowTypeOrFunc, options, interceptors);
// runId is not used in handles created with `start*` calls because these
// handles should allow interacting with the workflow if it continues as new.
Expand All @@ -472,7 +472,7 @@ export class WorkflowClient {
options: WorkflowStartOptions<T>
): Promise<WorkflowResultType<T>> {
const { workflowId } = options;
const interceptors = (this.options.interceptors.calls ?? []).map((ctor) => ctor({ workflowId }));
const interceptors = this.getOrMakeInterceptors(workflowId);
await this._start(workflowTypeOrFunc, options, interceptors);
return await this.result(workflowId, undefined, {
...options,
Expand Down Expand Up @@ -912,7 +912,7 @@ export class WorkflowClient {
runId?: string,
options?: GetWorkflowHandleOptions
): WorkflowHandle<T> {
const interceptors = (this.options.interceptors.calls ?? []).map((ctor) => ctor({ workflowId, runId }));
const interceptors = this.getOrMakeInterceptors(workflowId, runId);

return this._createWorkflowHandle({
workflowId,
Expand Down Expand Up @@ -951,6 +951,15 @@ export class WorkflowClient {
if (nextPageToken == null || nextPageToken.length == 0) break;
}
}

protected getOrMakeInterceptors(workflowId: string, runId?: string): WorkflowClientInterceptor[] {
if (typeof this.options.interceptors === 'object' && 'calls' in this.options.interceptors) {
// eslint-disable-next-line deprecation/deprecation
const factories = (this.options.interceptors as WorkflowClientInterceptors).calls ?? [];
return factories.map((ctor) => ctor({ workflowId, runId }));
}
return Array.isArray(this.options.interceptors) ? (this.options.interceptors as WorkflowClientInterceptor[]) : [];
}
}

export class QueryRejectedError extends Error {
Expand Down
6 changes: 3 additions & 3 deletions packages/interceptors-opentelemetry/src/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as otel from '@opentelemetry/api';
import { Next, WorkflowClientCallsInterceptor, WorkflowStartInput } from '@temporalio/client';
import { Next, WorkflowClientInterceptor, WorkflowStartInput } from '@temporalio/client';
import { headersWithContext, RUN_ID_ATTR_KEY } from '@temporalio/common/lib/otel';
import { instrument } from '../instrumentation';
import { SpanName, SPAN_DELIMITER } from '../workflow';
Expand All @@ -13,14 +13,14 @@ export interface InterceptorOptions {
*
* Wraps the operation in an opentelemetry Span and passes it to the Workflow via headers.
*/
export class OpenTelemetryWorkflowClientCallsInterceptor implements WorkflowClientCallsInterceptor {
export class OpenTelemetryWorkflowClientInterceptor implements WorkflowClientInterceptor {
protected readonly tracer: otel.Tracer;

constructor(options?: InterceptorOptions) {
this.tracer = options?.tracer ?? otel.trace.getTracer('@temporalio/interceptor-client');
}

async start(input: WorkflowStartInput, next: Next<WorkflowClientCallsInterceptor, 'start'>): Promise<string> {
async start(input: WorkflowStartInput, next: Next<WorkflowClientInterceptor, 'start'>): Promise<string> {
return await instrument({
tracer: this.tracer,
spanName: `${SpanName.WORKFLOW_START}${SPAN_DELIMITER}${input.workflowType}`,
Expand Down
6 changes: 5 additions & 1 deletion packages/interceptors-opentelemetry/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@

export * from './workflow';
export * from './worker';
export { OpenTelemetryWorkflowClientCallsInterceptor } from './client';
export {
OpenTelemetryWorkflowClientInterceptor,
/** deprecated: Use OpenTelemetryWorkflowClientInterceptor instead */
OpenTelemetryWorkflowClientInterceptor as OpenTelemetryWorkflowClientCallsInterceptor,
} from './client';
Loading

0 comments on commit aa5bba2

Please sign in to comment.