Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Echo method to send inactivity probe #368

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type ovsdbClient struct {
metrics metrics
connected bool
rpcClient *rpc2.Client
conn net.Conn
rpcMutex sync.RWMutex
// endpoints contains all possible endpoints; the first element is
// the active endpoint if connected=true
Expand Down Expand Up @@ -427,6 +428,7 @@ func (o *ovsdbClient) createRPC2Client(conn net.Conn) {
if o.options.inactivityTimeout > 0 {
o.trafficSeen = make(chan struct{})
}
o.conn = conn
o.rpcClient = rpc2.NewClientWithCodec(jsonrpc.NewJSONCodec(conn))
o.rpcClient.SetBlocking(true)
o.rpcClient.Handle("echo", func(_ *rpc2.Client, args []interface{}, reply *[]interface{}) error {
Expand Down Expand Up @@ -748,7 +750,7 @@ func (o *ovsdbClient) update3(params []json.RawMessage, reply *[]interface{}) er
func (o *ovsdbClient) getSchema(ctx context.Context, dbName string) (ovsdb.DatabaseSchema, error) {
args := ovsdb.NewGetSchemaArgs(dbName)
var reply ovsdb.DatabaseSchema
err := o.rpcClient.CallWithContext(ctx, "get_schema", args, &reply)
err := o.CallWithContext(ctx, "get_schema", args, &reply)
if err != nil {
if err == rpc2.ErrShutdown {
return ovsdb.DatabaseSchema{}, ErrNotConnected
Expand All @@ -763,7 +765,7 @@ func (o *ovsdbClient) getSchema(ctx context.Context, dbName string) (ovsdb.Datab
// Should only be called when mutex is held
func (o *ovsdbClient) listDbs(ctx context.Context) ([]string, error) {
var dbs []string
err := o.rpcClient.CallWithContext(ctx, "list_dbs", nil, &dbs)
err := o.CallWithContext(ctx, "list_dbs", nil, &dbs)
if err != nil {
if err == rpc2.ErrShutdown {
return nil, ErrNotConnected
Expand Down Expand Up @@ -836,7 +838,7 @@ func (o *ovsdbClient) transact(ctx context.Context, dbName string, skipChWrite b
if dbgLogger.Enabled() {
dbgLogger.Info("transacting operations", "operations", fmt.Sprintf("%+v", operation))
}
err := o.rpcClient.CallWithContext(ctx, "transact", args, &reply)
err := o.CallWithContext(ctx, "transact", args, &reply)
if err != nil {
if err == rpc2.ErrShutdown {
return nil, ErrNotConnected
Expand Down Expand Up @@ -869,7 +871,7 @@ func (o *ovsdbClient) MonitorCancel(ctx context.Context, cookie MonitorCookie) e
if o.rpcClient == nil {
return ErrNotConnected
}
err := o.rpcClient.CallWithContext(ctx, "monitor_cancel", args, &reply)
err := o.CallWithContext(ctx, "monitor_cancel", args, &reply)
if err != nil {
if err == rpc2.ErrShutdown {
return ErrNotConnected
Expand Down Expand Up @@ -981,15 +983,15 @@ func (o *ovsdbClient) monitor(ctx context.Context, cookie MonitorCookie, reconne
switch monitor.Method {
case ovsdb.MonitorRPC:
var reply ovsdb.TableUpdates
err = o.rpcClient.CallWithContext(ctx, monitor.Method, args, &reply)
err = o.CallWithContext(ctx, monitor.Method, args, &reply)
tableUpdates = reply
case ovsdb.ConditionalMonitorRPC:
var reply ovsdb.TableUpdates2
err = o.rpcClient.CallWithContext(ctx, monitor.Method, args, &reply)
err = o.CallWithContext(ctx, monitor.Method, args, &reply)
tableUpdates = reply
case ovsdb.ConditionalMonitorSinceRPC:
var reply ovsdb.MonitorCondSinceReply
err = o.rpcClient.CallWithContext(ctx, monitor.Method, args, &reply)
err = o.CallWithContext(ctx, monitor.Method, args, &reply)
if err == nil && reply.Found {
monitor.LastTransactionID = reply.LastTransactionID
lastTransactionFound = true
Expand Down Expand Up @@ -1080,7 +1082,7 @@ func (o *ovsdbClient) Echo(ctx context.Context) error {
if o.rpcClient == nil {
return ErrNotConnected
}
err := o.rpcClient.CallWithContext(ctx, "echo", args, &reply)
err := o.CallWithContext(ctx, "echo", args, &reply)
if err != nil {
if err == rpc2.ErrShutdown {
return ErrNotConnected
Expand Down Expand Up @@ -1439,3 +1441,19 @@ func (o *ovsdbClient) WhereAll(m model.Model, conditions ...model.Condition) Con
func (o *ovsdbClient) WhereCache(predicate interface{}) ConditionalAPI {
return o.primaryDB().api.WhereCache(predicate)
}

// CallWithContext invokes the named function, waits for it to complete, and
// returns its error status, or an error from Context timeout.
func (o *ovsdbClient) CallWithContext(ctx context.Context, method string, args interface{}, reply interface{}) error {
// Set up read/write deadline for tcp connection before making
// a rpc request to the server.
if tcpConn, ok := o.conn.(*net.TCPConn); ok {
if o.options.timeout > 0 {
err := tcpConn.SetDeadline(time.Now().Add(o.options.timeout * 3))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to SetDeadline for each call? If so, there requires a mutex.

BTW: How does this affect rpcClient.CallWithContext? Does rpcClient discard the timeout context?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, The SetDeadline looks to be needed for every read/write on tcp channel. This method is always invoked after o.rpcMutex is acquired. so we're safe here.

correct, The rpcClient.CallWithContext method is not fully timebound call, so fixing it here based on feedback from a PR raised there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found a related issue in grpc-go maybe you can take a look.

issue: grpc/grpc#15889
pr: https://github.com/grpc/grpc-go/pull/2307/files

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @halfcrazy , yes, it's the same issue that we are trying to fix in libovsdb client as well.
added a commit 3867774 to set up tcp user timeout.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pperiyasamy have you considered @halfcrazy concerns about needing a mutex?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why * 3? Wouldn't it make sense to use inactivityTimeout for this?

if err != nil {
return err
}
}
}
return o.rpcClient.CallWithContext(ctx, method, args, reply)
}