Skip to content

Commit

Permalink
Adding back PK to be part of Hash
Browse files Browse the repository at this point in the history
  • Loading branch information
maxpert committed Sep 7, 2022
1 parent 9dcacab commit 5aca1f7
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
1 change: 1 addition & 0 deletions db/changelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func (conn *SqliteStreamDB) consumeChangeLogs(tableName string, changes []*chang
Type: changeRow.Type,
TableName: tableName,
Row: row,
tableInfo: conn.watchTablesSchema[tableName],
})

if err != nil {
Expand Down
26 changes: 25 additions & 1 deletion db/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql"
"fmt"
"hash/fnv"
"sort"
"sync"

"github.com/doug-martin/goqu/v9"
Expand All @@ -28,6 +29,7 @@ type ChangeLogEvent struct {
Type string
TableName string
Row map[string]any
tableInfo []*ColumnInfo `cbor:"-"`
}

type ColumnInfo struct {
Expand Down Expand Up @@ -181,6 +183,28 @@ func (e *ChangeLogEvent) Hash() (uint64, error) {
return 0, err
}

// Hash primary keys (TODO)
pkColumns := make([]string, 0, len(e.tableInfo))
for _, itm := range e.tableInfo {
if itm.IsPrimaryKey {
pkColumns = append(pkColumns, itm.Name)
}
}

pkTuples := make([]any, len(pkColumns))
sort.Strings(pkColumns)
for i, pk := range pkColumns {
pkTuples[i] = e.Row[pk]
}

bts, err := cbor.Marshal(pkTuples)
if err != nil {
return 0, err
}

_, err = hasher.Write(bts)
if err != nil {
return 0, err
}

return hasher.Sum64(), nil
}

0 comments on commit 5aca1f7

Please sign in to comment.