Skip to content

Commit

Permalink
feat: Update SERIAL sequence after INSERT queries
Browse files Browse the repository at this point in the history
Sequence values are not updated when a row is inserted including the
sequenced value, often the ID.

Checks for sequenced columns on startup and then runs setval() after
each insert to manually update the value for that column.
  • Loading branch information
13rac1 committed Apr 18, 2020
1 parent f570d62 commit 781fc4c
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 1 deletion.
5 changes: 5 additions & 0 deletions cmd/axon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ func main() {
logger.WithError(err).Fatal("unable to load target DB primary keys")
}

err = loadSerialColumns(targetDBConn)
if err != nil {
logger.WithError(err).Fatal("unable to load target DB serial keys")
}

// create a notify listener and start from changeset id 1
listener := warppipe.NewNotifyListener(warppipe.StartFromID(0))
wp := warppipe.NewWarpPipe(listener)
Expand Down
74 changes: 74 additions & 0 deletions cmd/axon/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
// maps primary key columns by table
var primaryKeys = make(map[string][]string)

// maps serial key columns by table
var serialColumns = make(map[string]string)

func checkTargetVersion(conn *sqlx.DB) error {
var serverVersion string
err := conn.QueryRow("SHOW server_version;").Scan(&serverVersion)
Expand Down Expand Up @@ -73,3 +76,74 @@ func getPrimaryKeyForChange(change *warppipe.Changeset) ([]string, error) {
}
return col, nil
}

func loadSerialColumns(conn *sqlx.DB) error {
var rows []struct {
TableName string `db:"table_name"`
ColumnName string `db:"column_name"`
ColumnDefault string `db:"column_default"`
}
err := conn.Select(&rows, `
SELECT
table_name,
column_name,
column_default
FROM information_schema.columns
WHERE
table_schema = 'public'
AND
column_default LIKE 'nextval(%'`,
)
if err != nil {
return fmt.Errorf("loadSerialColumns: %w", err)
}

for _, r := range rows {
colDefault := strings.Split(r.ColumnDefault, "'")
sequenceName := colDefault[1]
if !strings.HasSuffix(sequenceName, "_seq") {
return fmt.Errorf("loadSerialColumns: invalid sequence name: %s", sequenceName)
}

serialColumns[r.TableName+"/"+r.ColumnName] = sequenceName
}
log.Printf("serial columns found: %v", serialColumns)
return nil
}

func getSerialColumn(table, column string) (string, bool) {
if sequenceName, ok := serialColumns[table+"/"+column]; ok {
return sequenceName, true
}
return "", false
}

func updateSerialColumnSequence(conn *sqlx.DB, table string, columns []*warppipe.ChangesetColumn) error {
// Why no transaction? From the manual: Important: Because sequences are
// non-transactional, changes made by setval are not undone if the transaction
// rolls back.
// ref: https://www.postgresql.org/docs/9.6/functions-sequence.html
for _, c := range columns {
sequenceName, ok := getSerialColumn(table, c.Column)
if !ok {
// Column does not have a SERIAL sequence
continue
}
var rows []struct {
SetVal string `db:"setval"`
}
err := conn.Select(&rows, `
SELECT
setval(
$1,
$2,
true
)
`, sequenceName, c.Value)
if err != nil {
return fmt.Errorf("updateSerialColumns: %w", err)
}
log.Printf("sequence set %s: %s", sequenceName, rows[0].SetVal)
}
return nil
}
16 changes: 15 additions & 1 deletion cmd/axon/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,25 @@ func insertRow(conn *sqlx.DB, schema string, change *warppipe.Changeset) error {
}
if pqe.Code.Name() == "unique_violation" {
// Ignore duplicates
log.Print("insert duplicate row skipped")
// TODO: Should they be updated instead?
log.Printf("duplicate row insert skipped %s:", change)
// Still update column sequence on duplicate row. Shouldn't be required,
// but might be in odd circumstances.
err = updateSerialColumnSequence(conn, change.Table, change.NewValues)
if err != nil {
return err
}

return nil
}
return fmt.Errorf("PG error %s:%s failed to insert %s for query %s: %+v", pqe.Code, pqe.Code.Name(), change, removeDuplicateSpaces(query), err)
}

err = updateSerialColumnSequence(conn, change.Table, change.NewValues)
if err != nil {
return err
}

log.Printf("row inserted: %s", change)
return nil
}
Expand Down

0 comments on commit 781fc4c

Please sign in to comment.