
Commit 2c834b4

Fix the migrating slot not being compatible with the old format (#310)
1 parent fc80b1a commit 2c834b4

12 files changed: +260 -71 lines

cmd/client/command/helper.go
Lines changed: 1 addition & 1 deletion

@@ -51,7 +51,7 @@ func printCluster(cluster *store.Cluster) {
         role = strings.ToUpper(store.RoleMaster)
     }
     migratingStatus := "NO"
-    if shard.MigratingSlot != nil {
+    if shard.IsMigrating() {
         migratingStatus = fmt.Sprintf("%s --> %d", shard.MigratingSlot, shard.TargetShardIndex)
     }
     columns := []string{fmt.Sprintf("%d", i), node.ID(), node.Addr(), role, migratingStatus}
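
Why the nil check is no longer enough: with the compatibility type introduced by this commit, a shard deserialized from the old metadata format can carry a non-nil MigratingSlot whose IsMigrating flag is false (the test fixtures later in this commit construct exactly that value), so the CLI now asks the shard itself. A minimal, hypothetical illustration:

package main

import (
    "fmt"

    "github.com/apache/kvrocks-controller/store"
)

func main() {
    // A shard as it might look after loading old metadata: the migrating slot
    // is present as a struct but explicitly marked as not migrating.
    shard := &store.Shard{
        MigratingSlot:    &store.MigratingSlot{IsMigrating: false},
        TargetShardIndex: -1,
    }
    fmt.Println(shard.MigratingSlot != nil) // true, which would have printed a bogus migrating status
    fmt.Println(shard.IsMigrating())        // false, the check printCluster now relies on
}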

controller/cluster.go
Lines changed: 3 additions & 10 deletions

@@ -318,19 +318,12 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu
         if !shard.IsMigrating() {
             continue
         }
-
         sourceNodeClusterInfo, err := shard.GetMasterNode().GetClusterInfo(ctx)
         if err != nil {
             log.Error("Failed to get the cluster info from the source node", zap.Error(err))
             return
         }
-        if sourceNodeClusterInfo.MigratingSlot == nil {
-            log.Error("The source migration slot is empty",
-                zap.String("migrating_slot", shard.MigratingSlot.String()),
-            )
-            return
-        }
-        if !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot) {
+        if !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange) {
             log.Error("Mismatch migrating slot",
                 zap.String("source_migrating_slot", sourceNodeClusterInfo.MigratingSlot.String()),
                 zap.String("migrating_slot", shard.MigratingSlot.String()),
@@ -355,9 +348,9 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu
             c.updateCluster(clonedCluster)
             log.Warn("Failed to migrate the slot", zap.String("slot", migratingSlot.String()))
         case "success":
-            clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, *shard.MigratingSlot)
+            clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, shard.MigratingSlot.SlotRange)
             clonedCluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges(
-                clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, *shard.MigratingSlot,
+                clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot.SlotRange,
             )
             migratedSlot := shard.MigratingSlot
             clonedCluster.Shards[i].ClearMigrateState()

controller/cluster_test.go
Lines changed: 2 additions & 2 deletions

@@ -109,7 +109,7 @@ func TestCluster_FailureCount(t *testing.T) {
             mockNode0, mockNode1, mockNode2, mockNode3,
         },
         SlotRanges:       []store.SlotRange{{Start: 0, Stop: 16383}},
-        MigratingSlot:    nil,
+        MigratingSlot:    &store.MigratingSlot{IsMigrating: false},
         TargetShardIndex: -1,
     }},
 }
@@ -221,7 +221,7 @@ func TestCluster_MigrateSlot(t *testing.T) {
     }()
     slotRange, err := store.NewSlotRange(0, 0)
     require.NoError(t, err)
-    require.NoError(t, cluster.MigrateSlot(ctx, *slotRange, 1, false))
+    require.NoError(t, cluster.MigrateSlot(ctx, slotRange, 1, false))

     s := NewMockClusterStore()
     require.NoError(t, s.CreateCluster(ctx, ns, cluster))

server/api/cluster.go
Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@ import (

 type MigrateSlotRequest struct {
     Target   int             `json:"target" validate:"required"`
-    Slot     store.SlotRange `json:"slot" validate:"required"`
+    Slot     store.SlotRange `json:"slot" validate:"required"` // we don't use store.MigratingSlot here because we expect a valid SlotRange
     SlotOnly bool            `json:"slot_only"`
 }

server/api/cluster_test.go
Lines changed: 2 additions & 2 deletions

@@ -131,7 +131,7 @@ func TestClusterBasics(t *testing.T) {
     slotRange, err := store.NewSlotRange(3, 3)
     require.NoError(t, err)
     testMigrateReq := &MigrateSlotRequest{
-        Slot:     *slotRange,
+        Slot:     slotRange,
         SlotOnly: true,
         Target:   1,
     }
@@ -271,7 +271,7 @@ func TestClusterMigrateData(t *testing.T) {
     currentVersion := gotCluster.Version.Load()
     sourceSlotRanges := gotCluster.Shards[0].SlotRanges
     targetSlotRanges := gotCluster.Shards[1].SlotRanges
-    require.EqualValues(t, slotRange, *gotCluster.Shards[0].MigratingSlot)
+    require.EqualValues(t, slotRange, gotCluster.Shards[0].MigratingSlot.SlotRange)
     require.EqualValues(t, 1, gotCluster.Shards[0].TargetShardIndex)

     // Run the controller to check and update the migration status

store/cluster.go
Lines changed: 2 additions & 2 deletions

@@ -181,7 +181,7 @@ func (cluster *Cluster) findShardIndexBySlot(slot SlotRange) (int, error) {
     for i := 0; i < len(cluster.Shards); i++ {
         slotRanges := cluster.Shards[i].SlotRanges
         for _, slotRange := range slotRanges {
-            if slotRange.HasOverlap(&slot) {
+            if slotRange.HasOverlap(slot) {
                 if sourceShardIdx != -1 {
                     return sourceShardIdx, consts.ErrSlotRangeBelongsToMultipleShards
                 }
@@ -226,7 +226,7 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetS
     }

     // Will start the data migration in the background
-    cluster.Shards[sourceShardIdx].MigratingSlot = &slot
+    cluster.Shards[sourceShardIdx].MigratingSlot = FromSlotRange(slot)
     cluster.Shards[sourceShardIdx].TargetShardIndex = targetShardIdx
     return nil
 }
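
FromSlotRange is defined in one of the two changed files that are not expanded on this page, so its exact shape is not visible here. Judging from the call sites in this commit (a SlotRange value in, a *MigratingSlot out, used only for slots that are actively migrating), a plausible sketch looks like this; a fuller sketch of the MigratingSlot type follows the store/cluster_shard.go hunks below.

// Sketch only: inferred from the call sites in this commit, not the committed implementation.
func FromSlotRange(slotRange SlotRange) *MigratingSlot {
    return &MigratingSlot{SlotRange: slotRange, IsMigrating: true}
}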

store/cluster_node.go
Lines changed: 5 additions & 4 deletions

@@ -85,9 +85,9 @@ type ClusterNode struct {
 }

 type ClusterInfo struct {
-    CurrentEpoch   int64      `json:"cluster_current_epoch"`
-    MigratingSlot  *SlotRange `json:"migrating_slot"`
-    MigratingState string     `json:"migrating_state"`
+    CurrentEpoch   int64          `json:"cluster_current_epoch"`
+    MigratingSlot  *MigratingSlot `json:"migrating_slot"`
+    MigratingState string         `json:"migrating_state"`
 }

 type ClusterNodeInfo struct {
@@ -195,10 +195,11 @@ func (n *ClusterNode) GetClusterInfo(ctx context.Context) (*ClusterInfo, error)
         }
     case "migrating_slot", "migrating_slot(s)":
         // TODO(@git-hulk): handle multiple migrating slots
-        clusterInfo.MigratingSlot, err = ParseSlotRange(fields[1])
+        slotRange, err := ParseSlotRange(fields[1])
         if err != nil {
             return nil, err
         }
+        clusterInfo.MigratingSlot = FromSlotRange(*slotRange)
     case "migrating_state":
         clusterInfo.MigratingState = fields[1]
     }

store/cluster_shard.go
Lines changed: 13 additions & 7 deletions

@@ -33,11 +33,17 @@ import (
     "github.com/apache/kvrocks-controller/consts"
 )

+const (
+    // the old migrating slot was denoted by an int and -1 was
+    // used to denote a non migrating slot
+    NotMigratingInt = -1
+)
+
 type Shard struct {
-    Nodes            []Node      `json:"nodes"`
-    SlotRanges       []SlotRange `json:"slot_ranges"`
-    TargetShardIndex int         `json:"target_shard_index"`
-    MigratingSlot    *SlotRange  `json:"migrating_slot"`
+    Nodes            []Node         `json:"nodes"`
+    SlotRanges       []SlotRange    `json:"slot_ranges"`
+    TargetShardIndex int            `json:"target_shard_index"`
+    MigratingSlot    *MigratingSlot `json:"migrating_slot"`
 }

 type Shards []*Shard
@@ -112,7 +118,7 @@ func (shard *Shard) addNode(addr, role, password string) error {
 }

 func (shard *Shard) IsMigrating() bool {
-    return shard.MigratingSlot != nil && shard.TargetShardIndex != -1
+    return shard.MigratingSlot != nil && shard.MigratingSlot.IsMigrating && shard.TargetShardIndex != -1
 }

 func (shard *Shard) GetMasterNode() Node {
@@ -206,7 +212,7 @@ func (shard *Shard) promoteNewMaster(ctx context.Context, masterNodeID, preferre
     return preferredNewMasterNode.ID(), nil
 }

-func (shard *Shard) HasOverlap(slotRange *SlotRange) bool {
+func (shard *Shard) HasOverlap(slotRange SlotRange) bool {
     for _, shardSlotRange := range shard.SlotRanges {
         if shardSlotRange.HasOverlap(slotRange) {
             return true
@@ -261,7 +267,7 @@ func (shard *Shard) UnmarshalJSON(bytes []byte) error {
     var data struct {
         SlotRanges       []SlotRange    `json:"slot_ranges"`
         TargetShardIndex int            `json:"target_shard_index"`
-        MigratingSlot    *SlotRange     `json:"migrating_slot"`
+        MigratingSlot    *MigratingSlot `json:"migrating_slot"`
         Nodes            []*ClusterNode `json:"nodes"`
     }
     if err := json.Unmarshal(bytes, &data); err != nil {
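
The MigratingSlot type itself sits in one of the two changed files not expanded on this page, so the following is only a sketch of what a backward-compatible definition could look like, assuming it lives in the store package next to SlotRange and the NotMigratingInt constant above. The field names, JSON tags, Equal semantics, and legacy-int decoding are inferred from the call sites in this commit and from the NotMigratingInt comment, not copied from it.

// Sketch only: assumes the store package's SlotRange (with exported Start/Stop
// fields) and the NotMigratingInt constant introduced above.
package store

import (
    "encoding/json"
    "fmt"
)

type MigratingSlot struct {
    SlotRange   SlotRange `json:"slot_range"`
    IsMigrating bool      `json:"is_migrating"`
}

// Equal reports whether this migrating slot covers the given slot range.
// It is nil-safe so the controller can compare a node-reported value directly.
func (s *MigratingSlot) Equal(other SlotRange) bool {
    return s != nil && s.IsMigrating && s.SlotRange.Start == other.Start && s.SlotRange.Stop == other.Stop
}

func (s *MigratingSlot) String() string {
    if s == nil || !s.IsMigrating {
        return "-1"
    }
    return fmt.Sprintf("%d-%d", s.SlotRange.Start, s.SlotRange.Stop)
}

// UnmarshalJSON keeps old cluster metadata readable: the legacy format stored
// the migrating slot as a bare integer, with -1 meaning "not migrating".
func (s *MigratingSlot) UnmarshalJSON(data []byte) error {
    var legacySlot int
    if err := json.Unmarshal(data, &legacySlot); err == nil {
        if legacySlot == NotMigratingInt {
            *s = MigratingSlot{IsMigrating: false}
            return nil
        }
        *s = MigratingSlot{SlotRange: SlotRange{Start: legacySlot, Stop: legacySlot}, IsMigrating: true}
        return nil
    }
    // Otherwise fall back to the current object encoding.
    type plain MigratingSlot
    var decoded plain
    if err := json.Unmarshal(data, &decoded); err != nil {
        return err
    }
    *s = MigratingSlot(decoded)
    return nil
}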

store/cluster_shard_test.go
Lines changed: 11 additions & 6 deletions

@@ -29,11 +29,11 @@ import (

 func TestShard_HasOverlap(t *testing.T) {
     shard := NewShard()
-    slotRange := &SlotRange{Start: 0, Stop: 100}
-    shard.SlotRanges = append(shard.SlotRanges, *slotRange)
+    slotRange := SlotRange{Start: 0, Stop: 100}
+    shard.SlotRanges = append(shard.SlotRanges, slotRange)
     require.True(t, shard.HasOverlap(slotRange))
-    require.True(t, shard.HasOverlap(&SlotRange{Start: 50, Stop: 150}))
-    require.False(t, shard.HasOverlap(&SlotRange{Start: 101, Stop: 150}))
+    require.True(t, shard.HasOverlap(SlotRange{Start: 50, Stop: 150}))
+    require.False(t, shard.HasOverlap(SlotRange{Start: 101, Stop: 150}))
 }

 func TestShard_Sort(t *testing.T) {
@@ -56,17 +56,22 @@ func TestShard_Sort(t *testing.T) {
 func TestShard_IsServicing(t *testing.T) {
     var err error
     shard := NewShard()
+    shard.TargetShardIndex = 0
+    shard.MigratingSlot = &MigratingSlot{IsMigrating: false}
+    require.False(t, shard.IsServicing())
+
     shard.TargetShardIndex = 0
     shard.MigratingSlot = nil
     require.False(t, shard.IsServicing())

     shard.TargetShardIndex = 0
-    shard.MigratingSlot, err = NewSlotRange(1, 1)
+    slotRange, err := NewSlotRange(1, 1)
     require.Nil(t, err)
+    shard.MigratingSlot = FromSlotRange(slotRange)
     require.True(t, shard.IsServicing())

     shard.TargetShardIndex = -1
-    shard.MigratingSlot = nil
+    shard.MigratingSlot = &MigratingSlot{IsMigrating: false}
     shard.SlotRanges = []SlotRange{{Start: 0, Stop: 100}}
     require.True(t, shard.IsServicing())
store/cluster_test.go
Lines changed: 3 additions & 3 deletions

@@ -44,19 +44,19 @@ func TestCluster_FindIndexShardBySlot(t *testing.T) {

     slotRange, err := NewSlotRange(0, 0)
     require.NoError(t, err)
-    shard, err := cluster.findShardIndexBySlot(*slotRange)
+    shard, err := cluster.findShardIndexBySlot(slotRange)
     require.NoError(t, err)
     require.Equal(t, 0, shard)

     slotRange, err = NewSlotRange(MaxSlotID/3+1, MaxSlotID/3+1)
     require.NoError(t, err)
-    shard, err = cluster.findShardIndexBySlot(*slotRange)
+    shard, err = cluster.findShardIndexBySlot(slotRange)
     require.NoError(t, err)
     require.Equal(t, 1, shard)

     slotRange, err = NewSlotRange(MaxSlotID, MaxSlotID)
     require.NoError(t, err)
-    shard, err = cluster.findShardIndexBySlot(*slotRange)
+    shard, err = cluster.findShardIndexBySlot(slotRange)
     require.NoError(t, err)
     require.Equal(t, 2, shard)
 }
