Skip to content

Commit

Permalink
Use Echo method to send inactivity probe
Browse files Browse the repository at this point in the history
This commit uses client's Echo method to send inactivity probe as it can be
made as a time bound call so that rpc read lock can not be held indefinitely.
This would make disconnect happens immediately when current ovsdb leader is
accidentally gone away (this happens in a very rare scenario in which case
sendEcho method returns with unexpected EOF error after 12 mins only) and
reconnects with new ovsdb leader.

Signed-off-by: Periyasamy Palanisamy <[email protected]>
  • Loading branch information
pperiyasamy committed Sep 25, 2023
1 parent 239822f commit 49be92a
Showing 1 changed file with 12 additions and 51 deletions.
63 changes: 12 additions & 51 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,72 +1197,33 @@ func (o *ovsdbClient) handleClientErrors(stopCh <-chan struct{}) {
}
}

func (o *ovsdbClient) sendEcho(args []interface{}, reply *[]interface{}) *rpc2.Call {
o.rpcMutex.RLock()
defer o.rpcMutex.RUnlock()
if o.rpcClient == nil {
return nil
}
return o.rpcClient.Go("echo", args, reply, make(chan *rpc2.Call, 1))
}

func (o *ovsdbClient) handleInactivityProbes() {
defer o.handlerShutdown.Done()
echoReplied := make(chan string)
var lastEcho string
stopCh := o.stopCh
trafficSeen := o.trafficSeen
timer := time.NewTimer(o.options.inactivityTimeout)
for {
select {
case <-stopCh:
return
case <-trafficSeen:
// We got some traffic from the server, restart our timer
case ts := <-echoReplied:
// Got a response from the server, check it against lastEcho; if same clear lastEcho; if not same Disconnect()
if ts != lastEcho {
o.Disconnect()
return
if !timer.Stop() {
<-timer.C
}
lastEcho = ""
case <-time.After(o.options.inactivityTimeout):
// If there's a lastEcho already, then we didn't get a server reply, disconnect
if lastEcho != "" {
o.Disconnect()
return
}
// Otherwise send an echo
thisEcho := fmt.Sprintf("%d", time.Now().UnixMicro())
args := []interface{}{"libovsdb echo", thisEcho}
var reply []interface{}
// Can't use o.Echo() because it blocks; we need the Call object direct from o.rpcClient.Go()
call := o.sendEcho(args, &reply)
if call == nil {
o.Disconnect()
return
}
lastEcho = thisEcho
case <-timer.C:
// Otherwise send an echo in a goroutine so that transactions don't block
go func() {
// Wait for the echo reply
select {
case <-stopCh:
return
case <-call.Done:
if call.Error != nil {
// RPC timeout; disconnect
o.logger.V(3).Error(call.Error, "server echo reply error")
o.Disconnect()
} else if !reflect.DeepEqual(args, reply) {
o.logger.V(3).Info("warning: incorrect server echo reply",
"expected", args, "reply", reply)
o.Disconnect()
} else {
// Otherwise stuff thisEcho into the echoReplied channel
echoReplied <- thisEcho
}
ctx, cancel := context.WithTimeout(context.Background(), o.options.timeout)
err := o.Echo(ctx)
if err != nil {
o.logger.V(3).Error(err, "server echo reply error")
o.Disconnect()
}
cancel()
}()
}
timer.Reset(o.options.inactivityTimeout)
}
}

Expand Down

0 comments on commit 49be92a

Please sign in to comment.