Skip to content

Commit

Permalink
Adding global change log for transactional ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
maxpert committed Oct 2, 2022
1 parent 1794885 commit 917ae7b
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 59 deletions.
107 changes: 62 additions & 45 deletions db/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

// ScanLimit is the number of change log rows processed at a time, to limit memory usage.
const ScanLimit = uint(100)
const ScanLimit = uint(128)

// ErrNoTableMapping is the sentinel error carrying the message
// "no table mapping found".
// NOTE(review): presumably returned when a table is missing from the watched
// table schema; the returning code is not visible here, so confirm.
var ErrNoTableMapping = errors.New("no table mapping found")

// ErrLogNotReadyToPublish is the sentinel error carrying the message
// "not ready to publish changes". publishChangeLog compares the error returned
// by consumeChangeLogs against it and stops its current pass on a match.
var ErrLogNotReadyToPublish = errors.New("not ready to publish changes")
Expand Down Expand Up @@ -47,6 +47,12 @@ type triggerTemplateData struct {
Triggers map[string]string
}

// globalChangeLogEntry is one row of the global change log table. Each row
// points at a single row in a per-table change log, identified by the table
// name and that row's id.
type globalChangeLogEntry struct {
	// Id is the row's own primary key (column "id").
	Id int64 `db:"id"`
	// ChangeTableId is the id of the referenced row in the per-table change
	// log (column "change_table_id").
	ChangeTableId int64 `db:"change_table_id"`
	// TableName is the watched table the change belongs to (column
	// "table_name").
	TableName string `db:"table_name"`
}

