diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index db03facb7278..5d1c1e4dfba2 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -11,6 +11,7 @@ replace ( replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( + github.com/docker/go-units v0.4.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 github.com/pingcap/kvproto v0.0.0-20230914070744-5a0c9e4b32e6 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 @@ -57,7 +58,6 @@ require ( github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/docker/go-units v0.4.0 // indirect github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 // indirect github.com/elliotchance/pie/v2 v2.1.0 // indirect github.com/fogleman/gg v1.3.0 // indirect diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 187ba54dfcb0..9097b996df9e 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -17,9 +17,11 @@ package scheduling import ( "context" "fmt" + "reflect" "testing" "time" + "github.com/docker/go-units" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" @@ -251,3 +253,80 @@ func (suite *serverTestSuite) TestSchedulerSync() { // TODO: test more schedulers. } + +func (suite *serverTestSuite) TestForwardRegionHeartbeat() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + + s := &server.GrpcServer{Server: suite.pdLeader.GetServer()} + for i := uint64(1); i <= 3; i++ { + resp, err := s.PutStore( + context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()}, + Store: &metapb.Store{ + Id: i, + Address: fmt.Sprintf("mock://%d", i), + State: metapb.StoreState_Up, + Version: "7.0.0", + }, + }, + ) + re.NoError(err) + re.Empty(resp.GetHeader().GetError()) + } + + grpcPDClient := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr()) + stream, err := grpcPDClient.RegionHeartbeat(suite.ctx) + re.NoError(err) + peers := []*metapb.Peer{ + {Id: 11, StoreId: 1}, + {Id: 22, StoreId: 2}, + {Id: 33, StoreId: 3}, + } + queryStats := &pdpb.QueryStats{ + Get: 5, + Coprocessor: 6, + Scan: 7, + Put: 8, + Delete: 9, + DeleteRange: 10, + AcquirePessimisticLock: 11, + Rollback: 12, + Prewrite: 13, + Commit: 14, + } + interval := &pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: 10} + downPeers := []*pdpb.PeerStats{{Peer: peers[2], DownSeconds: 100}} + pendingPeers := []*metapb.Peer{peers[2]} + regionReq := &pdpb.RegionHeartbeatRequest{ + Header: testutil.NewRequestHeader(suite.pdLeader.GetClusterID()), + Region: &metapb.Region{Id: 10, Peers: peers, StartKey: []byte("a"), EndKey: []byte("b")}, + Leader: peers[0], + DownPeers: downPeers, + PendingPeers: pendingPeers, + BytesWritten: 10, + BytesRead: 20, + KeysWritten: 100, + KeysRead: 200, + ApproximateSize: 30 * units.MiB, + ApproximateKeys: 300, + Interval: interval, + QueryStats: queryStats, + Term: 1, + CpuUsage: 100, + } + err = stream.Send(regionReq) + re.NoError(err) + testutil.Eventually(re, func() bool { + region := tc.GetPrimaryServer().GetCluster().GetRegion(10) + return region.GetBytesRead() == 20 && region.GetBytesWritten() == 10 && + region.GetKeysRead() == 200 && region.GetKeysWritten() == 100 && region.GetTerm() == 1 && + region.GetApproximateKeys() == 300 && region.GetApproximateSize() == 30 && + reflect.DeepEqual(region.GetLeader(), peers[0]) && + reflect.DeepEqual(region.GetInterval(), interval) && region.GetReadQueryNum() == 18 && region.GetWriteQueryNum() == 77 && + reflect.DeepEqual(region.GetDownPeers(), downPeers) && reflect.DeepEqual(region.GetPendingPeers(), pendingPeers) + }) +}