From f44140bb2bff1a99a3db8bc16951ad6dc4736432 Mon Sep 17 00:00:00 2001 From: Zohaib Date: Mon, 10 Oct 2022 18:15:31 -0700 Subject: [PATCH] Adding lock for eliminating any race conditions between publish and replicate --- db/change_log.go | 6 ++++++ db/sqlite.go | 2 ++ 2 files changed, 8 insertions(+) diff --git a/db/change_log.go b/db/change_log.go index b430a63..32e179f 100644 --- a/db/change_log.go +++ b/db/change_log.go @@ -130,6 +130,9 @@ func (conn *SqliteStreamDB) tableCDCScriptFor(tableName string) (string, error) } func (conn *SqliteStreamDB) consumeReplicationEvent(event *ChangeLogEvent) error { + conn.dbLock.Lock() + defer conn.dbLock.Unlock() + return conn.WithTx(func(tnx *goqu.TxDatabase) error { primaryKeyMap := conn.getPrimaryKeyMap(event) if primaryKeyMap == nil { @@ -245,6 +248,9 @@ func (conn *SqliteStreamDB) publishChangeLog() { } defer conn.publishLock.Unlock() + conn.dbLock.Lock() + defer conn.dbLock.Unlock() + changes, err := conn.getGlobalChanges(ScanLimit) if err != nil { log.Error().Err(err).Msg("Unable to scan global changes") diff --git a/db/sqlite.go b/db/sqlite.go index d7eb371..593d38f 100644 --- a/db/sqlite.go +++ b/db/sqlite.go @@ -25,6 +25,7 @@ type SqliteStreamDB struct { rawConnection *sqlite3.SQLiteConn watcher *fsnotify.Watcher publishLock *sync.Mutex + dbLock *sync.Mutex dbPath string prefix string @@ -68,6 +69,7 @@ func OpenStreamDB(path string) (*SqliteStreamDB, error) { watcher: nil, dbPath: path, prefix: MarmotPrefix, + dbLock: &sync.Mutex{}, publishLock: &sync.Mutex{}, watchTablesSchema: map[string][]*ColumnInfo{}, }