Skip to content

Commit

Permalink
feature: support typed search attributes
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <[email protected]>
  • Loading branch information
rustatian committed Dec 23, 2024
1 parent cf9cf78 commit 2b64c5c
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 9 deletions.
99 changes: 99 additions & 0 deletions aggregatedpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package aggregatedpool

import (
"context"
"fmt"
"strconv"
"sync/atomic"
"time"
Expand Down Expand Up @@ -304,6 +305,104 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
return errors.E(op, err)
}

case *internal.UpsertWorkflowTypedSearchAttributes:
wp.log.Debug("upsert typed search attributes request", zap.Uint64("ID", msg.ID))
var sau []temporal.SearchAttributeUpdate

for k, v := range command.SearchAttributes {
switch v.Type {
case internal.BoolType:
if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(bool); ok {
sau = append(sau, temporal.NewSearchAttributeKeyBool(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a bool type", zap.String("key", k), zap.Any("value", v.Value))
}

case internal.FloatType:
if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(float64); ok {
sau = append(sau, temporal.NewSearchAttributeKeyFloat64(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a float64 type", zap.String("key", k), zap.Any("value", v.Value))
}

case internal.IntType:
if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(int); ok {
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(int64(tt)))
} else {
wp.log.Warn("field value is not an int type", zap.String("key", k), zap.Any("value", v.Value))
}
case internal.KeywordType:
if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(string); ok {
sau = append(sau, temporal.NewSearchAttributeKeyKeyword(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a string type", zap.String("key", k), zap.Any("value", v.Value))
}
case internal.KeywordListType:
if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.([]string); ok {
sau = append(sau, temporal.NewSearchAttributeKeyKeywordList(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a []string (strings array) type", zap.String("key", k), zap.Any("value", v.Value))
}
case internal.StringType:
if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(string); ok {
sau = append(sau, temporal.NewSearchAttributeKeyString(k).ValueSet(tt))
} else {
wp.log.Warn("field value is not a string type", zap.String("key", k), zap.Any("value", v.Value))
}
case internal.DatetimeType:
if v.Value == nil {
wp.log.Warn("field value is not set", zap.String("key", k))
continue
}

if tt, ok := v.Value.(string); ok {
tm, err := time.Parse(time.RFC3339, tt)
if err != nil {
return errors.E(op, fmt.Errorf("failed to parse time into RFC3339: %w", err))
}

sau = append(sau, temporal.NewSearchAttributeKeyTime(k).ValueSet(tm))
} else {
wp.log.Warn("bool field value is not a bool type", zap.String("key", k), zap.Any("value", v.Value))
}
}
}

err := wp.env.UpsertTypedSearchAttributes(temporal.NewSearchAttributes(sau...))
if err != nil {
return errors.E(op, err)
}

case *internal.SignalExternalWorkflow:
wp.log.Debug("signal external workflow request", zap.Uint64("ID", msg.ID))
wp.env.SignalExternalWorkflow(
Expand Down
2 changes: 1 addition & 1 deletion go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3208,7 +3208,6 @@ go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08
go.opentelemetry.io/otel/sdk/metric v0.39.0 h1:Kun8i1eYf48kHH83RucG93ffz0zGV1sh46FAScOTuDI=
go.opentelemetry.io/otel/sdk/metric v0.39.0/go.mod h1:piDIRgjcK7u0HCL5pCA4e74qpK/jk3NiUoAHATVAmiI=
go.opentelemetry.io/otel/sdk/metric v1.19.0/go.mod h1:XjG0jQyFJrv2PbMvwND7LwCEhsJzCzV5210euduKcKY=
go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8=
go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs=
go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0=
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
Expand Down Expand Up @@ -3301,6 +3300,7 @@ golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7
golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg=
golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY=
golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down
45 changes: 37 additions & 8 deletions internal/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ const (
executeChildWorkflowCommand = "ExecuteChildWorkflow"
getChildWorkflowExecutionCommand = "GetChildWorkflowExecution"

newTimerCommand = "NewTimer"
sideEffectCommand = "SideEffect"
getVersionCommand = "GetVersion"
completeWorkflowCommand = "CompleteWorkflow"
completeUpdateCommand = "UpdateCompleted"
validateUpdateCommand = "UpdateValidated"
continueAsNewCommand = "ContinueAsNew"
upsertWorkflowSearchAttributesCommand = "UpsertWorkflowSearchAttributes"
newTimerCommand = "NewTimer"
sideEffectCommand = "SideEffect"
getVersionCommand = "GetVersion"
completeWorkflowCommand = "CompleteWorkflow"
completeUpdateCommand = "UpdateCompleted"
validateUpdateCommand = "UpdateValidated"
continueAsNewCommand = "ContinueAsNew"
upsertWorkflowSearchAttributesCommand = "UpsertWorkflowSearchAttributes"
upsertWorkflowTypedSearchAttributesCommand = "UpsertWorkflowTypedSearchAttributes"

signalExternalWorkflowCommand = "SignalExternalWorkflow"
cancelExternalWorkflowCommand = "CancelExternalWorkflow"
Expand All @@ -49,6 +50,19 @@ const (
panicCommand = "Panic"
)

// TypedSearchAttributeTypes
type TypedSearchAttributeType string

const (
BoolType TypedSearchAttributeType = "bool"
FloatType TypedSearchAttributeType = "float64"
IntType TypedSearchAttributeType = "int64"
KeywordType TypedSearchAttributeType = "keyword"
KeywordListType TypedSearchAttributeType = "keyword_list"
StringType TypedSearchAttributeType = "string"
DatetimeType TypedSearchAttributeType = "datetime"
)

// Context provides worker information about currently. Context can be empty for server-level commands.
type Context struct {
// TaskQueue associates the message batch with the specific task queue in an underlying worker.
Expand Down Expand Up @@ -275,6 +289,16 @@ type UpsertWorkflowSearchAttributes struct {
SearchAttributes map[string]any `json:"searchAttributes"`
}

type TypedSearchAttribute struct {
Type TypedSearchAttributeType `json:"type"`
Value any `json:"value"`
}

// UpsertWorkflowTypedSearchAttributes allows to upsert search attributes
type UpsertWorkflowTypedSearchAttributes struct {
SearchAttributes map[string]*TypedSearchAttribute `json:"search_attributes"`
}

// SignalExternalWorkflow sends signal to external workflow.
type SignalExternalWorkflow struct {
Namespace string `json:"namespace"`
Expand Down Expand Up @@ -438,6 +462,8 @@ func CommandName(cmd any) (string, error) {
return continueAsNewCommand, nil
case UpsertWorkflowSearchAttributes, *UpsertWorkflowSearchAttributes:
return upsertWorkflowSearchAttributesCommand, nil
case UpsertWorkflowTypedSearchAttributes, *UpsertWorkflowTypedSearchAttributes:
return upsertWorkflowTypedSearchAttributesCommand, nil
case SignalExternalWorkflow, *SignalExternalWorkflow:
return signalExternalWorkflowCommand, nil
case CancelExternalWorkflow, *CancelExternalWorkflow:
Expand Down Expand Up @@ -517,6 +543,9 @@ func InitCommand(name string) (any, error) {
case upsertWorkflowSearchAttributesCommand:
return &UpsertWorkflowSearchAttributes{}, nil

case upsertWorkflowTypedSearchAttributesCommand:
return &UpsertWorkflowTypedSearchAttributes{}, nil

case signalExternalWorkflowCommand:
return &SignalExternalWorkflow{}, nil

Expand Down

0 comments on commit 2b64c5c

Please sign in to comment.