diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bdbac266..251d7cfd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -12,10 +12,10 @@ jobs: runs-on: ubuntu-latest steps: - - name: Set up Go 1.18 - uses: actions/setup-go@v2 + - name: Set up Go 1.19.6 + uses: actions/setup-go@v3 with: - go-version: 1.18 + go-version: 1.19.6 id: go - name: Install benchstat @@ -82,10 +82,10 @@ jobs: runs-on: ubuntu-latest steps: - - name: Set up Go 1.18 - uses: actions/setup-go@v1 + - name: Set up Go 1.19.6 + uses: actions/setup-go@v3 with: - go-version: 1.18 + go-version: 1.19.6 id: go - name: Check out code into the Go module directory diff --git a/cache/doc.go b/cache/doc.go index 3b176f27..25f1597e 100644 --- a/cache/doc.go +++ b/cache/doc.go @@ -3,7 +3,7 @@ Package cache provides a cache of model.Model elements that can be used in an OV The cache can be accessed using a simple API: - cache.Table("Open_vSwitch").Row("") + cache.Table("Open_vSwitch").Row("") It implements the ovsdb.NotificationHandler interface such that it can be populated automatically by diff --git a/client/api_test_model.go b/client/api_test_model.go index 36ea476e..05328678 100644 --- a/client/api_test_model.go +++ b/client/api_test_model.go @@ -128,7 +128,7 @@ func (*testLogicalSwitch) Table() string { return "Logical_Switch" } -//LogicalSwitchPort struct defines an object in Logical_Switch_Port table +// LogicalSwitchPort struct defines an object in Logical_Switch_Port table type testLogicalSwitchPort struct { UUID string `ovsdb:"_uuid"` Up *bool `ovsdb:"up"` diff --git a/client/client.go b/client/client.go index 10ea757e..5e92b8ff 100644 --- a/client/client.go +++ b/client/client.go @@ -21,6 +21,7 @@ import ( "github.com/go-logr/logr" "github.com/go-logr/stdr" "github.com/ovn-org/libovsdb/cache" + syscall "github.com/ovn-org/libovsdb/internal" "github.com/ovn-org/libovsdb/mapper" "github.com/ovn-org/libovsdb/model" "github.com/ovn-org/libovsdb/ovsdb" @@ -87,6 +88,7 @@ type ovsdbClient struct { metrics metrics connected bool rpcClient *rpc2.Client + conn net.Conn rpcMutex sync.RWMutex // endpoints contains all possible endpoints; the first element is // the active endpoint if connected=true @@ -351,7 +353,10 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) (string, erro return "", fmt.Errorf("failed to open connection: %w", err) } - o.createRPC2Client(c) + err = o.createRPC2Client(c) + if err != nil { + return "", err + } serverDBNames, err := o.listDbs(ctx) if err != nil { @@ -422,11 +427,24 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) (string, erro // createRPC2Client creates an rpcClient using the provided connection // It is also responsible for setting up go routines for client-side event handling // Should only be called when the mutex is held -func (o *ovsdbClient) createRPC2Client(conn net.Conn) { +func (o *ovsdbClient) createRPC2Client(conn net.Conn) error { o.stopCh = make(chan struct{}) if o.options.inactivityTimeout > 0 { o.trafficSeen = make(chan struct{}) } + o.conn = conn + // set TCP_USER_TIMEOUT socket option for connection so that + // channel write doesn't block indefinitely on network disconnect. + var userTimeout time.Duration + if o.options.timeout > 0 { + userTimeout = o.options.timeout * 3 + } else { + userTimeout = defaultTimeout + } + err := syscall.SetTCPUserTimeout(conn, userTimeout) + if err != nil { + return err + } o.rpcClient = rpc2.NewClientWithCodec(jsonrpc.NewJSONCodec(conn)) o.rpcClient.SetBlocking(true) o.rpcClient.Handle("echo", func(_ *rpc2.Client, args []interface{}, reply *[]interface{}) error { @@ -442,6 +460,7 @@ func (o *ovsdbClient) createRPC2Client(conn net.Conn) { return o.update3(args, reply) }) go o.rpcClient.Run() + return nil } // isEndpointLeader returns true if the currently connected endpoint is leader, @@ -748,7 +767,7 @@ func (o *ovsdbClient) update3(params []json.RawMessage, reply *[]interface{}) er func (o *ovsdbClient) getSchema(ctx context.Context, dbName string) (ovsdb.DatabaseSchema, error) { args := ovsdb.NewGetSchemaArgs(dbName) var reply ovsdb.DatabaseSchema - err := o.rpcClient.CallWithContext(ctx, "get_schema", args, &reply) + err := o.CallWithContext(ctx, "get_schema", args, &reply) if err != nil { if err == rpc2.ErrShutdown { return ovsdb.DatabaseSchema{}, ErrNotConnected @@ -763,7 +782,7 @@ func (o *ovsdbClient) getSchema(ctx context.Context, dbName string) (ovsdb.Datab // Should only be called when mutex is held func (o *ovsdbClient) listDbs(ctx context.Context) ([]string, error) { var dbs []string - err := o.rpcClient.CallWithContext(ctx, "list_dbs", nil, &dbs) + err := o.CallWithContext(ctx, "list_dbs", nil, &dbs) if err != nil { if err == rpc2.ErrShutdown { return nil, ErrNotConnected @@ -836,7 +855,7 @@ func (o *ovsdbClient) transact(ctx context.Context, dbName string, skipChWrite b if dbgLogger.Enabled() { dbgLogger.Info("transacting operations", "operations", fmt.Sprintf("%+v", operation)) } - err := o.rpcClient.CallWithContext(ctx, "transact", args, &reply) + err := o.CallWithContext(ctx, "transact", args, &reply) if err != nil { if err == rpc2.ErrShutdown { return nil, ErrNotConnected @@ -869,7 +888,7 @@ func (o *ovsdbClient) MonitorCancel(ctx context.Context, cookie MonitorCookie) e if o.rpcClient == nil { return ErrNotConnected } - err := o.rpcClient.CallWithContext(ctx, "monitor_cancel", args, &reply) + err := o.CallWithContext(ctx, "monitor_cancel", args, &reply) if err != nil { if err == rpc2.ErrShutdown { return ErrNotConnected @@ -981,15 +1000,15 @@ func (o *ovsdbClient) monitor(ctx context.Context, cookie MonitorCookie, reconne switch monitor.Method { case ovsdb.MonitorRPC: var reply ovsdb.TableUpdates - err = o.rpcClient.CallWithContext(ctx, monitor.Method, args, &reply) + err = o.CallWithContext(ctx, monitor.Method, args, &reply) tableUpdates = reply case ovsdb.ConditionalMonitorRPC: var reply ovsdb.TableUpdates2 - err = o.rpcClient.CallWithContext(ctx, monitor.Method, args, &reply) + err = o.CallWithContext(ctx, monitor.Method, args, &reply) tableUpdates = reply case ovsdb.ConditionalMonitorSinceRPC: var reply ovsdb.MonitorCondSinceReply - err = o.rpcClient.CallWithContext(ctx, monitor.Method, args, &reply) + err = o.CallWithContext(ctx, monitor.Method, args, &reply) if err == nil && reply.Found { monitor.LastTransactionID = reply.LastTransactionID lastTransactionFound = true @@ -1080,7 +1099,7 @@ func (o *ovsdbClient) Echo(ctx context.Context) error { if o.rpcClient == nil { return ErrNotConnected } - err := o.rpcClient.CallWithContext(ctx, "echo", args, &reply) + err := o.CallWithContext(ctx, "echo", args, &reply) if err != nil { if err == rpc2.ErrShutdown { return ErrNotConnected @@ -1197,72 +1216,33 @@ func (o *ovsdbClient) handleClientErrors(stopCh <-chan struct{}) { } } -func (o *ovsdbClient) sendEcho(args []interface{}, reply *[]interface{}) *rpc2.Call { - o.rpcMutex.RLock() - defer o.rpcMutex.RUnlock() - if o.rpcClient == nil { - return nil - } - return o.rpcClient.Go("echo", args, reply, make(chan *rpc2.Call, 1)) -} - func (o *ovsdbClient) handleInactivityProbes() { defer o.handlerShutdown.Done() - echoReplied := make(chan string) - var lastEcho string stopCh := o.stopCh trafficSeen := o.trafficSeen + timer := time.NewTimer(o.options.inactivityTimeout) for { select { case <-stopCh: return case <-trafficSeen: // We got some traffic from the server, restart our timer - case ts := <-echoReplied: - // Got a response from the server, check it against lastEcho; if same clear lastEcho; if not same Disconnect() - if ts != lastEcho { - o.Disconnect() - return + if !timer.Stop() { + <-timer.C } - lastEcho = "" - case <-time.After(o.options.inactivityTimeout): - // If there's a lastEcho already, then we didn't get a server reply, disconnect - if lastEcho != "" { - o.Disconnect() - return - } - // Otherwise send an echo - thisEcho := fmt.Sprintf("%d", time.Now().UnixMicro()) - args := []interface{}{"libovsdb echo", thisEcho} - var reply []interface{} - // Can't use o.Echo() because it blocks; we need the Call object direct from o.rpcClient.Go() - call := o.sendEcho(args, &reply) - if call == nil { - o.Disconnect() - return - } - lastEcho = thisEcho + case <-timer.C: + // Otherwise send an echo in a goroutine so that transactions don't block go func() { - // Wait for the echo reply - select { - case <-stopCh: - return - case <-call.Done: - if call.Error != nil { - // RPC timeout; disconnect - o.logger.V(3).Error(call.Error, "server echo reply error") - o.Disconnect() - } else if !reflect.DeepEqual(args, reply) { - o.logger.V(3).Info("warning: incorrect server echo reply", - "expected", args, "reply", reply) - o.Disconnect() - } else { - // Otherwise stuff thisEcho into the echoReplied channel - echoReplied <- thisEcho - } + ctx, cancel := context.WithTimeout(context.Background(), o.options.timeout) + err := o.Echo(ctx) + if err != nil { + o.logger.V(3).Error(err, "server echo reply error") + o.Disconnect() } + cancel() }() } + timer.Reset(o.options.inactivityTimeout) } } @@ -1478,3 +1458,19 @@ func (o *ovsdbClient) WhereAll(m model.Model, conditions ...model.Condition) Con func (o *ovsdbClient) WhereCache(predicate interface{}) ConditionalAPI { return o.primaryDB().api.WhereCache(predicate) } + +// CallWithContext invokes the named function, waits for it to complete, and +// returns its error status, or an error from Context timeout. +func (o *ovsdbClient) CallWithContext(ctx context.Context, method string, args interface{}, reply interface{}) error { + // Set up read/write deadline for tcp connection before making + // a rpc request to the server. + if tcpConn, ok := o.conn.(*net.TCPConn); ok { + if o.options.timeout > 0 { + err := tcpConn.SetDeadline(time.Now().Add(o.options.timeout * 3)) + if err != nil { + return err + } + } + } + return o.rpcClient.CallWithContext(ctx, method, args, reply) +} diff --git a/client/doc.go b/client/doc.go index 90e409ee..2f1aabba 100644 --- a/client/doc.go +++ b/client/doc.go @@ -4,30 +4,29 @@ Package client connects to, monitors and interacts with OVSDB servers (RFC7047). This package uses structs, that contain the 'ovs' field tag to determine which field goes to which column in the database. We refer to pointers to this structs as Models. Example: - type MyLogicalSwitch struct { - UUID string `ovsdb:"_uuid"` // _uuid tag is mandatory - Name string `ovsdb:"name"` - Ports []string `ovsdb:"ports"` - Config map[string]string `ovsdb:"other_config"` - } + type MyLogicalSwitch struct { + UUID string `ovsdb:"_uuid"` // _uuid tag is mandatory + Name string `ovsdb:"name"` + Ports []string `ovsdb:"ports"` + Config map[string]string `ovsdb:"other_config"` + } Based on these Models a Database Model (see ClientDBModel type) is built to represent the entire OVSDB: - clientDBModel, _ := client.NewClientDBModel("OVN_Northbound", - map[string]client.Model{ - "Logical_Switch": &MyLogicalSwitch{}, - }) - + clientDBModel, _ := client.NewClientDBModel("OVN_Northbound", + map[string]client.Model{ + "Logical_Switch": &MyLogicalSwitch{}, + }) The ClientDBModel represents the entire Database (or the part of it we're interested in). Using it, the libovsdb.client package is able to properly encode and decode OVSDB messages and store them in Model instances. A client instance is created by simply specifying the connection information and the database model: - ovs, _ := client.Connect(context.Background(), clientDBModel) + ovs, _ := client.Connect(context.Background(), clientDBModel) -Main API +# Main API After creating a OvsdbClient using the Connect() function, we can use a number of CRUD-like to interact with the database: @@ -43,7 +42,7 @@ and passed to client.Transact(). Others, such as List() and Get(), interact with the client's internal cache and are able to return Model instances (or a list thereof) directly. -Conditions +# Conditions Some API functions (Create() and Get()), can be run directly. Others, require us to use a ConditionalAPI. The ConditionalAPI injects RFC7047 Conditions into ovsdb Operations as well as @@ -111,7 +110,7 @@ cache element, an operation will be created matching on the "_uuid" column. The quite large depending on the cache size and the provided function. Most likely there is a way to express the same condition using Where() or WhereAll() which will be more efficient. -Get +# Get Get() operation is a simple operation capable of retrieving one Model based on some of its schema indexes. E.g: @@ -119,7 +118,7 @@ Get() operation is a simple operation capable of retrieving one Model based on s err := ovs.Get(ls) fmt.Printf("Name of the switch is: &s", ls.Name) -List +# List List() searches the cache and populates a slice of Models. It can be used directly or using WhereCache() @@ -131,7 +130,7 @@ List() searches the cache and populates a slice of Models. It can be used direct return strings.HasPrefix(ls.Name, "ext_") }).List(lsList) -Create +# Create Create returns a list of operations to create the models provided. E.g: @@ -143,7 +142,7 @@ Update returns a list of operations to update the matching rows to match the val ls := &LogicalSwitch{ExternalIDs: map[string]string {"foo": "bar"}} ops, err := ovs.Where(...).Update(&ls, &ls.ExternalIDs} -Mutate +# Mutate Mutate returns a list of operations needed to mutate the matching rows as described by the list of Mutation objects. E.g: @@ -154,11 +153,10 @@ Mutate returns a list of operations needed to mutate the matching rows as descri Value: map[string]string{"foo":"bar"}, }) -Delete +# Delete Delete returns a list of operations needed to delete the matching rows. E.g: ops, err := ovs.Where(...).Delete() - */ package client diff --git a/client/options.go b/client/options.go index 81ccffe2..66a60b5f 100644 --- a/client/options.go +++ b/client/options.go @@ -2,6 +2,7 @@ package client import ( "crypto/tls" + "errors" "net/url" "time" @@ -14,6 +15,7 @@ const ( defaultTCPEndpoint = "tcp:127.0.0.1:6640" defaultSSLEndpoint = "ssl:127.0.0.1:6640" defaultUnixEndpoint = "unix:/var/run/openvswitch/ovsdb.sock" + defaultTimeout = 60 * time.Second ) type options struct { @@ -120,6 +122,9 @@ func WithReconnect(timeout time.Duration, backoff backoff.BackOff) Option { func WithInactivityCheck(inactivityTimeout, reconnectTimeout time.Duration, reconnectBackoff backoff.BackOff) Option { return func(o *options) error { + if reconnectTimeout >= inactivityTimeout { + return errors.New("inactivity timeout value should be greater than reconnect timeout value") + } o.reconnect = true o.timeout = reconnectTimeout o.backoff = reconnectBackoff diff --git a/cmd/modelgen/main.go b/cmd/modelgen/main.go index 59883379..7c9733e3 100644 --- a/cmd/modelgen/main.go +++ b/cmd/modelgen/main.go @@ -4,7 +4,7 @@ import ( "encoding/json" "flag" "fmt" - "io/ioutil" + "io" "log" "os" "path/filepath" @@ -56,7 +56,7 @@ func main() { } defer schemaFile.Close() - schemaBytes, err := ioutil.ReadAll(schemaFile) + schemaBytes, err := io.ReadAll(schemaFile) if err != nil { log.Fatal(err) } diff --git a/cmd/print_schema/print_schema.go b/cmd/print_schema/print_schema.go index e1b5fe06..2cbdd090 100644 --- a/cmd/print_schema/print_schema.go +++ b/cmd/print_schema/print_schema.go @@ -4,7 +4,7 @@ import ( "encoding/json" "flag" "fmt" - "io/ioutil" + "io" "log" "os" "runtime" @@ -53,7 +53,7 @@ func main() { } defer schemaFile.Close() - schemaBytes, err := ioutil.ReadAll(schemaFile) + schemaBytes, err := io.ReadAll(schemaFile) if err != nil { log.Fatal(err) } diff --git a/go.mod b/go.mod index 1fd49840..a9c12881 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ovn-org/libovsdb -go 1.18 +go 1.19 require ( github.com/cenkalti/backoff/v4 v4.1.3 diff --git a/internal/syscall_linux.go b/internal/syscall_linux.go new file mode 100644 index 00000000..5138f22d --- /dev/null +++ b/internal/syscall_linux.go @@ -0,0 +1,31 @@ +package internal + +import ( + "fmt" + "net" + "syscall" + "time" + + "golang.org/x/sys/unix" +) + +// SetTCPUserTimeout sets the TCP user timeout on a connection's socket +func SetTCPUserTimeout(conn net.Conn, timeout time.Duration) error { + tcpconn, ok := conn.(*net.TCPConn) + if !ok { + // not a TCP connection. exit early + return nil + } + rawConn, err := tcpconn.SyscallConn() + if err != nil { + return fmt.Errorf("error getting raw connection: %v", err) + } + err = rawConn.Control(func(fd uintptr) { + err = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, unix.TCP_USER_TIMEOUT, int(timeout/time.Millisecond)) + }) + if err != nil { + return fmt.Errorf("error setting option on socket: %v", err) + } + + return nil +} diff --git a/internal/syscall_nonlinux.go b/internal/syscall_nonlinux.go new file mode 100644 index 00000000..6e6a26d6 --- /dev/null +++ b/internal/syscall_nonlinux.go @@ -0,0 +1,14 @@ +//go:build !linux +// +build !linux + +package internal + +import ( + "net" + "time" +) + +// SetTCPUserTimeout is a no-op function under non-linux environments. +func SetTCPUserTimeout(conn net.Conn, timeout time.Duration) error { + return nil +} diff --git a/mapper/mapper.go b/mapper/mapper.go index 5ca7a412..56a0fa2b 100644 --- a/mapper/mapper.go +++ b/mapper/mapper.go @@ -12,12 +12,14 @@ import ( // to what column in the database id through field a field tag. // The tag used is "ovsdb" and has the following structure // 'ovsdb:"${COLUMN_NAME}"' +// // where COLUMN_NAME is the name of the column and must match the schema // -//Example: -// type MyObj struct { -// Name string `ovsdb:"name"` -// } +// Example: +// +// type MyObj struct { +// Name string `ovsdb:"name"` +// } type Mapper struct { Schema ovsdb.DatabaseSchema } diff --git a/model/model.go b/model/model.go index c8575f5b..618e28e7 100644 --- a/model/model.go +++ b/model/model.go @@ -16,12 +16,13 @@ import ( // The struct may also have non-tagged fields (which will be ignored by the API calls) // The Model interface must be implemented by the pointer to such type // Example: -//type MyLogicalRouter struct { -// UUID string `ovsdb:"_uuid"` -// Name string `ovsdb:"name"` -// ExternalIDs map[string]string `ovsdb:"external_ids"` -// LoadBalancers []string `ovsdb:"load_balancer"` -//} +// +// type MyLogicalRouter struct { +// UUID string `ovsdb:"_uuid"` +// Name string `ovsdb:"name"` +// ExternalIDs map[string]string `ovsdb:"external_ids"` +// LoadBalancers []string `ovsdb:"load_balancer"` +// } type Model interface{} type CloneableModel interface { diff --git a/modelgen/doc.go b/modelgen/doc.go index 26dd8686..b2a32f53 100644 --- a/modelgen/doc.go +++ b/modelgen/doc.go @@ -2,6 +2,5 @@ Package modelgen provides core functionality to implement Model code generators based on a schema. It allows to create and customize a text/template that can generate the Model types that libovsdb can work with. - */ package modelgen diff --git a/modelgen/generator.go b/modelgen/generator.go index daa5c861..c10d3ce7 100644 --- a/modelgen/generator.go +++ b/modelgen/generator.go @@ -4,8 +4,8 @@ import ( "bytes" "fmt" "go/format" - "io/ioutil" "log" + "os" "text/template" ) @@ -47,11 +47,11 @@ func (g *generator) Generate(filename string, tmpl *template.Template, args inte fmt.Print("\n") return nil } - content, err := ioutil.ReadFile(filename) + content, err := os.ReadFile(filename) if err == nil && bytes.Equal(content, src) { return nil } - return ioutil.WriteFile(filename, src, 0644) + return os.WriteFile(filename, src, 0644) } // NewGenerator returns a new Generator diff --git a/ovsdb/schema.go b/ovsdb/schema.go index cf80aa50..dca119e8 100644 --- a/ovsdb/schema.go +++ b/ovsdb/schema.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "math" "os" "strings" @@ -48,7 +47,7 @@ func (schema DatabaseSchema) Print(w io.Writer) { // SchemaFromFile returns a DatabaseSchema from a file func SchemaFromFile(f *os.File) (DatabaseSchema, error) { - data, err := ioutil.ReadAll(f) + data, err := io.ReadAll(f) if err != nil { return DatabaseSchema{}, err } @@ -124,7 +123,7 @@ of this library, we define an ExtendedType that includes all possible column typ atomic fields). */ -//ExtendedType includes atomic types as defined in the RFC plus Enum, Map and Set +// ExtendedType includes atomic types as defined in the RFC plus Enum, Map and Set type ExtendedType = string // RefType is used to define the possible RefTypes