Skip to content

Commit

Permalink
Merge pull request #238 from dave-tucker/fix-server
Browse files Browse the repository at this point in the history
Fix The Server Impl so OVN-K CI can pass
  • Loading branch information
dave-tucker authored Oct 8, 2021
2 parents 016ead9 + ceaf12d commit cd95f2d
Show file tree
Hide file tree
Showing 14 changed files with 545 additions and 136 deletions.
11 changes: 7 additions & 4 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,15 +590,15 @@ func (t *TableCache) Populate2(tableUpdates ovsdb.TableUpdates2) {
}
t.eventProcessor.AddEvent(addEvent, table, nil, m)
case row.Modify != nil:
modified := tCache.Row(uuid)
if modified == nil {
existing := tCache.Row(uuid)
if existing == nil {
panic(fmt.Errorf("row with uuid %s does not exist", uuid))
}
modified := tCache.Row(uuid)
err := t.ApplyModifications(table, modified, *row.Modify)
if err != nil {
panic(err)
}
existing := tCache.Row(uuid)
if !reflect.DeepEqual(modified, existing) {
if err := tCache.Update(uuid, modified, false); err != nil {
panic(err)
Expand Down Expand Up @@ -720,7 +720,7 @@ func (e *eventProcessor) AddEvent(eventType string, table string, old model.Mode
// noop
return
default:
log.Print("dropping event because event buffer is full")
log.Print("libovsdb: dropping event because event buffer is full")
}
}

Expand Down Expand Up @@ -871,6 +871,9 @@ func (t *TableCache) ApplyModifications(tableName string, base model.Model, upda
bv.SetMapIndex(mk, mv)
}
}
if len(bv.MapKeys()) == 0 {
bv = reflect.Zero(nv.Type())
}
err = info.SetField(k, bv.Interface())
if err != nil {
return err
Expand Down
13 changes: 9 additions & 4 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,12 @@ func TestTableCacheApplyModifications(t *testing.T) {
&testDBModel{Value: "foo"},
&testDBModel{Value: "bar"},
},

{
"noop",
ovsdb.Row{"value": "bar"},
&testDBModel{Value: "bar"},
&testDBModel{Value: "bar"},
},
{
"add to set",
ovsdb.Row{"set": aFooSet},
Expand Down Expand Up @@ -1236,7 +1241,7 @@ func TestTableCacheApplyModifications(t *testing.T) {
"delete map key",
ovsdb.Row{"map": aFooMap},
&testDBModel{Map: map[string]string{"foo": "bar"}},
&testDBModel{Map: map[string]string{}},
&testDBModel{Map: nil},
},
{
"multiple map operations",
Expand Down Expand Up @@ -1276,8 +1281,8 @@ func TestTableCacheApplyModifications(t *testing.T) {
err = tc.ApplyModifications("Open_vSwitch", original, tt.update)
require.NoError(t, err)
require.Equal(t, tt.expected, original)
if reflect.DeepEqual(original, tt.base) {
t.Error("original and base are equal")
if !reflect.DeepEqual(tt.expected, tt.base) {
require.NotEqual(t, tt.base, original)
}
})
}
Expand Down
33 changes: 20 additions & 13 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,16 +245,10 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) error {

o.createRPC2Client(c)

// from now on, if err is nil, always tear down the RPC session
defer func() {
if err != nil {
o.rpcClient.Close()
o.rpcClient = nil
}
}()

serverDBNames, err := o.listDbs(ctx)
if err != nil {
o.rpcClient.Close()
o.rpcClient = nil
return err
}

Expand All @@ -271,12 +265,16 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) error {
}
if !found {
err = fmt.Errorf("target database %s not found", dbName)
o.rpcClient.Close()
o.rpcClient = nil
return err
}

// load and validate the schema
schema, err := o.getSchema(ctx, dbName)
if err != nil {
o.rpcClient.Close()
o.rpcClient = nil
return err
}

Expand All @@ -288,6 +286,8 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) error {
}
err = fmt.Errorf("database %s validation error (%d): %s", dbName, len(errors),
strings.Join(combined, ". "))
o.rpcClient.Close()
o.rpcClient = nil
return err
}

