Skip to content

Commit

Permalink
Merge branch 'master' into improve_rg_test_diagnose
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Aug 22, 2024
2 parents f15b156 + b132ea6 commit 5570562
Show file tree
Hide file tree
Showing 93 changed files with 565 additions and 363 deletions.
6 changes: 6 additions & 0 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ const (
Status = "/pd/api/v1/status"
Version = "/pd/api/v1/version"
operators = "/pd/api/v1/operators"
safepoint = "/pd/api/v1/gc/safepoint"
// Micro Service
microServicePrefix = "/pd/api/v2/ms"
// Keyspace
Expand Down Expand Up @@ -215,3 +216,8 @@ func GetUpdateKeyspaceConfigURL(keyspaceName string) string {
func GetKeyspaceMetaByNameURL(keyspaceName string) string {
return fmt.Sprintf(GetKeyspaceMetaByName, keyspaceName)
}

// GetDeleteSafePointURI returns the URI for delete safepoint service
func GetDeleteSafePointURI(serviceID string) string {
return fmt.Sprintf("%s/%s", safepoint, serviceID)
}
30 changes: 30 additions & 0 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ type Client interface {
/* Other interfaces */
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)
GetPDVersion(context.Context) (string, error)
GetGCSafePoint(context.Context) (ListServiceGCSafepoint, error)
DeleteGCSafePoint(context.Context, string) (string, error)
/* Micro Service interfaces */
GetMicroServiceMembers(context.Context, string) ([]MicroServiceMember, error)
GetMicroServicePrimary(context.Context, string) (string, error)
Expand Down Expand Up @@ -1024,3 +1026,31 @@ func (c *client) GetKeyspaceMetaByName(ctx context.Context, keyspaceName string)
}
return &keyspaceMetaPB, nil
}

// GetGCSafePoint gets the GC safe point list.
func (c *client) GetGCSafePoint(ctx context.Context) (ListServiceGCSafepoint, error) {
var gcSafePoint ListServiceGCSafepoint
err := c.request(ctx, newRequestInfo().
WithName(GetGCSafePointName).
WithURI(safepoint).
WithMethod(http.MethodGet).
WithResp(&gcSafePoint))
if err != nil {
return gcSafePoint, err
}
return gcSafePoint, nil
}

// DeleteGCSafePoint deletes a GC safe point with the given service ID.
func (c *client) DeleteGCSafePoint(ctx context.Context, serviceID string) (string, error) {
var msg string
err := c.request(ctx, newRequestInfo().
WithName(DeleteGCSafePointName).
WithURI(GetDeleteSafePointURI(serviceID)).
WithMethod(http.MethodDelete).
WithResp(&msg))
if err != nil {
return msg, err
}
return msg, nil
}
2 changes: 2 additions & 0 deletions client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ const (
deleteOperators = "DeleteOperators"
UpdateKeyspaceGCManagementTypeName = "UpdateKeyspaceGCManagementType"
GetKeyspaceMetaByNameName = "GetKeyspaceMetaByName"
GetGCSafePointName = "GetGCSafePoint"
DeleteGCSafePointName = "DeleteGCSafePoint"
)

