Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move key ranges via postgres_fdw #566

Merged
merged 48 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
aa8ae5d
Initial rewrite of MoveKeys
EinKrebs Mar 21, 2024
dd79dee
Refactored condition for key range
EinKrebs Mar 21, 2024
a748c55
Check before copying data
EinKrebs Mar 21, 2024
d97010a
Get next bound actually
EinKrebs Mar 21, 2024
3856b0e
Fixed logic of MoveKeys
EinKrebs Mar 21, 2024
dff9e4e
Add DeleteKeyRangeMove method to EtcdQDB
EinKrebs Mar 21, 2024
d48676f
Change logic of (*qdbCoordinator).Move
EinKrebs Mar 21, 2024
d2c00f2
Implemented getKRCondition
EinKrebs Mar 21, 2024
32b845a
Fixed idleness of (*qdbCoordinator).Move
EinKrebs Mar 21, 2024
2674820
Fix typo
EinKrebs Mar 21, 2024
b163fbf
Fix side-stepping qdb.MoveKeyRangeStarted
EinKrebs Mar 21, 2024
f30cf78
Change GetTransferTx logic to return nil when no transaction
EinKrebs Mar 21, 2024
b0986ba
Fix RunCoordinator current transactions logic
EinKrebs Mar 21, 2024
c00a5f8
Add debug logging to qdb
EinKrebs Mar 21, 2024
ea1d1ee
Added & fixed tests
EinKrebs Mar 25, 2024
8c5a879
Fix importing foreign schema
EinKrebs Mar 21, 2024
69990bc
Added TODO to etcdqdb.go
EinKrebs Mar 21, 2024
afa1565
Add user mapping to sender shard
EinKrebs Mar 21, 2024
b9b6b27
Delete redundant function
EinKrebs Mar 21, 2024
36f160a
Check for relation existence on both sender & receiver
EinKrebs Mar 21, 2024
9f3016c
Drop old schema on receiver
EinKrebs Mar 21, 2024
5b6222d
Fix table names
EinKrebs Mar 21, 2024
2927933
Fixed table names
EinKrebs Mar 21, 2024
1bbaa23
Fixed copying data
EinKrebs Mar 22, 2024
aaedfda
Removed redundant fields & types from DataTransferTransaction
EinKrebs Mar 22, 2024
f8602f1
Move copyData out of MoveKeys
EinKrebs Mar 22, 2024
3b44045
Fixed checking table existence
EinKrebs Mar 22, 2024
d36a6b2
Add test for non-existent table on the receiving shard to mover featu…
EinKrebs Mar 22, 2024
5fef62c
Added & fixed tests
EinKrebs Mar 25, 2024
e35397a
Fix lint
EinKrebs Mar 25, 2024
6d9ecb4
Added test for moving key range with varchar column type
EinKrebs Mar 26, 2024
dd01cfb
Moved GetKRCondition to util package
EinKrebs Mar 26, 2024
648ec5a
Removed excess GetKeyRange method from qdbCoordinator
EinKrebs Mar 27, 2024
bffe04d
Moved GetKRCondition to kr package
EinKrebs Mar 27, 2024
826f0f3
Fix continuation of key range moves on coordinator startup
EinKrebs Mar 27, 2024
00400e4
Added test for continuation of key range moves
EinKrebs Mar 27, 2024
ee1f2a8
Moved transaction records in etcd to separate namespace
EinKrebs Mar 27, 2024
91a6afa
Fix lint
EinKrebs Mar 27, 2024
6753765
Removed nolint
EinKrebs Mar 27, 2024
6ee4cb6
Add another move recover test
EinKrebs Mar 27, 2024
b2cc4f5
Add some comments
EinKrebs Mar 27, 2024
13c6e74
Separated table existence & key range key count checks
EinKrebs Mar 27, 2024
fcfaad3
Fix typo in comment
EinKrebs Mar 27, 2024
d962395
Removed obsolete TODO
EinKrebs Mar 29, 2024
fee12a2
Declared missing methods from QDB
EinKrebs Mar 29, 2024
f7a8474
Add docstrign for DataTransferTransaction
EinKrebs Mar 29, 2024
831c9b3
Add unit test for GetKRCondition
EinKrebs Apr 1, 2024
80d6e4d
Fix build
EinKrebs Apr 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 121 additions & 67 deletions coordinator/provider/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package provider
import (
"context"
"crypto/tls"
"fmt"
"net"
"time"

Expand Down Expand Up @@ -323,9 +324,9 @@ func (qc *qdbCoordinator) lockCoordinator(ctx context.Context, initialRouter boo
return registerRouter()
}

// TODO : unit tests
// RunCoordinator side effect: it runs an asynchronous goroutine
// that checks the availability of the SPQR router
// TODO : unit tests
func (qc *qdbCoordinator) RunCoordinator(ctx context.Context, initialRouter bool) {
if !qc.lockCoordinator(ctx, initialRouter) {
return
Expand All @@ -335,32 +336,39 @@ func (qc *qdbCoordinator) RunCoordinator(ctx context.Context, initialRouter bool
if err != nil {
spqrlog.Zero.Error().
Err(err).
Msg("faild to list key ranges")
Msg("failed to list key ranges")
}

// Finish any key range move or data transfer transaction in progress
for _, r := range ranges {
move, err := qc.GetKeyRangeMove(context.TODO(), r.KeyRangeID)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("error getting key range move from qdb")
}

tx, err := qc.db.GetTransferTx(context.TODO(), r.KeyRangeID)
if err != nil {
continue
spqrlog.Zero.Error().Err(err).Msg("error getting data transfer transaction from qdb")
}
if tx.ToStatus == qdb.Commited && tx.FromStatus != qdb.Commited {
EinKrebs marked this conversation as resolved.
Show resolved Hide resolved
datatransfers.ResolvePreparedTransaction(context.TODO(), tx.FromShardId, tx.FromTxName, true)
tem := kr.MoveKeyRange{
ShardId: tx.ToShardId,
Krid: r.KeyRangeID,

var krm *kr.MoveKeyRange
if move != nil {
krm = &kr.MoveKeyRange{
Krid: move.KeyRangeID,
ShardId: move.ShardId,
}
err = qc.Move(context.TODO(), &tem)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("failed to move key range")
} else if tx != nil {
krm = &kr.MoveKeyRange{
Krid: r.KeyRangeID,
ShardId: tx.ToShardId,
}
} else if tx.FromStatus != qdb.Commited {
datatransfers.ResolvePreparedTransaction(context.TODO(), tx.ToShardId, tx.ToTxName, false)
datatransfers.ResolvePreparedTransaction(context.TODO(), tx.FromShardId, tx.FromTxName, false)
}

err = qc.db.RemoveTransferTx(context.TODO(), r.KeyRangeID)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("error removing from qdb")
if krm != nil {
err = qc.Move(context.TODO(), krm)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("error moving key range")
}
}
}

Expand Down Expand Up @@ -796,8 +804,26 @@ func (qc *qdbCoordinator) RecordKeyRangeMove(ctx context.Context, m *qdb.MoveKey
return m.MoveId, nil
}

func (qc *qdbCoordinator) GetKeyRangeMove(ctx context.Context, krId string) (*qdb.MoveKeyRange, error) {
ls, err := qc.db.ListKeyRangeMoves(ctx)
if err != nil {
return nil, err
}

for _, krm := range ls {
// after the coordinator restarts, it will continue the move that was previously initiated.
// key range move already exist for this key range
// complete it first
if krm.KeyRangeID == krId {
return krm, nil
}
}

return nil, nil
}

// Move key range from one logical shard to another
// This function reshards data by locking a portion of it,
// This function re-shards data by locking a portion of it,
// making it unavailable for read and write access during the process.
// TODO : unit tests
func (qc *qdbCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error {
Expand All @@ -809,7 +835,7 @@ func (qc *qdbCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error
Str("shard-id", req.ShardId).
Msg("qdb coordinator move key range")

keyRange, err := qc.db.GetKeyRange(ctx, req.Krid)
keyRange, err := qc.GetKeyRange(ctx, req.Krid)
if err != nil {
return err
}
Expand All @@ -819,63 +845,91 @@ func (qc *qdbCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error
return nil
}

moveId, err := qc.RecordKeyRangeMove(ctx,
&qdb.MoveKeyRange{
MoveId: uuid.New().String(),
ShardId: req.ShardId,
KeyRangeID: req.Krid,
Status: qdb.MoveKeyRangePlanned,
})

/* NOOP if key range move was already recorded */
move, err := qc.GetKeyRangeMove(ctx, req.Krid)
EinKrebs marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

krmv, err := qc.LockKeyRange(ctx, req.Krid)
if err != nil {
return err
}
defer func() {
if err := qc.UnlockKeyRange(ctx, req.Krid); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
if move == nil {
// No key range moves in progress
move = &qdb.MoveKeyRange{
MoveId: uuid.New().String(),
ShardId: req.ShardId,
KeyRangeID: req.Krid,
Status: qdb.MoveKeyRangePlanned,
}
}()

defer func() {
// set compelted status in the end of key range move operation
if err := qc.db.UpdateKeyRangeMoveStatus(ctx, moveId, qdb.MoveKeyRangeComplete); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
_, err = qc.RecordKeyRangeMove(ctx, move)
if err != nil {
return err
}
}()

/* physical changes on shards */
err = datatransfers.MoveKeys(ctx, keyRange.ShardID, req.ShardId, *keyRange, qc.db)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("failed to move rows")
return err
}

// Update the state of the distributed key-range metadata
krmv.ShardID = req.ShardId
if err := ops.ModifyKeyRangeWithChecks(ctx, qc.db, krmv); err != nil {
// TODO: check if unlock here is ok
return err
}
for move != nil {
switch move.Status {
case qdb.MoveKeyRangePlanned:
// lock the key range
_, err = qc.LockKeyRange(ctx, req.Krid)
if err != nil {
return err
}
if err = qc.db.UpdateKeyRangeMoveStatus(ctx, move.MoveId, qdb.MoveKeyRangeComplete); err != nil {
return err
}
move.Status = qdb.MoveKeyRangeStarted
case qdb.MoveKeyRangeStarted:
// move the data
ds, err := qc.GetDistribution(ctx, keyRange.Distribution)
if err != nil {
return err
}
err = datatransfers.MoveKeys(ctx, keyRange.ShardID, req.ShardId, keyRange, ds, qc.db, qc)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("failed to move rows")
return err
}

// Notify all routers about scheme changes.
if err := qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error {
cl := routerproto.NewKeyRangeServiceClient(cc)
moveResp, err := cl.MoveKeyRange(ctx, &routerproto.MoveKeyRangeRequest{
Id: krmv.ID,
ToShardId: krmv.ShardID,
})
spqrlog.Zero.Debug().
Interface("response", moveResp).
Msg("move key range response")
return err
}); err != nil {
return err
// update key range
krg, err := qc.GetKeyRange(ctx, req.Krid)
if err != nil {
return err
}
krg.ShardID = req.ShardId
if err := ops.ModifyKeyRangeWithChecks(ctx, qc.db, krg); err != nil {
// TODO: check if unlock here is ok
return err
}

// Notify all routers about scheme changes.
if err := qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error {
cl := routerproto.NewKeyRangeServiceClient(cc)
moveResp, err := cl.MoveKeyRange(ctx, &routerproto.MoveKeyRangeRequest{
Id: krg.ID,
ToShardId: krg.ShardID,
})
spqrlog.Zero.Debug().
Interface("response", moveResp).
Msg("move key range response")
return err
}); err != nil {
return err
}

if err := qc.db.UpdateKeyRangeMoveStatus(ctx, move.MoveId, qdb.MoveKeyRangeComplete); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
}
move.Status = qdb.MoveKeyRangeComplete
case qdb.MoveKeyRangeComplete:
// unlock key range
if err := qc.UnlockKeyRange(ctx, req.Krid); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
}
if err := qc.db.DeleteKeyRangeMove(ctx, move.MoveId); err != nil {
return err
}
move = nil
default:
return fmt.Errorf("unknown key range move status: \"%s\"", move.Status)
}
}
return nil
}
Expand Down
9 changes: 5 additions & 4 deletions docker/shard/bin/setup
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ sudo -u postgres psql -p 6432 -h localhost -U postgres -c "CREATE ROLE $POSTGRES

# Create databases
sudo -u postgres psql -p 6432 -h localhost -U postgres -c "CREATE DATABASE $POSTGRES_DB" -d postgres >> $SETUP_LOG 2>&1 || {
echo "ERROR: users creation failed, examine the log"
echo "ERROR: databases creation failed, examine the log"
cat "$SETUP_LOG"
exit 1
}

# Create extension
sudo -u postgres psql -p 6432 -h localhost -U postgres -c "CREATE EXTENSION IF NOT EXISTS pg_stat_statements; CREATE EXTENSION IF NOT EXISTS pg_stat_kcache; CREATE EXTENSION IF NOT EXISTS pg_comment_stats;" -d $POSTGRES_DB >> $SETUP_LOG 2>&1 || {
echo "ERROR: users creation failed, examine the log"

# # Create extension
sudo -u postgres psql -p 6432 -h localhost -U postgres -c "CREATE EXTENSION IF NOT EXISTS postgres_fdw; CREATE EXTENSION IF NOT EXISTS pg_stat_statements; CREATE EXTENSION IF NOT EXISTS pg_stat_kcache; CREATE EXTENSION IF NOT EXISTS pg_comment_stats;" -d $POSTGRES_DB >> $SETUP_LOG 2>&1 || {
echo "ERROR: extensions creation failed, examine the log"
cat "$SETUP_LOG"
exit 1
}
Expand Down
Loading
Loading