Skip to content

Commit

Permalink
Reuse reconnect logic for inactivity check
Browse files Browse the repository at this point in the history
This reuses reconnect code associated with handling disconnect notification
for the echo failure, It makes code more readble and avoids unneccessary use
of additional mutexes and flags.

Signed-off-by: Periyasamy Palanisamy <[email protected]>
  • Loading branch information
pperiyasamy committed May 30, 2023
1 parent 78325e0 commit 7d0abec
Show file tree
Hide file tree
Showing 4 changed files with 279 additions and 222 deletions.
124 changes: 48 additions & 76 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,20 @@ type ovsdbClient struct {
primaryDBName string
databases map[string]*database

errorCh chan error
stopCh chan struct{}
disconnect chan struct{}
shutdown bool
isInactivity bool
isInactivityCheckRunning bool
stopInactivity chan struct{}
inactivityCheckStopped chan struct{}
inactivityMutex sync.Mutex
shutdownMutex sync.Mutex
reconnectMutex sync.Mutex
errorCh chan error
stopCh chan struct{}
disconnect chan struct{}
shutdown bool
shutdownMutex sync.Mutex

handlerShutdown *sync.WaitGroup

isInactivity bool
inactivityNotify chan struct{}
stopInactivity chan struct{}
inactivityCheckStopped chan struct{}
inactivityMutex sync.Mutex

logger *logr.Logger
}

Expand Down Expand Up @@ -159,11 +159,10 @@ func newOVSDBClient(clientDBModel model.ClientDBModel, opts ...Option) (*ovsdbCl
deferredUpdates: make([]*bufferedUpdate, 0),
},
},
errorCh: make(chan error),
handlerShutdown: &sync.WaitGroup{},
disconnect: make(chan struct{}),
stopInactivity: make(chan struct{}),
inactivityCheckStopped: make(chan struct{}),
errorCh: make(chan error),
handlerShutdown: &sync.WaitGroup{},
disconnect: make(chan struct{}),
stopInactivity: make(chan struct{}),
}
var err error
ovs.options, err = newOptions(opts...)
Expand Down Expand Up @@ -212,7 +211,7 @@ func newOVSDBClient(clientDBModel model.ClientDBModel, opts ...Option) (*ovsdbCl
// The connection can be configured using one or more Option(s), like WithTLSConfig
// If no WithEndpoint option is supplied, the default of unix:/var/run/openvswitch/ovsdb.sock is used
func (o *ovsdbClient) Connect(ctx context.Context) error {
if err := o.connect(ctx, false, false); err != nil {
if err := o.connect(ctx, false); err != nil {
if err == ErrAlreadyConnected {
return nil
}
Expand All @@ -223,12 +222,6 @@ func (o *ovsdbClient) Connect(ctx context.Context) error {
return err
}
}
o.rpcMutex.Lock()
defer o.rpcMutex.Unlock()
if o.options.inactivityCheck && !o.isInactivityCheckRunning {
go o.handleInactivityCheck()
o.isInactivityCheckRunning = true
}
return nil
}

Expand All @@ -254,7 +247,7 @@ func (o *ovsdbClient) resetRPCClient() {
}
}

func (o *ovsdbClient) connect(ctx context.Context, reconnect, inactivity bool) error {
func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
o.rpcMutex.Lock()
defer o.rpcMutex.Unlock()
if o.rpcClient != nil {
Expand Down Expand Up @@ -318,9 +311,10 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect, inactivity bool) e
}
}
}
if !inactivity {
go o.handleDisconnectNotification()
}
o.inactivityNotify = make(chan struct{})
o.inactivityCheckStopped = make(chan struct{})
go o.handleDisconnectNotification()
go o.handleInactivityCheck(o.inactivityCheckStopped)
for _, db := range o.databases {
o.handlerShutdown.Add(1)
eventStopChan := make(chan struct{})
Expand Down Expand Up @@ -1191,55 +1185,32 @@ func (o *ovsdbClient) handleClientErrors(stopCh <-chan struct{}) {
}
}

