Skip to content

Commit

Permalink
Adding lock for eliminating any race conditions between publish and r…
Browse files Browse the repository at this point in the history
…eplicate
  • Loading branch information
maxpert committed Oct 11, 2022
1 parent 8c738f3 commit f44140b
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 0 deletions.
6 changes: 6 additions & 0 deletions db/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ func (conn *SqliteStreamDB) tableCDCScriptFor(tableName string) (string, error)
}

func (conn *SqliteStreamDB) consumeReplicationEvent(event *ChangeLogEvent) error {
conn.dbLock.Lock()
defer conn.dbLock.Unlock()

return conn.WithTx(func(tnx *goqu.TxDatabase) error {
primaryKeyMap := conn.getPrimaryKeyMap(event)
if primaryKeyMap == nil {
Expand Down Expand Up @@ -245,6 +248,9 @@ func (conn *SqliteStreamDB) publishChangeLog() {
}
defer conn.publishLock.Unlock()

conn.dbLock.Lock()
defer conn.dbLock.Unlock()

changes, err := conn.getGlobalChanges(ScanLimit)
if err != nil {
log.Error().Err(err).Msg("Unable to scan global changes")
Expand Down
2 changes: 2 additions & 0 deletions db/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type SqliteStreamDB struct {
rawConnection *sqlite3.SQLiteConn
watcher *fsnotify.Watcher
publishLock *sync.Mutex
dbLock *sync.Mutex

dbPath string
prefix string
Expand Down Expand Up @@ -68,6 +69,7 @@ func OpenStreamDB(path string) (*SqliteStreamDB, error) {
watcher: nil,
dbPath: path,
prefix: MarmotPrefix,
dbLock: &sync.Mutex{},
publishLock: &sync.Mutex{},
watchTablesSchema: map[string][]*ColumnInfo{},
}
Expand Down

0 comments on commit f44140b

Please sign in to comment.