diff --git a/aggregatedpool/handler.go b/aggregatedpool/handler.go index f8a943d..b174940 100644 --- a/aggregatedpool/handler.go +++ b/aggregatedpool/handler.go @@ -2,6 +2,7 @@ package aggregatedpool import ( "context" + "fmt" "strconv" "sync/atomic" "time" @@ -304,6 +305,104 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error { return errors.E(op, err) } + case *internal.UpsertWorkflowTypedSearchAttributes: + wp.log.Debug("upsert typed search attributes request", zap.Uint64("ID", msg.ID)) + var sau []temporal.SearchAttributeUpdate + + for k, v := range command.SearchAttributes { + switch v.Type { + case internal.BoolType: + if v.Value == nil { + wp.log.Warn("field value is not set", zap.String("key", k)) + continue + } + + if tt, ok := v.Value.(bool); ok { + sau = append(sau, temporal.NewSearchAttributeKeyBool(k).ValueSet(tt)) + } else { + wp.log.Warn("field value is not a bool type", zap.String("key", k), zap.Any("value", v.Value)) + } + + case internal.FloatType: + if v.Value == nil { + wp.log.Warn("field value is not set", zap.String("key", k)) + continue + } + + if tt, ok := v.Value.(float64); ok { + sau = append(sau, temporal.NewSearchAttributeKeyFloat64(k).ValueSet(tt)) + } else { + wp.log.Warn("field value is not a float64 type", zap.String("key", k), zap.Any("value", v.Value)) + } + + case internal.IntType: + if v.Value == nil { + wp.log.Warn("field value is not set", zap.String("key", k)) + continue + } + + if tt, ok := v.Value.(int); ok { + sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(int64(tt))) + } else { + wp.log.Warn("field value is not an int type", zap.String("key", k), zap.Any("value", v.Value)) + } + case internal.KeywordType: + if v.Value == nil { + wp.log.Warn("field value is not set", zap.String("key", k)) + continue + } + + if tt, ok := v.Value.(string); ok { + sau = append(sau, temporal.NewSearchAttributeKeyKeyword(k).ValueSet(tt)) + } else { + wp.log.Warn("field value is not a string type", zap.String("key", k), zap.Any("value", v.Value)) + } + case internal.KeywordListType: + if v.Value == nil { + wp.log.Warn("field value is not set", zap.String("key", k)) + continue + } + + if tt, ok := v.Value.([]string); ok { + sau = append(sau, temporal.NewSearchAttributeKeyKeywordList(k).ValueSet(tt)) + } else { + wp.log.Warn("field value is not a []string (strings array) type", zap.String("key", k), zap.Any("value", v.Value)) + } + case internal.StringType: + if v.Value == nil { + wp.log.Warn("field value is not set", zap.String("key", k)) + continue + } + + if tt, ok := v.Value.(string); ok { + sau = append(sau, temporal.NewSearchAttributeKeyString(k).ValueSet(tt)) + } else { + wp.log.Warn("field value is not a string type", zap.String("key", k), zap.Any("value", v.Value)) + } + case internal.DatetimeType: + if v.Value == nil { + wp.log.Warn("field value is not set", zap.String("key", k)) + continue + } + + if tt, ok := v.Value.(string); ok { + tm, err := time.Parse(time.RFC3339, tt) + if err != nil { + return errors.E(op, fmt.Errorf("failed to parse time into RFC3339: %w", err)) + } + + sau = append(sau, temporal.NewSearchAttributeKeyTime(k).ValueSet(tm)) + } else { + wp.log.Warn("bool field value is not a bool type", zap.String("key", k), zap.Any("value", v.Value)) + } + } + } + + err := wp.env.UpsertTypedSearchAttributes(temporal.NewSearchAttributes(sau...)) + if err != nil { + return errors.E(op, err) + } + case *internal.SignalExternalWorkflow: wp.log.Debug("signal external workflow request", zap.Uint64("ID", msg.ID)) wp.env.SignalExternalWorkflow( diff --git a/go.work.sum b/go.work.sum index a7a6615..2e890e7 100644 --- a/go.work.sum +++ b/go.work.sum @@ -3208,7 +3208,6 @@ go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08 go.opentelemetry.io/otel/sdk/metric v0.39.0 h1:Kun8i1eYf48kHH83RucG93ffz0zGV1sh46FAScOTuDI= go.opentelemetry.io/otel/sdk/metric v0.39.0/go.mod h1:piDIRgjcK7u0HCL5pCA4e74qpK/jk3NiUoAHATVAmiI= go.opentelemetry.io/otel/sdk/metric v1.19.0/go.mod h1:XjG0jQyFJrv2PbMvwND7LwCEhsJzCzV5210euduKcKY= -go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= @@ -3301,6 +3300,7 @@ golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7 golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY= golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= diff --git a/internal/protocol.go b/internal/protocol.go index 43878d4..62033ba 100644 --- a/internal/protocol.go +++ b/internal/protocol.go @@ -31,14 +31,15 @@ const ( executeChildWorkflowCommand = "ExecuteChildWorkflow" getChildWorkflowExecutionCommand = "GetChildWorkflowExecution" - newTimerCommand = "NewTimer" - sideEffectCommand = "SideEffect" - getVersionCommand = "GetVersion" - completeWorkflowCommand = "CompleteWorkflow" - completeUpdateCommand = "UpdateCompleted" - validateUpdateCommand = "UpdateValidated" - continueAsNewCommand = "ContinueAsNew" - upsertWorkflowSearchAttributesCommand = "UpsertWorkflowSearchAttributes" + newTimerCommand = "NewTimer" + sideEffectCommand = "SideEffect" + getVersionCommand = "GetVersion" + completeWorkflowCommand = "CompleteWorkflow" + completeUpdateCommand = "UpdateCompleted" + validateUpdateCommand = "UpdateValidated" + continueAsNewCommand = "ContinueAsNew" + upsertWorkflowSearchAttributesCommand = "UpsertWorkflowSearchAttributes" + upsertWorkflowTypedSearchAttributesCommand = "UpsertWorkflowTypedSearchAttributes" signalExternalWorkflowCommand = "SignalExternalWorkflow" cancelExternalWorkflowCommand = "CancelExternalWorkflow" @@ -49,6 +50,19 @@ const ( panicCommand = "Panic" ) +// TypedSearchAttributeTypes +type TypedSearchAttributeType string + +const ( + BoolType TypedSearchAttributeType = "bool" + FloatType TypedSearchAttributeType = "float64" + IntType TypedSearchAttributeType = "int64" + KeywordType TypedSearchAttributeType = "keyword" + KeywordListType TypedSearchAttributeType = "keyword_list" + StringType TypedSearchAttributeType = "string" + DatetimeType TypedSearchAttributeType = "datetime" +) + // Context provides worker information about currently. Context can be empty for server-level commands. type Context struct { // TaskQueue associates the message batch with the specific task queue in an underlying worker. @@ -275,6 +289,16 @@ type UpsertWorkflowSearchAttributes struct { SearchAttributes map[string]any `json:"searchAttributes"` } +type TypedSearchAttribute struct { + Type TypedSearchAttributeType `json:"type"` + Value any `json:"value"` +} + +// UpsertWorkflowTypedSearchAttributes allows to upsert search attributes +type UpsertWorkflowTypedSearchAttributes struct { + SearchAttributes map[string]*TypedSearchAttribute `json:"search_attributes"` +} + // SignalExternalWorkflow sends signal to external workflow. type SignalExternalWorkflow struct { Namespace string `json:"namespace"` @@ -438,6 +462,8 @@ func CommandName(cmd any) (string, error) { return continueAsNewCommand, nil case UpsertWorkflowSearchAttributes, *UpsertWorkflowSearchAttributes: return upsertWorkflowSearchAttributesCommand, nil + case UpsertWorkflowTypedSearchAttributes, *UpsertWorkflowTypedSearchAttributes: + return upsertWorkflowTypedSearchAttributesCommand, nil case SignalExternalWorkflow, *SignalExternalWorkflow: return signalExternalWorkflowCommand, nil case CancelExternalWorkflow, *CancelExternalWorkflow: @@ -517,6 +543,9 @@ func InitCommand(name string) (any, error) { case upsertWorkflowSearchAttributesCommand: return &UpsertWorkflowSearchAttributes{}, nil + case upsertWorkflowTypedSearchAttributesCommand: + return &UpsertWorkflowTypedSearchAttributes{}, nil + case signalExternalWorkflowCommand: return &SignalExternalWorkflow{}, nil