func (o *ovsdbClient) handleInactivityCheck() {
func (o *ovsdbClient) handleInactivityCheck(stopCh chan<- struct{}) {
if !o.options.inactivityCheck {
return
}
echoFailCount := 0
o.rpcMutex.Lock()
timeout := o.options.timeout
o.rpcMutex.Unlock()
for {
select {
case <-o.stopInactivity:
close(o.inactivityCheckStopped)
close(stopCh)
return
default:
o.reconnectMutex.Lock()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
if !o.isClientUnderInactivity() {
err := o.Echo(ctx)
if err != nil {
echoFailCount++
// When echo failure reaches maxAllowedEchoFailCount, then consider ovndb connection as inactive.
if echoFailCount >= maxAllowedEchoFailCount {
o.setClientIsInActivity(true)
}
} else {
echoFailCount = 0
}
} else if o.isClientUnderInactivity() {
// Since connection to ovsdb server is inactive for 2 * interval, try to reconnect with it.
o.rpcMutex.Lock()
if o.rpcClient != nil {
// close the stopCh, which will stop the cache event processor
close(o.stopCh)
// wait for client related handlers to shutdown
o.handlerShutdown.Wait()
}
o.rpcClient = nil
o.rpcMutex.Unlock()
// need to ensure deferredUpdates is cleared on every reconnect attempt
for _, db := range o.databases {
db.cacheMutex.Lock()
db.deferredUpdates = make([]*bufferedUpdate, 0)
db.deferUpdates = true
db.cacheMutex.Unlock()
}
err := o.connect(ctx, true, true)
if err == nil || err == ErrAlreadyConnected {
o.setClientIsInActivity(false)
echoFailCount = 0
ctx, cancel := context.WithTimeout(context.Background(), o.options.inactivityTimeout)
err := o.Echo(ctx)
if err != nil {
echoFailCount++
// When echo failure reaches maxAllowedEchoFailCount, then consider ovndb connection as inactive.
if echoFailCount >= maxAllowedEchoFailCount {
o.setClientIsInActivity(true)
close(o.inactivityNotify)
cancel()
return
}
} else {
o.setClientIsInActivity(false)
echoFailCount = 0
}
o.reconnectMutex.Unlock()
cancel()
time.Sleep(o.options.inactivityCheckInterval)
}
Expand All @@ -1259,13 +1230,10 @@ func (o *ovsdbClient) setClientIsInActivity(inactivity bool) {
}

func (o *ovsdbClient) handleDisconnectNotification() {
var disConnectCh <-chan struct{}
o.rpcMutex.Lock()
disConnectCh = o.rpcClient.DisconnectNotify()
o.rpcMutex.Unlock()
<-disConnectCh
o.reconnectMutex.Lock()
defer o.reconnectMutex.Unlock()
select {
case <-o.rpcClient.DisconnectNotify():
case <-o.inactivityNotify:
}
// close the stopCh, which will stop the cache event processor
close(o.stopCh)
o.metrics.numDisconnects.Inc()
Expand All @@ -1286,7 +1254,7 @@ func (o *ovsdbClient) handleDisconnectNotification() {
}
ctx, cancel := context.WithTimeout(context.Background(), o.options.timeout)
defer cancel()
err := o.connect(ctx, true, false)
err := o.connect(ctx, true)
if err != nil {
if suppressionCounter < 5 {
o.logger.V(2).Error(err, "failed to reconnect")
Expand Down Expand Up @@ -1331,6 +1299,10 @@ func (o *ovsdbClient) handleDisconnectNotification() {
}
o.metrics.numMonitors.Set(0)

o.shutdownMutex.Lock()
defer o.shutdownMutex.Unlock()
o.shutdown = false

select {
case o.disconnect <- struct{}{}:
// sent disconnect notification to client
Expand Down
19 changes: 5 additions & 14 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,8 +972,7 @@ func TestClientInactiveCheck(t *testing.T) {
// Create client to test inactivity check.
endpoint := fmt.Sprintf("unix:%s", sock)
ovs, err := newOVSDBClient(serverDBModel,
WithReconnect(5*time.Second, &backoff.ZeroBackOff{}),
WithInactivityCheck(1*time.Second, 2*time.Second),
WithInactivityCheck(1*time.Second, 2*time.Second, 5*time.Second, &backoff.ZeroBackOff{}),
WithEndpoint(endpoint))
require.NoError(t, err)
err = ovs.Connect(context.Background())
Expand All @@ -986,30 +985,22 @@ func TestClientInactiveCheck(t *testing.T) {
// is started responding to echo requests.
server.DoEcho(false)
require.Eventually(t, func() bool {
ovs.inactivityMutex.Lock()
defer ovs.inactivityMutex.Unlock()
return ovs.isInactivity == true
return ovs.isClientUnderInactivity() == true
}, 10*time.Second, 1*time.Second)

server.DoEcho(true)
require.Eventually(t, func() bool {
ovs.inactivityMutex.Lock()
defer ovs.inactivityMutex.Unlock()
return ovs.isInactivity == false
return ovs.isClientUnderInactivity() == false
}, 10*time.Second, 1*time.Second)

server.DoEcho(false)
require.Eventually(t, func() bool {
ovs.inactivityMutex.Lock()
defer ovs.inactivityMutex.Unlock()
return ovs.isInactivity == true
return ovs.isClientUnderInactivity() == true
}, 10*time.Second, 1*time.Second)

server.DoEcho(true)
require.Eventually(t, func() bool {
ovs.inactivityMutex.Lock()
defer ovs.inactivityMutex.Unlock()
return ovs.isInactivity == false
return ovs.isClientUnderInactivity() == false
}, 10*time.Second, 1*time.Second)
}

Expand Down
22 changes: 13 additions & 9 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type options struct {
metricSubsystem string // prometheus metric subsystem
inactivityCheck bool
inactivityCheckInterval time.Duration
inactivityTimeout time.Duration
}

type Option func(o *options) error
Expand Down Expand Up @@ -113,17 +114,20 @@ func WithReconnect(timeout time.Duration, backoff backoff.BackOff) Option {
}
}

// WithInactivityCheck tells the client to send Echo request to ovsdb
// server periodically at specified inactivityCheckInterval. When echo
// request fails consecutively 2 * inactivityCheckInterval, then attempts
// to reconnect with server. Once reconnect is successful, then inactivity
// check would go on until connection to the server is shutdown.
// The timeout argument is used for constructing the context for sending
// each Echo and Reconnect requests.
func WithInactivityCheck(timeout, inactivityCheckInterval time.Duration) Option {
// WithInactivityCheck tells the client to send Echo request to ovsdb server periodically at
// specified inactivityCheckInterval. When Echo request fails consecutively 2 * inactivityCheckInterval,
// then attempts to reconnect with server. The inactivity check is performed as long as the connection is
// established. The reconnectTimeout argument is used to construct the context on each call to Connect,
// while while reconnectBackoff dictates the backoff algorithm to use. The inactivityTimeout is used to
// construct the context on each call to Echo request.
func WithInactivityCheck(inactivityTimeout, inactivityCheckInterval, reconnectTimeout time.Duration,
reconnectBackoff backoff.BackOff) Option {
return func(o *options) error {
o.reconnect = true
o.inactivityCheck = true
o.timeout = timeout
o.timeout = reconnectTimeout
o.backoff = reconnectBackoff
o.inactivityTimeout = inactivityTimeout
o.inactivityCheckInterval = inactivityCheckInterval
return nil
}
Expand Down
Loading

0 comments on commit 7d0abec

Please sign in to comment.