-
Notifications
You must be signed in to change notification settings - Fork 115
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Typed search attributes #1612
base: main
Are you sure you want to change the base?
Typed search attributes #1612
Changes from all commits
fee7333
eee023e
f2399cc
103b1b8
25b0722
78b74a4
af0754e
3261083
0c3afe2
ba05b76
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,11 @@ | ||
import { status as grpcStatus } from '@grpc/grpc-js'; | ||
import { v4 as uuid4 } from 'uuid'; | ||
import { mapToPayloads, searchAttributePayloadConverter, Workflow } from '@temporalio/common'; | ||
import { | ||
decodeSearchAttributes, | ||
decodeTypedSearchAttributes, | ||
encodeUnifiedSearchAttributes, | ||
Workflow, | ||
} from '@temporalio/common'; | ||
import { composeInterceptors, Headers } from '@temporalio/common/lib/interceptors'; | ||
import { | ||
encodeMapToPayloads, | ||
|
@@ -39,7 +44,6 @@ | |
decodeScheduleRecentActions, | ||
decodeScheduleRunningActions, | ||
decodeScheduleSpec, | ||
decodeSearchAttributes, | ||
encodeScheduleAction, | ||
encodeSchedulePolicies, | ||
encodeScheduleSpec, | ||
|
@@ -238,11 +242,12 @@ | |
state: encodeScheduleState(opts.state), | ||
}, | ||
memo: opts.memo ? { fields: await encodeMapToPayloads(this.dataConverter, opts.memo) } : undefined, | ||
searchAttributes: opts.searchAttributes | ||
? { | ||
indexedFields: mapToPayloads(searchAttributePayloadConverter, opts.searchAttributes), | ||
} | ||
: undefined, | ||
searchAttributes: | ||
opts.searchAttributes || opts.typedSearchAttributes | ||
Check warning on line 246 in packages/client/src/schedule-client.ts
|
||
? { | ||
indexedFields: encodeUnifiedSearchAttributes(opts.searchAttributes, opts.typedSearchAttributes), | ||
Check warning on line 248 in packages/client/src/schedule-client.ts
|
||
} | ||
: undefined, | ||
initialPatch: { | ||
triggerImmediately: opts.state?.triggerImmediately | ||
? { overlapPolicy: temporal.api.enums.v1.ScheduleOverlapPolicy.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL } | ||
|
@@ -388,7 +393,8 @@ | |
workflowType: raw.info.workflowType.name, | ||
}, | ||
memo: await decodeMapFromPayloads(this.dataConverter, raw.memo?.fields), | ||
searchAttributes: decodeSearchAttributes(raw.searchAttributes), | ||
searchAttributes: decodeSearchAttributes(raw.searchAttributes?.indexedFields), | ||
typedSearchAttributes: decodeTypedSearchAttributes(raw.searchAttributes?.indexedFields), | ||
state: { | ||
paused: raw.info?.paused === true, | ||
note: raw.info?.notes ?? undefined, | ||
|
@@ -425,7 +431,8 @@ | |
spec: decodeScheduleSpec(raw.schedule.spec), | ||
action: await decodeScheduleAction(this.client.dataConverter, raw.schedule.action), | ||
memo: await decodeMapFromPayloads(this.client.dataConverter, raw.memo?.fields), | ||
searchAttributes: decodeSearchAttributes(raw.searchAttributes), | ||
searchAttributes: decodeSearchAttributes(raw.searchAttributes?.indexedFields), | ||
typedSearchAttributes: decodeTypedSearchAttributes(raw.searchAttributes?.indexedFields), | ||
policies: { | ||
// 'overlap' should never be missing on describe, as the server will replace UNSPECIFIED by an actual value | ||
overlap: decodeScheduleOverlapPolicy(raw.schedule.policies?.overlapPolicy) ?? ScheduleOverlapPolicy.SKIP, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,13 +1,12 @@ | ||
import Long from 'long'; // eslint-disable-line import/no-named-as-default | ||
import { | ||
compileRetryPolicy, | ||
decodeSearchAttributes, | ||
decodeTypedSearchAttributes, | ||
decompileRetryPolicy, | ||
encodeUnifiedSearchAttributes, | ||
extractWorkflowType, | ||
LoadedDataConverter, | ||
mapFromPayloads, | ||
mapToPayloads, | ||
searchAttributePayloadConverter, | ||
SearchAttributes, | ||
} from '@temporalio/common'; | ||
import { Headers } from '@temporalio/common/lib/interceptors'; | ||
import { | ||
|
@@ -260,11 +259,12 @@ | |
workflowTaskTimeout: msOptionalToTs(action.workflowTaskTimeout), | ||
retryPolicy: action.retry ? compileRetryPolicy(action.retry) : undefined, | ||
memo: action.memo ? { fields: await encodeMapToPayloads(dataConverter, action.memo) } : undefined, | ||
searchAttributes: action.searchAttributes | ||
? { | ||
indexedFields: mapToPayloads(searchAttributePayloadConverter, action.searchAttributes), | ||
} | ||
: undefined, | ||
searchAttributes: | ||
action.searchAttributes || action.typedSearchAttributes | ||
Check warning on line 263 in packages/client/src/schedule-helpers.ts
|
||
? { | ||
indexedFields: encodeUnifiedSearchAttributes(action.searchAttributes, action.typedSearchAttributes), | ||
Check warning on line 265 in packages/client/src/schedule-helpers.ts
|
||
} | ||
: undefined, | ||
header: { fields: headers }, | ||
}, | ||
}; | ||
|
@@ -326,14 +326,8 @@ | |
args: await decodeArrayFromPayloads(dataConverter, pb.startWorkflow.input?.payloads), | ||
memo: await decodeMapFromPayloads(dataConverter, pb.startWorkflow.memo?.fields), | ||
retry: decompileRetryPolicy(pb.startWorkflow.retryPolicy), | ||
searchAttributes: Object.fromEntries( | ||
Object.entries( | ||
mapFromPayloads( | ||
searchAttributePayloadConverter, | ||
pb.startWorkflow.searchAttributes?.indexedFields ?? {} | ||
) as SearchAttributes | ||
) | ||
), | ||
searchAttributes: decodeSearchAttributes(pb.startWorkflow.searchAttributes?.indexedFields), | ||
typedSearchAttributes: decodeTypedSearchAttributes(pb.startWorkflow.searchAttributes?.indexedFields), | ||
workflowExecutionTimeout: optionalTsToMs(pb.startWorkflow.workflowExecutionTimeout), | ||
workflowRunTimeout: optionalTsToMs(pb.startWorkflow.workflowRunTimeout), | ||
workflowTaskTimeout: optionalTsToMs(pb.startWorkflow.workflowTaskTimeout), | ||
|
@@ -342,17 +336,6 @@ | |
throw new TypeError('Unsupported schedule action'); | ||
} | ||
|
||
export function decodeSearchAttributes( | ||
pb: temporal.api.common.v1.ISearchAttributes | undefined | null | ||
): SearchAttributes { | ||
if (!pb?.indexedFields) return {}; | ||
return Object.fromEntries( | ||
Object.entries(mapFromPayloads(searchAttributePayloadConverter, pb.indexedFields) as SearchAttributes).filter( | ||
([_, v]) => v && v.length > 0 | ||
) // Filter out empty arrays returned by pre 1.18 servers | ||
); | ||
} | ||
|
||
export function decodeScheduleRunningActions( | ||
pb?: temporal.api.common.v1.IWorkflowExecution[] | null | ||
): ScheduleExecutionStartWorkflowActionResult[] { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,11 +4,9 @@ | |
BaseWorkflowHandle, | ||
CancelledFailure, | ||
compileRetryPolicy, | ||
mapToPayloads, | ||
HistoryAndWorkflowId, | ||
QueryDefinition, | ||
RetryState, | ||
searchAttributePayloadConverter, | ||
SignalDefinition, | ||
UpdateDefinition, | ||
TerminatedFailure, | ||
|
@@ -24,6 +22,7 @@ | |
decodeRetryState, | ||
encodeWorkflowIdConflictPolicy, | ||
WorkflowIdConflictPolicy, | ||
encodeUnifiedSearchAttributes, | ||
} from '@temporalio/common'; | ||
import { composeInterceptors } from '@temporalio/common/lib/interceptors'; | ||
import { History } from '@temporalio/common/lib/proto-utils'; | ||
|
@@ -1218,11 +1217,12 @@ | |
workflowStartDelay: options.startDelay, | ||
retryPolicy: options.retry ? compileRetryPolicy(options.retry) : undefined, | ||
memo: options.memo ? { fields: await encodeMapToPayloads(this.dataConverter, options.memo) } : undefined, | ||
searchAttributes: options.searchAttributes | ||
? { | ||
indexedFields: mapToPayloads(searchAttributePayloadConverter, options.searchAttributes), | ||
} | ||
: undefined, | ||
searchAttributes: | ||
options.searchAttributes || options.typedSearchAttributes | ||
Check warning on line 1221 in packages/client/src/workflow-client.ts
|
||
? { | ||
indexedFields: encodeUnifiedSearchAttributes(options.searchAttributes, options.typedSearchAttributes), | ||
Check warning on line 1223 in packages/client/src/workflow-client.ts
|
||
} | ||
: undefined, | ||
cronSchedule: options.cronSchedule, | ||
header: { fields: headers }, | ||
}; | ||
|
@@ -1265,6 +1265,7 @@ | |
protected async createStartWorkflowRequest(input: WorkflowStartInput): Promise<StartWorkflowExecutionRequest> { | ||
const { options: opts, workflowType, headers } = input; | ||
const { identity, namespace } = this.options; | ||
|
||
return { | ||
namespace, | ||
identity, | ||
|
@@ -1284,11 +1285,12 @@ | |
workflowStartDelay: opts.startDelay, | ||
retryPolicy: opts.retry ? compileRetryPolicy(opts.retry) : undefined, | ||
memo: opts.memo ? { fields: await encodeMapToPayloads(this.dataConverter, opts.memo) } : undefined, | ||
searchAttributes: opts.searchAttributes | ||
? { | ||
indexedFields: mapToPayloads(searchAttributePayloadConverter, opts.searchAttributes), | ||
} | ||
: undefined, | ||
searchAttributes: | ||
opts.searchAttributes || opts.typedSearchAttributes | ||
Check warning on line 1289 in packages/client/src/workflow-client.ts
|
||
? { | ||
indexedFields: encodeUnifiedSearchAttributes(opts.searchAttributes, opts.typedSearchAttributes), | ||
Check warning on line 1291 in packages/client/src/workflow-client.ts
|
||
} | ||
: undefined, | ||
cronSchedule: opts.cronSchedule, | ||
header: { fields: headers }, | ||
}; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also mention that SA don't get encoded (i.e. they don't go through PayloadCodec). That's important as people may not realize that there are security implication in using SA. Please make sure that's fact is clear everywhere that a user would supply search attributes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added small note that users shouldn't include sensitive information in SAs