Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Echo method to send inactivity probe #368

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ jobs:
runs-on: ubuntu-latest
steps:

- name: Set up Go 1.18
uses: actions/setup-go@v2
- name: Set up Go 1.19.6
uses: actions/setup-go@v3
with:
go-version: 1.18
go-version: 1.19.6
id: go

- name: Install benchstat
Expand Down Expand Up @@ -82,10 +82,10 @@ jobs:
runs-on: ubuntu-latest

steps:
- name: Set up Go 1.18
uses: actions/setup-go@v1
- name: Set up Go 1.19.6
uses: actions/setup-go@v3
with:
go-version: 1.18
go-version: 1.19.6
id: go

- name: Check out code into the Go module directory
Expand Down
2 changes: 1 addition & 1 deletion cache/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Package cache provides a cache of model.Model elements that can be used in an OV

The cache can be accessed using a simple API:

cache.Table("Open_vSwitch").Row("<ovs-uuid>")
cache.Table("Open_vSwitch").Row("<ovs-uuid>")

It implements the ovsdb.NotificationHandler interface
such that it can be populated automatically by
Expand Down
2 changes: 1 addition & 1 deletion client/api_test_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (*testLogicalSwitch) Table() string {
return "Logical_Switch"
}

//LogicalSwitchPort struct defines an object in Logical_Switch_Port table
// LogicalSwitchPort struct defines an object in Logical_Switch_Port table
type testLogicalSwitchPort struct {
UUID string `ovsdb:"_uuid"`
Up *bool `ovsdb:"up"`
Expand Down
118 changes: 57 additions & 61 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/go-logr/logr"
"github.com/go-logr/stdr"
"github.com/ovn-org/libovsdb/cache"
syscall "github.com/ovn-org/libovsdb/internal"
"github.com/ovn-org/libovsdb/mapper"
"github.com/ovn-org/libovsdb/model"
"github.com/ovn-org/libovsdb/ovsdb"
Expand Down Expand Up @@ -87,6 +88,7 @@ type ovsdbClient struct {
metrics metrics
connected bool
rpcClient *rpc2.Client
conn net.Conn
rpcMutex sync.RWMutex
// endpoints contains all possible endpoints; the first element is
// the active endpoint if connected=true
Expand Down Expand Up @@ -351,7 +353,10 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) (string, erro
return "", fmt.Errorf("failed to open connection: %w", err)
}

o.createRPC2Client(c)
err = o.createRPC2Client(c)
if err != nil {
return "", err
}

serverDBNames, err := o.listDbs(ctx)
if err != nil {
Expand Down Expand Up @@ -422,11 +427,24 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) (string, erro
// createRPC2Client creates an rpcClient using the provided connection
// It is also responsible for setting up go routines for client-side event handling
// Should only be called when the mutex is held
func (o *ovsdbClient) createRPC2Client(conn net.Conn) {
func (o *ovsdbClient) createRPC2Client(conn net.Conn) error {
o.stopCh = make(chan struct{})
if o.options.inactivityTimeout > 0 {
o.trafficSeen = make(chan struct{})
}
o.conn = conn
// set TCP_USER_TIMEOUT socket option for connection so that
// channel write doesn't block indefinitely on network disconnect.
var userTimeout time.Duration
if o.options.timeout > 0 {
userTimeout = o.options.timeout * 3
Copy link
Collaborator

@jcaamano jcaamano Nov 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, why * 3? Wouldn't it make sense to use inactivityTimeout for this as well?

} else {
userTimeout = defaultTimeout
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that to start with we shouldn't have a default, and just set it to inactivityTimeout when that is set.

}
err := syscall.SetTCPUserTimeout(conn, userTimeout)
if err != nil {
return err
}
o.rpcClient = rpc2.NewClientWithCodec(jsonrpc.NewJSONCodec(conn))
o.rpcClient.SetBlocking(true)
o.rpcClient.Handle("echo", func(_ *rpc2.Client, args []interface{}, reply *[]interface{}) error {
Expand All @@ -442,6 +460,7 @@ func (o *ovsdbClient) createRPC2Client(conn net.Conn) {
return o.update3(args, reply)
})
go o.rpcClient.Run()
return nil
}

// isEndpointLeader returns true if the currently connected endpoint is leader,
Expand Down Expand Up @@ -748,7 +767,7 @@ func (o *ovsdbClient) update3(params []json.RawMessage, reply *[]interface{}) er
func (o *ovsdbClient) getSchema(ctx context.Context, dbName string) (ovsdb.DatabaseSchema, error) {
args := ovsdb.NewGetSchemaArgs(dbName)
var reply ovsdb.DatabaseSchema
err := o.rpcClient.CallWithContext(ctx, "get_schema", args, &reply)
err := o.CallWithContext(ctx, "get_schema", args, &reply)
if err != nil {
if err == rpc2.ErrShutdown {
return ovsdb.DatabaseSchema{}, ErrNotConnected
Expand All @@ -763,7 +782,7 @@ func (o *ovsdbClient) getSchema(ctx context.Context, dbName string) (ovsdb.Datab
// Should only be called when mutex is held
func (o *ovsdbClient) listDbs(ctx context.Context) ([]string, error) {
var dbs []string
err := o.rpcClient.CallWithContext(ctx, "list_dbs", nil, &dbs)
err := o.CallWithContext(ctx, "list_dbs", nil, &dbs)
if err != nil {
if err == rpc2.ErrShutdown {
return nil, ErrNotConnected
Expand Down Expand Up @@ -836,7 +855,7 @@ func (o *ovsdbClient) transact(ctx context.Context, dbName string, skipChWrite b
if dbgLogger.Enabled() {
dbgLogger.Info("transacting operations", "operations", fmt.Sprintf("%+v", operation))
}
err := o.rpcClient.CallWithContext(ctx, "transact", args, &reply)
err := o.CallWithContext(ctx, "transact", args, &reply)
if err != nil {
if err == rpc2.ErrShutdown {
return nil, ErrNotConnected
Expand Down Expand Up @@ -869,7 +888,7 @@ func (o *ovsdbClient) MonitorCancel(ctx context.Context, cookie MonitorCookie) e
if o.rpcClient == nil {
return ErrNotConnected
}
err := o.rpcClient.CallWithContext(ctx, "monitor_cancel", args, &reply)
err := o.CallWithContext(ctx, "monitor_cancel", args, &reply)
if err != nil {
if err == rpc2.ErrShutdown {
return ErrNotConnected
Expand Down Expand Up @@ -981,15 +1000,15 @@ func (o *ovsdbClient) monitor(ctx context.Context, cookie MonitorCookie, reconne
switch monitor.Method {
case ovsdb.MonitorRPC:
var reply ovsdb.TableUpdates
err = o.rpcClient.CallWithContext(ctx, monitor.Method, args, &reply)
err = o.CallWithContext(ctx, monitor.Method, args, &reply)
tableUpdates = reply
case ovsdb.ConditionalMonitorRPC:
var reply ovsdb.TableUpdates2
err = o.rpcClient.CallWithContext(ctx, monitor.Method, args, &reply)
err = o.CallWithContext(ctx, monitor.Method, args, &reply)
tableUpdates = reply
case ovsdb.ConditionalMonitorSinceRPC:
var reply ovsdb.MonitorCondSinceReply
err = o.rpcClient.CallWithContext(ctx, monitor.Method, args, &reply)
err = o.CallWithContext(ctx, monitor.Method, args, &reply)
if err == nil && reply.Found {
monitor.LastTransactionID = reply.LastTransactionID
lastTransactionFound = true
Expand Down Expand Up @@ -1080,7 +1099,7 @@ func (o *ovsdbClient) Echo(ctx context.Context) error {
if o.rpcClient == nil {
return ErrNotConnected
}
err := o.rpcClient.CallWithContext(ctx, "echo", args, &reply)
err := o.CallWithContext(ctx, "echo", args, &reply)
if err != nil {
if err == rpc2.ErrShutdown {
return ErrNotConnected
Expand Down Expand Up @@ -1197,72 +1216,33 @@ func (o *ovsdbClient) handleClientErrors(stopCh <-chan struct{}) {
}
}

func (o *ovsdbClient) sendEcho(args []interface{}, reply *[]interface{}) *rpc2.Call {
o.rpcMutex.RLock()
defer o.rpcMutex.RUnlock()
if o.rpcClient == nil {
return nil
}
return o.rpcClient.Go("echo", args, reply, make(chan *rpc2.Call, 1))
}

func (o *ovsdbClient) handleInactivityProbes() {
defer o.handlerShutdown.Done()
echoReplied := make(chan string)
var lastEcho string
stopCh := o.stopCh
trafficSeen := o.trafficSeen
timer := time.NewTimer(o.options.inactivityTimeout)
for {
select {
case <-stopCh:
return
case <-trafficSeen:
// We got some traffic from the server, restart our timer
case ts := <-echoReplied:
// Got a response from the server, check it against lastEcho; if same clear lastEcho; if not same Disconnect()
if ts != lastEcho {
o.Disconnect()
return
if !timer.Stop() {
<-timer.C
}
lastEcho = ""
case <-time.After(o.options.inactivityTimeout):
// If there's a lastEcho already, then we didn't get a server reply, disconnect
if lastEcho != "" {
o.Disconnect()
return
}
// Otherwise send an echo
thisEcho := fmt.Sprintf("%d", time.Now().UnixMicro())
args := []interface{}{"libovsdb echo", thisEcho}
var reply []interface{}
// Can't use o.Echo() because it blocks; we need the Call object direct from o.rpcClient.Go()
call := o.sendEcho(args, &reply)
if call == nil {
o.Disconnect()
return
}
lastEcho = thisEcho
case <-timer.C:
// Otherwise send an echo in a goroutine so that transactions don't block
go func() {
// Wait for the echo reply
select {
case <-stopCh:
return
case <-call.Done:
if call.Error != nil {
// RPC timeout; disconnect
o.logger.V(3).Error(call.Error, "server echo reply error")
o.Disconnect()
} else if !reflect.DeepEqual(args, reply) {
o.logger.V(3).Info("warning: incorrect server echo reply",
"expected", args, "reply", reply)
o.Disconnect()
} else {
// Otherwise stuff thisEcho into the echoReplied channel
echoReplied <- thisEcho
}
ctx, cancel := context.WithTimeout(context.Background(), o.options.timeout)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

two nuances:

  • Do we make any checks for o.options.timeoutto be smaller than o.options.inactivityTimeout? Or should we expect multiple instances of this goroutine to be running simultaneously?
  • We might see traffic while the echo is in progress and still disconnect if the echo itself fails.

I don't think any of these are necessarily a problem but can be disconcerting if we end up seeing logs and troubleshooting scenarios where any of that is happening.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Do we make any checks for o.options.timeoutto be smaller than o.options.inactivityTimeout? Or should we expect multiple instances of this goroutine to be running simultaneously?
  • We might see traffic while the echo is in progress and still disconnect if the echo itself fails.

The first concern is fixed with commit e0bed3f.

For the 2nd one, Do you think echo fails whereas transaction would succeed. Does that really happen (both goes via same tcp channel) ? Do you want to have some retries before going for a disconnect ? If this is not really a concern for now, we can skip doing it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I am just happy if we check that o.options.timeout is smaller than o.options.inactivityTimeout. I would drop everything else from e0bed3f

Copy link
Contributor Author

@pperiyasamy pperiyasamy Sep 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok @jcaamano, done with commit c287d43 itself.
Hope you're still fine with cf04722 for go bump.

err := o.Echo(ctx)
if err != nil {
o.logger.V(3).Error(err, "server echo reply error")
o.Disconnect()
}
cancel()
}()
}
timer.Reset(o.options.inactivityTimeout)
}
}

Expand Down Expand Up @@ -1478,3 +1458,19 @@ func (o *ovsdbClient) WhereAll(m model.Model, conditions ...model.Condition) Con
func (o *ovsdbClient) WhereCache(predicate interface{}) ConditionalAPI {
return o.primaryDB().api.WhereCache(predicate)
}

// CallWithContext invokes the named function, waits for it to complete, and
// returns its error status, or an error from Context timeout.
func (o *ovsdbClient) CallWithContext(ctx context.Context, method string, args interface{}, reply interface{}) error {
// Set up read/write deadline for tcp connection before making
// a rpc request to the server.
if tcpConn, ok := o.conn.(*net.TCPConn); ok {
if o.options.timeout > 0 {
err := tcpConn.SetDeadline(time.Now().Add(o.options.timeout * 3))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to SetDeadline for each call? If so, there requires a mutex.

BTW: How does this affect rpcClient.CallWithContext? Does rpcClient discard the timeout context?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, The SetDeadline looks to be needed for every read/write on tcp channel. This method is always invoked after o.rpcMutex is acquired. so we're safe here.

correct, The rpcClient.CallWithContext method is not fully timebound call, so fixing it here based on feedback from a PR raised there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found a related issue in grpc-go maybe you can take a look.

issue: grpc/grpc#15889
pr: https://github.com/grpc/grpc-go/pull/2307/files

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @halfcrazy , yes, it's the same issue that we are trying to fix in libovsdb client as well.
added a commit 3867774 to set up tcp user timeout.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pperiyasamy have you considered @halfcrazy concerns about needing a mutex?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why * 3? Wouldn't it make sense to use inactivityTimeout for this?

if err != nil {
return err
}
}
}
return o.rpcClient.CallWithContext(ctx, method, args, reply)
}
38 changes: 18 additions & 20 deletions client/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,29 @@ Package client connects to, monitors and interacts with OVSDB servers (RFC7047).
This package uses structs, that contain the 'ovs' field tag to determine which field goes to
which column in the database. We refer to pointers to this structs as Models. Example:

type MyLogicalSwitch struct {
UUID string `ovsdb:"_uuid"` // _uuid tag is mandatory
Name string `ovsdb:"name"`
Ports []string `ovsdb:"ports"`
Config map[string]string `ovsdb:"other_config"`
}
type MyLogicalSwitch struct {
UUID string `ovsdb:"_uuid"` // _uuid tag is mandatory
Name string `ovsdb:"name"`
Ports []string `ovsdb:"ports"`
Config map[string]string `ovsdb:"other_config"`
}

Based on these Models a Database Model (see ClientDBModel type) is built to represent
the entire OVSDB:

clientDBModel, _ := client.NewClientDBModel("OVN_Northbound",
map[string]client.Model{
"Logical_Switch": &MyLogicalSwitch{},
})

clientDBModel, _ := client.NewClientDBModel("OVN_Northbound",
map[string]client.Model{
"Logical_Switch": &MyLogicalSwitch{},
})

The ClientDBModel represents the entire Database (or the part of it we're interested in).
Using it, the libovsdb.client package is able to properly encode and decode OVSDB messages
and store them in Model instances.
A client instance is created by simply specifying the connection information and the database model:

ovs, _ := client.Connect(context.Background(), clientDBModel)
ovs, _ := client.Connect(context.Background(), clientDBModel)

Main API
# Main API

After creating a OvsdbClient using the Connect() function, we can use a number of CRUD-like
to interact with the database:
Expand All @@ -43,7 +42,7 @@ and passed to client.Transact().
Others, such as List() and Get(), interact with the client's internal cache and are able to
return Model instances (or a list thereof) directly.

Conditions
# Conditions

Some API functions (Create() and Get()), can be run directly. Others, require us to use
a ConditionalAPI. The ConditionalAPI injects RFC7047 Conditions into ovsdb Operations as well as
Expand Down Expand Up @@ -111,15 +110,15 @@ cache element, an operation will be created matching on the "_uuid" column. The
quite large depending on the cache size and the provided function. Most likely there is a way to express the
same condition using Where() or WhereAll() which will be more efficient.

Get
# Get

Get() operation is a simple operation capable of retrieving one Model based on some of its schema indexes. E.g:

ls := &LogicalSwitch{UUID:"myUUID"}
err := ovs.Get(ls)
fmt.Printf("Name of the switch is: &s", ls.Name)

List
# List

List() searches the cache and populates a slice of Models. It can be used directly or using WhereCache()

Expand All @@ -131,7 +130,7 @@ List() searches the cache and populates a slice of Models. It can be used direct
return strings.HasPrefix(ls.Name, "ext_")
}).List(lsList)

Create
# Create

Create returns a list of operations to create the models provided. E.g:

Expand All @@ -143,7 +142,7 @@ Update returns a list of operations to update the matching rows to match the val
ls := &LogicalSwitch{ExternalIDs: map[string]string {"foo": "bar"}}
ops, err := ovs.Where(...).Update(&ls, &ls.ExternalIDs}

Mutate
# Mutate

Mutate returns a list of operations needed to mutate the matching rows as described by the list of Mutation objects. E.g:

Expand All @@ -154,11 +153,10 @@ Mutate returns a list of operations needed to mutate the matching rows as descri
Value: map[string]string{"foo":"bar"},
})

Delete
# Delete

Delete returns a list of operations needed to delete the matching rows. E.g:

ops, err := ovs.Where(...).Delete()

*/
package client
Loading
Loading