From fa2045438298d2256394db2108fb80e2d2e4af76 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Tue, 18 Jan 2022 23:16:49 -0600 Subject: [PATCH] client: honor last transaction IDs Signed-off-by: Dan Williams --- cache/cache.go | 37 +++++++++++++++++++++ client/client.go | 83 ++++++++++++++++++++++++++++++++++++++---------- util/errors.go | 20 ++++++++++++ 3 files changed, 123 insertions(+), 17 deletions(-) create mode 100644 util/errors.go diff --git a/cache/cache.go b/cache/cache.go index 45a9c158..01223697 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -17,6 +17,7 @@ import ( "github.com/ovn-org/libovsdb/mapper" "github.com/ovn-org/libovsdb/model" "github.com/ovn-org/libovsdb/ovsdb" + "github.com/ovn-org/libovsdb/util" ) const ( @@ -703,6 +704,42 @@ func (t *TableCache) Purge(dbModel model.DatabaseModel) { } } +// PurgeTable drops all data in the given table's cache and reinitializes it using the +// provided database model +func (t *TableCache) PurgeTable(dbModel model.DatabaseModel, name string) error { + return t.PurgeTableRows(dbModel, name, nil) +} + +// PurgeTableRows drops all rows in the given table's cache that match the given conditions +func (t *TableCache) PurgeTableRows(dbModel model.DatabaseModel, name string, conditions []ovsdb.Condition) error { + t.mutex.Lock() + defer t.mutex.Unlock() + t.dbModel = dbModel + tableTypes := t.dbModel.Types() + dataType, ok := tableTypes[name] + if !ok { + return fmt.Errorf("table %s not found", name) + } + if len(conditions) == 0 { + t.cache[name] = newRowCache(name, t.dbModel, dataType) + return nil + } + + r := t.cache[name] + rows, err := r.RowsByCondition(conditions) + if err != nil { + return err + } + delErrors := []error{} + for uuid := range rows { + if err := r.Delete(uuid); err != nil { + delErrors = append(delErrors, fmt.Errorf("failed to delete %s: %w", uuid, err)) + } + } + + return util.CombineErrors(delErrors, "failed to delete rows") +} + // AddEventHandler registers the supplied EventHandler to receive cache events func (t *TableCache) AddEventHandler(handler EventHandler) { t.eventProcessor.AddEventHandler(handler) diff --git a/client/client.go b/client/client.go index b2cc81dc..ad09e728 100644 --- a/client/client.go +++ b/client/client.go @@ -24,6 +24,7 @@ import ( "github.com/ovn-org/libovsdb/model" "github.com/ovn-org/libovsdb/ovsdb" "github.com/ovn-org/libovsdb/ovsdb/serverdb" + "github.com/ovn-org/libovsdb/util" ) // Constants defined for libovsdb @@ -199,6 +200,65 @@ func (o *ovsdbClient) Connect(ctx context.Context) error { return nil } +func (db *database) purge(name string) error { + // If a table has any !Since monitors or has no conditions, purge it + // If a table only has Since monitors with conditions, purge only rows that match the conditions + type purge struct { + conditions []ovsdb.Condition + purgeAll bool + } + + purges := make(map[string]*purge) + for _, monitor := range db.monitors { + for _, tm := range monitor.Tables { + p, ok := purges[tm.Table] + if !ok { + p = &purge{} + purges[tm.Table] = p + } + if monitor.Method == ovsdb.ConditionalMonitorSinceRPC { + model, err := db.model.NewModel(tm.Table) + if err != nil { + p.purgeAll = true + continue + } + info, err := db.model.NewModelInfo(model) + if err != nil { + p.purgeAll = true + continue + } + ovsdbCond, err := db.model.Mapper.NewCondition(info, tm.Condition.Field, tm.Condition.Function, tm.Condition.Value) + if err != nil { + p.purgeAll = true + continue + } + p.conditions = append(p.conditions, *ovsdbCond) + } else { + p.purgeAll = true + } + } + } + if len(purges) == 0 { + db.cache.Purge(db.model) + return nil + } + + var purgeErrors []error + for name, p := range purges { + if p.purgeAll { + if err := db.cache.PurgeTable(db.model, name); err != nil { + purgeErrors = append(purgeErrors, err) + } + } else { + if err := db.cache.PurgeTableRows(db.model, name, p.conditions); err != nil { + purgeErrors = append(purgeErrors, err) + } + } + } + + return util.CombineErrors(purgeErrors, fmt.Sprintf("failed to purge database %s", name)) +} + func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error { o.rpcMutex.Lock() defer o.rpcMutex.Unlock() @@ -226,15 +286,7 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error { } if !connected { - if len(connectErrors) == 1 { - return connectErrors[0] - } - var combined []string - for _, e := range connectErrors { - combined = append(combined, e.Error()) - } - - return fmt.Errorf("unable to connect to any endpoints: %s", strings.Join(combined, ". ")) + return util.CombineErrors(connectErrors, "unable to connect to any endpoints") } // if we're reconnecting, re-start all the monitors @@ -243,6 +295,10 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error { for dbName, db := range o.databases { db.monitorsMutex.Lock() defer db.monitorsMutex.Unlock() + + if err := db.purge(dbName); err != nil { + o.logger.V(3).Error(err, "failed to purge") + } for id, request := range db.monitors { err := o.monitor(ctx, MonitorCookie{DatabaseName: dbName, ID: id}, true, request) if err != nil { @@ -349,8 +405,6 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) error { return err } db.api = newAPI(db.cache, o.logger) - } else { - db.cache.Purge(db.model) } db.cacheMutex.Unlock() } @@ -832,12 +886,7 @@ func (o *ovsdbClient) monitor(ctx context.Context, cookie MonitorCookie, reconne var args []interface{} if monitor.Method == ovsdb.ConditionalMonitorSinceRPC { - // FIXME: We should pass the monitor.LastTransactionID here - // But that would require delaying clearing the cache until - // after the monitors have been re-established - the logic - // would also need to be different for monitor and monitor_cond - // as we must always clear the cache in that instance - args = ovsdb.NewMonitorCondSinceArgs(dbName, cookie, requests, emptyUUID) + args = ovsdb.NewMonitorCondSinceArgs(dbName, cookie, requests, monitor.LastTransactionID) } else { args = ovsdb.NewMonitorArgs(dbName, cookie, requests) } diff --git a/util/errors.go b/util/errors.go new file mode 100644 index 00000000..a7d3a7ba --- /dev/null +++ b/util/errors.go @@ -0,0 +1,20 @@ +package util + +import ( + "fmt" + "strings" +) + +func CombineErrors(errors []error, msg string) error { + if len(errors) == 0 { + return nil + } else if len(errors) == 1 { + return errors[0] + } + + var combined []string + for _, e := range errors { + combined = append(combined, e.Error()) + } + return fmt.Errorf("%s: %s", msg, strings.Join(combined, ". ")) +}