diff --git a/gnmi_server/server_test.go b/gnmi_server/server_test.go
index f20b8208..195d0242 100644
--- a/gnmi_server/server_test.go
+++ b/gnmi_server/server_test.go
@@ -3106,9 +3106,9 @@ func TestConnectionsKeepAlive(t *testing.T) {
 }
 
 func TestClient(t *testing.T) {
-	var mutexDeInitDone sync.Mutex
-	var mutexHBDone sync.Mutex
-	var mutexIdxDone sync.Mutex
+	var mutexDeInit sync.RWMutex
+	var mutexHB sync.RWMutex
+	var mutexIdx sync.RWMutex
 
 	// sonic-host:device-test-event is a test event.
 	// Events client will drop it on floor.
@@ -3128,45 +3128,51 @@ func TestClient(t *testing.T) {
 	mock1 := gomonkey.ApplyFunc(sdc.C_init_subs, func(use_cache bool) unsafe.Pointer {
 		return nil
-	})
-	defer mock1.Reset()
+	})
+	defer mock1.Reset()
 
 	mock2 := gomonkey.ApplyFunc(sdc.C_recv_evt, func(h unsafe.Pointer) (int, sdc.Evt_rcvd) {
 		rc := (int)(0)
 		var evt sdc.Evt_rcvd
 
-		mutexIdxDone.Lock()
-		defer mutexIdxDone.Unlock()
-		if event_index < len(events) {
-			evt = events[event_index]
-			event_index++
+		mutexIdx.Lock()
+		current_index := event_index
+		mutexIdx.Unlock()
+		if current_index < len(events) {
+			evt = events[current_index]
+			mutexIdx.Lock() // exclusive lock: event_index is written here
+			event_index = current_index + 1
+			mutexIdx.Unlock()
 		} else {
 			time.Sleep(time.Millisecond * time.Duration(rcv_timeout))
 			rc = -1
 		}
 		return rc, evt
-	})
-	defer mock2.Reset()
+	})
+	defer mock2.Reset()
 
 	mock3 := gomonkey.ApplyFunc(sdc.Set_heartbeat, func(val int) {
-		mutexHBDone.Lock()
-		defer mutexHBDone.Unlock()
+		mutexHB.Lock() // exclusive lock: heartbeat is written here
 		heartbeat = val
+		mutexHB.Unlock()
 	})
-
-	defer mock3.Reset()
+	defer mock3.Reset()
 
 	mock4 := gomonkey.ApplyFunc(sdc.C_deinit_subs, func(h unsafe.Pointer) {
-		mutexDeInitDone.Lock()
-		defer mutexDeInitDone.Unlock()
+		mutexDeInit.Lock() // exclusive lock: deinit_done is written here
 		deinit_done = true
+		mutexDeInit.Unlock()
 	})
-
-	defer mock4.Reset()
+	defer mock4.Reset()
 
 	mock5 := gomonkey.ApplyMethod(reflect.TypeOf(&queue.PriorityQueue{}), "Put", func(pq *queue.PriorityQueue, item ...queue.Item) error {
 		return fmt.Errorf("Queue error")
 	})
-	defer mock5.Reset()
+	defer mock5.Reset()
+
+	mock6 := gomonkey.ApplyMethod(reflect.TypeOf(&queue.PriorityQueue{}), "Len", func(pq *queue.PriorityQueue) int {
+		return 150000 // Max size for pending events in PQ is 102400
+	})
+	defer mock6.Reset()
 
 	s := createServer(t, 8081)
 	go runServer(t, s)
