diff --git a/internal/sql/pg/db.go b/internal/sql/pg/db.go index 2201881ca..8874b1956 100644 --- a/internal/sql/pg/db.go +++ b/internal/sql/pg/db.go @@ -650,13 +650,15 @@ func (db *DB) Execute(ctx context.Context, stmt string, args ...any) (*sql.Resul // statements, plus we are also injecting the seq update query. var resChan chan []byte var res *sql.ResultSet + var seq int64 // in case the tx errors err := pgx.BeginTxFunc(ctx, db.pool.writer, pgx.TxOptions{ AccessMode: pgx.ReadWrite, IsoLevel: pgx.ReadCommitted, }, func(tx pgx.Tx) error { - seq, err := incrementSeq(ctx, tx) + var err error + seq, err = incrementSeq(ctx, tx) if err != nil { return err } @@ -670,6 +672,7 @@ func (db *DB) Execute(ctx context.Context, stmt string, args ...any) (*sql.Resul }, ) if err != nil { + db.repl.abandonSeq(seq) return nil, err } db.discardCommitID(ctx, resChan) diff --git a/internal/sql/pg/replmon.go b/internal/sql/pg/replmon.go index 33450ebde..4309ab216 100644 --- a/internal/sql/pg/replmon.go +++ b/internal/sql/pg/replmon.go @@ -152,6 +152,12 @@ func (rm *replMon) recvID(seq int64, w io.Writer) (chan []byte, bool) { return c, true } +func (rm *replMon) abandonSeq(seq int64) { + delete(rm.promises, seq) + rm.changesetWriter.writer = nil + rm.changesetWriter.csChan = nil +} + func (rm *replMon) stop() { rm.quit() <-rm.done