From 49be92a5e80caacfbe75fee0acf8f71dd865a5e5 Mon Sep 17 00:00:00 2001 From: Periyasamy Palanisamy Date: Tue, 29 Aug 2023 17:29:46 +0200 Subject: [PATCH] Use Echo method to send inactivity probe This commit uses the client's Echo method to send the inactivity probe, as it can be made a time-bound call so that the rpc read lock cannot be held indefinitely. This makes the disconnect happen immediately when the current ovsdb leader unexpectedly goes away (a very rare scenario, in which the sendEcho method returns with an unexpected EOF error only after 12 minutes), and the client then reconnects to the new ovsdb leader. Signed-off-by: Periyasamy Palanisamy --- client/client.go | 63 +++++++++--------------------------------------- 1 file changed, 12 insertions(+), 51 deletions(-) diff --git a/client/client.go b/client/client.go index 10ea757e..8c9a1051 100644 --- a/client/client.go +++ b/client/client.go @@ -1197,72 +1197,33 @@ func (o *ovsdbClient) handleClientErrors(stopCh <-chan struct{}) { } } -func (o *ovsdbClient) sendEcho(args []interface{}, reply *[]interface{}) *rpc2.Call { - o.rpcMutex.RLock() - defer o.rpcMutex.RUnlock() - if o.rpcClient == nil { - return nil - } - return o.rpcClient.Go("echo", args, reply, make(chan *rpc2.Call, 1)) -} - func (o *ovsdbClient) handleInactivityProbes() { defer o.handlerShutdown.Done() - echoReplied := make(chan string) - var lastEcho string stopCh := o.stopCh trafficSeen := o.trafficSeen + timer := time.NewTimer(o.options.inactivityTimeout) for { select { case <-stopCh: return case <-trafficSeen: // We got some traffic from the server, restart our timer - case ts := <-echoReplied: - // Got a response from the server, check it against lastEcho; if same clear lastEcho; if not same Disconnect() - if ts != lastEcho { - o.Disconnect() - return + if !timer.Stop() { + <-timer.C } - lastEcho = "" - case <-time.After(o.options.inactivityTimeout): - // If there's a lastEcho already, then we didn't get a server reply, disconnect - if lastEcho != "" { - 
o.Disconnect() - return - } - // Otherwise send an echo - thisEcho := fmt.Sprintf("%d", time.Now().UnixMicro()) - args := []interface{}{"libovsdb echo", thisEcho} - var reply []interface{} - // Can't use o.Echo() because it blocks; we need the Call object direct from o.rpcClient.Go() - call := o.sendEcho(args, &reply) - if call == nil { - o.Disconnect() - return - } - lastEcho = thisEcho + case <-timer.C: + // Otherwise send an echo in a goroutine so that transactions don't block go func() { - // Wait for the echo reply - select { - case <-stopCh: - return - case <-call.Done: - if call.Error != nil { - // RPC timeout; disconnect - o.logger.V(3).Error(call.Error, "server echo reply error") - o.Disconnect() - } else if !reflect.DeepEqual(args, reply) { - o.logger.V(3).Info("warning: incorrect server echo reply", - "expected", args, "reply", reply) - o.Disconnect() - } else { - // Otherwise stuff thisEcho into the echoReplied channel - echoReplied <- thisEcho - } + ctx, cancel := context.WithTimeout(context.Background(), o.options.timeout) + err := o.Echo(ctx) + if err != nil { + o.logger.V(3).Error(err, "server echo reply error") + o.Disconnect() } + cancel() }() } + timer.Reset(o.options.inactivityTimeout) } }