Skip to content

Commit

Permalink
ctl: replace gc_safepoint call with PD HTTP SDK (#8504)
Browse files Browse the repository at this point in the history
ref #7300

replace gc_safepoint call with PD HTTP SDK

Signed-off-by: Boyang Lyu <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
JackL9u and ti-chi-bot[bot] authored Aug 21, 2024
1 parent ce1d0e8 commit 6c30bbf
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 26 deletions.
6 changes: 6 additions & 0 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ const (
Status = "/pd/api/v1/status"
Version = "/pd/api/v1/version"
operators = "/pd/api/v1/operators"
safepoint = "/pd/api/v1/gc/safepoint"
// Micro Service
microServicePrefix = "/pd/api/v2/ms"
// Keyspace
Expand Down Expand Up @@ -215,3 +216,8 @@ func GetUpdateKeyspaceConfigURL(keyspaceName string) string {
func GetKeyspaceMetaByNameURL(keyspaceName string) string {
return fmt.Sprintf(GetKeyspaceMetaByName, keyspaceName)
}

// GetDeleteSafePointURI returns the URI for delete safepoint service
func GetDeleteSafePointURI(serviceID string) string {
return fmt.Sprintf("%s/%s", safepoint, serviceID)
}
30 changes: 30 additions & 0 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ type Client interface {
/* Other interfaces */
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)
GetPDVersion(context.Context) (string, error)
GetGCSafePoint(context.Context) (ListServiceGCSafepoint, error)
DeleteGCSafePoint(context.Context, string) (string, error)
/* Micro Service interfaces */
GetMicroServiceMembers(context.Context, string) ([]MicroServiceMember, error)
GetMicroServicePrimary(context.Context, string) (string, error)
Expand Down Expand Up @@ -1024,3 +1026,31 @@ func (c *client) GetKeyspaceMetaByName(ctx context.Context, keyspaceName string)
}
return &keyspaceMetaPB, nil
}

// GetGCSafePoint gets the GC safe point list.
func (c *client) GetGCSafePoint(ctx context.Context) (ListServiceGCSafepoint, error) {
var gcSafePoint ListServiceGCSafepoint
err := c.request(ctx, newRequestInfo().
WithName(GetGCSafePointName).
WithURI(safepoint).
WithMethod(http.MethodGet).
WithResp(&gcSafePoint))
if err != nil {
return gcSafePoint, err
}
return gcSafePoint, nil
}

// DeleteGCSafePoint deletes a GC safe point with the given service ID.
func (c *client) DeleteGCSafePoint(ctx context.Context, serviceID string) (string, error) {
var msg string
err := c.request(ctx, newRequestInfo().
WithName(DeleteGCSafePointName).
WithURI(GetDeleteSafePointURI(serviceID)).
WithMethod(http.MethodDelete).
WithResp(&msg))
if err != nil {
return msg, err
}
return msg, nil
}
2 changes: 2 additions & 0 deletions client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ const (
deleteOperators = "DeleteOperators"
UpdateKeyspaceGCManagementTypeName = "UpdateKeyspaceGCManagementType"
GetKeyspaceMetaByNameName = "GetKeyspaceMetaByName"
GetGCSafePointName = "GetGCSafePoint"
DeleteGCSafePointName = "DeleteGCSafePoint"
)

type requestInfo struct {
Expand Down
16 changes: 16 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,22 @@ import (
pd "github.com/tikv/pd/client"
)

// ServiceSafePoint is the safepoint for a specific service
// NOTE: This type is in sync with pd/pkg/storage/endpoint/gc_safe_point.go
type ServiceSafePoint struct {
ServiceID string `json:"service_id"`
ExpiredAt int64 `json:"expired_at"`
SafePoint uint64 `json:"safe_point"`
}

// ListServiceGCSafepoint is the response for list service GC safepoint.
// NOTE: This type is in sync with pd/server/api/service_gc_safepoint.go
type ListServiceGCSafepoint struct {
ServiceGCSafepoints []*ServiceSafePoint `json:"service_gc_safe_points"`
MinServiceGcSafepoint uint64 `json:"min_service_gc_safe_point,omitempty"`
GCSafePoint uint64 `json:"gc_safe_point"`
}

// ClusterState saves some cluster state information.
// NOTE: This type sync with https://github.com/tikv/pd/blob/5eae459c01a797cbd0c416054c6f0cad16b8740a/server/cluster/cluster.go#L173
type ClusterState struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/endpoint/gc_safe_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

// ServiceSafePoint is the safepoint for a specific service
// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
// This type is in sync with `client/http/types.go`.
type ServiceSafePoint struct {
ServiceID string `json:"service_id"`
ExpiredAt int64 `json:"expired_at"`
Expand Down
1 change: 1 addition & 0 deletions server/api/service_gc_safepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func newServiceGCSafepointHandler(svr *server.Server, rd *render.Render) *servic

// ListServiceGCSafepoint is the response for list service GC safepoint.
// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
// This type is in sync with `pd/client/http/types.go`.
type ListServiceGCSafepoint struct {
ServiceGCSafepoints []*endpoint.ServiceSafePoint `json:"service_gc_safe_points"`
MinServiceGcSafepoint uint64 `json:"min_service_gc_safe_point,omitempty"`
Expand Down
82 changes: 82 additions & 0 deletions tests/integrations/client/http_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ import (
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/versioninfo"
"github.com/tikv/pd/server/api"
"github.com/tikv/pd/tests"
)

Expand Down Expand Up @@ -835,3 +837,83 @@ func (suite *httpClientTestSuite) TestRetryOnLeaderChange() {
cancel()
wg.Wait()
}

func (suite *httpClientTestSuite) TestGetGCSafePoint() {
re := suite.Require()
client := suite.client
ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()

// adding some safepoints to the server
list := &api.ListServiceGCSafepoint{
ServiceGCSafepoints: []*endpoint.ServiceSafePoint{
{
ServiceID: "AAA",
ExpiredAt: time.Now().Unix() + 10,
SafePoint: 1,
},
{
ServiceID: "BBB",
ExpiredAt: time.Now().Unix() + 10,
SafePoint: 2,
},
{
ServiceID: "CCC",
ExpiredAt: time.Now().Unix() + 10,
SafePoint: 3,
},
},
GCSafePoint: 1,
MinServiceGcSafepoint: 1,
}

storage := suite.cluster.GetLeaderServer().GetServer().GetStorage()
for _, ssp := range list.ServiceGCSafepoints {
err := storage.SaveServiceGCSafePoint(ssp)
re.NoError(err)
}
storage.SaveGCSafePoint(1)

// get the safepoints and start testing
l, err := client.GetGCSafePoint(ctx)
re.NoError(err)

re.Equal(uint64(1), l.GCSafePoint)
re.Equal(uint64(1), l.MinServiceGcSafepoint)
re.Len(l.ServiceGCSafepoints, 3)

// sort the gc safepoints based on order of ServiceID
sort.Slice(l.ServiceGCSafepoints, func(i, j int) bool {
return l.ServiceGCSafepoints[i].ServiceID < l.ServiceGCSafepoints[j].ServiceID
})

for i, val := range l.ServiceGCSafepoints {
re.Equal(list.ServiceGCSafepoints[i].ServiceID, val.ServiceID)
re.Equal(list.ServiceGCSafepoints[i].SafePoint, val.SafePoint)
}

// delete the safepoints
for i := 0; i < 3; i++ {
msg, err := client.DeleteGCSafePoint(ctx, list.ServiceGCSafepoints[i].ServiceID)
re.NoError(err)
re.Equal("Delete service GC safepoint successfully.", msg)
}

// check that the safepoitns are indeed deleted
l, err = client.GetGCSafePoint(ctx)
re.NoError(err)

re.Equal(uint64(1), l.GCSafePoint)
re.Equal(uint64(0), l.MinServiceGcSafepoint)
re.Empty(l.ServiceGCSafepoints)

// try delete gc_worker, should get an error
_, err = client.DeleteGCSafePoint(ctx, "gc_worker")
re.Error(err)

// try delete some non-exist safepoints, should return normally
var msg string
msg, err = client.DeleteGCSafePoint(ctx, "non_exist")
re.NoError(err)
re.Equal("Delete service GC safepoint successfully.", msg)
}
34 changes: 8 additions & 26 deletions tools/pd-ctl/pdctl/command/gc_safepoint_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,18 @@
package command

import (
"encoding/json"
"net/http"
"sort"

"github.com/spf13/cobra"
"github.com/tikv/pd/server/api"
)

var (
serviceGCSafepointPrefix = "pd/api/v1/gc/safepoint"
)

// NewServiceGCSafepointCommand return a service gc safepoint subcommand of rootCmd
func NewServiceGCSafepointCommand() *cobra.Command {
l := &cobra.Command{
Use: "service-gc-safepoint",
Short: "show all service gc safepoint",
Run: showSSPs,
Use: "service-gc-safepoint",
Short: "show all service gc safepoint",
PersistentPreRunE: requirePDClient,
Run: showSSPs,
}
l.AddCommand(NewDeleteServiceGCSafepointCommand())
return l
Expand All @@ -50,38 +44,26 @@ func NewDeleteServiceGCSafepointCommand() *cobra.Command {
}

func showSSPs(cmd *cobra.Command, _ []string) {
r, err := doRequest(cmd, serviceGCSafepointPrefix, http.MethodGet, http.Header{})
safepoint, err := PDCli.GetGCSafePoint(cmd.Context())
if err != nil {
cmd.Printf("Failed to get service GC safepoint: %s\n", err)
return
}
var safepoint api.ListServiceGCSafepoint
if err := json.Unmarshal([]byte(r), &safepoint); err != nil {
cmd.Printf("Failed to unmarshal service GC safepoint: %s\n", err)
return
}
sort.Slice(safepoint.ServiceGCSafepoints, func(i, j int) bool {
return safepoint.ServiceGCSafepoints[i].SafePoint < safepoint.ServiceGCSafepoints[j].SafePoint
})
data, err := json.MarshalIndent(safepoint, "", " ")
if err != nil {
cmd.Printf("Failed to marshal service GC safepoint: %s\n", err)
return
}
cmd.Println(string(data))
jsonPrint(cmd, safepoint)
}

func deleteSSP(cmd *cobra.Command, args []string) {
if len(args) != 1 {
cmd.Usage()
return
}
serviceID := args[0]
deleteURL := serviceGCSafepointPrefix + "/" + serviceID
r, err := doRequest(cmd, deleteURL, http.MethodDelete, http.Header{})
r, err := PDCli.DeleteGCSafePoint(cmd.Context(), args[0])
if err != nil {
cmd.Printf("Failed to delete service GC safepoint: %s\n", err)
return
}
cmd.Println(r)
jsonPrint(cmd, r)
}
Loading

0 comments on commit 6c30bbf

Please sign in to comment.