Skip to content

Commit

Permalink
server: implement referential integrity
Browse files Browse the repository at this point in the history
Implement referential integrity and garbage collection as described in
RFC7047.

This is achieved keeping track of inverse references. A reference
tracker is used in the transaction to keep these references updated, to
check for referential integrity related violations and to add additional
updates to the transaction resulting from reference garbage collection.

The inverse references are stored in the database on commit.

The updates resulting from reference garbage collection are also sent to
any monitoring clients as expected.

Signed-off-by: Jaime Caamaño Ruiz <[email protected]>
  • Loading branch information
jcaamano committed Jan 9, 2024
1 parent 216837a commit 6dc2e3c
Show file tree
Hide file tree
Showing 9 changed files with 3,011 additions and 13 deletions.
2 changes: 2 additions & 0 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Database interface {
CheckIndexes(database string, table string, m model.Model) error
List(database, table string, conditions ...ovsdb.Condition) (map[string]model.Model, error)
Get(database, table string, uuid string) (model.Model, error)
GetReferences(database, table, row string) (References, error)
}

// Transaction abstracts a database transaction that can generate database
Expand All @@ -28,4 +29,5 @@ type Update interface {
GetUpdatedTables() []string
ForEachModelUpdate(table string, do func(uuid string, old, new model.Model) error) error
ForEachRowUpdate(table string, do func(uuid string, row ovsdb.RowUpdate2) error) error
ForReferenceUpdates(do func(references References) error) error
}
22 changes: 21 additions & 1 deletion database/inmemory/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
type inMemoryDatabase struct {
databases map[string]*cache.TableCache
models map[string]model.ClientDBModel
references map[string]dbase.References
logger *logr.Logger
mutex sync.RWMutex
}
Expand All @@ -28,6 +29,7 @@ func NewDatabase(models map[string]model.ClientDBModel) dbase.Database {
return &inMemoryDatabase{
databases: make(map[string]*cache.TableCache),
models: models,
references: make(map[string]dbase.References),
mutex: sync.RWMutex{},
logger: &logger,
}
Expand Down Expand Up @@ -61,6 +63,7 @@ func (db *inMemoryDatabase) CreateDatabase(name string, schema ovsdb.DatabaseSch
return err
}
db.databases[name] = database
db.references[name] = make(dbase.References)
return nil
}

Expand All @@ -79,7 +82,15 @@ func (db *inMemoryDatabase) Commit(database string, id uuid.UUID, update dbase.U
targetDb := db.databases[database]
db.mutex.RUnlock()

return targetDb.ApplyCacheUpdate(update)
err := targetDb.ApplyCacheUpdate(update)
if err != nil {
return err
}

return update.ForReferenceUpdates(func(references dbase.References) error {
db.references[database].UpdateReferences(references)
return nil
})
}

func (db *inMemoryDatabase) CheckIndexes(database string, table string, m model.Model) error {
Expand Down Expand Up @@ -123,3 +134,12 @@ func (db *inMemoryDatabase) Get(database, table string, uuid string) (model.Mode
}
return targetTable.Row(uuid), nil
}

func (db *inMemoryDatabase) GetReferences(database, table, row string) (dbase.References, error) {
if !db.Exists(database) {
return nil, fmt.Errorf("db does not exist")
}
db.mutex.RLock()
defer db.mutex.RUnlock()
return db.references[database].GetReferences(table, row), nil
}
71 changes: 71 additions & 0 deletions database/references.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package database

// References tracks the references to rows from other rows at specific
// locations in the schema.
type References map[ReferenceSpec]Reference

// ReferenceSpec specifies details about where in the schema a reference occurs.
type ReferenceSpec struct {
// ToTable is the table of the row to which the reference is made
ToTable string

// FromTable is the table of the row from which the reference is made
FromTable string

// FromColumn is the column of the row from which the reference is made
FromColumn string

// FromValue flags if the reference is made on a map key or map value when
// the column is a map
FromValue bool
}

// Reference maps the UUIDs of rows to which the reference is made to the
// rows it is made from
type Reference map[string][]string

// GetReferences gets references to a row
func (rs References) GetReferences(table, uuid string) References {
refs := References{}
for spec, values := range rs {
if spec.ToTable != table {
continue
}
if _, ok := values[uuid]; ok {
refs[spec] = Reference{uuid: values[uuid]}
}
}
return refs
}

// UpdateReferences updates the references with the provided ones. Dangling
// references, that is, the references of rows that are no longer referenced
// from anywhere, are cleaned up.
func (rs References) UpdateReferences(other References) {
for spec, otherRefs := range other {
for to, from := range otherRefs {
rs.updateReference(spec, to, from)
}
}
}

