From a600dc954ca253b2acb4dadb52c85ce44e07ff8e Mon Sep 17 00:00:00 2001 From: Zain Budhwani <99770260+zbud-msft@users.noreply.github.com> Date: Thu, 15 Jun 2023 11:05:24 -0700 Subject: [PATCH 1/4] Fix threading issues in Event Client (#121) Why I did it Streaming events was not working due to incomplete fix for race condition brought in PR #100 Fix was not completed in last PR; issues: Mutex Unlock was not being called as it was using defer statement inside a continuous for loop which was blocking all other go routines from accessing. Mutex Lock was being used incorrectly as it should be only for when resource is being read not written to. Use RWMutex and RLock for writing to resource --- gnmi_server/server_test.go | 105 ++++++++++++++++++----------- sonic_data_client/events_client.go | 62 +++++++++++------ 2 files changed, 110 insertions(+), 57 deletions(-) diff --git a/gnmi_server/server_test.go b/gnmi_server/server_test.go index f20b8208..195d0242 100644 --- a/gnmi_server/server_test.go +++ b/gnmi_server/server_test.go @@ -3106,9 +3106,9 @@ func TestConnectionsKeepAlive(t *testing.T) { } func TestClient(t *testing.T) { - var mutexDeInitDone sync.Mutex - var mutexHBDone sync.Mutex - var mutexIdxDone sync.Mutex + var mutexDeInit sync.RWMutex + var mutexHB sync.RWMutex + var mutexIdx sync.RWMutex // sonic-host:device-test-event is a test event. // Events client will drop it on floor. @@ -3128,45 +3128,51 @@ func TestClient(t *testing.T) { mock1 := gomonkey.ApplyFunc(sdc.C_init_subs, func(use_cache bool) unsafe.Pointer { return nil - }) - defer mock1.Reset() + }) + defer mock1.Reset() mock2 := gomonkey.ApplyFunc(sdc.C_recv_evt, func(h unsafe.Pointer) (int, sdc.Evt_rcvd) { rc := (int)(0) var evt sdc.Evt_rcvd - mutexIdxDone.Lock() - defer mutexIdxDone.Unlock() - if event_index < len(events) { - evt = events[event_index] - event_index++ + mutexIdx.Lock() + current_index := event_index + mutexIdx.Unlock() + if current_index < len(events) { + evt = events[current_index] + mutexIdx.RLock() + event_index = current_index + 1 + mutexIdx.RUnlock() } else { time.Sleep(time.Millisecond * time.Duration(rcv_timeout)) rc = -1 } return rc, evt - }) - defer mock2.Reset() + }) + defer mock2.Reset() mock3 := gomonkey.ApplyFunc(sdc.Set_heartbeat, func(val int) { - mutexHBDone.Lock() - defer mutexHBDone.Unlock() + mutexHB.RLock() heartbeat = val + mutexHB.RUnlock() }) - - defer mock3.Reset() + defer mock3.Reset() mock4 := gomonkey.ApplyFunc(sdc.C_deinit_subs, func(h unsafe.Pointer) { - mutexDeInitDone.Lock() - defer mutexDeInitDone.Unlock() + mutexDeInit.RLock() deinit_done = true + mutexDeInit.RUnlock() }) - - defer mock4.Reset() + defer mock4.Reset() mock5 := gomonkey.ApplyMethod(reflect.TypeOf(&queue.PriorityQueue{}), "Put", func(pq *queue.PriorityQueue, item ...queue.Item) error { return fmt.Errorf("Queue error") }) - defer mock5.Reset() + defer mock5.Reset() + + mock6 := gomonkey.ApplyMethod(reflect.TypeOf(&queue.PriorityQueue{}), "Len", func(pq *queue.PriorityQueue) int { + return 150000 // Max size for pending events in PQ is 102400 + }) + defer mock6.Reset() s := createServer(t, 8081) go runServer(t, s) @@ -3183,6 +3189,10 @@ func TestClient(t *testing.T) { pause int poll int } { + { + desc: "dropped event", + poll: 3, + }, { desc: "queue error", poll: 3, @@ -3194,27 +3204,39 @@ func TestClient(t *testing.T) { } sdc.C_init_subs(true) - var gotNotiMu sync.Mutex + + var mutexNoti sync.RWMutex + for testNum, tt := range tests { - mutexHBDone.Lock() + mutexHB.RLock() heartbeat = 0 - mutexHBDone.Unlock() - mutexIdxDone.Lock() + mutexHB.RUnlock() + + mutexIdx.RLock() event_index = 0 - mutexIdxDone.Unlock() + mutexIdx.RUnlock() + + mutexDeInit.RLock() deinit_done = false + mutexDeInit.RUnlock() + t.Run(tt.desc, func(t *testing.T) { c := client.New() defer c.Close() var gotNoti []string q.NotificationHandler = func(n client.Notification) error { - gotNotiMu.Lock() - defer gotNotiMu.Unlock() if nn, ok := n.(client.Update); ok { nn.TS = time.Unix(0, 200) str := fmt.Sprintf("%v", nn.Val) - gotNoti = append(gotNoti, str) + + mutexNoti.Lock() + currentNoti := gotNoti + mutexNoti.Unlock() + + mutexNoti.RLock() + gotNoti = append(currentNoti, str) + mutexNoti.RUnlock() } return nil } @@ -3228,31 +3250,38 @@ func TestClient(t *testing.T) { time.Sleep(time.Millisecond * 2000) - gotNotiMu.Lock() - // -1 to discount test event, which receiver would drop. - if testNum != 0 { + if testNum > 1 { + mutexNoti.Lock() + // -1 to discount test event, which receiver would drop. if (len(events) - 1) != len(gotNoti) { - fmt.Printf("noti[%d] != events[%d]", len(gotNoti), len(events)-1) + t.Errorf("noti[%d] != events[%d]", len(gotNoti), len(events)-1) } - mutexHBDone.Lock() + + mutexHB.Lock() if (heartbeat != HEARTBEAT_SET) { t.Errorf("Heartbeat is not set %d != expected:%d", heartbeat, HEARTBEAT_SET) } - mutexHBDone.Unlock() + mutexHB.Unlock() + fmt.Printf("DONE: Expect events:%d - 1 gotNoti=%d\n", len(events), len(gotNoti)) + mutexNoti.Unlock() } - gotNotiMu.Unlock() }) + if testNum == 0 { + mock6.Reset() + } + + if testNum == 1 { mock5.Reset() } time.Sleep(time.Millisecond * 1000) - mutexDeInitDone.Lock() + mutexDeInit.Lock() if deinit_done == false { t.Errorf("Events client deinit *NOT* called.") } - mutexDeInitDone.Unlock() + mutexDeInit.Unlock() // t.Log("END of a TEST") } diff --git a/sonic_data_client/events_client.go b/sonic_data_client/events_client.go index 9c387cdb..224ccb4d 100644 --- a/sonic_data_client/events_client.go +++ b/sonic_data_client/events_client.go @@ -72,11 +72,11 @@ type EventClient struct { subs_handle unsafe.Pointer stopped int - stopMutex sync.Mutex + stopMutex sync.RWMutex // Stats counter counters map[string]uint64 - countersMutex sync.Mutex + countersMutex sync.RWMutex last_latencies [LATENCY_LIST_SIZE]uint64 last_latency_index int @@ -184,7 +184,9 @@ func compute_latency(evtc *EventClient) { total += v } } + evtc.countersMutex.RLock() evtc.counters[LATENCY] = (uint64) (total/LATENCY_LIST_SIZE/1000/1000) + evtc.countersMutex.RUnlock() } } @@ -201,18 +203,20 @@ func update_stats(evtc *EventClient) { * This helps add some initial pause before accessing DB * for existing values. */ - evtc.stopMutex.Lock() - defer evtc.stopMutex.Unlock() - for evtc.stopped == 0 { + for !evtc.isStopped() { var val uint64 compute_latency(evtc) + + evtc.countersMutex.Lock() for _, val = range evtc.counters { if val != 0 { break } } + evtc.countersMutex.Unlock() + if val != 0 { break } @@ -221,7 +225,7 @@ func update_stats(evtc *EventClient) { /* Populate counters from DB for cumulative counters. */ - if evtc.stopped == 0 { + if !evtc.isStopped() { ns := sdcfg.GetDbDefaultNamespace() rclient = redis.NewClient(&redis.Options{ @@ -249,15 +253,20 @@ func update_stats(evtc *EventClient) { } /* Main running loop that updates DB */ - for evtc.stopped == 0 { + for !evtc.isStopped() { tmp_counters := make(map[string]uint64) // compute latency compute_latency(evtc) - for key, val := range evtc.counters { + evtc.countersMutex.Lock() + current_counters := evtc.counters + evtc.countersMutex.Unlock() + + for key, val := range current_counters { tmp_counters[key] = val + db_counters[key] } + tmp_counters[DROPPED] += evtc.last_errors if (wr_counters == nil) || !reflect.DeepEqual(tmp_counters, *wr_counters) { @@ -307,7 +316,7 @@ func C_deinit_subs(h unsafe.Pointer) { func get_events(evtc *EventClient) { defer evtc.wg.Done() - str_ptr := C.malloc(C.sizeof_char * C.size_t(EVENT_BUFFSZ)) + str_ptr := C.malloc(C.sizeof_char * C.size_t(EVENT_BUFFSZ)) defer C.free(unsafe.Pointer(str_ptr)) evt_ptr = (*C.event_receive_op_C_t)(C.malloc(C.size_t(unsafe.Sizeof(C.event_receive_op_C_t{})))) @@ -322,9 +331,13 @@ func get_events(evtc *EventClient) { if rc == 0 { evtc.countersMutex.Lock() - evtc.counters[MISSED] += (uint64)(evt.Missed_cnt) + current_missed_cnt := evtc.counters[MISSED] evtc.countersMutex.Unlock() + evtc.countersMutex.RLock() + evtc.counters[MISSED] = current_missed_cnt + (uint64)(evt.Missed_cnt) + evtc.countersMutex.RUnlock() + if !strings.HasPrefix(evt.Event_str, TEST_EVENT) { qlen := evtc.q.Len() @@ -346,13 +359,17 @@ func get_events(evtc *EventClient) { log.V(1).Infof("Invalid event string: %v", evt.Event_str) } } else { - evtc.counters[DROPPED] += 1 + evtc.countersMutex.Lock() + dropped_cnt := evtc.counters[DROPPED] + evtc.countersMutex.Unlock() + + evtc.countersMutex.RLock() + evtc.counters[DROPPED] = dropped_cnt + 1 + evtc.countersMutex.RUnlock() } } } - evtc.stopMutex.Lock() - defer evtc.stopMutex.Unlock() - if evtc.stopped == 1 { + if evtc.isStopped() { break } // TODO: Record missed count in stats table. @@ -362,9 +379,9 @@ func get_events(evtc *EventClient) { C_deinit_subs(evtc.subs_handle) evtc.subs_handle = nil // set evtc.stopped for case where send_event error and channel was not stopped - evtc.stopMutex.Lock() + evtc.stopMutex.RLock() evtc.stopped = 1 - evtc.stopMutex.Unlock() + evtc.stopMutex.RUnlock() } func send_event(evtc *EventClient, tv *gnmipb.TypedValue, @@ -396,18 +413,25 @@ func (evtc *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w go update_stats(evtc) evtc.wg.Add(1) - evtc.stopMutex.Lock() - defer evtc.stopMutex.Unlock() - for evtc.stopped == 0 { + for !evtc.isStopped() { select { case <-evtc.channel: + evtc.stopMutex.RLock() evtc.stopped = 1 + evtc.stopMutex.RUnlock() log.V(3).Infof("Channel closed by client") return } } } +func (evtc *EventClient) isStopped() bool { + evtc.stopMutex.Lock() + val := evtc.stopped + evtc.stopMutex.Unlock() + return val == 1 +} + func (evtc *EventClient) Get(wg *sync.WaitGroup) ([]*spb.Value, error) { return nil, nil From 87d8eb3d75e6361f0709655a74342a4137d83798 Mon Sep 17 00:00:00 2001 From: Sachin Holla <51310506+sachinholla@users.noreply.github.com> Date: Fri, 16 Jun 2023 10:37:30 +0530 Subject: [PATCH 2/4] TranslClient: use PathValidator to sanitize the request paths (#112) * TranslClient: use PathValidator to sanitize the request paths TranslClient now uses the PathValidator utility to preprocess get, set and subscribe request paths before calling translib APIs. PathValidator is setup to performs following actions: * Verify the get/set/subscribe paths are known to translib * Insert missing module name prefixes in get/set/subscribe paths. gNMI allows the clients to omit them but translib APIs need them * Insert missing wildcard keys in subscribe paths Signed-off-by: Sachin Holla * Gotests for TranslClient based write cases - Set ENABLE_TRANSLIB_WRITE=y during pipeline tests to enable all TranslClient based write tests. They were skipped earlier - Setup correct translib test env in makefile 'check_gotest' target - Fixed path & payload of existing TranslClient based write tests - Added more get and set tests to cover path validator calls with prefixed and unprefixed paths - Skip unsupported gNOI tests --------- Signed-off-by: Sachin Holla --- Makefile | 20 ++- azure-pipelines.yml | 2 +- gnmi_server/client_subscribe.go | 2 +- gnmi_server/server_test.go | 230 +++++++++++++++++++----- sonic_data_client/transl_data_client.go | 33 ++-- tools/test/env.sh | 4 +- transl_utils/transl_utils.go | 129 ++++++------- 7 files changed, 288 insertions(+), 132 deletions(-) diff --git a/Makefile b/Makefile index e19bb496..24efbe55 100644 --- a/Makefile +++ b/Makefile @@ -82,17 +82,25 @@ endif swsscommon_wrap: make -C swsscommon -check_gotest: +DBCONFG = $(DBDIR)/database_config.json +ENVFILE = build/test/env.txt +TESTENV = $(shell cat $(ENVFILE)) + +$(DBCONFG): testdata/database_config.json sudo mkdir -p ${DBDIR} sudo cp ./testdata/database_config.json ${DBDIR} - sudo mkdir -p /usr/models/yang || true - sudo find $(MGMT_COMMON_DIR)/models -name '*.yang' -exec cp {} /usr/models/yang/ \; + +$(ENVFILE): + mkdir -p $(@D) + tools/test/env.sh | grep -v DB_CONFIG_PATH | tee $@ + +check_gotest: $(DBCONFG) $(ENVFILE) sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test -race -coverprofile=coverage-config.txt -covermode=atomic -v github.com/sonic-net/sonic-gnmi/sonic_db_config - sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test -race -coverprofile=coverage-gnmi.txt -covermode=atomic -mod=vendor $(BLD_FLAGS) -v github.com/sonic-net/sonic-gnmi/gnmi_server -coverpkg ../... - sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test -coverprofile=coverage-dialcout.txt -covermode=atomic -mod=vendor $(BLD_FLAGS) -v github.com/sonic-net/sonic-gnmi/dialout/dialout_client + sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(TESTENV) $(GO) test -race -coverprofile=coverage-gnmi.txt -covermode=atomic -mod=vendor $(BLD_FLAGS) -v github.com/sonic-net/sonic-gnmi/gnmi_server -coverpkg ../... + sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(TESTENV) $(GO) test -coverprofile=coverage-dialcout.txt -covermode=atomic -mod=vendor $(BLD_FLAGS) -v github.com/sonic-net/sonic-gnmi/dialout/dialout_client sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test -race -coverprofile=coverage-data.txt -covermode=atomic -mod=vendor -v github.com/sonic-net/sonic-gnmi/sonic_data_client sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test -race -coverprofile=coverage-dbus.txt -covermode=atomic -mod=vendor -v github.com/sonic-net/sonic-gnmi/sonic_service_client - sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test -race -coverprofile=coverage-translutils.txt -covermode=atomic -mod=vendor -v github.com/sonic-net/sonic-gnmi/transl_utils + sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(TESTENV) $(GO) test -race -coverprofile=coverage-translutils.txt -covermode=atomic -mod=vendor -v github.com/sonic-net/sonic-gnmi/transl_utils $(GO) get github.com/axw/gocov/... $(GO) get github.com/AlekSi/gocov-xml $(GO) mod vendor diff --git a/azure-pipelines.yml b/azure-pipelines.yml index afe10128..1fabbae8 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -154,7 +154,7 @@ stages: - script: | pushd sonic-gnmi - make check_gotest + make check_gotest ENABLE_TRANSLIB_WRITE=y displayName: "Test" - publish: $(Build.ArtifactStagingDirectory)/ diff --git a/gnmi_server/client_subscribe.go b/gnmi_server/client_subscribe.go index 85d0d193..bea9ca50 100644 --- a/gnmi_server/client_subscribe.go +++ b/gnmi_server/client_subscribe.go @@ -163,7 +163,7 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { dc, err = sdc.NewDbClient(paths, prefix) } else { /* For any other target or no target create new Transl Client. */ - dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions) + dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions, sdc.TranslWildcardOption{}) } if err != nil { diff --git a/gnmi_server/server_test.go b/gnmi_server/server_test.go index 195d0242..76b0ddbe 100644 --- a/gnmi_server/server_test.go +++ b/gnmi_server/server_test.go @@ -228,7 +228,7 @@ func runTestGet(t *testing.T, ctx context.Context, gClient pb.GNMIClient, pathTa textPbPath string, wantRetCode codes.Code, wantRespVal interface{}, valTest bool) { //var retCodeOk bool // Send request - + t.Helper() var pbPath pb.Path if err := proto.UnmarshalText(textPbPath, &pbPath); err != nil { t.Fatalf("error in unmarshaling path: %v %v", textPbPath, err) @@ -276,6 +276,9 @@ func runTestGet(t *testing.T, ctx context.Context, gClient pb.GNMIClient, pathTa t.Fatalf("error in unmarshaling IETF JSON data to json container: %v", err) } var wantJSONStruct interface{} + if v, ok := wantRespVal.(string); ok { + wantRespVal = []byte(v) + } if err := json.Unmarshal(wantRespVal.([]byte), &wantJSONStruct); err != nil { t.Fatalf("error in unmarshaling IETF JSON data to json container: %v", err) } @@ -302,10 +305,12 @@ type op_t int const ( Delete op_t = 1 Replace op_t = 2 + Update op_t = 3 ) func runTestSet(t *testing.T, ctx context.Context, gClient pb.GNMIClient, pathTarget string, textPbPath string, wantRetCode codes.Code, wantRespVal interface{}, attributeData string, op op_t) { + t.Helper() // Send request var pbPath pb.Path if err := proto.UnmarshalText(textPbPath, &pbPath); err != nil { @@ -313,21 +318,34 @@ func runTestSet(t *testing.T, ctx context.Context, gClient pb.GNMIClient, pathTa } req := &pb.SetRequest{} switch op { - case Replace: + case Replace, Update: prefix := pb.Path{Target: pathTarget} var v *pb.TypedValue v = &pb.TypedValue{ Value: &pb.TypedValue_JsonIetfVal{JsonIetfVal: extractJSON(attributeData)}} + data := []*pb.Update{{Path: &pbPath, Val: v}} req = &pb.SetRequest{ Prefix: &prefix, - Replace: []*pb.Update{&pb.Update{Path: &pbPath, Val: v}}, + } + if op == Replace { + req.Replace = data + } else { + req.Update = data } case Delete: req = &pb.SetRequest{ Delete: []*pb.Path{&pbPath}, } } + + runTestSetRaw(t, ctx, gClient, req, wantRetCode) +} + +func runTestSetRaw(t *testing.T, ctx context.Context, gClient pb.GNMIClient, req *pb.SetRequest, + wantRetCode codes.Code) { + t.Helper() + _, err := gClient.Set(ctx, req) gotRetStatus, ok := status.FromError(err) if !ok { @@ -340,6 +358,26 @@ func runTestSet(t *testing.T, ctx context.Context, gClient pb.GNMIClient, pathTa } } +// pathToPb converts string representation of gnmi path to protobuf format +func pathToPb(s string) string { + p, _ := ygot.StringToStructuredPath(s) + return proto.MarshalTextString(p) +} + +func removeModulePrefixFromPathPb(t *testing.T, s string) string { + t.Helper() + var p pb.Path + if err := proto.UnmarshalText(s, &p); err != nil { + t.Fatalf("error unmarshaling path: %v %v", s, err) + } + for _, ele := range p.Elem { + if k := strings.IndexByte(ele.Name, ':'); k != -1 { + ele.Name = ele.Name[k+1:] + } + } + return proto.MarshalTextString(&p) +} + func runServer(t *testing.T, s *Server) { //t.Log("Starting RPC server on address:", s.Address()) err := s.Serve() // blocks until close @@ -753,8 +791,6 @@ func TestGnmiSet(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - var emptyRespVal interface{} - tds := []struct { desc string pathTarget string @@ -765,29 +801,28 @@ func TestGnmiSet(t *testing.T) { operation op_t valTest bool }{ + { + desc: "Invalid path", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/openconfig-interfaces:interfaces/interface[name=Ethernet4]/unknown"), + wantRetCode: codes.Unknown, + operation: Delete, + }, { desc: "Set OC Interface MTU", pathTarget: "OC_YANG", - textPbPath: ` - elem: elem: > - `, + textPbPath: pathToPb("openconfig-interfaces:interfaces/interface[name=Ethernet4]/config"), attributeData: "../testdata/set_interface_mtu.json", wantRetCode: codes.OK, - wantRespVal: emptyRespVal, - operation: Replace, - valTest: false, + operation: Update, }, { desc: "Set OC Interface IP", pathTarget: "OC_YANG", - textPbPath: ` - elem: elem: > elem: elem: > - `, + textPbPath: pathToPb("/openconfig-interfaces:interfaces/interface[name=Ethernet4]/subinterfaces/subinterface[index=0]/openconfig-if-ip:ipv4"), attributeData: "../testdata/set_interface_ipv4.json", wantRetCode: codes.OK, - wantRespVal: emptyRespVal, - operation: Replace, - valTest: false, + operation: Update, }, // { // desc: "Check OC Interface values set", @@ -807,19 +842,82 @@ func TestGnmiSet(t *testing.T) { `, attributeData: "", wantRetCode: codes.OK, - wantRespVal: emptyRespVal, operation: Delete, valTest: false, }, + { + desc: "Set OC Interface IPv6 (unprefixed path)", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/interfaces/interface[name=Ethernet0]/subinterfaces/subinterface[index=0]/ipv6/addresses/address"), + attributeData: `{"address": [{"ip": "150::1","config": {"ip": "150::1","prefix-length": 80}}]}`, + wantRetCode: codes.OK, + operation: Update, + }, + { + desc: "Delete OC Interface IPv6 (unprefixed path)", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/interfaces/interface[name=Ethernet0]/subinterfaces/subinterface[index=0]/ipv6/addresses/address[ip=150::1]"), + wantRetCode: codes.OK, + operation: Delete, + }, + { + desc: "Create ACL (unprefixed path)", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/acl/acl-sets/acl-set"), + attributeData: `{"acl-set": [{"name": "A001", "type": "ACL_IPV4", + "config": {"name": "A001", "type": "ACL_IPV4", "description": "hello, world!"}}]}`, + wantRetCode: codes.OK, + operation: Update, + }, + { + desc: "Verify Create ACL", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/openconfig-acl:acl/acl-sets/acl-set[name=A001][type=ACL_IPV4]/config/description"), + wantRespVal: `{"openconfig-acl:description": "hello, world!"}`, + wantRetCode: codes.OK, + valTest: true, + }, + { + desc: "Replace ACL Description (unprefixed path)", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/acl/acl-sets/acl-set[name=A001][type=ACL_IPV4]/config/description"), + attributeData: `{"description": "dummy"}`, + wantRetCode: codes.OK, + operation: Replace, + }, + { + desc: "Verify Replace ACL Description", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/openconfig-acl:acl/acl-sets/acl-set[name=A001][type=ACL_IPV4]/config/description"), + wantRespVal: `{"openconfig-acl:description": "dummy"}`, + wantRetCode: codes.OK, + valTest: true, + }, + { + desc: "Delete ACL", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/openconfig-acl:acl/acl-sets/acl-set[name=A001][type=ACL_IPV4]"), + wantRetCode: codes.OK, + operation: Delete, + }, + { + desc: "Verify Delete ACL", + pathTarget: "OC_YANG", + textPbPath: pathToPb("/openconfig-acl:acl/acl-sets/acl-set[name=A001][type=ACL_IPV4]"), + wantRetCode: codes.NotFound, + valTest: true, + }, } for _, td := range tds { if td.valTest == true { - // wait for 2 seconds for change to sync - time.Sleep(2 * time.Second) t.Run(td.desc, func(t *testing.T) { runTestGet(t, ctx, gClient, td.pathTarget, td.textPbPath, td.wantRetCode, td.wantRespVal, td.valTest) }) + t.Run(td.desc + " (unprefixed path)", func(t *testing.T) { + p := removeModulePrefixFromPathPb(t, td.textPbPath) + runTestGet(t, ctx, gClient, td.pathTarget, p, td.wantRetCode, td.wantRespVal, td.valTest) + }) } else { t.Run(td.desc, func(t *testing.T) { runTestSet(t, ctx, gClient, td.pathTarget, td.textPbPath, td.wantRetCode, td.wantRespVal, td.attributeData, td.operation) @@ -2589,7 +2687,9 @@ func TestGNOI(t *testing.T) { t.Fatalf("Invalid System Time %d", resp.Time) } }) + t.Run("SonicShowTechsupport", func(t *testing.T) { + t.Skip("Not supported yet") sc := sgpb.NewSonicServiceClient(conn) rtime := time.Now().AddDate(0, -1, 0) req := &sgpb.TechsupportRequest{ @@ -2624,6 +2724,7 @@ func TestGNOI(t *testing.T) { for _, v := range cfg_data { t.Run("SonicCopyConfig", func(t *testing.T) { + t.Skip("Not supported yet") sc := sgpb.NewSonicServiceClient(conn) req := &sgpb.CopyConfigRequest{ Input: &sgpb.CopyConfigRequest_Input{ @@ -2709,7 +2810,7 @@ func TestBulkSet(t *testing.T) { go runServer(t, s) defer s.s.Stop() - // prepareDb(t) + prepareDbTranslib(t) //t.Log("Start gNMI client") tlsConfig := &tls.Config{InsecureSkipVerify: true} @@ -2726,32 +2827,81 @@ func TestBulkSet(t *testing.T) { gClient := pb.NewGNMIClient(conn) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() + t.Run("Set Multiple mtu", func(t *testing.T) { - pbPath1, _ := xpath.ToGNMIPath("openconfig-interfaces:interfaces/interface[name=Ethernet0]/config/mtu") - v := &pb.TypedValue{ - Value: &pb.TypedValue_JsonIetfVal{JsonIetfVal: []byte("{\"mtu\": 9104}")}} - update1 := &pb.Update{ - Path: pbPath1, - Val: v, - } - pbPath2, _ := xpath.ToGNMIPath("openconfig-interfaces:interfaces/interface[name=Ethernet4]/config/mtu") - v2 := &pb.TypedValue{ - Value: &pb.TypedValue_JsonIetfVal{JsonIetfVal: []byte("{\"mtu\": 9105}")}} - update2 := &pb.Update{ - Path: pbPath2, - Val: v2, - } + req := &pb.SetRequest{ + Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}}, + Update: []*pb.Update{ + newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`), + newPbUpdate("interface[name=Ethernet4]/config/mtu", `{"mtu": 9105}`), + }} + runTestSetRaw(t, ctx, gClient, req, codes.OK) + }) + + t.Run("Update and Replace", func(t *testing.T) { + aclKeys := `"name": "A002", "type": "ACL_IPV4"` + req := &pb.SetRequest{ + Replace: []*pb.Update{ + newPbUpdate( + "openconfig-acl:acl/acl-sets/acl-set", + `{"acl-set": [{`+aclKeys+`, "config":{`+aclKeys+`}}]}`), + }, + Update: []*pb.Update{ + newPbUpdate( + "interfaces/interface[name=Ethernet0]/config/description", + `{"description": "Bulk update 1"}`), + newPbUpdate( + "openconfig-interfaces:interfaces/interface[name=Ethernet4]/config/description", + `{"description": "Bulk update 2"}`), + }} + runTestSetRaw(t, ctx, gClient, req, codes.OK) + }) + + aclPath1, _ := ygot.StringToStructuredPath("/acl/acl-sets") + aclPath2, _ := ygot.StringToStructuredPath("/openconfig-acl:acl/acl-sets") + t.Run("Multiple deletes", func(t *testing.T) { req := &pb.SetRequest{ - Update: []*pb.Update{update1, update2}, + Delete: []*pb.Path{aclPath1, aclPath2}, } + runTestSetRaw(t, ctx, gClient, req, codes.OK) + }) - _, err = gClient.Set(ctx, req) - _, ok := status.FromError(err) - if !ok { - t.Fatal("got a non-grpc error from grpc call") + t.Run("Invalid Update Path", func(t *testing.T) { + req := &pb.SetRequest{ + Delete: []*pb.Path{aclPath1, aclPath2}, + Update: []*pb.Update{ + newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`), + }} + runTestSetRaw(t, ctx, gClient, req, codes.Unknown) + }) + + t.Run("Invalid Replace Path", func(t *testing.T) { + req := &pb.SetRequest{ + Delete: []*pb.Path{aclPath1, aclPath2}, + Replace: []*pb.Update{ + newPbUpdate("interface[name=Ethernet0]/config/mtu", `{"mtu": 9104}`), + }} + runTestSetRaw(t, ctx, gClient, req, codes.Unknown) + }) + + t.Run("Invalid Delete Path", func(t *testing.T) { + req := &pb.SetRequest{ + Prefix: &pb.Path{Elem: []*pb.PathElem{{Name: "interfaces"}}}, + Delete: []*pb.Path{aclPath1, aclPath2}, } + runTestSetRaw(t, ctx, gClient, req, codes.Unknown) }) + +} + +func newPbUpdate(path, value string) *pb.Update { + p, _ := ygot.StringToStructuredPath(path) + v := &pb.TypedValue_JsonIetfVal{JsonIetfVal: extractJSON(value)} + return &pb.Update{ + Path: p, + Val: &pb.TypedValue{Value: v}, + } } type loginCreds struct { diff --git a/sonic_data_client/transl_data_client.go b/sonic_data_client/transl_data_client.go index 08208523..b850d0cb 100644 --- a/sonic_data_client/transl_data_client.go +++ b/sonic_data_client/transl_data_client.go @@ -40,16 +40,24 @@ type TranslClient struct { extensions []*gnmi_extpb.Extension } -func NewTranslClient(prefix *gnmipb.Path, getpaths []*gnmipb.Path, ctx context.Context, extensions []*gnmi_extpb.Extension) (Client, error) { +func NewTranslClient(prefix *gnmipb.Path, getpaths []*gnmipb.Path, ctx context.Context, extensions []*gnmi_extpb.Extension, opts ...TranslClientOption) (Client, error) { var client TranslClient var err error client.ctx = ctx client.prefix = prefix client.extensions = extensions + if getpaths != nil { + var addWildcardKeys bool + for _, o := range opts { + if _, ok := o.(TranslWildcardOption); ok { + addWildcardKeys = true + } + } + client.path2URI = make(map[*gnmipb.Path]string) /* Populate GNMI path to REST URL map. */ - err = transutil.PopulateClientPaths(prefix, getpaths, &client.path2URI) + err = transutil.PopulateClientPaths(prefix, getpaths, &client.path2URI, addWildcardKeys) } if err != nil { @@ -99,7 +107,6 @@ func (c *TranslClient) Get(w *sync.WaitGroup) ([]*spb.Value, error) { func (c *TranslClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, update []*gnmipb.Update) error { rc, ctx := common_utils.GetContext(c.ctx) c.ctx = ctx - var uri string version := getBundleVersion(c.extensions) if version != nil { rc.BundleVersion = version @@ -109,19 +116,13 @@ func (c *TranslClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, upda return transutil.TranslProcessBulk(delete, replace, update, c.prefix, c.ctx) } else { if len(delete) == 1 { - /* Convert the GNMI Path to URI. */ - transutil.ConvertToURI(c.prefix, delete[0], &uri) - return transutil.TranslProcessDelete(uri, c.ctx) + return transutil.TranslProcessDelete(c.prefix, delete[0], c.ctx) } if len(replace) == 1 { - /* Convert the GNMI Path to URI. */ - transutil.ConvertToURI(c.prefix, replace[0].GetPath(), &uri) - return transutil.TranslProcessReplace(uri, replace[0].GetVal(), c.ctx) + return transutil.TranslProcessReplace(c.prefix, replace[0], c.ctx) } if len(update) == 1 { - /* Convert the GNMI Path to URI. */ - transutil.ConvertToURI(c.prefix, update[0].GetPath(), &uri) - return transutil.TranslProcessUpdate(uri, update[0].GetVal(), c.ctx) + return transutil.TranslProcessUpdate(c.prefix, update[0], c.ctx) } } return nil @@ -541,3 +542,11 @@ func getBundleVersion(extensions []*gnmi_extpb.Extension) *string { } return nil } + +type TranslClientOption interface { + IsTranslClientOption() +} + +type TranslWildcardOption struct{} + +func (t TranslWildcardOption) IsTranslClientOption() {} diff --git a/tools/test/env.sh b/tools/test/env.sh index 46209f14..a60b947e 100755 --- a/tools/test/env.sh +++ b/tools/test/env.sh @@ -2,6 +2,8 @@ set -e -. $(dirname ${BASH_SOURCE})/../../../sonic-mgmt-common/tools/test/env.sh \ +TOPDIR=$(realpath $(dirname ${BASH_SOURCE})/../..) + +. ${TOPDIR}/../sonic-mgmt-common/tools/test/env.sh \ --dest=${TOPDIR}/build/test \ --dbconfig-in=${TOPDIR}/testdata/database_config.json \ diff --git a/transl_utils/transl_utils.go b/transl_utils/transl_utils.go index b6b33857..2160fdbf 100644 --- a/transl_utils/transl_utils.go +++ b/transl_utils/transl_utils.go @@ -2,17 +2,19 @@ package transl_utils import ( "bytes" + "context" "encoding/json" - "strings" "fmt" + "log/syslog" + "strings" + + "github.com/Azure/sonic-mgmt-common/translib" + pathutil "github.com/Azure/sonic-mgmt-common/translib/path" + "github.com/Azure/sonic-mgmt-common/translib/tlerr" log "github.com/golang/glog" gnmipb "github.com/openconfig/gnmi/proto/gnmi" - "github.com/Azure/sonic-mgmt-common/translib" + "github.com/openconfig/ygot/ygot" "github.com/sonic-net/sonic-gnmi/common_utils" - "context" - "log/syslog" - "github.com/Azure/sonic-mgmt-common/translib/tlerr" - ) var ( @@ -58,55 +60,40 @@ func GnmiTranslFullPath(prefix, path *gnmipb.Path) *gnmipb.Path { } /* Populate the URI path corresponding GNMI paths. */ -func PopulateClientPaths(prefix *gnmipb.Path, paths []*gnmipb.Path, path2URI *map[*gnmipb.Path]string) error { - var req string - - /* Fetch the URI for each GET URI. */ +func PopulateClientPaths(prefix *gnmipb.Path, paths []*gnmipb.Path, path2URI *map[*gnmipb.Path]string, addWildcardKeys bool) error { + opts := []pathutil.PathValidatorOpt{ + &pathutil.AppendModulePrefix{}, + } + if addWildcardKeys { + opts = append(opts, &pathutil.AddWildcardKeys{}) + } for _, path := range paths { - ConvertToURI(prefix, path, &req) + req, err := ConvertToURI(prefix, path, opts...) + if err != nil { + return err + } (*path2URI)[path] = req } return nil } -/* Populate the URI path corresponding each GNMI paths. */ -func ConvertToURI(prefix *gnmipb.Path, path *gnmipb.Path, req *string) error { +// ConvertToURI returns translib path for a gnmi Path +func ConvertToURI(prefix, path *gnmipb.Path, opts ...pathutil.PathValidatorOpt) (string, error) { fullPath := path if prefix != nil { fullPath = GnmiTranslFullPath(prefix, path) } - elems := fullPath.GetElem() - *req = "/" - - if elems != nil { - /* Iterate through elements. */ - for i, elem := range elems { - log.V(6).Infof("index %d elem : %#v %#v", i, elem.GetName(), elem.GetKey()) - *req += elem.GetName() - key := elem.GetKey() - /* If no keys are present end the element with "/" */ - if key == nil { - *req += "/" - } - - /* If keys are present , process the keys. */ - if key != nil { - for k, v := range key { - log.V(6).Infof("elem : %#v %#v", k, v) - *req += "[" + k + "=" + v + "]" - } - - /* Append "/" after all keys are processed. */ - *req += "/" - } - } + if len(opts) == 0 { + opts = append(opts, &pathutil.AppendModulePrefix{}) + } + pv := pathutil.NewPathValidator(opts...) + if err := pv.Validate(fullPath); err != nil { + return "", err } - /* Trim the "/" at the end which is not required. */ - *req = strings.TrimSuffix(*req, "/") - return nil + return ygot.PathToString(fullPath) } /* Fill the values from TransLib. */ @@ -150,11 +137,14 @@ func TranslProcessGet(uriPath string, op *string, ctx context.Context) (*gnmipb. } /* Delete request handling. */ -func TranslProcessDelete(uri string, ctx context.Context) error { - var str3 string - payload := []byte(str3) +func TranslProcessDelete(prefix, delPath *gnmipb.Path, ctx context.Context) error { + uri, err := ConvertToURI(prefix, delPath) + if err != nil { + return err + } + rc, _ := common_utils.GetContext(ctx) - req := translib.SetRequest{Path:uri, Payload:payload, User: translib.UserRoles{Name: rc.Auth.User, Roles: rc.Auth.Roles}} + req := translib.SetRequest{Path:uri, User: translib.UserRoles{Name: rc.Auth.User, Roles: rc.Auth.Roles}} if rc.BundleVersion != nil { nver, err := translib.NewVersion(*rc.BundleVersion) if err != nil { @@ -176,13 +166,13 @@ func TranslProcessDelete(uri string, ctx context.Context) error { } /* Replace request handling. */ -func TranslProcessReplace(uri string, t *gnmipb.TypedValue, ctx context.Context) error { - /* Form the CURL request and send to client . */ - str := string(t.GetJsonIetfVal()) - str3 := strings.Replace(str, "\n", "", -1) - log.V(2).Info("Incoming JSON body is", str) +func TranslProcessReplace(prefix *gnmipb.Path, entry *gnmipb.Update, ctx context.Context) error { + uri, err := ConvertToURI(prefix, entry.GetPath()) + if err != nil { + return err + } - payload := []byte(str3) + payload := entry.GetVal().GetJsonIetfVal() rc, _ := common_utils.GetContext(ctx) req := translib.SetRequest{Path:uri, Payload:payload, User: translib.UserRoles{Name: rc.Auth.User, Roles: rc.Auth.Roles}} if rc.BundleVersion != nil { @@ -208,13 +198,13 @@ func TranslProcessReplace(uri string, t *gnmipb.TypedValue, ctx context.Context) } /* Update request handling. */ -func TranslProcessUpdate(uri string, t *gnmipb.TypedValue, ctx context.Context) error { - /* Form the CURL request and send to client . */ - str := string(t.GetJsonIetfVal()) - str3 := strings.Replace(str, "\n", "", -1) - log.V(2).Info("Incoming JSON body is", str) +func TranslProcessUpdate(prefix *gnmipb.Path, entry *gnmipb.Update, ctx context.Context) error { + uri, err := ConvertToURI(prefix, entry.GetPath()) + if err != nil { + return err + } - payload := []byte(str3) + payload := entry.GetVal().GetJsonIetfVal() rc, _ := common_utils.GetContext(ctx) req := translib.SetRequest{Path:uri, Payload:payload, User: translib.UserRoles{Name: rc.Auth.User, Roles: rc.Auth.Roles}} if rc.BundleVersion != nil { @@ -266,12 +256,11 @@ func TranslProcessBulk(delete []*gnmipb.Path, replace []*gnmipb.Update, update [ } } for _,d := range delete { - ConvertToURI(prefix, d, &uri) - var str3 string - payload := []byte(str3) + if uri, err = ConvertToURI(prefix, d); err != nil { + return err + } req := translib.SetRequest{ Path: uri, - Payload: payload, User: translib.UserRoles{Name: rc.Auth.User, Roles: rc.Auth.Roles}, } if rc.BundleVersion != nil { @@ -284,11 +273,10 @@ func TranslProcessBulk(delete []*gnmipb.Path, replace []*gnmipb.Update, update [ deleteUri = append(deleteUri, uri) } for _,r := range replace { - ConvertToURI(prefix, r.GetPath(), &uri) - str := string(r.GetVal().GetJsonIetfVal()) - str3 := strings.Replace(str, "\n", "", -1) - log.V(2).Info("Incoming JSON body is", str) - payload := []byte(str3) + if uri, err = ConvertToURI(prefix, r.GetPath()); err != nil { + return err + } + payload := r.GetVal().GetJsonIetfVal() req := translib.SetRequest{ Path: uri, Payload: payload, @@ -304,11 +292,10 @@ func TranslProcessBulk(delete []*gnmipb.Path, replace []*gnmipb.Update, update [ replaceUri = append(replaceUri, uri) } for _,u := range update { - ConvertToURI(prefix, u.GetPath(), &uri) - str := string(u.GetVal().GetJsonIetfVal()) - str3 := strings.Replace(str, "\n", "", -1) - log.V(2).Info("Incoming JSON body is", str) - payload := []byte(str3) + if uri, err = ConvertToURI(prefix, u.GetPath()); err != nil { + return err + } + payload := u.GetVal().GetJsonIetfVal() req := translib.SetRequest{ Path: uri, Payload: payload, From 214fa1c79575685719d2c3106eb8dd59f4c107fc Mon Sep 17 00:00:00 2001 From: Sachin Holla <51310506+sachinholla@users.noreply.github.com> Date: Sat, 17 Jun 2023 05:12:25 +0530 Subject: [PATCH 3/4] TranslClient: Use new translib subscription APIs (#122) * Allow data clients to send full gnmipb.Notification Added gnmipb.Notification as a member of the Value messgae object defined by sonic_internal.proto. Data clients now can fill a complete Notification object instead of timestamp, path, TypedValue separately. * Use origin to identify translib yang paths Use TranslClient to handle the subscribe requests when origin is "openconfig". Fallback to the existing target based identification logic if origin is not given. * Use new translib subscribe APIs in TranslClient StreamRun() enhancements: - Create a new SubscribeSession in the beginning and pass it to all further translib API calls - Call translib.IsSubscribeSupported() to reasolve preferences of the requested paths. This also splits the target_defined request into on_change and sample paths. - Use one translib.Subscribe() call passing all on_change enabled paths, if there are any. This will handle initial updates and subsequent on_change notifications. - Use one translib.Stream() call for each sample subscribe path, if present. This will be called in a loop at every sample interval. - Maintain the ygot objects received for each translib.Stream() call in a cache. Diff the current set of objects with the objects from the previous iteration to resolve deleted paths and modified values (when suppress_redundant is enabled). PollRun() and OnceRun() enhancements: - Call translib.Stream() with each subscribed path to generate notification data. - In poll mode, this is repeated when a poll message is received * Increase gnmi_server gotest timeout to 20m --- Makefile | 2 +- gnmi_server/client_subscribe.go | 34 +- gnmi_server/transl_sub_test.go | 937 ++++++++++++++++++++++++ go.mod | 1 + proto/sonic_internal.pb.go | 301 ++++++-- proto/sonic_internal.proto | 3 + sonic_data_client/db_client.go | 13 + sonic_data_client/transl_data_client.go | 520 ++++++------- sonic_data_client/transl_subscriber.go | 386 ++++++++++ 9 files changed, 1829 insertions(+), 368 deletions(-) create mode 100644 gnmi_server/transl_sub_test.go create mode 100644 sonic_data_client/transl_subscriber.go diff --git a/Makefile b/Makefile index 24efbe55..ab9578b3 100644 --- a/Makefile +++ b/Makefile @@ -96,7 +96,7 @@ $(ENVFILE): check_gotest: $(DBCONFG) $(ENVFILE) sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test -race -coverprofile=coverage-config.txt -covermode=atomic -v github.com/sonic-net/sonic-gnmi/sonic_db_config - sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(TESTENV) $(GO) test -race -coverprofile=coverage-gnmi.txt -covermode=atomic -mod=vendor $(BLD_FLAGS) -v github.com/sonic-net/sonic-gnmi/gnmi_server -coverpkg ../... + sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(TESTENV) $(GO) test -race -timeout 20m -coverprofile=coverage-gnmi.txt -covermode=atomic -mod=vendor $(BLD_FLAGS) -v github.com/sonic-net/sonic-gnmi/gnmi_server -coverpkg ../... sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(TESTENV) $(GO) test -coverprofile=coverage-dialcout.txt -covermode=atomic -mod=vendor $(BLD_FLAGS) -v github.com/sonic-net/sonic-gnmi/dialout/dialout_client sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test -race -coverprofile=coverage-data.txt -covermode=atomic -mod=vendor -v github.com/sonic-net/sonic-gnmi/sonic_data_client sudo CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CXXFLAGS="$(CGO_CXXFLAGS)" $(GO) test -race -coverprofile=coverage-dbus.txt -covermode=atomic -mod=vendor -v github.com/sonic-net/sonic-gnmi/sonic_service_client diff --git a/gnmi_server/client_subscribe.go b/gnmi_server/client_subscribe.go index bea9ca50..5d27177a 100644 --- a/gnmi_server/client_subscribe.go +++ b/gnmi_server/client_subscribe.go @@ -10,6 +10,7 @@ import ( log "github.com/golang/glog" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" sdc "github.com/sonic-net/sonic-gnmi/sonic_data_client" gnmipb "github.com/openconfig/gnmi/proto/gnmi" ) @@ -128,23 +129,23 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { return grpc.Errorf(codes.InvalidArgument, "first message must be SubscriptionList: %q", query) } - var target string prefix := c.subscribe.GetPrefix() - if prefix == nil { - return grpc.Errorf(codes.Unimplemented, "No target specified in prefix") - } else { - target = prefix.GetTarget() - // TODO: add data client support for fetching non-db data - if target == "" { - return grpc.Errorf(codes.Unimplemented, "Empty target data not supported yet") - } - } + origin := prefix.GetOrigin() + target := prefix.GetTarget() paths, err := c.populateDbPathSubscrition(c.subscribe) if err != nil { return grpc.Errorf(codes.NotFound, "Invalid subscription path: %v %q", err, query) } + if o, err := ParseOrigin(paths); err != nil { + return err // origin conflict within paths + } else if len(origin) == 0 { + origin = o // Use origin from paths if not given in prefix + } else if len(o) != 0 && o != origin { + return status.Error(codes.InvalidArgument, "Origin conflict between prefix and paths") + } + if connectionKey, valid = connectionManager.Add(c.addr, query.String()); !valid { return grpc.Errorf(codes.Unavailable, "Server connections are at capacity.") } @@ -155,7 +156,18 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { mode := c.subscribe.GetMode() - if target == "OTHERS" { + log.V(3).Infof("mode=%v, origin=%q, target=%q", mode, origin, target) + + if origin == "openconfig" { + dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions, sdc.TranslWildcardOption{}) + } else if len(origin) != 0 { + return grpc.Errorf(codes.Unimplemented, "Unsupported origin: %s", origin) + } else if target == "" { + // This and subsequent conditions handle target based path identification + // when origin == "". As per the spec it should have been treated as "openconfig". + // But we take a deviation and stick to legacy logic for backward compatibility + return grpc.Errorf(codes.Unimplemented, "Empty target data not supported") + } else if target == "OTHERS" { dc, err = sdc.NewNonDbClient(paths, prefix) } else if ((target == "EVENTS") && (mode == gnmipb.SubscriptionList_STREAM)) { dc, err = sdc.NewEventClient(paths, prefix, c.logLevel) diff --git a/gnmi_server/transl_sub_test.go b/gnmi_server/transl_sub_test.go new file mode 100644 index 00000000..4998ae22 --- /dev/null +++ b/gnmi_server/transl_sub_test.go @@ -0,0 +1,937 @@ +package gnmi + +import ( + "crypto/tls" + "fmt" + "path/filepath" + "reflect" + "strings" + "testing" + "time" + + "github.com/Azure/sonic-mgmt-common/translib" + "github.com/Workiva/go-datastructures/queue" + "github.com/golang/protobuf/proto" + "github.com/openconfig/gnmi/client" + gnmipath "github.com/openconfig/gnmi/path" + gnmipb "github.com/openconfig/gnmi/proto/gnmi" + extnpb "github.com/openconfig/gnmi/proto/gnmi_ext" + "github.com/openconfig/ygot/ygot" + spb "github.com/sonic-net/sonic-gnmi/proto" + dbconfig "github.com/sonic-net/sonic-gnmi/sonic_db_config" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/status" +) + +// This file contains subscription test cases for translib managed paths + +const ( + ONCE = gnmipb.SubscriptionList_ONCE + POLL = gnmipb.SubscriptionList_POLL + STREAM = gnmipb.SubscriptionList_STREAM + ON_CHANGE = gnmipb.SubscriptionMode_ON_CHANGE + SAMPLE = gnmipb.SubscriptionMode_SAMPLE + TARGET_DEFINED = gnmipb.SubscriptionMode_TARGET_DEFINED +) + +func TestTranslSubscribe(t *testing.T) { + s := createServer(t, 8081) + go runServer(t, s) + defer s.s.Stop() + + prepareDbTranslib(t) + + t.Run("origin=openconfig", func(t *testing.T) { + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{{Path: strToPath("/openconfig-acl:acl/acl-sets")}}} + sub := doSubscribe(t, req, codes.OK) + sub.Verify(client.Sync{}) + }) + + t.Run("origin=invalid", func(t *testing.T) { + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("invalid:/"), + Subscription: []*gnmipb.Subscription{{Path: strToPath("/openconfig-acl:acl/acl-sets")}}} + sub := doSubscribe(t, req, codes.Unimplemented) + sub.Verify() + }) + + t.Run("origin=empty,target=empty", func(t *testing.T) { + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("/"), + Subscription: []*gnmipb.Subscription{{Path: strToPath("/openconfig-acl:acl/acl-sets")}}} + sub := doSubscribe(t, req, codes.Unimplemented) + sub.Verify() + }) + + t.Run("origin in path", func(t *testing.T) { + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("/"), + Subscription: []*gnmipb.Subscription{{Path: strToPath("openconfig:/openconfig-acl:acl/acl-sets")}}} + sub := doSubscribe(t, req, codes.OK) + sub.Verify(client.Sync{}) + }) + + t.Run("origin conflict", func(t *testing.T) { + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{{Path: strToPath("xxx:/openconfig-acl:acl/acl-sets")}}} + sub := doSubscribe(t, req, codes.InvalidArgument) + sub.Verify() + }) + + t.Run("origin conflict in paths", func(t *testing.T) { + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("/"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("openconfig:/openconfig-acl:acl/acl-sets")}, + {Path: strToPath("closeconfig:/openconfig-interfaces/interfaces")}, + }} + sub := doSubscribe(t, req, codes.Unimplemented) + sub.Verify() + }) + + acl1Path := "/openconfig-acl:acl/acl-sets/acl-set[name=ONE][type=ACL_IPV4]" + acl2Path := "/openconfig-acl:acl/acl-sets/acl-set[name=TWO][type=ACL_IPV4]" + + acl1CreatePb := newPbUpdate("/openconfig-acl:acl/acl-sets/acl-set", + `{"acl-set": [{"name": "ONE", "type": "ACL_IPV4", "config": {"name": "ONE", "type": "ACL_IPV4"}}]}`) + acl2CreatePb := newPbUpdate("/openconfig-acl:acl/acl-sets/acl-set", + `{"acl-set": [{"name": "TWO", "type": "ACL_IPV4", "config": {"name": "TWO", "type": "ACL_IPV4", "description": "foo"}}]}`) + acl2DescUpdatePb := newPbUpdate(acl2Path+"/config/description", `{"description": "new"}`) + + acl1DeletePb := strToPath(acl1Path) + acl2DeletePb := strToPath(acl2Path) + acl2DescDeletePb := strToPath(acl2Path + "/config/description") + aclAllDeletePb := strToPath("/openconfig-acl:acl/acl-sets") + + t.Run("ONCE", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + doSet(t, acl1CreatePb) + + req := &gnmipb.SubscriptionList{ + Mode: ONCE, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("/openconfig-acl:acl/acl-sets/acl-set")}, + }} + + sub := doSubscribe(t, req, codes.OK) + sub.Verify( + Updated(acl1Path+"/name", "ONE"), + Updated(acl1Path+"/type", "ACL_IPV4"), + Updated(acl1Path+"/config/name", "ONE"), + Updated(acl1Path+"/config/type", "ACL_IPV4"), + Updated(acl1Path+"/state/name", "ONE"), + Updated(acl1Path+"/state/type", "ACL_IPV4"), + client.Sync{}, + ) + }) + + t.Run("POLL", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + + t.Logf("Start POLL subscription for ACL config container") + req := &gnmipb.SubscriptionList{ + Mode: POLL, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("/openconfig-acl:acl/acl-sets/acl-set[name=*][type=*]/config")}, + }} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify empty initial updates") + sub.Verify(client.Sync{}) + + t.Logf("Create ACl1") + time.Sleep(2 * time.Second) + doSet(t, acl1CreatePb) + + t.Logf("Verify poll updates include ACL1 data") + sub.Poll() + sub.Verify( + Updated(acl1Path+"/config/name", "ONE"), + Updated(acl1Path+"/config/type", "ACL_IPV4"), + client.Sync{}, + ) + + t.Logf("Create ACL2") + time.Sleep(2 * time.Second) + doSet(t, acl2CreatePb) + + t.Logf("Verify poll updates include both ACL1 and ACL2 data") + sub.Poll() + sub.Verify( + Updated(acl1Path+"/config/name", "ONE"), + Updated(acl1Path+"/config/type", "ACL_IPV4"), + Updated(acl2Path+"/config/name", "TWO"), + Updated(acl2Path+"/config/type", "ACL_IPV4"), + Updated(acl2Path+"/config/description", "foo"), + client.Sync{}, + ) + + t.Logf("Delete ACL2") + time.Sleep(2 * time.Second) + doSet(t, acl2DeletePb) + + t.Logf("Verify poll updates now include ACL1 data only") + sub.Poll() + sub.Verify( + Updated(acl1Path+"/config/name", "ONE"), + Updated(acl1Path+"/config/type", "ACL_IPV4"), + client.Sync{}, + ) + }) + + t.Run("ONCHANGE", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + + t.Logf("Start ON_CHANGE subscription for ACL config container") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/openconfig-acl:acl/acl-sets"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("/acl-set[name=*][type=*]/config"), Mode: ON_CHANGE}, + }} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify no initial updates") + sub.Verify(client.Sync{}) + + t.Logf("Create ACL2") + doSet(t, acl2CreatePb) + + t.Logf("Verify update notifications for ACL2 data") + sub.Verify( + Updated(acl2Path+"/config/name", "TWO"), + Updated(acl2Path+"/config/type", "ACL_IPV4"), + Updated(acl2Path+"/config/description", "foo"), + ) + + t.Logf("Create ACL1 and delete description of ACL2") + doSet(t, acl1CreatePb, acl2DescDeletePb) + + t.Logf("Verify delete notification for ACL2 description and updates for ACL1 data") + sub.Verify( + Updated(acl1Path+"/config/name", "ONE"), + Updated(acl1Path+"/config/type", "ACL_IPV4"), + Deleted(acl2Path+"/config/description"), + ) + + t.Logf("Delete ACL1 and set description for ACL2") + doSet(t, acl2DescUpdatePb, acl1DeletePb) + + t.Logf("Verify delete for ACL1 and update for ACL2 description") + sub.Verify( + Deleted(acl1Path+"/config"), + Updated(acl2Path+"/config/description", "new"), + ) + }) + + t.Run("ONCHANGE_unsupported", func(t *testing.T) { + t.Logf("Try ON_CHANGE for the top interface list") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("/openconfig-interfaces:interfaces/interface[name=*]"), Mode: ON_CHANGE}, + }} + sub := doSubscribe(t, req, codes.InvalidArgument) + sub.Verify() + }) + + sampleInterval := 25 * time.Second + + t.Run("SAMPLE", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + t.Logf("Create ACL1") + doSet(t, acl1CreatePb) + + t.Logf("Start SAMPLE subscription for ACL state container.. interval=%v", sampleInterval) + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/openconfig-acl:acl/acl-sets"), + Subscription: []*gnmipb.Subscription{ + { + Mode: SAMPLE, + Path: strToPath("/acl-set[name=*][type=*]/state"), + SampleInterval: uint64(sampleInterval.Nanoseconds()), + }, + }} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify initial updates include ACL1 data only") + sub.Verify( + Updated(acl1Path+"/state/name", "ONE"), + Updated(acl1Path+"/state/type", "ACL_IPV4"), + client.Sync{}, + ) + + t.Logf("Create ACL2") + doSet(t, acl2CreatePb) + + t.Logf("Verify updates include both ACL data, for 3 intervals") + for i := 1; i <= 3; i++ { + t.Logf("interval %d", i) + sub.VerifyT(sampleInterval - 3*time.Second) // check no notifications before the interval + sub.Verify( + Updated(acl1Path+"/state/name", "ONE"), + Updated(acl1Path+"/state/type", "ACL_IPV4"), + Updated(acl2Path+"/state/name", "TWO"), + Updated(acl2Path+"/state/type", "ACL_IPV4"), + Updated(acl2Path+"/state/description", "foo"), + ) + } + + t.Logf("Delete ACL1 and description of ACL2") + doSet(t, acl1DeletePb, acl2DescDeletePb) + + t.Logf("Verify next iteration includes deletes and updates (for remaining ACL2 data)") + sub.VerifyT(sampleInterval - 3*time.Second) + sub.Verify( + Deleted(acl1Path+"/state"), + Deleted(acl2Path+"/state/description"), + Updated(acl2Path+"/state/name", "TWO"), + Updated(acl2Path+"/state/type", "ACL_IPV4"), + ) + + t.Logf("Verify next iteration has updates only") + sub.VerifyT(sampleInterval - 3*time.Second) + sub.Verify( + Updated(acl2Path+"/state/name", "TWO"), + Updated(acl2Path+"/state/type", "ACL_IPV4"), + ) + }) + + t.Run("SAMPLE_suppress_redundant", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + t.Logf("Create ACL1 and ACL2") + doSet(t, acl1CreatePb, acl2CreatePb) + + t.Logf("Start SAMPLE subscription for ACL config container.. interval=%v, suppress_redundant=true", sampleInterval) + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/openconfig-acl:acl/acl-sets"), + Subscription: []*gnmipb.Subscription{ + { + Mode: SAMPLE, + Path: strToPath("/acl-set[name=*][type=*]/config"), + SampleInterval: uint64(sampleInterval.Nanoseconds()), + SuppressRedundant: true, + }, + }} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify initial updates") + sub.Verify( + Updated(acl1Path+"/config/name", "ONE"), + Updated(acl1Path+"/config/type", "ACL_IPV4"), + Updated(acl2Path+"/config/name", "TWO"), + Updated(acl2Path+"/config/type", "ACL_IPV4"), + Updated(acl2Path+"/config/description", "foo"), + client.Sync{}, + ) + + t.Logf("Verify next iteration has no data (due to suppress_redundant)") + sub.VerifyT(sampleInterval + 3*time.Second) + + t.Logf("Delete ACL1 and update ACL2 description") + doSet(t, acl1DeletePb, acl2DescUpdatePb) + + t.Logf("Verify next iteration includes deletes and updates for modified paths only") + sub.VerifyT( + sampleInterval+3*time.Second, + Deleted(acl1Path+"/config"), + Updated(acl2Path+"/config/description", "new"), + ) + + t.Logf("Delete ACL2 description") + doSet(t, acl2DescDeletePb) + + t.Logf("Verify next iteration includes description delete only") + sub.VerifyT( + sampleInterval+3*time.Second, + Deleted(acl2Path+"/config/description"), + ) + + t.Logf("Verify next iteration has no data") + sub.VerifyT(sampleInterval + 3*time.Second) + }) + + t.Run("SAMPLE_leaf", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + t.Logf("Create ACL2") + doSet(t, acl2CreatePb) + + t.Logf("Start SAMPLE subscription for ACL description.. interval=%v, updates_only=true", sampleInterval) + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + UpdatesOnly: true, + Prefix: strToPath("openconfig:/openconfig-acl:acl/acl-sets"), + Subscription: []*gnmipb.Subscription{ + { + Mode: SAMPLE, + Path: strToPath("/acl-set/state/description"), + SampleInterval: uint64(sampleInterval.Nanoseconds()), + }, + }} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify empty initial updates, due to updates_only") + sub.Verify(client.Sync{}) + + t.Logf("Verify next iteration has the description value") + sub.VerifyT(sampleInterval - 3*time.Second) // check no notifications before the interval + sub.Verify( + Updated(acl2Path+"/state/description", "foo"), + ) + + t.Logf("Update ACL2 description") + doSet(t, acl2DescUpdatePb) + + t.Logf("Verify next iteration has the updated description") + sub.VerifyT(sampleInterval - 3*time.Second) + sub.Verify( + Updated(acl2Path+"/state/description", "new"), + ) + + t.Logf("Delete ACL2") + doSet(t, acl2DeletePb) + + t.Logf("Verify next iteration has delete notification") + sub.VerifyT(sampleInterval - 3*time.Second) + sub.Verify( + Deleted(acl2Path + "/state/description"), + ) + + t.Logf("Verify next iteration has no notifications") + sub.VerifyT(sampleInterval + 3*time.Second) + }) + + t.Run("SAMPLE_invalid_interval", func(t *testing.T) { + t.Logf("Try SAMPLE with 1ms SamplerInterval (too low)") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{ + { + Mode: SAMPLE, + Path: strToPath("/openconfig-acl:acl/acl-sets"), + SampleInterval: uint64(time.Millisecond.Nanoseconds()), + }, + }} + sub := doSubscribe(t, req, codes.InvalidArgument) + sub.Verify() + }) + + t.Run("SAMPLE_no_interval", func(t *testing.T) { + defer doSet(t, aclAllDeletePb) + + t.Logf("Start SAMPLE subscription for ACL description.. without setting SampleInterval") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/openconfig-acl:acl/acl-sets"), + Subscription: []*gnmipb.Subscription{ + { + Mode: SAMPLE, + Path: strToPath("/acl-set/state/description"), + }, + }} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify empty initial updates") + sub.Verify(client.Sync{}) + + t.Logf("Create ACL2") + doSet(t, acl2CreatePb) + + t.Logf("Verify updates are received after default interval") + sub.VerifyT( + (translib.MinSubscribeInterval+2)*time.Second, + Updated(acl2Path+"/state/description", "foo"), + ) + }) + + t.Run("TARGETDEFINED", func(t *testing.T) { + t.Logf("Start TARGETDEFINED subscription for interface description, in-pkts and in-octets") + interval := 30 * time.Second + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/interfaces/interface[name=Ethernet0]"), + Subscription: []*gnmipb.Subscription{ + { + Path: strToPath("/state/description"), + Mode: TARGET_DEFINED, + }, { + Path: strToPath("/state/counters/in-pkts"), + Mode: TARGET_DEFINED, + SampleInterval: uint64(interval.Nanoseconds()), + }, { + Path: strToPath("/state/counters/in-octets"), + Mode: TARGET_DEFINED, + SampleInterval: uint64(interval.Nanoseconds()), + }}} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify initial updates includes all three data") + eth0Path := "/openconfig-interfaces:interfaces/interface[name=Ethernet0]" + sub.Verify( + Updated(eth0Path+"/state/description", ""), + Updated(eth0Path+"/state/counters/in-pkts", uint64(0)), + Updated(eth0Path+"/state/counters/in-octets", uint64(0)), + client.Sync{}, + ) + + next := time.Now().Add(interval) + + t.Logf("Update port description") + updateDb(t, DbDataMap{ + "CONFIG_DB": {"PORT|Ethernet0": {"description": "the one"}}, + "APPL_DB": {"PORT_TABLE:Ethernet0": {"description": "the one"}}, + }) + + t.Logf("Verify update notification for port description") + sub.Verify( + Updated(eth0Path+"/state/description", "the one"), + ) + + t.Logf("Verify periodic updates for stats only") + for i := 1; i <= 2; i++ { + sub.VerifyT(time.Until(next) - 3*time.Second) + sub.Verify( + Updated(eth0Path+"/state/counters/in-pkts", uint64(0)), + Updated(eth0Path+"/state/counters/in-octets", uint64(0)), + ) + next = time.Now().Add(interval) + } + }) + + t.Run("TARGETDEFINED_split", func(t *testing.T) { + interval := 30 * time.Second + eth0State := "/openconfig-interfaces:interfaces/interface[name=Ethernet0]/state" + + t.Logf("Start TARGETDEFINED subscription for interface state container") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{{ + Path: strToPath(eth0State), + Mode: TARGET_DEFINED, + SampleInterval: uint64(interval.Nanoseconds()), + }}} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify initial updates includes nodes from both state and counters containers") + sub.GlobCompare = true + sub.Verify( + Updated(eth0State+"/counters/*", nil), + Updated(eth0State+"/*", nil), + client.Sync{}, + ) + + t.Logf("Verify next updates contains only counters data") + sub.VerifyT(interval - 2*time.Second) + sub.Verify( + Updated(eth0State+"/counters/*", nil), + ) + }) + + t.Run("hearbeat", func(t *testing.T) { + saInterval := 30 * time.Second + hbInterval := saInterval + 10*time.Second + + t.Logf("Start an ON_CHANGE and SAMPLE subscription with heartbeat %v", hbInterval) + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/openconfig-interfaces:interfaces/interface[name=Ethernet0]"), + Subscription: []*gnmipb.Subscription{ + { + Path: strToPath("/config/enabled"), + Mode: SAMPLE, + SuppressRedundant: true, + SampleInterval: uint64(saInterval.Nanoseconds()), + HeartbeatInterval: uint64(hbInterval.Nanoseconds()), + }, { + Path: strToPath("/state/oper-status"), + Mode: ON_CHANGE, + HeartbeatInterval: uint64(hbInterval.Nanoseconds()), + }}} + sub := doSubscribe(t, req, codes.OK) + + t.Logf("Verify initial updates contains both data") + eth0Path := "/openconfig-interfaces:interfaces/interface[name=Ethernet0]" + sub.Verify( + Updated(eth0Path+"/config/enabled", false), + Updated(eth0Path+"/state/oper-status", "DOWN"), + client.Sync{}, + ) + + t.Logf("Verify updates received only after heartbeat interval") + sub.VerifyT(hbInterval - 2*time.Second) + sub.Verify( + Updated(eth0Path+"/config/enabled", false), + Updated(eth0Path+"/state/oper-status", "DOWN"), + ) + }) + + t.Run("hearbeat_invalid (sample)", func(t *testing.T) { + t.Logf("Try a SAMPLE subscription with 1ms heartbeat") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{{ + Path: strToPath("/interfaces/interface/config/mtu"), + Mode: SAMPLE, + SuppressRedundant: true, + HeartbeatInterval: uint64(time.Millisecond.Nanoseconds()), + }}} + sub := doSubscribe(t, req, codes.InvalidArgument) + sub.Verify() + }) + + t.Run("hearbeat_invalid (onchange)", func(t *testing.T) { + t.Logf("Try an ON_CHANGE subscription with 1ms heartbeat") + req := &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{{ + Path: strToPath("/interfaces/interface/config/mtu"), + Mode: ON_CHANGE, + HeartbeatInterval: uint64(time.Millisecond.Nanoseconds()), + }}} + sub := doSubscribe(t, req, codes.InvalidArgument) + sub.Verify() + }) + + t.Run("bundle_version_0.0.0", func(t *testing.T) { + t.Logf("Start a subscription with BundleVersion=0.0.0") + req := &gnmipb.SubscribeRequest{ + Request: &gnmipb.SubscribeRequest_Subscribe{ + Subscribe: &gnmipb.SubscriptionList{ + Mode: STREAM, + Prefix: strToPath("openconfig:/interfaces/interface[name=Ethernet0]"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("/config/mtu"), Mode: ON_CHANGE}, + {Path: strToPath("/state/mtu"), Mode: SAMPLE}, + }}}, + Extension: []*extnpb.Extension{newBundleVersion(t, "0.0.0")}, + } + sub := doSubscribeRaw(t, req, codes.OK) + sub.Verify( + Updated("/openconfig-interfaces:interfaces/interface[name=Ethernet0]/config/mtu", uint64(9100)), + Updated("/openconfig-interfaces:interfaces/interface[name=Ethernet0]/state/mtu", uint64(9100)), + client.Sync{}, + ) + }) + + t.Run("bundle_version_invalid", func(t *testing.T) { + t.Logf("Start POLL subscription with BundleVersion=100.0.0") + req := &gnmipb.SubscribeRequest{ + Request: &gnmipb.SubscribeRequest_Subscribe{ + Subscribe: &gnmipb.SubscriptionList{ + Mode: POLL, + Prefix: strToPath("openconfig:/"), + Subscription: []*gnmipb.Subscription{ + {Path: strToPath("/interfaces/interface[name=Ethernet0]/config/mtu")}, + }}}, + Extension: []*extnpb.Extension{newBundleVersion(t, "100.0.0")}, + } + sub := doSubscribeRaw(t, req, codes.InvalidArgument) + sub.Verify() + }) +} + +func strToPath(s string) *gnmipb.Path { + var origin string + if k := strings.IndexByte(s, ':') + 1; k > 0 && k < len(s) && s[k] == '/' { + origin = s[:k-1] + s = s[k:] + } + p, _ := ygot.StringToStructuredPath(s) + p.Origin = origin + return p +} + +func strToCPath(s string) client.Path { + p := strToPath(s) + return gnmipath.ToStrings(p, false) +} + +func Updated(p string, v interface{}) client.Update { + return client.Update{Path: strToCPath(p), Val: v} +} + +func Deleted(p string) client.Delete { + return client.Delete{Path: strToCPath(p)} +} + +type testSubscriber struct { + t *testing.T + client *client.CacheClient + notiQ *queue.Queue + + GlobCompare bool // treat expected paths as glob patterns in Verify() +} + +func doSubscribe(t *testing.T, subReq *gnmipb.SubscriptionList, exStatus codes.Code) *testSubscriber { + t.Helper() + req := &gnmipb.SubscribeRequest{ + Request: &gnmipb.SubscribeRequest_Subscribe{Subscribe: subReq}} + return doSubscribeRaw(t, req, exStatus) +} + +func doSubscribeRaw(t *testing.T, req *gnmipb.SubscribeRequest, exStatus codes.Code) *testSubscriber { + t.Helper() + q, err := client.NewQuery(req) + if err != nil { + t.Fatalf("NewQuery failed: %v", err) + } + + sub := &testSubscriber{ + t: t, + client: client.New(), + notiQ: queue.New(100), + } + + t.Cleanup(sub.close) + + q.Addrs = []string{"127.0.0.1:8081"} + q.TLS = &tls.Config{InsecureSkipVerify: true} + q.NotificationHandler = func(n client.Notification) error { + //fmt.Printf(">>>> %#v\n", n) + return sub.notiQ.Put(n) + } + + go func() { + err = sub.client.Subscribe(context.Background(), q) + if _, ok := status.FromError(err); !ok || status.Code(err) != exStatus { + msg := fmt.Sprintf("Subscribe failed: expected=%v, received=%v", exStatus, err) + sub.notiQ.Put(client.NewError(msg)) + } else if err != nil { + sub.notiQ.Dispose() // got the expected error.. stop listening immediately + } + }() + + return sub +} + +func (sub *testSubscriber) close() { + if sub != nil { + sub.client.Close() + sub.notiQ.Dispose() + } +} + +func (sub *testSubscriber) Poll() { + if err := sub.client.Poll(); err != nil { + sub.t.Helper() + sub.t.Fatalf("Poll failed: %v", err) + } +} + +func (sub *testSubscriber) Verify(expect ...client.Notification) { + sub.VerifyT(5*time.Second, expect...) +} + +func (sub *testSubscriber) VerifyT(timeout time.Duration, expect ...client.Notification) { + sub.t.Helper() + extra := make([]client.Notification, 0) + matched := make(map[int]client.Notification) + deadine := time.Now().Add(timeout) + + for { + n := sub.nextNoti(deadine) + if n == nil { + break // timeout + } + if err, ok := n.(client.Error); ok { + sub.t.Fatal(err.Error()) + } + + index := -1 + for i, ex := range expect { + if sub.compareNoti(n, ex) { + index = i + break + } + } + if index != -1 { + matched[index] = n + } else { + extra = append(extra, n) + } + if _, ok := n.(client.Sync); ok { + break + } + if !sub.GlobCompare && (len(matched) == len(expect)) { + break + } + } + + // if len(matched) == len(expect) && len(extra) == 0 { + // return + // } + switch { + case len(extra) != 0: // found extra updates + case sub.GlobCompare && len(matched) == 0 && len(expect) != 0: // no glob matches found + case !sub.GlobCompare && len(matched) != len(expect): // wrong number of matches + default: + return + } + + for _, n := range extra { + sub.t.Errorf("unexpected: %#v", n) + } + for i, n := range expect { + if matched[i] == nil { + sub.t.Errorf("missing: %#v", n) + } + } + sub.t.FailNow() +} + +func (sub *testSubscriber) nextNoti(deadline time.Time) client.Notification { + sub.t.Helper() + timeout := time.Until(deadline) + if timeout <= 0 { + return nil + } + n, err := sub.notiQ.Poll(1, timeout) + if err == queue.ErrTimeout || err == queue.ErrDisposed { + return nil + } else if err != nil { + sub.t.Fatalf("Unexpected error while waiting for a notification: %v", err) + } + + switch noti := n[0].(type) { + case client.Update: + noti.TS = time.Time{} + return noti + case client.Delete: + noti.TS = time.Time{} + return noti + case client.Error: + sub.t.Fatalf("Unexpected error notification: %s", noti.Error()) + case client.Connected: + return sub.nextNoti(deadline) + } + + return n[0].(client.Notification) +} + +func (sub *testSubscriber) compareNoti(n, exp client.Notification) bool { + if !sub.GlobCompare { + return reflect.DeepEqual(n, exp) + } + + var path, expPath string + var val, expVal interface{} + switch exp := exp.(type) { + case client.Update: + if u, ok := n.(client.Update); ok { + path, val = pathToString(u.Path), u.Val + expPath, expVal = pathToString(exp.Path), exp.Val + } else { + return false + } + case client.Delete: + if d, ok := n.(client.Delete); ok { + path = pathToString(d.Path) + expPath = pathToString(exp.Path) + } else { + return false + } + default: + return reflect.DeepEqual(n, exp) + } + + if ok, _ := filepath.Match(expPath, path); !ok { + return false + } + return expVal == nil || reflect.DeepEqual(val, expVal) +} + +func doSet(t *testing.T, data ...interface{}) { + t.Helper() + req := &gnmipb.SetRequest{} + for _, v := range data { + switch v := v.(type) { + case *gnmipb.Path: + req.Delete = append(req.Delete, v) + case *gnmipb.Update: + req.Update = append(req.Update, v) + default: + t.Fatalf("Unsupported set value: %T %v", v, v) + } + } + + cred := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}) + conn, err := grpc.Dial("127.0.0.1:8081", grpc.WithTransportCredentials(cred)) + if err != nil { + t.Fatalf("Could not create client: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + defer conn.Close() + + _, err = gnmipb.NewGNMIClient(conn).Set(ctx, req) + if err != nil { + t.Fatalf("Set failed: %v", err) + } +} + +// DbDataMap is a map[DBNAME]map[KEY]map[FIELD]VALUE +type DbDataMap map[string]map[string]map[string]interface{} + +func updateDb(t *testing.T, data DbDataMap) { + t.Helper() + for dbName, tableData := range data { + n := dbconfig.GetDbId(dbName, dbconfig.GetDbDefaultNamespace()) + redis := getRedisClientN(t, n, dbconfig.GetDbDefaultNamespace()) + defer redis.Close() + for key, fields := range tableData { + if fields == nil { + redis.Del(key) + continue + } + + modFields := make(map[string]interface{}) + delFields := make([]string, 0) + for n, v := range fields { + if v == nil { + delFields = append(delFields, n) + } else { + modFields[n] = v + } + } + + if len(modFields) != 0 { + redis.HMSet(key, modFields) + } + if len(delFields) != 0 { + redis.HDel(key, delFields...) + } + } + } +} + +func newBundleVersion(t *testing.T, version string) *extnpb.Extension { + t.Helper() + v, err := proto.Marshal(&spb.BundleVersion{Version: version}) + if err != nil { + t.Fatalf("Invalid version %s; err=%v", version, err) + } + ext := &extnpb.RegisteredExtension{Id: spb.BUNDLE_VERSION_EXT, Msg: v} + return &extnpb.Extension{Ext: &extnpb.Extension_RegisteredExt{RegisteredExt: ext}} +} diff --git a/go.mod b/go.mod index 8bdccb5b..69c0adb2 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 golang.org/x/net v0.0.0-20201110031124-69a78807bb2b google.golang.org/grpc v1.33.2 + google.golang.org/protobuf v1.25.0 gopkg.in/yaml.v2 v2.2.8 ) diff --git a/proto/sonic_internal.pb.go b/proto/sonic_internal.pb.go index 9d9edbd9..239d7286 100644 --- a/proto/sonic_internal.pb.go +++ b/proto/sonic_internal.pb.go @@ -1,17 +1,32 @@ +// sonic_internal.proto describes the message format used internally by SONiC + // Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0 +// protoc v3.6.1 // source: sonic_internal.proto package gnmi_sonic -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" -import gnmi "github.com/openconfig/gnmi/proto/gnmi" +import ( + proto "github.com/golang/protobuf/proto" + gnmi "github.com/openconfig/gnmi/proto/gnmi" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 type State int32 @@ -21,111 +36,251 @@ const ( State_RUNNING State = 2 ) -var State_name = map[int32]string{ - 0: "STOPPED", - 1: "INIT", - 2: "RUNNING", -} -var State_value = map[string]int32{ - "STOPPED": 0, - "INIT": 1, - "RUNNING": 2, +// Enum value maps for State. +var ( + State_name = map[int32]string{ + 0: "STOPPED", + 1: "INIT", + 2: "RUNNING", + } + State_value = map[string]int32{ + "STOPPED": 0, + "INIT": 1, + "RUNNING": 2, + } +) + +func (x State) Enum() *State { + p := new(State) + *p = x + return p } func (x State) String() string { - return proto.EnumName(State_name, int32(x)) + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (State) Descriptor() protoreflect.EnumDescriptor { + return file_sonic_internal_proto_enumTypes[0].Descriptor() +} + +func (State) Type() protoreflect.EnumType { + return &file_sonic_internal_proto_enumTypes[0] +} + +func (x State) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use State.Descriptor instead. +func (State) EnumDescriptor() ([]byte, []int) { + return file_sonic_internal_proto_rawDescGZIP(), []int{0} } -func (State) EnumDescriptor() ([]byte, []int) { return fileDescriptor1, []int{0} } // Value is the message that reprents a stream of updates for a given path, used internally. type Value struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + // prefix used with path - Prefix *gnmi.Path `protobuf:"bytes,1,opt,name=prefix" json:"prefix,omitempty"` + Prefix *gnmi.Path `protobuf:"bytes,1,opt,name=prefix,proto3" json:"prefix,omitempty"` // The device specific, or path corresponding to a value. - Path *gnmi.Path `protobuf:"bytes,2,opt,name=path" json:"path,omitempty"` + Path *gnmi.Path `protobuf:"bytes,2,opt,name=path,proto3" json:"path,omitempty"` // timestamp for the corresponding value, nanoseconds since epoch. // If timestamp is not set the default will assume to // be the current system time. - Timestamp int64 `protobuf:"varint,3,opt,name=timestamp" json:"timestamp,omitempty"` - Val *gnmi.TypedValue `protobuf:"bytes,4,opt,name=val" json:"val,omitempty"` + Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Val *gnmi.TypedValue `protobuf:"bytes,4,opt,name=val,proto3" json:"val,omitempty"` // Indicate target has sent all values associated with the subscription // at least once. - SyncResponse bool `protobuf:"varint,5,opt,name=sync_response,json=syncResponse" json:"sync_response,omitempty"` + SyncResponse bool `protobuf:"varint,5,opt,name=sync_response,json=syncResponse,proto3" json:"sync_response,omitempty"` // fatal error happened. - Fatal string `protobuf:"bytes,6,opt,name=fatal" json:"fatal,omitempty"` + Fatal string `protobuf:"bytes,6,opt,name=fatal,proto3" json:"fatal,omitempty"` + // Notification to be used in place of 1-4 if present + Notification *gnmi.Notification `protobuf:"bytes,7,opt,name=notification,proto3" json:"notification,omitempty"` } -func (m *Value) Reset() { *m = Value{} } -func (m *Value) String() string { return proto.CompactTextString(m) } -func (*Value) ProtoMessage() {} -func (*Value) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{0} } +func (x *Value) Reset() { + *x = Value{} + if protoimpl.UnsafeEnabled { + mi := &file_sonic_internal_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Value) String() string { + return protoimpl.X.MessageStringOf(x) +} -func (m *Value) GetPrefix() *gnmi.Path { - if m != nil { - return m.Prefix +func (*Value) ProtoMessage() {} + +func (x *Value) ProtoReflect() protoreflect.Message { + mi := &file_sonic_internal_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Value.ProtoReflect.Descriptor instead. +func (*Value) Descriptor() ([]byte, []int) { + return file_sonic_internal_proto_rawDescGZIP(), []int{0} +} + +func (x *Value) GetPrefix() *gnmi.Path { + if x != nil { + return x.Prefix } return nil } -func (m *Value) GetPath() *gnmi.Path { - if m != nil { - return m.Path +func (x *Value) GetPath() *gnmi.Path { + if x != nil { + return x.Path } return nil } -func (m *Value) GetTimestamp() int64 { - if m != nil { - return m.Timestamp +func (x *Value) GetTimestamp() int64 { + if x != nil { + return x.Timestamp } return 0 } -func (m *Value) GetVal() *gnmi.TypedValue { - if m != nil { - return m.Val +func (x *Value) GetVal() *gnmi.TypedValue { + if x != nil { + return x.Val } return nil } -func (m *Value) GetSyncResponse() bool { - if m != nil { - return m.SyncResponse +func (x *Value) GetSyncResponse() bool { + if x != nil { + return x.SyncResponse } return false } -func (m *Value) GetFatal() string { - if m != nil { - return m.Fatal +func (x *Value) GetFatal() string { + if x != nil { + return x.Fatal } return "" } -func init() { - proto.RegisterType((*Value)(nil), "gnmi.sonic.Value") - proto.RegisterEnum("gnmi.sonic.State", State_name, State_value) -} - -func init() { proto.RegisterFile("sonic_internal.proto", fileDescriptor1) } - -var fileDescriptor1 = []byte{ - // 269 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x8f, 0x4f, 0x4b, 0xc3, 0x30, - 0x18, 0xc6, 0xcd, 0xfa, 0xc7, 0xed, 0x9d, 0x42, 0x09, 0x3b, 0x04, 0x11, 0x29, 0xf5, 0x52, 0x14, - 0x5a, 0xd1, 0xaf, 0xa0, 0x48, 0x2f, 0xb5, 0x64, 0xd5, 0xeb, 0xc8, 0x6a, 0xda, 0x06, 0xda, 0x24, - 0xb4, 0x99, 0xb8, 0x6f, 0xe8, 0xc7, 0x92, 0xa6, 0x03, 0x0f, 0xde, 0xf2, 0xfc, 0x9e, 0xdf, 0x03, - 0x79, 0x61, 0x33, 0x2a, 0x29, 0xaa, 0x9d, 0x90, 0x86, 0x0f, 0x92, 0x75, 0x89, 0x1e, 0x94, 0x51, - 0x18, 0x1a, 0xd9, 0x8b, 0xc4, 0x56, 0x57, 0x0f, 0x8d, 0x30, 0xed, 0x61, 0x9f, 0x54, 0xaa, 0x4f, - 0x95, 0xe6, 0xb2, 0x52, 0xb2, 0x16, 0x4d, 0x3a, 0x19, 0xa9, 0xb5, 0xe7, 0xa7, 0x5d, 0xd8, 0x1c, - 0xfd, 0x20, 0xf0, 0x3e, 0x58, 0x77, 0xe0, 0x38, 0x02, 0x5f, 0x0f, 0xbc, 0x16, 0xdf, 0x04, 0x85, - 0x28, 0x5e, 0x3f, 0x42, 0x62, 0xb5, 0x82, 0x99, 0x96, 0x9e, 0x1a, 0x7c, 0x03, 0xae, 0x66, 0xa6, - 0x25, 0x8b, 0x7f, 0x86, 0xe5, 0xf8, 0x1a, 0x56, 0x46, 0xf4, 0x7c, 0x34, 0xac, 0xd7, 0xc4, 0x09, - 0x51, 0xec, 0xd0, 0x3f, 0x80, 0x23, 0x70, 0xbe, 0x58, 0x47, 0x5c, 0x3b, 0x0e, 0xe6, 0x71, 0x79, - 0xd4, 0xfc, 0xd3, 0x7e, 0x80, 0x4e, 0x25, 0xbe, 0x85, 0xcb, 0xf1, 0x28, 0xab, 0xdd, 0xc0, 0x47, - 0xad, 0xe4, 0xc8, 0x89, 0x17, 0xa2, 0x78, 0x49, 0x2f, 0x26, 0x48, 0x4f, 0x0c, 0x6f, 0xc0, 0xab, - 0x99, 0x61, 0x1d, 0xf1, 0x43, 0x14, 0xaf, 0xe8, 0x1c, 0xee, 0xee, 0xc1, 0xdb, 0x1a, 0x66, 0x38, - 0x5e, 0xc3, 0xf9, 0xb6, 0x7c, 0x2b, 0x8a, 0x97, 0xe7, 0xe0, 0x0c, 0x2f, 0xc1, 0xcd, 0xf2, 0xac, - 0x0c, 0xd0, 0x84, 0xe9, 0x7b, 0x9e, 0x67, 0xf9, 0x6b, 0xb0, 0xd8, 0xfb, 0xf6, 0xfc, 0xa7, 0xdf, - 0x00, 0x00, 0x00, 0xff, 0xff, 0x14, 0x1d, 0x16, 0xfb, 0x54, 0x01, 0x00, 0x00, +func (x *Value) GetNotification() *gnmi.Notification { + if x != nil { + return x.Notification + } + return nil +} + +var File_sonic_internal_proto protoreflect.FileDescriptor + +var file_sonic_internal_proto_rawDesc = []byte{ + 0x0a, 0x14, 0x73, 0x6f, 0x6e, 0x69, 0x63, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x67, 0x6e, 0x6d, 0x69, 0x2e, 0x73, 0x6f, 0x6e, + 0x69, 0x63, 0x1a, 0x30, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, + 0x70, 0x65, 0x6e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2f, 0x67, 0x6e, 0x6d, 0x69, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6e, 0x6d, 0x69, 0x2f, 0x67, 0x6e, 0x6d, 0x69, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x80, 0x02, 0x0a, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x22, + 0x0a, 0x06, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, + 0x2e, 0x67, 0x6e, 0x6d, 0x69, 0x2e, 0x50, 0x61, 0x74, 0x68, 0x52, 0x06, 0x70, 0x72, 0x65, 0x66, + 0x69, 0x78, 0x12, 0x1e, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x0a, 0x2e, 0x67, 0x6e, 0x6d, 0x69, 0x2e, 0x50, 0x61, 0x74, 0x68, 0x52, 0x04, 0x70, 0x61, + 0x74, 0x68, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x12, 0x22, 0x0a, 0x03, 0x76, 0x61, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, + 0x67, 0x6e, 0x6d, 0x69, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x64, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, + 0x03, 0x76, 0x61, 0x6c, 0x12, 0x23, 0x0a, 0x0d, 0x73, 0x79, 0x6e, 0x63, 0x5f, 0x72, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x73, 0x79, 0x6e, + 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x66, 0x61, 0x74, + 0x61, 0x6c, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x66, 0x61, 0x74, 0x61, 0x6c, 0x12, + 0x36, 0x0a, 0x0c, 0x6e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, + 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x6e, 0x6d, 0x69, 0x2e, 0x4e, 0x6f, 0x74, + 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0c, 0x6e, 0x6f, 0x74, 0x69, 0x66, + 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2a, 0x2b, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, + 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x54, 0x4f, 0x50, 0x50, 0x45, 0x44, 0x10, 0x00, 0x12, 0x08, 0x0a, + 0x04, 0x49, 0x4e, 0x49, 0x54, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x55, 0x4e, 0x4e, 0x49, + 0x4e, 0x47, 0x10, 0x02, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_sonic_internal_proto_rawDescOnce sync.Once + file_sonic_internal_proto_rawDescData = file_sonic_internal_proto_rawDesc +) + +func file_sonic_internal_proto_rawDescGZIP() []byte { + file_sonic_internal_proto_rawDescOnce.Do(func() { + file_sonic_internal_proto_rawDescData = protoimpl.X.CompressGZIP(file_sonic_internal_proto_rawDescData) + }) + return file_sonic_internal_proto_rawDescData +} + +var file_sonic_internal_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_sonic_internal_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_sonic_internal_proto_goTypes = []interface{}{ + (State)(0), // 0: gnmi.sonic.State + (*Value)(nil), // 1: gnmi.sonic.Value + (*gnmi.Path)(nil), // 2: gnmi.Path + (*gnmi.TypedValue)(nil), // 3: gnmi.TypedValue + (*gnmi.Notification)(nil), // 4: gnmi.Notification +} +var file_sonic_internal_proto_depIdxs = []int32{ + 2, // 0: gnmi.sonic.Value.prefix:type_name -> gnmi.Path + 2, // 1: gnmi.sonic.Value.path:type_name -> gnmi.Path + 3, // 2: gnmi.sonic.Value.val:type_name -> gnmi.TypedValue + 4, // 3: gnmi.sonic.Value.notification:type_name -> gnmi.Notification + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name +} + +func init() { file_sonic_internal_proto_init() } +func file_sonic_internal_proto_init() { + if File_sonic_internal_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_sonic_internal_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Value); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_sonic_internal_proto_rawDesc, + NumEnums: 1, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_sonic_internal_proto_goTypes, + DependencyIndexes: file_sonic_internal_proto_depIdxs, + EnumInfos: file_sonic_internal_proto_enumTypes, + MessageInfos: file_sonic_internal_proto_msgTypes, + }.Build() + File_sonic_internal_proto = out.File + file_sonic_internal_proto_rawDesc = nil + file_sonic_internal_proto_goTypes = nil + file_sonic_internal_proto_depIdxs = nil } diff --git a/proto/sonic_internal.proto b/proto/sonic_internal.proto index bb8b3402..6eef071d 100644 --- a/proto/sonic_internal.proto +++ b/proto/sonic_internal.proto @@ -31,4 +31,7 @@ message Value { // fatal error happened. string fatal = 6; + + // Notification to be used in place of 1-4 if present + gnmi.Notification notification = 7; } \ No newline at end of file diff --git a/sonic_data_client/db_client.go b/sonic_data_client/db_client.go index b9174e23..09a52d45 100644 --- a/sonic_data_client/db_client.go +++ b/sonic_data_client/db_client.go @@ -139,6 +139,13 @@ func (val Value) Compare(other queue.Item) int { return -1 } +func (val Value) GetTimestamp() int64 { + if n := val.GetNotification(); n != nil { + return n.GetTimestamp() + } + return val.Value.GetTimestamp() +} + type DbClient struct { prefix *gnmipb.Path pathG2S map[*gnmipb.Path][]tablePath @@ -365,6 +372,12 @@ func ValToResp(val Value) (*gnmipb.SubscribeResponse, error) { return nil, fmt.Errorf("%s", fatal) } + // In case the client returned a full gnmipb.Notification object + if n := val.GetNotification(); n != nil { + return &gnmipb.SubscribeResponse{ + Response: &gnmipb.SubscribeResponse_Update{Update: n}}, nil + } + return &gnmipb.SubscribeResponse{ Response: &gnmipb.SubscribeResponse_Update{ Update: &gnmipb.Notification{ diff --git a/sonic_data_client/transl_data_client.go b/sonic_data_client/transl_data_client.go index b850d0cb..21e63925 100644 --- a/sonic_data_client/transl_data_client.go +++ b/sonic_data_client/transl_data_client.go @@ -1,23 +1,26 @@ -// Package client provides a generic access layer for data available in system +// Package client provides a generic access layer for data available in system package client import ( - spb "github.com/sonic-net/sonic-gnmi/proto" - transutil "github.com/sonic-net/sonic-gnmi/transl_utils" + "context" + "fmt" + "reflect" + "runtime" + "sync" + "time" + + "github.com/Azure/sonic-mgmt-common/translib" + "github.com/Workiva/go-datastructures/queue" log "github.com/golang/glog" "github.com/golang/protobuf/proto" gnmipb "github.com/openconfig/gnmi/proto/gnmi" gnmi_extpb "github.com/openconfig/gnmi/proto/gnmi_ext" - "github.com/Workiva/go-datastructures/queue" - "sync" - "time" - "fmt" - "reflect" - "github.com/Azure/sonic-mgmt-common/translib" + "github.com/openconfig/ygot/ygot" "github.com/sonic-net/sonic-gnmi/common_utils" - "bytes" - "encoding/json" - "context" + spb "github.com/sonic-net/sonic-gnmi/proto" + transutil "github.com/sonic-net/sonic-gnmi/transl_utils" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -33,11 +36,14 @@ type TranslClient struct { channel chan struct{} q *queue.PriorityQueue - synced sync.WaitGroup // Control when to send gNMI sync_response - w *sync.WaitGroup // wait for all sub go routines to finish - mu sync.RWMutex // Mutex for data protection among routines for transl_client - ctx context.Context //Contains Auth info and request info + synced sync.WaitGroup // Control when to send gNMI sync_response + w *sync.WaitGroup // wait for all sub go routines to finish + mu sync.RWMutex // Mutex for data protection among routines for transl_client + ctx context.Context //Contains Auth info and request info extensions []*gnmi_extpb.Extension + + version *translib.Version // Client version; populated by parseVersion() + encoding gnmipb.Encoding } func NewTranslClient(prefix *gnmipb.Path, getpaths []*gnmipb.Path, ctx context.Context, extensions []*gnmi_extpb.Extension, opts ...TranslClientOption) (Client, error) { @@ -127,6 +133,7 @@ func (c *TranslClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, upda } return nil } + func enqueFatalMsgTranslib(c *TranslClient, msg string) { c.q.Put(Value{ &spb.Value{ @@ -135,375 +142,309 @@ func enqueFatalMsgTranslib(c *TranslClient, msg string) { }, }) } -type ticker_info struct{ - t *time.Ticker - sub *gnmipb.Subscription - heartbeat bool + +func enqueueSyncMessage(c *TranslClient) { + m := &spb.Value{ + Timestamp: time.Now().UnixNano(), + SyncResponse: true, + } + c.q.Put(Value{m}) +} + +// recoverSubscribe recovers from possible panics during subscribe handling. +// It pushes a fatal message to the RPC handler's queue, which forces the server to +// close the RPC with an error status. Should always be used as a deferred function. +func recoverSubscribe(c *TranslClient) { + if r := recover(); r != nil { + buff := make([]byte, 1<<12) + buff = buff[:runtime.Stack(buff, false)] + log.Error(string(buff)) + + err := status.Errorf(codes.Internal, "%v", r) + enqueFatalMsgTranslib(c, fmt.Sprintf("Subscribe operation failed with error =%v", err.Error())) + } +} + +type ticker_info struct { + t *time.Ticker + sub *gnmipb.Subscription + pathStr string + heartbeat bool +} + +func getTranslNotificationType(mode gnmipb.SubscriptionMode) translib.NotificationType { + switch mode { + case gnmipb.SubscriptionMode_ON_CHANGE: + return translib.OnChange + case gnmipb.SubscriptionMode_SAMPLE: + return translib.Sample + default: + return translib.TargetDefined + } +} + +func tickerCleanup(ticker_map map[int][]*ticker_info, c *TranslClient) { + for _, v := range ticker_map { + for _, ti := range v { + ti.t.Stop() + } + } } func (c *TranslClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { - rc, ctx := common_utils.GetContext(c.ctx) - c.ctx = ctx c.w = w defer c.w.Done() + defer recoverSubscribe(c) + c.q = q c.channel = stop - version := getBundleVersion(c.extensions) - if version != nil { - rc.BundleVersion = version + c.encoding = subscribe.Encoding + + if err := c.parseVersion(); err != nil { + enqueFatalMsgTranslib(c, err.Error()) + return } ticker_map := make(map[int][]*ticker_info) + + defer tickerCleanup(ticker_map, c) var cases []reflect.SelectCase cases_map := make(map[int]int) var subscribe_mode gnmipb.SubscriptionMode - stringPaths := make([]string, len(subscribe.Subscription)) - for i,sub := range subscribe.Subscription { - stringPaths[i] = c.path2URI[sub.Path] + translPaths := make([]translib.IsSubscribePath, len(subscribe.Subscription)) + sampleCache := make(map[string]*ygotCache) + + for i, sub := range subscribe.Subscription { + translPaths[i].ID = uint32(i) + translPaths[i].Path = c.path2URI[sub.Path] + translPaths[i].Mode = getTranslNotificationType(sub.Mode) + } + + rc, _ := common_utils.GetContext(c.ctx) + ss := translib.NewSubscribeSession() + defer ss.Close() + + req := translib.IsSubscribeRequest{ + Paths: translPaths, + Session: ss, + User: translib.UserRoles{Name: rc.Auth.User, Roles: rc.Auth.Roles}, + } + if c.version != nil { + req.ClientVersion = *c.version } - req := translib.IsSubscribeRequest{Paths:stringPaths} - subSupport,_ := translib.IsSubscribeSupported(req) + + subSupport, err := translib.IsSubscribeSupported(req) + if err != nil { + enqueFatalMsgTranslib(c, fmt.Sprintf("Subscribe operation failed with error =%v", err.Error())) + return + } + var onChangeSubsString []string - var onChangeSubsgNMI []*gnmipb.Path - onChangeMap := make(map[string]*gnmipb.Path) - valueCache := make(map[string]string) - for i,sub := range subscribe.Subscription { - fmt.Println(sub.Mode, sub.SampleInterval) - switch sub.Mode { + for i, pInfo := range subSupport { + sub := subscribe.Subscription[pInfo.ID] + log.V(6).Infof("Start Sub: %v", sub) + pathStr := pInfo.Path + switch sub.Mode { case gnmipb.SubscriptionMode_TARGET_DEFINED: - - if subSupport[i].Err == nil && subSupport[i].IsOnChangeSupported { - if subSupport[i].PreferredType == translib.Sample { - subscribe_mode = gnmipb.SubscriptionMode_SAMPLE - } else if subSupport[i].PreferredType == translib.OnChange { - subscribe_mode = gnmipb.SubscriptionMode_ON_CHANGE - } + if pInfo.IsOnChangeSupported && pInfo.PreferredType == translib.OnChange { + subscribe_mode = gnmipb.SubscriptionMode_ON_CHANGE } else { subscribe_mode = gnmipb.SubscriptionMode_SAMPLE } - case gnmipb.SubscriptionMode_ON_CHANGE: - if subSupport[i].Err == nil && subSupport[i].IsOnChangeSupported { - if (subSupport[i].MinInterval > 0) { - subscribe_mode = gnmipb.SubscriptionMode_ON_CHANGE - }else{ - enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid subscribe path %v", stringPaths[i])) - return - } + if pInfo.IsOnChangeSupported { + subscribe_mode = gnmipb.SubscriptionMode_ON_CHANGE } else { - enqueFatalMsgTranslib(c, fmt.Sprintf("ON_CHANGE Streaming mode invalid for %v", stringPaths[i])) + enqueFatalMsgTranslib(c, fmt.Sprintf("ON_CHANGE Streaming mode invalid for %v", pathStr)) return } case gnmipb.SubscriptionMode_SAMPLE: - if (subSupport[i].MinInterval > 0) { - subscribe_mode = gnmipb.SubscriptionMode_SAMPLE - }else{ - enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid subscribe path %v", stringPaths[i])) - return - } + subscribe_mode = gnmipb.SubscriptionMode_SAMPLE default: - log.V(1).Infof("Bad Subscription Mode for client %v ", c) enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid Subscription Mode %d", sub.Mode)) return } - fmt.Println("subscribe_mode:", subscribe_mode) + + if pInfo.MinInterval <= 0 { // should not happen + pInfo.MinInterval = translib.MinSubscribeInterval + } + + if hb := sub.HeartbeatInterval; hb > 0 && hb < uint64(pInfo.MinInterval)*uint64(time.Second) { + enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid Heartbeat Interval %ds, minimum interval is %ds", + sub.HeartbeatInterval/uint64(time.Second), subSupport[i].MinInterval)) + return + } + + log.V(6).Infof("subscribe_mode %v for path %s", subscribe_mode, pathStr) if subscribe_mode == gnmipb.SubscriptionMode_SAMPLE { interval := int(sub.SampleInterval) + minInterval := pInfo.MinInterval * int(time.Second) if interval == 0 { - interval = subSupport[i].MinInterval * int(time.Second) - } else { - if interval < (subSupport[i].MinInterval*int(time.Second)) { - enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid Sample Interval %ds, minimum interval is %ds", interval/int(time.Second), subSupport[i].MinInterval)) - return - } + interval = minInterval + } else if interval < minInterval { + enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid SampleInterval %ds, minimum interval is %ds", interval/int(time.Second), pInfo.MinInterval)) + return } - if !subscribe.UpdatesOnly { - //Send initial data now so we can send sync response. - val, err := transutil.TranslProcessGet(c.path2URI[sub.Path], nil, c.ctx) - if err != nil { - return - } - spbv := &spb.Value{ - Prefix: c.prefix, - Path: sub.Path, - Timestamp: time.Now().UnixNano(), - SyncResponse: false, - Val: val, - } - c.q.Put(Value{spbv}) - valueCache[c.path2URI[sub.Path]] = string(val.GetJsonIetfVal()) + + reqPath, _ := ygot.StringToStructuredPath(pathStr) + yCache := newYgotCache(reqPath) + sampleCache[pathStr] = yCache + ts := translSubscriber{ + client: c, + session: ss, + sampleCache: yCache, + filterMsgs: subscribe.UpdatesOnly, + } + + // Force ignore init updates for subpaths to prevent duplicates. + // But performs duplicate gets though -- needs optimization. + if pInfo.IsSubPath { + ts.filterMsgs = true } - addTimer(c, ticker_map, &cases, cases_map, interval, sub, false) + // do initial sync & build the cache + ts.doSample(pathStr) + addTimer(c, ticker_map, &cases, cases_map, interval, sub, pathStr, false) //Heartbeat intervals are valid for SAMPLE in the case suppress_redundant is specified if sub.SuppressRedundant && sub.HeartbeatInterval > 0 { - if int(sub.HeartbeatInterval) < subSupport[i].MinInterval * int(time.Second) { - enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid Heartbeat Interval %ds, minimum interval is %ds", int(sub.HeartbeatInterval)/int(time.Second), subSupport[i].MinInterval)) - return - } - addTimer(c, ticker_map, &cases, cases_map, int(sub.HeartbeatInterval), sub, true) + addTimer(c, ticker_map, &cases, cases_map, int(sub.HeartbeatInterval), sub, pathStr, true) } } else if subscribe_mode == gnmipb.SubscriptionMode_ON_CHANGE { - onChangeSubsString = append(onChangeSubsString, c.path2URI[sub.Path]) - onChangeSubsgNMI = append(onChangeSubsgNMI, sub.Path) - onChangeMap[c.path2URI[sub.Path]] = sub.Path + onChangeSubsString = append(onChangeSubsString, pathStr) if sub.HeartbeatInterval > 0 { - if int(sub.HeartbeatInterval) < subSupport[i].MinInterval * int(time.Second) { - enqueFatalMsgTranslib(c, fmt.Sprintf("Invalid Heartbeat Interval %ds, minimum interval is %ds", int(sub.HeartbeatInterval)/int(time.Second), subSupport[i].MinInterval)) - return - } - addTimer(c, ticker_map, &cases, cases_map, int(sub.HeartbeatInterval), sub, true) + addTimer(c, ticker_map, &cases, cases_map, int(sub.HeartbeatInterval), sub, pathStr, true) } - } + log.V(6).Infof("End Sub: %v", sub) } - if len(onChangeSubsString) > 0 { - c.w.Add(1) - c.synced.Add(1) - go TranslSubscribe(onChangeSubsgNMI, onChangeSubsString, onChangeMap, c, subscribe.UpdatesOnly) + if len(onChangeSubsString) > 0 { + ts := translSubscriber{ + client: c, + session: ss, + filterMsgs: subscribe.UpdatesOnly, + } + ts.doOnChange(onChangeSubsString) + } else { + // If at least one ON_CHANGE subscription was present, then + // ts.doOnChange() would have sent the sync message. + // Explicitly send one here if all are SAMPLE subscriptions. + enqueueSyncMessage(c) } - // Wait until all data values corresponding to the path(s) specified - // in the SubscriptionList has been transmitted at least once - c.synced.Wait() - spbs := &spb.Value{ - Timestamp: time.Now().UnixNano(), - SyncResponse: true, - } - c.q.Put(Value{spbs}) + cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c.channel)}) for { chosen, _, ok := reflect.Select(cases) - - if !ok { return } - for _,tick := range ticker_map[cases_map[chosen]] { - fmt.Printf("tick, heartbeat: %t, path: %s", tick.heartbeat, c.path2URI[tick.sub.Path]) - val, err := transutil.TranslProcessGet(c.path2URI[tick.sub.Path], nil, c.ctx) - if err != nil { - return - } - spbv := &spb.Value{ - Prefix: c.prefix, - Path: tick.sub.Path, - Timestamp: time.Now().UnixNano(), - SyncResponse: false, - Val: val, - } - - - if (tick.sub.SuppressRedundant) && (!tick.heartbeat) && (string(val.GetJsonIetfVal()) == valueCache[c.path2URI[tick.sub.Path]]) { - log.V(6).Infof("Redundant Message Suppressed #%v", string(val.GetJsonIetfVal())) - } else { - c.q.Put(Value{spbv}) - valueCache[c.path2URI[tick.sub.Path]] = string(val.GetJsonIetfVal()) - log.V(6).Infof("Added spbv #%v", spbv) + for _, tick := range ticker_map[cases_map[chosen]] { + log.V(6).Infof("tick, heartbeat: %t, path: %s\n", tick.heartbeat, c.path2URI[tick.sub.Path]) + ts := translSubscriber{ + client: c, + session: ss, + sampleCache: sampleCache[tick.pathStr], + filterDups: (!tick.heartbeat && tick.sub.SuppressRedundant), } - - + ts.doSample(tick.pathStr) } } } -func addTimer(c *TranslClient, ticker_map map[int][]*ticker_info, cases *[]reflect.SelectCase, cases_map map[int]int, interval int, sub *gnmipb.Subscription, heartbeat bool) { +func addTimer(c *TranslClient, ticker_map map[int][]*ticker_info, cases *[]reflect.SelectCase, cases_map map[int]int, interval int, sub *gnmipb.Subscription, pathStr string, heartbeat bool) { //Reuse ticker for same sample intervals, otherwise create a new one. if ticker_map[interval] == nil { ticker_map[interval] = make([]*ticker_info, 1, 1) - ticker_map[interval][0] = &ticker_info { - t: time.NewTicker(time.Duration(interval) * time.Nanosecond), - sub: sub, + ticker_map[interval][0] = &ticker_info{ + t: time.NewTicker(time.Duration(interval) * time.Nanosecond), + sub: sub, + pathStr: pathStr, heartbeat: heartbeat, } cases_map[len(*cases)] = interval *cases = append(*cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ticker_map[interval][0].t.C)}) - }else { - ticker_map[interval] = append(ticker_map[interval], &ticker_info { - t: ticker_map[interval][0].t, - sub: sub, + } else { + ticker_map[interval] = append(ticker_map[interval], &ticker_info{ + t: ticker_map[interval][0].t, + sub: sub, + pathStr: pathStr, heartbeat: heartbeat, }) } - - -} - -func TranslSubscribe(gnmiPaths []*gnmipb.Path, stringPaths []string, pathMap map[string]*gnmipb.Path, c *TranslClient, updates_only bool) { - defer c.w.Done() - rc, ctx := common_utils.GetContext(c.ctx) - c.ctx = ctx - q := queue.NewPriorityQueue(1, false) - var sync_done bool - req := translib.SubscribeRequest{Paths:stringPaths, Q:q, Stop:c.channel} - if rc.BundleVersion != nil { - nver, err := translib.NewVersion(*rc.BundleVersion) - if err != nil { - log.V(2).Infof("Subscribe operation failed with error =%v", err.Error()) - enqueFatalMsgTranslib(c, fmt.Sprintf("Subscribe operation failed with error =%v", err.Error())) - return - } - req.ClientVersion = nver - } - translib.Subscribe(req) - for { - items, err := q.Get(1) - if err != nil { - log.V(1).Infof("%v", err) - return - } - switch v := items[0].(type) { - case *translib.SubscribeResponse: - - if v.IsTerminated { - //DB Connection or other backend error - enqueFatalMsgTranslib(c, "DB Connection Error") - close(c.channel) - return - } - - var jv []byte - dst := new(bytes.Buffer) - json.Compact(dst, v.Payload) - jv = dst.Bytes() - - /* Fill the values into GNMI data structures . */ - val := &gnmipb.TypedValue{ - Value: &gnmipb.TypedValue_JsonIetfVal{ - JsonIetfVal: jv, - }} - - spbv := &spb.Value{ - Prefix: c.prefix, - Path: pathMap[v.Path], - Timestamp: v.Timestamp, - SyncResponse: false, - Val: val, - } - - //Don't send initial update with full object if user wants updates only. - if updates_only && !sync_done { - log.V(1).Infof("Msg suppressed due to updates_only") - } else { - c.q.Put(Value{spbv}) - } - - log.V(6).Infof("Added spbv #%v", spbv) - - if v.SyncComplete && !sync_done { - fmt.Println("SENDING SYNC") - c.synced.Done() - sync_done = true - } - default: - log.V(1).Infof("Unknown data type %v for %v in queue", items[0], v) - } - } } - - func (c *TranslClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { - rc, ctx := common_utils.GetContext(c.ctx) - c.ctx = ctx c.w = w defer c.w.Done() + defer recoverSubscribe(c) c.q = q c.channel = poll - version := getBundleVersion(c.extensions) - if version != nil { - rc.BundleVersion = version + c.encoding = subscribe.Encoding + + if err := c.parseVersion(); err != nil { + enqueFatalMsgTranslib(c, err.Error()) + return } + synced := false for { _, more := <-c.channel if !more { log.V(1).Infof("%v poll channel closed, exiting pollDb routine", c) + enqueFatalMsgTranslib(c, "") return } + t1 := time.Now() - for gnmiPath, URIPath := range c.path2URI { + for _, gnmiPath := range c.path2URI { if synced || !subscribe.UpdatesOnly { - val, err := transutil.TranslProcessGet(URIPath, nil, c.ctx) - if err != nil { - return - } - - spbv := &spb.Value{ - Prefix: c.prefix, - Path: gnmiPath, - Timestamp: time.Now().UnixNano(), - SyncResponse: false, - Val: val, - } - - c.q.Put(Value{spbv}) - log.V(6).Infof("Added spbv #%v", spbv) + ts := translSubscriber{client: c} + ts.doSample(gnmiPath) } } - c.q.Put(Value{ - &spb.Value{ - Timestamp: time.Now().UnixNano(), - SyncResponse: true, - }, - }) + enqueueSyncMessage(c) synced = true log.V(4).Infof("Sync done, poll time taken: %v ms", int64(time.Since(t1)/time.Millisecond)) } } + func (c *TranslClient) OnceRun(q *queue.PriorityQueue, once chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { - rc, ctx := common_utils.GetContext(c.ctx) - c.ctx = ctx c.w = w defer c.w.Done() + defer recoverSubscribe(c) + c.q = q c.channel = once + c.encoding = subscribe.Encoding - version := getBundleVersion(c.extensions) - if version != nil { - rc.BundleVersion = version + if err := c.parseVersion(); err != nil { + enqueFatalMsgTranslib(c, err.Error()) + return } + _, more := <-c.channel if !more { log.V(1).Infof("%v once channel closed, exiting onceDb routine", c) + enqueFatalMsgTranslib(c, "") return } - t1 := time.Now() - for gnmiPath, URIPath := range c.path2URI { - val, err := transutil.TranslProcessGet(URIPath, nil, c.ctx) - if err != nil { - return - } - - if !subscribe.UpdatesOnly && val != nil { - spbv := &spb.Value{ - Prefix: c.prefix, - Path: gnmiPath, - Timestamp: time.Now().UnixNano(), - SyncResponse: false, - Val: val, - } - c.q.Put(Value{spbv}) - log.V(6).Infof("Added spbv #%v", spbv) - } + t1 := time.Now() + for _, gnmiPath := range c.path2URI { + ts := translSubscriber{client: c} + ts.doSample(gnmiPath) } - c.q.Put(Value{ - &spb.Value{ - Timestamp: time.Now().UnixNano(), - SyncResponse: true, - }, - }) + enqueueSyncMessage(c) log.V(4).Infof("Sync done, once time taken: %v ms", int64(time.Since(t1)/time.Millisecond)) - + } func (c *TranslClient) Capabilities() []gnmipb.ModelData { @@ -527,22 +468,35 @@ func (c *TranslClient) SentOne(val *Value) { func (c *TranslClient) FailedSend() { } - func getBundleVersion(extensions []*gnmi_extpb.Extension) *string { - for _,e := range extensions { + for _, e := range extensions { switch v := e.Ext.(type) { - case *gnmi_extpb.Extension_RegisteredExt: - if v.RegisteredExt.Id == spb.BUNDLE_VERSION_EXT { - var bv spb.BundleVersion - proto.Unmarshal(v.RegisteredExt.Msg, &bv) - return &bv.Version - } - + case *gnmi_extpb.Extension_RegisteredExt: + if v.RegisteredExt.Id == spb.BUNDLE_VERSION_EXT { + var bv spb.BundleVersion + proto.Unmarshal(v.RegisteredExt.Msg, &bv) + return &bv.Version + } + } } return nil } +func (c *TranslClient) parseVersion() error { + bv := getBundleVersion(c.extensions) + if bv == nil { + return nil + } + v, err := translib.NewVersion(*bv) + if err != nil { + c.version = &v + return nil + } + log.V(4).Infof("Failed to parse version \"%s\"; err=%v", *bv, err) + return fmt.Errorf("Invalid bundle version: %v", *bv) +} + type TranslClientOption interface { IsTranslClientOption() } diff --git a/sonic_data_client/transl_subscriber.go b/sonic_data_client/transl_subscriber.go new file mode 100644 index 00000000..2b11658b --- /dev/null +++ b/sonic_data_client/transl_subscriber.go @@ -0,0 +1,386 @@ +//////////////////////////////////////////////////////////////////////////////// +// // +// Copyright 2021 Broadcom. The term Broadcom refers to Broadcom Inc. and/or // +// its subsidiaries. // +// // +// Licensed under the Apache License, Version 2.0 (the "License"); // +// you may not use this file except in compliance with the License. // +// You may obtain a copy of the License at // +// // +// http://www.apache.org/licenses/LICENSE-2.0 // +// // +// Unless required by applicable law or agreed to in writing, software // +// distributed under the License is distributed on an "AS IS" BASIS, // +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // +// See the License for the specific language governing permissions and // +// limitations under the License. // +// // +//////////////////////////////////////////////////////////////////////////////// + +package client + +import ( + "fmt" + "sync" + "time" + + "github.com/Azure/sonic-mgmt-common/translib" + "github.com/Workiva/go-datastructures/queue" + log "github.com/golang/glog" + gnmipb "github.com/openconfig/gnmi/proto/gnmi" + "github.com/openconfig/ygot/ygot" + spb "github.com/sonic-net/sonic-gnmi/proto" + "github.com/sonic-net/sonic-gnmi/transl_utils" +) + +// translSubscriber is an extension of TranslClient to service Subscribe RPC. +type translSubscriber struct { + client *TranslClient + session *translib.SubscribeSession + sampleCache *ygotCache // session cache for SAMPLE; optional + filterMsgs bool // Filter out messages till sync done (updates_only) + filterDups bool // Filter out duplicate updates (suppress_redundant) + stopOnSync bool // Stop upon sync message from translib + synced sync.WaitGroup // To signal receipt of sync message from translib + rcvdPaths map[string]bool // Paths from received SubscribeResponse + msgBuilder notificationBuilder +} + +// notificationBuilder creates gnmipb.Notification from a translib.SubscribeResponse +// instance. Input can be nil, indicating the end of current sample iteration. +type notificationBuilder func( + resp *translib.SubscribeResponse, ts *translSubscriber) (*gnmipb.Notification, error) + +// doSample invokes translib.Stream API to service SAMPLE, POLL and ONCE subscriptions. +// Timer for SAMPLE subscription should be managed outside. +func (ts *translSubscriber) doSample(path string) { + if ts.sampleCache != nil { + ts.msgBuilder = ts.sampleCache.msgBuilder // SAMPLE + ts.rcvdPaths = make(map[string]bool) + } else { + ts.msgBuilder = defaultMsgBuilder // ONCE, POLL or heartbeat for ON_CHANGE + } + + c := ts.client + req := translib.SubscribeRequest{ + Paths: []string{path}, + Q: queue.NewPriorityQueue(1, false), + Session: ts.session, + } + if c.version != nil { + req.ClientVersion = *c.version + } + + c.w.Add(1) + ts.synced.Add(1) + ts.stopOnSync = true + go ts.processResponses(req.Q) + + err := translib.Stream(req) + if err != nil { + req.Q.Dispose() + enqueFatalMsgTranslib(c, fmt.Sprintf("Subscribe operation failed with error = %v", err)) + } + + ts.synced.Wait() +} + +// doOnChange handles the ON_CHANGE subscriptions through translib.Subscribe API. +// Returns only after initial updates and sync message are sent to the RPC queue. +func (ts *translSubscriber) doOnChange(stringPaths []string) { + c := ts.client + q := queue.NewPriorityQueue(1, false) + + req := translib.SubscribeRequest{ + Paths: stringPaths, + Q: q, + Stop: c.channel, + Session: ts.session, + } + if c.version != nil { + req.ClientVersion = *c.version + } + + c.w.Add(1) + ts.synced.Add(1) + ts.msgBuilder = defaultMsgBuilder + go ts.processResponses(q) + + err := translib.Subscribe(req) + if err != nil { + q.Dispose() + enqueFatalMsgTranslib(c, "Subscribe operation failed with error: "+err.Error()) + } + + ts.synced.Wait() +} + +// processResponses waits for SubscribeResponse messages from translib over a +// queue, formats them as spb.Value and pushes to the RPC queue. +func (ts *translSubscriber) processResponses(q *queue.PriorityQueue) { + c := ts.client + var syncDone bool + defer c.w.Done() + defer func() { + if !syncDone { + ts.synced.Done() + } + }() + defer recoverSubscribe(c) + + for { + items, err := q.Get(1) + if err == queue.ErrDisposed { + log.V(3).Info("PriorityQueue was disposed!") + return + } + if err != nil { + enqueFatalMsgTranslib(c, fmt.Sprintf("Subscribe operation failed with error =%v", err.Error())) + return + } + switch v := items[0].(type) { + case *translib.SubscribeResponse: + + if v.IsTerminated { + //DB Connection or other backend error + enqueFatalMsgTranslib(c, "DB Connection Error") + close(c.channel) + return + } + + if v.SyncComplete { + if ts.stopOnSync { + ts.notify(nil) + log.V(6).Infof("Stopping on sync signal from translib") + return + } + + log.V(6).Infof("SENDING SYNC") + enqueueSyncMessage(c) + syncDone = true + ts.synced.Done() + ts.filterMsgs = false + break + } + + if err := ts.notify(v); err != nil { + log.Warning(err) + enqueFatalMsgTranslib(c, "Internal error") + return + } + default: + log.V(1).Infof("Unknown data type %T in queue", v) + } + } +} + +func (ts *translSubscriber) notify(v *translib.SubscribeResponse) error { + msg, err := ts.msgBuilder(v, ts) + if err != nil { + return err + } + + if msg == nil || (len(msg.Update) == 0 && len(msg.Delete) == 0) { + log.V(6).Infof("Ignore nil message") + return nil + } + + spbv := &spb.Value{Notification: msg} + ts.client.q.Put(Value{spbv}) + log.V(6).Infof("Added spbv %#v", spbv) + return nil +} + +func (ts *translSubscriber) toPrefix(path string) *gnmipb.Path { + p, _ := ygot.StringToStructuredPath(path) + p.Target = ts.client.prefix.GetTarget() + p.Origin = ts.client.prefix.GetOrigin() + return p +} + +func defaultMsgBuilder(v *translib.SubscribeResponse, ts *translSubscriber) (*gnmipb.Notification, error) { + if v == nil { + return nil, nil + } + if ts.filterMsgs { + log.V(2).Infof("Msg suppressed due to updates_only") + return nil, nil + } + + p := ts.toPrefix(v.Path) + n := gnmipb.Notification{ + Prefix: p, + Timestamp: v.Timestamp, + } + + // Move last elem of v.Path to updates & deletes if v.Delete contains + // an empty path (to indicate the v.Path itself was deleted). + var extraPrefix *gnmipb.PathElem + if strSliceContains(v.Delete, "") { + extraPrefix = removeLastPathElem(p) + } + + if v.Update != nil { + var err error + n.Update, err = ts.ygotToScalarValues(extraPrefix, v.Update) + if err != nil { + return nil, err + } + } + + for _, del := range v.Delete { + p, err := ygot.StringToStructuredPath(del) + if err != nil { + return nil, err + } + insertFirstPathElem(p, extraPrefix) + n.Delete = append(n.Delete, p) + } + + return &n, nil +} + +// ygotToScalarValues returns scalar encoded values for a ygot object. +// If prefixElem is provided, it will be prefixed to each value's path. +func (ts *translSubscriber) ygotToScalarValues(prefixElem *gnmipb.PathElem, obj ygot.ValidatedGoStruct) ([]*gnmipb.Update, error) { + tmp, err := ygot.TogNMINotifications(obj, 0, + ygot.GNMINotificationsConfig{ + UsePathElem: true, + PathElemPrefix: nil, + Encoding: ts.client.encoding, + }) + if err != nil { + return nil, err + } + + updates := tmp[0].Update + if prefixElem != nil { + for _, u := range updates { + insertFirstPathElem(u.Path, prefixElem) + } + } + + return updates, nil +} + +// insertFirstPathElem inserts newElem at the beginning of path p. +func insertFirstPathElem(p *gnmipb.Path, newElem *gnmipb.PathElem) { + if newElem != nil { + ne := make([]*gnmipb.PathElem, 0, len(p.Elem)+1) + ne = append(ne, newElem) + p.Elem = append(ne, p.Elem...) + } +} + +// removeLastPathElem removes the last PathElem from the path p. +// Returns the removed element. +func removeLastPathElem(p *gnmipb.Path) *gnmipb.PathElem { + k := len(p.Elem) - 1 + if k < 0 { + return nil + } + if p.Element != nil { + p.Element = p.Element[:k] + } + last := p.Elem[k] + p.Elem = p.Elem[:k] + return last +} + +func strSliceContains(ss []string, v string) bool { + for _, s := range ss { + if s == v { + return true + } + } + return false +} + +// ygotCache holds path to ygot struct mappings +type ygotCache struct { + values map[string]ygot.GoStruct + pattern *gnmipb.Path // Prefix pattern for the cache keys +} + +// newYgotCache creates a new ygotCache instance +func newYgotCache(pattern *gnmipb.Path) *ygotCache { + return &ygotCache{ + values: make(map[string]ygot.GoStruct), + pattern: pattern, + } +} + +// msgBuilder is a notificationBuilder implementation to create a gnmipb.Notification +// message by comparing the SubscribeResponse.Update ygot struct to the cached value. +// Includes only modified or deleted leaf paths if translSubscriber.filterDups is set. +// Returns nil message if translSubscriber.filterMsgs is set or on error. +// Updates the cache with the new ygot struct (SubscribeResponse.Update). +// Special case: if SubscribeResponse is nil, calls deleteMsgBuilder to delete +// non-existing paths from the cache. +func (c *ygotCache) msgBuilder(v *translib.SubscribeResponse, ts *translSubscriber) (*gnmipb.Notification, error) { + if v == nil { + return c.deleteMsgBuilder(ts) + } + + old := c.values[v.Path] + c.values[v.Path] = v.Update + ts.rcvdPaths[v.Path] = true + log.V(2).Infof("%s updated; old=%p, new=%p, filterDups=%v", v.Path, old, v.Update, ts.filterDups) + if ts.filterMsgs { + log.V(2).Infof("Msg suppressed due to updates_only") + return nil, nil + } + + res, err := transl_utils.Diff(old, v.Update, + transl_utils.DiffOptions{ + RecordAll: !ts.filterDups, + }) + if err != nil { + return nil, err + } + + return &gnmipb.Notification{ + Timestamp: v.Timestamp, + Prefix: ts.toPrefix(v.Path), + Update: res.Update, + Delete: res.Delete, + }, nil +} + +// deleteMsgBuilder deletes the cache entries whose path does not appear in +// the translSubscriber.rcvdPaths map. Creates a gnmipb.Notification message +// for the deleted paths. Returns nil message if there are no such delete paths +// or translSubscriber.filterMsgs is set. +func (c *ygotCache) deleteMsgBuilder(ts *translSubscriber) (*gnmipb.Notification, error) { + if ts.filterMsgs { + log.V(2).Infof("Msg suppressed due to updates_only") + return nil, nil + } + var deletePaths []*gnmipb.Path + for path := range c.values { + if !ts.rcvdPaths[path] { + log.V(3).Infof("%s deleted", path) + deletePaths = append(deletePaths, c.toDeletePath(path)) + delete(c.values, path) + } + } + if len(deletePaths) == 0 { + return nil, nil + } + return &gnmipb.Notification{ + Timestamp: time.Now().UnixNano(), + Prefix: ts.toPrefix("/"), + Delete: deletePaths, + }, nil +} + +func (c *ygotCache) toDeletePath(path string) *gnmipb.Path { + p, _ := ygot.StringToStructuredPath(path) + // p will be parent container path when subscribing to a leaf. + // Append the leaf suffix to p if p is shorter than subscribe path. + if n := len(p.Elem); n < len(c.pattern.Elem) { + suffix := c.pattern.Elem[n:] + p.Elem = append(p.Elem, suffix...) + } + return p +} From fd78c42e85f7f21f906058ac7d0bdead408d0c3a Mon Sep 17 00:00:00 2001 From: Mai Bui Date: Mon, 19 Jun 2023 17:25:26 -0400 Subject: [PATCH 4/4] add semgrep (#126) **Why I did it** [Semgrep](https://github.com/returntocorp/semgrep) is a static analysis tool to find security vulnerabilities. When opening a PR or commtting to PR, Semgrep performs a diff-aware scanning, which scans changed files in PRs. When merging PR, Semgrep performs a full scan on master branch and report all findings. Ref: - [Supported Language](https://semgrep.dev/docs/supported-languages/#language-maturity) - [Semgrep Rules](https://registry.semgrep.dev/rule) **How I did it** Integrate Semgrep into this repository by committing a job configuration file --- .github/workflows/semgrep.yml | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 .github/workflows/semgrep.yml diff --git a/.github/workflows/semgrep.yml b/.github/workflows/semgrep.yml new file mode 100644 index 00000000..975769a5 --- /dev/null +++ b/.github/workflows/semgrep.yml @@ -0,0 +1,22 @@ +name: Semgrep + +on: + pull_request: {} + push: + branches: + - master + - '201[7-9][0-1][0-9]' + - '202[0-9][0-1][0-9]' + +jobs: + semgrep: + if: github.repository_owner == 'sonic-net' + name: Semgrep + runs-on: ubuntu-latest + container: + image: returntocorp/semgrep + steps: + - uses: actions/checkout@v3 + - run: semgrep ci + env: + SEMGREP_RULES: p/default