Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Typed search attribute fixes
Browse files Browse the repository at this point in the history
cretz committed Jan 31, 2024

Verified

This commit was signed with the committer’s verified signature.
navilg Navratan Lal Gupta
1 parent 6679d9c commit 8a917bc
Showing 8 changed files with 51 additions and 29 deletions.
2 changes: 1 addition & 1 deletion internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
@@ -392,7 +392,7 @@ func (wc *workflowEnvironmentImpl) WorkflowInfo() *WorkflowInfo {
}

func (wc *workflowEnvironmentImpl) TypedSearchAttributes() SearchAttributes {
return convertToTypeSearchAttributes(wc.logger, wc.workflowInfo.SearchAttributes.GetIndexedFields())
return convertToTypedSearchAttributes(wc.logger, wc.workflowInfo.SearchAttributes.GetIndexedFields())
}

func (wc *workflowEnvironmentImpl) Complete(result *commonpb.Payloads, err error) {
2 changes: 1 addition & 1 deletion internal/internal_schedule_client.go
Original file line number Diff line number Diff line change
@@ -656,7 +656,7 @@ func convertFromPBScheduleAction(logger log.Logger, action *schedulepb.ScheduleA
memos[key] = element
}

searchAttrs := convertToTypeSearchAttributes(logger, workflow.GetSearchAttributes().GetIndexedFields())
searchAttrs := convertToTypedSearchAttributes(logger, workflow.GetSearchAttributes().GetIndexedFields())
// Create untyped list for any attribute not in the existing list
untypedSearchAttrs := map[string]*commonpb.Payload{}
for k, v := range workflow.GetSearchAttributes().GetIndexedFields() {
40 changes: 21 additions & 19 deletions internal/internal_search_attributes.go
Original file line number Diff line number Diff line change
@@ -297,7 +297,7 @@ func NewSearchAttributes(attributes ...SearchAttributeUpdate) SearchAttributes {
}

// GetString gets a value for the given key and whether it was present.
func (sa *SearchAttributes) GetString(key SearchAttributeKeyString) (string, bool) {
func (sa SearchAttributes) GetString(key SearchAttributeKeyString) (string, bool) {
value, ok := sa.untypedValue[key]
if !ok {
return "", false
@@ -306,7 +306,7 @@ func (sa *SearchAttributes) GetString(key SearchAttributeKeyString) (string, boo
}

// GetKeyword gets a value for the given key and whether it was present.
func (sa *SearchAttributes) GetKeyword(key SearchAttributeKeyKeyword) (string, bool) {
func (sa SearchAttributes) GetKeyword(key SearchAttributeKeyKeyword) (string, bool) {
value, ok := sa.untypedValue[key]
if !ok {
return "", false
@@ -315,7 +315,7 @@ func (sa *SearchAttributes) GetKeyword(key SearchAttributeKeyKeyword) (string, b
}

// GetBool gets a value for the given key and whether it was present.
func (sa *SearchAttributes) GetBool(key SearchAttributeKeyBool) (bool, bool) {
func (sa SearchAttributes) GetBool(key SearchAttributeKeyBool) (bool, bool) {
value, ok := sa.untypedValue[key]
if !ok {
return false, false
@@ -324,7 +324,7 @@ func (sa *SearchAttributes) GetBool(key SearchAttributeKeyBool) (bool, bool) {
}

// GetInt64 gets a value for the given key and whether it was present.
func (sa *SearchAttributes) GetInt64(key SearchAttributeKeyInt64) (int64, bool) {
func (sa SearchAttributes) GetInt64(key SearchAttributeKeyInt64) (int64, bool) {
value, ok := sa.untypedValue[key]
if !ok {
return 0, false
@@ -333,7 +333,7 @@ func (sa *SearchAttributes) GetInt64(key SearchAttributeKeyInt64) (int64, bool)
}

// GetFloat64 gets a value for the given key and whether it was present.
func (sa *SearchAttributes) GetFloat64(key SearchAttributeKeyFloat64) (float64, bool) {
func (sa SearchAttributes) GetFloat64(key SearchAttributeKeyFloat64) (float64, bool) {
value, ok := sa.untypedValue[key]
if !ok {
return 0.0, false
@@ -342,7 +342,7 @@ func (sa *SearchAttributes) GetFloat64(key SearchAttributeKeyFloat64) (float64,
}

// GetTime gets a value for the given key and whether it was present.
func (sa *SearchAttributes) GetTime(key SearchAttributeKeyTime) (time.Time, bool) {
func (sa SearchAttributes) GetTime(key SearchAttributeKeyTime) (time.Time, bool) {
value, ok := sa.untypedValue[key]
if !ok {
return time.Time{}, false
@@ -351,7 +351,7 @@ func (sa *SearchAttributes) GetTime(key SearchAttributeKeyTime) (time.Time, bool
}

// GetKeywordList gets a value for the given key and whether it was present.
func (sa *SearchAttributes) GetKeywordList(key SearchAttributeKeyKeywordList) ([]string, bool) {
func (sa SearchAttributes) GetKeywordList(key SearchAttributeKeyKeywordList) ([]string, bool) {
value, ok := sa.untypedValue[key]
if !ok {
return nil, false
@@ -362,18 +362,18 @@ func (sa *SearchAttributes) GetKeywordList(key SearchAttributeKeyKeywordList) ([
}

// ContainsKey gets whether a key is present.
func (sa *SearchAttributes) ContainsKey(key SearchAttributeKey) bool {
func (sa SearchAttributes) ContainsKey(key SearchAttributeKey) bool {
_, ok := sa.untypedValue[key]
return ok
}

// Size gets the size of the attribute collection.
func (sa *SearchAttributes) Size() int {
func (sa SearchAttributes) Size() int {
return len(sa.untypedValue)
}

// GetUntypedValues gets a copy of the collection with raw types.
func (sa *SearchAttributes) GetUntypedValues() map[SearchAttributeKey]interface{} {
func (sa SearchAttributes) GetUntypedValues() map[SearchAttributeKey]interface{} {
untypedValueCopy := make(map[SearchAttributeKey]interface{}, len(sa.untypedValue))
for key, value := range sa.untypedValue {
switch v := value.(type) {
@@ -387,7 +387,7 @@ func (sa *SearchAttributes) GetUntypedValues() map[SearchAttributeKey]interface{
}

// Copy creates an update that copies existing values.
func (sa *SearchAttributes) Copy() SearchAttributeUpdate {
func (sa SearchAttributes) Copy() SearchAttributeUpdate {
return func(s *SearchAttributes) {
untypedValues := sa.GetUntypedValues()
for key, value := range untypedValues {
@@ -431,7 +431,7 @@ func serializeTypedSearchAttributes(searchAttributes map[SearchAttributeKey]inte
}
// Server does not remove search attributes if they set a type
if payload.GetData() != nil {
payload.Metadata["type"] = []byte(enumspb.IndexedValueType_name[int32(k.GetValueType())])
payload.Metadata["type"] = []byte(k.GetValueType().String())
}
serializedAttr[k.GetName()] = payload
}
@@ -460,17 +460,19 @@ func serializeSearchAttributes(
return searchAttr, nil
}

func getIndexValue(payload *commonpb.Payload) enumspb.IndexedValueType {
return enumspb.IndexedValueType(enumspb.IndexedValueType_value[string(payload.GetMetadata()["type"][:])])
}

func convertToTypeSearchAttributes(logger log.Logger, attributes map[string]*commonpb.Payload) SearchAttributes {
func convertToTypedSearchAttributes(logger log.Logger, attributes map[string]*commonpb.Payload) SearchAttributes {
updates := make([]SearchAttributeUpdate, 0, len(attributes))
for key, payload := range attributes {
if payload.Data == nil {
continue
}
switch index := getIndexValue(payload); index {
valueType := enumspb.IndexedValueType(
enumspb.IndexedValueType_shorthandValue[string(payload.GetMetadata()["type"])])
// For TemporalChangeVersion, we imply the value type
if valueType == 0 && key == TemporalChangeVersion {
valueType = enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST
}
switch valueType {
case enumspb.INDEXED_VALUE_TYPE_BOOL:
attr := NewSearchAttributeKeyBool(key)
var value bool
@@ -528,7 +530,7 @@ func convertToTypeSearchAttributes(logger log.Logger, attributes map[string]*com
}
updates = append(updates, attr.ValueSet(value))
default:
logger.Warn("Unrecognized indexed value type on search attribute key", "key", key, "index", index)
logger.Warn("Unrecognized indexed value type on search attribute key", "key", key, "type", valueType)
}
}
return NewSearchAttributes(updates...)
2 changes: 1 addition & 1 deletion internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
@@ -2060,7 +2060,7 @@ func (env *testWorkflowEnvironmentImpl) WorkflowInfo() *WorkflowInfo {
}

func (env *testWorkflowEnvironmentImpl) TypedSearchAttributes() SearchAttributes {
return convertToTypeSearchAttributes(env.logger, env.workflowInfo.SearchAttributes.GetIndexedFields())
return convertToTypedSearchAttributes(env.logger, env.workflowInfo.SearchAttributes.GetIndexedFields())
}

func (env *testWorkflowEnvironmentImpl) RegisterWorkflow(w interface{}) {
1 change: 0 additions & 1 deletion internal/workflow.go
Original file line number Diff line number Diff line change
@@ -1313,7 +1313,6 @@ func signalExternalWorkflow(ctx Context, workflowID, runID, signalName string, a
// UpsertSearchAttributes is used to add or update workflow search attributes.
// The search attributes can be used in query of List/Scan/Count workflow APIs.
// The key and value type must be registered on temporal server side;
// The value has to deterministic when replay;
// The value has to be Json serializable.
// UpsertSearchAttributes will merge attributes to existing map in workflow, for example workflow code:
//
2 changes: 2 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
@@ -1223,6 +1223,8 @@ func (ts *IntegrationTestSuite) TestWorkflowWithParallelMutableSideEffects() {

func (ts *IntegrationTestSuite) TestWorkflowTypedSearchAttributes() {
options := ts.startWorkflowOptions("test-wf-typed-search-attributes")
// Need to disable eager workflow start until https://github.com/temporalio/temporal/pull/5124 fixed
options.EnableEagerStart = false
// Create initial set of search attributes
stringKey := temporal.NewSearchAttributeKeyString("CustomStringField")
options.TypedSearchAttributes = temporal.NewSearchAttributes(stringKey.ValueSet("CustomStringFieldValue"))
27 changes: 24 additions & 3 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
@@ -2305,7 +2305,28 @@ func (w *Workflows) ScheduleTypedSearchAttributesWorkflow(ctx workflow.Context)
}

func (w *Workflows) UpsertTypedSearchAttributesWorkflow(ctx workflow.Context, sleepBetweenUpsert bool) error {
//
// Do a get version and confirmed patched attribute. First confirm change
// version not there.
changeKey := temporal.NewSearchAttributeKeyKeywordList("TemporalChangeVersion")
if workflow.GetInfo(ctx).SearchAttributes.GetIndexedFields()["TemporalChangeVersion"] != nil {
return fmt.Errorf("change version unexpectedly present")
} else if _, ok := workflow.GetTypedSearchAttributes(ctx).GetKeywordList(changeKey); ok {
return fmt.Errorf("change version unexpectedly present")
}
// Now do a get version and confirm it is set afterwards
_ = workflow.GetVersion(ctx, "some-id-1", workflow.DefaultVersion, 0)
if sleepBetweenUpsert {
_ = workflow.Sleep(ctx, 1*time.Millisecond)
}
if p := workflow.GetInfo(ctx).SearchAttributes.GetIndexedFields()["TemporalChangeVersion"]; p == nil {
return fmt.Errorf("change version not present")
} else if string(p.Data) != `["some-id-1-0"]` {
return fmt.Errorf("change version invalid, got: %s", p.Data)
} else if s, _ := workflow.GetTypedSearchAttributes(ctx).GetKeywordList(changeKey); len(s) != 1 || s[0] != "some-id-1-0" {
return fmt.Errorf("change version invalid: got: %v", s)
}

// Check string attribute
attributes := workflow.GetTypedSearchAttributes(ctx)
stringKey := temporal.NewSearchAttributeKeyString("CustomStringField")
value, ok := attributes.GetString(stringKey)
@@ -2320,7 +2341,7 @@ func (w *Workflows) UpsertTypedSearchAttributesWorkflow(ctx workflow.Context, sl
return err
}
if sleepBetweenUpsert {
_ = workflow.Sleep(ctx, 1*time.Second)
_ = workflow.Sleep(ctx, 1*time.Millisecond)
}

// Verify the search attributes is added
@@ -2336,7 +2357,7 @@ func (w *Workflows) UpsertTypedSearchAttributesWorkflow(ctx workflow.Context, sl
return err
}
if sleepBetweenUpsert {
_ = workflow.Sleep(ctx, 1*time.Second)
_ = workflow.Sleep(ctx, 1*time.Millisecond)
}

// Verify the search attributes is removed
4 changes: 1 addition & 3 deletions workflow/workflow.go
Original file line number Diff line number Diff line change
@@ -549,7 +549,6 @@ func GetLastError(ctx Context) error {
// UpsertSearchAttributes is used to add or update workflow search attributes.
// The search attributes can be used in query of List/Scan/Count workflow APIs.
// The key and value type must be registered on temporal server side;
// The value has to deterministic when replay;
// The value has to be Json serializable.
// UpsertSearchAttributes will merge attributes to existing map in workflow, for example workflow code:
//
@@ -585,8 +584,7 @@ func UpsertSearchAttributes(ctx Context, attributes map[string]interface{}) erro
}

// UpsertTypedSearchAttributes is used to add, update, or remove workflow search attributes. The search attributes can
// be used in query of List/Scan/Count workflow APIs. The key and value type must be registered on temporal server side;
// The value has to deterministic when replay; The value has to be Json serializable.
// be used in query of List/Scan/Count workflow APIs. The key and value type must be registered on temporal server side.
// UpsertTypedSearchAttributes will merge attributes to existing map in workflow, for example workflow code:
//
// var intKey = temporal.NewSearchAttributeKeyInt64("CustomIntField")

0 comments on commit 8a917bc

Please sign in to comment.