diff --git a/coordinator/provider/coordinator.go b/coordinator/provider/coordinator.go index 9a3b39caf..7d779fba7 100644 --- a/coordinator/provider/coordinator.go +++ b/coordinator/provider/coordinator.go @@ -3,6 +3,7 @@ package provider import ( "context" "crypto/tls" + "fmt" "net" "time" @@ -323,9 +324,9 @@ func (qc *qdbCoordinator) lockCoordinator(ctx context.Context, initialRouter boo return registerRouter() } -// TODO : unit tests // RunCoordinator side effect: it runs an asynchronous goroutine // that checks the availability of the SPQR router +// TODO : unit tests func (qc *qdbCoordinator) RunCoordinator(ctx context.Context, initialRouter bool) { if !qc.lockCoordinator(ctx, initialRouter) { return @@ -335,32 +336,39 @@ func (qc *qdbCoordinator) RunCoordinator(ctx context.Context, initialRouter bool if err != nil { spqrlog.Zero.Error(). Err(err). - Msg("faild to list key ranges") + Msg("failed to list key ranges") } + // Finish any key range move or data transfer transaction in progress for _, r := range ranges { + move, err := qc.GetKeyRangeMove(context.TODO(), r.KeyRangeID) + if err != nil { + spqrlog.Zero.Error().Err(err).Msg("error getting key range move from qdb") + } + tx, err := qc.db.GetTransferTx(context.TODO(), r.KeyRangeID) if err != nil { - continue + spqrlog.Zero.Error().Err(err).Msg("error getting data transfer transaction from qdb") } - if tx.ToStatus == qdb.Commited && tx.FromStatus != qdb.Commited { - datatransfers.ResolvePreparedTransaction(context.TODO(), tx.FromShardId, tx.FromTxName, true) - tem := kr.MoveKeyRange{ - ShardId: tx.ToShardId, - Krid: r.KeyRangeID, + + var krm *kr.MoveKeyRange + if move != nil { + krm = &kr.MoveKeyRange{ + Krid: move.KeyRangeID, + ShardId: move.ShardId, } - err = qc.Move(context.TODO(), &tem) - if err != nil { - spqrlog.Zero.Error().Err(err).Msg("failed to move key range") + } else if tx != nil { + krm = &kr.MoveKeyRange{ + Krid: r.KeyRangeID, + ShardId: tx.ToShardId, } - } else if tx.FromStatus != qdb.Commited { - datatransfers.ResolvePreparedTransaction(context.TODO(), tx.ToShardId, tx.ToTxName, false) - datatransfers.ResolvePreparedTransaction(context.TODO(), tx.FromShardId, tx.FromTxName, false) } - err = qc.db.RemoveTransferTx(context.TODO(), r.KeyRangeID) - if err != nil { - spqrlog.Zero.Error().Err(err).Msg("error removing from qdb") + if krm != nil { + err = qc.Move(context.TODO(), krm) + if err != nil { + spqrlog.Zero.Error().Err(err).Msg("error moving key range") + } } } @@ -796,8 +804,26 @@ func (qc *qdbCoordinator) RecordKeyRangeMove(ctx context.Context, m *qdb.MoveKey return m.MoveId, nil } +func (qc *qdbCoordinator) GetKeyRangeMove(ctx context.Context, krId string) (*qdb.MoveKeyRange, error) { + ls, err := qc.db.ListKeyRangeMoves(ctx) + if err != nil { + return nil, err + } + + for _, krm := range ls { + // after the coordinator restarts, it will continue the move that was previously initiated. + // key range move already exist for this key range + // complete it first + if krm.KeyRangeID == krId { + return krm, nil + } + } + + return nil, nil +} + // Move key range from one logical shard to another -// This function reshards data by locking a portion of it, +// This function re-shards data by locking a portion of it, // making it unavailable for read and write access during the process. // TODO : unit tests func (qc *qdbCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error { @@ -809,7 +835,7 @@ func (qc *qdbCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error Str("shard-id", req.ShardId). Msg("qdb coordinator move key range") - keyRange, err := qc.db.GetKeyRange(ctx, req.Krid) + keyRange, err := qc.GetKeyRange(ctx, req.Krid) if err != nil { return err } @@ -819,63 +845,91 @@ func (qc *qdbCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error return nil } - moveId, err := qc.RecordKeyRangeMove(ctx, - &qdb.MoveKeyRange{ - MoveId: uuid.New().String(), - ShardId: req.ShardId, - KeyRangeID: req.Krid, - Status: qdb.MoveKeyRangePlanned, - }) - - /* NOOP if key range move was already recorded */ + move, err := qc.GetKeyRangeMove(ctx, req.Krid) if err != nil { return err } - krmv, err := qc.LockKeyRange(ctx, req.Krid) - if err != nil { - return err - } - defer func() { - if err := qc.UnlockKeyRange(ctx, req.Krid); err != nil { - spqrlog.Zero.Error().Err(err).Msg("") + if move == nil { + // No key range moves in progress + move = &qdb.MoveKeyRange{ + MoveId: uuid.New().String(), + ShardId: req.ShardId, + KeyRangeID: req.Krid, + Status: qdb.MoveKeyRangePlanned, } - }() - - defer func() { - // set compelted status in the end of key range move operation - if err := qc.db.UpdateKeyRangeMoveStatus(ctx, moveId, qdb.MoveKeyRangeComplete); err != nil { - spqrlog.Zero.Error().Err(err).Msg("") + _, err = qc.RecordKeyRangeMove(ctx, move) + if err != nil { + return err } - }() - - /* physical changes on shards */ - err = datatransfers.MoveKeys(ctx, keyRange.ShardID, req.ShardId, *keyRange, qc.db) - if err != nil { - spqrlog.Zero.Error().Err(err).Msg("failed to move rows") - return err } - // Update the state of the distributed key-range metadata - krmv.ShardID = req.ShardId - if err := ops.ModifyKeyRangeWithChecks(ctx, qc.db, krmv); err != nil { - // TODO: check if unlock here is ok - return err - } + for move != nil { + switch move.Status { + case qdb.MoveKeyRangePlanned: + // lock the key range + _, err = qc.LockKeyRange(ctx, req.Krid) + if err != nil { + return err + } + if err = qc.db.UpdateKeyRangeMoveStatus(ctx, move.MoveId, qdb.MoveKeyRangeComplete); err != nil { + return err + } + move.Status = qdb.MoveKeyRangeStarted + case qdb.MoveKeyRangeStarted: + // move the data + ds, err := qc.GetDistribution(ctx, keyRange.Distribution) + if err != nil { + return err + } + err = datatransfers.MoveKeys(ctx, keyRange.ShardID, req.ShardId, keyRange, ds, qc.db, qc) + if err != nil { + spqrlog.Zero.Error().Err(err).Msg("failed to move rows") + return err + } - // Notify all routers about scheme changes. - if err := qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error { - cl := routerproto.NewKeyRangeServiceClient(cc) - moveResp, err := cl.MoveKeyRange(ctx, &routerproto.MoveKeyRangeRequest{ - Id: krmv.ID, - ToShardId: krmv.ShardID, - }) - spqrlog.Zero.Debug(). - Interface("response", moveResp). - Msg("move key range response") - return err - }); err != nil { - return err + // update key range + krg, err := qc.GetKeyRange(ctx, req.Krid) + if err != nil { + return err + } + krg.ShardID = req.ShardId + if err := ops.ModifyKeyRangeWithChecks(ctx, qc.db, krg); err != nil { + // TODO: check if unlock here is ok + return err + } + + // Notify all routers about scheme changes. + if err := qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error { + cl := routerproto.NewKeyRangeServiceClient(cc) + moveResp, err := cl.MoveKeyRange(ctx, &routerproto.MoveKeyRangeRequest{ + Id: krg.ID, + ToShardId: krg.ShardID, + }) + spqrlog.Zero.Debug(). + Interface("response", moveResp). + Msg("move key range response") + return err + }); err != nil { + return err + } + + if err := qc.db.UpdateKeyRangeMoveStatus(ctx, move.MoveId, qdb.MoveKeyRangeComplete); err != nil { + spqrlog.Zero.Error().Err(err).Msg("") + } + move.Status = qdb.MoveKeyRangeComplete + case qdb.MoveKeyRangeComplete: + // unlock key range + if err := qc.UnlockKeyRange(ctx, req.Krid); err != nil { + spqrlog.Zero.Error().Err(err).Msg("") + } + if err := qc.db.DeleteKeyRangeMove(ctx, move.MoveId); err != nil { + return err + } + move = nil + default: + return fmt.Errorf("unknown key range move status: \"%s\"", move.Status) + } } return nil } diff --git a/docker/shard/bin/setup b/docker/shard/bin/setup index 66caaa4d5..5b0eabdbf 100755 --- a/docker/shard/bin/setup +++ b/docker/shard/bin/setup @@ -40,14 +40,15 @@ sudo -u postgres psql -p 6432 -h localhost -U postgres -c "CREATE ROLE $POSTGRES # Create databases sudo -u postgres psql -p 6432 -h localhost -U postgres -c "CREATE DATABASE $POSTGRES_DB" -d postgres >> $SETUP_LOG 2>&1 || { - echo "ERROR: users creation failed, examine the log" + echo "ERROR: databases creation failed, examine the log" cat "$SETUP_LOG" exit 1 } - # Create extension - sudo -u postgres psql -p 6432 -h localhost -U postgres -c "CREATE EXTENSION IF NOT EXISTS pg_stat_statements; CREATE EXTENSION IF NOT EXISTS pg_stat_kcache; CREATE EXTENSION IF NOT EXISTS pg_comment_stats;" -d $POSTGRES_DB >> $SETUP_LOG 2>&1 || { - echo "ERROR: users creation failed, examine the log" + +# # Create extension + sudo -u postgres psql -p 6432 -h localhost -U postgres -c "CREATE EXTENSION IF NOT EXISTS postgres_fdw; CREATE EXTENSION IF NOT EXISTS pg_stat_statements; CREATE EXTENSION IF NOT EXISTS pg_stat_kcache; CREATE EXTENSION IF NOT EXISTS pg_comment_stats;" -d $POSTGRES_DB >> $SETUP_LOG 2>&1 || { + echo "ERROR: extensions creation failed, examine the log" cat "$SETUP_LOG" exit 1 } diff --git a/pkg/datatransfers/data_transfers.go b/pkg/datatransfers/data_transfers.go index 6a16d699c..1adf479a6 100644 --- a/pkg/datatransfers/data_transfers.go +++ b/pkg/datatransfers/data_transfers.go @@ -3,18 +3,18 @@ package datatransfers import ( "context" "fmt" - "github.com/pg-sharding/spqr/pkg/models/distributions" - "io" - "os" - "strings" - "sync" - - pgx "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5" _ "github.com/lib/pq" + "github.com/pg-sharding/spqr/coordinator" "github.com/pg-sharding/spqr/pkg/config" + "github.com/pg-sharding/spqr/pkg/models/distributions" "github.com/pg-sharding/spqr/pkg/models/kr" "github.com/pg-sharding/spqr/pkg/spqrlog" "github.com/pg-sharding/spqr/qdb" + "io" + "os" + "strings" + "sync" ) type MoveTableRes struct { @@ -23,18 +23,12 @@ type MoveTableRes struct { } // TODO: use schema -// var schema = flag.String("shema", "", "") +// var schema = flag.String("schema", "", "") type ProxyW struct { w io.WriteCloser } -type pgxConnIface interface { - Begin(context.Context) (pgx.Tx, error) - BeginTx(context.Context, pgx.TxOptions) (pgx.Tx, error) - Close(context.Context) error -} - func (p *ProxyW) Write(bt []byte) (int, error) { return p.w.Write(bt) } @@ -55,6 +49,7 @@ func createConnString(shardID string) string { if len(sd.Hosts) == 0 { return "" } + // TODO find_master host := strings.Split(sd.Hosts[0], ":")[0] port := strings.Split(sd.Hosts[0], ":")[1] return fmt.Sprintf("user=%s host=%s port=%s dbname=%s password=%s", sd.User, host, port, sd.DB, sd.Password) @@ -81,11 +76,26 @@ MoveKeys performs physical key-range move from one datashard to another. It is assumed that passed key range is already locked on every online spqr-router. Steps: - - traverse pg_class to resolve all relations that matches given sharding rules - - create sql copy and delete queries to move data tuples. - - prepare and commit distributed move transaction + - create postgres_fdw on receiving shard + - copy data from sending shard to receiving shard via fdw + - delete data from sending shard */ -func MoveKeys(ctx context.Context, fromId, toId string, keyr qdb.KeyRange, db qdb.XQDB) error { +func MoveKeys(ctx context.Context, fromId, toId string, krg *kr.KeyRange, ds *distributions.Distribution, db qdb.XQDB, cr coordinator.Coordinator) error { + tx, err := db.GetTransferTx(ctx, krg.ID) + if err != nil { + return err + } + if tx == nil { + // No transaction in progress + tx = &qdb.DataTransferTransaction{ + ToShardId: toId, + FromShardId: fromId, + Status: qdb.Planned, + } + if err = db.RecordTransferTx(ctx, krg.ID, tx); err != nil { + return err + } + } if shards == nil { err := LoadConfig(config.CoordinatorConfig().ShardDataCfg) if err != nil { @@ -104,220 +114,161 @@ func MoveKeys(ctx context.Context, fromId, toId string, keyr qdb.KeyRange, db qd return err } - txFrom, txTo, err := beginTransactions(ctx, from, to) - if err != nil { - return err - } - defer func(ctx context.Context) { - err := rollbackTransactions(ctx, txTo, txFrom) - if err != nil { - spqrlog.Zero.Warn().Msg("error closing transaction") - } - }(ctx) - - var nextKeyRange *kr.KeyRange - moveKeyRange := kr.KeyRangeFromDB(&keyr) - qdbDs, err := db.GetDistribution(ctx, keyr.DistributionId) + upperBound, err := resolveNextBound(ctx, krg, cr) if err != nil { return err } - ds := distributions.DistributionFromDB(qdbDs) - if krs, err := db.ListKeyRanges(ctx, moveKeyRange.Distribution); err != nil { - return err - } else { - for _, currkr := range krs { - if kr.CmpRangesLess(moveKeyRange.LowerBound, currkr.LowerBound) { - if nextKeyRange == nil || kr.CmpRangesLess(currkr.LowerBound, nextKeyRange.LowerBound) { - nextKeyRange = kr.KeyRangeFromDB(currkr) + for tx != nil { + switch tx.Status { + case qdb.Planned: + // copy data of key range to receiving shard + if err = copyData(ctx, from, to, fromId, toId, krg, ds, upperBound); err != nil { + return err + } + tx.Status = qdb.DataCopied + err = db.RecordTransferTx(ctx, krg.ID, tx) + if err != nil { + return err + } + case qdb.DataCopied: + // drop data from sending shard + for _, rel := range ds.Relations { + // TODO get actual schema + res := from.QueryRow(ctx, fmt.Sprintf(`SELECT count(*) > 0 as table_exists FROM information_schema.tables WHERE table_name = '%s' AND table_schema = 'public'`, strings.ToLower(rel.Name))) + fromTableExists := false + if err = res.Scan(&fromTableExists); err != nil { + return err + } + if !fromTableExists { + continue + } + _, err = from.Exec(ctx, fmt.Sprintf(`DELETE FROM %s WHERE %s`, strings.ToLower(rel.Name), kr.GetKRCondition(ds, rel, krg, upperBound, ""))) + if err != nil { + return err } } + if err = db.RemoveTransferTx(ctx, krg.ID); err != nil { + return err + } + tx = nil + default: + return fmt.Errorf("incorrect data transfer transaction status: %s", tx.Status) } } - err = moveData(ctx, moveKeyRange, nextKeyRange, ds.Relations, txTo, txFrom) - if err != nil { - return err - } - - err = commitTransactions(ctx, fromId, toId, keyr.KeyRangeID, txTo, txFrom, db) - if err != nil { - return err - } - return nil } -func ResolvePreparedTransaction(ctx context.Context, sh, tx string, commit bool) { - if shards == nil { - err := LoadConfig(config.CoordinatorConfig().ShardDataCfg) - if err != nil { - spqrlog.Zero.Error().Err(err).Msg("error loading config") - } - } - - db, err := pgx.Connect(ctx, createConnString(sh)) - if err != nil { - spqrlog.Zero.Error().Err(err).Msg("error connecting to shard") - } - - if commit { - _, err = db.Exec(ctx, fmt.Sprintf("COMMIT PREPARED '%s'", tx)) - } else { - _, err = db.Exec(ctx, fmt.Sprintf("ROLLBACK PREPARED '%s'", tx)) - } - +func resolveNextBound(ctx context.Context, krg *kr.KeyRange, cr coordinator.Coordinator) (kr.KeyRangeBound, error) { + krs, err := cr.ListKeyRanges(ctx, krg.Distribution) if err != nil { - spqrlog.Zero.Error().Err(err).Msg("error closing transaction") + return nil, err } -} - -func beginTransactions(ctx context.Context, from, to pgxConnIface) (pgx.Tx, pgx.Tx, error) { - txFrom, err := from.BeginTx(ctx, pgx.TxOptions{}) - if err != nil { - spqrlog.Zero.Error().Err(err).Msg("error begining transaction") - return nil, nil, err - } - txTo, err := to.BeginTx(ctx, pgx.TxOptions{}) - if err != nil { - spqrlog.Zero.Error().Err(err).Msg("error begining transaction") - return nil, nil, err + var bound kr.KeyRangeBound + for _, kRange := range krs { + if kr.CmpRangesLess(krg.LowerBound, kRange.LowerBound) && (bound == nil || kr.CmpRangesLess(kRange.LowerBound, bound)) { + bound = kRange.LowerBound + } } - return txFrom, txTo, nil + return bound, nil } -func commitTransactions(ctx context.Context, f, t string, krid string, txTo, txFrom pgx.Tx, db qdb.XQDB) error { - _, err := txTo.Exec(ctx, fmt.Sprintf("PREPARE TRANSACTION '%s-%s'", t, krid)) +func copyData(ctx context.Context, from, to *pgx.Conn, fromId, toId string, krg *kr.KeyRange, ds *distributions.Distribution, upperBound kr.KeyRangeBound) error { + fromShard := shards.ShardsData[fromId] + toShard := shards.ShardsData[toId] + dbName := fromShard.DB + fromHost := strings.Split(fromShard.Hosts[0], ":")[0] + serverName := fmt.Sprintf("%s_%s_%s", strings.Split(toShard.Hosts[0], ":")[0], dbName, fromHost) + // create postgres_fdw server on receiving shard + // TODO find master + _, err := to.Exec(ctx, fmt.Sprintf(`CREATE server IF NOT EXISTS %s FOREIGN DATA WRAPPER postgres_fdw OPTIONS (dbname '%s', host '%s', port '%s')`, serverName, dbName, fromHost, strings.Split(fromShard.Hosts[0], ":")[1])) if err != nil { - spqrlog.Zero.Error().Err(err).Msg("error preparing transaction") return err } - _, err = txFrom.Exec(ctx, fmt.Sprintf("PREPARE TRANSACTION '%s-%s'", f, krid)) - if err != nil { - spqrlog.Zero.Error().Err(err).Msg("error preparing transaction") - _, err1 := txTo.Exec(ctx, fmt.Sprintf("ROLLBACK PREPARED '%s-%s'", t, krid)) - if err1 != nil { - spqrlog.Zero.Error().Err(err1).Msg("error closing transaction") - } + // create user mapping for postgres_fdw server + // TODO check if name is taken + schemaName := fmt.Sprintf("%s_schema", serverName) + if _, err = to.Exec(ctx, fmt.Sprintf(`DROP USER MAPPING IF EXISTS FOR %s SERVER %s`, toShard.User, serverName)); err != nil { return err } - - d := qdb.DataTransferTransaction{ - ToShardId: t, - ToTxName: fmt.Sprintf("%s-%s", t, krid), - FromTxName: fmt.Sprintf("%s-%s", f, krid), - FromShardId: f, - ToStatus: qdb.Processing, - FromStatus: qdb.Processing, - } - - err = db.RecordTransferTx(ctx, krid, &d) - if err != nil { - spqrlog.Zero.Error().Err(err).Msg("error writing to qdb") - } - - _, err = txTo.Exec(ctx, fmt.Sprintf("COMMIT PREPARED '%s-%s'", t, krid)) - if err != nil { - spqrlog.Zero.Error().Err(err).Msg("error closing transaction") + if _, err = to.Exec(ctx, fmt.Sprintf(`CREATE USER MAPPING FOR %s SERVER %s OPTIONS (user '%s', password '%s')`, toShard.User, serverName, fromShard.User, fromShard.Password)); err != nil { return err } - - d.ToStatus = qdb.Commited - err = db.RecordTransferTx(ctx, krid, &d) - if err != nil { - spqrlog.Zero.Error().Err(err).Msg("error writing to qdb") - } - - _, err = txFrom.Exec(ctx, fmt.Sprintf("COMMIT PREPARED '%s-%s'", f, krid)) - if err != nil { - spqrlog.Zero.Error().Err(err).Msg("error closing transaction") + // create foreign tables corresponding to such on sending shard + // TODO check if schemaName is not used by relations (needs schemas in distributions) + if _, err = to.Exec(ctx, fmt.Sprintf(`DROP SCHEMA IF EXISTS %s CASCADE`, schemaName)); err != nil { return err } - - d.FromStatus = qdb.Commited - err = db.RecordTransferTx(ctx, krid, &d) - if err != nil { - spqrlog.Zero.Error().Err(err).Msg("error removing from qdb") + if _, err = to.Exec(ctx, fmt.Sprintf(`CREATE SCHEMA IF NOT EXISTS %s`, schemaName)); err != nil { + return err } - return nil -} - -func rollbackTransactions(ctx context.Context, txTo, txFrom pgx.Tx) error { - err := txTo.Rollback(ctx) - if err != nil { - spqrlog.Zero.Warn().Msg("error closing transaction") - } - err1 := txFrom.Rollback(ctx) - if err1 != nil { - spqrlog.Zero.Warn().Msg("error closing transaction") - return err1 - } - return err -} - -// TODO enhance for multi-column sharding rules -func moveData(ctx context.Context, keyRange, nextKeyRange *kr.KeyRange, rels map[string]*distributions.DistributedRelation, txTo, txFrom pgx.Tx) error { - // TODO: use whole RFQN - rows, err := txFrom.Query(ctx, ` -SELECT table_name -FROM information_schema.tables; -`) + _, err = to.Exec(ctx, fmt.Sprintf(`IMPORT FOREIGN SCHEMA public FROM SERVER %s INTO %s`, serverName, schemaName)) if err != nil { return err } - res := make(map[string]struct{}) - for rows.Next() { - var tableName string - err = rows.Scan(&tableName) + for _, rel := range ds.Relations { + krCondition := kr.GetKRCondition(ds, rel, krg, upperBound, "") + // check that relation exists on sending shard and there is data to copy. If not, skip the relation + // TODO get actual schema + fromTableExists, err := checkTableExists(ctx, from, strings.ToLower(rel.Name), "public") if err != nil { return err } - - res[tableName] = struct{}{} - } - - rows.Close() - - for _, rel := range rels { - if _, ok := res[strings.ToLower(rel.Name)]; !ok { + if !fromTableExists { continue } - r, w, err := os.Pipe() + fromCount, err := getEntriesCount(ctx, from, rel.Name, krCondition) if err != nil { return err } - - pw := ProxyW{ - w: w, + // check that relation exists on receiving shard. If not, exit + toTableExists, err := checkTableExists(ctx, to, strings.ToLower(rel.Name), "public") + if err != nil { + return err } - - // This code does not work for multi-column key ranges. - var qry string - if nextKeyRange == nil { - qry = fmt.Sprintf("COPY (DELETE FROM %s WHERE %s >= %s RETURNING *) TO STDOUT", rel.Name, - rel.DistributionKey[0].Column, keyRange.LowerBound) - } else { - qry = fmt.Sprintf("COPY (DELETE FROM %s WHERE %s >= %s and %s < %s RETURNING *) TO STDOUT", rel.Name, - rel.DistributionKey[0].Column, keyRange.LowerBound, rel.DistributionKey[0].Column, nextKeyRange.LowerBound) + if !toTableExists { + return fmt.Errorf("relation %s does not exist on receiving shard", rel.Name) } - - go func() { - _, err = txFrom.Conn().PgConn().CopyTo(ctx, &pw, qry) - if err != nil { - spqrlog.Zero.Error().Err(err).Msg("") - } - - if err := pw.w.Close(); err != nil { - spqrlog.Zero.Error().Err(err).Msg("error closing pipe") - } - }() - _, err = txTo.Conn().PgConn().CopyFrom(ctx, - r, fmt.Sprintf("COPY %s FROM STDIN", rel.Name)) + toCount, err := getEntriesCount(ctx, to, rel.Name, krCondition) + if err != nil { + return err + } + // if data is already copied, skip + if toCount == fromCount { + continue + } + // if data is inconsistent, fail + if toCount > 0 && fromCount != 0 { + return fmt.Errorf("key count on sender & receiver mismatch") + } + query := fmt.Sprintf(` + INSERT INTO %s + SELECT * FROM %s + WHERE %s +`, strings.ToLower(rel.Name), fmt.Sprintf("%s.%s", schemaName, strings.ToLower(rel.Name)), krCondition) + _, err = to.Exec(ctx, query) if err != nil { - spqrlog.Zero.Error().Err(err).Msg("copy in failed") return err } } - return nil } + +func checkTableExists(ctx context.Context, conn *pgx.Conn, relName, schema string) (bool, error) { + res := conn.QueryRow(ctx, fmt.Sprintf(`SELECT count(*) > 0 as table_exists FROM information_schema.tables WHERE table_name = '%s' AND table_schema = '%s'`, relName, schema)) + exists := false + if err := res.Scan(&exists); err != nil { + return false, err + } + return exists, nil +} + +func getEntriesCount(ctx context.Context, conn *pgx.Conn, relName string, condition string) (int, error) { + res := conn.QueryRow(ctx, fmt.Sprintf(`SELECT count(*) FROM %s WHERE %s`, relName, condition)) + count := 0 + if err := res.Scan(&count); err != nil { + return 0, err + } + return count, nil +} diff --git a/pkg/datatransfers/data_transfers_test.go b/pkg/datatransfers/data_transfers_test.go index 3eedccdc0..c6384577b 100644 --- a/pkg/datatransfers/data_transfers_test.go +++ b/pkg/datatransfers/data_transfers_test.go @@ -1,173 +1,12 @@ package datatransfers import ( - "context" - "fmt" "sync" "testing" - "github.com/golang/mock/gomock" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgconn" - mock "github.com/pg-sharding/spqr/pkg/mock/pgx" - "github.com/pg-sharding/spqr/qdb" "github.com/stretchr/testify/assert" ) -// commitTransactions tests -func TestCommitPositive(t *testing.T) { - assert := assert.New(t) - ctrl := gomock.NewController(t) - - m1 := mock.NewMockTx(ctrl) - m1.EXPECT().Exec(context.TODO(), "PREPARE TRANSACTION 'sh2-krid'").Return(pgconn.CommandTag{}, nil) - m1.EXPECT().Exec(context.TODO(), "COMMIT PREPARED 'sh2-krid'").Return(pgconn.CommandTag{}, nil) - - m2 := mock.NewMockTx(ctrl) - m2.EXPECT().Exec(context.TODO(), "PREPARE TRANSACTION 'sh1-krid'").Return(pgconn.CommandTag{}, nil) - m2.EXPECT().Exec(context.TODO(), "COMMIT PREPARED 'sh1-krid'").Return(pgconn.CommandTag{}, nil) - - db, err := qdb.NewXQDB("mem") - assert.NoError(err) - err = commitTransactions(context.TODO(), "sh1", "sh2", "krid", m1, m2, db) - assert.NoError(err) - - tx, err := db.GetTransferTx(context.TODO(), "krid") - assert.NoError(err) - assert.Equal(tx.FromStatus, qdb.Commited) - assert.Equal(tx.ToStatus, qdb.Commited) - assert.Equal(tx.ToTxName, "sh2-krid") - assert.Equal(tx.FromTxName, "sh1-krid") -} - -func TestFailToCommitFirstTx(t *testing.T) { - assert := assert.New(t) - ctrl := gomock.NewController(t) - - m1 := mock.NewMockTx(ctrl) - m1.EXPECT().Exec(context.TODO(), "PREPARE TRANSACTION 'sh2-krid'").Return(pgconn.CommandTag{}, nil) - m1.EXPECT().Exec(context.TODO(), "COMMIT PREPARED 'sh2-krid'").Return(pgconn.CommandTag{}, fmt.Errorf("")) - - m2 := mock.NewMockTx(ctrl) - m2.EXPECT().Exec(context.TODO(), "PREPARE TRANSACTION 'sh1-krid'").Return(pgconn.CommandTag{}, nil) - - db, err := qdb.NewXQDB("mem") - assert.NoError(err) - err = commitTransactions(context.TODO(), "sh1", "sh2", "krid", m1, m2, db) - assert.Error(err) - - tx, err := db.GetTransferTx(context.TODO(), "krid") - assert.NoError(err) - assert.Equal(tx.FromStatus, qdb.Processing) - assert.Equal(tx.ToStatus, qdb.Processing) -} - -func TestFailToCommitSecondTx(t *testing.T) { - assert := assert.New(t) - ctrl := gomock.NewController(t) - - m1 := mock.NewMockTx(ctrl) - m1.EXPECT().Exec(context.TODO(), "PREPARE TRANSACTION 'sh2-krid'").Return(pgconn.CommandTag{}, nil) - m1.EXPECT().Exec(context.TODO(), "COMMIT PREPARED 'sh2-krid'").Return(pgconn.CommandTag{}, nil) - - m2 := mock.NewMockTx(ctrl) - m2.EXPECT().Exec(context.TODO(), "PREPARE TRANSACTION 'sh1-krid'").Return(pgconn.CommandTag{}, nil) - m2.EXPECT().Exec(context.TODO(), "COMMIT PREPARED 'sh1-krid'").Return(pgconn.CommandTag{}, fmt.Errorf("")) - - db, err := qdb.NewXQDB("mem") - assert.NoError(err) - err = commitTransactions(context.TODO(), "sh1", "sh2", "krid", m1, m2, db) - assert.Error(err) - - tx, err := db.GetTransferTx(context.TODO(), "krid") - assert.NoError(err) - assert.Equal(tx.FromStatus, qdb.Processing) - assert.Equal(tx.ToStatus, qdb.Commited) -} - -func TestFailToPrepareFirstTx(t *testing.T) { - assert := assert.New(t) - ctrl := gomock.NewController(t) - - m1 := mock.NewMockTx(ctrl) - m1.EXPECT().Exec(context.TODO(), "PREPARE TRANSACTION 'sh2-krid'").Return(pgconn.CommandTag{}, fmt.Errorf("")) - - m2 := mock.NewMockTx(ctrl) - - db, err := qdb.NewXQDB("mem") - assert.NoError(err) - err = commitTransactions(context.TODO(), "sh1", "sh2", "krid", m1, m2, db) - assert.Error(err) - - _, err = db.GetTransferTx(context.TODO(), "krid") - assert.Error(err) -} - -func TestFailToPrepareSecondTx(t *testing.T) { - assert := assert.New(t) - ctrl := gomock.NewController(t) - - m1 := mock.NewMockTx(ctrl) - m1.EXPECT().Exec(context.TODO(), "PREPARE TRANSACTION 'sh2-krid'").Return(pgconn.CommandTag{}, nil) - m1.EXPECT().Exec(context.TODO(), "ROLLBACK PREPARED 'sh2-krid'").Return(pgconn.CommandTag{}, nil) - - m2 := mock.NewMockTx(ctrl) - m2.EXPECT().Exec(context.TODO(), "PREPARE TRANSACTION 'sh1-krid'").Return(pgconn.CommandTag{}, fmt.Errorf("")) - - db, err := qdb.NewXQDB("mem") - assert.NoError(err) - err = commitTransactions(context.TODO(), "sh1", "sh2", "krid", m1, m2, db) - assert.Error(err) - - _, err = db.GetTransferTx(context.TODO(), "krid") - assert.Error(err) -} - -// rollbackTransactions tests -func TestRollbackPositive(t *testing.T) { - assert := assert.New(t) - ctrl := gomock.NewController(t) - - m1 := mock.NewMockTx(ctrl) - m1.EXPECT().Rollback(context.TODO()).Return(nil) - - m2 := mock.NewMockTx(ctrl) - m2.EXPECT().Rollback(context.TODO()).Return(nil) - - err := rollbackTransactions(context.TODO(), m1, m2) - assert.NoError(err) -} - -func TestRollbackFirstFailAndSecondRollbacks(t *testing.T) { - assert := assert.New(t) - ctrl := gomock.NewController(t) - - m1 := mock.NewMockTx(ctrl) - m1.EXPECT().Rollback(context.TODO()).Return(fmt.Errorf("")) - - m2 := mock.NewMockTx(ctrl) - m2.EXPECT().Rollback(context.TODO()).Return(nil) - - err := rollbackTransactions(context.TODO(), m1, m2) - assert.Error(err) -} - -//beginTransactions tests - -func TestBeginTxPositive(t *testing.T) { - assert := assert.New(t) - ctrl := gomock.NewController(t) - - m1 := mock.NewMockpgxConnIface(ctrl) - m1.EXPECT().BeginTx(context.TODO(), pgx.TxOptions{}).Return(nil, nil) - - m2 := mock.NewMockpgxConnIface(ctrl) - m2.EXPECT().BeginTx(context.TODO(), pgx.TxOptions{}).Return(nil, nil) - - _, _, err := beginTransactions(context.TODO(), m1, m2) - assert.NoError(err) -} - // createConnString and LoadConfig tests func TestConnectCreds(t *testing.T) { assert := assert.New(t) diff --git a/pkg/models/kr/keyrange.go b/pkg/models/kr/keyrange.go index a32dfff6e..b359971e4 100644 --- a/pkg/models/kr/keyrange.go +++ b/pkg/models/kr/keyrange.go @@ -1,9 +1,12 @@ package kr import ( + "fmt" + "github.com/pg-sharding/spqr/pkg/models/distributions" proto "github.com/pg-sharding/spqr/pkg/protos" "github.com/pg-sharding/spqr/qdb" spqrparser "github.com/pg-sharding/spqr/yacc/console" + "strings" ) type KeyRangeBound []byte @@ -104,3 +107,40 @@ func (kr *KeyRange) ToProto() *proto.KeyRangeInfo { DistributionId: kr.Distribution, } } + +// GetKRCondition returns SQL condition for elements of distributed relation between two key ranges +// TODO support multidimensional key ranges +func GetKRCondition(ds *distributions.Distribution, rel *distributions.DistributedRelation, kRange *KeyRange, upperBound KeyRangeBound, prefix string) string { + buf := make([]string, len(rel.DistributionKey)) + for i, entry := range rel.DistributionKey { + // TODO remove after multidimensional key range support + if i > 0 { + break + } + // TODO add hash (depends on col type) + hashedCol := "" + if prefix != "" { + hashedCol = fmt.Sprintf("%s.%s", prefix, entry.Column) + } else { + hashedCol = entry.Column + } + lBound := "" + if ds.ColTypes[i] == "varchar" { + lBound = fmt.Sprintf("'%s'", string(kRange.LowerBound)) + } else { + lBound = string(kRange.LowerBound) + } + if upperBound != nil { + rBound := "" + if ds.ColTypes[i] == "varchar" { + rBound = fmt.Sprintf("'%s'", string(upperBound)) + } else { + rBound = string(upperBound) + } + buf[i] = fmt.Sprintf("%s >= %s AND %s < %s", hashedCol, lBound, hashedCol, rBound) + } else { + buf[i] = fmt.Sprintf("%s >= %s", hashedCol, lBound) + } + } + return strings.Join(buf, " AND ") +} diff --git a/pkg/models/kr/keyrange_test.go b/pkg/models/kr/keyrange_test.go new file mode 100644 index 000000000..534c53c6b --- /dev/null +++ b/pkg/models/kr/keyrange_test.go @@ -0,0 +1,84 @@ +package kr_test + +import ( + "github.com/pg-sharding/spqr/pkg/models/distributions" + "github.com/pg-sharding/spqr/pkg/models/kr" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestGetKRCondition(t *testing.T) { + assert := assert.New(t) + + for i, c := range []struct { + ds *distributions.Distribution + rel *distributions.DistributedRelation + krg *kr.KeyRange + upperBound kr.KeyRangeBound + prefix string + expected string + }{ + { + ds: &distributions.Distribution{ColTypes: []string{"integer"}}, + rel: &distributions.DistributedRelation{ + Name: "rel", + DistributionKey: []distributions.DistributionKeyEntry{ + {Column: "col1", HashFunction: "ident"}, + }, + }, + krg: &kr.KeyRange{ID: "kr1", LowerBound: []byte("0")}, + upperBound: []byte("10"), + prefix: "", + expected: "col1 >= 0 AND col1 < 10", + }, + // prefix + { + ds: &distributions.Distribution{ColTypes: []string{"integer"}}, + rel: &distributions.DistributedRelation{ + Name: "rel", + DistributionKey: []distributions.DistributionKeyEntry{ + {Column: "col1", HashFunction: "ident"}, + }, + }, + krg: &kr.KeyRange{ID: "kr1", LowerBound: []byte("0")}, + upperBound: []byte("10"), + prefix: "rel", + expected: "rel.col1 >= 0 AND rel.col1 < 10", + }, + // no upper bound + { + ds: &distributions.Distribution{ColTypes: []string{"integer"}}, + rel: &distributions.DistributedRelation{ + Name: "rel", + DistributionKey: []distributions.DistributionKeyEntry{ + {Column: "col1", HashFunction: "ident"}, + }, + }, + krg: &kr.KeyRange{ID: "kr1", LowerBound: []byte("0")}, + upperBound: nil, + prefix: "", + expected: "col1 >= 0", + }, + // string columns + { + ds: &distributions.Distribution{ColTypes: []string{"varchar"}}, + rel: &distributions.DistributedRelation{ + Name: "rel", + DistributionKey: []distributions.DistributionKeyEntry{ + {Column: "col1", HashFunction: "ident"}, + }, + }, + krg: &kr.KeyRange{ID: "kr1", LowerBound: []byte("a")}, + upperBound: []byte("b"), + prefix: "", + expected: "col1 >= 'a' AND col1 < 'b'", + }, + } { + assert.Equal( + kr.GetKRCondition(c.ds, c.rel, c.krg, c.upperBound, c.prefix), + c.expected, + "test case %d", i, + ) + } + +} diff --git a/qdb/etcdqdb.go b/qdb/etcdqdb.go index 4fa39dde9..bfe4cc044 100644 --- a/qdb/etcdqdb.go +++ b/qdb/etcdqdb.go @@ -57,6 +57,7 @@ const ( shardsNamespace = "/shards/" relationMappingNamespace = "/relation_mappings/" taskGroupPath = "/move_task_group" + transactionNamespace = "/transfer_txs/" CoordKeepAliveTtl = 3 keyspace = "key_space" @@ -95,6 +96,10 @@ func (q *EtcdQDB) Client() *clientv3.Client { return q.cli } +func transferTxNodePath(key string) string { + return path.Join(transactionNamespace, key) +} + // ============================================================================== // KEY RANGES // ============================================================================== @@ -436,13 +441,17 @@ func (q *EtcdQDB) ShareKeyRange(id string) error { // TODO : unit tests func (q *EtcdQDB) RecordTransferTx(ctx context.Context, key string, info *DataTransferTransaction) error { + spqrlog.Zero.Debug(). + Str("key", key). + Msg("etcdqdb: record data transfer tx") + bts, err := json.Marshal(info) if err != nil { spqrlog.Zero.Error().Err(err).Msg("Failed to marshal transaction") return err } - _, err = q.cli.Put(ctx, key, string(bts)) + _, err = q.cli.Put(ctx, transferTxNodePath(key), string(bts)) if err != nil { spqrlog.Zero.Error().Err(err).Msg("Failed to write transaction") return err @@ -453,32 +462,35 @@ func (q *EtcdQDB) RecordTransferTx(ctx context.Context, key string, info *DataTr // TODO : unit tests func (q *EtcdQDB) GetTransferTx(ctx context.Context, key string) (*DataTransferTransaction, error) { - resp, err := q.cli.Get(ctx, key, clientv3.WithPrefix()) + spqrlog.Zero.Debug(). + Str("key", key). + Msg("etcdqdb: get data transfer tx") + + resp, err := q.cli.Get(ctx, transferTxNodePath(key)) if err != nil { spqrlog.Zero.Error().Err(err).Msg("Failed to get transaction") return nil, err } var st DataTransferTransaction - - for _, e := range resp.Kvs { - if err := json.Unmarshal(e.Value, &st); err != nil { - spqrlog.Zero.Error().Err(err).Msg("Failed to unmarshal transaction") - return nil, err - } - if st.ToStatus == "" { - continue - } + if len(resp.Kvs) == 0 { + return nil, nil } - if st.ToStatus == "" { - return nil, spqrerror.Newf(spqrerror.SPQR_TRANSFER_ERROR, "no transaction in qdb with key %s", key) + + if err := json.Unmarshal(resp.Kvs[0].Value, &st); err != nil { + spqrlog.Zero.Error().Err(err).Msg("Failed to unmarshal transaction") + return nil, err } return &st, nil } // TODO : unit tests func (q *EtcdQDB) RemoveTransferTx(ctx context.Context, key string) error { - _, err := q.cli.Delete(ctx, key) + spqrlog.Zero.Debug(). + Str("key", key). + Msg("etcdqdb: remove data transfer tx") + + _, err := q.cli.Delete(ctx, transferTxNodePath(key)) if err != nil { spqrlog.Zero.Error().Err(err).Msg("Failed to delete transaction") return err @@ -1143,7 +1155,7 @@ func (q *EtcdQDB) ListKeyRangeMoves(ctx context.Context) ([]*MoveKeyRange, error spqrlog.Zero.Debug(). Interface("response", resp). - Msg("etcdqdb: list move key range oeprations") + Msg("etcdqdb: list move key range operations") return moves, nil } @@ -1174,15 +1186,12 @@ func (q *EtcdQDB) RecordKeyRangeMove(ctx context.Context, m *MoveKeyRange) error func (q *EtcdQDB) UpdateKeyRangeMoveStatus(ctx context.Context, moveId string, s MoveKeyRangeStatus) error { spqrlog.Zero.Debug(). Str("id", moveId). - Msg("etcdqdb: update key range") + Msg("etcdqdb: update key range move status") - resp, err := q.cli.Get(ctx, keyRangeMovesNodePath(moveId), clientv3.WithPrefix()) + resp, err := q.cli.Get(ctx, keyRangeMovesNodePath(moveId)) if err != nil { return err } - if len(resp.Kvs) != 1 { - return spqrerror.Newf(spqrerror.SPQR_KEYRANGE_ERROR, "failed to update move key range operation by id %s", moveId) - } var moveKr MoveKeyRange if err := json.Unmarshal(resp.Kvs[0].Value, &moveKr); err != nil { return err @@ -1204,3 +1213,24 @@ func (q *EtcdQDB) UpdateKeyRangeMoveStatus(ctx context.Context, moveId string, s return nil } + +func (q *EtcdQDB) DeleteKeyRangeMove(ctx context.Context, moveId string) error { + spqrlog.Zero.Debug(). + Str("id", moveId). + Msg("etcdqdb: delete key range move") + + resp, err := q.cli.Get(ctx, keyRangeMovesNodePath(moveId)) + if err != nil { + return err + } + var moveKr MoveKeyRange + if err := json.Unmarshal(resp.Kvs[0].Value, &moveKr); err != nil { + return err + } + if moveKr.Status != MoveKeyRangeComplete { + return fmt.Errorf("cannot remove non-completed key range move") + } + _, err = q.cli.Delete(ctx, keyRangeMovesNodePath(moveId)) + + return err +} diff --git a/qdb/memqdb.go b/qdb/memqdb.go index 04a6a8a33..97b16347a 100644 --- a/qdb/memqdb.go +++ b/qdb/memqdb.go @@ -14,7 +14,6 @@ import ( ) type MemQDB struct { - ShardingSchemaKeeper // TODO create more mutex per map if needed mu sync.RWMutex @@ -119,6 +118,30 @@ func (q *MemQDB) DumpState() error { return nil } +// ============================================================================== +// KEY RANGE MOVES +// ============================================================================== + +func (q *MemQDB) RecordKeyRangeMove(ctx context.Context, m *MoveKeyRange) error { + // TODO implement + return nil +} + +func (q *MemQDB) ListKeyRangeMoves(ctx context.Context) ([]*MoveKeyRange, error) { + // TODO implement + return nil, nil +} + +func (q *MemQDB) UpdateKeyRangeMoveStatus(ctx context.Context, moveId string, s MoveKeyRangeStatus) error { + // TODO implement + return nil +} + +func (q *MemQDB) DeleteKeyRangeMove(ctx context.Context, moveId string) error { + // TODO implement + return nil +} + // ============================================================================== // KEY RANGES // ============================================================================== @@ -388,7 +411,7 @@ func (q *MemQDB) GetTransferTx(_ context.Context, key string) (*DataTransferTran ans, ok := q.Transactions[key] if !ok { - return nil, spqrerror.Newf(spqrerror.SPQR_TRANSFER_ERROR, "no tx with key %s", key) + return nil, nil } return ans, nil } diff --git a/qdb/memqdb_test.go b/qdb/memqdb_test.go index ffd88e103..cd090f89c 100644 --- a/qdb/memqdb_test.go +++ b/qdb/memqdb_test.go @@ -31,10 +31,7 @@ var mockRouter = &qdb.Router{ var mockDataTransferTransaction = &qdb.DataTransferTransaction{ ToShardId: mockShard.ID, FromShardId: mockShard.ID, - FromTxName: "fake_tx_1", - ToTxName: "fake_tx_2", - FromStatus: "fake_st_1", - ToStatus: "fake_st_2", + Status: "fake_st", } // must run with -race diff --git a/qdb/qdb.go b/qdb/qdb.go index 6a3e909ac..cdc34dc58 100644 --- a/qdb/qdb.go +++ b/qdb/qdb.go @@ -14,6 +14,8 @@ type ShardingSchemaKeeper interface { ListKeyRangeMoves(ctx context.Context) ([]*MoveKeyRange, error) /* mark key range move as completed */ UpdateKeyRangeMoveStatus(ctx context.Context, moveId string, s MoveKeyRangeStatus) error + // DeleteKeyRangeMove deletes info about key range move + DeleteKeyRangeMove(ctx context.Context, moveId string) error } type TopolodyKeeper interface { @@ -109,15 +111,14 @@ func NewXQDB(qdbType string) (XQDB, error) { type TxStatus string const ( - Commited = TxStatus("commit") - Processing = TxStatus("process") + Planned = TxStatus("planned") + DataCopied = TxStatus("data_copied") ) +// DataTransferTransaction contains information about data transfer +// from one shard to another type DataTransferTransaction struct { ToShardId string `json:"to_shard"` FromShardId string `json:"from_shard"` - FromTxName string `json:"from_transaction"` - ToTxName string `json:"to_transaction"` - FromStatus TxStatus `json:"from_tx_status"` - ToStatus TxStatus `json:"to_tx_status"` + Status TxStatus `json:"status"` } diff --git a/test/feature/features/move.feature b/test/feature/features/move.feature index b4d49a4bd..12ca36d3c 100644 --- a/test/feature/features/move.feature +++ b/test/feature/features/move.feature @@ -71,6 +71,125 @@ Feature: Move test """ 001 """ + When I run SQL on host "coordinator" + """ + SHOW key_ranges + """ + Then command return code should be "0" + And SQL result should match json_exactly + """ + [ + { + "Key range ID":"krid1", + "Distribution ID":"ds1", + "Lower bound":"1", + "Shard ID":"sh2" + }, + { + "Key range ID":"krid2", + "Distribution ID":"ds1", + "Lower bound":"11", + "Shard ID":"sh2" + } + ] + """ + + Scenario: MOVE KEY RANGE works with varchar keys + When I execute SQL on host "coordinator" + """ + CREATE DISTRIBUTION ds2 COLUMN TYPES varchar; + ADD KEY RANGE krid3 FROM a ROUTE TO sh1 FOR DISTRIBUTION ds2; + ADD KEY RANGE krid4 FROM aa ROUTE TO sh2 FOR DISTRIBUTION ds2; + ALTER DISTRIBUTION ds2 ATTACH RELATION xMoveStr DISTRIBUTION KEY w_id; + """ + Then command return code should be "0" + When I run SQL on host "shard1" + """ + CREATE TABLE xMoveStr(w_id TEXT, s TEXT); + insert into xMoveStr(w_id, s) values('a', '001'); + """ + Then command return code should be "0" + When I run SQL on host "shard2" + """ + CREATE TABLE xMoveStr(w_id TEXT, s TEXT); + insert into xMoveStr(w_id, s) values('aa', '002'); + """ + Then command return code should be "0" + When I run SQL on host "shard2" + """ + SELECT * FROM xMoveStr + """ + Then command return code should be "0" + And SQL result should match regexp + """ + 002 + """ + When I run SQL on host "shard1" + """ + SELECT * FROM xMoveStr + """ + Then command return code should be "0" + And SQL result should match regexp + """ + 001 + """ + When I execute SQL on host "coordinator" + """ + MOVE KEY RANGE krid3 to sh2 + """ + Then command return code should be "0" + When I run SQL on host "shard2" + """ + SELECT * FROM xMoveStr + """ + Then command return code should be "0" + And SQL result should match regexp + """ + .*002(.|\n)*001 + """ + When I run SQL on host "shard1" + """ + SELECT * FROM xMoveStr + """ + Then command return code should be "0" + And SQL result should not match regexp + """ + 001 + """ + When I run SQL on host "coordinator" + """ + SHOW key_ranges + """ + Then command return code should be "0" + And SQL result should match json_exactly + """ + [ + { + "Key range ID":"krid1", + "Distribution ID":"ds1", + "Lower bound":"1", + "Shard ID":"sh1" + }, + { + "Key range ID":"krid2", + "Distribution ID":"ds1", + "Lower bound":"11", + "Shard ID":"sh2" + }, + { + "Key range ID":"krid3", + "Distribution ID":"ds2", + "Lower bound":"a", + "Shard ID":"sh2" + }, + { + "Key range ID":"krid4", + "Distribution ID":"ds2", + "Lower bound":"aa", + "Shard ID":"sh2" + } + ] + """ Scenario: MOVE KEY RANGE works with many rows When I run SQL on host "shard1" @@ -111,6 +230,28 @@ Feature: Move test """ .*001(.|\n)*002(.|\n)*003(.|\n)*004(.|\n)*005 """ + When I run SQL on host "coordinator" + """ + SHOW key_ranges + """ + Then command return code should be "0" + And SQL result should match json_exactly + """ + [ + { + "Key range ID":"krid1", + "Distribution ID":"ds1", + "Lower bound":"1", + "Shard ID":"sh2" + }, + { + "Key range ID":"krid2", + "Distribution ID":"ds1", + "Lower bound":"11", + "Shard ID":"sh2" + } + ] + """ Scenario: MOVE KEY RANGE works with many tables When I run SQL on host "shard1" @@ -168,6 +309,28 @@ Feature: Move test """ 002 """ + When I run SQL on host "coordinator" + """ + SHOW key_ranges + """ + Then command return code should be "0" + And SQL result should match json_exactly + """ + [ + { + "Key range ID":"krid1", + "Distribution ID":"ds1", + "Lower bound":"1", + "Shard ID":"sh2" + }, + { + "Key range ID":"krid2", + "Distribution ID":"ds1", + "Lower bound":"11", + "Shard ID":"sh2" + } + ] + """ Scenario: Move to non-existent shard fails When I run SQL on host "coordinator" @@ -190,3 +353,38 @@ Feature: Move test """ failed to fetch key range at /keyranges/krid3 """ + + Scenario: Move fails when table does not exist on receiver + When I run SQL on host "shard1" + """ + CREATE TABLE xMove(w_id INT, s TEXT); + insert into xMove(w_id, s) values(1, '001'); + """ + Then command return code should be "0" + When I run SQL on host "shard2" + """ + SELECT * FROM xMove + """ + Then command return code should be "1" + And SQL error on host "shard2" should match regexp + """ + relation .* does not exist + """ + When I run SQL on host "shard1" + """ + SELECT * FROM xMove + """ + Then command return code should be "0" + And SQL result should match regexp + """ + 001 + """ + When I run SQL on host "coordinator" + """ + MOVE KEY RANGE krid1 to sh2 + """ + Then command return code should be "1" + And SQL error on host "coordinator" should match regexp + """ + relation xMove does not exist on receiving shard + """ \ No newline at end of file diff --git a/test/feature/features/move_recover.feature b/test/feature/features/move_recover.feature index d032d84ac..eb864afd4 100644 --- a/test/feature/features/move_recover.feature +++ b/test/feature/features/move_recover.feature @@ -6,6 +6,7 @@ Feature: Move recover test When I execute SQL on host "coordinator" """ + REGISTER ROUTER r1 ADDRESS regress_router:7000; CREATE DISTRIBUTION ds1 COLUMN TYPES INTEGER; ADD KEY RANGE krid1 FROM 1 ROUTE TO sh1 FOR DISTRIBUTION ds1; ADD KEY RANGE krid2 FROM 11 ROUTE TO sh2 FOR DISTRIBUTION ds1; @@ -13,15 +14,14 @@ Feature: Move recover test """ Then command return code should be "0" - Scenario: Interrapted transaction continues - When I record in qdb data transfer transaction with name "krid2" - """ - {"to_shard": "sh1", - "from_shard": "sh2", - "from_transaction": "tx2", - "to_transaction": "tx1", - "from_tx_status": "process", - "to_tx_status": "commit" + Scenario: Planned key range movement continues + When I record in qdb key range move + """ + { + "move_id": "move1", + "key_range_id": "krid2", + "shard_id": "sh1", + "status": "PLANNED" } """ Then command return code should be "0" @@ -29,23 +29,91 @@ Feature: Move recover test """ CREATE TABLE xMove(w_id INT, s TEXT); insert into xMove(w_id, s) values(1, '001'); - insert into xMove(w_id, s) values(11, '002'); """ Then command return code should be "0" When I run SQL on host "shard2" """ CREATE TABLE xMove(w_id INT, s TEXT); insert into xMove(w_id, s) values(11, '002'); - BEGIN; - DELETE FROM xMove WHERE w_id = 11; - PREPARE TRANSACTION 'tx2'; + """ + Then command return code should be "0" + Given host "coordinator" is stopped + When I execute SQL on host "coordinator2" + """ + SHOW routers + """ + Then command return code should be "0" + When I run SQL on host "shard1" + """ SELECT * FROM xMove """ Then command return code should be "0" And SQL result should match regexp """ + 001(.|\n)*002 + """ + When I run SQL on host "shard2" + """ + SELECT * FROM xMove + """ + Then command return code should be "0" + And SQL result should not match regexp + """ 002 - """ + """ + When I run SQL on host "coordinator2" + """ + SHOW key_ranges + """ + Then command return code should be "0" + And SQL result should match json_exactly + """ + [ + { + "Key range ID":"krid1", + "Distribution ID":"ds1", + "Lower bound":"1", + "Shard ID":"sh1" + }, + { + "Key range ID":"krid2", + "Distribution ID":"ds1", + "Lower bound":"11", + "Shard ID":"sh1" + } + ] + """ + And qdb should not contain transaction "krid2" + And qdb should not contain key range moves + + Scenario: Started key range movement continues + When I execute SQL on host "coordinator" + """ + LOCK KEY RANGE krid2 + """ + Then command return code should be "0" + When I record in qdb key range move + """ + { + "move_id": "move1", + "key_range_id": "krid2", + "shard_id": "sh1", + "status": "STARTED" + } + """ + Then command return code should be "0" + When I run SQL on host "shard1" + """ + CREATE TABLE xMove(w_id INT, s TEXT); + insert into xMove(w_id, s) values(1, '001'); + """ + Then command return code should be "0" + When I run SQL on host "shard2" + """ + CREATE TABLE xMove(w_id INT, s TEXT); + insert into xMove(w_id, s) values(11, '002'); + """ + Then command return code should be "0" Given host "coordinator" is stopped When I execute SQL on host "coordinator2" """ @@ -70,16 +138,53 @@ Feature: Move recover test """ 002 """ + When I run SQL on host "coordinator2" + """ + SHOW key_ranges + """ + Then command return code should be "0" + And SQL result should match json_exactly + """ + [ + { + "Key range ID":"krid1", + "Distribution ID":"ds1", + "Lower bound":"1", + "Shard ID":"sh1" + }, + { + "Key range ID":"krid2", + "Distribution ID":"ds1", + "Lower bound":"11", + "Shard ID":"sh1" + } + ] + """ + And qdb should not contain transaction "krid2" + And qdb should not contain key range moves - Scenario: Interrapted transaction rollbacks + Scenario: Started key range movement continues with planned transaction + When I execute SQL on host "coordinator" + """ + LOCK KEY RANGE krid2 + """ + Then command return code should be "0" + When I record in qdb key range move + """ + { + "move_id": "move1", + "key_range_id": "krid2", + "shard_id": "sh1", + "status": "STARTED" + } + """ + Then command return code should be "0" When I record in qdb data transfer transaction with name "krid2" """ - {"to_shard": "sh1", + { + "to_shard": "sh1", "from_shard": "sh2", - "from_transaction": "tx2", - "to_transaction": "tx1", - "from_tx_status": "process", - "to_tx_status": "process" + "status": "planned" } """ Then command return code should be "0" @@ -87,30 +192,176 @@ Feature: Move recover test """ CREATE TABLE xMove(w_id INT, s TEXT); insert into xMove(w_id, s) values(1, '001'); - BEGIN; + """ + Then command return code should be "0" + When I run SQL on host "shard2" + """ + CREATE TABLE xMove(w_id INT, s TEXT); insert into xMove(w_id, s) values(11, '002'); - PREPARE TRANSACTION 'tx2'; + """ + Then command return code should be "0" + Given host "coordinator" is stopped + When I execute SQL on host "coordinator2" + """ + SHOW routers + """ + Then command return code should be "0" + When I run SQL on host "shard1" + """ + SELECT * FROM xMove + """ + Then command return code should be "0" + And SQL result should match regexp + """ + 001(.|\n)*002 + """ + When I run SQL on host "shard2" + """ SELECT * FROM xMove """ Then command return code should be "0" And SQL result should not match regexp """ 002 - """ + """ + When I run SQL on host "coordinator2" + """ + SHOW key_ranges + """ + Then command return code should be "0" + And SQL result should match json_exactly + """ + [ + { + "Key range ID":"krid1", + "Distribution ID":"ds1", + "Lower bound":"1", + "Shard ID":"sh1" + }, + { + "Key range ID":"krid2", + "Distribution ID":"ds1", + "Lower bound":"11", + "Shard ID":"sh1" + } + ] + """ + And qdb should not contain transaction "krid2" + And qdb should not contain key range moves + + Scenario: Started key range movement continues with dataCopied transaction + When I execute SQL on host "coordinator" + """ + LOCK KEY RANGE krid2 + """ + Then command return code should be "0" + When I record in qdb key range move + """ + { + "move_id": "move1", + "key_range_id": "krid2", + "shard_id": "sh1", + "status": "STARTED" + } + """ + Then command return code should be "0" + When I record in qdb data transfer transaction with name "krid2" + """ + { + "to_shard": "sh1", + "from_shard": "sh2", + "status": "data_copied" + } + """ + Then command return code should be "0" + When I run SQL on host "shard1" + """ + CREATE TABLE xMove(w_id INT, s TEXT); + insert into xMove(w_id, s) values (1, '001'), (11, '002'); + """ + Then command return code should be "0" When I run SQL on host "shard2" """ CREATE TABLE xMove(w_id INT, s TEXT); insert into xMove(w_id, s) values(11, '002'); - BEGIN; - DELETE FROM xMove WHERE w_id = 11; - PREPARE TRANSACTION 'tx2'; + """ + Then command return code should be "0" + Given host "coordinator" is stopped + When I execute SQL on host "coordinator2" + """ + SHOW routers + """ + Then command return code should be "0" + When I run SQL on host "shard1" + """ SELECT * FROM xMove """ Then command return code should be "0" And SQL result should match regexp """ + 001(.|\n)*002 + """ + When I run SQL on host "shard2" + """ + SELECT * FROM xMove + """ + Then command return code should be "0" + And SQL result should not match regexp + """ 002 - """ + """ + When I run SQL on host "coordinator2" + """ + SHOW key_ranges + """ + Then command return code should be "0" + And SQL result should match json_exactly + """ + [ + { + "Key range ID":"krid1", + "Distribution ID":"ds1", + "Lower bound":"1", + "Shard ID":"sh1" + }, + { + "Key range ID":"krid2", + "Distribution ID":"ds1", + "Lower bound":"11", + "Shard ID":"sh1" + } + ] + """ + And qdb should not contain transaction "krid2" + And qdb should not contain key range moves + + Scenario: Started key range movement continues with completed transaction + When I execute SQL on host "coordinator" + """ + LOCK KEY RANGE krid2 + """ + Then command return code should be "0" + When I record in qdb key range move + """ + { + "move_id": "move1", + "key_range_id": "krid2", + "shard_id": "sh1", + "status": "STARTED" + } + """ + Then command return code should be "0" + When I run SQL on host "shard1" + """ + CREATE TABLE xMove(w_id INT, s TEXT); + insert into xMove(w_id, s) values (1, '001'), (11, '002'); + """ + Then command return code should be "0" + When I run SQL on host "shard2" + """ + CREATE TABLE xMove(w_id INT, s TEXT); + """ + Then command return code should be "0" Given host "coordinator" is stopped When I execute SQL on host "coordinator2" """ @@ -124,23 +375,93 @@ Feature: Move recover test Then command return code should be "0" And SQL result should match regexp """ - 001 + 001(.|\n)*002 + """ + When I run SQL on host "shard2" """ + SELECT * FROM xMove + """ + Then command return code should be "0" And SQL result should not match regexp """ 002 """ + When I run SQL on host "coordinator2" + """ + SHOW key_ranges + """ + Then command return code should be "0" + And SQL result should match json_exactly + """ + [ + { + "Key range ID":"krid1", + "Distribution ID":"ds1", + "Lower bound":"1", + "Shard ID":"sh1" + }, + { + "Key range ID":"krid2", + "Distribution ID":"ds1", + "Lower bound":"11", + "Shard ID":"sh1" + } + ] + """ + And qdb should not contain transaction "krid2" + And qdb should not contain key range moves + + Scenario: Completed key range movement continues + When I execute SQL on host "coordinator" + """ + LOCK KEY RANGE krid2 + """ + Then command return code should be "0" + When I record in qdb key range move + """ + { + "move_id": "move1", + "key_range_id": "krid2", + "shard_id": "sh1", + "status": "COMPLETE" + } + """ + Then command return code should be "0" + When I run SQL on host "shard1" + """ + CREATE TABLE xMove(w_id INT, s TEXT); + insert into xMove(w_id, s) values(1, '001'), (11, '002'); + """ + Then command return code should be "0" When I run SQL on host "shard2" """ - SELECT * FROM xMove + CREATE TABLE xMove(w_id INT, s TEXT); """ Then command return code should be "0" - And SQL result should match regexp + Given host "coordinator" is stopped + When I execute SQL on host "coordinator2" """ - 002 + SHOW routers """ + Then command return code should be "0" + # key range "krid2" is unlocked + When I execute SQL on host "router" + """ + INSERT INTO xMove (w_id, s) values (12, 'text') + """ + Then command return code should be "0" + And qdb should not contain key range moves - Scenario: coordinator saves transaction to QDB and processes it on restart + Scenario: Planned transaction continues + When I record in qdb data transfer transaction with name "krid2" + """ + { + "to_shard": "sh1", + "from_shard": "sh2", + "status": "planned" + } + """ + Then command return code should be "0" When I run SQL on host "shard1" """ CREATE TABLE xMove(w_id INT, s TEXT); @@ -153,17 +474,76 @@ Feature: Move recover test insert into xMove(w_id, s) values(11, '002'); """ Then command return code should be "0" - When I execute SQL on host "coordinator" + Given host "coordinator" is stopped + When I execute SQL on host "coordinator2" + """ + SHOW routers """ - MOVE KEY RANGE krid1 to sh2 + Then command return code should be "0" + When I run SQL on host "shard1" + """ + SELECT * FROM xMove + """ + Then command return code should be "0" + And SQL result should match regexp + """ + 001(.|\n)*002 + """ + When I run SQL on host "shard2" + """ + SELECT * FROM xMove + """ + Then command return code should be "0" + And SQL result should not match regexp + """ + 002 + """ + And qdb should not contain transaction "krid2" + + Scenario: DataCopied transaction continues + When I record in qdb data transfer transaction with name "krid2" + """ + { + "to_shard": "sh1", + "from_shard": "sh2", + "status": "data_copied" + } + """ + Then command return code should be "0" + When I run SQL on host "shard1" + """ + CREATE TABLE xMove(w_id INT, s TEXT); + insert into xMove(w_id, s) values (1, '001'), (11, '002'); + """ + Then command return code should be "0" + When I run SQL on host "shard2" + """ + CREATE TABLE xMove(w_id INT, s TEXT); + insert into xMove(w_id, s) values(11, '002'); """ Then command return code should be "0" - And qdb should contain transaction "krid1" Given host "coordinator" is stopped When I execute SQL on host "coordinator2" """ SHOW routers """ Then command return code should be "0" - And qdb should not contain transaction "krid1" - + When I run SQL on host "shard1" + """ + SELECT * FROM xMove + """ + Then command return code should be "0" + And SQL result should match regexp + """ + 001(.|\n)*002 + """ + When I run SQL on host "shard2" + """ + SELECT * FROM xMove + """ + Then command return code should be "0" + And SQL result should not match regexp + """ + 002 + """ + And qdb should not contain transaction "krid2" diff --git a/test/feature/spqr_test.go b/test/feature/spqr_test.go index 025cc21d4..84dc9db78 100644 --- a/test/feature/spqr_test.go +++ b/test/feature/spqr_test.go @@ -821,13 +821,24 @@ func (tctx *testContext) stepQDBShouldNotContainRelation(key string) error { } } +func (tctx *testContext) stepRecordQDBKRMove(body *godog.DocString) error { + query := strings.TrimSpace(body.Content) + var m qdb.MoveKeyRange + if err := json.Unmarshal([]byte(query), &m); err != nil { + spqrlog.Zero.Error().Err(err).Msg("Failed to unmarshal request") + return err + } + + return tctx.qdb.RecordKeyRangeMove(context.TODO(), &m) +} + func (tctx *testContext) stepQDBShouldContainTx(key string) error { tx, err := tctx.qdb.GetTransferTx(context.TODO(), key) if err != nil { return err } - if tx == nil || tx.FromStatus == "" || tx.ToStatus == "" { + if tx == nil || tx.Status == "" { return fmt.Errorf("No valid transaction with key %s", key) } return nil @@ -835,12 +846,23 @@ func (tctx *testContext) stepQDBShouldContainTx(key string) error { func (tctx *testContext) stepQDBShouldNotContainTx(key string) error { tx, err := tctx.qdb.GetTransferTx(context.TODO(), key) - if tx == nil || err != nil || tx.FromStatus == "" || tx.ToStatus == "" { + if tx == nil || err != nil || tx.Status == "" { return nil } return fmt.Errorf("Valid transaction present with key %s", key) } +func (tctx *testContext) stepQDBShouldNotContainKRMoves() error { + txs, err := tctx.qdb.ListKeyRangeMoves(context.TODO()) + if err != nil { + return err + } + if len(txs) == 0 { + return nil + } + return fmt.Errorf("key range moves present") +} + func (tctx *testContext) stepErrorShouldMatch(host string, matcher string, body *godog.DocString) error { m, err := matchers.GetMatcher(matcher) if err != nil { @@ -935,8 +957,10 @@ func InitializeScenario(s *godog.ScenarioContext, t *testing.T, debug bool) { s.Step(`^SQL result should not match (\w+)$`, tctx.stepSQLResultShouldNotMatch) s.Step(`^I record in qdb data transfer transaction with name "([^"]*)"$`, tctx.stepRecordQDBTx) s.Step(`^qdb should not contain relation "([^"]*)"$`, tctx.stepQDBShouldNotContainRelation) + s.Step(`^I record in qdb key range move$`, tctx.stepRecordQDBKRMove) s.Step(`^qdb should contain transaction "([^"]*)"$`, tctx.stepQDBShouldContainTx) s.Step(`^qdb should not contain transaction "([^"]*)"$`, tctx.stepQDBShouldNotContainTx) + s.Step(`^qdb should not contain key range moves$`, tctx.stepQDBShouldNotContainKRMoves) s.Step(`^SQL error on host "([^"]*)" should match (\w+)$`, tctx.stepErrorShouldMatch) s.Step(`^file "([^"]*)" on host "([^"]*)" should match (\w+)$`, tctx.stepFileOnHostShouldMatch) s.Step(`^I fail to run SQL on host "([^"]*)"$`, tctx.stepIFailSQLOnHost)