Skip to content

Commit

Permalink
Use Echo method to send inactivity probe
Browse files Browse the repository at this point in the history
This commit client's Echo method to send inactivity probe as it can be made
as a time bound call so that rpc lock is freed in advance. This would make
disconnect happen immediately when current ovsdb leader is accidentally gone
away and reconnects with new ovsdb leader.

Signed-off-by: Periyasamy Palanisamy <[email protected]>
  • Loading branch information
pperiyasamy committed Sep 25, 2023
1 parent 239822f commit b50fa7f
Showing 1 changed file with 5 additions and 51 deletions.
56 changes: 5 additions & 51 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,19 +1197,8 @@ func (o *ovsdbClient) handleClientErrors(stopCh <-chan struct{}) {
}
}

func (o *ovsdbClient) sendEcho(args []interface{}, reply *[]interface{}) *rpc2.Call {
o.rpcMutex.RLock()
defer o.rpcMutex.RUnlock()
if o.rpcClient == nil {
return nil
}
return o.rpcClient.Go("echo", args, reply, make(chan *rpc2.Call, 1))
}

func (o *ovsdbClient) handleInactivityProbes() {
defer o.handlerShutdown.Done()
echoReplied := make(chan string)
var lastEcho string
stopCh := o.stopCh
trafficSeen := o.trafficSeen
for {
Expand All @@ -1218,50 +1207,15 @@ func (o *ovsdbClient) handleInactivityProbes() {
return
case <-trafficSeen:
// We got some traffic from the server, restart our timer
case ts := <-echoReplied:
// Got a response from the server, check it against lastEcho; if same clear lastEcho; if not same Disconnect()
if ts != lastEcho {
o.Disconnect()
return
}
lastEcho = ""
case <-time.After(o.options.inactivityTimeout):
// If there's a lastEcho already, then we didn't get a server reply, disconnect
if lastEcho != "" {
o.Disconnect()
return
}
// Otherwise send an echo
thisEcho := fmt.Sprintf("%d", time.Now().UnixMicro())
args := []interface{}{"libovsdb echo", thisEcho}
var reply []interface{}
// Can't use o.Echo() because it blocks; we need the Call object direct from o.rpcClient.Go()
call := o.sendEcho(args, &reply)
if call == nil {
ctx, cancel := context.WithTimeout(context.Background(), o.options.timeout)
err := o.Echo(ctx)
if err != nil {
o.logger.V(3).Error(err, "server echo reply error")
o.Disconnect()
return
}
lastEcho = thisEcho
go func() {
// Wait for the echo reply
select {
case <-stopCh:
return
case <-call.Done:
if call.Error != nil {
// RPC timeout; disconnect
o.logger.V(3).Error(call.Error, "server echo reply error")
o.Disconnect()
} else if !reflect.DeepEqual(args, reply) {
o.logger.V(3).Info("warning: incorrect server echo reply",
"expected", args, "reply", reply)
o.Disconnect()
} else {
// Otherwise stuff thisEcho into the echoReplied channel
echoReplied <- thisEcho
}
}
}()
cancel()
}
}
}
Expand Down

0 comments on commit b50fa7f

Please sign in to comment.