From 6546a31f5310e5f21617c1d14c2bb59f8b944cd6 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Thu, 30 Jan 2025 22:27:52 +0000 Subject: [PATCH 01/10] Upgrade ringpop-go to fix locking bug (#7191) ## What changed? Pick up https://github.com/temporalio/ringpop-go/pull/17 ## Why? Fix bug ## How did you test it? Ringpop unit tests --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index bf262cdf6382..f0faefd33937 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/robfig/cron/v3 v3.0.1 github.com/sony/gobreaker v1.0.0 github.com/stretchr/testify v1.10.0 - github.com/temporalio/ringpop-go v0.0.0-20241119001152-e505ebd8f887 + github.com/temporalio/ringpop-go v0.0.0-20250130211428-b97329e994f7 github.com/temporalio/sqlparser v0.0.0-20231115171017-f4060bcfa6cb github.com/temporalio/tchannel-go v1.22.1-0.20240528171429-1db37fdea938 github.com/temporalio/tctl-kit v0.0.0-20230328153839-577f95d16fa0 diff --git a/go.sum b/go.sum index e2d10384dc90..24754d35b729 100644 --- a/go.sum +++ b/go.sum @@ -279,8 +279,8 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/temporalio/ringpop-go v0.0.0-20241119001152-e505ebd8f887 h1:08Y1jDl4UKVu+TiQHIVKcW6TKQaHl15vBKkcZ094/SA= -github.com/temporalio/ringpop-go v0.0.0-20241119001152-e505ebd8f887/go.mod h1:RE+CHmY+kOZQk47AQaVzwrGmxpflnLgTd6EOK0853j4= +github.com/temporalio/ringpop-go v0.0.0-20250130211428-b97329e994f7 h1:lEebX/hZss+TSH3EBwhztnBavJVj7pWGJOH8UgKHS0w= +github.com/temporalio/ringpop-go v0.0.0-20250130211428-b97329e994f7/go.mod h1:RE+CHmY+kOZQk47AQaVzwrGmxpflnLgTd6EOK0853j4= github.com/temporalio/sqlparser v0.0.0-20231115171017-f4060bcfa6cb 
h1:YzHH/U/dN7vMP+glybzcXRTczTrgfdRisNTzAj7La04= github.com/temporalio/sqlparser v0.0.0-20231115171017-f4060bcfa6cb/go.mod h1:143qKdh3G45IgV9p+gbAwp3ikRDI8mxsijFiXDfuxsw= github.com/temporalio/tchannel-go v1.22.1-0.20220818200552-1be8d8cffa5b/go.mod h1:c+V9Z/ZgkzAdyGvHrvC5AsXgN+M9Qwey04cBdKYzV7U= From cccd932cfa2fc6fd73f28aca8454d519977a1d54 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Thu, 30 Jan 2025 22:29:46 +0000 Subject: [PATCH 02/10] Add history.alignMembershipChange setting (#6510) ## What changed? Add `history.alignMembershipChange` to allow history to use aligned membership changes also. ## Why? Better behavior during restarts/deployments. ## How did you test it? not yet --- common/dynamicconfig/constants.go | 6 ++++ common/membership/ringpop/factory.go | 2 ++ service/history/configs/config.go | 2 ++ service/history/service.go | 45 +++++++++++++++++++++------- 4 files changed, 45 insertions(+), 10 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 398341d6df9e..3db0e0d3ff55 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -1347,6 +1347,12 @@ This feature is still under development and should NOT be enabled.`, 0*time.Second, `HistoryStartupMembershipJoinDelay is the duration a history instance waits before joining membership after starting.`, + ) + HistoryAlignMembershipChange = NewGlobalDurationSetting( + "history.alignMembershipChange", + 0*time.Second, + `HistoryAlignMembershipChange is a duration to align history's membership changes to. 
+This can help reduce effects of shard movement.`, ) HistoryShutdownDrainDuration = NewGlobalDurationSetting( "history.shutdownDrainDuration", diff --git a/common/membership/ringpop/factory.go b/common/membership/ringpop/factory.go index 4b0b4e8c6354..2c80c22ae88f 100644 --- a/common/membership/ringpop/factory.go +++ b/common/membership/ringpop/factory.go @@ -156,6 +156,8 @@ func (factory *factory) getJoinTime(maxPropagationTime time.Duration) time.Time switch factory.ServiceName { case primitives.MatchingService: alignTime = dynamicconfig.MatchingAlignMembershipChange.Get(factory.DC)() + case primitives.HistoryService: + alignTime = dynamicconfig.HistoryAlignMembershipChange.Get(factory.DC)() } if alignTime == 0 { return time.Time{} diff --git a/service/history/configs/config.go b/service/history/configs/config.go index 22ed5eee8b6a..1600089e085b 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -63,6 +63,7 @@ type Config struct { EmitShardLagLog dynamicconfig.BoolPropertyFn ThrottledLogRPS dynamicconfig.IntPropertyFn EnableStickyQuery dynamicconfig.BoolPropertyFnWithNamespaceFilter + AlignMembershipChange dynamicconfig.DurationPropertyFn ShutdownDrainDuration dynamicconfig.DurationPropertyFn StartupMembershipJoinDelay dynamicconfig.DurationPropertyFn @@ -403,6 +404,7 @@ func NewConfig( PersistencePerShardNamespaceMaxQPS: dynamicconfig.HistoryPersistencePerShardNamespaceMaxQPS.Get(dc), PersistenceDynamicRateLimitingParams: dynamicconfig.HistoryPersistenceDynamicRateLimitingParams.Get(dc), PersistenceQPSBurstRatio: dynamicconfig.PersistenceQPSBurstRatio.Get(dc), + AlignMembershipChange: dynamicconfig.HistoryAlignMembershipChange.Get(dc), ShutdownDrainDuration: dynamicconfig.HistoryShutdownDrainDuration.Get(dc), StartupMembershipJoinDelay: dynamicconfig.HistoryStartupMembershipJoinDelay.Get(dc), AllowResetWithPendingChildren: dynamicconfig.AllowResetWithPendingChildren.Get(dc), diff --git a/service/history/service.go 
b/service/history/service.go index 009b12926d56..bbaaa5dfd69f 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -34,6 +34,7 @@ import ( "go.temporal.io/server/common/membership" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/persistence/visibility/manager" + "go.temporal.io/server/common/util" "go.temporal.io/server/service/history/configs" "google.golang.org/grpc" "google.golang.org/grpc/health" @@ -119,22 +120,46 @@ func (s *Service) Start() { // Stop stops the service func (s *Service) Stop() { - s.logger.Info("ShutdownHandler: Evicting self from membership ring") - _ = s.membershipMonitor.EvictSelf() - - if delay := s.config.ShutdownDrainDuration(); delay > 0 { - s.logger.Info("ShutdownHandler: delaying for shutdown drain", - tag.NewDurationTag("shutdownDrainDuration", delay)) - time.Sleep(delay) + // remove self from membership ring and wait for traffic to drain + var err error + var waitTime time.Duration + if align := s.config.AlignMembershipChange(); align > 0 { + propagation := s.membershipMonitor.ApproximateMaxPropagationTime() + asOf := util.NextAlignedTime(time.Now().Add(propagation), align) + s.logger.Info("ShutdownHandler: Evicting self from membership ring as of", tag.Timestamp(asOf)) + waitTime, err = s.membershipMonitor.EvictSelfAt(asOf) + } else { + s.logger.Info("ShutdownHandler: Evicting self from membership ring immediately") + err = s.membershipMonitor.EvictSelf() + } + if err != nil { + s.logger.Error("ShutdownHandler: Failed to evict self from membership ring", tag.Error(err)) } - s.healthServer.SetServingStatus(serviceName, healthpb.HealthCheckResponse_NOT_SERVING) + s.logger.Info("ShutdownHandler: Waiting for drain") + if waitTime > 0 { + time.Sleep( + waitTime + // wait for membership change + s.config.ShardLingerTimeLimit() + // after membership change shards may linger before close + s.config.ShardFinalizerTimeout(), // and then take this long to run a finalizer + ) + } else { + 
time.Sleep(s.config.ShutdownDrainDuration()) + } + + // Stop shard controller. We should have waited long enough for all shards to realize they + // lost ownership and close, but if not, this will definitely close them. s.logger.Info("ShutdownHandler: Initiating shardController shutdown") s.handler.controller.Stop() - // TODO: Change this to GracefulStop when integration tests are refactored. - s.server.Stop() + // All grpc handlers should be cancelled now. Give them a little time to return. + t := time.AfterFunc(2*time.Second, func() { + s.logger.Info("ShutdownHandler: Drain time expired, stopping all traffic") + s.server.Stop() + }) + s.server.GracefulStop() + t.Stop() s.handler.Stop() s.visibilityManager.Close() From 7d2da814e952844984a3bc741c80939ce1d21cae Mon Sep 17 00:00:00 2001 From: David Reiss Date: Fri, 31 Jan 2025 02:25:14 +0000 Subject: [PATCH 03/10] Don't return BUSY_WORKFLOW on polls (#7197) ## What changed? Follow-up from #7093: exclude `RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW` from early return to client. ## Why? Busy workflow is really a "workflow" scope error, not namespace or system scope, so the argument that we shouldn't retry immediately doesn't apply. ## How did you test it? 
there weren't specific tests for this --- service/matching/matching_engine.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/service/matching/matching_engine.go b/service/matching/matching_engine.go index b77a50ddf392..4a3dfa2632c1 100644 --- a/service/matching/matching_engine.go +++ b/service/matching/matching_engine.go @@ -652,7 +652,7 @@ pollLoop: } resp, err := e.recordWorkflowTaskStarted(ctx, requestClone, task) if err != nil { - switch err.(type) { + switch err := err.(type) { case *serviceerror.Internal, *serviceerror.DataLoss: if e.config.MatchingDropNonRetryableTasks() { e.nonRetryableErrorsDropTask(task, taskQueueName, err) @@ -707,8 +707,11 @@ pollLoop: case *serviceerror.ResourceExhausted: // If history returns one ResourceExhausted, it's likely to return more if we retry // immediately. Instead, return the error to the client which will back off. + // BUSY_WORKFLOW is limited to one workflow and is okay to retry. task.finish(err, false) - return nil, err + if err.Cause != enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW { + return nil, err + } default: task.finish(err, false) if err.Error() == common.ErrNamespaceHandover.Error() { @@ -842,7 +845,7 @@ pollLoop: } resp, err := e.recordActivityTaskStarted(ctx, requestClone, task) if err != nil { - switch err.(type) { + switch err := err.(type) { case *serviceerror.Internal, *serviceerror.DataLoss: if e.config.MatchingDropNonRetryableTasks() { e.nonRetryableErrorsDropTask(task, taskQueueName, err) @@ -910,8 +913,11 @@ pollLoop: case *serviceerror.ResourceExhausted: // If history returns one ResourceExhausted, it's likely to return more if we retry // immediately. Instead, return the error to the client which will back off. + // BUSY_WORKFLOW is limited to one workflow and is okay to retry. 
task.finish(err, false) - return nil, err + if err.Cause != enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW { + return nil, err + } default: task.finish(err, false) if err.Error() == common.ErrNamespaceHandover.Error() { From 32df386eb2a3a9350f7cc1a75969ccc48ce305be Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Thu, 30 Jan 2025 20:48:37 -0800 Subject: [PATCH 04/10] OTEL add grpc headers (#7192) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What changed? Track grpc headers in OTEL _debug mode_. ## Why? Improve debugging experience. ## How did you test it? Checked in Grafana. Screenshot 2025-01-30 at 2 29 24 PM ## Potential risks Only enabled in debug mode. ## Documentation ## Is hotfix candidate? --- common/telemetry/grpc.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/common/telemetry/grpc.go b/common/telemetry/grpc.go index e7d94b43007f..e8b81a3a1546 100644 --- a/common/telemetry/grpc.go +++ b/common/telemetry/grpc.go @@ -26,6 +26,7 @@ package telemetry import ( "context" + "time" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel/attribute" @@ -134,6 +135,17 @@ func (c *customServerStatsHandler) HandleRPC(ctx context.Context, stat stats.RPC c.wrapped.HandleRPC(ctx, stat) switch s := stat.(type) { + case *stats.InHeader: + if c.isDebug { + span := trace.SpanFromContext(ctx) + for key, values := range s.Header { + span.SetAttributes(attribute.StringSlice("rpc.request.headers."+key, values)) + } + deadline, ok := ctx.Deadline() + if ok { + span.SetAttributes(attribute.String("rpc.request.timeout", deadline.Format(time.RFC3339Nano))) + } + } case *stats.InPayload: span := trace.SpanFromContext(ctx) c.annotateTags(ctx, span, s.Payload) @@ -147,6 +159,13 @@ func (c *customServerStatsHandler) HandleRPC(ctx context.Context, stat stats.RPC span.SetAttributes(attribute.Key("rpc.request.payload").String(string(payload))) 
span.SetAttributes(attribute.Key("rpc.request.type").String(msgType)) } + case *stats.OutHeader: + if c.isDebug { + span := trace.SpanFromContext(ctx) + for key, values := range s.Header { + span.SetAttributes(attribute.StringSlice("rpc.response.headers."+key, values)) + } + } case *stats.OutPayload: span := trace.SpanFromContext(ctx) c.annotateTags(ctx, span, s.Payload) From 9fe70b4bdc01a2046a7d01345c433ef9f765febe Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Thu, 30 Jan 2025 20:48:50 -0800 Subject: [PATCH 05/10] Omit empty persistence fields in JSON (#7188) ## What changed? Added a few `,omitempty` across the data persistence types on list and map fields. Some of these may never be empty, but it's easier to just annotate them all. ## Why? When these types are JSON serialized (for debugging/observability), the empty fields are just noise. ## How did you test it? Checked OTEL output. ## Potential risks No _meaningful_ behavior change is expected. ## Documentation ## Is hotfix candidate? 
--- common/persistence/persistence_interface.go | 108 ++++++++++---------- 1 file changed, 54 insertions(+), 54 deletions(-) diff --git a/common/persistence/persistence_interface.go b/common/persistence/persistence_interface.go index faa383b4b78d..8a9d63a95a5a 100644 --- a/common/persistence/persistence_interface.go +++ b/common/persistence/persistence_interface.go @@ -293,8 +293,8 @@ type ( Version int64 UserData *commonpb.DataBlob // Used to build an index of build_id to task_queues - BuildIdsAdded []string - BuildIdsRemoved []string + BuildIdsAdded []string `json:",omitempty"` + BuildIdsRemoved []string `json:",omitempty"` } InternalTaskQueueUserDataEntry struct { @@ -305,7 +305,7 @@ type ( InternalListTaskQueueUserDataEntriesResponse struct { NextPageToken []byte - Entries []InternalTaskQueueUserDataEntry + Entries []InternalTaskQueueUserDataEntry `json:",omitempty"` } InternalCreateTasksRequest struct { @@ -314,7 +314,7 @@ type ( TaskType enumspb.TaskQueueType RangeID int64 TaskQueueInfo *commonpb.DataBlob - Tasks []*InternalCreateTask + Tasks []*InternalCreateTask `json:",omitempty"` } InternalCreateTask struct { @@ -324,12 +324,12 @@ type ( } InternalGetTasksResponse struct { - Tasks []*commonpb.DataBlob + Tasks []*commonpb.DataBlob `json:",omitempty"` NextPageToken []byte } InternalListTaskQueueResponse struct { - Items []*InternalListTaskQueueItem + Items []*InternalListTaskQueueItem `json:",omitempty"` NextPageToken []byte } @@ -353,7 +353,7 @@ type ( PreviousLastWriteVersion int64 NewWorkflowSnapshot InternalWorkflowSnapshot - NewWorkflowNewEvents []*InternalAppendHistoryNodesRequest + NewWorkflowNewEvents []*InternalAppendHistoryNodesRequest `json:",omitempty"` } // InternalCreateWorkflowExecutionResponse is the response from persistence for create new workflow execution @@ -368,9 +368,9 @@ type ( Mode UpdateWorkflowMode UpdateWorkflowMutation InternalWorkflowMutation - UpdateWorkflowNewEvents []*InternalAppendHistoryNodesRequest + 
UpdateWorkflowNewEvents []*InternalAppendHistoryNodesRequest `json:",omitempty"` NewWorkflowSnapshot *InternalWorkflowSnapshot - NewWorkflowNewEvents []*InternalAppendHistoryNodesRequest + NewWorkflowNewEvents []*InternalAppendHistoryNodesRequest `json:",omitempty"` } // InternalConflictResolveWorkflowExecutionRequest is used to reset workflow execution state for Persistence Interface @@ -382,14 +382,14 @@ type ( // workflow to be resetted ResetWorkflowSnapshot InternalWorkflowSnapshot - ResetWorkflowEventsNewEvents []*InternalAppendHistoryNodesRequest + ResetWorkflowEventsNewEvents []*InternalAppendHistoryNodesRequest `json:",omitempty"` // maybe new workflow NewWorkflowSnapshot *InternalWorkflowSnapshot - NewWorkflowEventsNewEvents []*InternalAppendHistoryNodesRequest + NewWorkflowEventsNewEvents []*InternalAppendHistoryNodesRequest `json:",omitempty"` // current workflow CurrentWorkflowMutation *InternalWorkflowMutation - CurrentWorkflowEventsNewEvents []*InternalAppendHistoryNodesRequest + CurrentWorkflowEventsNewEvents []*InternalAppendHistoryNodesRequest `json:",omitempty"` } InternalSetWorkflowExecutionRequest struct { ShardID int32 @@ -400,17 +400,17 @@ type ( // InternalWorkflowMutableState indicates workflow related state for Persistence Interface InternalWorkflowMutableState struct { - ActivityInfos map[int64]*commonpb.DataBlob // ActivityInfo - TimerInfos map[string]*commonpb.DataBlob // TimerInfo - ChildExecutionInfos map[int64]*commonpb.DataBlob // ChildExecutionInfo - RequestCancelInfos map[int64]*commonpb.DataBlob // RequestCancelInfo - SignalInfos map[int64]*commonpb.DataBlob // SignalInfo - SignalRequestedIDs []string - ExecutionInfo *commonpb.DataBlob // WorkflowExecutionInfo - ExecutionState *commonpb.DataBlob // WorkflowExecutionState + ActivityInfos map[int64]*commonpb.DataBlob `json:",omitempty"` // ActivityInfo + TimerInfos map[string]*commonpb.DataBlob `json:",omitempty"` // TimerInfo + ChildExecutionInfos map[int64]*commonpb.DataBlob 
`json:",omitempty"` // ChildExecutionInfo + RequestCancelInfos map[int64]*commonpb.DataBlob `json:",omitempty"` // RequestCancelInfo + SignalInfos map[int64]*commonpb.DataBlob `json:",omitempty"` // SignalInfo + SignalRequestedIDs []string `json:",omitempty"` + ExecutionInfo *commonpb.DataBlob // WorkflowExecutionInfo + ExecutionState *commonpb.DataBlob // WorkflowExecutionState NextEventID int64 - BufferedEvents []*commonpb.DataBlob - Checksum *commonpb.DataBlob // persistencespb.Checksum + BufferedEvents []*commonpb.DataBlob `json:",omitempty"` + Checksum *commonpb.DataBlob // persistencespb.Checksum DBRecordVersion int64 } @@ -427,7 +427,7 @@ type ( NamespaceID string WorkflowID string - Tasks map[tasks.Category][]InternalHistoryTask + Tasks map[tasks.Category][]InternalHistoryTask `json:",omitempty"` } // InternalWorkflowMutation is used as generic workflow execution state mutation for Persistence Interface @@ -446,22 +446,22 @@ type ( LastWriteVersion int64 DBRecordVersion int64 - UpsertActivityInfos map[int64]*commonpb.DataBlob - DeleteActivityInfos map[int64]struct{} - UpsertTimerInfos map[string]*commonpb.DataBlob - DeleteTimerInfos map[string]struct{} - UpsertChildExecutionInfos map[int64]*commonpb.DataBlob - DeleteChildExecutionInfos map[int64]struct{} - UpsertRequestCancelInfos map[int64]*commonpb.DataBlob - DeleteRequestCancelInfos map[int64]struct{} - UpsertSignalInfos map[int64]*commonpb.DataBlob - DeleteSignalInfos map[int64]struct{} - UpsertSignalRequestedIDs map[string]struct{} - DeleteSignalRequestedIDs map[string]struct{} + UpsertActivityInfos map[int64]*commonpb.DataBlob `json:",omitempty"` + DeleteActivityInfos map[int64]struct{} `json:",omitempty"` + UpsertTimerInfos map[string]*commonpb.DataBlob `json:",omitempty"` + DeleteTimerInfos map[string]struct{} `json:",omitempty"` + UpsertChildExecutionInfos map[int64]*commonpb.DataBlob `json:",omitempty"` + DeleteChildExecutionInfos map[int64]struct{} `json:",omitempty"` + UpsertRequestCancelInfos 
map[int64]*commonpb.DataBlob `json:",omitempty"` + DeleteRequestCancelInfos map[int64]struct{} `json:",omitempty"` + UpsertSignalInfos map[int64]*commonpb.DataBlob `json:",omitempty"` + DeleteSignalInfos map[int64]struct{} `json:",omitempty"` + UpsertSignalRequestedIDs map[string]struct{} `json:",omitempty"` + DeleteSignalRequestedIDs map[string]struct{} `json:",omitempty"` NewBufferedEvents *commonpb.DataBlob ClearBufferedEvents bool - Tasks map[tasks.Category][]InternalHistoryTask + Tasks map[tasks.Category][]InternalHistoryTask `json:",omitempty"` Condition int64 @@ -484,14 +484,14 @@ type ( NextEventID int64 DBRecordVersion int64 - ActivityInfos map[int64]*commonpb.DataBlob - TimerInfos map[string]*commonpb.DataBlob - ChildExecutionInfos map[int64]*commonpb.DataBlob - RequestCancelInfos map[int64]*commonpb.DataBlob - SignalInfos map[int64]*commonpb.DataBlob - SignalRequestedIDs map[string]struct{} + ActivityInfos map[int64]*commonpb.DataBlob `json:",omitempty"` + TimerInfos map[string]*commonpb.DataBlob `json:",omitempty"` + ChildExecutionInfos map[int64]*commonpb.DataBlob `json:",omitempty"` + RequestCancelInfos map[int64]*commonpb.DataBlob `json:",omitempty"` + SignalInfos map[int64]*commonpb.DataBlob `json:",omitempty"` + SignalRequestedIDs map[string]struct{} `json:",omitempty"` - Tasks map[tasks.Category][]InternalHistoryTask + Tasks map[tasks.Category][]InternalHistoryTask `json:",omitempty"` Condition int64 @@ -541,7 +541,7 @@ type ( // InternalListConcreteExecutionsResponse is the response to ListConcreteExecutions for Persistence Interface InternalListConcreteExecutionsResponse struct { - States []*InternalWorkflowMutableState + States []*InternalWorkflowMutableState `json:",omitempty"` NextPageToken []byte } @@ -550,7 +550,7 @@ type ( } InternalGetHistoryTasksResponse struct { - Tasks []InternalHistoryTask + Tasks []InternalHistoryTask `json:",omitempty"` NextPageToken []byte } @@ -597,7 +597,7 @@ type ( // Used in sharded data stores to identify 
which shard to use ShardID int32 // branch ranges is used to delete range of history nodes from target branch and it ancestors. - BranchRanges []InternalDeleteHistoryBranchRange + BranchRanges []InternalDeleteHistoryBranchRange `json:",omitempty"` } // InternalDeleteHistoryBranchRange is used to delete a range of history nodes of a branch @@ -641,7 +641,7 @@ type ( // InternalReadHistoryBranchResponse is the response to ReadHistoryBranchRequest InternalReadHistoryBranchResponse struct { // History nodes - Nodes []InternalHistoryNode + Nodes []InternalHistoryNode `json:",omitempty"` // Pagination token NextPageToken []byte } @@ -652,7 +652,7 @@ type ( // pagination token NextPageToken []byte // all branches of all trees - Branches []InternalHistoryBranchDetail + Branches []InternalHistoryBranchDetail `json:",omitempty"` } // InternalHistoryBranchDetail used by InternalGetAllHistoryTreeBranchesResponse @@ -675,7 +675,7 @@ type ( // Only used by persistence layer InternalGetHistoryTreeContainingBranchResponse struct { // TreeInfos - TreeInfos []*commonpb.DataBlob + TreeInfos []*commonpb.DataBlob `json:",omitempty"` } // InternalCreateNamespaceRequest is used to create the namespace @@ -714,7 +714,7 @@ type ( // InternalListNamespacesResponse is the response for GetNamespace InternalListNamespacesResponse struct { - Namespaces []*InternalGetNamespaceResponse + Namespaces []*InternalGetNamespaceResponse `json:",omitempty"` NextPageToken []byte } @@ -726,7 +726,7 @@ type ( // InternalListClusterMetadataResponse is the response for ListClusterMetadata InternalListClusterMetadataResponse struct { - ClusterMetadata []*InternalGetClusterMetadataResponse + ClusterMetadata []*InternalGetClusterMetadataResponse `json:",omitempty"` NextPageToken []byte } @@ -775,7 +775,7 @@ type ( InternalListNexusEndpointsResponse struct { TableVersion int64 NextPageToken []byte - Endpoints []InternalNexusEndpoint + Endpoints []InternalNexusEndpoint `json:",omitempty"` } // QueueV2 is an 
interface for a generic FIFO queue. It should eventually replace the Queue interface. Why do we @@ -846,7 +846,7 @@ type ( } InternalReadMessagesResponse struct { - Messages []QueueV2Message + Messages []QueueV2Message `json:",omitempty"` NextPageToken []byte } @@ -882,7 +882,7 @@ type ( } InternalListQueuesResponse struct { - Queues []QueueInfo + Queues []QueueInfo `json:",omitempty"` NextPageToken []byte } ) From 3dd9794d3b1ac1934d9ed6f8bc769df4326c0abd Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Fri, 31 Jan 2025 10:56:17 -0800 Subject: [PATCH 06/10] Reduce logging on WFTCompletedHandler.Invoke error (#7195) ## What changed? Reduce logging on `WFTCompletedHandler.Invoke` error. Now it is emitted only when effects were actually cleared. ## Why? Remove noisy logs. ## How did you test it? Didn't test. ## Potential risks No risks. ## Documentation No. ## Is hotfix candidate? No. --------- Co-authored-by: Stephan Behnke --- internal/effect/buffer.go | 12 ++++++++-- .../api/respondworkflowtaskcompleted/api.go | 24 +++++++++---------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/internal/effect/buffer.go b/internal/effect/buffer.go index 7d1f6e383126..16a44946371d 100644 --- a/internal/effect/buffer.go +++ b/internal/effect/buffer.go @@ -49,20 +49,28 @@ func (b *Buffer) OnAfterRollback(effect func(context.Context)) { // Apply invokes the buffered effect functions in the order that they were added // to this Buffer. -func (b *Buffer) Apply(ctx context.Context) { +// It returns true if any effects were applied. +func (b *Buffer) Apply(ctx context.Context) bool { + applied := false b.cancels = nil for _, effect := range b.effects { effect(ctx) + applied = true } b.effects = nil + return applied } // Cancel invokes the buffered rollback functions in the reverse of the order // that they were added to this Buffer. -func (b *Buffer) Cancel(ctx context.Context) { +// It returns true if any effects were canceled. 
+func (b *Buffer) Cancel(ctx context.Context) bool { + canceled := false b.effects = nil for i := len(b.cancels) - 1; i >= 0; i-- { b.cancels[i](ctx) + canceled = true } b.cancels = nil + return canceled } diff --git a/service/history/api/respondworkflowtaskcompleted/api.go b/service/history/api/respondworkflowtaskcompleted/api.go index f56afb14ef23..bf93f1021eaa 100644 --- a/service/history/api/respondworkflowtaskcompleted/api.go +++ b/service/history/api/respondworkflowtaskcompleted/api.go @@ -237,20 +237,17 @@ func (handler *WorkflowTaskCompletedHandler) Invoke( var effects effect.Buffer defer func() { - // code in this file and workflowTaskHandler is inconsistent in the way - // errors are returned - some functions which appear to return error - // actually return nil in all cases and instead set a member variable - // that should be observed by other collaborating code (e.g. - // workflowtaskHandler.workflowTaskFailedCause). That made me paranoid - // about the way this function exits so while we have this defer here - // there is _also_ code to call effects.Cancel at key points. + // `effects` are canceled immediately on WFT failure or persistence errors. + // This `defer` handles rare cases where an error is returned but the cancellation didn't happen. 
if retError != nil { - handler.logger.Info("Cancel effects due to error.", - tag.Error(retError), - tag.WorkflowID(token.GetWorkflowId()), - tag.WorkflowRunID(token.GetRunId()), - tag.WorkflowNamespaceID(namespaceEntry.ID().String())) - effects.Cancel(ctx) + cancelled := effects.Cancel(ctx) + if cancelled { + handler.logger.Info("Canceled effects due to error.", + tag.Error(retError), + tag.WorkflowID(token.GetWorkflowId()), + tag.WorkflowRunID(token.GetRunId()), + tag.WorkflowNamespaceID(namespaceEntry.ID().String())) + } } }() @@ -518,6 +515,7 @@ func (handler *WorkflowTaskCompletedHandler) Invoke( } var newWorkflowTask *workflow.WorkflowTaskInfo + // Speculative workflow task will be created after mutable state is persisted. if newWorkflowTaskType == enumsspb.WORKFLOW_TASK_TYPE_NORMAL { versioningStamp := request.WorkerVersionStamp From 5bd88573a49fddc9fa5a44f9bdaae67e4fe1ccce Mon Sep 17 00:00:00 2001 From: justinp-tt <174377431+justinp-tt@users.noreply.github.com> Date: Fri, 31 Jan 2025 14:09:50 -0600 Subject: [PATCH 07/10] Control state-based deletion using the enableTransitionHistory dynamic config (#7163) ## What changed? Control state-based deletion using the enableTransitionHistory dynamic config ## Why? ## How did you test it? ## Potential risks ## Documentation ## Is hotfix candidate? 
--------- Co-authored-by: Roey Berman Co-authored-by: Roey Berman --- components/nexusoperations/events.go | 19 +++++++++++--- components/nexusoperations/helpers_test.go | 4 +++ .../nexusoperations/workflow/commands.go | 25 +++++++++++++++++-- .../nexusoperations/workflow/commands_test.go | 4 ++- .../workflow/mutable_state_rebuilder_test.go | 1 + 5 files changed, 46 insertions(+), 7 deletions(-) diff --git a/components/nexusoperations/events.go b/components/nexusoperations/events.go index 7f8ad79f3fff..5dcf22cfe481 100644 --- a/components/nexusoperations/events.go +++ b/components/nexusoperations/events.go @@ -125,7 +125,7 @@ func (d CompletedEventDefinition) Apply(root *hsm.Node, event *historypb.History return err } - return root.DeleteChild(node.Key) + return maybeDeleteNode(node) } func (d CompletedEventDefinition) Type() enumspb.EventType { @@ -161,7 +161,7 @@ func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEve return err } - return root.DeleteChild(node.Key) + return maybeDeleteNode(node) } func (d FailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error { @@ -192,7 +192,7 @@ func (d CanceledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryE return err } - return root.DeleteChild(node.Key) + return maybeDeleteNode(node) } func (d CanceledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error { @@ -222,7 +222,7 @@ func (d TimedOutEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryE return err } - return root.DeleteChild(node.Key) + return maybeDeleteNode(node) } func (d TimedOutEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error { @@ -304,3 +304,14 @@ func findOperationNode(root *hsm.Node, event *historypb.HistoryEvent) (*hsm.Node } return node, nil 
} + +func maybeDeleteNode(node *hsm.Node) error { + ms, err := hsm.MachineData[interface{ IsTransitionHistoryEnabled() bool }](node.Parent) + if err != nil { + return err + } + if !ms.IsTransitionHistoryEnabled() { + return node.Parent.DeleteChild(node.Key) + } + return nil +} diff --git a/components/nexusoperations/helpers_test.go b/components/nexusoperations/helpers_test.go index 6e66b511f772..89832bdd2350 100644 --- a/components/nexusoperations/helpers_test.go +++ b/components/nexusoperations/helpers_test.go @@ -90,6 +90,10 @@ func (root) IsWorkflowExecutionRunning() bool { return true } +func (root) IsTransitionHistoryEnabled() bool { + return false +} + func mustNewScheduledEvent(schedTime time.Time, timeout time.Duration) *historypb.HistoryEvent { conv := converter.GetDefaultDataConverter() payload, err := conv.ToPayload("input") diff --git a/components/nexusoperations/workflow/commands.go b/components/nexusoperations/workflow/commands.go index 20561f82aa21..947d5a44f993 100644 --- a/components/nexusoperations/workflow/commands.go +++ b/components/nexusoperations/workflow/commands.go @@ -212,10 +212,11 @@ func (ch *commandHandler) HandleCancelCommand( coll := nexusoperations.MachineCollection(ms.HSM()) nodeID := strconv.FormatInt(attrs.ScheduledEventId, 10) - _, err := coll.Node(nodeID) + node, err := coll.Node(nodeID) + hasBufferedEvent := ms.HasAnyBufferedEvent(makeNexusOperationTerminalEventFilter(attrs.ScheduledEventId)) if err != nil { if errors.Is(err, hsm.ErrStateMachineNotFound) { - if !ms.HasAnyBufferedEvent(makeNexusOperationTerminalEventFilter(attrs.ScheduledEventId)) { + if !hasBufferedEvent { return workflow.FailWorkflowTaskError{ Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES, Message: fmt.Sprintf("requested cancelation for a non-existing or already completed operation with scheduled event ID of %d", attrs.ScheduledEventId), @@ -227,6 +228,26 @@ func (ch *commandHandler) HandleCancelCommand( } } + if node 
!= nil { + // TODO(bergundy): Remove this when operation auto-deletes itself on terminal state. + // Operation may already be in a terminal state because it doesn't yet delete itself. We don't want to accept + // cancelation in this case. + op, err := hsm.MachineData[nexusoperations.Operation](node) + if err != nil { + return err + } + // The operation is already in a terminal state and the terminal NexusOperation event has not just been buffered. + // We allow the workflow to request canceling an operation that has just completed while a workflow task is in + // flight since it cannot know about the state of the operation. + if !nexusoperations.TransitionCanceled.Possible(op) && !hasBufferedEvent { + return workflow.FailWorkflowTaskError{ + Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_REQUEST_CANCEL_NEXUS_OPERATION_ATTRIBUTES, + Message: fmt.Sprintf("requested cancelation for an already complete operation with scheduled event ID of %d", attrs.ScheduledEventId), + } + } + // END TODO + } + // Always create the event even if there's a buffered completion to avoid breaking replay in the SDK. // The event will be applied before the completion since buffered events are reordered and put at the end of the // batch, after command events from the workflow task. 
diff --git a/components/nexusoperations/workflow/commands_test.go b/components/nexusoperations/workflow/commands_test.go index 50b64f81bbac..5ed654c848d4 100644 --- a/components/nexusoperations/workflow/commands_test.go +++ b/components/nexusoperations/workflow/commands_test.go @@ -96,6 +96,7 @@ func newTestContext(t *testing.T, cfg *nexusoperations.Config) testContext { ms := workflow.NewMockMutableState(gomock.NewController(t)) node, err := hsm.NewRoot(smReg, workflow.StateMachineType, ms, make(map[string]*persistencespb.StateMachineMap), ms) require.NoError(t, err) + ms.EXPECT().IsTransitionHistoryEnabled().Return(false).AnyTimes() ms.EXPECT().HSM().Return(node).AnyTimes() lastEventID := int64(4) history := &historypb.History{} @@ -537,7 +538,7 @@ func TestHandleCancelCommand(t *testing.T) { t.Run("operation already completed - completion buffered", func(t *testing.T) { tcx := newTestContext(t, defaultConfig) - tcx.ms.EXPECT().HasAnyBufferedEvent(gomock.Any()).Return(true) + tcx.ms.EXPECT().HasAnyBufferedEvent(gomock.Any()).Return(true).AnyTimes() err := tcx.scheduleHandler(context.Background(), tcx.ms, commandValidator{maxPayloadSize: 1}, 1, &commandpb.Command{ Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ @@ -578,6 +579,7 @@ func TestHandleCancelCommand(t *testing.T) { t.Run("sets event attributes with UserMetadata and spawns cancelation child machine", func(t *testing.T) { tcx := newTestContext(t, defaultConfig) + tcx.ms.EXPECT().HasAnyBufferedEvent(gomock.Any()).Return(false).AnyTimes() err := tcx.scheduleHandler(context.Background(), tcx.ms, commandValidator{maxPayloadSize: 1}, 1, &commandpb.Command{ Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ diff --git a/service/history/workflow/mutable_state_rebuilder_test.go b/service/history/workflow/mutable_state_rebuilder_test.go index cd0cc52ec972..97096c4e3d50 100644 --- 
a/service/history/workflow/mutable_state_rebuilder_test.go +++ b/service/history/workflow/mutable_state_rebuilder_test.go @@ -128,6 +128,7 @@ func (s *stateBuilderSuite) SetupTest() { root, err := hsm.NewRoot(reg, StateMachineType, s.mockMutableState, make(map[string]*persistencespb.StateMachineMap), s.mockMutableState) s.NoError(err) s.mockMutableState.EXPECT().HSM().Return(root).AnyTimes() + s.mockMutableState.EXPECT().IsTransitionHistoryEnabled().Return(false).AnyTimes() s.mockNamespaceCache = s.mockShard.Resource.NamespaceCache s.mockClusterMetadata = s.mockShard.Resource.ClusterMetadata From feb0785ca58a6daf1b23865bfa35d569ceeb868a Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Fri, 31 Jan 2025 15:30:06 -0800 Subject: [PATCH 08/10] Basic CHASM registry implementation (#7184) ## What changed? Added basic CHASM registry implementation. ## Why? CHASM project. ## How did you test it? Added new unit tests. `registry.go` has 100% coverage. ## Potential risks No risks. ## Documentation Not yet. ## Is hotfix candidate? No. 
--- chasm/component.go | 2 + chasm/component_mock.go | 89 +++++++++++ chasm/export_test.go | 41 ++++++ chasm/library.go | 50 +++++++ chasm/library_mock.go | 117 +++++++++++++++ chasm/ref.go | 4 + chasm/registrable_component.go | 82 +++++++++++ chasm/registrable_task.go | 62 ++++++++ chasm/registry.go | 163 +++++++++++++++------ chasm/registry_test.go | 260 +++++++++++++++++++++++++++++++++ chasm/task.go | 20 ++- chasm/task_mock.go | 92 ++++++++++++ 12 files changed, 928 insertions(+), 54 deletions(-) create mode 100644 chasm/component_mock.go create mode 100644 chasm/export_test.go create mode 100644 chasm/library.go create mode 100644 chasm/library_mock.go create mode 100644 chasm/registrable_component.go create mode 100644 chasm/registrable_task.go create mode 100644 chasm/registry_test.go create mode 100644 chasm/task_mock.go diff --git a/chasm/component.go b/chasm/component.go index 8cd858653375..43f471075035 100644 --- a/chasm/component.go +++ b/chasm/component.go @@ -22,6 +22,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -copyright_file ../LICENSE -package $GOPACKAGE -source $GOFILE -destination component_mock.go + package chasm import "context" diff --git a/chasm/component_mock.go b/chasm/component_mock.go new file mode 100644 index 000000000000..7ccea11a0d95 --- /dev/null +++ b/chasm/component_mock.go @@ -0,0 +1,89 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: component.go +// +// Generated by this command: +// +// mockgen -copyright_file ../LICENSE -package chasm -source component.go -destination component_mock.go +// + +// Package chasm is a generated GoMock package. +package chasm + +import ( + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockComponent is a mock of Component interface. +type MockComponent struct { + ctrl *gomock.Controller + recorder *MockComponentMockRecorder +} + +// MockComponentMockRecorder is the mock recorder for MockComponent. +type MockComponentMockRecorder struct { + mock *MockComponent +} + +// NewMockComponent creates a new mock instance. 
+func NewMockComponent(ctrl *gomock.Controller) *MockComponent { + mock := &MockComponent{ctrl: ctrl} + mock.recorder = &MockComponentMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockComponent) EXPECT() *MockComponentMockRecorder { + return m.recorder +} + +// LifecycleState mocks base method. +func (m *MockComponent) LifecycleState() LifecycleState { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LifecycleState") + ret0, _ := ret[0].(LifecycleState) + return ret0 +} + +// LifecycleState indicates an expected call of LifecycleState. +func (mr *MockComponentMockRecorder) LifecycleState() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LifecycleState", reflect.TypeOf((*MockComponent)(nil).LifecycleState)) +} + +// mustEmbedUnimplementedComponent mocks base method. +func (m *MockComponent) mustEmbedUnimplementedComponent() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "mustEmbedUnimplementedComponent") +} + +// mustEmbedUnimplementedComponent indicates an expected call of mustEmbedUnimplementedComponent. +func (mr *MockComponentMockRecorder) mustEmbedUnimplementedComponent() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mustEmbedUnimplementedComponent", reflect.TypeOf((*MockComponent)(nil).mustEmbedUnimplementedComponent)) +} diff --git a/chasm/export_test.go b/chasm/export_test.go new file mode 100644 index 000000000000..83a8b601a228 --- /dev/null +++ b/chasm/export_test.go @@ -0,0 +1,41 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package chasm + +func (r *Registry) Component(fqn string) (*RegistrableComponent, bool) { + return r.component(fqn) +} + +func (r *Registry) Task(fqn string) (*RegistrableTask, bool) { + return r.task(fqn) +} + +func (r *Registry) ComponentFor(componentInstance any) (*RegistrableComponent, bool) { + return r.componentFor(componentInstance) +} + +func (r *Registry) TaskFor(taskInstance any) (*RegistrableTask, bool) { + return r.taskFor(taskInstance) +} diff --git a/chasm/library.go b/chasm/library.go new file mode 100644 index 000000000000..86cde8137b0e --- /dev/null +++ b/chasm/library.go @@ -0,0 +1,50 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +//go:generate mockgen -copyright_file ../LICENSE -package $GOPACKAGE -source $GOFILE -destination library_mock.go + +package chasm + +type ( + Library interface { + Name() string + Components() []*RegistrableComponent + Tasks() []*RegistrableTask + // Service() + + mustEmbedUnimplementedLibrary() + } + + UnimplementedLibrary struct{} +) + +func (UnimplementedLibrary) Components() []*RegistrableComponent { + return nil +} + +func (UnimplementedLibrary) Tasks() []*RegistrableTask { + return nil +} + +func (UnimplementedLibrary) mustEmbedUnimplementedLibrary() {} diff --git a/chasm/library_mock.go b/chasm/library_mock.go new file mode 100644 index 000000000000..281f162c831d --- /dev/null +++ b/chasm/library_mock.go @@ -0,0 +1,117 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: library.go +// +// Generated by this command: +// +// mockgen -copyright_file ../LICENSE -package chasm -source library.go -destination library_mock.go +// + +// Package chasm is a generated GoMock package. +package chasm + +import ( + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockLibrary is a mock of Library interface. +type MockLibrary struct { + ctrl *gomock.Controller + recorder *MockLibraryMockRecorder +} + +// MockLibraryMockRecorder is the mock recorder for MockLibrary. +type MockLibraryMockRecorder struct { + mock *MockLibrary +} + +// NewMockLibrary creates a new mock instance. 
+func NewMockLibrary(ctrl *gomock.Controller) *MockLibrary { + mock := &MockLibrary{ctrl: ctrl} + mock.recorder = &MockLibraryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockLibrary) EXPECT() *MockLibraryMockRecorder { + return m.recorder +} + +// Components mocks base method. +func (m *MockLibrary) Components() []*RegistrableComponent { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Components") + ret0, _ := ret[0].([]*RegistrableComponent) + return ret0 +} + +// Components indicates an expected call of Components. +func (mr *MockLibraryMockRecorder) Components() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Components", reflect.TypeOf((*MockLibrary)(nil).Components)) +} + +// Name mocks base method. +func (m *MockLibrary) Name() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Name") + ret0, _ := ret[0].(string) + return ret0 +} + +// Name indicates an expected call of Name. +func (mr *MockLibraryMockRecorder) Name() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockLibrary)(nil).Name)) +} + +// Tasks mocks base method. +func (m *MockLibrary) Tasks() []*RegistrableTask { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Tasks") + ret0, _ := ret[0].([]*RegistrableTask) + return ret0 +} + +// Tasks indicates an expected call of Tasks. +func (mr *MockLibraryMockRecorder) Tasks() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Tasks", reflect.TypeOf((*MockLibrary)(nil).Tasks)) +} + +// mustEmbedUnimplementedLibrary mocks base method. +func (m *MockLibrary) mustEmbedUnimplementedLibrary() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "mustEmbedUnimplementedLibrary") +} + +// mustEmbedUnimplementedLibrary indicates an expected call of mustEmbedUnimplementedLibrary. 
+func (mr *MockLibraryMockRecorder) mustEmbedUnimplementedLibrary() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mustEmbedUnimplementedLibrary", reflect.TypeOf((*MockLibrary)(nil).mustEmbedUnimplementedLibrary)) +} diff --git a/chasm/ref.go b/chasm/ref.go index b06ff0d767ad..eec32d511de2 100644 --- a/chasm/ref.go +++ b/chasm/ref.go @@ -28,6 +28,10 @@ import ( persistencespb "go.temporal.io/server/api/persistence/v1" ) +var ( + defaultShardingFn = func(key EntityKey) string { return key.NamespaceID + "_" + key.EntityID } +) + type EntityKey struct { NamespaceID string BusinessID string diff --git a/chasm/registrable_component.go b/chasm/registrable_component.go new file mode 100644 index 000000000000..a6c6d038dd2d --- /dev/null +++ b/chasm/registrable_component.go @@ -0,0 +1,82 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package chasm + +import ( + "reflect" +) + +type ( + RegistrableComponent struct { + name string + goType reflect.Type + + ephemeral bool + singleCluster bool + shardingFn func(EntityKey) string + } + + RegistrableComponentOption func(*RegistrableComponent) +) + +func NewRegistrableComponent[C Component]( + name string, + opts ...RegistrableComponentOption, +) *RegistrableComponent { + rc := &RegistrableComponent{ + name: name, + goType: reflect.TypeFor[C](), + shardingFn: defaultShardingFn, + } + for _, opt := range opts { + opt(rc) + } + return rc +} + +func WithEphemeral() RegistrableComponentOption { + return func(rc *RegistrableComponent) { + rc.ephemeral = true + } +} + +// Is there any use case where we don't want to replicate certain instances of an archetype? +func WithSingleCluster() RegistrableComponentOption { + return func(rc *RegistrableComponent) { + rc.singleCluster = true + } +} + +func WithShardingFn( + shardingFn func(EntityKey) string, +) RegistrableComponentOption { + return func(rc *RegistrableComponent) { + rc.shardingFn = shardingFn + } +} + +func (rc RegistrableComponent) Name() string { + return rc.name +} diff --git a/chasm/registrable_task.go b/chasm/registrable_task.go new file mode 100644 index 000000000000..405e0aef4cdc --- /dev/null +++ b/chasm/registrable_task.go @@ -0,0 +1,62 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package chasm + +import ( + "reflect" +) + +type ( + RegistrableTask struct { + name string + goType reflect.Type + componentGoType reflect.Type // It is not clear how this one is used. + handler any + } + + RegistrableTaskOption func(*RegistrableTask) +) + +// NOTE: C is not Component but any. 
+func NewRegistrableTask[C any, T any]( + name string, + handler TaskHandler[C, T], + opts ...RegistrableTaskOption, +) *RegistrableTask { + rt := &RegistrableTask{ + name: name, + goType: reflect.TypeFor[T](), + componentGoType: reflect.TypeFor[C](), + handler: handler, + } + for _, opt := range opts { + opt(rt) + } + return rt +} + +func (rt RegistrableTask) Name() string { + return rt.name +} diff --git a/chasm/registry.go b/chasm/registry.go index 7c8bc57b189f..ff9e3d22ff1a 100644 --- a/chasm/registry.go +++ b/chasm/registry.go @@ -24,67 +24,138 @@ package chasm -type Registry struct{} - -func (r *Registry) RegisterLibrary(lib Library) { - panic("not implemented") -} - -type Library interface { - Name() string - Components() []RegistrableComponent - Tasks() []RegistrableTask - // Service() - - mustEmbedUnimplementedLibrary() +import ( + "errors" + "fmt" + "reflect" + "regexp" +) + +var ( + // This is golang type identifier regex. + nameValidator = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`) +) + +type ( + Registry struct { + componentByName map[string]*RegistrableComponent // fully qualified name -> component + componentByType map[reflect.Type]*RegistrableComponent // component type -> component + + taskByName map[string]*RegistrableTask // fully qualified name -> task + taskByType map[reflect.Type]*RegistrableTask // task type -> task + } +) + +func NewRegistry() *Registry { + return &Registry{ + componentByName: make(map[string]*RegistrableComponent), + componentByType: make(map[reflect.Type]*RegistrableComponent), + taskByName: make(map[string]*RegistrableTask), + taskByType: make(map[reflect.Type]*RegistrableTask), + } } -type UnimplementedLibrary struct{} - -func (UnimplementedLibrary) Components() []RegistrableComponent { +func (r *Registry) Register(lib Library) error { + if err := r.validateName(lib.Name()); err != nil { + return err + } + for _, c := range lib.Components() { + if err := r.registerComponent(lib.Name(), c); err != nil { + return err + 
} + } + for _, t := range lib.Tasks() { + if err := r.registerTask(lib.Name(), t); err != nil { + return err + } + } return nil } -func (UnimplementedLibrary) Tasks() []RegistrableTask { - return nil +func (r *Registry) component(fqn string) (*RegistrableComponent, bool) { + rc, ok := r.componentByName[fqn] + return rc, ok } -func (UnimplementedLibrary) mustEmbedUnimplementedLibrary() {} - -type RegistrableComponent struct { +func (r *Registry) task(fqn string) (*RegistrableTask, bool) { + rt, ok := r.taskByName[fqn] + return rt, ok } -func NewRegistrableComponent[C Component]( - name string, - opts ...RegistrableComponentOption, -) RegistrableComponent { - panic("not implemented") +func (r *Registry) componentFor(componentInstance any) (*RegistrableComponent, bool) { + rt, ok := r.componentByType[reflect.TypeOf(componentInstance)] + return rt, ok } -type RegistrableComponentOption func(*RegistrableComponent) - -func EntityEphemeral() RegistrableComponentOption { - panic("not implemented") +func (r *Registry) taskFor(taskInstance any) (*RegistrableTask, bool) { + rt, ok := r.taskByType[reflect.TypeOf(taskInstance)] + return rt, ok } -// Is there any use case where we don't want to replicate -// certain instances of a archetype? -func EntitySingleCluster() RegistrableComponentOption { - panic("not implemented") +func (r *Registry) fqn(libName, name string) string { + return libName + "." + name } -func EntityShardingFn( - func(EntityKey) string, -) RegistrableComponentOption { - panic("not implemented") +func (r *Registry) registerComponent( + libName string, + rc *RegistrableComponent, +) error { + if err := r.validateName(rc.name); err != nil { + return err + } + fqn := r.fqn(libName, rc.name) + if _, ok := r.componentByName[fqn]; ok { + return fmt.Errorf("component %s is already registered", fqn) + } + // rc.goType implements Component interface; therefore, it must be a struct. + // This check protects against the interface itself being registered. 
+ if !(rc.goType.Kind() == reflect.Struct || + (rc.goType.Kind() == reflect.Ptr && rc.goType.Elem().Kind() == reflect.Struct)) { + return fmt.Errorf("component type %s must be struct or pointer to struct", rc.goType.String()) + } + if _, ok := r.componentByType[rc.goType]; ok { + return fmt.Errorf("component type %s is already registered", rc.goType.String()) + } + r.componentByName[fqn] = rc + r.componentByType[rc.goType] = rc + return nil +} +func (r *Registry) registerTask( + libName string, + rt *RegistrableTask, +) error { + if err := r.validateName(rt.name); err != nil { + return err + } + fqn := r.fqn(libName, rt.name) + if _, ok := r.taskByName[fqn]; ok { + return fmt.Errorf("task %s is already registered", fqn) + } + if !(rt.goType.Kind() == reflect.Struct || + (rt.goType.Kind() == reflect.Ptr && rt.goType.Elem().Kind() == reflect.Struct)) { + return fmt.Errorf("task type %s must be struct or pointer to struct", rt.goType.String()) + } + if _, ok := r.taskByType[rt.goType]; ok { + return fmt.Errorf("task type %s is already registered", rt.goType.String()) + } + if !(rt.componentGoType.Kind() == reflect.Interface || + (rt.componentGoType.Kind() == reflect.Struct || + (rt.componentGoType.Kind() == reflect.Ptr && rt.componentGoType.Elem().Kind() == reflect.Struct)) && + rt.componentGoType.AssignableTo(reflect.TypeOf((*Component)(nil)).Elem())) { + return fmt.Errorf("component type %s must be and interface or struct that implements Component interface", rt.componentGoType.String()) + } + + r.taskByName[fqn] = rt + r.taskByType[rt.goType] = rt + return nil } -type RegistrableTask struct{} - -func NewRegistrableTask[C any, T any]( - name string, - handler TaskHandler[C, T], - // opts ...RegistrableTaskOptions, no options right now -) RegistrableTask { - panic("not implemented") +func (r *Registry) validateName(n string) error { + if n == "" { + return errors.New("name must not be empty") + } + if !nameValidator.MatchString(n) { + return fmt.Errorf("name %s is 
invalid. name must follow golang identifier rules: %s", n, nameValidator.String()) + } + return nil } diff --git a/chasm/registry_test.go b/chasm/registry_test.go new file mode 100644 index 000000000000..d4c2ce2401e0 --- /dev/null +++ b/chasm/registry_test.go @@ -0,0 +1,260 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package chasm_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.temporal.io/server/chasm" + "go.uber.org/mock/gomock" +) + +type ( + testTask1 struct{} + testTask2 struct{} + testTaskComponentInterface interface { + DoSomething() + } +) + +func TestRegistry_RegisterComponents_Success(t *testing.T) { + r := chasm.NewRegistry() + ctrl := gomock.NewController(t) + lib := chasm.NewMockLibrary(ctrl) + lib.EXPECT().Name().Return("TestLibrary").AnyTimes() + lib.EXPECT().Components().Return([]*chasm.RegistrableComponent{ + chasm.NewRegistrableComponent[*chasm.MockComponent]("Component1"), + }) + + lib.EXPECT().Tasks().Return(nil) + + err := r.Register(lib) + require.NoError(t, err) + + rc1, ok := r.Component("TestLibrary.Component1") + require.True(t, ok) + require.Equal(t, "Component1", rc1.Name()) + + missingRC, ok := r.Component("TestLibrary.Component2") + require.False(t, ok) + require.Nil(t, missingRC) + + cInstance1 := chasm.NewMockComponent(ctrl) + rc2, ok := r.ComponentFor(cInstance1) + require.True(t, ok) + require.Equal(t, "Component1", rc2.Name()) + + cInstance2 := "invalid component instance" + rc3, ok := r.ComponentFor(cInstance2) + require.False(t, ok) + require.Nil(t, rc3) +} + +func TestRegistry_RegisterTasks_Success(t *testing.T) { + r := chasm.NewRegistry() + ctrl := gomock.NewController(t) + lib := chasm.NewMockLibrary(ctrl) + lib.EXPECT().Name().Return("TestLibrary").AnyTimes() + lib.EXPECT().Components().Return(nil) + + lib.EXPECT().Tasks().Return([]*chasm.RegistrableTask{ + chasm.NewRegistrableTask[*chasm.MockComponent, testTask1]("Task1", chasm.NewMockTaskHandler[*chasm.MockComponent, testTask1](ctrl)), + chasm.NewRegistrableTask[testTaskComponentInterface, testTask2]("Task2", chasm.NewMockTaskHandler[testTaskComponentInterface, testTask2](ctrl)), + }) + + err := r.Register(lib) + require.NoError(t, err) + + rt1, ok := r.Task("TestLibrary.Task1") + require.True(t, ok) + require.Equal(t, "Task1", rt1.Name()) + + 
missingRT, ok := r.Task("TestLibrary.TaskMissing") + require.False(t, ok) + require.Nil(t, missingRT) + + tInstance1 := testTask2{} + rt2, ok := r.TaskFor(tInstance1) + require.True(t, ok) + require.Equal(t, "Task2", rt2.Name()) + + tInstance2 := "invalid task instance" + rt3, ok := r.TaskFor(tInstance2) + require.False(t, ok) + require.Nil(t, rt3) +} + +func TestRegistry_Register_LibraryError(t *testing.T) { + ctrl := gomock.NewController(t) + lib := chasm.NewMockLibrary(ctrl) + + t.Run("library name must not be empty", func(t *testing.T) { + lib.EXPECT().Name().Return("") + r := chasm.NewRegistry() + err := r.Register(lib) + require.Error(t, err) + require.Contains(t, err.Error(), "name must not be empty") + }) + + t.Run("library name must follow rules", func(t *testing.T) { + lib.EXPECT().Name().Return("bad.lib.name") + r := chasm.NewRegistry() + err := r.Register(lib) + require.Error(t, err) + require.Contains(t, err.Error(), "name must follow golang identifier rules") + }) +} + +func TestRegistry_RegisterComponents_Error(t *testing.T) { + ctrl := gomock.NewController(t) + lib := chasm.NewMockLibrary(ctrl) + lib.EXPECT().Name().Return("TestLibrary").AnyTimes() + + t.Run("component name must not be empty", func(t *testing.T) { + lib.EXPECT().Components().Return([]*chasm.RegistrableComponent{ + chasm.NewRegistrableComponent[*chasm.MockComponent](""), + }) + r := chasm.NewRegistry() + err := r.Register(lib) + require.Error(t, err) + require.Contains(t, err.Error(), "name must not be empty") + }) + + t.Run("component name must follow rules", func(t *testing.T) { + lib.EXPECT().Components().Return([]*chasm.RegistrableComponent{ + chasm.NewRegistrableComponent[*chasm.MockComponent]("bad.component.name"), + }) + r := chasm.NewRegistry() + err := r.Register(lib) + require.Error(t, err) + require.Contains(t, err.Error(), "name must follow golang identifier rules") + }) + + t.Run("component is already registered by name", func(t *testing.T) { + 
lib.EXPECT().Components().Return([]*chasm.RegistrableComponent{ + chasm.NewRegistrableComponent[*chasm.MockComponent]("Component1"), + chasm.NewRegistrableComponent[*chasm.MockComponent]("Component1"), + }) + r := chasm.NewRegistry() + err := r.Register(lib) + require.Error(t, err) + require.Contains(t, err.Error(), "is already registered") + }) + + t.Run("component is already registered by type", func(t *testing.T) { + lib.EXPECT().Components().Return([]*chasm.RegistrableComponent{ + chasm.NewRegistrableComponent[*chasm.MockComponent]("Component1"), + chasm.NewRegistrableComponent[*chasm.MockComponent]("Component2"), + }) + r := chasm.NewRegistry() + + err := r.Register(lib) + require.Error(t, err) + require.Contains(t, err.Error(), "is already registered") + }) + + t.Run("component must be a struct", func(t *testing.T) { + lib.EXPECT().Components().Return([]*chasm.RegistrableComponent{ + chasm.NewRegistrableComponent[chasm.Component]("Component1"), + }) + r := chasm.NewRegistry() + + err := r.Register(lib) + require.Error(t, err) + require.Contains(t, err.Error(), "must be struct or pointer to struct") + }) + +} + +func TestRegistry_RegisterTasks_Error(t *testing.T) { + ctrl := gomock.NewController(t) + lib := chasm.NewMockLibrary(ctrl) + lib.EXPECT().Name().Return("TestLibrary").AnyTimes() + lib.EXPECT().Components().Return(nil).AnyTimes() + + t.Run("task name must not be empty", func(t *testing.T) { + r := chasm.NewRegistry() + lib.EXPECT().Tasks().Return([]*chasm.RegistrableTask{ + chasm.NewRegistrableTask[*chasm.MockComponent, testTask1]("", chasm.NewMockTaskHandler[*chasm.MockComponent, testTask1](ctrl)), + }) + err := r.Register(lib) + require.Error(t, err) + require.Contains(t, err.Error(), "name must not be empty") + }) + + t.Run("task name must follow rules", func(t *testing.T) { + lib.EXPECT().Tasks().Return([]*chasm.RegistrableTask{ + chasm.NewRegistrableTask[*chasm.MockComponent, testTask1]("bad.task.name", 
chasm.NewMockTaskHandler[*chasm.MockComponent, testTask1](ctrl)), + }) + r := chasm.NewRegistry() + err := r.Register(lib) + require.Error(t, err) + require.Contains(t, err.Error(), "name must follow golang identifier rules") + }) + + t.Run("task is already registered by name", func(t *testing.T) { + lib.EXPECT().Tasks().Return([]*chasm.RegistrableTask{ + chasm.NewRegistrableTask[*chasm.MockComponent, testTask1]("Task1", chasm.NewMockTaskHandler[*chasm.MockComponent, testTask1](ctrl)), + chasm.NewRegistrableTask[*chasm.MockComponent, testTask1]("Task1", chasm.NewMockTaskHandler[*chasm.MockComponent, testTask1](ctrl)), + }) + r := chasm.NewRegistry() + err := r.Register(lib) + require.Error(t, err) + require.Contains(t, err.Error(), "is already registered") + }) + + t.Run("task is already registered by type", func(t *testing.T) { + lib.EXPECT().Tasks().Return([]*chasm.RegistrableTask{ + chasm.NewRegistrableTask[*chasm.MockComponent, testTask1]("Task1", chasm.NewMockTaskHandler[*chasm.MockComponent, testTask1](ctrl)), + chasm.NewRegistrableTask[*chasm.MockComponent, testTask1]("Task2", chasm.NewMockTaskHandler[*chasm.MockComponent, testTask1](ctrl)), + }) + r := chasm.NewRegistry() + err := r.Register(lib) + require.Error(t, err) + require.Contains(t, err.Error(), "is already registered") + }) + + t.Run("task component struct must implement Component", func(t *testing.T) { + lib.EXPECT().Tasks().Return([]*chasm.RegistrableTask{ + // MockComponent has only pointer receivers and therefore does not implement Component interface. 
+ chasm.NewRegistrableTask[chasm.MockComponent, testTask1]("Task1", chasm.NewMockTaskHandler[chasm.MockComponent, testTask1](ctrl)), + }) + r := chasm.NewRegistry() + err := r.Register(lib) + require.Error(t, err) + require.Contains(t, err.Error(), "struct that implements Component interface") + }) + + t.Run("task must be struct", func(t *testing.T) { + lib.EXPECT().Tasks().Return([]*chasm.RegistrableTask{ + chasm.NewRegistrableTask[*chasm.MockComponent, string]("Task1", chasm.NewMockTaskHandler[*chasm.MockComponent, string](ctrl)), + }) + r := chasm.NewRegistry() + err := r.Register(lib) + require.Error(t, err) + require.Contains(t, err.Error(), "must be struct or pointer to struct") + }) +} diff --git a/chasm/task.go b/chasm/task.go index c5a9263d882d..6d780715ab27 100644 --- a/chasm/task.go +++ b/chasm/task.go @@ -22,6 +22,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -copyright_file ../LICENSE -package $GOPACKAGE -source $GOFILE -destination task_mock.go + package chasm import ( @@ -29,12 +31,14 @@ import ( "time" ) -type TaskAttributes struct { - ScheduledTime time.Time - Destination string -} +type ( + TaskAttributes struct { + ScheduledTime time.Time + Destination string + } -type TaskHandler[C any, T any] interface { - Validate(Context, C, T) error - Execute(context.Context, ComponentRef, T) error -} + TaskHandler[C any, T any] interface { + Validate(Context, C, T) error + Execute(context.Context, ComponentRef, T) error + } +) diff --git a/chasm/task_mock.go b/chasm/task_mock.go new file mode 100644 index 000000000000..de462e9371fc --- /dev/null +++ b/chasm/task_mock.go @@ -0,0 +1,92 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: task.go +// +// Generated by this command: +// +// mockgen -copyright_file ../LICENSE -package chasm -source task.go -destination task_mock.go +// + +// Package chasm is a generated GoMock package. +package chasm + +import ( + context "context" + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockTaskHandler is a mock of TaskHandler interface. +type MockTaskHandler[C any, T any] struct { + ctrl *gomock.Controller + recorder *MockTaskHandlerMockRecorder[C, T] +} + +// MockTaskHandlerMockRecorder is the mock recorder for MockTaskHandler. +type MockTaskHandlerMockRecorder[C any, T any] struct { + mock *MockTaskHandler[C, T] +} + +// NewMockTaskHandler creates a new mock instance. 
+func NewMockTaskHandler[C any, T any](ctrl *gomock.Controller) *MockTaskHandler[C, T] { + mock := &MockTaskHandler[C, T]{ctrl: ctrl} + mock.recorder = &MockTaskHandlerMockRecorder[C, T]{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTaskHandler[C, T]) EXPECT() *MockTaskHandlerMockRecorder[C, T] { + return m.recorder +} + +// Execute mocks base method. +func (m *MockTaskHandler[C, T]) Execute(arg0 context.Context, arg1 ComponentRef, arg2 T) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Execute", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// Execute indicates an expected call of Execute. +func (mr *MockTaskHandlerMockRecorder[C, T]) Execute(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Execute", reflect.TypeOf((*MockTaskHandler[C, T])(nil).Execute), arg0, arg1, arg2) +} + +// Validate mocks base method. +func (m *MockTaskHandler[C, T]) Validate(arg0 Context, arg1 C, arg2 T) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Validate", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// Validate indicates an expected call of Validate. +func (mr *MockTaskHandlerMockRecorder[C, T]) Validate(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Validate", reflect.TypeOf((*MockTaskHandler[C, T])(nil).Validate), arg0, arg1, arg2) +} From 5ad297b9af1c632e005f58dc723733a267f7cc10 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Fri, 31 Jan 2025 16:24:45 -0800 Subject: [PATCH 09/10] CHASM: Fix default sharding function (#7214) ## What changed? Fix default sharding function. ## Why? Made a typo in my previous PR #7184. 
--- chasm/ref.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chasm/ref.go b/chasm/ref.go index eec32d511de2..7befe47722cb 100644 --- a/chasm/ref.go +++ b/chasm/ref.go @@ -29,7 +29,7 @@ import ( ) var ( - defaultShardingFn = func(key EntityKey) string { return key.NamespaceID + "_" + key.EntityID } + defaultShardingFn = func(key EntityKey) string { return key.NamespaceID + "_" + key.BusinessID } ) type EntityKey struct { From 458bbe2faf5b93deff9bf99cf76875ade4b04d21 Mon Sep 17 00:00:00 2001 From: zigeH Date: Sun, 2 Feb 2025 13:23:04 -0800 Subject: [PATCH 10/10] Quick fix to reconfig maximumHandoverTimeoutSeconds (#7216) ## What changed? Reconfig the maximumHandoverTimeoutSeconds to 30s but change the actual input to 10s for the default tests. ## Why? ## How did you test it? ## Potential risks ## Documentation ## Is hotfix candidate? --- service/worker/migration/handover_workflow.go | 2 +- service/worker/migration/handover_workflow_test.go | 2 +- tests/xdc/failover_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/service/worker/migration/handover_workflow.go b/service/worker/migration/handover_workflow.go index df2811ea87d6..a67dd1fa6a6e 100644 --- a/service/worker/migration/handover_workflow.go +++ b/service/worker/migration/handover_workflow.go @@ -37,7 +37,7 @@ const ( minimumAllowedLaggingSeconds = 5 maximumAllowedLaggingSeconds = 120 - maximumHandoverTimeoutSeconds = 10 + maximumHandoverTimeoutSeconds = 30 ) type ( diff --git a/service/worker/migration/handover_workflow_test.go b/service/worker/migration/handover_workflow_test.go index aa4bbb4eb9b4..8281da8dd6a4 100644 --- a/service/worker/migration/handover_workflow_test.go +++ b/service/worker/migration/handover_workflow_test.go @@ -62,7 +62,7 @@ func TestHandoverWorkflow(t *testing.T) { Namespace: "test-ns", RemoteCluster: "test-remote", AllowedLaggingSeconds: 10, - HandoverTimeoutSeconds: 30, + HandoverTimeoutSeconds: 10, }) require.True(t, 
env.IsWorkflowCompleted()) diff --git a/tests/xdc/failover_test.go b/tests/xdc/failover_test.go index da0aa63bf7ea..33268fc47a1e 100644 --- a/tests/xdc/failover_test.go +++ b/tests/xdc/failover_test.go @@ -2700,7 +2700,7 @@ func (s *FunctionalClustersTestSuite) TestLocalNamespaceMigration() { Namespace: namespace, RemoteCluster: s.clusterNames[1], AllowedLaggingSeconds: 10, - HandoverTimeoutSeconds: 30, + HandoverTimeoutSeconds: 10, }) s.NoError(err) err = run5.Get(testCtx, nil)