Skip to content

Commit

Permalink
Define a TelemetryBlob type to hold JSON blobs
Browse files Browse the repository at this point in the history
The TelemetryBlob type provides helper methods that can be used to
validate that provided blobs are:
* Valid JSON
* JSON objects
* Contain a top-level "version" field
* Are not too big.

The validity of provided JSON blobs is checked when they are received
via a Generate() interface.

Also update client side item handling to leverage the json.RawMessage
type for storing the JSON blobs; this avoid undesirable processing of
the provided JSON blob data, which should remain untouched en-route
to long term storage in the SUSE Telemetry service.

Minor restructuring of the client side library, moving limits to it's
own subpackage to avoid an import loop when adding the CheckLimits()
helper method to the TelemetryBlob.

Fixes: #41, #26
  • Loading branch information
rtamalin committed Aug 2, 2024
1 parent 311db56 commit ffcfcf0
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 84 deletions.
23 changes: 17 additions & 6 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,11 +576,22 @@ func (tc *TelemetryClient) Register() (err error) {
return nil
}

func (tc *TelemetryClient) Generate(telemetry types.TelemetryType, content []byte, tags types.Tags) error {
// Enforce size limits
tdl := telemetrylib.NewTelemetryDataLimits()
err := tdl.CheckLimits(content)
if err != nil {
func (tc *TelemetryClient) Generate(telemetry types.TelemetryType, content *types.TelemetryBlob, tags types.Tags) error {
// Enforce valid versioned JSON object
if err := content.Valid(); err != nil {
slog.Debug(
"Supplied content is not a versioned JSON object",
slog.String("error", err.Error()),
)
return err
}

// Enforce content size limits
if err := content.CheckLimits(); err != nil {
slog.Debug(
"Supplied JSON blob failed limits check",
slog.String("error", err.Error()),
)
return err
}

Expand All @@ -589,7 +600,7 @@ func (tc *TelemetryClient) Generate(telemetry types.TelemetryType, content []byt
"Generated Telemetry",
slog.String("name", telemetry.String()),
slog.String("tags", tags.String()),
slog.String("content", string(content)),
slog.String("content", content.String()),
)

return tc.processor.AddData(telemetry, content, tags)
Expand Down
26 changes: 9 additions & 17 deletions pkg/lib/items.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package telemetrylib

import (
"database/sql"
"fmt"
"encoding/json"
"log/slog"
"strings"

Expand All @@ -13,17 +13,12 @@ import (

type TelemetryDataItem struct {
Header TelemetryDataItemHeader `json:"header" validate:"required"`
TelemetryData map[string]interface{} `json:"telemetryData" validate:"required,dive"`
TelemetryData json.RawMessage `json:"telemetryData" validate:"required,dive"`
Footer TelemetryDataItemFooter `json:"footer" validate:"required"`
}

// func NewTelemetryDataItem(telemetry types.TelemetryType, tags types.Tags, data map[string]interface{}) *TelemetryDataItem {
func NewTelemetryDataItem(telemetry types.TelemetryType, tags types.Tags, marshaledData []byte) (*TelemetryDataItem, error) {
data, err := utils.DeserializeMap(string(marshaledData))
if err != nil {
return nil, fmt.Errorf("unable to unmarshal JSON: %s", err.Error())
}

func NewTelemetryDataItem(telemetry types.TelemetryType, tags types.Tags, content *types.TelemetryBlob) *TelemetryDataItem {
tdi := new(TelemetryDataItem)

// fill in header fields
Expand All @@ -35,12 +30,12 @@ func NewTelemetryDataItem(telemetry types.TelemetryType, tags types.Tags, marsha
}

// fill in body
tdi.TelemetryData = data
tdi.TelemetryData = content.Bytes()

// fill in footer
tdi.Footer.Checksum = "ichecksum" // TODO

return tdi, nil
return tdi
}

type TelemetryDataItemHeader struct {
Expand Down Expand Up @@ -84,22 +79,19 @@ type TelemetryDataItemRow struct {
BundleId sql.NullInt64
}

func NewTelemetryDataItemRow(telemetry types.TelemetryType, tags types.Tags, marshaledData []byte) (*TelemetryDataItemRow, error) {
func NewTelemetryDataItemRow(telemetry types.TelemetryType, tags types.Tags, content *types.TelemetryBlob) *TelemetryDataItemRow {

item, err := NewTelemetryDataItem(telemetry, tags, marshaledData)
if err != nil {
return nil, fmt.Errorf("unable to create a new telemetry data item: %s", err.Error())
}
item := NewTelemetryDataItem(telemetry, tags, content)

dataItemRow := new(TelemetryDataItemRow)
dataItemRow.ItemId = item.Header.TelemetryId
dataItemRow.ItemType = item.Header.TelemetryType
dataItemRow.ItemTimestamp = item.Header.TelemetryTimeStamp
dataItemRow.ItemAnnotations = strings.Join(item.Header.TelemetryAnnotations, ",")
dataItemRow.ItemData = marshaledData
dataItemRow.ItemData = content.Bytes()
dataItemRow.ItemChecksum = item.Footer.Checksum

return dataItemRow, nil
return dataItemRow

}

Expand Down
17 changes: 6 additions & 11 deletions pkg/lib/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/SUSE/telemetry/pkg/config"
"github.com/SUSE/telemetry/pkg/types"
"github.com/SUSE/telemetry/pkg/utils"
)

type TelemetryProcessor interface {
Expand All @@ -16,7 +15,7 @@ type TelemetryProcessor interface {
// Add telemetry data - a method to process jsonData as a byte[]
AddData(
telemetry types.TelemetryType,
content []byte,
content *types.TelemetryBlob,
tags types.Tags,
) (err error)

Expand Down Expand Up @@ -106,14 +105,10 @@ func NewTelemetryProcessor(cfg *config.DBConfig) (TelemetryProcessor, error) {
return &p, err
}

func (p *TelemetryProcessorImpl) AddData(telemetry types.TelemetryType, marshaledData []byte, tags types.Tags) (err error) {
dataItemRow, err := NewTelemetryDataItemRow(telemetry, tags, marshaledData)
if err != nil {
return fmt.Errorf("unable to create telemetry data: %s", err.Error())
}
func (p *TelemetryProcessorImpl) AddData(telemetry types.TelemetryType, marshaledData *types.TelemetryBlob, tags types.Tags) (err error) {
dataItemRow := NewTelemetryDataItemRow(telemetry, tags, marshaledData)

err = dataItemRow.Insert(p.t.storer.Conn)
return
return dataItemRow.Insert(p.t.storer.Conn)
}

func (p *TelemetryProcessorImpl) GenerateBundle(clientId int64, customerId string, tags types.Tags) (bundleRow *TelemetryBundleRow, err error) {
Expand Down Expand Up @@ -250,11 +245,11 @@ func (p *TelemetryProcessorImpl) ToItem(itemRow *TelemetryDataItemRow) (item Tel
Checksum: itemRow.ItemChecksum,
}

data, err := utils.DeserializeMap(string(itemRow.ItemData))
//data, err := utils.DeserializeMap(string(itemRow.ItemData))

item = TelemetryDataItem{
Header: itemHeader,
TelemetryData: data,
TelemetryData: itemRow.ItemData,
Footer: itemFooter,
}

Expand Down
65 changes: 17 additions & 48 deletions pkg/lib/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package telemetrylib
import (
"fmt"
"log/slog"
"strings"
"testing"

"github.com/SUSE/telemetry/pkg/config"
Expand Down Expand Up @@ -75,18 +74,16 @@ func (t *TelemetryProcessorTestSuite) AfterTest() {
func (t *TelemetryProcessorTestSuite) TestAddTelemetryDataItem() {
telemetryType := types.TelemetryType("SLE-SERVER-Test")
tags := types.Tags{types.Tag("key1=value1"), types.Tag("key2")}
payload := `
{
"ItemA": 1,
"ItemB": "b",
"ItemC": "c"
}
`
payload := types.NewTelemetryBlob([]byte(`{
"ItemA": 1,
"ItemB": "b",
"ItemC": "c"
}`))

// test the fileEnv.yaml based datastores
processor := t.defaultEnv.telemetryprocessor

err := processor.AddData(telemetryType, []byte(payload), tags)
err := processor.AddData(telemetryType, payload, tags)
if err != nil {
t.Fail("Test failed to add telemetry data item to datastore")
}
Expand All @@ -104,14 +101,12 @@ func (t *TelemetryProcessorTestSuite) TestCreateBundle() {

tags := types.Tags{types.Tag("key1=value1"), types.Tag("key2")}

payload := `
{
"field1": "example_data",
"field2": null,
"field3": [1, 2, 3]
}
`
err := telemetryprocessor.AddData(telemetryType, []byte(payload), tags)
payload := types.NewTelemetryBlob([]byte(`{
"field1": "example_data",
"field2": null,
"field3": [1, 2, 3]
}`))
err := telemetryprocessor.AddData(telemetryType, payload, tags)

if err != nil {
t.Fail("Test failed to add telemetry data item")
Expand All @@ -121,14 +116,12 @@ func (t *TelemetryProcessorTestSuite) TestCreateBundle() {
telemetryType = types.TelemetryType("SLE-SERVER-Pkg")
newtags := types.Tags{types.Tag("key1=value1"), types.Tag("key2")}

payload = `
{
payload = types.NewTelemetryBlob([]byte(`{
"ItemA": 1,
"ItemB": "b"
}
`
}`))

err = telemetryprocessor.AddData(telemetryType, []byte(payload), newtags)
err = telemetryprocessor.AddData(telemetryType, payload, newtags)

if err != nil {
t.Fail("Test failed to add telemetry data item")
Expand Down Expand Up @@ -331,30 +324,6 @@ func (t *TelemetryProcessorTestSuite) TestReport() {

}

func (t *TelemetryProcessorTestSuite) TestAddTelemetryDataItemInvalidPayload() {

payload := `
{
"field1": "example_data",
"field2": null
"field3": [1, 2, 3]
}
`
telemetryType := types.TelemetryType("SLE-SERVER-Pkg")
var tags types.Tags

processor := t.defaultEnv.telemetryprocessor
err := processor.AddData(telemetryType, []byte(payload), tags)

expectedmsg := "unable to unmarshal JSON"

// Check if the string contains the substring
if !strings.Contains(err.Error(), expectedmsg) {
t.T().Errorf("String '%s' does not contain substring '%s'", err.Error(), expectedmsg)
}

}

func addDataItems(totalItems int, processor TelemetryProcessor) error {

telemetryType := types.TelemetryType("SLE-SERVER-Test")
Expand All @@ -372,8 +341,8 @@ func addDataItems(totalItems int, processor TelemetryProcessor) error {
`
numItems := 1
for numItems <= totalItems {
formattedJSON := fmt.Sprintf(payload, utils.GenerateRandomString(3))
err := processor.AddData(telemetryType, []byte(formattedJSON), tags)
formattedJSON := types.NewTelemetryBlob([]byte(fmt.Sprintf(payload, utils.GenerateRandomString(3))))
err := processor.AddData(telemetryType, formattedJSON, tags)
if err != nil {
slog.Error(
"Failed to add the item",
Expand Down
2 changes: 1 addition & 1 deletion pkg/lib/limits.go → pkg/limits/limits.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package telemetrylib
package limits

import (
"errors"
Expand Down
70 changes: 70 additions & 0 deletions pkg/types/blob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package types

import (
"encoding/json"
"fmt"
"log/slog"

"github.com/SUSE/telemetry/pkg/limits"
)

type TelemetryBlob struct {
bytes []byte
}

func NewTelemetryBlob(jsonBlob []byte) *TelemetryBlob {
return &TelemetryBlob{bytes: jsonBlob}
}

func (tb *TelemetryBlob) String() string {
return string(tb.bytes)
}

func (tb *TelemetryBlob) Bytes() []byte {
return tb.bytes
}

func (tb *TelemetryBlob) errNotValidJson() error {
return fmt.Errorf("not valid JSON blob")
}

func (tb *TelemetryBlob) errNotJsonObject(err error) error {
return fmt.Errorf("not a JSON object: %s", err.Error())
}

func (tb *TelemetryBlob) errNotVersionedObject() error {
return fmt.Errorf("missing 'version' field in JSON object")
}

func (tb *TelemetryBlob) validJson() bool {
return json.Valid(tb.Bytes())
}

func (tb *TelemetryBlob) Valid() error {
var data map[string]any

if !tb.validJson() {
return tb.errNotValidJson()
}

if err := json.Unmarshal(tb.Bytes(), &data); err != nil {
slog.Debug(
"Not a valid JSON object",
slog.String("blob", tb.String()),
slog.String("error", err.Error()),
)
newErr := tb.errNotJsonObject(err)
return newErr
}

if _, found := data["version"]; !found {
slog.Debug("Not a valid JSON object", slog.String("blob", tb.String()))
return tb.errNotVersionedObject()
}

return nil
}

func (tb *TelemetryBlob) CheckLimits() error {
return limits.NewTelemetryDataLimits().CheckLimits(tb.Bytes())
}
5 changes: 4 additions & 1 deletion telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func Generate(telemetry types.TelemetryType, class TelemetryClass, content []byt
return err
}

// check that the telemetry content is valid
blob := types.NewTelemetryBlob(content)

// attempt to load the default config file
cfg, err := config.NewConfig(client.CONFIG_PATH)
if err != nil {
Expand Down Expand Up @@ -113,7 +116,7 @@ func Generate(telemetry types.TelemetryType, class TelemetryClass, content []byt
}

// generate the telemetry, storing it in the local data store
err = tc.Generate(telemetry, content, tags)
err = tc.Generate(telemetry, blob, tags)
if err != nil {
slog.Warn(
"Failed to generate telemetry",
Expand Down

0 comments on commit ffcfcf0

Please sign in to comment.