
Commit

Fix threading issues in Event Client (sonic-net#121)
Why I did it

Streaming events was not working because the fix for the race condition introduced in PR sonic-net#100 was incomplete.

The previous PR left two issues unresolved:

Mutex Unlock was never executed: it was deferred inside a continuous for loop, and a deferred call only runs when the enclosing function returns, so the mutex stayed locked and blocked all other goroutines.
The lock type did not match the access pattern. Switch to sync.RWMutex, taking RLock for concurrent reads and the exclusive Lock for writes (see the sketch below).
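To illustrate both pitfalls, here is a minimal, self-contained sketch (hypothetical names, not code from this patch) of the broken shape and the corrected one:

package main

import (
	"fmt"
	"sync"
	"time"
)

// Hypothetical shared state, for illustration only.
var (
	mu    sync.RWMutex
	count int
)

// Broken shape: defer inside an endless loop never fires, because a
// deferred call runs only when the enclosing function returns. The
// first iteration takes the lock and every other goroutine blocks.
func pollBroken() {
	for {
		mu.Lock()
		defer mu.Unlock() // never reached while the loop spins
		count++
	}
}

// Fixed shape: scope the critical section explicitly and match the
// lock to the access - exclusive Lock for writes.
func pollFixed(stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		default:
		}
		mu.Lock() // write access
		count++
		mu.Unlock()
		time.Sleep(10 * time.Millisecond)
	}
}

// Readers take RLock, which admits many readers concurrently.
func read() int {
	mu.RLock()
	defer mu.RUnlock() // fine here: the function returns right after
	return count
}

func main() {
	stop := make(chan struct{})
	go pollFixed(stop)
	time.Sleep(50 * time.Millisecond)
	fmt.Println("count:", read())
	close(stop)
}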
zbud-msft authored Jun 15, 2023
1 parent 37ce38b commit a600dc9
Showing 2 changed files with 110 additions and 57 deletions.
105 changes: 67 additions & 38 deletions gnmi_server/server_test.go
@@ -3106,9 +3106,9 @@ func TestConnectionsKeepAlive(t *testing.T) {
}

func TestClient(t *testing.T) {
- var mutexDeInitDone sync.Mutex
- var mutexHBDone sync.Mutex
- var mutexIdxDone sync.Mutex
+ var mutexDeInit sync.RWMutex
+ var mutexHB sync.RWMutex
+ var mutexIdx sync.RWMutex

// sonic-host:device-test-event is a test event.
// Events client will drop it on floor.
@@ -3128,45 +3128,51 @@ func TestClient(t *testing.T) {

mock1 := gomonkey.ApplyFunc(sdc.C_init_subs, func(use_cache bool) unsafe.Pointer {
return nil
})
defer mock1.Reset()

mock2 := gomonkey.ApplyFunc(sdc.C_recv_evt, func(h unsafe.Pointer) (int, sdc.Evt_rcvd) {
rc := (int)(0)
var evt sdc.Evt_rcvd
- mutexIdxDone.Lock()
- defer mutexIdxDone.Unlock()
- if event_index < len(events) {
- evt = events[event_index]
- event_index++
+ mutexIdx.RLock()
+ current_index := event_index
+ mutexIdx.RUnlock()
+ if current_index < len(events) {
+ evt = events[current_index]
+ mutexIdx.Lock()
+ event_index = current_index + 1
+ mutexIdx.Unlock()
} else {
time.Sleep(time.Millisecond * time.Duration(rcv_timeout))
rc = -1
}
return rc, evt
})
defer mock2.Reset()

mock3 := gomonkey.ApplyFunc(sdc.Set_heartbeat, func(val int) {
- mutexHBDone.Lock()
- defer mutexHBDone.Unlock()
+ mutexHB.Lock()
heartbeat = val
+ mutexHB.Unlock()
})

defer mock3.Reset()

mock4 := gomonkey.ApplyFunc(sdc.C_deinit_subs, func(h unsafe.Pointer) {
- mutexDeInitDone.Lock()
- defer mutexDeInitDone.Unlock()
+ mutexDeInit.Lock()
deinit_done = true
+ mutexDeInit.Unlock()
})

defer mock4.Reset()

mock5 := gomonkey.ApplyMethod(reflect.TypeOf(&queue.PriorityQueue{}), "Put", func(pq *queue.PriorityQueue, item ...queue.Item) error {
return fmt.Errorf("Queue error")
})
defer mock5.Reset()

mock6 := gomonkey.ApplyMethod(reflect.TypeOf(&queue.PriorityQueue{}), "Len", func(pq *queue.PriorityQueue) int {
return 150000 // Max size for pending events in PQ is 102400
})
defer mock6.Reset()

s := createServer(t, 8081)
go runServer(t, s)
@@ -3183,6 +3189,10 @@ func TestClient(t *testing.T) {
pause int
poll int
} {
+ {
+ desc: "dropped event",
+ poll: 3,
+ },
{
desc: "queue error",
poll: 3,
@@ -3194,27 +3204,39 @@ func TestClient(t *testing.T) {
}

sdc.C_init_subs(true)
- var gotNotiMu sync.Mutex
+ var mutexNoti sync.RWMutex

for testNum, tt := range tests {
- mutexHBDone.Lock()
+ mutexHB.Lock()
heartbeat = 0
- mutexHBDone.Unlock()
+ mutexHB.Unlock()

- mutexIdxDone.Lock()
+ mutexIdx.Lock()
event_index = 0
- mutexIdxDone.Unlock()
+ mutexIdx.Unlock()

+ mutexDeInit.Lock()
deinit_done = false
+ mutexDeInit.Unlock()

t.Run(tt.desc, func(t *testing.T) {
c := client.New()
defer c.Close()

var gotNoti []string
q.NotificationHandler = func(n client.Notification) error {
- gotNotiMu.Lock()
- defer gotNotiMu.Unlock()
if nn, ok := n.(client.Update); ok {
nn.TS = time.Unix(0, 200)
str := fmt.Sprintf("%v", nn.Val)
- gotNoti = append(gotNoti, str)
+ mutexNoti.Lock()
+ gotNoti = append(gotNoti, str)
+ mutexNoti.Unlock()
}
return nil
}
@@ -3228,31 +3250,38 @@ func TestClient(t *testing.T) {

time.Sleep(time.Millisecond * 2000)

- gotNotiMu.Lock()
- // -1 to discount test event, which receiver would drop.
- if testNum != 0 {
+ if testNum > 1 {
+ mutexNoti.RLock()
+ // -1 to discount test event, which receiver would drop.
if (len(events) - 1) != len(gotNoti) {
fmt.Printf("noti[%d] != events[%d]", len(gotNoti), len(events)-1)
t.Errorf("noti[%d] != events[%d]", len(gotNoti), len(events)-1)
}
- mutexHBDone.Lock()
+ mutexHB.RLock()
if (heartbeat != HEARTBEAT_SET) {
t.Errorf("Heartbeat is not set %d != expected:%d", heartbeat, HEARTBEAT_SET)
}
- mutexHBDone.Unlock()
+ mutexHB.RUnlock()

fmt.Printf("DONE: Expect events:%d - 1 gotNoti=%d\n", len(events), len(gotNoti))
+ mutexNoti.RUnlock()
}
- gotNotiMu.Unlock()
})

+ if testNum == 0 {
+ mock6.Reset()
+ }

+ if testNum == 1 {
+ mock5.Reset()
+ }
time.Sleep(time.Millisecond * 1000)

- mutexDeInitDone.Lock()
+ mutexDeInit.RLock()
if deinit_done == false {
t.Errorf("Events client deinit *NOT* called.")
}
- mutexDeInitDone.Unlock()
+ mutexDeInit.RUnlock()
// t.Log("END of a TEST")
}

62 changes: 43 additions & 19 deletions sonic_data_client/events_client.go
@@ -72,11 +72,11 @@ type EventClient struct {
subs_handle unsafe.Pointer

stopped int
- stopMutex sync.Mutex
+ stopMutex sync.RWMutex

// Stats counter
counters map[string]uint64
- countersMutex sync.Mutex
+ countersMutex sync.RWMutex

last_latencies [LATENCY_LIST_SIZE]uint64
last_latency_index int
@@ -184,7 +184,9 @@ func compute_latency(evtc *EventClient) {
total += v
}
}
+ evtc.countersMutex.Lock()
evtc.counters[LATENCY] = (uint64) (total/LATENCY_LIST_SIZE/1000/1000)
+ evtc.countersMutex.Unlock()
}
}

@@ -201,18 +203,20 @@ func update_stats(evtc *EventClient) {
* This helps add some initial pause before accessing DB
* for existing values.
*/
- evtc.stopMutex.Lock()
- defer evtc.stopMutex.Unlock()
- for evtc.stopped == 0 {
+ for !evtc.isStopped() {
var val uint64

compute_latency(evtc)

+ evtc.countersMutex.RLock()
for _, val = range evtc.counters {
if val != 0 {
break
}
}
+ evtc.countersMutex.RUnlock()

if val != 0 {
break
}
@@ -221,7 +225,7 @@ func update_stats(evtc *EventClient) {


/* Populate counters from DB for cumulative counters. */
- if evtc.stopped == 0 {
+ if !evtc.isStopped() {
ns := sdcfg.GetDbDefaultNamespace()

rclient = redis.NewClient(&redis.Options{
@@ -249,15 +253,20 @@ func update_stats(evtc *EventClient) {
}

/* Main running loop that updates DB */
- for evtc.stopped == 0 {
+ for !evtc.isStopped() {
tmp_counters := make(map[string]uint64)

// compute latency
compute_latency(evtc)

+ evtc.countersMutex.RLock()
for key, val := range evtc.counters {
tmp_counters[key] = val + db_counters[key]
}
+ evtc.countersMutex.RUnlock()

tmp_counters[DROPPED] += evtc.last_errors

if (wr_counters == nil) || !reflect.DeepEqual(tmp_counters, *wr_counters) {
@@ -307,7 +316,7 @@ func C_deinit_subs(h unsafe.Pointer) {
func get_events(evtc *EventClient) {
defer evtc.wg.Done()

str_ptr := C.malloc(C.sizeof_char * C.size_t(EVENT_BUFFSZ))
defer C.free(unsafe.Pointer(str_ptr))

evt_ptr = (*C.event_receive_op_C_t)(C.malloc(C.size_t(unsafe.Sizeof(C.event_receive_op_C_t{}))))
@@ -322,9 +331,13 @@ func get_events(evtc *EventClient) {

if rc == 0 {
+ evtc.countersMutex.Lock()
evtc.counters[MISSED] += (uint64)(evt.Missed_cnt)
+ evtc.countersMutex.Unlock()

if !strings.HasPrefix(evt.Event_str, TEST_EVENT) {
qlen := evtc.q.Len()

@@ -346,13 +359,17 @@ func get_events(evtc *EventClient) {
log.V(1).Infof("Invalid event string: %v", evt.Event_str)
}
} else {
+ evtc.countersMutex.Lock()
evtc.counters[DROPPED] += 1
+ evtc.countersMutex.Unlock()
}
}
}
- evtc.stopMutex.Lock()
- defer evtc.stopMutex.Unlock()
- if evtc.stopped == 1 {
+ if evtc.isStopped() {
break
}
// TODO: Record missed count in stats table.
@@ -362,9 +379,9 @@
C_deinit_subs(evtc.subs_handle)
evtc.subs_handle = nil
// set evtc.stopped for case where send_event error and channel was not stopped
evtc.stopMutex.Lock()
evtc.stopped = 1
evtc.stopMutex.Unlock()
}

func send_event(evtc *EventClient, tv *gnmipb.TypedValue,
@@ -396,18 +413,25 @@ func (evtc *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w
go update_stats(evtc)
evtc.wg.Add(1)

- evtc.stopMutex.Lock()
- defer evtc.stopMutex.Unlock()
- for evtc.stopped == 0 {
+ for !evtc.isStopped() {
select {
case <-evtc.channel:
+ evtc.stopMutex.Lock()
evtc.stopped = 1
+ evtc.stopMutex.Unlock()
log.V(3).Infof("Channel closed by client")
return
}
}
}

+ func (evtc *EventClient) isStopped() bool {
+ evtc.stopMutex.RLock()
+ val := evtc.stopped
+ evtc.stopMutex.RUnlock()
+ return val == 1
+ }


func (evtc *EventClient) Get(wg *sync.WaitGroup) ([]*spb.Value, error) {
return nil, nil
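One pattern from the counter updates above is worth calling out: a read-modify-write on a shared map must happen under a single exclusive lock, not as a locked read followed by a separately locked write, or two goroutines can read the same stale value and lose an increment. A minimal sketch of that pattern (hypothetical type and names, not code from this patch):

package main

import (
	"fmt"
	"sync"
)

// Hypothetical counter store mirroring the shape of EventClient's
// counters map; the names are illustrative only.
type counters struct {
	mu sync.RWMutex
	m  map[string]uint64
}

// Add performs the whole read-modify-write under one write lock.
func (c *counters) Add(key string, delta uint64) {
	c.mu.Lock()
	defer c.mu.Unlock() // fine here: the function returns right after
	c.m[key] += delta
}

// Snapshot copies the map under a read lock so callers can iterate
// without racing concurrent writers.
func (c *counters) Snapshot() map[string]uint64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	out := make(map[string]uint64, len(c.m))
	for k, v := range c.m {
		out[k] = v
	}
	return out
}

func main() {
	c := &counters{m: make(map[string]uint64)}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Add("MISSED", 1)
		}()
	}
	wg.Wait()
	fmt.Println(c.Snapshot()["MISSED"]) // always 100
}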
