Skip to content

Commit ab9f92e

Browse files
authored
Fix data race when migrating slots (#324)
1 parent 6bac9a4 commit ab9f92e

File tree

4 files changed

+35
-8
lines changed

4 files changed

+35
-8
lines changed

controller/cluster.go

Lines changed: 9 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -329,17 +329,22 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu
329329
if !shard.IsMigrating() {
330330
continue
331331
}
332-
sourceNodeClusterInfo, err := shard.GetMasterNode().GetClusterInfo(ctx)
332+
sourceNode := shard.GetMasterNode()
333+
sourceNodeClusterInfo, err := sourceNode.GetClusterInfo(ctx)
333334
if err != nil {
334-
log.Error("Failed to get the cluster info from the source node", zap.Error(err))
335-
return
335+
log.With(
336+
zap.Int("shard_index", i),
337+
zap.String("source_node", sourceNode.ID()),
338+
).Error("Failed to get the cluster info from the source node", zap.Error(err))
339+
continue
336340
}
337341
if !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange) {
338342
log.Error("Mismatch migrating slot",
343+
zap.Int("shard_index", i),
339344
zap.String("source_migrating_slot", sourceNodeClusterInfo.MigratingSlot.String()),
340345
zap.String("migrating_slot", shard.MigratingSlot.String()),
341346
)
342-
return
347+
continue
343348
}
344349
if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= len(clonedCluster.Shards) {
345350
log.Error("Invalid target shard index", zap.Int("index", shard.TargetShardIndex))

server/api/cluster.go

Lines changed: 23 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,9 @@ package api
2222

2323
import (
2424
"errors"
25+
"fmt"
2526
"strings"
27+
"sync"
2628

2729
"github.com/gin-gonic/gin"
2830

@@ -45,7 +47,14 @@ type CreateClusterRequest struct {
4547
}
4648

4749
type ClusterHandler struct {
48-
s store.Store
50+
s store.Store
51+
locks sync.Map
52+
}
53+
54+
func (handler *ClusterHandler) getLock(ns, cluster string) *sync.RWMutex {
55+
value, _ := handler.locks.LoadOrStore(fmt.Sprintf("%s/%s", ns, cluster), &sync.RWMutex{})
56+
lock, _ := value.(*sync.RWMutex)
57+
return lock
4958
}
5059

5160
func (handler *ClusterHandler) List(c *gin.Context) {
@@ -119,15 +128,26 @@ func (handler *ClusterHandler) Remove(c *gin.Context) {
119128

120129
func (handler *ClusterHandler) MigrateSlot(c *gin.Context) {
121130
namespace := c.Param("namespace")
122-
cluster, _ := c.MustGet(consts.ContextKeyCluster).(*store.Cluster)
131+
clusterName := c.Param("cluster")
132+
133+
lock := handler.getLock(namespace, clusterName)
134+
lock.Lock()
135+
defer lock.Unlock()
136+
137+
s, _ := c.MustGet(consts.ContextKeyStore).(*store.ClusterStore)
138+
cluster, err := s.GetCluster(c, namespace, clusterName)
139+
if err != nil {
140+
helper.ResponseError(c, err)
141+
return
142+
}
123143

124144
var req MigrateSlotRequest
125145
if err := c.BindJSON(&req); err != nil {
126146
helper.ResponseBadRequest(c, err)
127147
return
128148
}
129149

130-
err := cluster.MigrateSlot(c, req.Slot, req.Target, req.SlotOnly)
150+
err = cluster.MigrateSlot(c, req.Slot, req.Target, req.SlotOnly)
131151
if err != nil {
132152
helper.ResponseError(c, err)
133153
return

server/route.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -69,7 +69,7 @@ func (srv *Server) initHandlers() {
6969
clusters.POST("/:cluster/import", middleware.RequiredNamespace, handler.Cluster.Import)
7070
clusters.GET("/:cluster", middleware.RequiredCluster, handler.Cluster.Get)
7171
clusters.DELETE("/:cluster", middleware.RequiredCluster, handler.Cluster.Remove)
72-
clusters.POST("/:cluster/migrate", middleware.RequiredCluster, handler.Cluster.MigrateSlot)
72+
clusters.POST("/:cluster/migrate", handler.Cluster.MigrateSlot)
7373
}
7474

7575
shards := clusters.Group("/:cluster/shards")

store/cluster.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -207,6 +207,8 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetS
207207
return consts.ErrShardIsSame
208208
}
209209
if slotOnly {
210+
// clear source migrating info to avoid mismatch migrating slot error
211+
cluster.Shards[sourceShardIdx].ClearMigrateState()
210212
cluster.Shards[sourceShardIdx].SlotRanges = RemoveSlotFromSlotRanges(cluster.Shards[sourceShardIdx].SlotRanges, slot)
211213
cluster.Shards[targetShardIdx].SlotRanges = AddSlotToSlotRanges(cluster.Shards[targetShardIdx].SlotRanges, slot)
212214
return nil

0 commit comments

Comments
 (0)