Skip to content

Commit

Permalink
expand min-resolved-ts to support stores
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Aug 2, 2023
1 parent 0c537bb commit 250f148
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 6 deletions.
31 changes: 25 additions & 6 deletions server/api/min_resolved_ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package api

import (
"encoding/json"
"io"
"net/http"
"strconv"

Expand All @@ -38,9 +40,10 @@ func newMinResolvedTSHandler(svr *server.Server, rd *render.Render) *minResolved

// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
type minResolvedTS struct {
IsRealTime bool `json:"is_real_time,omitempty"`
MinResolvedTS uint64 `json:"min_resolved_ts"`
PersistInterval typeutil.Duration `json:"persist_interval,omitempty"`
IsRealTime bool `json:"is_real_time,omitempty"`
MinResolvedTS uint64 `json:"min_resolved_ts"`
PersistInterval typeutil.Duration `json:"persist_interval,omitempty"`
StoreMinResolvedTS map[uint64]uint64 `json:"store_min_resolved_ts"`
}

// @Tags min_store_resolved_ts
Expand Down Expand Up @@ -77,9 +80,25 @@ func (h *minResolvedTSHandler) GetMinResolvedTS(w http.ResponseWriter, r *http.R
c := h.svr.GetRaftCluster()
value := c.GetMinResolvedTS()
persistInterval := c.GetPDServerConfig().MinResolvedTSPersistenceInterval

var storeMinResolvedTS map[uint64]uint64
if b, err := io.ReadAll(r.Body); err == nil && len(b) != 0 {
// stores ids is an optional parameter.
// if it is not empty, return the min resolved ts of the specified stores into map.
var ids []string
err = json.Unmarshal(b, &ids)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
c := h.svr.GetRaftCluster()
storeMinResolvedTS = c.GetMinResolvedTSByStoreIDs(ids)
}

h.rd.JSON(w, http.StatusOK, minResolvedTS{
MinResolvedTS: value,
PersistInterval: persistInterval,
IsRealTime: persistInterval.Duration != 0,
MinResolvedTS: value,
PersistInterval: persistInterval,
IsRealTime: persistInterval.Duration != 0,
StoreMinResolvedTS: storeMinResolvedTS,
})
}
40 changes: 40 additions & 0 deletions server/api/min_resolved_ts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
package api

import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -116,6 +119,29 @@ func (suite *minResolvedTSTestSuite) TestMinResolvedTS() {
})
}

func (suite *minResolvedTSTestSuite) TestMinResolvedTSByStores() {
// default run job
interval := typeutil.Duration{Duration: suite.defaultInterval}
suite.setMinResolvedTSPersistenceInterval(interval)
suite.Eventually(func() bool {
return interval == suite.svr.GetRaftCluster().GetPDServerConfig().MinResolvedTSPersistenceInterval
}, time.Second*10, time.Millisecond*20)
// set min resolved ts
rc := suite.svr.GetRaftCluster()
ts := uint64(233)
rc.SetMinResolvedTS(1, ts)
storeIDs := []string{"1"}
suite.checkMinResolvedTSByStores(&minResolvedTS{
MinResolvedTS: 0,
IsRealTime: true,
PersistInterval: interval,
StoreMinResolvedTS: map[uint64]uint64{
1: ts,
},
}, storeIDs)

}

func (suite *minResolvedTSTestSuite) setMinResolvedTSPersistenceInterval(duration typeutil.Duration) {
cfg := suite.svr.GetRaftCluster().GetPDServerConfig().Clone()
cfg.MinResolvedTSPersistenceInterval = duration
Expand All @@ -133,3 +159,17 @@ func (suite *minResolvedTSTestSuite) checkMinResolvedTS(expect *minResolvedTS) {
return reflect.DeepEqual(expect, listResp)
}, time.Second*10, time.Millisecond*20)
}

func (suite *minResolvedTSTestSuite) checkMinResolvedTSByStores(expect *minResolvedTS, storeIDs []string) {
suite.Eventually(func() bool {
data, _ := json.Marshal(storeIDs)
req, _ := http.NewRequest(http.MethodGet, suite.url, bytes.NewBuffer(data))
res, err := testDialClient.Do(req)
suite.NoError(err)
defer res.Body.Close()
listResp := &minResolvedTS{}
err = apiutil.ReadJSON(res.Body, listResp)
suite.NoError(err)
return reflect.DeepEqual(expect, listResp)
}, time.Second*10, time.Millisecond*20)
}
14 changes: 14 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2487,6 +2487,20 @@ func (c *RaftCluster) GetStoreMinResolvedTS(storeID uint64) uint64 {
return c.GetStore(storeID).GetMinResolvedTS()
}

// GetMinResolvedTSByStoreIDs returns the min resolved ts of the stores.
func (c *RaftCluster) GetMinResolvedTSByStoreIDs(ids []string) map[uint64]uint64 {
allMinResolvedTS := make(map[uint64]uint64)
for _, idStr := range ids {
storeID, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
log.Error("parse store id failed", errs.ZapError(err))
continue
}
allMinResolvedTS[storeID] = c.GetStoreMinResolvedTS(storeID)
}
return allMinResolvedTS
}

// GetExternalTS returns the external timestamp.
func (c *RaftCluster) GetExternalTS() uint64 {
c.RLock()
Expand Down

0 comments on commit 250f148

Please sign in to comment.