Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into rebase-to-main
Browse files Browse the repository at this point in the history
  • Loading branch information
antlai-temporal committed Feb 27, 2025
2 parents 122ac6f + f968fa3 commit 16302b6
Show file tree
Hide file tree
Showing 11 changed files with 387 additions and 50 deletions.
39 changes: 28 additions & 11 deletions temporalcli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,25 @@ func (c *ClientOptions) dialClient(cctx *CommandContext) (client.Client, error)

// Headers
if len(c.GrpcMeta) > 0 {
headers := make(stringMapHeadersProvider, len(c.GrpcMeta))
for _, kv := range c.GrpcMeta {
pieces := strings.SplitN(kv, "=", 2)
if len(pieces) != 2 {
return nil, fmt.Errorf("gRPC meta of %q does not have '='", kv)
}
headers[pieces[0]] = pieces[1]
headers, err := NewStringMapHeaderProvider(c.GrpcMeta)
if err != nil {
return nil, fmt.Errorf("grpc-meta %s", err)
}
clientOptions.HeadersProvider = headers
}

// Remote codec
if c.CodecEndpoint != "" {
interceptor, err := payloadCodecInterceptor(c.Namespace, c.CodecEndpoint, c.CodecAuth)
codecHeaders, err := NewStringMapHeaderProvider(c.CodecHeader)
if err != nil {
return nil, fmt.Errorf("codec-header %s", err)
}

if c.CodecAuth != "" {
codecHeaders["Authorization"] = c.CodecAuth
}

interceptor, err := payloadCodecInterceptor(c.Namespace, c.CodecEndpoint, codecHeaders)
if err != nil {
return nil, fmt.Errorf("failed creating payload codec interceptor: %w", err)
}
Expand Down Expand Up @@ -145,16 +150,16 @@ func fixedHeaderOverrideInterceptor(
return invoker(ctx, method, req, reply, cc, opts...)
}

func payloadCodecInterceptor(namespace, codecEndpoint, codecAuth string) (grpc.UnaryClientInterceptor, error) {
func payloadCodecInterceptor(namespace, codecEndpoint string, codecHeaders stringMapHeadersProvider) (grpc.UnaryClientInterceptor, error) {
codecEndpoint = strings.ReplaceAll(codecEndpoint, "{namespace}", namespace)

payloadCodec := converter.NewRemotePayloadCodec(
converter.RemotePayloadCodecOptions{
Endpoint: codecEndpoint,
ModifyRequest: func(req *http.Request) error {
req.Header.Set("X-Namespace", namespace)
if codecAuth != "" {
req.Header.Set("Authorization", codecAuth)
for headerName, headerValue := range codecHeaders {
req.Header.Set(headerName, headerValue)
}
return nil
},
Expand Down Expand Up @@ -185,6 +190,18 @@ func (s stringMapHeadersProvider) GetHeaders(context.Context) (map[string]string
return s, nil
}

func NewStringMapHeaderProvider(config []string) (stringMapHeadersProvider, error) {
headers := make(stringMapHeadersProvider, len(config))
for _, kv := range config {
pieces := strings.SplitN(kv, "=", 2)
if len(pieces) != 2 {
return nil, fmt.Errorf("%q does not have '='", kv)
}
headers[pieces[0]] = pieces[1]
}
return headers, nil
}

var DataConverterWithRawValue = converter.NewCompositeDataConverter(
rawValuePayloadConverter{},
converter.NewNilPayloadConverter(),
Expand Down
53 changes: 52 additions & 1 deletion temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type ClientOptions struct {
TlsServerName string
CodecEndpoint string
CodecAuth string
CodecHeader []string
}

func (v *ClientOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
Expand All @@ -41,7 +42,7 @@ func (v *ClientOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
cctx.BindFlagEnvVar(f.Lookup("namespace"), "TEMPORAL_NAMESPACE")
f.StringVar(&v.ApiKey, "api-key", "", "API key for request.")
cctx.BindFlagEnvVar(f.Lookup("api-key"), "TEMPORAL_API_KEY")
f.StringArrayVar(&v.GrpcMeta, "grpc-meta", nil, "HTTP headers for requests. format as a `KEY=VALUE` pair May be passed multiple times to set multiple headers.")
f.StringArrayVar(&v.GrpcMeta, "grpc-meta", nil, "HTTP headers for requests. Format as a `KEY=VALUE` pair. May be passed multiple times to set multiple headers.")
f.BoolVar(&v.Tls, "tls", false, "Enable base TLS encryption. Does not have additional options like mTLS or client certs.")
cctx.BindFlagEnvVar(f.Lookup("tls"), "TEMPORAL_TLS")
f.StringVar(&v.TlsCertPath, "tls-cert-path", "", "Path to x509 certificate. Can't be used with --tls-cert-data.")
Expand All @@ -64,6 +65,7 @@ func (v *ClientOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
cctx.BindFlagEnvVar(f.Lookup("codec-endpoint"), "TEMPORAL_CODEC_ENDPOINT")
f.StringVar(&v.CodecAuth, "codec-auth", "", "Authorization header for Codec Server requests.")
cctx.BindFlagEnvVar(f.Lookup("codec-auth"), "TEMPORAL_CODEC_AUTH")
f.StringArrayVar(&v.CodecHeader, "codec-header", nil, "HTTP headers for requests to codec server. Format as a `KEY=VALUE` pair. May be passed multiple times to set multiple headers.")
}

type OverlapPolicyOptions struct {
Expand Down Expand Up @@ -2861,6 +2863,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) *
s.Command.AddCommand(&NewTemporalWorkflowResultCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowShowCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowSignalCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowSignalWithStartCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowStackCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowStartCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowTerminateCommand(cctx, &s).Command)
Expand Down Expand Up @@ -3055,6 +3058,7 @@ type TemporalWorkflowListCommand struct {
Query string
Archived bool
Limit int
PageSize int
}

func NewTemporalWorkflowListCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowListCommand {
Expand All @@ -3072,6 +3076,7 @@ func NewTemporalWorkflowListCommand(cctx *CommandContext, parent *TemporalWorkfl
s.Command.Flags().StringVarP(&s.Query, "query", "q", "", "Content for an SQL-like `QUERY` List Filter.")
s.Command.Flags().BoolVar(&s.Archived, "archived", false, "Limit output to archived Workflow Executions.")
s.Command.Flags().IntVar(&s.Limit, "limit", 0, "Maximum number of Workflow Executions to display.")
s.Command.Flags().IntVar(&s.PageSize, "page-size", 0, "Maximum number of Workflow Executions to fetch at a time from the server.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down Expand Up @@ -3288,6 +3293,52 @@ func NewTemporalWorkflowSignalCommand(cctx *CommandContext, parent *TemporalWork
return &s
}

type TemporalWorkflowSignalWithStartCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
SharedWorkflowStartOptions
WorkflowStartOptions
PayloadInputOptions
SignalName string
SignalInput []string
SignalInputFile []string
SignalInputMeta []string
SignalInputBase64 bool
}

func NewTemporalWorkflowSignalWithStartCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowSignalWithStartCommand {
var s TemporalWorkflowSignalWithStartCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "signal-with-start [flags]"
s.Command.Short = "Send a message to a Workflow Execution, start the execution if it isn't running"
if hasHighlighting {
s.Command.Long = "Send an asynchronous notification (Signal) to a Workflow Execution.\nIf the Workflow Execution is not running or is not found, it starts the \nworkflow then sends the signal.\n\n\x1b[1mtemporal workflow signal-with-start \\\n --signal-name YourSignal \\\n --signal-input '{\"some-key\": \"some-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m"
} else {
s.Command.Long = "Send an asynchronous notification (Signal) to a Workflow Execution.\nIf the Workflow Execution is not running or is not found, it starts the \nworkflow then sends the signal.\n\n```\ntemporal workflow signal-with-start \\\n --signal-name YourSignal \\\n --signal-input '{\"some-key\": \"some-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --input '{\"some-key\": \"some-value\"}'\n```"
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVar(&s.SignalName, "signal-name", "", "Signal name. Required. Aliased as \"--signal-type\".")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "signal-name")
s.Command.Flags().StringArrayVar(&s.SignalInput, "signal-input", nil, "Signal input value. Use JSON content or set --signal-input-meta to override. Can't be combined with --signal-input-file. Can be passed multiple times to pass multiple arguments.")
s.Command.Flags().StringArrayVar(&s.SignalInputFile, "signal-input-file", nil, "A path or paths for input file(s). Use JSON content or set --signal-input-meta to override. Can't be combined with --signal-input. Can be passed multiple times to pass multiple arguments.")
s.Command.Flags().StringArrayVar(&s.SignalInputMeta, "signal-input-meta", nil, "Input signal payload metadata as a `KEY=VALUE` pair. When the KEY is \"encoding\", this overrides the default (\"json/plain\"). Can be passed multiple times.")
s.Command.Flags().BoolVar(&s.SignalInputBase64, "signal-input-base64", false, "Assume signal inputs are base64-encoded and attempt to decode them.")
s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags())
s.WorkflowStartOptions.buildFlags(cctx, s.Command.Flags())
s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{
"name": "type",
"signal-type": "signal-name",
}))
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalWorkflowStackCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
Expand Down
19 changes: 19 additions & 0 deletions temporalcli/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"github.com/temporalio/cli/temporalcli/internal/printer"
"github.com/temporalio/ui-server/v2/server/version"
"go.temporal.io/api/common/v1"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/failure/v1"
"go.temporal.io/api/temporalproto"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/temporal"
"go.temporal.io/server/common/headers"
"google.golang.org/grpc"
Expand Down Expand Up @@ -589,3 +591,20 @@ func fromApplicationError(err *temporal.ApplicationError) (*structuredError, err
Details: deets,
}, nil
}

func encodeMapToPayloads(in map[string]any) (map[string]*commonpb.Payload, error) {
if len(in) == 0 {
return nil, nil
}
// search attributes always use default dataconverter
dc := converter.GetDefaultDataConverter()
out := make(map[string]*commonpb.Payload, len(in))
for key, val := range in {
payload, err := dc.ToPayload(val)
if err != nil {
return nil, err
}
out[key] = payload
}
return out, nil
}
20 changes: 1 addition & 19 deletions temporalcli/commands.schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
schedpb "go.temporal.io/api/schedule/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
"go.temporal.io/server/common/primitives/timestamp"
)

Expand Down Expand Up @@ -257,7 +256,7 @@ func toScheduleAction(sw *SharedWorkflowStartOptions, i *PayloadInputOptions) (c
if err != nil {
return nil, err
}
untypedSearchAttributes, err := encodeSearchAttributesToPayloads(opts.SearchAttributes)
untypedSearchAttributes, err := encodeMapToPayloads(opts.SearchAttributes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -605,20 +604,3 @@ func formatDuration(d time.Duration) string {
s = strings.TrimSpace(s)
return s
}

func encodeSearchAttributesToPayloads(in map[string]any) (map[string]*commonpb.Payload, error) {
if len(in) == 0 {
return nil, nil
}
// search attributes always use default dataconverter
dc := converter.GetDefaultDataConverter()
out := make(map[string]*commonpb.Payload, len(in))
for key, val := range in {
payload, err := dc.ToPayload(val)
if err != nil {
return nil, err
}
out[key] = payload
}
return out, nil
}
14 changes: 0 additions & 14 deletions temporalcli/commands.worker.deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,6 @@ type jsonDeploymentVersionInfoType struct {
Metadata map[string]*common.Payload `json:"metadata"`
}

/*
type jsonDeploymentReachabilityInfoType struct {
DeploymentInfo jsonDeploymentInfoType `json:"deploymentInfo"`
Reachability string `json:"reachability"`
LastUpdateTime time.Time `json:"lastUpdateTime"`
}
type jsonDeploymentListEntryType struct {
Deployment jsonDeploymentType `json:"deployment"`
CreateTime time.Time `json:"createTime"`
IsCurrent bool `json:"isCurrent"`
}
*/

func (s *SharedServerSuite) TestDeployment_Set_Current_Version() {
deploymentName := uuid.NewString()
buildId := uuid.NewString()
Expand Down
112 changes: 112 additions & 0 deletions temporalcli/commands.workflow_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@ import (
"time"

"github.com/fatih/color"
"github.com/google/uuid"
"github.com/temporalio/cli/temporalcli/internal/printer"
"go.temporal.io/api/common/v1"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/history/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/temporalproto"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"google.golang.org/protobuf/types/known/durationpb"
)

func (c *TemporalWorkflowStartCommand) run(cctx *CommandContext, args []string) error {
Expand Down Expand Up @@ -92,6 +98,112 @@ func (c *TemporalWorkflowExecuteCommand) run(cctx *CommandContext, args []string
return err
}

func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []string) error {
if c.SharedWorkflowStartOptions.WorkflowId == "" {
return fmt.Errorf("--workflow-id flag must be provided")
}

cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

wfStartOpts, err := buildStartOptions(&c.SharedWorkflowStartOptions, &c.WorkflowStartOptions)
if err != nil {
return err
}
wfInput, err := c.buildRawInputPayloads()
if err != nil {
return err
}

signalPayloadInputOpts := PayloadInputOptions{
Input: c.SignalInput,
InputFile: c.SignalInputFile,
InputMeta: c.InputMeta,
InputBase64: c.SignalInputBase64,
}
signalInput, err := signalPayloadInputOpts.buildRawInputPayloads()
if err != nil {
return err
}

var retryPolicy *common.RetryPolicy
if wfStartOpts.RetryPolicy != nil {
retryPolicy = &commonpb.RetryPolicy{
MaximumInterval: durationpb.New(wfStartOpts.RetryPolicy.MaximumInterval),
InitialInterval: durationpb.New(wfStartOpts.RetryPolicy.InitialInterval),
BackoffCoefficient: wfStartOpts.RetryPolicy.BackoffCoefficient,
MaximumAttempts: wfStartOpts.RetryPolicy.MaximumAttempts,
NonRetryableErrorTypes: wfStartOpts.RetryPolicy.NonRetryableErrorTypes,
}
}
var memo *common.Memo
if wfStartOpts.Memo != nil {
fields, err := encodeMapToPayloads(wfStartOpts.Memo)
if err != nil {
return err
}
memo = &common.Memo{Fields: fields}
}
var searchAttr *common.SearchAttributes
if wfStartOpts.SearchAttributes != nil {
fields, err := encodeMapToPayloads(wfStartOpts.SearchAttributes)
if err != nil {
return err
}
searchAttr = &common.SearchAttributes{IndexedFields: fields}
}

if wfStartOpts.VersioningOverride != (client.VersioningOverride{}) {
cctx.Logger.Warn("VersioningOverride is not configured for the signal-with-start command")
}

// We have to use the raw signal service call here because the Go SDK's
// signal-with-start call doesn't accept multiple signal arguments.
resp, err := cl.WorkflowService().SignalWithStartWorkflowExecution(
cctx,
&workflowservice.SignalWithStartWorkflowExecutionRequest{
Namespace: c.Parent.Namespace,
RequestId: uuid.NewString(),
WorkflowId: c.WorkflowId,
WorkflowType: &common.WorkflowType{Name: c.Type},
TaskQueue: &taskqueuepb.TaskQueue{Name: c.TaskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
Input: wfInput,
WorkflowExecutionTimeout: durationpb.New(wfStartOpts.WorkflowExecutionTimeout),
WorkflowRunTimeout: durationpb.New(wfStartOpts.WorkflowRunTimeout),
WorkflowTaskTimeout: durationpb.New(wfStartOpts.WorkflowTaskTimeout),
SignalName: c.SignalName,
SignalInput: signalInput,
Identity: clientIdentity(),
RetryPolicy: retryPolicy,
CronSchedule: wfStartOpts.CronSchedule,
Memo: memo,
SearchAttributes: searchAttr,
WorkflowIdReusePolicy: wfStartOpts.WorkflowIDReusePolicy,
WorkflowIdConflictPolicy: wfStartOpts.WorkflowIDConflictPolicy,
},
)
if err != nil {
return err
}
cctx.Printer.Println(color.MagentaString("Running execution:"))
return cctx.Printer.PrintStructured(struct {
WorkflowId string `json:"workflowId"`
RunId string `json:"runId"`
Type string `json:"type"`
Namespace string `json:"namespace"`
TaskQueue string `json:"taskQueue"`
}{
WorkflowId: c.WorkflowId,
RunId: resp.RunId,
Type: c.Type,
Namespace: c.Parent.Namespace,
TaskQueue: c.TaskQueue,
}, printer.StructuredOptions{})
}

type workflowJSONResult struct {
WorkflowId string `json:"workflowId"`
RunId string `json:"runId"`
Expand Down
Loading

0 comments on commit 16302b6

Please sign in to comment.