Skip to content

Commit

Permalink
[Data-3312] add basic data capture support for capturing tabular data…
Browse files Browse the repository at this point in the history
… to mongo (#4486)

Co-authored-by: Devin Hilly <[email protected]>
  • Loading branch information
nicksanford and dmhilly authored Nov 11, 2024
1 parent 900b4fd commit a3262bf
Show file tree
Hide file tree
Showing 15 changed files with 461 additions and 52 deletions.
50 changes: 50 additions & 0 deletions data/bson_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package data

import (
"fmt"

"go.mongodb.org/mongo-driver/bson"
"google.golang.org/protobuf/types/known/structpb"
)

// pbStructToBSON converts a structpb.Struct to a bson.M.
//
// Each field is converted with convertPBStructValueToBSON. The first
// conversion error aborts the walk and is returned together with a nil map.
// A nil struct produces an empty, non-nil bson.M instead of panicking.
func pbStructToBSON(s *structpb.Struct) (bson.M, error) {
	// GetFields is safe to call on a nil receiver, whereas s.Fields is not.
	// A nil struct can reach this function through a Value_StructValue whose
	// StructValue pointer is unset.
	fields := s.GetFields()
	bsonMap := make(bson.M, len(fields))
	for k, v := range fields {
		bsonValue, err := convertPBStructValueToBSON(v)
		if err != nil {
			return nil, err
		}
		bsonMap[k] = bsonValue
	}
	return bsonMap, nil
}

// convertPBStructValueToBSON converts a single structpb.Value into the Go
// value used to represent it inside a bson.M:
//
//   - null   -> nil
//   - number -> float64
//   - string -> string
//   - bool   -> bool
//   - struct -> bson.M (via pbStructToBSON)
//   - list   -> bson.A (elements converted recursively)
//
// A nil value, or a value whose Kind is unset, is reported as an
// "unsupported value type" error rather than causing a panic.
func convertPBStructValueToBSON(v *structpb.Value) (interface{}, error) {
	// GetKind is nil-receiver safe; a nil v falls through to the default case.
	switch kind := v.GetKind().(type) {
	case *structpb.Value_NullValue:
		return nil, nil
	case *structpb.Value_NumberValue:
		return v.GetNumberValue(), nil
	case *structpb.Value_StringValue:
		return v.GetStringValue(), nil
	case *structpb.Value_BoolValue:
		return v.GetBoolValue(), nil
	case *structpb.Value_StructValue:
		return pbStructToBSON(v.GetStructValue())
	case *structpb.Value_ListValue:
		items := v.GetListValue().GetValues()
		// Always return a non-nil slice: by default the mongo driver encodes a
		// nil slice as BSON null, which would turn an empty list into null.
		slice := make(bson.A, 0, len(items))
		for _, item := range items {
			bsonValue, err := convertPBStructValueToBSON(item)
			if err != nil {
				return nil, err
			}
			slice = append(slice, bsonValue)
		}
		return slice, nil
	default:
		return nil, fmt.Errorf("unsupported value type: %T", kind)
	}
}
186 changes: 186 additions & 0 deletions data/bson_utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package data

import (
"encoding/json"
"fmt"
"testing"

"github.com/google/uuid"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.viam.com/test"
"google.golang.org/protobuf/types/known/structpb"
)

// bsonToStructPB converts a bson.M to a structpb.Struct.
// Every entry is converted with convertBSONValueToStructPBValue; the first
// failure is returned as-is along with a nil struct.
func bsonToStructPB(bsonMap bson.M) (*structpb.Struct, error) {
	fields := make(map[string]*structpb.Value)
	for key, raw := range bsonMap {
		converted, err := convertBSONValueToStructPBValue(raw)
		if err != nil {
			return nil, err
		}
		fields[key] = converted
	}
	return &structpb.Struct{Fields: fields}, nil
}

// convertBSONValueToStructPBValue maps one decoded BSON value onto a
// structpb.Value.
//
// nil and primitive.Undefined become a null value; numeric types become number
// values; bson.M and bson.A are converted recursively. BSON types with no
// structpb counterpart are rendered as strings, except non-UUID binary data,
// which becomes a list with one number per byte. Any other Go type yields an
// "unsupported BSON type" error.
func convertBSONValueToStructPBValue(v interface{}) (*structpb.Value, error) {
	switch typed := v.(type) {
	case nil, primitive.Undefined:
		return structpb.NewNullValue(), nil
	case float64:
		return structpb.NewNumberValue(typed), nil
	case int64:
		return structpb.NewNumberValue(float64(typed)), nil
	case int32:
		return structpb.NewNumberValue(float64(typed)), nil
	case string:
		return structpb.NewStringValue(typed), nil
	case bool:
		return structpb.NewBoolValue(typed), nil
	case bson.M:
		nested, err := bsonToStructPB(typed)
		if err != nil {
			return nil, err
		}
		return structpb.NewStructValue(nested), nil
	case bson.A:
		converted := &structpb.ListValue{}
		for _, elem := range typed {
			pbElem, err := convertBSONValueToStructPBValue(elem)
			if err != nil {
				return nil, err
			}
			converted.Values = append(converted.Values, pbElem)
		}
		return structpb.NewListValue(converted), nil
	case primitive.DateTime:
		return structpb.NewStringValue(typed.Time().String()), nil
	case primitive.Timestamp:
		// Timestamps are rendered as their JSON encoding.
		encoded, err := json.Marshal(typed)
		if err != nil {
			return nil, err
		}
		return structpb.NewStringValue(string(encoded)), nil
	case primitive.JavaScript:
		return structpb.NewStringValue(string(typed)), nil
	case primitive.Symbol:
		return structpb.NewStringValue(string(typed)), nil
	case primitive.DBPointer, primitive.CodeWithScope, primitive.Decimal128, primitive.Regex, primitive.ObjectID:
		// In a multi-type case the switch variable keeps the interface type,
		// so the String method is reached through fmt.Stringer.
		stringer := typed.(fmt.Stringer)
		return structpb.NewStringValue(stringer.String()), nil
	case primitive.MinKey:
		return structpb.NewStringValue("MinKey"), nil
	case primitive.MaxKey:
		return structpb.NewStringValue("MaxKey"), nil
	case primitive.Binary:
		// UUID payloads are returned in canonical string form.
		if typed.Subtype == bson.TypeBinaryUUID {
			id, err := uuid.FromBytes(typed.Data)
			if err != nil {
				return nil, err
			}
			return structpb.NewStringValue(id.String()), nil
		}

		// Any other binary subtype becomes a list of the raw byte values.
		rawBytes := make([]*structpb.Value, 0, len(typed.Data))
		for _, b := range typed.Data {
			rawBytes = append(rawBytes, structpb.NewNumberValue(float64(b)))
		}
		return structpb.NewListValue(&structpb.ListValue{Values: rawBytes}), nil
	default:
		return nil, fmt.Errorf("unsupported BSON type: %T", v)
	}
}

// TestBSONToStructPBAndBack checks that pbStructToBSON produces the expected
// bson.M for primitive, nested-struct and list fields, and that feeding that
// bson.M through bsonToStructPB reproduces the original structpb.Struct.
func TestBSONToStructPBAndBack(t *testing.T) {
	type testCase struct {
		name         string
		input        *structpb.Struct
		expectedBSON primitive.M
	}

	cases := []testCase{
		{
			name: "Primitive fields are properly converted between structpb.Struct <-> BSON.",
			input: &structpb.Struct{Fields: map[string]*structpb.Value{
				"name":     structpb.NewStringValue("John"),
				"age":      structpb.NewNumberValue(30),
				"alive":    structpb.NewBoolValue(true),
				"nullable": structpb.NewNullValue(),
			}},
			expectedBSON: bson.M{
				"name":     "John",
				"age":      float64(30),
				"alive":    true,
				"nullable": nil,
			},
		},
		{
			name: "Nested struct fields are properly converted between structpb.Struct <-> BSON.",
			input: &structpb.Struct{Fields: map[string]*structpb.Value{
				"person": structpb.NewStructValue(&structpb.Struct{
					Fields: map[string]*structpb.Value{
						"name":  structpb.NewStringValue("Alice"),
						"age":   structpb.NewNumberValue(25),
						"alive": structpb.NewBoolValue(true),
					},
				}),
			}},
			expectedBSON: bson.M{
				"person": bson.M{
					"name":  "Alice",
					"age":   float64(25),
					"alive": true,
				},
			},
		},
		{
			name: "List fields are properly converted between structpb.Struct <-> BSON.",
			input: &structpb.Struct{Fields: map[string]*structpb.Value{
				"names": structpb.NewListValue(&structpb.ListValue{
					Values: []*structpb.Value{
						structpb.NewStringValue("Bob"),
						structpb.NewStringValue("Charlie"),
					},
				}),
			}},
			expectedBSON: bson.M{
				"names": bson.A{"Bob", "Charlie"},
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Forward direction: structpb.Struct -> bson.M.
			asBSON, err := pbStructToBSON(tc.input)
			test.That(t, err, test.ShouldBeNil)

			// The intermediate BSON must have the expected shape.
			test.That(t, asBSON, test.ShouldResemble, tc.expectedBSON)

			// Reverse direction: bson.M -> structpb.Struct.
			roundTripped, err := bsonToStructPB(asBSON)
			test.That(t, err, test.ShouldBeNil)

			// The round trip must reproduce the original input.
			test.That(t, roundTripped, test.ShouldResemble, tc.input)
		})
	}
}
4 changes: 3 additions & 1 deletion data/capture_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
v1 "go.viam.com/api/app/datasync/v1"
)

