Skip to content

Commit

Permalink
Atomic change log published marking
Browse files Browse the repository at this point in the history
  • Loading branch information
maxpert committed Oct 10, 2022
1 parent e319328 commit 4a61805
Showing 1 changed file with 25 additions and 20 deletions.
45 changes: 25 additions & 20 deletions db/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,14 +281,33 @@ func (conn *SqliteStreamDB) publishChangeLog() {
log.Error().Err(err).Msg("Unable to consume changes")
}

_, err = conn.Delete(conn.globalMetaTable()).
Where(goqu.C("id").Eq(change.Id)).
Prepared(true).
Executor().
Exec()
err = conn.WithTx(func(txDatabase *goqu.TxDatabase) error {
_, err = conn.Update(conn.metaTable(change.TableName, changeLogName)).
Set(goqu.Record{"state": Published}).
Where(goqu.Ex{"id": change.ChangeTableId}).
Prepared(true).
Executor().
Exec()

if err != nil {
return err
}

_, err = conn.Delete(conn.globalMetaTable()).
Where(goqu.C("id").Eq(change.Id)).
Prepared(true).
Executor().
Exec()

if err != nil {
return err
}

return nil
})

if err != nil {
log.Error().Err(err).Msg("Unable to cleanup global change log")
log.Error().Err(err).Msg("Unable to cleanup change log")
}
}
}
Expand Down Expand Up @@ -348,20 +367,6 @@ func (conn *SqliteStreamDB) consumeChangeLogs(tableName string, changes []*chang
return err
}
}

_, err = conn.Update(conn.metaTable(tableName, changeLogName)).
Set(goqu.Record{"state": Published}).
Where(goqu.Ex{"id": changeRow.Id}).
Prepared(true).
Executor().
Exec()

if err != nil {
logger.Error().Err(err).Msg("Unable to cleanup change set row")
return err
}

logger.Debug().Msg("Changes notified...")
}

return nil
Expand Down

0 comments on commit 4a61805

Please sign in to comment.