From 4a61805247b7079e00125c31061cab31b38bf807 Mon Sep 17 00:00:00 2001 From: Zohaib Date: Sun, 9 Oct 2022 17:01:45 -0700 Subject: [PATCH] Atomic change log published marking --- db/change_log.go | 45 +++++++++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/db/change_log.go b/db/change_log.go index a213164..afa3d36 100644 --- a/db/change_log.go +++ b/db/change_log.go @@ -281,14 +281,33 @@ func (conn *SqliteStreamDB) publishChangeLog() { log.Error().Err(err).Msg("Unable to consume changes") } - _, err = conn.Delete(conn.globalMetaTable()). - Where(goqu.C("id").Eq(change.Id)). - Prepared(true). - Executor(). - Exec() + err = conn.WithTx(func(txDatabase *goqu.TxDatabase) error { + _, err = conn.Update(conn.metaTable(change.TableName, changeLogName)). + Set(goqu.Record{"state": Published}). + Where(goqu.Ex{"id": change.ChangeTableId}). + Prepared(true). + Executor(). + Exec() + + if err != nil { + return err + } + + _, err = conn.Delete(conn.globalMetaTable()). + Where(goqu.C("id").Eq(change.Id)). + Prepared(true). + Executor(). + Exec() + + if err != nil { + return err + } + + return nil + }) if err != nil { - log.Error().Err(err).Msg("Unable to cleanup global change log") + log.Error().Err(err).Msg("Unable to cleanup change log") } } } @@ -348,20 +367,6 @@ func (conn *SqliteStreamDB) consumeChangeLogs(tableName string, changes []*chang return err } } - - _, err = conn.Update(conn.metaTable(tableName, changeLogName)). - Set(goqu.Record{"state": Published}). - Where(goqu.Ex{"id": changeRow.Id}). - Prepared(true). - Executor(). - Exec() - - if err != nil { - logger.Error().Err(err).Msg("Unable to cleanup change set row") - return err - } - - logger.Debug().Msg("Changes notified...") } return nil