Skip to content

Commit

Permalink
Fix stream info retrieval (#824)
Browse files Browse the repository at this point in the history
* fix stream info retrieval
  • Loading branch information
oliver006 authored Aug 26, 2023
1 parent b49a8b8 commit 754f8f0
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 65 deletions.
5 changes: 0 additions & 5 deletions exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ var (
altDBNumStr = "12"
invalidDBNumStr = "16"
dbNumStrFull = fmt.Sprintf("db%s", dbNumStr)

TestStreamTimestamps = []string{
"1638006862416-0",
"1638006862417-2",
}
)

const (
Expand Down
2 changes: 1 addition & 1 deletion exporter/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func createKeyFixtures(t *testing.T, c redis.Conn, fixtures []keyFixture) {
for _, f := range fixtures {
args := append([]interface{}{f.key}, f.args...)
if _, err := c.Do(f.command, args...); err != nil {
t.Errorf("Error creating fixture: %#v, %#v", f, err)
t.Fatalf("Error creating fixture: %#v, %#v", f, err)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (e *Exporter) registerConstMetric(ch chan<- prometheus.Metric, metric strin
description := e.findOrCreateMetricDescription(metric, labelValues)
m, err := prometheus.NewConstMetric(description, valType, val, labelValues...)
if err != nil {
log.Debugf("registerConstMetric( %s , %.2f) err: %s", metric, err, val)
log.Debugf("registerConstMetric( %s , %.2f) err: %s", metric, val, err)
return
}

Expand Down
32 changes: 20 additions & 12 deletions exporter/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,40 +40,40 @@ type streamGroupConsumersInfo struct {
}

func getStreamInfo(c redis.Conn, key string) (*streamInfo, error) {
v, err := redis.Values(doRedisCmd(c, "XINFO", "STREAM", key))
values, err := redis.Values(doRedisCmd(c, "XINFO", "STREAM", key))
if err != nil {
return nil, err
}

// Scan slice to struct
var stream streamInfo
if err := redis.ScanStruct(v, &stream); err != nil {
if err := redis.ScanStruct(values, &stream); err != nil {
return nil, err
}

// Extract first and last id from slice
stream.FirstEntryId = getEntryId(v, 17)
stream.LastEntryId = getEntryId(v, 19)
stream.FirstEntryId = getStreamEntryId(values, 17)
stream.LastEntryId = getStreamEntryId(values, 19)

stream.StreamGroupsInfo, err = scanStreamGroups(c, key)
if err != nil {
return nil, err
}

log.Debugf("stream: %#v", &stream)
log.Debugf("getStreamInfo() stream: %#v", &stream)
return &stream, nil
}

func getEntryId(redisValue []interface{}, index int) string {
var emptyStreamId = ""

if len(redisValue) < index || len(redisValue) > index && len(redisValue[index].([]interface{})) < 2 {
return emptyStreamId
func getStreamEntryId(redisValue []interface{}, index int) string {
if len(redisValue) < index || redisValue[index] == nil || len(redisValue[index].([]interface{})) < 2 {
log.Debugf("Failed to parse StreamEntryId")
return ""
}

entryId, ok := redisValue[index].([]interface{})[0].([]byte)
if !ok {
return emptyStreamId
log.Debugf("Failed to parse StreamEntryId")
return ""
}
return string(entryId)
}
Expand Down Expand Up @@ -141,9 +141,17 @@ func scanStreamGroupConsumers(c redis.Conn, stream string, group string) ([]stre
}

func parseStreamItemId(id string) float64 {
if strings.TrimSpace(id) == "" {
return 0
}
frags := strings.Split(id, "-")
if len(frags) == 0 {
log.Errorf("Couldn't parse StreamItemId: %s", id)
return 0
}
parsedId, err := strconv.ParseFloat(strings.Split(id, "-")[0], 64)
if err != nil {
log.Errorf("Couldn't parse given stream timestamp: %s", err)
log.Errorf("Couldn't parse given StreamItemId: [%s] err: %s", id, err)
}
return parsedId
}
Expand Down
Loading

0 comments on commit 754f8f0

Please sign in to comment.