From bb197ff748f4391286bfcc965085008a12fdb433 Mon Sep 17 00:00:00 2001 From: Yury Frolov <57130330+EinKrebs@users.noreply.github.com> Date: Tue, 1 Oct 2024 14:32:11 +0500 Subject: [PATCH] Disable move key range in LocalCoordinator + refactoring (#780) --- pkg/coord/local/clocal.go | 97 +++++++++++++++++++-------------------- 1 file changed, 47 insertions(+), 50 deletions(-) diff --git a/pkg/coord/local/clocal.go b/pkg/coord/local/clocal.go index d808a9c78..5fc8b335b 100644 --- a/pkg/coord/local/clocal.go +++ b/pkg/coord/local/clocal.go @@ -3,6 +3,7 @@ package local import ( "context" "fmt" + "github.com/pg-sharding/spqr/pkg/models/spqrerror" "math/rand" "sync" @@ -439,21 +440,21 @@ func (lc *LocalCoordinator) WorldShards() []string { // // Returns: // - error: an error if the move operation encounters any issues. -func (qr *LocalCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error { +func (lc *LocalCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error { var krmv *qdb.KeyRange var err error - if krmv, err = qr.qdb.CheckLockedKeyRange(ctx, req.Krid); err != nil { + if krmv, err = lc.qdb.CheckLockedKeyRange(ctx, req.Krid); err != nil { return err } - ds, err := qr.qdb.GetDistribution(ctx, krmv.DistributionId) + ds, err := lc.qdb.GetDistribution(ctx, krmv.DistributionId) if err != nil { return err } var reqKr = kr.KeyRangeFromDB(krmv, ds.ColTypes) reqKr.ShardID = req.ShardId - return ops.ModifyKeyRangeWithChecks(ctx, qr.qdb, reqKr) + return ops.ModifyKeyRangeWithChecks(ctx, lc.qdb, reqKr) } // TODO : unit tests @@ -466,12 +467,12 @@ func (qr *LocalCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) erro // // Returns: // - error: an error if the unite operation encounters any issues. -func (qr *LocalCoordinator) Unite(ctx context.Context, req *kr.UniteKeyRange) error { +func (lc *LocalCoordinator) Unite(ctx context.Context, req *kr.UniteKeyRange) error { var krBase *qdb.KeyRange var krAppendage *qdb.KeyRange var err error - if krBase, err = qr.qdb.LockKeyRange(ctx, req.BaseKeyRangeId); err != nil { //nolint:all TODO + if krBase, err = lc.qdb.LockKeyRange(ctx, req.BaseKeyRangeId); err != nil { //nolint:all TODO return err } @@ -481,19 +482,19 @@ func (qr *LocalCoordinator) Unite(ctx context.Context, req *kr.UniteKeyRange) er spqrlog.Zero.Error().Err(err).Msg("") return } - }(qr.qdb, ctx, req.BaseKeyRangeId) + }(lc.qdb, ctx, req.BaseKeyRangeId) - ds, err := qr.qdb.GetDistribution(ctx, krBase.DistributionId) + ds, err := lc.qdb.GetDistribution(ctx, krBase.DistributionId) if err != nil { return err } // TODO: krRight seems to be empty. - if krAppendage, err = qr.qdb.GetKeyRange(ctx, req.AppendageKeyRangeId); err != nil { + if krAppendage, err = lc.qdb.GetKeyRange(ctx, req.AppendageKeyRangeId); err != nil { return err } - if err = qr.qdb.DropKeyRange(ctx, krAppendage.KeyRangeID); err != nil { + if err = lc.qdb.DropKeyRange(ctx, krAppendage.KeyRangeID); err != nil { return err } @@ -506,7 +507,7 @@ func (qr *LocalCoordinator) Unite(ctx context.Context, req *kr.UniteKeyRange) er krBaseCopy.LowerBound = newBound united := kr.KeyRangeFromDB(krBaseCopy, ds.ColTypes) - return ops.ModifyKeyRangeWithChecks(ctx, qr.qdb, united) + return ops.ModifyKeyRangeWithChecks(ctx, lc.qdb, united) } // Caller should lock key range @@ -520,7 +521,7 @@ func (qr *LocalCoordinator) Unite(ctx context.Context, req *kr.UniteKeyRange) er // // Returns: // - error: an error if the split operation encounters any issues. -func (qr *LocalCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) error { +func (lc *LocalCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) error { var krOld *qdb.KeyRange var err error @@ -530,17 +531,17 @@ func (qr *LocalCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) er Str("source-id", req.SourceID). Msg("split request is") - if krOld, err = qr.qdb.LockKeyRange(ctx, req.SourceID); err != nil { + if krOld, err = lc.qdb.LockKeyRange(ctx, req.SourceID); err != nil { return err } defer func() { - if err := qr.qdb.UnlockKeyRange(ctx, req.SourceID); err != nil { + if err := lc.qdb.UnlockKeyRange(ctx, req.SourceID); err != nil { spqrlog.Zero.Error().Err(err).Msg("") } }() - ds, err := qr.qdb.GetDistribution(ctx, krOld.DistributionId) + ds, err := lc.qdb.GetDistribution(ctx, krOld.DistributionId) if err != nil { return err } @@ -570,11 +571,11 @@ func (qr *LocalCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) er krOld.LowerBound = req.Bound // TODO: fix } - if err := ops.ModifyKeyRangeWithChecks(ctx, qr.qdb, kr.KeyRangeFromDB(krOld, ds.ColTypes)); err != nil { + if err := ops.ModifyKeyRangeWithChecks(ctx, lc.qdb, kr.KeyRangeFromDB(krOld, ds.ColTypes)); err != nil { return err } - if err := ops.CreateKeyRangeWithChecks(ctx, qr.qdb, krNew); err != nil { + if err := ops.CreateKeyRangeWithChecks(ctx, lc.qdb, krNew); err != nil { return fmt.Errorf("failed to add a new key range: %w", err) } @@ -592,13 +593,13 @@ func (qr *LocalCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) er // Returns: // - *kr.KeyRange: the locked KeyRange object. // - error: an error if the lock operation encounters any issues. -func (qr *LocalCoordinator) LockKeyRange(ctx context.Context, krid string) (*kr.KeyRange, error) { - keyRangeDB, err := qr.qdb.LockKeyRange(ctx, krid) +func (lc *LocalCoordinator) LockKeyRange(ctx context.Context, krid string) (*kr.KeyRange, error) { + keyRangeDB, err := lc.qdb.LockKeyRange(ctx, krid) if err != nil { return nil, err } - ds, err := qr.qdb.GetDistribution(ctx, keyRangeDB.DistributionId) + ds, err := lc.qdb.GetDistribution(ctx, keyRangeDB.DistributionId) if err != nil { return nil, err } @@ -616,8 +617,8 @@ func (qr *LocalCoordinator) LockKeyRange(ctx context.Context, krid string) (*kr. // // Returns: // - error: an error if the unlock operation encounters any issues. -func (qr *LocalCoordinator) UnlockKeyRange(ctx context.Context, krid string) error { - return qr.qdb.UnlockKeyRange(ctx, krid) +func (lc *LocalCoordinator) UnlockKeyRange(ctx context.Context, krid string) error { + return lc.qdb.UnlockKeyRange(ctx, krid) } // TODO : unit tests @@ -652,10 +653,10 @@ func (lc *LocalCoordinator) AddDataShard(ctx context.Context, ds *datashards.Dat // // Returns: // - []string: a slice of strings containing the names of the data shards. -func (qr *LocalCoordinator) Shards() []string { +func (lc *LocalCoordinator) Shards() []string { var ret []string - for name := range qr.DataShardCfgs { + for name := range lc.DataShardCfgs { ret = append(ret, name) } @@ -697,13 +698,13 @@ func (lc *LocalCoordinator) GetKeyRange(ctx context.Context, krId string) (*kr.K // Returns: // - []*kr.KeyRange: a slice of KeyRange objects retrieved. // - error: an error if the retrieval encounters any issues. -func (qr *LocalCoordinator) ListKeyRanges(ctx context.Context, distribution string) ([]*kr.KeyRange, error) { +func (lc *LocalCoordinator) ListKeyRanges(ctx context.Context, distribution string) ([]*kr.KeyRange, error) { var ret []*kr.KeyRange - if krs, err := qr.qdb.ListKeyRanges(ctx, distribution); err != nil { + if krs, err := lc.qdb.ListKeyRanges(ctx, distribution); err != nil { return nil, err } else { for _, keyRange := range krs { - ds, err := qr.qdb.GetDistribution(ctx, keyRange.DistributionId) + ds, err := lc.qdb.GetDistribution(ctx, keyRange.DistributionId) if err != nil { return nil, err @@ -726,8 +727,8 @@ func (qr *LocalCoordinator) ListKeyRanges(ctx context.Context, distribution stri // Returns: // - []*kr.KeyRange: a slice of KeyRange objects representing all key ranges. // - error: an error if the retrieval encounters any issues. -func (qr *LocalCoordinator) ListAllKeyRanges(ctx context.Context) ([]*kr.KeyRange, error) { - if krs, err := qr.qdb.ListAllKeyRanges(ctx); err != nil { +func (lc *LocalCoordinator) ListAllKeyRanges(ctx context.Context) ([]*kr.KeyRange, error) { + if krs, err := lc.qdb.ListAllKeyRanges(ctx); err != nil { return nil, err } else { var ret []*kr.KeyRange @@ -738,7 +739,7 @@ func (qr *LocalCoordinator) ListAllKeyRanges(ctx context.Context) ([]*kr.KeyRang var err error var ok bool if ds, ok = cache[keyRange.DistributionId]; !ok { - ds, err = qr.qdb.GetDistribution(ctx, keyRange.DistributionId) + ds, err = lc.qdb.GetDistribution(ctx, keyRange.DistributionId) if err != nil { return nil, err } @@ -761,7 +762,7 @@ func (qr *LocalCoordinator) ListAllKeyRanges(ctx context.Context) ([]*kr.KeyRang // Returns: // - []*topology.Router: a slice of Router objects representing all routers. // - error: an error if the retrieval encounters any issues. -func (qr *LocalCoordinator) ListRouters(ctx context.Context) ([]*topology.Router, error) { +func (lc *LocalCoordinator) ListRouters(ctx context.Context) ([]*topology.Router, error) { return []*topology.Router{{ ID: "local", }}, nil @@ -775,20 +776,16 @@ func (qr *LocalCoordinator) ListRouters(ctx context.Context) ([]*topology.Router // // Returns: // - error: An error if the creation encounters any issues. -func (qr *LocalCoordinator) CreateKeyRange(ctx context.Context, kr *kr.KeyRange) error { - return ops.CreateKeyRangeWithChecks(ctx, qr.qdb, kr) +func (lc *LocalCoordinator) CreateKeyRange(ctx context.Context, kr *kr.KeyRange) error { + return ops.CreateKeyRangeWithChecks(ctx, lc.qdb, kr) } -// MoveKeyRange moves a key range in the LocalCoordinator. -// -// Parameters: -// - ctx (context.Context): The context of the operation. -// - kr (*kr.KeyRange): The key range object to be moved. +// MoveKeyRange is disabled in LocalCoordinator // // Returns: -// - error: An error if the move encounters any issues. -func (qr *LocalCoordinator) MoveKeyRange(ctx context.Context, kr *kr.KeyRange) error { - return ops.ModifyKeyRangeWithChecks(ctx, qr.qdb, kr) +// - error: SPQR_INVALID_REQUEST error +func (lc *LocalCoordinator) MoveKeyRange(_ context.Context, _ *kr.KeyRange) error { + return spqrerror.New(spqrerror.SPQR_INVALID_REQUEST, "MoveKeyRange is not available in local coordinator") } var ErrNotCoordinator = fmt.Errorf("request is unprocessable in router") @@ -801,7 +798,7 @@ var ErrNotCoordinator = fmt.Errorf("request is unprocessable in router") // // Returns: // - error: An error indicating the registration status. -func (qr *LocalCoordinator) RegisterRouter(ctx context.Context, r *topology.Router) error { +func (lc *LocalCoordinator) RegisterRouter(ctx context.Context, r *topology.Router) error { return ErrNotCoordinator } @@ -813,7 +810,7 @@ func (qr *LocalCoordinator) RegisterRouter(ctx context.Context, r *topology.Rout // // Returns: // - error: An error indicating the unregistration status. -func (qr *LocalCoordinator) UnregisterRouter(ctx context.Context, id string) error { +func (lc *LocalCoordinator) UnregisterRouter(ctx context.Context, id string) error { return ErrNotCoordinator } @@ -825,7 +822,7 @@ func (qr *LocalCoordinator) UnregisterRouter(ctx context.Context, id string) err // // Returns: // - error: An error indicating the synchronization status. In this case, it returns ErrNotCoordinator. -func (qr *LocalCoordinator) SyncRouterMetadata(ctx context.Context, router *topology.Router) error { +func (lc *LocalCoordinator) SyncRouterMetadata(ctx context.Context, router *topology.Router) error { return ErrNotCoordinator } @@ -837,7 +834,7 @@ func (qr *LocalCoordinator) SyncRouterMetadata(ctx context.Context, router *topo // // Returns: // - error: An error indicating the update status. -func (qr *LocalCoordinator) SyncRouterCoordinatorAddress(ctx context.Context, router *topology.Router) error { +func (lc *LocalCoordinator) SyncRouterCoordinatorAddress(ctx context.Context, router *topology.Router) error { return ErrNotCoordinator } @@ -849,8 +846,8 @@ func (qr *LocalCoordinator) SyncRouterCoordinatorAddress(ctx context.Context, ro // // Returns: // - error: An error indicating the update status. -func (qr *LocalCoordinator) UpdateCoordinator(ctx context.Context, addr string) error { - return qr.qdb.UpdateCoordinator(ctx, addr) +func (lc *LocalCoordinator) UpdateCoordinator(ctx context.Context, addr string) error { + return lc.qdb.UpdateCoordinator(ctx, addr) } // GetCoordinator retrieves the coordinator address from the local coordinator. @@ -861,8 +858,8 @@ func (qr *LocalCoordinator) UpdateCoordinator(ctx context.Context, addr string) // Returns: // - string: The address of the coordinator. // - error: An error indicating the retrieval status. -func (qr *LocalCoordinator) GetCoordinator(ctx context.Context) (string, error) { - addr, err := qr.qdb.GetCoordinator(ctx) +func (lc *LocalCoordinator) GetCoordinator(ctx context.Context) (string, error) { + addr, err := lc.qdb.GetCoordinator(ctx) spqrlog.Zero.Debug().Str("address", addr).Msg("resp local coordiantor: get coordinator") return addr, err } @@ -876,7 +873,7 @@ func (qr *LocalCoordinator) GetCoordinator(ctx context.Context) (string, error) // Returns: // - *datashards.DataShard: The retrieved DataShard, or nil if it doesn't exist. // - error: An error indicating the retrieval status, or ErrNotCoordinator if the operation is not supported by the LocalCoordinator. -func (qr *LocalCoordinator) GetShard(ctx context.Context, shardID string) (*datashards.DataShard, error) { +func (lc *LocalCoordinator) GetShard(ctx context.Context, shardID string) (*datashards.DataShard, error) { return nil, ErrNotCoordinator }