From ce79fc82cbd17d47c496ef753cfbe18bf6c460f0 Mon Sep 17 00:00:00 2001 From: yux0 Date: Fri, 31 Jan 2025 11:29:18 -0800 Subject: [PATCH 1/7] Update DescribeHistoryHost to check shard ownership --- service/history/handler.go | 14 ++++++- service/history/handler_test.go | 72 +++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 2 deletions(-) create mode 100644 service/history/handler_test.go diff --git a/service/history/handler.go b/service/history/handler.go index 4daced93c88..bc449197f27 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -675,12 +675,22 @@ func (h *Handler) ExecuteMultiOperation( } // DescribeHistoryHost returns information about the internal states of a history host -func (h *Handler) DescribeHistoryHost(_ context.Context, _ *historyservice.DescribeHistoryHostRequest) (_ *historyservice.DescribeHistoryHostResponse, retError error) { +func (h *Handler) DescribeHistoryHost(_ context.Context, req *historyservice.DescribeHistoryHostRequest) (_ *historyservice.DescribeHistoryHostResponse, retError error) { defer metrics.CapturePanic(h.logger, h.metricsHandler, &retError) h.startWG.Wait() - itemsInCacheByIDCount, itemsInCacheByNameCount := h.namespaceRegistry.GetCacheSize() + // This API supports describe history host by 1. address 2. shard id 3. namespace id + workflow id + // if option 2/3 is provided, we want to check on the shard ownership to return the correct host address. 
+ shardID := req.GetShardId() + if len(req.GetNamespaceId()) == 0 && req.GetWorkflowExecution() != nil { + shardID = common.WorkflowIDToHistoryShard(req.GetNamespaceId(), req.GetWorkflowExecution().GetWorkflowId(), h.config.NumberOfShards) + } + _, err := h.controller.GetShardByID(shardID) + if err != nil { + return nil, err + } + itemsInCacheByIDCount, itemsInCacheByNameCount := h.namespaceRegistry.GetCacheSize() ownedShardIDs := h.controller.ShardIDs() resp := &historyservice.DescribeHistoryHostResponse{ ShardsNumber: int32(len(ownedShardIDs)), diff --git a/service/history/handler_test.go b/service/history/handler_test.go new file mode 100644 index 00000000000..f10b4b0a152 --- /dev/null +++ b/service/history/handler_test.go @@ -0,0 +1,72 @@ +package history + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "go.temporal.io/server/api/historyservice/v1" + persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/membership" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/serviceerror" + "go.temporal.io/server/service/history/configs" + "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tests" + "go.uber.org/mock/gomock" +) + +func TestDescribeHistoryHost(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + controller := shard.NewMockController(ctrl) + namespaceRegistry := namespace.NewMockRegistry(ctrl) + hostInfoProvider := membership.NewMockHostInfoProvider(ctrl) + h := Handler{ + config: &configs.Config{ + NumberOfShards: 10, + }, + metricsHandler: metrics.NoopMetricsHandler, + logger: log.NewNoopLogger(), + controller: controller, + namespaceRegistry: namespaceRegistry, + hostInfoProvider: hostInfoProvider, + } + + mockShard1 := shard.NewTestContext( + ctrl, + &persistencespb.ShardInfo{ + ShardId: 2, + RangeId: 1, + }, + 
tests.NewDynamicConfig(), + ) + controller.EXPECT().GetShardByID(int32(1)).Return(mockShard1, serviceerror.NewShardOwnershipLost("", "")) + + _, err := h.DescribeHistoryHost(context.Background(), &historyservice.DescribeHistoryHostRequest{ + ShardId: 1, + }) + assert.Error(t, err) + var sol *serviceerror.ShardOwnershipLost + assert.True(t, errors.As(err, &sol)) + + mockShard2 := shard.NewTestContext( + ctrl, + &persistencespb.ShardInfo{ + ShardId: 2, + RangeId: 1, + }, + tests.NewDynamicConfig(), + ) + controller.EXPECT().GetShardByID(int32(2)).Return(mockShard2, nil) + controller.EXPECT().ShardIDs().Return([]int32{2}) + namespaceRegistry.EXPECT().GetCacheSize().Return(int64(0), int64(0)) + hostInfoProvider.EXPECT().HostInfo().Return(membership.NewHostInfoFromAddress("0.0.0.0")) + _, err = h.DescribeHistoryHost(context.Background(), &historyservice.DescribeHistoryHostRequest{ + ShardId: 2, + }) + assert.NoError(t, err) +} From 54d045eb0781d0a2202364f3f03c89a71c9a1f6f Mon Sep 17 00:00:00 2001 From: yux0 Date: Fri, 31 Jan 2025 11:38:27 -0800 Subject: [PATCH 2/7] fix check --- service/history/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/history/handler.go b/service/history/handler.go index bc449197f27..3fa8d70f84b 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -682,7 +682,7 @@ func (h *Handler) DescribeHistoryHost(_ context.Context, req *historyservice.Des // This API supports describe history host by 1. address 2. shard id 3. namespace id + workflow id // if option 2/3 is provided, we want to check on the shard ownership to return the correct host address. 
shardID := req.GetShardId() - if len(req.GetNamespaceId()) == 0 && req.GetWorkflowExecution() != nil { + if len(req.GetNamespaceId()) != 0 && req.GetWorkflowExecution() != nil { shardID = common.WorkflowIDToHistoryShard(req.GetNamespaceId(), req.GetWorkflowExecution().GetWorkflowId(), h.config.NumberOfShards) } _, err := h.controller.GetShardByID(shardID) From 43c518291b4d9376130cbe3fded5747a595c57ae Mon Sep 17 00:00:00 2001 From: yux0 Date: Fri, 31 Jan 2025 11:39:37 -0800 Subject: [PATCH 3/7] add header --- service/history/handler_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/service/history/handler_test.go b/service/history/handler_test.go index f10b4b0a152..2c9437d4564 100644 --- a/service/history/handler_test.go +++ b/service/history/handler_test.go @@ -1,3 +1,26 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. package history import ( From 20e55c9368e84e66a2e0ec551c89f5e6758ed343 Mon Sep 17 00:00:00 2001 From: yux0 Date: Fri, 31 Jan 2025 13:20:15 -0800 Subject: [PATCH 4/7] check if shard id is provided --- service/history/handler.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/service/history/handler.go b/service/history/handler.go index 3fa8d70f84b..c24b9debce5 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -685,9 +685,11 @@ func (h *Handler) DescribeHistoryHost(_ context.Context, req *historyservice.Des if len(req.GetNamespaceId()) != 0 && req.GetWorkflowExecution() != nil { shardID = common.WorkflowIDToHistoryShard(req.GetNamespaceId(), req.GetWorkflowExecution().GetWorkflowId(), h.config.NumberOfShards) } - _, err := h.controller.GetShardByID(shardID) - if err != nil { - return nil, err + if shardID > 0 { + _, err := h.controller.GetShardByID(shardID) + if err != nil { + return nil, err + } } itemsInCacheByIDCount, itemsInCacheByNameCount := h.namespaceRegistry.GetCacheSize() From 51a48b767d60176b72bbd9863b81cfa7b68560a2 Mon Sep 17 00:00:00 2001 From: yux0 Date: Fri, 31 Jan 2025 13:24:15 -0800 Subject: [PATCH 5/7] update unit test --- service/history/handler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/history/handler_test.go b/service/history/handler_test.go index 2c9437d4564..9d38374f78e 100644 --- a/service/history/handler_test.go +++ b/service/history/handler_test.go @@ -62,7 +62,7 @@ func TestDescribeHistoryHost(t *testing.T) { mockShard1 := shard.NewTestContext( ctrl, &persistencespb.ShardInfo{ - ShardId: 2, + ShardId: 1, RangeId: 1, }, tests.NewDynamicConfig(), From 
b0f186bff267d4ce4b3029b24bf46882df863942 Mon Sep 17 00:00:00 2001 From: yux0 Date: Mon, 3 Feb 2025 15:31:39 -0800 Subject: [PATCH 6/7] Update caller to describe history host --- service/frontend/admin_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/frontend/admin_handler.go b/service/frontend/admin_handler.go index 1fbec4dba27..d74b08e002f 100644 --- a/service/frontend/admin_handler.go +++ b/service/frontend/admin_handler.go @@ -1890,7 +1890,7 @@ func (adh *AdminHandler) StreamWorkflowReplicationMessages( if errors.As(err, &solErr) || errors.As(err, &suErr) { ctx, cl := context.WithTimeout(context.Background(), 2*time.Second) // getShard here to make sure we will talk to correct host when stream is retrying - _, err := adh.historyClient.GetShard(ctx, &historyservice.GetShardRequest{ShardId: serverClusterShardID.ShardID}) + _, err := adh.historyClient.DescribeHistoryHost(ctx, &historyservice.DescribeHistoryHostRequest{ShardId: serverClusterShardID.ShardID}) if err != nil { logger.Error("failed to get shard", tag.Error(err)) } From 238edd65ee0c40b3ef5c7da51f2a6f59057b7ecc Mon Sep 17 00:00:00 2001 From: yux0 Date: Tue, 4 Feb 2025 00:18:15 -0800 Subject: [PATCH 7/7] fix unit test --- service/frontend/admin_handler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/frontend/admin_handler_test.go b/service/frontend/admin_handler_test.go index 13153581e6e..1bdf5392e78 100644 --- a/service/frontend/admin_handler_test.go +++ b/service/frontend/admin_handler_test.go @@ -1099,7 +1099,7 @@ func (s *adminHandlerSuite) TestStreamWorkflowReplicationMessages_ServerToClient return nil, serviceerror.NewUnavailable("random error") }) - s.mockHistoryClient.EXPECT().GetShard(gomock.Any(), &historyservice.GetShardRequest{ShardId: serverClusterShardID.ShardID}).Return(&historyservice.GetShardResponse{}, nil) + s.mockHistoryClient.EXPECT().DescribeHistoryHost(gomock.Any(), 
&historyservice.DescribeHistoryHostRequest{ShardId: serverClusterShardID.ShardID}).Return(&historyservice.DescribeHistoryHostResponse{}, nil) serverCluster.EXPECT().Recv().DoAndReturn(func() (*historyservice.StreamWorkflowReplicationMessagesResponse, error) { waitGroupStart.Done() waitGroupStart.Wait()