diff --git a/db/changelog.go b/db/changelog.go index 6ff1164..4fd68b5 100644 --- a/db/changelog.go +++ b/db/changelog.go @@ -259,6 +259,7 @@ func (conn *SqliteStreamDB) consumeChangeLogs(tableName string, changes []*chang Type: changeRow.Type, TableName: tableName, Row: row, + tableInfo: conn.watchTablesSchema[tableName], }) if err != nil { diff --git a/db/sqlite.go b/db/sqlite.go index 09dab1e..064d65b 100644 --- a/db/sqlite.go +++ b/db/sqlite.go @@ -4,6 +4,7 @@ import ( "database/sql" "fmt" "hash/fnv" + "sort" "sync" "github.com/doug-martin/goqu/v9" @@ -28,6 +29,7 @@ type ChangeLogEvent struct { Type string TableName string Row map[string]any + tableInfo []*ColumnInfo `cbor:"-"` } type ColumnInfo struct { @@ -181,6 +183,28 @@ func (e *ChangeLogEvent) Hash() (uint64, error) { return 0, err } - // Hash primary keys (TODO) + pkColumns := make([]string, 0, len(e.tableInfo)) + for _, itm := range e.tableInfo { + if itm.IsPrimaryKey { + pkColumns = append(pkColumns, itm.Name) + } + } + + pkTuples := make([]any, len(pkColumns)) + sort.Strings(pkColumns) + for i, pk := range pkColumns { + pkTuples[i] = e.Row[pk] + } + + bts, err := cbor.Marshal(pkTuples) + if err != nil { + return 0, err + } + + _, err = hasher.Write(bts) + if err != nil { + return 0, err + } + return hasher.Sum64(), nil }