Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
reshke committed Apr 12, 2024
1 parent 8cbfa2d commit f2855ec
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 23 deletions.
35 changes: 14 additions & 21 deletions coordinator/provider/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,21 +284,10 @@ func NewCoordinator(tlsconfig *tls.Config, db qdb.XQDB) *qdbCoordinator {

// TODO : unit tests
func (qc *qdbCoordinator) lockCoordinator(ctx context.Context, initialRouter bool) bool {
registerRouter := func() bool {
updateCoordinator := func() bool {
if !initialRouter {
return true
}
router := &topology.Router{
ID: uuid.NewString(),
Address: net.JoinHostPort(config.RouterConfig().Host, config.RouterConfig().GrpcApiPort),
State: qdb.OPENED,
}
if err := qc.RegisterRouter(ctx, router); err != nil {
spqrlog.Zero.Error().Err(err).Msg("register router when locking coordinator")
}
if err := qc.SyncRouterMetadata(ctx, router); err != nil {
spqrlog.Zero.Error().Err(err).Msg("sync router metadata when locking coordinator")
}
if err := qc.UpdateCoordinator(ctx, net.JoinHostPort(config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort)); err != nil {
return false
}
Expand All @@ -312,15 +301,15 @@ func (qc *qdbCoordinator) lockCoordinator(ctx context.Context, initialRouter boo
return false
case <-time.After(time.Second):
if err := qc.db.TryCoordinatorLock(context.TODO()); err == nil {
return registerRouter()
return updateCoordinator()
} else {
spqrlog.Zero.Error().Err(err).Msg("qdb already taken, waiting for connection")
}
}
}
}

return registerRouter()
return updateCoordinator()
}

// TODO : unit tests
Expand Down Expand Up @@ -717,18 +706,18 @@ func (qc *qdbCoordinator) DropKeyRange(ctx context.Context, id string) error {

// TODO : unit tests
func (qc *qdbCoordinator) Unite(ctx context.Context, uniteKeyRange *kr.UniteKeyRange) error {
krBase, err := qc.LockKeyRange(ctx, uniteKeyRange.BaseKeyRangeId)
krBaseDb, err := qc.db.LockKeyRange(ctx, uniteKeyRange.BaseKeyRangeId)
if err != nil {
return err
}

defer func() {
if err := qc.UnlockKeyRange(ctx, uniteKeyRange.BaseKeyRangeId); err != nil {
if err := qc.db.UnlockKeyRange(ctx, uniteKeyRange.BaseKeyRangeId); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
}
}()

krAppendage, err := qc.LockKeyRange(ctx, uniteKeyRange.AppendageKeyRangeId)
krAppendageDb, err := qc.db.LockKeyRange(ctx, uniteKeyRange.AppendageKeyRangeId)
if err != nil {
return err
}
Expand All @@ -739,16 +728,20 @@ func (qc *qdbCoordinator) Unite(ctx context.Context, uniteKeyRange *kr.UniteKeyR
}
}()

ds, err := qc.db.GetDistribution(ctx, krBaseDb.DistributionId)
if err != nil {
return err
}

krBase := kr.KeyRangeFromDB(krBaseDb, ds.ColTypes)
krAppendage := kr.KeyRangeFromDB(krAppendageDb, ds.ColTypes)

if krBase.ShardID != krAppendage.ShardID {
return spqrerror.New(spqrerror.SPQR_KEYRANGE_ERROR, "failed to unite key ranges routing different shards")
}
if krBase.Distribution != krAppendage.Distribution {
return spqrerror.New(spqrerror.SPQR_KEYRANGE_ERROR, "failed to unite key ranges of different distributions")
}
ds, err := qc.db.GetDistribution(ctx, krBase.Distribution)
if err != nil {
return err
}
// TODO: check all types when composite keys are supported
krLeft, krRight := krBase, krAppendage
if kr.CmpRangesLess(krRight.LowerBound, krLeft.LowerBound, ds.ColTypes) {
Expand Down
21 changes: 19 additions & 2 deletions pkg/coord/local/clocal.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,19 +291,36 @@ func (qr *LocalCoordinator) Unite(ctx context.Context, req *kr.UniteKeyRange) er
var krAppendage *qdb.KeyRange
var err error

if krBase, err = qr.qdb.CheckLockedKeyRange(ctx, req.BaseKeyRangeId); err != nil { //nolint:all TODO
if krBase, err = qr.qdb.LockKeyRange(ctx, req.BaseKeyRangeId); err != nil { //nolint:all TODO
return err
}

defer func(qdb qdb.QDB, ctx context.Context, keyRangeID string) {
err := qdb.UnlockKeyRange(ctx, keyRangeID)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return
}
}(qr.qdb, ctx, req.BaseKeyRangeId)

ds, err := qr.qdb.GetDistribution(ctx, krBase.DistributionId)
if err != nil {
return err
}

// TODO: krRight seems to be empty.
if krAppendage, err = qr.qdb.CheckLockedKeyRange(ctx, req.AppendageKeyRangeId); err != nil {
if krAppendage, err = qr.qdb.LockKeyRange(ctx, req.AppendageKeyRangeId); err != nil {
return err
}

defer func(qdb qdb.QDB, ctx context.Context, keyRangeID string) {
err := qdb.UnlockKeyRange(ctx, keyRangeID)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return
}
}(qr.qdb, ctx, req.BaseKeyRangeId)

if err = qr.qdb.DropKeyRange(ctx, krAppendage.KeyRangeID); err != nil {
return err
}
Expand Down

0 comments on commit f2855ec

Please sign in to comment.