Skip to content

Commit

Permalink
Adding log table cleanup based on snapshotting
Browse files Browse the repository at this point in the history
This will allow configuring snashopt cleanup via log compaction.
  • Loading branch information
maxpert committed Sep 4, 2022
1 parent 96a7b54 commit 86385bd
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 7 deletions.
31 changes: 24 additions & 7 deletions db/changelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,30 @@ func init() {
)
}

func (conn *SqliteStreamDB) Replicate(event *ChangeLogEvent) error {
if err := conn.consumeReplicationEvent(event); err != nil {
return err
}
return nil
}

func (conn *SqliteStreamDB) CleanupChangeLogs() error {
for name := range conn.watchTablesSchema {
log.Debug().Str("table", name).Msg("Cleaning up change logs")
metaTableName := conn.metaTable(name, changeLogName)
_, err := conn.Delete(metaTableName).
Where(goqu.Ex{"state": Published}).
Prepared(true).
Executor().
Exec()

if err != nil {
return err
}
}
return nil
}

func (conn *SqliteStreamDB) tableCDCScriptFor(tableName string) (string, error) {
columns, ok := conn.watchTablesSchema[tableName]
if !ok {
Expand All @@ -74,13 +98,6 @@ func (conn *SqliteStreamDB) tableCDCScriptFor(tableName string) (string, error)
return spaceStripper.ReplaceAllString(buf.String(), "\n "), nil
}

func (conn *SqliteStreamDB) Replicate(event *ChangeLogEvent) error {
if err := conn.consumeReplicationEvent(event); err != nil {
return err
}
return nil
}

func (conn *SqliteStreamDB) consumeReplicationEvent(event *ChangeLogEvent) error {
return conn.WithTx(func(tnx *goqu.TxDatabase) error {
primaryKeyMap := conn.getPrimaryKeyMap(event)
Expand Down
5 changes: 5 additions & 0 deletions lib/replica_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func (ssm *SQLiteStateMachine) Lookup(_ interface{}) (interface{}, error) {
}

func (ssm *SQLiteStateMachine) SaveSnapshot(_ io.Writer, _ sm.ISnapshotFileCollection, _ <-chan struct{}) error {
err := ssm.DB.CleanupChangeLogs()
if err != nil {
return err
}

return nil
}

Expand Down

0 comments on commit 86385bd

Please sign in to comment.