Skip to content

Commit

Permalink
Merge pull request #356 from pperiyasamy/inactivity-check
Browse files Browse the repository at this point in the history
Add inactivity probe for ovsdb connection
  • Loading branch information
dcbw authored Jul 11, 2023
2 parents 24370f6 + 685d3ee commit 6785b52
Show file tree
Hide file tree
Showing 6 changed files with 426 additions and 127 deletions.
91 changes: 88 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ type ovsdbClient struct {

handlerShutdown *sync.WaitGroup

trafficSeen chan struct{}

logger *logr.Logger
}

Expand Down Expand Up @@ -304,6 +306,10 @@ func (o *ovsdbClient) connect(ctx context.Context, reconnect bool) error {
}

go o.handleDisconnectNotification()
if o.options.inactivityTimeout > 0 {
o.handlerShutdown.Add(1)
go o.handleInactivityProbes()
}
for _, db := range o.databases {
o.handlerShutdown.Add(1)
eventStopChan := make(chan struct{})
Expand Down Expand Up @@ -418,6 +424,9 @@ func (o *ovsdbClient) tryEndpoint(ctx context.Context, u *url.URL) (string, erro
// Should only be called when the mutex is held
func (o *ovsdbClient) createRPC2Client(conn net.Conn) {
o.stopCh = make(chan struct{})
if o.options.inactivityTimeout > 0 {
o.trafficSeen = make(chan struct{})
}
o.rpcClient = rpc2.NewClientWithCodec(jsonrpc.NewJSONCodec(conn))
o.rpcClient.SetBlocking(true)
o.rpcClient.Handle("echo", func(_ *rpc2.Client, args []interface{}, reply *[]interface{}) error {
Expand Down Expand Up @@ -445,7 +454,7 @@ func (o *ovsdbClient) isEndpointLeader(ctx context.Context) (bool, string, error
Table: "Database",
Columns: []string{"name", "model", "leader", "sid"},
}
results, err := o.transact(ctx, serverDB, op)
results, err := o.transact(ctx, serverDB, true, op)
if err != nil {
return false, "", fmt.Errorf("could not check if server was leader: %w", err)
}
Expand Down Expand Up @@ -793,10 +802,10 @@ func (o *ovsdbClient) Transact(ctx context.Context, operation ...ovsdb.Operation
}
}
defer o.rpcMutex.RUnlock()
return o.transact(ctx, o.primaryDBName, operation...)
return o.transact(ctx, o.primaryDBName, false, operation...)
}

func (o *ovsdbClient) transact(ctx context.Context, dbName string, operation ...ovsdb.Operation) ([]ovsdb.OperationResult, error) {
func (o *ovsdbClient) transact(ctx context.Context, dbName string, skipChWrite bool, operation ...ovsdb.Operation) ([]ovsdb.OperationResult, error) {
var reply []ovsdb.OperationResult
db := o.databases[dbName]
db.modelMutex.RLock()
Expand Down Expand Up @@ -824,6 +833,10 @@ func (o *ovsdbClient) transact(ctx context.Context, dbName string, operation ...
}
return nil, err
}

if !skipChWrite && o.trafficSeen != nil {
o.trafficSeen <- struct{}{}
}
return reply, nil
}

Expand Down Expand Up @@ -1174,10 +1187,82 @@ func (o *ovsdbClient) handleClientErrors(stopCh <-chan struct{}) {
}
}

// sendEcho issues an asynchronous "echo" RPC and returns the pending call so
// the caller can wait on its Done channel. The RPC read lock is held for the
// duration of the submission. It returns nil when there is currently no RPC
// client to send the request on.
func (o *ovsdbClient) sendEcho(args []interface{}, reply *[]interface{}) *rpc2.Call {
	o.rpcMutex.RLock()
	defer o.rpcMutex.RUnlock()

	client := o.rpcClient
	if client == nil {
		return nil
	}
	// Buffer of one so completion can be signalled even if nobody is
	// receiving on the channel yet.
	done := make(chan *rpc2.Call, 1)
	return client.Go("echo", args, reply, done)
}

// handleInactivityProbes runs for the lifetime of one connection and detects
// a silent server.
//
// Whenever options.inactivityTimeout elapses without a trafficSeen
// notification or an echo reply, it sends an "echo" request tagged with a
// timestamp. If a second timeout elapses while that echo is still
// outstanding, or the reply comes back with an error or mismatching content,
// the client is disconnected and the handler exits. The handler also exits
// as soon as stopCh is closed.
//
// stopCh and trafficSeen are captured once at start-up so that the handler
// keeps watching the channels that belong to this connection.
func (o *ovsdbClient) handleInactivityProbes() {
	defer o.handlerShutdown.Done()
	echoReplied := make(chan string)
	var lastEcho string
	stopCh := o.stopCh
	trafficSeen := o.trafficSeen
	for {
		select {
		case <-stopCh:
			return
		case <-trafficSeen:
			// We got some traffic from the server, restart our timer
		case ts := <-echoReplied:
			// Got a response from the server, check it against lastEcho; if same clear lastEcho; if not same Disconnect()
			if ts != lastEcho {
				o.Disconnect()
				return
			}
			lastEcho = ""
		case <-time.After(o.options.inactivityTimeout):
			// If there's a lastEcho already, then we didn't get a server reply, disconnect
			if lastEcho != "" {
				o.Disconnect()
				return
			}
			// Otherwise send an echo
			thisEcho := fmt.Sprintf("%d", time.Now().UnixMicro())
			args := []interface{}{"libovsdb echo", thisEcho}
			var reply []interface{}
			// Can't use o.Echo() because it blocks; we need the Call object direct from o.rpcClient.Go()
			call := o.sendEcho(args, &reply)
			if call == nil {
				o.Disconnect()
				return
			}
			lastEcho = thisEcho
			go func() {
				// Wait for the echo reply
				select {
				case <-stopCh:
					return
				case <-call.Done:
					switch {
					case call.Error != nil:
						// RPC timeout; disconnect
						o.logger.V(3).Error(call.Error, "server echo reply error")
						o.Disconnect()
					case !reflect.DeepEqual(args, reply):
						o.logger.V(3).Info("warning: incorrect server echo reply",
							"expected", args, "reply", reply)
						o.Disconnect()
					default:
						// Hand thisEcho to the probe loop. The loop may
						// already have returned (stop requested, or it
						// disconnected after a timeout), in which case
						// nobody will ever receive from echoReplied; also
						// watch stopCh so this goroutine cannot block
						// forever on the send.
						select {
						case echoReplied <- thisEcho:
						case <-stopCh:
						}
					}
				}
			}()
		}
	}
}

func (o *ovsdbClient) handleDisconnectNotification() {
<-o.rpcClient.DisconnectNotify()
// close the stopCh, which will stop the cache event processor
close(o.stopCh)
if o.trafficSeen != nil {
close(o.trafficSeen)
}
o.metrics.numDisconnects.Inc()
// wait for client related handlers to shutdown
o.handlerShutdown.Wait()
Expand Down
90 changes: 90 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,96 @@ func setLeader(t *testing.T, cli Client, row *serverdb.Database, isLeader bool)
assert.NoErrorf(t, err, "%+v", opErr)
}

// TestClientInactiveCheck verifies that a client created with
// WithInactivityCheck notices when the server stops answering echo requests
// and that it is connected again once the server resumes answering. The
// off/on cycle is exercised twice so that the probe is also checked after a
// reconnect.
func TestClientInactiveCheck(t *testing.T) {
	var defSchema ovsdb.DatabaseSchema
	err := json.Unmarshal([]byte(schema), &defSchema)
	require.NoError(t, err)

	serverDBModel, err := serverdb.FullDatabaseModel()
	require.NoError(t, err)
	// Create server
	server, sock := newOVSDBServer(t, defDB, defSchema)

	// Create client to test inactivity check.
	endpoint := fmt.Sprintf("unix:%s", sock)
	ovs, err := newOVSDBClient(serverDBModel,
		WithInactivityCheck(2*time.Second, 1*time.Second, &backoff.ZeroBackOff{}),
		WithEndpoint(endpoint))
	require.NoError(t, err)
	err = ovs.Connect(context.Background())
	require.NoError(t, err)
	t.Cleanup(ovs.Close)

	const (
		waitLimit    = 5 * time.Second
		pollInterval = 10 * time.Millisecond
	)

	// Turn the server's echo replies off and then on again, twice. Each
	// time the client's inactivity probe must detect the failure, and the
	// client must be connected again once echo replies are re-enabled.
	for round := 1; round <= 2; round++ {
		// Grab the disconnect channel of the current RPC client before
		// echo replies are turned off, so the notification cannot be missed.
		disconnectNotify := ovs.rpcClient.DisconnectNotify()
		server.DoEcho(false)
		select {
		case <-disconnectNotify:
			// got notification
		case <-time.After(waitLimit):
			assert.Fail(t, "client doesn't detect the echo failure")
		}

		// Let the server respond to echo requests again and wait for the
		// client to report that it is connected.
		server.DoEcho(true)
		deadline := time.Now().Add(waitLimit)
		for !ovs.Connected() {
			if time.Now().After(deadline) {
				// Abort instead of polling forever: the next round
				// needs a live RPC client.
				require.Fail(t, "reconnect is not successful")
			}
			time.Sleep(pollInterval)
		}
	}
}

func TestClientReconnectLeaderOnly(t *testing.T) {
rand.Seed(time.Now().UnixNano())

Expand Down
17 changes: 17 additions & 0 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type options struct {
shouldRegisterMetrics bool // in case metrics are changed after-the-fact
metricNamespace string // prometheus metric namespace
metricSubsystem string // prometheus metric subsystem
inactivityTimeout time.Duration
}

type Option func(o *options) error
Expand Down Expand Up @@ -111,6 +112,22 @@ func WithReconnect(timeout time.Duration, backoff backoff.BackOff) Option {
}
}

// WithInactivityCheck enables the connection inactivity probe together with
// automatic reconnection.
//
// inactivityTimeout is the idle period after which the client sends an Echo
// request to the ovsdb server; the probe handler is only started for values
// greater than zero. When the Echo fails, the client disconnects and then
// attempts to reconnect to the server. Because the option turns reconnect
// on, it also takes the reconnect settings: reconnectTimeout is used to
// construct the context on each call to Connect, and reconnectBackoff
// dictates the backoff algorithm to use between attempts.
func WithInactivityCheck(inactivityTimeout, reconnectTimeout time.Duration,
	reconnectBackoff backoff.BackOff) Option {
	apply := func(opts *options) error {
		opts.inactivityTimeout = inactivityTimeout
		opts.reconnect = true
		opts.timeout = reconnectTimeout
		opts.backoff = reconnectBackoff
		return nil
	}
	return apply
}

// WithLogger allows setting a specific log sink. Otherwise, the default
// go log package is used.
func WithLogger(l *logr.Logger) Option {
Expand Down
2 changes: 1 addition & 1 deletion modelgen/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ func TestFieldType(t *testing.T) {
out string
}{
{"t1", "c1", &singleValueSetSchema, "*string"},
{"t1", "c2", &multipleValueSetSchema, "[2]string"},
{"t1", "c2", &multipleValueSetSchema, "[]string"},
}

for _, tt := range tests {
Expand Down
18 changes: 17 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type OvsdbServer struct {
done chan struct{}
db database.Database
ready bool
doEcho bool
readyMutex sync.RWMutex
models map[string]model.DatabaseModel
modelsMutex sync.RWMutex
Expand All @@ -35,12 +36,16 @@ type OvsdbServer struct {
txnMutex sync.Mutex
}

// init sets the package-wide stdr verbosity once at load time, before any
// server logger is created.
func init() {
	const logVerbosity = 5
	stdr.SetVerbosity(logVerbosity)
}

// NewOvsdbServer returns a new OvsdbServer
func NewOvsdbServer(db database.Database, models ...model.DatabaseModel) (*OvsdbServer, error) {
l := stdr.NewWithOptions(log.New(os.Stderr, "", log.LstdFlags), stdr.Options{LogCaller: stdr.All}).WithName("server")
stdr.SetVerbosity(5)
o := &OvsdbServer{
done: make(chan struct{}, 1),
doEcho: true,
db: db,
models: make(map[string]model.DatabaseModel),
modelsMutex: sync.RWMutex{},
Expand Down Expand Up @@ -83,6 +88,12 @@ func (o *OvsdbServer) OnDisConnect(f func(*rpc2.Client)) {
o.srv.OnDisconnect(f)
}

// DoEcho switches the server's handling of echo requests on or off. While it
// is set to false, Echo returns an error instead of echoing the arguments
// back. The flag is guarded by readyMutex.
func (o *OvsdbServer) DoEcho(ok bool) {
	o.readyMutex.Lock()
	defer o.readyMutex.Unlock()
	o.doEcho = ok
}

// Serve starts the OVSDB server on the given path and protocol
func (o *OvsdbServer) Serve(protocol string, path string) error {
var err error
Expand Down Expand Up @@ -382,6 +393,11 @@ func (o *OvsdbServer) Unlock(client *rpc2.Client, args []interface{}, reply *[]i

// Echo tests the liveness of the connection
func (o *OvsdbServer) Echo(client *rpc2.Client, args []interface{}, reply *[]interface{}) error {
o.readyMutex.Lock()
defer o.readyMutex.Unlock()
if !o.doEcho {
return fmt.Errorf("no echo reply")
}
echoReply := make([]interface{}, len(args))
copy(echoReply, args)
*reply = echoReply
Expand Down
Loading

0 comments on commit 6785b52

Please sign in to comment.