Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions controller/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,17 +329,22 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu
if !shard.IsMigrating() {
continue
}
sourceNodeClusterInfo, err := shard.GetMasterNode().GetClusterInfo(ctx)
sourceNode := shard.GetMasterNode()
sourceNodeClusterInfo, err := sourceNode.GetClusterInfo(ctx)
if err != nil {
log.Error("Failed to get the cluster info from the source node", zap.Error(err))
return
log.With(
zap.Int("shard_index", i),
zap.String("source_node", sourceNode.ID()),
).Error("Failed to get the cluster info from the source node", zap.Error(err))
continue
}
if !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange) {
log.Error("Mismatch migrating slot",
zap.Int("shard_index", i),
zap.String("source_migrating_slot", sourceNodeClusterInfo.MigratingSlot.String()),
zap.String("migrating_slot", shard.MigratingSlot.String()),
)
return
continue
}
if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= len(clonedCluster.Shards) {
log.Error("Invalid target shard index", zap.Int("index", shard.TargetShardIndex))
Expand Down
26 changes: 23 additions & 3 deletions server/api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ package api

import (
"errors"
"fmt"
"strings"
"sync"

"github.com/gin-gonic/gin"

Expand All @@ -45,7 +47,14 @@ type CreateClusterRequest struct {
}

type ClusterHandler struct {
s store.Store
s store.Store
locks sync.Map
}

func (handler *ClusterHandler) getLock(ns, cluster string) *sync.RWMutex {
value, _ := handler.locks.LoadOrStore(fmt.Sprintf("%s/%s", ns, cluster), &sync.RWMutex{})
lock, _ := value.(*sync.RWMutex)
return lock
}

func (handler *ClusterHandler) List(c *gin.Context) {
Expand Down Expand Up @@ -119,15 +128,26 @@ func (handler *ClusterHandler) Remove(c *gin.Context) {

func (handler *ClusterHandler) MigrateSlot(c *gin.Context) {
namespace := c.Param("namespace")
cluster, _ := c.MustGet(consts.ContextKeyCluster).(*store.Cluster)
clusterName := c.Param("cluster")

lock := handler.getLock(namespace, clusterName)
lock.Lock()
defer lock.Unlock()

s, _ := c.MustGet(consts.ContextKeyStore).(*store.ClusterStore)
cluster, err := s.GetCluster(c, namespace, clusterName)
if err != nil {
helper.ResponseError(c, err)
return
}

var req MigrateSlotRequest
if err := c.BindJSON(&req); err != nil {
helper.ResponseBadRequest(c, err)
return
}

err := cluster.MigrateSlot(c, req.Slot, req.Target, req.SlotOnly)
err = cluster.MigrateSlot(c, req.Slot, req.Target, req.SlotOnly)
if err != nil {
helper.ResponseError(c, err)
return
Expand Down
2 changes: 1 addition & 1 deletion server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (srv *Server) initHandlers() {
clusters.POST("/:cluster/import", middleware.RequiredNamespace, handler.Cluster.Import)
clusters.GET("/:cluster", middleware.RequiredCluster, handler.Cluster.Get)
clusters.DELETE("/:cluster", middleware.RequiredCluster, handler.Cluster.Remove)
clusters.POST("/:cluster/migrate", middleware.RequiredCluster, handler.Cluster.MigrateSlot)
clusters.POST("/:cluster/migrate", handler.Cluster.MigrateSlot)
}

shards := clusters.Group("/:cluster/shards")
Expand Down
2 changes: 2 additions & 0 deletions store/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetS
return consts.ErrShardIsSame
}
if slotOnly {
// clear source migrating info to avoid mismatch migrating slot error
cluster.Shards[sourceShardIdx].ClearMigrateState()
cluster.Shards[sourceShardIdx].SlotRanges = RemoveSlotFromSlotRanges(cluster.Shards[sourceShardIdx].SlotRanges, slot)
cluster.Shards[targetShardIdx].SlotRanges = AddSlotToSlotRanges(cluster.Shards[targetShardIdx].SlotRanges, slot)
return nil
Expand Down
Loading