Skip to content

Commit

Permalink
Use Echo method to send inactivity probe
Browse files Browse the repository at this point in the history
This commit uses client's Echo method to send inactivity probe as it can be
made as a time bound call so that rpc read lock can not be held indefinitely.
This would make disconnect happens immediately when current ovsdb leader is
accidentally gone away (this happens in a very rare scenario in which case
sendEcho method returns with unexpected EOF error after 12 mins only) and
reconnects with new ovsdb leader.

Signed-off-by: Periyasamy Palanisamy <[email protected]>
  • Loading branch information
pperiyasamy committed Sep 25, 2023
1 parent 239822f commit b865ddb
Showing 1 changed file with 7 additions and 51 deletions.
58 changes: 7 additions & 51 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,19 +1197,8 @@ func (o *ovsdbClient) handleClientErrors(stopCh <-chan struct{}) {
}
}

func (o *ovsdbClient) sendEcho(args []interface{}, reply *[]interface{}) *rpc2.Call {
o.rpcMutex.RLock()
defer o.rpcMutex.RUnlock()
if o.rpcClient == nil {
return nil
}
return o.rpcClient.Go("echo", args, reply, make(chan *rpc2.Call, 1))
}

func (o *ovsdbClient) handleInactivityProbes() {
defer o.handlerShutdown.Done()
echoReplied := make(chan string)
var lastEcho string
stopCh := o.stopCh
trafficSeen := o.trafficSeen
for {
Expand All @@ -1218,49 +1207,16 @@ func (o *ovsdbClient) handleInactivityProbes() {
return
case <-trafficSeen:
// We got some traffic from the server, restart our timer
case ts := <-echoReplied:
// Got a response from the server, check it against lastEcho; if same clear lastEcho; if not same Disconnect()
if ts != lastEcho {
o.Disconnect()
return
}
lastEcho = ""
case <-time.After(o.options.inactivityTimeout):
// If there's a lastEcho already, then we didn't get a server reply, disconnect
if lastEcho != "" {
o.Disconnect()
return
}
// Otherwise send an echo
thisEcho := fmt.Sprintf("%d", time.Now().UnixMicro())
args := []interface{}{"libovsdb echo", thisEcho}
var reply []interface{}
// Can't use o.Echo() because it blocks; we need the Call object direct from o.rpcClient.Go()
call := o.sendEcho(args, &reply)
if call == nil {
o.Disconnect()
return
}
lastEcho = thisEcho
// Otherwise send an echo in a goroutine so that transactions don't block
go func() {
// Wait for the echo reply
select {
case <-stopCh:
return
case <-call.Done:
if call.Error != nil {
// RPC timeout; disconnect
o.logger.V(3).Error(call.Error, "server echo reply error")
o.Disconnect()
} else if !reflect.DeepEqual(args, reply) {
o.logger.V(3).Info("warning: incorrect server echo reply",
"expected", args, "reply", reply)
o.Disconnect()
} else {
// Otherwise stuff thisEcho into the echoReplied channel
echoReplied <- thisEcho
}
ctx, cancel := context.WithTimeout(context.Background(), o.options.timeout)
err := o.Echo(ctx)
if err != nil {
o.logger.V(3).Error(err, "server echo reply error")
o.Disconnect()
}
cancel()
}()
}
}
Expand Down

0 comments on commit b865ddb

Please sign in to comment.