// updateReference updates the references to a row at a specific location in the
// schema
func (rs References) updateReference(spec ReferenceSpec, to string, from []string) {
thisRefs, ok := rs[spec]
if !ok && len(from) > 0 {
// add references from a previously untracked location
rs[spec] = Reference{to: from}
return
}
if len(from) > 0 {
// replace references to this row at this specific location
thisRefs[to] = from
return
}
// otherwise remove previously tracked references
delete(thisRefs, to)
if len(thisRefs) == 0 {
delete(rs, spec)
}
}
61 changes: 55 additions & 6 deletions database/transaction/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ func (t *Transaction) Transact(operations ...ovsdb.Operation) ([]*ovsdb.Operatio
if !t.Database.Exists(t.DbName) {
r := ovsdb.ResultFromError(fmt.Errorf("database does not exist"))
results[0] = &r
return results, nil
return results, updates.NewDatabaseUpdate(update, nil)
}

err := t.initializeCache()
if err != nil {
r := ovsdb.ResultFromError(err)
results[0] = &r
return results, nil
return results, updates.NewDatabaseUpdate(update, nil)
}

// Every Insert operation must have a UUID
Expand All @@ -70,7 +70,7 @@ func (t *Transaction) Transact(operations ...ovsdb.Operation) ([]*ovsdb.Operatio
if err != nil {
r := ovsdb.ResultFromError(err)
results[0] = &r
return results, nil
return results, updates.NewDatabaseUpdate(update, nil)
}

var r ovsdb.OperationResult
Expand Down Expand Up @@ -124,12 +124,29 @@ func (t *Transaction) Transact(operations ...ovsdb.Operation) ([]*ovsdb.Operatio

// if an operation failed, no need to do any further validation
if r.Error != "" {
return results, update
return results, updates.NewDatabaseUpdate(update, nil)
}

// if there is no updates, no need to do any further validation
if len(update.GetUpdatedTables()) == 0 {
return results, update
return results, updates.NewDatabaseUpdate(update, nil)
}

// check & update references
update, refUpdates, refs, err := updates.ProcessReferences(t.Model, t.Database, update)
if err != nil {
r = ovsdb.ResultFromError(err)
results = append(results, &r)
return results, updates.NewDatabaseUpdate(update, refs)
}

// apply updates resulting from referential integrity to the transaction
// caches so they are accounted for when checking index constraints
err = t.applyReferenceUpdates(refUpdates)
if err != nil {
r = ovsdb.ResultFromError(err)
results = append(results, &r)
return results, updates.NewDatabaseUpdate(update, refs)
}

// check index constraints
Expand All @@ -142,9 +159,41 @@ func (t *Transaction) Transact(operations ...ovsdb.Operation) ([]*ovsdb.Operatio
r := ovsdb.ResultFromError(err)
results = append(results, &r)
}

return results, updates.NewDatabaseUpdate(update, refs)
}

return results, update
return results, updates.NewDatabaseUpdate(update, refs)
}

func (t *Transaction) applyReferenceUpdates(update updates.ModelUpdates) error {
tables := update.GetUpdatedTables()
for _, table := range tables {
err := update.ForEachModelUpdate(table, func(uuid string, old, new model.Model) error {
// track deleted rows due to reference updates
if old != nil && new == nil {
t.DeletedRows[uuid] = struct{}{}
}
// warm the cache with updated and deleted rows due to reference
// updates
if old != nil && !t.Cache.Table(table).HasRow(uuid) {
row, err := t.Database.Get(t.DbName, table, uuid)
if err != nil {
return err
}
err = t.Cache.Table(table).Create(uuid, row, false)
if err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
}
// apply reference updates to the cache
return t.Cache.ApplyCacheUpdate(update)
}

func (t *Transaction) initializeCache() error {
Expand Down
4 changes: 4 additions & 0 deletions ovsdb/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ type ReferentialIntegrityViolation struct {
operation *Operation
}

func NewReferentialIntegrityViolation(details string) *ReferentialIntegrityViolation {
return &ReferentialIntegrityViolation{details: details}
}

// Error implements the error interface
func (e *ReferentialIntegrityViolation) Error() string {
msg := referentialIntegrityViolation
Expand Down
4 changes: 2 additions & 2 deletions server/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestMonitorFilter(t *testing.T) {
assert.NoError(t, err)
}
}
tu := monitor.filter2(update)
tu := monitor.filter2(updates.NewDatabaseUpdate(update, nil))
assert.Equal(t, tt.expected, tu)
})
}
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestMonitorFilter2(t *testing.T) {
assert.NoError(t, err)
}
}
tu := monitor.filter2(update)
tu := monitor.filter2(updates.NewDatabaseUpdate(update, nil))
assert.Equal(t, tt.expected, tu)
})
}
Expand Down
Loading

0 comments on commit 6dc2e3c

Please sign in to comment.