type requestInfo struct {
Expand Down
16 changes: 16 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,22 @@ import (
pd "github.com/tikv/pd/client"
)

// ServiceSafePoint is the safepoint for a specific service
// NOTE: This type is in sync with pd/pkg/storage/endpoint/gc_safe_point.go
type ServiceSafePoint struct {
ServiceID string `json:"service_id"`
ExpiredAt int64 `json:"expired_at"`
SafePoint uint64 `json:"safe_point"`
}

// ListServiceGCSafepoint is the response for list service GC safepoint.
// NOTE: This type is in sync with pd/server/api/service_gc_safepoint.go
type ListServiceGCSafepoint struct {
ServiceGCSafepoints []*ServiceSafePoint `json:"service_gc_safe_points"`
MinServiceGcSafepoint uint64 `json:"min_service_gc_safe_point,omitempty"`
GCSafePoint uint64 `json:"gc_safe_point"`
}

// ClusterState saves some cluster state information.
// NOTE: This type sync with https://github.com/tikv/pd/blob/5eae459c01a797cbd0c416054c6f0cad16b8740a/server/cluster/cluster.go#L173
type ClusterState struct {
Expand Down
4 changes: 2 additions & 2 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ type pdServiceClient struct {
}

// NOTE: In the current implementation, the URL passed in is bound to have a scheme,
// because it is processed in `newPDServiceDiscovery`, and the url returned by etcd member owns the sheme.
// because it is processed in `newPDServiceDiscovery`, and the url returned by etcd member owns the scheme.
// When testing, the URL is also bound to have a scheme.
func newPDServiceClient(url, leaderURL string, conn *grpc.ClientConn, isLeader bool) ServiceClient {
cli := &pdServiceClient{
Expand Down Expand Up @@ -1074,7 +1074,7 @@ func (c *pdServiceDiscovery) updateServiceClient(members []*pdpb.Member, leader
leaderURL := pickMatchedURL(leader.GetClientUrls(), c.tlsCfg)
leaderChanged, err := c.switchLeader(leaderURL)
followerChanged := c.updateFollowers(members, leader.GetMemberId(), leaderURL)
// don't need to recreate balancer if no changess.
// don't need to recreate balancer if no changes.
if !followerChanged && !leaderChanged {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion conf/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@

[keyspace]
## pre-alloc is used to pre-allocate keyspaces during pd bootstrap.
## Its value should be a list of strings, denotting the name of the keyspaces.
## Its value should be a list of strings, denoting the name of the keyspaces.
## Example:
## pre-alloc = ["admin", "user1", "user2"]
# pre-alloc = []
2 changes: 1 addition & 1 deletion pkg/autoscaling/calculation.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func findBestGroupToScaleOut(strategy *Strategy, groups []*Plan, component Compo
},
}

// TODO: we can provide different senerios by using options and remove this kind of special judgement.
// TODO: we can provide different scenarios by using options and remove this kind of special judgement.
if component == TiKV {
group.Labels[filter.SpecialUseKey] = filter.SpecialUseHotRegion
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/gc/safepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestGCSafePointUpdateCurrently(t *testing.T) {
func TestServiceGCSafePointUpdate(t *testing.T) {
re := require.New(t)
manager := NewSafePointManager(newGCStorage(), config.PDServerConfig{})
gcworkerServiceID := "gc_worker"
gcWorkerServiceID := "gc_worker"
cdcServiceID := "cdc"
brServiceID := "br"
cdcServiceSafePoint := uint64(10)
Expand All @@ -101,7 +101,7 @@ func TestServiceGCSafePointUpdate(t *testing.T) {
re.NoError(err)
re.True(updated)
// the service will init the service safepoint to 0(<10 for cdc) for gc_worker.
re.Equal(gcworkerServiceID, min.ServiceID)
re.Equal(gcWorkerServiceID, min.ServiceID)
}()

// update the safepoint for br to 15 should success
Expand All @@ -111,24 +111,24 @@ func TestServiceGCSafePointUpdate(t *testing.T) {
re.NoError(err)
re.True(updated)
// the service will init the service safepoint to 0(<10 for cdc) for gc_worker.
re.Equal(gcworkerServiceID, min.ServiceID)
re.Equal(gcWorkerServiceID, min.ServiceID)
}()

// update safepoint to 8 for gc_woker should be success
// update safepoint to 8 for gc_worker should be success
go func() {
defer wg.Done()
// update with valid ttl for gc_worker should be success.
min, updated, _ := manager.UpdateServiceGCSafePoint(gcworkerServiceID, gcWorkerSafePoint, math.MaxInt64, time.Now())
min, updated, _ := manager.UpdateServiceGCSafePoint(gcWorkerServiceID, gcWorkerSafePoint, math.MaxInt64, time.Now())
re.True(updated)
// the current min safepoint should be 8 for gc_worker(cdc 10)
re.Equal(gcWorkerSafePoint, min.SafePoint)
re.Equal(gcworkerServiceID, min.ServiceID)
re.Equal(gcWorkerServiceID, min.ServiceID)
}()

go func() {
defer wg.Done()
// update safepoint of gc_worker's service with ttl not infinity should be failed.
_, updated, err := manager.UpdateServiceGCSafePoint(gcworkerServiceID, 10000, 10, time.Now())
_, updated, err := manager.UpdateServiceGCSafePoint(gcWorkerServiceID, 10000, 10, time.Now())
re.Error(err)
re.False(updated)
}()
Expand All @@ -145,7 +145,7 @@ func TestServiceGCSafePointUpdate(t *testing.T) {
wg.Wait()
// update safepoint to 15(>10 for cdc) for gc_worker
gcWorkerSafePoint = uint64(15)
min, updated, err := manager.UpdateServiceGCSafePoint(gcworkerServiceID, gcWorkerSafePoint, math.MaxInt64, time.Now())
min, updated, err := manager.UpdateServiceGCSafePoint(gcWorkerServiceID, gcWorkerSafePoint, math.MaxInt64, time.Now())
re.NoError(err)
re.True(updated)
re.Equal(cdcServiceID, min.ServiceID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

const (
defaultName = "Resource Manager"
defaultName = "resource manager"
defaultBackendEndpoints = "http://127.0.0.1:2379"
defaultListenAddr = "http://127.0.0.1:3379"

Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/tikv/pd/pkg/schedule/scatter"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/schedule/splitter"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"github.com/tikv/pd/pkg/core/storelimit"
mcsconstant "github.com/tikv/pd/pkg/mcs/utils/constant"
sc "github.com/tikv/pd/pkg/schedule/config"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/configutil"
Expand All @@ -47,7 +47,7 @@ import (
)

const (
defaultName = "Scheduling"
defaultName = "scheduling"
defaultBackendEndpoints = "http://127.0.0.1:2379"
defaultListenAddr = "http://127.0.0.1:3379"
)
Expand Down
8 changes: 7 additions & 1 deletion pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
Expand Down Expand Up @@ -193,7 +194,7 @@ func (s *Server) updateAPIServerMemberLoop() {
if !s.IsServing() {
continue
}
members, err := s.GetClient().MemberList(ctx)
members, err := etcdutil.ListEtcdMembers(ctx, s.GetClient())
if err != nil {
log.Warn("failed to list members", errs.ZapError(err))
continue
Expand All @@ -212,6 +213,11 @@ func (s *Server) updateAPIServerMemberLoop() {
cc, err := s.GetDelegateClient(ctx, s.GetTLSConfig(), ep.ClientURLs[0])
if err != nil {
log.Info("failed to get delegate client", errs.ZapError(err))
continue
}
if !s.IsServing() {
// double check
break
}
if s.cluster.SwitchAPIServerLeader(pdpb.NewPDClient(cc)) {
if status.Leader != curLeader {
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
const (
defaultMaxResetTSGap = 24 * time.Hour

defaultName = "TSO"
defaultName = "tso"
defaultBackendEndpoints = "http://127.0.0.1:2379"
defaultListenAddr = "http://127.0.0.1:3379"

Expand Down
2 changes: 1 addition & 1 deletion pkg/member/election_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

// ElectionLeader defines the common interface of the leader, which is the pdpb.Member
// for in PD/API service or the tsopb.Participant in the microserives.
// for in PD/API service or the tsopb.Participant in the micro services.
type ElectionLeader interface {
// GetListenUrls returns the listen urls
GetListenUrls() []string
Expand Down
2 changes: 1 addition & 1 deletion pkg/mock/mockconfig/mockconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package mockconfig

import (
sc "github.com/tikv/pd/pkg/schedule/config"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/server/config"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/movingaverage/weight_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package movingaverage
// WeightAllocator will divide these items into some segments whose number named as segNum which should great than 0.
// And the items at first segment will be assigned more weight that is `segNum` times that of item at last segment.
// If you want assign same weights, just input segNum as 1.
// If length is 10 and segNum is 3, it will make the weight arrry as [3,3,3,3,2,2,2,1,1,1],
// If length is 10 and segNum is 3, it will make the weight array as [3,3,3,3,2,2,2,1,1,1],
// and then uniform it : [3,3,3,3,2,2,2,1,1,1]/sum(arr)=arr/21,
// And the final weight is [0.143,0.143,0.143,0.143,0.095,0.095,0.095,0.047,0.047,0.047];
// If length is 10 and segNum is 1, the weight is [0.1,0.1,0.1,0.1,0.1,0.1,0.1,0.1,0.1,0.1];
Expand Down
2 changes: 1 addition & 1 deletion pkg/replication/replication_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func TestReplicateState(t *testing.T) {
rep.tickReplicateStatus()
assertLastData(t, replicator.lastData[1], "sync", stateID, nil)

// repliate state to new member
// replicate state to new member
replicator.memberIDs = append(replicator.memberIDs, 2, 3)
rep.tickReplicateStatus()
assertLastData(t, replicator.lastData[2], "sync", stateID, nil)
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/checker/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/utils/logutil"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/checker/replica_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/operator"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/schedule/types"
"go.uber.org/zap"
)

Expand Down
6 changes: 3 additions & 3 deletions pkg/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/versioninfo"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -265,7 +265,7 @@ func (c *RuleChecker) replaceUnexpectedRulePeer(region *core.RegionInfo, rf *pla
minCount := uint64(math.MaxUint64)
for _, p := range region.GetPeers() {
count := c.record.getOfflineLeaderCount(p.GetStoreId())
checkPeerhealth := func() bool {
checkPeerHealth := func() bool {
if p.GetId() == peer.GetId() {
return true
}
Expand All @@ -274,7 +274,7 @@ func (c *RuleChecker) replaceUnexpectedRulePeer(region *core.RegionInfo, rf *pla
}
return c.allowLeader(fit, p)
}
if minCount > count && checkPeerhealth() {
if minCount > count && checkPeerHealth() {
minCount = count
newLeader = p
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core/storelimit"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/utils/configutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/core/storelimit"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/storage/endpoint"
)

Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/tikv/pd/pkg/schedule/scatter"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/schedule/splitter"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/utils/logutil"
Expand Down Expand Up @@ -182,7 +182,7 @@ func (c *Coordinator) driveSlowNodeScheduler() {
case <-ticker.C:
{
// If enabled, exit.
if exists, _ := c.schedulers.IsSchedulerExisted(schedulers.EvictSlowTrendName); exists {
if exists, _ := c.schedulers.IsSchedulerExisted(types.EvictSlowTrendScheduler.String()); exists {
return
}
// If the cluster was set up with `raft-kv2` engine, this cluster should
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/filter/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
types "github.com/tikv/pd/pkg/schedule/type"
"github.com/tikv/pd/pkg/schedule/types"
)

func TestString(t *testing.T) {
Expand Down
Loading

0 comments on commit 5570562

Please sign in to comment.