From 902ff8a85792dfcf1acbb6e5d313d75ba89abb12 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Fri, 31 Jan 2025 16:39:58 -0800 Subject: [PATCH] Delete Namespace: read cache refresh interval from DC --- service/worker/deletenamespace/fx.go | 4 ++- .../reclaimresources/activities.go | 17 ++++++++++--- .../reclaimresources/activities_test.go | 4 --- .../reclaimresources/workflow.go | 13 +++++++--- .../reclaimresources/workflow_test.go | 25 +++++++++++-------- 5 files changed, 40 insertions(+), 23 deletions(-) diff --git a/service/worker/deletenamespace/fx.go b/service/worker/deletenamespace/fx.go index 65829d1595c..723ac178228 100644 --- a/service/worker/deletenamespace/fx.go +++ b/service/worker/deletenamespace/fx.go @@ -60,6 +60,7 @@ type ( allowDeleteNamespaceIfNexusEndpointTarget dynamicconfig.BoolPropertyFn nexusEndpointListDefaultPageSize dynamicconfig.IntPropertyFn deleteActivityRPS dynamicconfig.TypedSubscribable[int] + namespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn } componentParams struct { fx.In @@ -92,6 +93,7 @@ func newComponent( allowDeleteNamespaceIfNexusEndpointTarget: dynamicconfig.AllowDeleteNamespaceIfNexusEndpointTarget.Get(params.DynamicCollection), nexusEndpointListDefaultPageSize: dynamicconfig.NexusEndpointListDefaultPageSize.Get(params.DynamicCollection), deleteActivityRPS: dynamicconfig.DeleteNamespaceDeleteActivityRPS.Subscribe(params.DynamicCollection), + namespaceCacheRefreshInterval: dynamicconfig.NamespaceCacheRefreshInterval.Get(params.DynamicCollection), } } @@ -145,7 +147,7 @@ func (wc *deleteNamespaceComponent) reclaimResourcesActivities() *reclaimresourc } func (wc *deleteNamespaceComponent) reclaimResourcesLocalActivities() *reclaimresources.LocalActivities { - return reclaimresources.NewLocalActivities(wc.visibilityManager, wc.metadataManager, wc.logger) + return reclaimresources.NewLocalActivities(wc.visibilityManager, wc.metadataManager, wc.namespaceCacheRefreshInterval, wc.logger) } func (wc *deleteNamespaceComponent) deleteExecutionsActivities() *deleteexecutions.Activities { diff --git a/service/worker/deletenamespace/reclaimresources/activities.go b/service/worker/deletenamespace/reclaimresources/activities.go index 8c8ebde1a46..2aea4a5ecd8 100644 --- a/service/worker/deletenamespace/reclaimresources/activities.go +++ b/service/worker/deletenamespace/reclaimresources/activities.go @@ -26,12 +26,13 @@ package reclaimresources import ( "context" + "time" "go.temporal.io/sdk/activity" + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/headers" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" - "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/visibility/manager" @@ -42,15 +43,16 @@ import ( type ( Activities struct { visibilityManager manager.VisibilityManager - metricsHandler metrics.Handler logger log.Logger } LocalActivities struct { visibilityManager manager.VisibilityManager metadataManager persistence.MetadataManager - metricsHandler metrics.Handler - logger log.Logger + + namespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn + + logger log.Logger } ) @@ -67,12 +69,15 @@ func NewActivities( func NewLocalActivities( visibilityManager manager.VisibilityManager, metadataManager persistence.MetadataManager, + namespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn, logger log.Logger, ) *LocalActivities { return &LocalActivities{ visibilityManager: visibilityManager, metadataManager: metadataManager, logger: logger, + + namespaceCacheRefreshInterval: namespaceCacheRefreshInterval, } } @@ -176,3 +181,7 @@ func (a *LocalActivities) DeleteNamespaceActivity(ctx context.Context, nsID name logger.Info("Namespace is deleted.") return nil } + +func (a *LocalActivities) GetNamespaceCacheRefreshInterval(_ context.Context) (time.Duration, error) { + return a.namespaceCacheRefreshInterval(), nil +} diff --git a/service/worker/deletenamespace/reclaimresources/activities_test.go b/service/worker/deletenamespace/reclaimresources/activities_test.go index 24383480c0e..c96bddd7f0c 100644 --- a/service/worker/deletenamespace/reclaimresources/activities_test.go +++ b/service/worker/deletenamespace/reclaimresources/activities_test.go @@ -32,7 +32,6 @@ import ( "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/testsuite" "go.temporal.io/server/common/log" - "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/searchattribute" @@ -53,7 +52,6 @@ func Test_EnsureNoExecutionsAdvVisibilityActivity_NoExecutions(t *testing.T) { a := &Activities{ visibilityManager: visibilityManager, - metricsHandler: metrics.NoopMetricsHandler, logger: log.NewTestLogger(), } @@ -78,7 +76,6 @@ func Test_EnsureNoExecutionsAdvVisibilityActivity_ExecutionsExist(t *testing.T) a := &Activities{ visibilityManager: visibilityManager, - metricsHandler: metrics.NoopMetricsHandler, logger: log.NewTestLogger(), } env.RegisterActivity(a.EnsureNoExecutionsAdvVisibilityActivity) @@ -107,7 +104,6 @@ func Test_EnsureNoExecutionsAdvVisibilityActivity_NotDeletedExecutionsExist(t *t a := &Activities{ visibilityManager: visibilityManager, - metricsHandler: metrics.NoopMetricsHandler, logger: log.NewTestLogger(), } env.RegisterActivity(a.EnsureNoExecutionsAdvVisibilityActivity) diff --git a/service/worker/deletenamespace/reclaimresources/workflow.go b/service/worker/deletenamespace/reclaimresources/workflow.go index 01c97ae8158..efa975eea2c 100644 --- a/service/worker/deletenamespace/reclaimresources/workflow.go +++ b/service/worker/deletenamespace/reclaimresources/workflow.go @@ -41,8 +41,6 @@ import ( const ( WorkflowName = "temporal-sys-reclaim-namespace-resources-workflow" - - namespaceCacheRefreshDelay = 11 * time.Second ) type ( @@ -180,8 +178,15 @@ func ReclaimResourcesWorkflow(ctx workflow.Context, params ReclaimResourcesParam var la *LocalActivities // Step 0. This workflow is started right after the namespace is marked as DELETED and renamed. - // Wait for namespace cache refresh to make sure no new executions are created. - err = workflow.Sleep(ctx, namespaceCacheRefreshDelay) + // Wait for namespace cache refresh to make sure no new executions are created. 2 secodnds is a random buffer. + ctx0 := workflow.WithLocalActivityOptions(ctx, localActivityOptions) + var namespaceCacheRefreshDelay time.Duration + err = workflow.ExecuteLocalActivity(ctx0, la.GetNamespaceCacheRefreshInterval).Get(ctx, &namespaceCacheRefreshDelay) + if err != nil { + return result, err + } + + err = workflow.Sleep(ctx, namespaceCacheRefreshDelay+2*time.Second) if err != nil { return result, err } diff --git a/service/worker/deletenamespace/reclaimresources/workflow_test.go b/service/worker/deletenamespace/reclaimresources/workflow_test.go index 63d7c598df6..48816a07931 100644 --- a/service/worker/deletenamespace/reclaimresources/workflow_test.go +++ b/service/worker/deletenamespace/reclaimresources/workflow_test.go @@ -34,8 +34,8 @@ import ( "github.com/stretchr/testify/require" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/testsuite" + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" - "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/visibility/manager" @@ -70,6 +70,7 @@ func Test_ReclaimResourcesWorkflow_Success(t *testing.T) { ErrorCount: 0, }, nil).Once() + env.OnActivity(la.GetNamespaceCacheRefreshInterval, mock.Anything).Return(10*time.Second, nil).Once() env.OnActivity(la.CountExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace")).Return(int64(10), nil).Once() env.OnActivity(a.EnsureNoExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace"), 0).Return(nil).Once() @@ -120,6 +121,7 @@ func Test_ReclaimResourcesWorkflow_EnsureNoExecutionsActivity_Error(t *testing.T ErrorCount: 0, }, nil).Once() + env.OnActivity(la.GetNamespaceCacheRefreshInterval, mock.Anything).Return(10*time.Second, nil).Once() env.OnActivity(la.CountExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace")).Return(int64(10), nil).Once() env.OnActivity(a.EnsureNoExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace"), 0). Return(stderrors.New("specific_error_from_activity")). @@ -168,6 +170,7 @@ func Test_ReclaimResourcesWorkflow_EnsureNoExecutionsActivity_ExecutionsStillExi ErrorCount: 0, }, nil).Once() + env.OnActivity(la.GetNamespaceCacheRefreshInterval, mock.Anything).Return(10*time.Second, nil).Once() env.OnActivity(la.CountExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace")).Return(int64(10), nil).Once() env.OnActivity(a.EnsureNoExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace"), 0). Return(errors.NewExecutionsStillExist(1)). @@ -233,16 +236,17 @@ func Test_ReclaimResourcesWorkflow_NoActivityMocks_Success(t *testing.T) { a := &Activities{ visibilityManager: visibilityManager, - metricsHandler: metrics.NoopMetricsHandler, logger: log.NewTestLogger(), } la := &LocalActivities{ - visibilityManager: visibilityManager, - metadataManager: metadataManager, - metricsHandler: metrics.NoopMetricsHandler, - logger: log.NewTestLogger(), + visibilityManager: visibilityManager, + metadataManager: metadataManager, + namespaceCacheRefreshInterval: dynamicconfig.GetDurationPropertyFn(10 * time.Second), + + logger: log.NewTestLogger(), } + env.RegisterActivity(la.GetNamespaceCacheRefreshInterval) env.RegisterActivity(la.CountExecutionsAdvVisibilityActivity) env.RegisterActivity(a.EnsureNoExecutionsAdvVisibilityActivity) env.RegisterActivity(la.DeleteNamespaceActivity) @@ -312,15 +316,15 @@ func Test_ReclaimResourcesWorkflow_NoActivityMocks_NoProgressMade(t *testing.T) a := &Activities{ visibilityManager: visibilityManager, - metricsHandler: metrics.NoopMetricsHandler, logger: log.NewTestLogger(), } la := &LocalActivities{ - visibilityManager: visibilityManager, - metricsHandler: metrics.NoopMetricsHandler, - logger: log.NewTestLogger(), + visibilityManager: visibilityManager, + namespaceCacheRefreshInterval: dynamicconfig.GetDurationPropertyFn(10 * time.Second), + logger: log.NewTestLogger(), } + env.RegisterActivity(la.GetNamespaceCacheRefreshInterval) env.RegisterActivity(la.CountExecutionsAdvVisibilityActivity) env.RegisterActivity(a.EnsureNoExecutionsAdvVisibilityActivity) @@ -373,6 +377,7 @@ func Test_ReclaimResourcesWorkflow_UpdateDeleteDelay(t *testing.T) { ErrorCount: 0, }, nil).Once() + env.OnActivity(la.GetNamespaceCacheRefreshInterval, mock.Anything).Return(10*time.Second, nil).Once() env.OnActivity(la.CountExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace")).Return(int64(10), nil).Once() env.OnActivity(a.EnsureNoExecutionsAdvVisibilityActivity, mock.Anything, namespace.ID("namespace-id"), namespace.Name("namespace"), 0).Return(nil).Once()