diff --git a/coordinator/provider/coordinator.go b/coordinator/provider/coordinator.go index 0c0c514ce..9a21b9ffe 100644 --- a/coordinator/provider/coordinator.go +++ b/coordinator/provider/coordinator.go @@ -170,6 +170,12 @@ func DialRouter(r *topology.Router) (*grpc.ClientConn, error) { return grpc.Dial(r.Address, grpc.WithInsecure()) //nolint:all } +type CoordinatorClient interface { + client.Client + + CancelMsg() *pgproto3.CancelRequest +} + type qdbCoordinator struct { coordinator.Coordinator db qdb.QDB @@ -856,13 +862,17 @@ func (qc *qdbCoordinator) UnregisterRouter(ctx context.Context, rID string) erro return qc.db.DeleteRouter(ctx, rID) } -func (qc *qdbCoordinator) PrepareClient(nconn net.Conn) (client.Client, error) { +func (qc *qdbCoordinator) PrepareClient(nconn net.Conn) (CoordinatorClient, error) { cl := psqlclient.NewPsqlClient(nconn) if err := cl.Init(nil); err != nil { return nil, err } + if cl.CancelMsg() != nil { + return cl, nil + } + spqrlog.Zero.Info(). Str("user", cl.Usr()). Str("db", cl.DB()). @@ -893,6 +903,11 @@ func (qc *qdbCoordinator) ProcClient(ctx context.Context, nconn net.Conn) error return err } + if cl.CancelMsg() != nil { + // TODO: cancel client here + return nil + } + ci := grpcConnectionIterator{qdbCoordinator: qc} cli := clientinteractor.NewPSQLInteractor(cl) for { diff --git a/docker/tests/bin/move.sh b/docker/tests/bin/move.sh index 6ed741130..ab664716b 100755 --- a/docker/tests/bin/move.sh +++ b/docker/tests/bin/move.sh @@ -39,21 +39,11 @@ psql "host=spqr_router_1_1 sslmode=disable user=user1 dbname=db1 port=6432" -c " exit 1 } -psql "host=spqr_coordinator sslmode=disable user=user1 dbname=db1 port=7002" -c "LOCK KEY RANGE krid2;" || { - echo "ERROR: tests failed" - exit 1 -} - psql "host=spqr_coordinator sslmode=disable user=user1 dbname=db1 port=7002" -c "MOVE KEY RANGE krid2 to sh1;" || { echo "ERROR: tests failed" exit 1 } -psql "host=spqr_coordinator sslmode=disable user=user1 dbname=db1 port=7002" -c "UNLOCK KEY RANGE krid2;" || { - echo "ERROR: tests failed" - exit 1 -} - out=$(psql "host=spqr_shard_1 sslmode=disable user=user1 dbname=db1 port=6432" -c "select * from xMove") test "$out" = " w_id | s ------+----- diff --git a/qdb/etcdqdb.go b/qdb/etcdqdb.go index 9d6293b40..b64a76fe6 100644 --- a/qdb/etcdqdb.go +++ b/qdb/etcdqdb.go @@ -364,7 +364,7 @@ func (q *EtcdQDB) LockKeyRange(ctx context.Context, id string) (*KeyRange, error } defer unlockMutex(mu, ctx) - resp, err := q.cli.Get(ctx, keyLockPath(keyRangeID)) + resp, err := q.cli.Get(ctx, keyLockPath(keyRangeNodePath(keyRangeID))) if err != nil { return nil, err } @@ -428,7 +428,7 @@ func (q *EtcdQDB) UnlockKeyRange(ctx context.Context, id string) error { } defer unlockMutex(mu, ctx) - resp, err := q.cli.Get(ctx, keyLockPath(keyRangeID)) + resp, err := q.cli.Get(ctx, keyLockPath(keyRangeNodePath(keyRangeID))) if err != nil { return err }