Skip to content

Commit

Permalink
[#605]: feature: TSA available on the StartWorkflow stage
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Jan 16, 2025
2 parents 5cce3d5 + 914160d commit 6d9a160
Show file tree
Hide file tree
Showing 8 changed files with 266 additions and 49 deletions.
43 changes: 38 additions & 5 deletions aggregatedpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (wp *Workflow) handleCancel() {

// schedule the signal processing
func (wp *Workflow) handleSignal(name string, input *commonpb.Payloads, header *commonpb.Header) error {
wp.log.Debug("signal request", zap.String("RunID", wp.env.WorkflowInfo().WorkflowExecution.RunID), zap.String("name", name))
wp.mq.PushCommand(
internal.InvokeSignal{
RunID: wp.env.WorkflowInfo().WorkflowExecution.RunID,
Expand All @@ -114,6 +115,9 @@ func (wp *Workflow) handleSignal(name string, input *commonpb.Payloads, header *
// Handle query in blocking mode.
func (wp *Workflow) handleQuery(queryType string, queryArgs *commonpb.Payloads, header *commonpb.Header) (*commonpb.Payloads, error) {
const op = errors.Op("workflow_process_handle_query")

wp.log.Debug("query request", zap.String("RunID", wp.env.WorkflowInfo().WorkflowExecution.RunID), zap.String("name", queryType))

result, err := wp.runCommand(internal.InvokeQuery{
RunID: wp.env.WorkflowInfo().WorkflowExecution.RunID,
Name: queryType,
Expand Down Expand Up @@ -355,11 +359,30 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
continue
}

if tt, ok := v.Value.(int); ok {
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(int64(tt)))
} else {
switch ti := v.Value.(type) {
case float64:
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(int64(ti)))
case int:
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(int64(ti)))
case int64:
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(ti))
case int32:
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(int64(ti)))
case int16:
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(int64(ti)))
case int8:
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(int64(ti)))
case string:
i, err := strconv.ParseInt(ti, 10, 64)
if err != nil {
wp.log.Warn("failed to parse int", zap.Error(err))
continue
}
sau = append(sau, temporal.NewSearchAttributeKeyInt64(k).ValueSet(i))
default:
wp.log.Warn("field value is not an int type", zap.String("key", k), zap.Any("value", v.Value))
}

case internal.KeywordType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyKeyword(k).ValueUnset())
Expand Down Expand Up @@ -387,11 +410,21 @@ func (wp *Workflow) handleMessage(msg *internal.Message) error {
continue
}

if tt, ok := v.Value.([]string); ok {
switch tt := v.Value.(type) {
case []string:
sau = append(sau, temporal.NewSearchAttributeKeyKeywordList(k).ValueSet(tt))
} else {
case []any:
var res []string
for _, v := range tt {
if s, ok := v.(string); ok {
res = append(res, s)
}
}
sau = append(sau, temporal.NewSearchAttributeKeyKeywordList(k).ValueSet(res))
default:
wp.log.Warn("field value is not a []string (strings array) type", zap.String("key", k), zap.Any("value", v.Value))
}

case internal.StringType:
if v.Operation == internal.TypedSearchAttributeOperationUnset {
sau = append(sau, temporal.NewSearchAttributeKeyString(k).ValueUnset())
Expand Down
132 changes: 125 additions & 7 deletions aggregatedpool/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package aggregatedpool
import (
"context"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/temporalio/roadrunner-temporal/v5/queue"
"github.com/temporalio/roadrunner-temporal/v5/registry"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
temporalClient "go.temporal.io/sdk/client"
bindings "go.temporal.io/sdk/internalbindings"
"go.uber.org/zap"
Expand Down Expand Up @@ -131,23 +133,139 @@ func (wp *Workflow) Execute(env bindings.WorkflowEnvironment, header *commonpb.H
env.RegisterQueryHandler(wp.handleQuery)
env.RegisterUpdateHandler(wp.handleUpdate)

var lastCompletion = bindings.GetLastCompletionResult(env)
var lastCompletionOffset = 0
// check if we have some TSA
tsa := env.TypedSearchAttributes()
// start workflow command
stwfcmd := internal.StartWorkflow{
Info: env.WorkflowInfo(),
}

// search attributes types are:
/*
INDEXED_VALUE_TYPE_TEXT IndexedValueType = 1
INDEXED_VALUE_TYPE_KEYWORD IndexedValueType = 2
INDEXED_VALUE_TYPE_INT IndexedValueType = 3
INDEXED_VALUE_TYPE_DOUBLE IndexedValueType = 4
INDEXED_VALUE_TYPE_BOOL IndexedValueType = 5
INDEXED_VALUE_TYPE_DATETIME IndexedValueType = 6
INDEXED_VALUE_TYPE_KEYWORD_LIST IndexedValueType = 7
*/
// only process if there're values, obviously
if tsa.Size() > 0 {
untuped := tsa.GetUntypedValues()
tsaParsed := make(map[string]*internal.TypedSearchAttribute, tsa.Size())
for k, v := range untuped {
vt := k.GetValueType()
switch vt {
// just for the linters, should be never reached
case enumspb.INDEXED_VALUE_TYPE_UNSPECIFIED:
continue
case enumspb.INDEXED_VALUE_TYPE_TEXT:
str, ok := v.(string)
if !ok {
wp.log.Warn("typed search attribute found, but it is not a string", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.StringType,
Value: str,
}
case enumspb.INDEXED_VALUE_TYPE_KEYWORD:
str, ok := v.(string)
if !ok {
wp.log.Warn("typed search attribute found, but it is not a string[keyword]", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.KeywordType,
Value: str,
}
case enumspb.INDEXED_VALUE_TYPE_INT:
switch tt := v.(type) {
case int:
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.IntType,
Value: tt,
}
case int64:
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.IntType,
Value: tt,
}
case string:
res, err := strconv.Atoi(tt)
if err != nil {
wp.log.Warn("typed search attribute found, but it is not an int", zap.Error(err), zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.IntType,
Value: res,
}
default:
wp.log.Warn("typed search attribute found, but it is not an int", zap.String("key", k.GetName()))
continue
}
case enumspb.INDEXED_VALUE_TYPE_DOUBLE:
str, ok := v.(float64)
if !ok {
wp.log.Warn("typed search attribute found, but it is not a float64", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.FloatType,
Value: str,
}
case enumspb.INDEXED_VALUE_TYPE_BOOL:
str, ok := v.(bool)
if !ok {
wp.log.Warn("typed search attribute found, but it is not a bool", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.BoolType,
Value: str,
}
case enumspb.INDEXED_VALUE_TYPE_DATETIME:
str, ok := v.(time.Time)
if !ok {
wp.log.Warn("typed search attribute found, but it is not a datetime", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.DatetimeType,
Value: str.Format(time.RFC3339),
}
case enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST:
str, ok := v.([]string)
if !ok {
wp.log.Warn("typed search attribute found, but it is not a []string", zap.String("key", k.GetName()))
continue
}
tsaParsed[k.GetName()] = &internal.TypedSearchAttribute{
Type: internal.KeywordListType,
Value: str,
}
}
}

// set typed search attributes
stwfcmd.SearchAttributes = tsaParsed
}

var lastCompletion = bindings.GetLastCompletionResult(env)
if lastCompletion != nil && len(lastCompletion.Payloads) != 0 {
if input == nil {
input = &commonpb.Payloads{Payloads: []*commonpb.Payload{}}
}

input.Payloads = append(input.Payloads, lastCompletion.Payloads...)
lastCompletionOffset = len(lastCompletion.Payloads)
stwfcmd.LastCompletion = len(lastCompletion.Payloads)
}

wp.mq.PushCommand(
internal.StartWorkflow{
Info: env.WorkflowInfo(),
LastCompletion: lastCompletionOffset,
},
stwfcmd,
input,
wp.header,
)
Expand Down
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ toolchain go1.23.4
require (
github.com/goccy/go-json v0.10.4
github.com/google/uuid v1.6.0
github.com/roadrunner-server/api/v4 v4.17.0
github.com/roadrunner-server/api/v4 v4.18.1
github.com/roadrunner-server/endure/v2 v2.6.1
github.com/roadrunner-server/errors v1.4.1
github.com/roadrunner-server/events v1.0.1
github.com/roadrunner-server/pool v1.1.2
github.com/stretchr/testify v1.10.0
github.com/uber-go/tally/v4 v4.1.17-0.20240412215630-22fe011f5ff0
go.temporal.io/api v1.43.1
go.temporal.io/sdk v1.31.0
go.temporal.io/sdk v1.32.1
go.temporal.io/sdk/contrib/tally v0.2.0
go.temporal.io/server v1.26.2
go.uber.org/zap v1.27.0
google.golang.org/protobuf v1.36.2
google.golang.org/protobuf v1.36.3
)

require (
Expand All @@ -42,7 +42,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nexus-rpc/sdk-go v0.1.0 // indirect
github.com/nexus-rpc/sdk-go v0.1.1 // indirect
github.com/pborman/uuid v1.2.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
Expand All @@ -67,8 +67,8 @@ require (
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.9.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/grpc v1.69.4
gopkg.in/yaml.v3 v3.0.1 // indirect
)
24 changes: 12 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nexus-rpc/sdk-go v0.1.0 h1:PUL/0vEY1//WnqyEHT5ao4LBRQ6MeNUihmnNGn0xMWY=
github.com/nexus-rpc/sdk-go v0.1.0/go.mod h1:TpfkM2Cw0Rlk9drGkoiSMpFqflKTiQLWUNyKJjF8mKQ=
github.com/nexus-rpc/sdk-go v0.1.1 h1:S63hhn4CpmwyoCUn8nVLfuKV+6sA/7hR57ohIQajDP0=
github.com/nexus-rpc/sdk-go v0.1.1/go.mod h1:TpfkM2Cw0Rlk9drGkoiSMpFqflKTiQLWUNyKJjF8mKQ=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw=
Expand Down Expand Up @@ -173,8 +173,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/roadrunner-server/api/v4 v4.17.0 h1:SVmqWfMCKcvjrpluk+mA1o1D8PT1uXHM6AWykFe6i0I=
github.com/roadrunner-server/api/v4 v4.17.0/go.mod h1:ALjb4nL64DM9Cm/vTyfWyGt1b3m1lO4fg11YUq6NrqY=
github.com/roadrunner-server/api/v4 v4.18.1 h1:IA01DjK7wPXkSAUc0Bg2d3hmc/XMI0DYn0BKbBju9bI=
github.com/roadrunner-server/api/v4 v4.18.1/go.mod h1:VdCLIpnjKFHNspqRlu5zfPvrDS9eLR7fYy5K9HYKNkE=
github.com/roadrunner-server/endure/v2 v2.6.1 h1:vx+3ayn8HXnyeCcjKWwe+DfPrwL5sOQobugzCId4F7k=
github.com/roadrunner-server/endure/v2 v2.6.1/go.mod h1:1rfvKpSzUW774T3yHm3IEHPQ3SX/lU/ZTYQykfzqMF0=
github.com/roadrunner-server/errors v1.4.1 h1:LKNeaCGiwd3t8IaL840ZNF3UA9yDQlpvHnKddnh0YRQ=
Expand Down Expand Up @@ -243,8 +243,8 @@ go.temporal.io/api v1.5.0/go.mod h1:BqKxEJJYdxb5dqf0ODfzfMxh8UEQ5L3zKS51FiIYYkA=
go.temporal.io/api v1.43.1 h1:44Q12pUczfGkcAwZtJNhfv3+L6RFzL3kNk547/r8QY8=
go.temporal.io/api v1.43.1/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/sdk v1.12.0/go.mod h1:lSp3lH1lI0TyOsus0arnO3FYvjVXBZGi/G7DjnAnm6o=
go.temporal.io/sdk v1.31.0 h1:CLYiP0R5Sdj0gq8LyYKDDz4ccGOdJPR8wNGJU0JGwj8=
go.temporal.io/sdk v1.31.0/go.mod h1:8U8H7rF9u4Hyb4Ry9yiEls5716DHPNvVITPNkgWUwE8=
go.temporal.io/sdk v1.32.1 h1:slA8prhdFr4lxpsTcRusWVitD/cGjELfKUh0mBj73SU=
go.temporal.io/sdk v1.32.1/go.mod h1:8U8H7rF9u4Hyb4Ry9yiEls5716DHPNvVITPNkgWUwE8=
go.temporal.io/sdk/contrib/tally v0.2.0 h1:XnTJIQcjOv+WuCJ1u8Ve2nq+s2H4i/fys34MnWDRrOo=
go.temporal.io/sdk/contrib/tally v0.2.0/go.mod h1:1kpSuCms/tHeJQDPuuKkaBsMqfHnIIRnCtUYlPNXxuE=
go.temporal.io/server v1.26.2 h1:vDW11lxslYPlGDbQklWi/tqbkVZ2ExtRO1jNjvZmUUI=
Expand Down Expand Up @@ -377,10 +377,10 @@ google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfG
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422 h1:GVIKPyP/kLIyVOgOnTwFOrvQaQUzOzGMCxgFUOEmm24=
google.golang.org/genproto/googleapis/api v0.0.0-20250106144421-5f5ef82da422/go.mod h1:b6h1vNKhxaSoEI+5jc3PJUCustfli/mRab7295pY7rw=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422 h1:3UsHvIr4Wc2aW4brOaSCmcxh9ksica6fHEr8P1XhkYw=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250106144421-5f5ef82da422/go.mod h1:3ENsm/5D1mzDyhpzeRi1NR784I0BcofWBoSc5QqqMK4=
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f h1:gap6+3Gk41EItBuyi4XX/bp4oqJ3UwuIMl25yGinuAA=
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:Ic02D47M+zbarjYYUlK57y316f2MoN0gjAwI3f2S95o=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1:OxYkA3wjPsZyBylwymxSHa7ViiW1Sml4ToBrncvFehI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
Expand All @@ -404,8 +404,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU=
google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
Loading

0 comments on commit 6d9a160

Please sign in to comment.