Skip to content

Commit

Permalink
*: upgrade etcd to v3.4.30 (#7884) (#7888)
Browse files Browse the repository at this point in the history
ref #5520, close #7904

Upgrade etcd to v3.4.30 and adopt the latest code changes.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: JmPotato <[email protected]>
  • Loading branch information
ti-chi-bot and JmPotato authored Mar 12, 2024
1 parent d71a1a3 commit 8544834
Show file tree
Hide file tree
Showing 28 changed files with 250 additions and 352 deletions.
3 changes: 2 additions & 1 deletion client/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
)

Expand All @@ -52,7 +53,7 @@ const (
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
func GetClientConn(ctx context.Context, addr string, tlsCfg *tls.Config, do ...grpc.DialOption) (*grpc.ClientConn, error) {
opt := grpc.WithInsecure() //nolint
opt := grpc.WithTransportCredentials(insecure.NewCredentials())
if tlsCfg != nil {
creds := credentials.NewTLS(tlsCfg)
opt = grpc.WithTransportCredentials(creds)
Expand Down
38 changes: 19 additions & 19 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ module github.com/tikv/pd

go 1.21

// When you modify PD cooperatively with kvproto, this will be useful to submit the PR to PD and the PR to
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch

require (
github.com/AlekSi/gocov-xml v1.0.0
github.com/BurntSushi/toml v0.3.1
Expand Down Expand Up @@ -43,7 +48,7 @@ require (
github.com/sasha-s/go-deadlock v0.2.0
github.com/shirou/gopsutil/v3 v3.23.3
github.com/smallnest/chanx v0.0.0-20221229104322-eb4c998d2072
github.com/soheilhy/cmux v0.1.4
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.3
Expand All @@ -52,15 +57,15 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965
github.com/unrolled/render v1.0.1
github.com/urfave/negroni v0.3.0
go.etcd.io/etcd v0.5.0-alpha.5.0.20220915004622-85b640cee793
go.etcd.io/etcd v0.5.0-alpha.5.0.20240131130919-ff2304879e1b
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.12
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230108222341-4b8118a2686a
golang.org/x/text v0.13.0
golang.org/x/text v0.14.0
golang.org/x/time v0.1.0
golang.org/x/tools v0.6.0
google.golang.org/grpc v1.54.0
google.golang.org/grpc v1.58.3
gotest.tools/gotestsum v1.7.0
)

Expand Down Expand Up @@ -179,22 +184,24 @@ require (
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
// Fix panic in unit test with go >= 1.14, ref: etcd-io/bbolt#201 https://github.com/etcd-io/bbolt/pull/201
go.etcd.io/bbolt v1.3.6 // indirect
go.etcd.io/bbolt v1.3.8 // indirect
go.uber.org/dig v1.9.0 // indirect
go.uber.org/fx v1.12.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/image v0.5.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.4.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/oauth2 v0.10.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/protobuf v1.30.0 // indirect
google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand All @@ -205,10 +212,3 @@ require (
moul.io/zapgorm2 v1.1.0 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)

replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0

// When you modify PD cooperatively with kvproto, this will be useful to submit the PR to PD and the PR to
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch
76 changes: 29 additions & 47 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/dashboard/adapter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func GenDashboardConfig(srv *server.Server) (*config.Config, error) {

dashboardCfg := config.Default()
dashboardCfg.DataDir = cfg.DataDir
dashboardCfg.PDEndPoint = etcdCfg.ACUrls[0].String()
dashboardCfg.PDEndPoint = etcdCfg.AdvertiseClientUrls[0].String()
dashboardCfg.PublicPathPrefix = cfg.Dashboard.PublicPathPrefix
dashboardCfg.EnableTelemetry = cfg.Dashboard.EnableTelemetry
dashboardCfg.EnableExperimental = cfg.Dashboard.EnableExperimental
Expand Down
6 changes: 3 additions & 3 deletions pkg/election/leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestExitWatch(t *testing.T) {
checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) func() {
cfg1 := server.Config()
etcd2 := etcdutil.MustAddEtcdMember(t, &cfg1, client)
client2, err := etcdutil.CreateEtcdClient(nil, etcd2.Config().LCUrls)
client2, err := etcdutil.CreateEtcdClient(nil, etcd2.Config().ListenClientUrls)
re.NoError(err)
// close the original leader
server.Server.HardStop()
Expand Down Expand Up @@ -189,7 +189,7 @@ func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embe
re := require.New(t)
servers, client1, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()
client2, err := etcdutil.CreateEtcdClient(nil, servers[0].Config().LCUrls)
client2, err := etcdutil.CreateEtcdClient(nil, servers[0].Config().ListenClientUrls)
re.NoError(err)
defer client2.Close()

Expand Down Expand Up @@ -225,7 +225,7 @@ func TestRequestProgress(t *testing.T) {
defer os.RemoveAll(fname)
servers, client1, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()
client2, err := etcdutil.CreateEtcdClient(nil, servers[0].Config().LCUrls)
client2, err := etcdutil.CreateEtcdClient(nil, servers[0].Config().ListenClientUrls)
re.NoError(err)
defer client2.Close()

Expand Down
3 changes: 2 additions & 1 deletion pkg/mcs/tso/server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import (
"github.com/spf13/pflag"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// MustNewGrpcClient must create a new TSO grpc client.
func MustNewGrpcClient(re *require.Assertions, addr string) (*grpc.ClientConn, tsopb.TSOClient) {
conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithInsecure())
conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithTransportCredentials(insecure.NewCredentials()))
re.NoError(err)
return conn, tsopb.NewTSOClient(conn)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/syncer/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestErrorCode(t *testing.T) {
}
ctx, cancel := context.WithCancel(context.TODO())
rc := NewRegionSyncer(server)
conn, err := grpcutil.GetClientConn(ctx, "127.0.0.1", nil)
conn, err := grpcutil.GetClientConn(ctx, "http://127.0.0.1", nil)
re.NoError(err)
cancel()
_, err = rc.syncRegion(ctx, conn)
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (suite *keyspaceGroupManagerTestSuite) SetupSuite() {
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.ClusterID = rand.Uint64()
servers, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
suite.backendEndpoints, suite.etcdClient, suite.clean = servers[0].Config().LCUrls[0].String(), client, clean
suite.backendEndpoints, suite.etcdClient, suite.clean = servers[0].Config().ListenClientUrls[0].String(), client, clean
suite.cfg = suite.createConfig()
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,10 @@ func TestEtcdScaleInAndOut(t *testing.T) {
etcd1, cfg1 := servers[0], servers[0].Config()

// Create two etcd clients with etcd1 as endpoint.
client1, err := CreateEtcdClient(nil, cfg1.LCUrls) // execute member change operation with this client
client1, err := CreateEtcdClient(nil, cfg1.ListenClientUrls) // execute member change operation with this client
re.NoError(err)
defer client1.Close()
client2, err := CreateEtcdClient(nil, cfg1.LCUrls) // check member change with this client
client2, err := CreateEtcdClient(nil, cfg1.ListenClientUrls) // check member change with this client
re.NoError(err)
defer client2.Close()

Expand Down Expand Up @@ -285,7 +285,7 @@ func checkEtcdWithHangLeader(t *testing.T) error {
// Create a proxy to etcd1.
proxyAddr := tempurl.Alloc()
var enableDiscard atomic.Bool
go proxyWithDiscard(re, cfg1.LCUrls[0].String(), proxyAddr, &enableDiscard)
go proxyWithDiscard(re, cfg1.ListenClientUrls[0].String(), proxyAddr, &enableDiscard)

// Create a etcd client with etcd1 as endpoint.
urls, err := types.NewURLs([]string{proxyAddr})
Expand Down Expand Up @@ -385,7 +385,7 @@ func (suite *loopWatcherTestSuite) SetupSuite() {
// Start a etcd server and create a client with etcd1 as endpoint.
suite.config = newTestSingleConfig(t)
suite.startEtcd()
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls)
suite.client, err = CreateEtcdClient(nil, suite.config.ListenClientUrls)
suite.NoError(err)
suite.cleans = append(suite.cleans, func() {
suite.client.Close()
Expand Down Expand Up @@ -587,23 +587,23 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() {

// Case2: close the etcd client and put a new value after watcher restarts
suite.client.Close()
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls)
suite.client, err = CreateEtcdClient(nil, suite.config.ListenClientUrls)
suite.NoError(err)
watcher.updateClientCh <- suite.client
suite.put("TestWatcherBreak", "2")
checkCache("2")

// Case3: close the etcd client and put a new value before watcher restarts
suite.client.Close()
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls)
suite.client, err = CreateEtcdClient(nil, suite.config.ListenClientUrls)
suite.NoError(err)
suite.put("TestWatcherBreak", "3")
watcher.updateClientCh <- suite.client
checkCache("3")

// Case4: close the etcd client and put a new value with compact
suite.client.Close()
suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls)
suite.client, err = CreateEtcdClient(nil, suite.config.ListenClientUrls)
suite.NoError(err)
suite.put("TestWatcherBreak", "4")
resp, err := EtcdKVGet(suite.client, "TestWatcherBreak")
Expand Down
16 changes: 8 additions & 8 deletions pkg/utils/etcdutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func newTestSingleConfig(t *testing.T) *embed.Config {
cfg.LogOutputs = []string{"stdout"}

pu, _ := url.Parse(tempurl.Alloc())
cfg.LPUrls = []url.URL{*pu}
cfg.APUrls = cfg.LPUrls
cfg.ListenPeerUrls = []url.URL{*pu}
cfg.AdvertisePeerUrls = cfg.ListenPeerUrls
cu, _ := url.Parse(tempurl.Alloc())
cfg.LCUrls = []url.URL{*cu}
cfg.ACUrls = cfg.LCUrls
cfg.ListenClientUrls = []url.URL{*cu}
cfg.AdvertiseClientUrls = cfg.ListenClientUrls

cfg.StrictReconfigCheck = false
cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, &cfg.LPUrls[0])
cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, &cfg.ListenPeerUrls[0])
cfg.ClusterState = embed.ClusterStateFlagNew
return cfg
}
Expand All @@ -63,7 +63,7 @@ func NewTestEtcdCluster(t *testing.T, count int) (servers []*embed.Etcd, etcdCli
cfg := newTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
re.NoError(err)
etcdClient, err = CreateEtcdClient(nil, cfg.LCUrls)
etcdClient, err = CreateEtcdClient(nil, cfg.ListenClientUrls)
re.NoError(err)
<-etcd.Server.ReadyNotify()
servers = append(servers, etcd)
Expand Down Expand Up @@ -100,9 +100,9 @@ func MustAddEtcdMember(t *testing.T, cfg1 *embed.Config, client *clientv3.Client
re := require.New(t)
cfg2 := newTestSingleConfig(t)
cfg2.Name = genRandName()
cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0])
cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.ListenPeerUrls[0])
cfg2.ClusterState = embed.ClusterStateFlagExisting
peerURL := cfg2.LPUrls[0].String()
peerURL := cfg2.ListenPeerUrls[0].String()
addResp, err := AddEtcdMember(client, []string{peerURL})
re.NoError(err)
// Check the client can get the new member.
Expand Down
3 changes: 2 additions & 1 deletion pkg/utils/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
)

Expand Down Expand Up @@ -133,7 +134,7 @@ func (s TLSConfig) GetOneAllowedCN() (string, error) {
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
func GetClientConn(ctx context.Context, addr string, tlsCfg *tls.Config, do ...grpc.DialOption) (*grpc.ClientConn, error) {
opt := grpc.WithInsecure()
opt := grpc.WithTransportCredentials(insecure.NewCredentials())
if tlsCfg != nil {
creds := credentials.NewTLS(tlsCfg)
opt = grpc.WithTransportCredentials(creds)
Expand Down
1 change: 1 addition & 0 deletions pkg/utils/testutil/leak.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var LeakOptions = []goleak.Option{
goleak.IgnoreTopFunction("github.com/syndtr/goleveldb/leveldb.(*DB).mpoolDrain"),
goleak.IgnoreTopFunction("google.golang.org/grpc.(*ccBalancerWrapper).watcher"),
goleak.IgnoreTopFunction("google.golang.org/grpc.(*addrConn).resetTransport"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("sync.runtime_notifyListWait"),
// TODO: remove the below options once we fixed the http connection leak problems
Expand Down
3 changes: 2 additions & 1 deletion pkg/utils/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/log"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
Expand Down Expand Up @@ -77,7 +78,7 @@ func NewRequestHeader(clusterID uint64) *pdpb.RequestHeader {

// MustNewGrpcClient must create a new PD grpc client.
func MustNewGrpcClient(re *require.Assertions, addr string) pdpb.PDClient {
conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithInsecure())
conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithTransportCredentials(insecure.NewCredentials()))
re.NoError(err)
return pdpb.NewPDClient(conn)
}
Expand Down
8 changes: 4 additions & 4 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,22 +742,22 @@ func (c *Config) GenEmbedEtcdConfig() (*embed.Config, error) {
cfg.Logger = "zap"
var err error

cfg.LPUrls, err = parseUrls(c.PeerUrls)
cfg.ListenPeerUrls, err = parseUrls(c.PeerUrls)
if err != nil {
return nil, err
}

cfg.APUrls, err = parseUrls(c.AdvertisePeerUrls)
cfg.AdvertisePeerUrls, err = parseUrls(c.AdvertisePeerUrls)
if err != nil {
return nil, err
}

cfg.LCUrls, err = parseUrls(c.ClientUrls)
cfg.ListenClientUrls, err = parseUrls(c.ClientUrls)
if err != nil {
return nil, err
}

cfg.ACUrls, err = parseUrls(c.AdvertiseClientUrls)
cfg.AdvertiseClientUrls, err = parseUrls(c.AdvertiseClientUrls)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,12 +380,12 @@ func (s *Server) startClient() error {
}
/* Starting two different etcd clients here is to avoid the throttling. */
// This etcd client will be used to access the etcd cluster to read and write all kinds of meta data.
s.client, err = etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls, "server-etcd-client")
s.client, err = etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.AdvertiseClientUrls, "server-etcd-client")
if err != nil {
return errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
}
// This etcd client will only be used to read and write the election-related data, such as leader key.
s.electionClient, err = etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls, "election-etcd-client")
s.electionClient, err = etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.AdvertiseClientUrls, "election-etcd-client")
if err != nil {
return errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
}
Expand Down
Loading

0 comments on commit 8544834

Please sign in to comment.