// captureAllFromCamera is the method name compared against the capture
// metadata's MethodName in CaptureBuffer.Write, which starts a new file for
// every write from that method so each image and its annotations are stored
// separately.
const captureAllFromCamera = "CaptureAllFromCamera"

// CaptureBufferedWriter is a buffered, persistent queue of SensorData.
type CaptureBufferedWriter interface {
Write(item *v1.SensorData) error
Expand Down Expand Up @@ -63,7 +65,7 @@ func (b *CaptureBuffer) Write(item *v1.SensorData) error {
// We want to special case on "CaptureAllFromCamera" because it is sensor data that contains images
// and their corresponding annotations. We want each image and its annotations to be stored in a
// separate file.
} else if b.nextFile.Size() > b.maxCaptureFileSize || b.MetaData.MethodName == "CaptureAllFromCamera" {
} else if b.nextFile.Size() > b.maxCaptureFileSize || b.MetaData.MethodName == captureAllFromCamera {
if err := b.nextFile.Close(); err != nil {
return err
}
Expand Down
71 changes: 68 additions & 3 deletions data/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"github.com/benbjohnson/clock"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.opencensus.io/trace"
v1 "go.viam.com/api/app/datasync/v1"
pb "go.viam.com/api/common/v1"
Expand Down Expand Up @@ -58,9 +60,14 @@ type Collector interface {
type collector struct {
clock clock.Clock
captureResults chan *v1.SensorData
captureErrors chan error
interval time.Duration
params map[string]*anypb.Any

mongoCollection *mongo.Collection
componentName string
componentType string
methodName string
captureErrors chan error
interval time.Duration
params map[string]*anypb.Any
// `lock` serializes calls to `Flush` and `Close`.
lock sync.Mutex
logger logging.Logger
Expand Down Expand Up @@ -257,7 +264,11 @@ func NewCollector(captureFunc CaptureFunc, params CollectorParams) (Collector, e
c = params.Clock
}
return &collector{
componentName: params.ComponentName,
componentType: params.ComponentType,
methodName: params.MethodName,
captureResults: make(chan *v1.SensorData, params.QueueSize),
mongoCollection: params.MongoCollection,
captureErrors: make(chan error, params.QueueSize),
interval: params.Interval,
params: params.MethodParams,
Expand Down Expand Up @@ -285,10 +296,64 @@ func (c *collector) writeCaptureResults() {
c.logger.Error(errors.Wrap(err, fmt.Sprintf("failed to write to collector %s", c.target.Path())).Error())
return
}

c.maybeWriteToMongo(msg)
}
}
}

// TabularData is a denormalized sensor reading.
// It is the document that maybeWriteToMongo inserts into the collector's
// mongo collection; the bson tags define the stored field names.
type TabularData struct {
// TimeRequested is populated from the reading's metadata TimeRequested timestamp.
TimeRequested time.Time `bson:"time_requested"`
// TimeReceived is populated from the reading's metadata TimeReceived timestamp.
TimeReceived time.Time `bson:"time_received"`
// ComponentName is copied from the collector's componentName.
ComponentName string `bson:"component_name"`
// ComponentType is copied from the collector's componentType.
ComponentType string `bson:"component_type"`
// MethodName is copied from the collector's methodName.
MethodName string `bson:"method_name"`
// Data is the reading's struct payload converted to BSON by pbStructToBSON.
Data bson.M `bson:"data"`
}

// maybeWriteToMongo writes msg to the mongoCollection if the collection is
// non-nil and msg is tabular data. It never returns an error: conversion and
// insert failures are logged and the reading is dropped.
//
// The message is skipped when:
//   - no mongo collection is configured,
//   - the collector's method produces binary sensor data or is
//     CaptureAllFromCamera (see DATA-3338 below),
//   - the message carries no struct payload.
//
// Metadata is read through the generated nil-safe getters, so a message with
// missing metadata does not panic. NOTE(review): in that case AsTime is
// expected to yield the Unix epoch for both timestamps; confirm that is
// acceptable for stored documents.
func (c *collector) maybeWriteToMongo(msg *v1.SensorData) {
	if c.mongoCollection == nil {
		return
	}

	// DATA-3338:
	// currently vision.CaptureAllFromCamera and camera.GetImages are stored in .capture files as VERY LARGE
	// tabular sensor data
	// That is a mistake which we are rectifying but in the meantime we don't want data captured from those methods to be synced
	// to mongo
	if getDataType(c.methodName) == v1.DataType_DATA_TYPE_BINARY_SENSOR || c.methodName == captureAllFromCamera {
		return
	}

	s := msg.GetStruct()
	if s == nil {
		return
	}

	data, err := pbStructToBSON(s)
	if err != nil {
		c.logger.Error(errors.Wrap(err, "failed to convert sensor data into bson"))
		return
	}

	// Direct field access (msg.Metadata.TimeRequested) would dereference a nil
	// Metadata pointer; the getters tolerate nil receivers.
	metadata := msg.GetMetadata()
	td := TabularData{
		TimeRequested: metadata.GetTimeRequested().AsTime(),
		TimeReceived:  metadata.GetTimeReceived().AsTime(),
		ComponentName: c.componentName,
		ComponentType: c.componentType,
		MethodName:    c.methodName,
		Data:          data,
	}

	if _, err := c.mongoCollection.InsertOne(c.cancelCtx, td); err != nil {
		c.logger.Error(errors.Wrap(err, "failed to write to mongo"))
	}
}

func (c *collector) logCaptureErrs() {
for err := range c.captureErrors {
now := c.clock.Now().Unix()
Expand Down
20 changes: 12 additions & 8 deletions data/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/benbjohnson/clock"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/mongo"
"google.golang.org/protobuf/types/known/anypb"

"go.viam.com/rdk/logging"
Expand All @@ -18,14 +19,17 @@ type CollectorConstructor func(resource interface{}, params CollectorParams) (Co

// CollectorParams contain the parameters needed to construct a Collector.
type CollectorParams struct {
ComponentName string
Interval time.Duration
MethodParams map[string]*anypb.Any
Target CaptureBufferedWriter
QueueSize int
BufferSize int
Logger logging.Logger
Clock clock.Clock
MongoCollection *mongo.Collection
ComponentName string
ComponentType string
MethodName string
Interval time.Duration
MethodParams map[string]*anypb.Any
Target CaptureBufferedWriter
QueueSize int
BufferSize int
Logger logging.Logger
Clock clock.Clock
}

// Validate validates that p contains all required parameters.
Expand Down
Loading

0 comments on commit a3262bf

Please sign in to comment.