Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support of heartbeat interval in "ON_CHANGE" subscription #159

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 40 additions & 11 deletions gnmi_server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ type subscriptionQuery struct {
Query []string
SubMode pb.SubscriptionMode
SampleInterval uint64
HeartbeatInterval uint64
}

func pathToString(q client.Path) string {
Expand Down Expand Up @@ -745,6 +746,7 @@ func createQuery(subListMode pb.SubscriptionList_Mode, target string, queries []
Path: pp,
Mode: qq.SubMode,
SampleInterval: qq.SampleInterval,
HeartbeatInterval: qq.HeartbeatInterval,
})
}

Expand Down Expand Up @@ -792,14 +794,15 @@ func createStateDbQueryOnChangeMode(t *testing.T, paths ...string) client.Query
}

// createCountersDbQueryOnChangeMode creates a query with ON_CHANGE mode.
func createCountersDbQueryOnChangeMode(t *testing.T, paths ...string) client.Query {
func createCountersDbQueryOnChangeMode(t *testing.T, interval time.Duration, paths ...string) client.Query {
return createQueryOrFail(t,
pb.SubscriptionList_STREAM,
"COUNTERS_DB",
[]subscriptionQuery{
{
Query: paths,
SubMode: pb.SubscriptionMode_ON_CHANGE,
HeartbeatInterval: uint64(interval.Nanoseconds()),
},
},
false)
Expand Down Expand Up @@ -1800,9 +1803,35 @@ func runTestSubscribe(t *testing.T, namespace string) {
generateIntervals bool
}
tests := []TestExec {
{
desc: "Testing invalid heartbeat interval",
q: createCountersDbQueryOnChangeMode(t, 10 * time.Second, "COUNTERS_PORT_NAME_MAP"),
wantSubErr: fmt.Errorf("rpc error: code = InvalidArgument desc = invalid heartbeat interval: 10s. It cannot be less than %v", sdc.MinHeartbeatInterval),
wantNoti: []client.Notification{},
},
{
desc: "stream query with Heartbeat interval for table key Ethernet68 with new test_field field",
q: createCountersDbQueryOnChangeMode(t, 30 * time.Second, "COUNTERS", "Ethernet68"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
tableName: "COUNTERS",
tableKey: "oid:0x1000000000039", // "Ethernet68": "oid:0x1000000000039",
delimitor: ":",
field: "test_field",
value: "test_value",
},
},
wantNoti: []client.Notification{
client.Connected{},
client.Update{Path: []string{"COUNTERS", "Ethernet68"}, TS: time.Unix(0, 200), Val: countersEthernet68Json},
client.Sync{},
client.Update{Path: []string{"COUNTERS", "Ethernet68"}, TS: time.Unix(0, 200), Val: countersEthernet68Json},
},
},
{
desc: "stream query for table COUNTERS_PORT_NAME_MAP with new test_field field",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS_PORT_NAME_MAP"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS_PORT_NAME_MAP"),
updates: []tablePathValue{{
dbName: "COUNTERS_DB",
tableName: "COUNTERS_PORT_NAME_MAP",
Expand All @@ -1818,7 +1847,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
{
desc: "stream query for table key Ethernet68 with new test_field field",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
Expand Down Expand Up @@ -1846,7 +1875,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
{
desc: "(use vendor alias) stream query for table key Ethernet68/1 with new test_field field",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68/1"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68/1"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
Expand Down Expand Up @@ -1874,7 +1903,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
{
desc: "stream query for COUNTERS/Ethernet68/SAI_PORT_STAT_PFC_7_RX_PKTS with update of field value",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68", "SAI_PORT_STAT_PFC_7_RX_PKTS"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68", "SAI_PORT_STAT_PFC_7_RX_PKTS"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
Expand Down Expand Up @@ -1902,7 +1931,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
{
desc: "(use vendor alias) stream query for COUNTERS/[Ethernet68/1]/SAI_PORT_STAT_PFC_7_RX_PKTS with update of field value",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68/1", "SAI_PORT_STAT_PFC_7_RX_PKTS"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68/1", "SAI_PORT_STAT_PFC_7_RX_PKTS"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
Expand Down Expand Up @@ -1930,7 +1959,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
{
desc: "stream query for COUNTERS/Ethernet68/Pfcwd with update of field value",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68", "Pfcwd"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68", "Pfcwd"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
Expand Down Expand Up @@ -1958,7 +1987,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
{
desc: "(use vendor alias) stream query for COUNTERS/[Ethernet68/1]/Pfcwd with update of field value",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68/1", "Pfcwd"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68/1", "Pfcwd"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
Expand Down Expand Up @@ -1986,7 +2015,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
{
desc: "stream query for table key Ethernet* with new test_field field on Ethernet68",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet*"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet*"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
Expand Down Expand Up @@ -2014,7 +2043,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
{
desc: "stream query for table key Ethernet*/SAI_PORT_STAT_PFC_7_RX_PKTS with field value update",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet*", "SAI_PORT_STAT_PFC_7_RX_PKTS"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet*", "SAI_PORT_STAT_PFC_7_RX_PKTS"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
Expand All @@ -2034,7 +2063,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
{
desc: "stream query for table key Ethernet*/Pfcwd with field value update",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet*", "Pfcwd"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet*", "Pfcwd"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
Expand Down
47 changes: 41 additions & 6 deletions sonic_data_client/db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ var Target2RedisDb = make(map[string]map[string]*redis.Client)
// Any non-zero value that less than this threshold is considered invalid argument.
var MinSampleInterval = time.Second

// MinHeartbeatInterval is the lowest HB interval for streaming subscriptions.
// This is reserved value, which should be adjusted per BGPL benchmark result.
var MinHeartbeatInterval = 1 * time.Minute

// IntervalTicker is a factory method to implement interval ticking.
// Exposed for UT purposes.
var IntervalTicker = func(interval time.Duration) <-chan time.Time {
Expand Down Expand Up @@ -212,7 +216,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
for gnmiPath := range c.pathG2S {
c.w.Add(1)
c.synced.Add(1)
go streamOnChangeSubscription(c, gnmiPath)
go streamOnChangeSubscription(c, gnmiPath, nil)
}
} else {
log.V(2).Infof("Stream subscription request received, mode: %v, subscription count: %v",
Expand All @@ -230,7 +234,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
} else if subMode == gnmipb.SubscriptionMode_ON_CHANGE {
c.w.Add(1)
c.synced.Add(1)
go streamOnChangeSubscription(c, sub.GetPath())
go streamOnChangeSubscription(c, nil, sub)
} else {
enqueueFatalMsg(c, fmt.Sprintf("unsupported subscription mode, %v", subMode))
return
Expand All @@ -255,19 +259,36 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
}

// streamOnChangeSubscription implements Subscription "ON_CHANGE STREAM" mode
func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path) {
func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path, sub *gnmipb.Subscription) {
if gnmiPath == nil {
gnmiPath = sub.GetPath()
}

// if heartbeatInterval is not assigned, use 0 to ignore periodical full sync
var heartbeatInterval time.Duration = 0
if sub != nil {
var err error
heartbeatInterval, err = validateHeartbeatInterval(sub)
if err != nil {
enqueueFatalMsg(c, err.Error())
c.synced.Done()
c.w.Done()
return
}
}

tblPaths := c.pathG2S[gnmiPath]
log.V(2).Infof("streamOnChangeSubscription gnmiPath: %v", gnmiPath)

if tblPaths[0].field != "" {
if len(tblPaths) > 1 {
go dbFieldMultiSubscribe(c, gnmiPath, true, time.Millisecond*200, false)
go dbFieldMultiSubscribe(c, gnmiPath, true, heartbeatInterval, false)
} else {
go dbFieldSubscribe(c, gnmiPath, true, time.Millisecond*200)
go dbFieldSubscribe(c, gnmiPath, true, heartbeatInterval)
}
} else {
// sample interval and update only parameters are not applicable
go dbTableKeySubscribe(c, gnmiPath, 0, true)
go dbTableKeySubscribe(c, gnmiPath, heartbeatInterval, true)
}
}

Expand Down Expand Up @@ -1340,3 +1361,17 @@ func validateSampleInterval(sub *gnmipb.Subscription) (time.Duration, error) {
return requestedInterval, nil
}
}

// validateHeartbeatInterval validates the heartbeat interval of the given subscription.
func validateHeartbeatInterval(sub *gnmipb.Subscription) (time.Duration, error) {
requestedInterval := time.Duration(sub.GetHeartbeatInterval())
if requestedInterval == 0 {
// If the heartbeat_interval is set to 0, the target MUST create the subscription
// and send the data with the MinHeartbeatInterval
return MinHeartbeatInterval, nil
} else if requestedInterval < MinHeartbeatInterval {
return 0, fmt.Errorf("invalid heartbeat interval: %v. It cannot be less than %v", requestedInterval, MinHeartbeatInterval)
} else {
return requestedInterval, nil
}
}
Loading