Skip to content

Commit

Permalink
pg: autocommit fix with errors
Browse files Browse the repository at this point in the history
  • Loading branch information
jchappelow committed Aug 23, 2024
1 parent 8e97ef3 commit dce7e67
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
5 changes: 4 additions & 1 deletion internal/sql/pg/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,13 +650,15 @@ func (db *DB) Execute(ctx context.Context, stmt string, args ...any) (*sql.Resul
// statements, plus we are also injecting the seq update query.
var resChan chan []byte
var res *sql.ResultSet
var seq int64 // in case the tx errors
err := pgx.BeginTxFunc(ctx, db.pool.writer,
pgx.TxOptions{
AccessMode: pgx.ReadWrite,
IsoLevel: pgx.ReadCommitted,
},
func(tx pgx.Tx) error {
seq, err := incrementSeq(ctx, tx)
var err error
seq, err = incrementSeq(ctx, tx)
if err != nil {
return err
}
Expand All @@ -670,6 +672,7 @@ func (db *DB) Execute(ctx context.Context, stmt string, args ...any) (*sql.Resul
},
)
if err != nil {
db.repl.abandonSeq(seq)
return nil, err
}
db.discardCommitID(ctx, resChan)
Expand Down
6 changes: 6 additions & 0 deletions internal/sql/pg/replmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@ func (rm *replMon) recvID(seq int64, w io.Writer) (chan []byte, bool) {
return c, true
}

func (rm *replMon) abandonSeq(seq int64) {
delete(rm.promises, seq)
rm.changesetWriter.writer = nil
// rm.changesetWriter.csChan = nil
}

func (rm *replMon) stop() {
rm.quit()
<-rm.done
Expand Down

0 comments on commit dce7e67

Please sign in to comment.