Expand All @@ -300,6 +300,8 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) error {
db.cache, err = cache.NewTableCache(schema, db.model, nil)
if err != nil {
db.cacheMutex.Unlock()
o.rpcClient.Close()
o.rpcClient = nil
return err
}
db.api = newAPI(db.cache)
Expand All @@ -314,10 +316,14 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) error {
var leader bool
leader, err = o.isEndpointLeader(ctx)
if err != nil {
o.rpcClient.Close()
o.rpcClient = nil
return err
}
if !leader {
err = fmt.Errorf("endpoint is not leader")
o.rpcClient.Close()
o.rpcClient = nil
return err
}
}
Expand Down Expand Up @@ -610,7 +616,6 @@ func (o *ovsdbClient) transact(ctx context.Context, dbName string, operation ...
}

args := ovsdb.NewTransactArgs(dbName, operation...)

if o.rpcClient == nil {
return nil, ErrNotConnected
}
Expand Down Expand Up @@ -761,20 +766,22 @@ func (o *ovsdbClient) monitor(ctx context.Context, cookie MonitorCookie, reconne
}

if !reconnecting {
db := o.databases[dbName]
db.monitorsMutex.Lock()
db.monitors[cookie.ID] = monitor
db.monitorsMutex.Unlock()
}

if monitor.Method == ovsdb.MonitorRPC {
u := tableUpdates.(ovsdb.TableUpdates)
o.databases[dbName].cache.Populate(u)
db.cacheMutex.Lock()
defer db.cacheMutex.Unlock()
db.cache.Update(nil, u)
} else {
u := tableUpdates.(ovsdb.TableUpdates2)
o.databases[dbName].cache.Populate2(u)
db.cacheMutex.Lock()
defer db.cacheMutex.Unlock()
db.cache.Update2(nil, u)
}

return nil
}

Expand Down
1 change: 0 additions & 1 deletion ovsdb/bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ func NativeToOvsAtomic(basicType string, nativeElem interface{}) (interface{}, e
// NativeToOvs transforms an native type to a ovs type based on the column type information
func NativeToOvs(column *ColumnSchema, rawElem interface{}) (interface{}, error) {
naType := NativeType(column)

if t := reflect.TypeOf(rawElem); t != naType {
return nil, NewErrWrongType("NativeToOvs", naType.String(), rawElem)
}
Expand Down
26 changes: 25 additions & 1 deletion ovsdb/updates2.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,31 @@ func (r *RowUpdate2) Merge(new *RowUpdate2) {
return
}
if r.Modify != nil && new.Modify != nil {
r.Modify = new.Modify
currentRowData := *r.Modify
newRowData := *new.Modify
for k, v := range newRowData {
if _, ok := currentRowData[k]; !ok {
currentRowData[k] = v
} else {
switch v.(type) {
case OvsSet:
oSet := currentRowData[k].(OvsSet)
newSet := v.(OvsSet)
oSet.GoSet = append(oSet.GoSet, newSet.GoSet...)
case OvsMap:
oMap := currentRowData[k].(OvsMap)
newMap := v.(OvsMap)
for newK, newV := range newMap.GoMap {
if _, ok := oMap.GoMap[newK]; !ok {
oMap.GoMap[newK] = newV
}
}
default:
panic("ARGH!")
}
}
}
r.Modify = &currentRowData
return
}
if r.Modify != nil && new.Delete != nil {
Expand Down
47 changes: 47 additions & 0 deletions ovsdb/updates2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package ovsdb

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestAddRowUpdate2Merge(t *testing.T) {
tests := []struct {
name string
initial *RowUpdate2
new *RowUpdate2
expected *RowUpdate2
}{
{
"insert then modify",
&RowUpdate2{Insert: &Row{"foo": "bar"}},
&RowUpdate2{Modify: &Row{"foo": "baz"}},
&RowUpdate2{Insert: &Row{"foo": "baz"}},
},
{
"insert then delete",
&RowUpdate2{Insert: &Row{"foo": "bar"}},
&RowUpdate2{Delete: &Row{"foo": "bar"}},
&RowUpdate2{Delete: &Row{"foo": "bar"}},
},
{
"modify then delete",
&RowUpdate2{Modify: &Row{"foo": "baz"}},
&RowUpdate2{Delete: &Row{"foo": "baz"}},
&RowUpdate2{Delete: &Row{"foo": "baz"}},
},
{
"modify then modify",
&RowUpdate2{Modify: &Row{"foo": "baz"}},
&RowUpdate2{Modify: &Row{"bar": "quux"}},
&RowUpdate2{Modify: &Row{"foo": "baz", "bar": "quux"}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.initial.Merge(tt.new)
assert.Equal(t, tt.expected, tt.initial)
})
}
}
Loading

0 comments on commit cd95f2d

Please sign in to comment.