Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions controller/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,14 +332,14 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu
sourceNodeClusterInfo, err := shard.GetMasterNode().GetClusterInfo(ctx)
if err != nil {
log.Error("Failed to get the cluster info from the source node", zap.Error(err))
return
continue
}
if !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange) {
log.Error("Mismatch migrating slot",
zap.String("source_migrating_slot", sourceNodeClusterInfo.MigratingSlot.String()),
zap.String("migrating_slot", shard.MigratingSlot.String()),
)
return
continue
}
if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= len(clonedCluster.Shards) {
log.Error("Invalid target shard index", zap.Int("index", shard.TargetShardIndex))
Expand Down
16 changes: 13 additions & 3 deletions server/api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package api
import (
"errors"
"strings"
"sync"

"github.com/gin-gonic/gin"

Expand All @@ -45,7 +46,8 @@ type CreateClusterRequest struct {
}

type ClusterHandler struct {
s store.Store
s store.Store
mu sync.Mutex
}

func (handler *ClusterHandler) List(c *gin.Context) {
Expand Down Expand Up @@ -118,16 +120,24 @@ func (handler *ClusterHandler) Remove(c *gin.Context) {
}

func (handler *ClusterHandler) MigrateSlot(c *gin.Context) {
handler.mu.Lock()
defer handler.mu.Unlock()

namespace := c.Param("namespace")
cluster, _ := c.MustGet(consts.ContextKeyCluster).(*store.Cluster)
s, _ := c.MustGet(consts.ContextKeyStore).(*store.ClusterStore)
cluster, err := s.GetCluster(c, namespace, c.Param("cluster"))
if err != nil {
helper.ResponseError(c, err)
return
}

var req MigrateSlotRequest
if err := c.BindJSON(&req); err != nil {
helper.ResponseBadRequest(c, err)
return
}

err := cluster.MigrateSlot(c, req.Slot, req.Target, req.SlotOnly)
err = cluster.MigrateSlot(c, req.Slot, req.Target, req.SlotOnly)
if err != nil {
helper.ResponseError(c, err)
return
Expand Down
2 changes: 1 addition & 1 deletion server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (srv *Server) initHandlers() {
clusters.POST("/:cluster/import", middleware.RequiredNamespace, handler.Cluster.Import)
clusters.GET("/:cluster", middleware.RequiredCluster, handler.Cluster.Get)
clusters.DELETE("/:cluster", middleware.RequiredCluster, handler.Cluster.Remove)
clusters.POST("/:cluster/migrate", middleware.RequiredCluster, handler.Cluster.MigrateSlot)
clusters.POST("/:cluster/migrate", handler.Cluster.MigrateSlot)
}

shards := clusters.Group("/:cluster/shards")
Expand Down
1 change: 1 addition & 0 deletions store/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot SlotRange, targetS
return consts.ErrShardIsSame
}
if slotOnly {
cluster.Shards[sourceShardIdx].ClearMigrateState()
cluster.Shards[sourceShardIdx].SlotRanges = RemoveSlotFromSlotRanges(cluster.Shards[sourceShardIdx].SlotRanges, slot)
cluster.Shards[targetShardIdx].SlotRanges = AddSlotToSlotRanges(cluster.Shards[targetShardIdx].SlotRanges, slot)
return nil
Expand Down
Loading