type changeLogEntry struct {
Id int64 `db:"id"`
Type string `db:"type"`
Expand All @@ -66,29 +72,6 @@ func (conn *SqliteStreamDB) Replicate(event *ChangeLogEvent) error {
return nil
}

// DeleteChangeLog deletes the row with id event.Id from the change log table
// of event.TableName, restricted to rows whose state equals Published.
//
// It returns true when at least one row was deleted. Any error from running
// the statement or from reading the affected-row count is returned unchanged,
// together with false.
func (conn *SqliteStreamDB) DeleteChangeLog(event *ChangeLogEvent) (bool, error) {
	table := conn.metaTable(event.TableName, changeLogName)
	publishedRow := goqu.Ex{
		"state": Published,
		"id":    event.Id,
	}

	result, err := conn.Delete(table).Where(publishedRow).Prepared(true).Executor().Exec()
	if err != nil {
		return false, err
	}

	deleted, err := result.RowsAffected()
	if err != nil {
		return false, err
	}

	return deleted > 0, nil
}

func (conn *SqliteStreamDB) CleanupChangeLogs(beforeTime time.Time) (int64, error) {
total := int64(0)
for name := range conn.watchTablesSchema {
Expand Down Expand Up @@ -121,6 +104,10 @@ func (conn *SqliteStreamDB) metaTable(tableName string, name string) string {
return conn.prefix + tableName + "_" + name
}

// globalMetaTable returns the name of the global change log table: the
// connection's prefix followed by "_change_log_global".
func (conn *SqliteStreamDB) globalMetaTable() string {
	const globalSuffix = "_change_log_global"
	return conn.prefix + globalSuffix
}

func (conn *SqliteStreamDB) tableCDCScriptFor(tableName string) (string, error) {
columns, ok := conn.watchTablesSchema[tableName]
if !ok {
Expand Down Expand Up @@ -202,7 +189,7 @@ func (conn *SqliteStreamDB) watchChanges(path string) {

errShm := watcher.Add(shmPath)
errWal := watcher.Add(walPath)
changeLogTicker := time.NewTicker(time.Millisecond * 100)
changeLogTicker := time.NewTicker(time.Millisecond * 500)
updateTime := time.Now()

for {
Expand Down Expand Up @@ -233,33 +220,59 @@ func (conn *SqliteStreamDB) watchChanges(path string) {
}
}

// getGlobalChanges reads at most limit rows from the global change log table,
// ordered by ascending id.
//
// The explicit ordering matters: without an ORDER BY clause SQLite leaves the
// row order of a SELECT undefined, so entries could otherwise be handed to the
// caller out of insertion order.
//
// On a scan error it returns a nil slice and the error.
func (conn *SqliteStreamDB) getGlobalChanges(limit uint) ([]globalChangeLogEntry, error) {
	var entries []globalChangeLogEntry
	err := conn.
		From(conn.globalMetaTable()).
		Order(goqu.C("id").Asc()).
		Limit(limit).
		ScanStructs(&entries)
	if err != nil {
		return nil, err
	}

	return entries, nil
}

func (conn *SqliteStreamDB) publishChangeLog() {
conn.publishLock.Lock()
processed := uint64(0)

defer conn.publishLock.Unlock()

for tableName := range conn.watchTablesSchema {
var changes []*changeLogEntry
err := conn.WithTx(func(tx *goqu.TxDatabase) error {
return tx.Select("id", "type", "state").
From(conn.metaTable(tableName, changeLogName)).
Where(goqu.Ex{"state": Pending}).
Limit(ScanLimit).
Prepared(true).
ScanStructs(&changes)
})
changes, err := conn.getGlobalChanges(ScanLimit)
if err != nil {
log.Error().Err(err).Msg("Unable to scan global changes")
return
}

if len(changes) < 0 {
return
}

for _, change := range changes {
logEntries := changeLogEntry{}

found, err := conn.Select("id", "type", "state").
From(conn.metaTable(change.TableName, changeLogName)).
Where(
goqu.C("state").Eq(Pending),
goqu.C("id").Eq(change.ChangeTableId),
).
Prepared(true).
ScanStruct(&logEntries)

if err != nil {
log.Error().Err(err).Msg("Error scanning last row ID")
return
}

if len(changes) <= 0 {
if !found {
log.Panic().
Str("table", change.TableName).
Int64("id", change.ChangeTableId).
Msg("Global change log row not found in corresponding table")
return
}

err = conn.consumeChangeLogs(tableName, changes)
err = conn.consumeChangeLogs(change.TableName, []*changeLogEntry{&logEntries})
if err != nil {
if err == ErrLogNotReadyToPublish {
break
Expand All @@ -268,9 +281,14 @@ func (conn *SqliteStreamDB) publishChangeLog() {
log.Error().Err(err).Msg("Unable to consume changes")
}

processed += uint64(len(changes))
if uint(len(changes)) <= ScanLimit {
break
_, err = conn.Delete(conn.globalMetaTable()).
Where(goqu.C("id").Eq(change.Id)).
Prepared(true).
Executor().
Exec()

if err != nil {
log.Error().Err(err).Msg("Unable to cleanup global change log")
}
}
}
Expand Down Expand Up @@ -331,8 +349,7 @@ func (conn *SqliteStreamDB) consumeChangeLogs(tableName string, changes []*chang
}
}

_, err = conn.
Update(conn.metaTable(tableName, changeLogName)).
_, err = conn.Update(conn.metaTable(tableName, changeLogName)).
Set(goqu.Record{"state": Published}).
Where(goqu.Ex{"id": changeRow.Id}).
Prepared(true).
Expand All @@ -344,7 +361,7 @@ func (conn *SqliteStreamDB) consumeChangeLogs(tableName string, changes []*chang
return err
}

logger.Debug().Msg("Notified change...")
logger.Debug().Msg("Changes notified...")
}

return nil
Expand Down
39 changes: 26 additions & 13 deletions db/table_change_log_script.tmpl
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
{{$ChangeLogTableName := (printf "%s%s_change_log" .Prefix .TableName)}}
{{$GlobalChangeLogTableName := (printf "%s_change_log_global" .Prefix)}}

CREATE TABLE IF NOT EXISTS {{$GlobalChangeLogTableName}} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
change_table_id INTEGER,
table_name TEXT
);

CREATE TABLE IF NOT EXISTS {{$ChangeLogTableName}} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
{{range $index, $col := .Columns}}
val_{{$col.Name}} {{$col.Type}} {{if $col.NotNull}} NOT NULL {{end}},
val_{{$col.Name}} {{$col.Type}},
{{end}}
type TEXT,
created_at INTEGER,
Expand All @@ -20,19 +27,25 @@ WHEN (SELECT COUNT(*) FROM pragma_function_list WHERE name='marmot_version') < 1
BEGIN

INSERT INTO {{$ChangeLogTableName}}(
{{range $col := $.Columns}}
val_{{$col.Name}},
{{end}}
type,
created_at,
state
{{range $col := $.Columns}}
val_{{$col.Name}},
{{end}}
type,
created_at,
state
) VALUES(
{{range $col := $.Columns}}
{{$read_target}}.{{$col.Name}},
{{end}}
'{{$trigger}}',
CAST((strftime('%s','now') || substr(strftime('%f','now'),4)) as INT),
0 -- Pending
{{range $col := $.Columns}}
{{$read_target}}.{{$col.Name}},
{{end}}
'{{$trigger}}',
CAST((strftime('%s','now') || substr(strftime('%f','now'),4)) as INT),
0 -- Pending
);

INSERT INTO {{$GlobalChangeLogTableName}} (change_table_id, table_name)
VALUES (
last_insert_rowid(),
'{{$.TableName}}'
);

END;
Expand Down
1 change: 0 additions & 1 deletion marmot.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ func onChangeEvent(streamDB *db.SqliteStreamDB) func(data []byte) error {
return err
}

_, _ = streamDB.DeleteChangeLog(ev.Payload)
return streamDB.Replicate(ev.Payload)
}
}
Expand Down

0 comments on commit 917ae7b

Please sign in to comment.