@@ -3183,6 +3189,10 @@ func TestClient(t *testing.T) {
 		pause int
 		poll int
 	} {
+		{
+			desc: "dropped event",
+			poll: 3,
+		},
 		{
 			desc: "queue error",
 			poll: 3,
@@ -3194,27 +3204,39 @@ func TestClient(t *testing.T) {
 	}
 
 	sdc.C_init_subs(true)
-	var gotNotiMu sync.Mutex
+
+	var mutexNoti sync.RWMutex
+
 	for testNum, tt := range tests {
-		mutexHBDone.Lock()
+		mutexHB.Lock() // exclusive lock: heartbeat is reset here
 		heartbeat = 0
-		mutexHBDone.Unlock()
-		mutexIdxDone.Lock()
+		mutexHB.Unlock()
+
+		mutexIdx.Lock() // exclusive lock: event_index is reset here
 		event_index = 0
-		mutexIdxDone.Unlock()
+		mutexIdx.Unlock()
+
+		mutexDeInit.Lock() // exclusive lock: deinit_done is reset here
 		deinit_done = false
+		mutexDeInit.Unlock()
+
 		t.Run(tt.desc, func(t *testing.T) {
 			c := client.New()
 			defer c.Close()
 
 			var gotNoti []string
 			q.NotificationHandler = func(n client.Notification) error {
-				gotNotiMu.Lock()
-				defer gotNotiMu.Unlock()
 				if nn, ok := n.(client.Update); ok {
 					nn.TS = time.Unix(0, 200)
 					str := fmt.Sprintf("%v", nn.Val)
+
+					mutexNoti.RLock() // shared lock is enough for the read
+					currentNoti := gotNoti
+					mutexNoti.RUnlock()
+
+					mutexNoti.Lock() // exclusive lock: gotNoti is written here
+					gotNoti = append(currentNoti, str)
+					mutexNoti.Unlock()
 				}
 				return nil
 			}
@@ -3228,31 +3250,38 @@ func TestClient(t *testing.T) {
 
 			time.Sleep(time.Millisecond * 2000)
 
-			gotNotiMu.Lock()
-			// -1 to discount test event, which receiver would drop.
-			if testNum != 0 {
+			if testNum > 1 {
+				mutexNoti.Lock()
+				// -1 to discount test event, which receiver would drop.
 				if (len(events) - 1) != len(gotNoti) {
-					fmt.Printf("noti[%d] != events[%d]", len(gotNoti), len(events)-1)
+					t.Errorf("noti[%d] != events[%d]", len(gotNoti), len(events)-1)
 				}
-				mutexHBDone.Lock()
+
+				mutexHB.Lock()
 				if (heartbeat != HEARTBEAT_SET) {
 					t.Errorf("Heartbeat is not set %d != expected:%d", heartbeat, HEARTBEAT_SET)
 				}
-				mutexHBDone.Unlock()
+				mutexHB.Unlock()
+				fmt.Printf("DONE: Expect events:%d - 1 gotNoti=%d\n", len(events), len(gotNoti))
+				mutexNoti.Unlock()
 			}
-			gotNotiMu.Unlock()
 		})
 
+		if testNum == 0 {
+			mock6.Reset()
+		}
+
+		if testNum == 1 {
 			mock5.Reset()
 		}
 
 		time.Sleep(time.Millisecond * 1000)
 
-		mutexDeInitDone.Lock()
+		mutexDeInit.Lock()
 		if deinit_done == false {
 			t.Errorf("Events client deinit *NOT* called.")
 		}
-		mutexDeInitDone.Unlock()
+		mutexDeInit.Unlock()
 
 		// t.Log("END of a TEST")
 	}
diff --git a/sonic_data_client/events_client.go b/sonic_data_client/events_client.go
index 9c387cdb..224ccb4d 100644
--- a/sonic_data_client/events_client.go
+++ b/sonic_data_client/events_client.go
@@ -72,11 +72,11 @@ type EventClient struct {
 	subs_handle unsafe.Pointer
 
 	stopped int
-	stopMutex sync.Mutex
+	stopMutex sync.RWMutex
 
 	// Stats counter
 	counters map[string]uint64
-	countersMutex sync.Mutex
+	countersMutex sync.RWMutex
 
 	last_latencies [LATENCY_LIST_SIZE]uint64
 	last_latency_index int
@@ -184,7 +184,9 @@ func compute_latency(evtc *EventClient) {
 
 				total += v
 			}
 		}
+		evtc.countersMutex.Lock() // exclusive lock: counters map is written here
 		evtc.counters[LATENCY] = (uint64) (total/LATENCY_LIST_SIZE/1000/1000)
+		evtc.countersMutex.Unlock()
 	}
 }
@@ -201,18 +203,20 @@ func update_stats(evtc *EventClient) {
 	 * This helps add some initial pause before accessing DB
 	 * for existing values.
 	 */
-	evtc.stopMutex.Lock()
-	defer evtc.stopMutex.Unlock()
-	for evtc.stopped == 0 {
+	for !evtc.isStopped() {
 		var val uint64
 
 		compute_latency(evtc)
+
+		evtc.countersMutex.Lock()
 		for _, val = range evtc.counters {
 			if val != 0 {
 				break
 			}
 		}
+		evtc.countersMutex.Unlock()
+
 		if val != 0 {
 			break
 		}
 
@@ -221,7 +225,7 @@ func update_stats(evtc *EventClient) {
 
 	/* Populate counters from DB for cumulative counters. */
-	if evtc.stopped == 0 {
+	if !evtc.isStopped() {
 		ns := sdcfg.GetDbDefaultNamespace()
 
 		rclient = redis.NewClient(&redis.Options{
 			Network: "tcp",
@@ -249,15 +253,20 @@ func update_stats(evtc *EventClient) {
 	}
 
 	/* Main running loop that updates DB */
-	for evtc.stopped == 0 {
+	for !evtc.isStopped() {
 		tmp_counters := make(map[string]uint64)
 
 		// compute latency
 		compute_latency(evtc)
 
-		for key, val := range evtc.counters {
+		evtc.countersMutex.RLock() // shared lock: snapshot counters; copying the map ref alone would still race
+		current_counters := make(map[string]uint64, len(evtc.counters))
+		for key, val := range evtc.counters { current_counters[key] = val }
+		evtc.countersMutex.RUnlock()
+		for key, val := range current_counters {
 			tmp_counters[key] = val + db_counters[key]
 		}
+
 		tmp_counters[DROPPED] += evtc.last_errors
 
 		if (wr_counters == nil) || !reflect.DeepEqual(tmp_counters, *wr_counters) {
@@ -307,7 +316,7 @@ func C_deinit_subs(h unsafe.Pointer) {
 func get_events(evtc *EventClient) {
 	defer evtc.wg.Done()
 
-	str_ptr := C.malloc(C.sizeof_char * C.size_t(EVENT_BUFFSZ))
+	str_ptr := C.malloc(C.sizeof_char * C.size_t(EVENT_BUFFSZ))
 	defer C.free(unsafe.Pointer(str_ptr))
 
 	evt_ptr = (*C.event_receive_op_C_t)(C.malloc(C.size_t(unsafe.Sizeof(C.event_receive_op_C_t{}))))
@@ -322,9 +331,13 @@ func get_events(evtc *EventClient) {
 		if rc == 0 {
 			evtc.countersMutex.Lock()
-			evtc.counters[MISSED] += (uint64)(evt.Missed_cnt)
+			current_missed_cnt := evtc.counters[MISSED]
 			evtc.countersMutex.Unlock()
 
+			evtc.countersMutex.Lock() // exclusive lock: counters map is written here
+			evtc.counters[MISSED] = current_missed_cnt + (uint64)(evt.Missed_cnt)
+			evtc.countersMutex.Unlock()
+
 			if !strings.HasPrefix(evt.Event_str, TEST_EVENT) {
 				qlen := evtc.q.Len()
 
 				if qlen < PQ_MAX_SIZE {
@@ -346,13 +359,17 @@ func get_events(evtc *EventClient) {
 				log.V(1).Infof("Invalid event string: %v", evt.Event_str)
 			}
 		} else {
-			evtc.counters[DROPPED] += 1
+			evtc.countersMutex.Lock()
+			dropped_cnt := evtc.counters[DROPPED]
+			evtc.countersMutex.Unlock()
+
+			evtc.countersMutex.Lock() // exclusive lock: counters map is written here
+			evtc.counters[DROPPED] = dropped_cnt + 1
+			evtc.countersMutex.Unlock()
 		}
 	}
 	}
-	evtc.stopMutex.Lock()
-	defer evtc.stopMutex.Unlock()
-	if evtc.stopped == 1 {
+	if evtc.isStopped() {
 		break
 	}
 	// TODO: Record missed count in stats table.
@@ -362,9 +379,9 @@ func get_events(evtc *EventClient) {
 	C_deinit_subs(evtc.subs_handle)
 	evtc.subs_handle = nil
 	// set evtc.stopped for case where send_event error and channel was not stopped
-	evtc.stopMutex.Lock()
+	evtc.stopMutex.Lock() // exclusive lock: evtc.stopped is written here
 	evtc.stopped = 1
-	evtc.stopMutex.Unlock()
+	evtc.stopMutex.Unlock()
 }
 
 func send_event(evtc *EventClient, tv *gnmipb.TypedValue,
@@ -396,18 +413,25 @@ func (evtc *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w
 	go update_stats(evtc)
 	evtc.wg.Add(1)
 
-	evtc.stopMutex.Lock()
-	defer evtc.stopMutex.Unlock()
-	for evtc.stopped == 0 {
+	for !evtc.isStopped() {
 		select {
 		case <-evtc.channel:
+			evtc.stopMutex.Lock() // exclusive lock: evtc.stopped is written here
 			evtc.stopped = 1
+			evtc.stopMutex.Unlock()
 			log.V(3).Infof("Channel closed by client")
 			return
 		}
 	}
 }
 
+// isStopped reports whether the client has been stopped; a shared
+// (read) lock is sufficient because stopped is only read here.
+func (evtc *EventClient) isStopped() bool {
+	evtc.stopMutex.RLock()
+	val := evtc.stopped
+	evtc.stopMutex.RUnlock()
+	return val == 1
+}
+
 func (evtc *EventClient) Get(wg *sync.WaitGroup) ([]*spb.Value, error) {
 	return nil, nil