diff --git a/client/go.mod b/client/go.mod index 08048bcd78bb..51249822d239 100644 --- a/client/go.mod +++ b/client/go.mod @@ -41,4 +41,4 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) -replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4 +replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287 diff --git a/client/go.sum b/client/go.sum index ac845fd783d5..1188bc42f64e 100644 --- a/client/go.sum +++ b/client/go.sum @@ -117,8 +117,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= -github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4 h1:z6TiGSwLwG5b+18YVYAhq5qPxaFxJLhyK3FN8n4n4n0= -github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287 h1:k5LiNAjzBL5TRaA/ayyUNfPbSkNCUMuYnaLAPb02euE= +github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= diff --git a/go.mod b/go.mod index 295fd55577df..4194787957e9 100644 --- a/go.mod +++ b/go.mod @@ -206,4 +206,4 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 // After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`. // replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch -replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4 +replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287 diff --git a/go.sum b/go.sum index b39faf9927a1..98cc4941f0d6 100644 --- a/go.sum +++ b/go.sum @@ -492,8 +492,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= -github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4 h1:z6TiGSwLwG5b+18YVYAhq5qPxaFxJLhyK3FN8n4n4n0= -github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287 h1:k5LiNAjzBL5TRaA/ayyUNfPbSkNCUMuYnaLAPb02euE= +github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 91d8af8f8dfc..82e3f2dbf37e 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -16,6 +16,7 @@ import ( sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/hbstream" "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/slice" @@ -349,3 +350,85 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval)) return nil } + +// HandleRegionHeartbeat processes RegionInfo reports from client. +func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { + if err := c.processRegionHeartbeat(region); err != nil { + return err + } + + c.coordinator.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, c.coordinator.RecordOpStepWithTTL) + return nil +} + +// processRegionHeartbeat updates the region information. +func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error { + origin, _, err := c.PreCheckPutRegion(region) + if err != nil { + return err + } + region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) + + c.hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region)) + c.hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region)) + reportInterval := region.GetInterval() + interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() + for _, peer := range region.GetPeers() { + peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval) + c.hotStat.CheckWriteAsync(statistics.NewCheckPeerTask(peerInfo, region)) + } + c.coordinator.GetSchedulersController().CheckTransferWitnessLeader(region) + + hasRegionStats := c.regionStats != nil + // Save to storage if meta is updated, except for flashback. + // Save to cache if meta or leader is updated, or contains any down/pending peer. + // Mark isNew if the region in cache does not have leader. + isNew, _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin) + if !saveCache && !isNew { + // Due to some config changes need to update the region stats as well, + // so we do some extra checks here. + if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { + c.regionStats.Observe(region, c.GetRegionStores(region)) + } + return nil + } + + var overlaps []*core.RegionInfo + if saveCache { + // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, + // check its validation again here. + // + // However it can't solve the race condition of concurrent heartbeats from the same region. + if overlaps, err = c.AtomicCheckAndPutRegion(region); err != nil { + return err + } + + for _, item := range overlaps { + if c.regionStats != nil { + c.regionStats.ClearDefunctRegion(item.GetID()) + } + c.labelLevelStats.ClearDefunctRegion(item.GetID()) + c.ruleManager.InvalidCache(item.GetID()) + } + } + + if hasRegionStats { + c.regionStats.Observe(region, c.GetRegionStores(region)) + } + + if !c.IsPrepared() && isNew { + c.coordinator.GetPrepareChecker().Collect(region) + } + + return nil +} + +// IsPrepared return true if the prepare checker is ready. +func (c *Cluster) IsPrepared() bool { + return c.coordinator.GetPrepareChecker().IsPrepared() +} + +// TODO: implement the following methods + +// AllocID allocates a new ID. +func (c *Cluster) AllocID() (uint64, error) { return 0, nil } diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index f615e0c37c03..29ad541ebcbc 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -16,13 +16,20 @@ package server import ( "context" + "io" "net/http" + "sync/atomic" + "time" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/schedulingpb" "github.com/pingcap/log" + "github.com/pkg/errors" bs "github.com/tikv/pd/pkg/basicserver" + "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mcs/registry" "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/pkg/utils/logutil" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -67,6 +74,104 @@ func NewService[T ConfigProvider](svr bs.Server) registry.RegistrableService { } } +// heartbeatServer wraps PD_RegionHeartbeatServer to ensure when any error +// occurs on Send() or Recv(), both endpoints will be closed. +type heartbeatServer struct { + stream pdpb.PD_RegionHeartbeatServer + closed int32 +} + +func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error { + if atomic.LoadInt32(&s.closed) == 1 { + return io.EOF + } + done := make(chan error, 1) + go func() { + defer logutil.LogPanic() + done <- s.stream.Send(m) + }() + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + select { + case err := <-done: + if err != nil { + atomic.StoreInt32(&s.closed, 1) + } + return errors.WithStack(err) + case <-timer.C: + atomic.StoreInt32(&s.closed, 1) + return status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout") + } +} + +func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) { + if atomic.LoadInt32(&s.closed) == 1 { + return nil, io.EOF + } + req, err := s.stream.Recv() + if err != nil { + atomic.StoreInt32(&s.closed, 1) + return nil, errors.WithStack(err) + } + return req, nil +} + +// RegionHeartbeat implements gRPC PDServer. +func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeatServer) error { + var ( + server = &heartbeatServer{stream: stream} + cancel context.CancelFunc + lastBind time.Time + ) + defer func() { + // cancel the forward stream + if cancel != nil { + cancel() + } + }() + + for { + request, err := server.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return errors.WithStack(err) + } + + c := s.GetCluster() + if c == nil { + resp := &pdpb.RegionHeartbeatResponse{Header: &pdpb.ResponseHeader{ + ClusterId: s.clusterID, + Error: &pdpb.Error{ + Type: pdpb.ErrorType_NOT_BOOTSTRAPPED, + Message: "scheduling server is not initialized yet", + }, + }} + err := server.Send(resp) + return errors.WithStack(err) + } + + storeID := request.GetLeader().GetStoreId() + store := c.GetStore(storeID) + if store == nil { + return errors.Errorf("invalid store ID %d, not found", storeID) + } + + if time.Since(lastBind) > time.Minute { + s.hbStreams.BindStream(storeID, server) + lastBind = time.Now() + } + region := core.RegionFromHeartbeat(request, core.SetFromHeartbeat(true)) + err = c.HandleRegionHeartbeat(region) + if err != nil { + msg := err.Error() + s.hbStreams.SendErr(pdpb.ErrorType_UNKNOWN, msg, request.GetLeader()) + continue + } + } +} + // StoreHeartbeat implements gRPC PDServer. func (s *Service) StoreHeartbeat(ctx context.Context, request *schedulingpb.StoreHeartbeatRequest) (*schedulingpb.StoreHeartbeatResponse, error) { c := s.GetCluster() diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index bcb5aeeb9a45..330085e3b82d 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -349,11 +349,6 @@ func (s *Server) GetCoordinator() *schedule.Coordinator { return s.GetCluster().GetCoordinator() } -// GetCluster returns the cluster. -func (s *Server) GetCluster() *Cluster { - return s.cluster -} - // ServerLoopWgDone decreases the server loop wait group. func (s *Server) ServerLoopWgDone() { s.serverLoopWg.Done() diff --git a/server/grpc_service.go b/server/grpc_service.go index b33031666d4a..74e33cca11ea 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1250,6 +1250,9 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error lastForwardedHost string lastBind time.Time errCh chan error + schedulingStream schedulingpb.Scheduling_RegionHeartbeatClient + cancel1 context.CancelFunc + lastPrimaryAddr string ) defer func() { // cancel the forward stream @@ -1363,6 +1366,36 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error s.hbStreams.SendErr(pdpb.ErrorType_UNKNOWN, msg, request.GetLeader()) continue } + + if s.IsAPIServiceMode() { + ctx := stream.Context() + primaryAddr, _ := s.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName) + if schedulingStream == nil || lastPrimaryAddr != primaryAddr { + if cancel1 != nil { + cancel1() + } + client, err := s.getDelegateClient(ctx, primaryAddr) + if err != nil { + log.Error("get delegate client failed", zap.Error(err)) + } + + log.Info("create region heartbeat forward stream", zap.String("forwarded-host", primaryAddr)) + schedulingStream, cancel1, err = s.createSchedulingStream(client) + if err != nil { + log.Error("create region heartbeat forward stream failed", zap.Error(err)) + } else { + lastPrimaryAddr = primaryAddr + errCh = make(chan error, 1) + go forwardSchedulingToServer(schedulingStream, server, errCh) + } + } + if schedulingStream != nil { + if err := schedulingStream.Send(request); err != nil { + log.Error("forward region heartbeat failed", zap.Error(err)) + } + } + } + regionHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds()) regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc() } @@ -2312,6 +2345,31 @@ func forwardRegionHeartbeatClientToServer(forwardStream pdpb.PD_RegionHeartbeatC } } +func (s *GrpcServer) createSchedulingStream(client *grpc.ClientConn) (schedulingpb.Scheduling_RegionHeartbeatClient, context.CancelFunc, error) { + done := make(chan struct{}) + ctx, cancel := context.WithCancel(s.ctx) + go grpcutil.CheckStream(ctx, cancel, done) + forwardStream, err := schedulingpb.NewSchedulingClient(client).RegionHeartbeat(ctx) + done <- struct{}{} + return forwardStream, cancel, err +} + +func forwardSchedulingToServer(forwardStream schedulingpb.Scheduling_RegionHeartbeatClient, server *heartbeatServer, errCh chan error) { + defer logutil.LogPanic() + defer close(errCh) + for { + resp, err := forwardStream.Recv() + if err != nil { + errCh <- errors.WithStack(err) + return + } + if err := server.Send(resp); err != nil { + errCh <- errors.WithStack(err) + return + } + } +} + func (s *GrpcServer) createTSOForwardStream( ctx context.Context, client *grpc.ClientConn, ) (tsopb.TSO_TsoClient, context.Context, context.CancelFunc, error) { diff --git a/tests/integrations/client/go.mod b/tests/integrations/client/go.mod index 9dc2cf116c8c..9cc7e2de5b9d 100644 --- a/tests/integrations/client/go.mod +++ b/tests/integrations/client/go.mod @@ -10,7 +10,7 @@ replace ( // reset grpc and protobuf deps in order to import client and server at the same time replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 -replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4 +replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287 require ( github.com/docker/go-units v0.4.0 diff --git a/tests/integrations/client/go.sum b/tests/integrations/client/go.sum index 38ba38f8b227..45e03f4b7bcf 100644 --- a/tests/integrations/client/go.sum +++ b/tests/integrations/client/go.sum @@ -456,8 +456,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= -github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4 h1:z6TiGSwLwG5b+18YVYAhq5qPxaFxJLhyK3FN8n4n4n0= -github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287 h1:k5LiNAjzBL5TRaA/ayyUNfPbSkNCUMuYnaLAPb02euE= +github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index 635488c80e1c..cbc8776254e0 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -10,7 +10,7 @@ replace ( // reset grpc and protobuf deps in order to import client and server at the same time replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 -replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4 +replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287 require ( github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 diff --git a/tests/integrations/mcs/go.sum b/tests/integrations/mcs/go.sum index 681bf1d33013..8f07cdf99f59 100644 --- a/tests/integrations/mcs/go.sum +++ b/tests/integrations/mcs/go.sum @@ -460,8 +460,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= -github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4 h1:z6TiGSwLwG5b+18YVYAhq5qPxaFxJLhyK3FN8n4n4n0= -github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287 h1:k5LiNAjzBL5TRaA/ayyUNfPbSkNCUMuYnaLAPb02euE= +github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/tests/integrations/tso/go.mod b/tests/integrations/tso/go.mod index 7963363868ed..b4162ccb0fb6 100644 --- a/tests/integrations/tso/go.mod +++ b/tests/integrations/tso/go.mod @@ -11,7 +11,7 @@ replace ( // reset grpc and protobuf deps in order to import client and server at the same time replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 -replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4 +replace github.com/pingcap/kvproto => github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287 require ( github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c diff --git a/tests/integrations/tso/go.sum b/tests/integrations/tso/go.sum index 3a7fa8bed38d..2134009d0fd1 100644 --- a/tests/integrations/tso/go.sum +++ b/tests/integrations/tso/go.sum @@ -454,8 +454,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= -github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4 h1:z6TiGSwLwG5b+18YVYAhq5qPxaFxJLhyK3FN8n4n4n0= -github.com/rleungx/kvproto v0.0.0-20230823063216-12256d4fd0a4/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287 h1:k5LiNAjzBL5TRaA/ayyUNfPbSkNCUMuYnaLAPb02euE= +github.com/rleungx/kvproto v0.0.0-20230824090226-d21284147287/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=