Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATA-3441 Update data export command #4596

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9df72a5
Update API version and add deprecation nolints #DATA-3441
katiepeters Dec 2, 2024
c391e5e
Split binary and tabular into separate subcommands #DATA-3441
katiepeters Dec 3, 2024
b90725c
Fix duplicate flag; add new flag names #DATA-3441
katiepeters Dec 3, 2024
337a368
Create POC streaming download #DATA-3441
katiepeters Dec 3, 2024
94ee605
Remove 'data' folder; use temporary file until stream completes #DATA…
katiepeters Dec 3, 2024
b7cf33c
Add ExportTabularData to testutils #DATA-3441
katiepeters Dec 3, 2024
95d1e80
Re-create 'actions' for easier testing #DATA-3441
katiepeters Dec 3, 2024
359d405
Start updating tabular client tests #DATA-3441
katiepeters Dec 3, 2024
54d4ce6
Merge with main; resolve conflicts #DATA-3441
katiepeters Dec 5, 2024
12384c1
Update API version #DATA-3441
katiepeters Dec 5, 2024
78ecee1
Finish updating tests #DATA-3441
katiepeters Dec 5, 2024
e86d4bb
Update data service client to match go sdk PR #DATA-3441
katiepeters Dec 5, 2024
e95e161
Remove file if program exits early #DATA-3441
katiepeters Dec 5, 2024
98e5621
Split up tabularData func into smaller functions #DATA-3441
katiepeters Dec 5, 2024
404ab76
Remove stream close (not needed) #DATA-3441
katiepeters Dec 5, 2024
e751eeb
Adjust the logic #DATA-3441
katiepeters Dec 6, 2024
a5f6732
Merge with main and resolve conflicts #DATA-3441
katiepeters Dec 10, 2024
9db0b4d
Create separate success/error cases #DATA-3441
katiepeters Dec 10, 2024
f26d47b
Remove timeout flag #DATA-3441
katiepeters Dec 10, 2024
e6f4df6
Move newline creation #DATA-3441
katiepeters Dec 10, 2024
592df9b
Update tests to account for newline #DATA-3441
katiepeters Dec 10, 2024
0f79105
Bump api version #DATA-3441
katiepeters Dec 10, 2024
ca98423
Lint/small tweaks #DATA-3441
katiepeters Dec 10, 2024
1d40a29
Merge with main and resolve conflicts #DATA-3441
katiepeters Dec 13, 2024
31f27df
Add correct args #DATA-3441
katiepeters Dec 13, 2024
2ce78bd
Remove shouldNotBeNil tests #DATA-3441
katiepeters Dec 13, 2024
0967465
Fix usage text #DATA-3441
katiepeters Dec 13, 2024
bde1971
Lint #DATA-3441
katiepeters Dec 13, 2024
2989eec
Remove nil check #DATA-3441
katiepeters Dec 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app/data_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ func (d *DataClient) TabularDataByFilter(ctx context.Context, opts *DataByFilter
countOnly = opts.CountOnly
includeInternalData = opts.IncludeInternalData
}
//nolint:deprecated,staticcheck
katiepeters marked this conversation as resolved.
Show resolved Hide resolved
resp, err := d.dataClient.TabularDataByFilter(ctx, &pb.TabularDataByFilterRequest{
DataRequest: &dataReq,
CountOnly: countOnly,
Expand Down Expand Up @@ -1219,6 +1220,7 @@ func binaryMetadataFromProto(proto *pb.BinaryMetadata) BinaryMetadata {
}
}

//nolint:deprecated,staticcheck
func tabularDataFromProto(proto *pb.TabularData, metadata *pb.CaptureMetadata) TabularData {
return TabularData{
Data: proto.Data.AsMap(),
Expand Down
6 changes: 6 additions & 0 deletions app/data_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,19 +260,24 @@ func TestDataClient(t *testing.T) {

t.Run("TabularDataByFilter", func(t *testing.T) {
dataStruct, _ := utils.StructToStructPb(data)
//nolint:deprecated,staticcheck
tabularDataPb := &pb.TabularData{
Data: dataStruct,
MetadataIndex: 0,
TimeRequested: timestamppb.New(start),
TimeReceived: timestamppb.New(end),
}
//nolint:deprecated,staticcheck
grpcClient.TabularDataByFilterFunc = func(ctx context.Context, in *pb.TabularDataByFilterRequest,
opts ...grpc.CallOption,
//nolint:deprecated,staticcheck
) (*pb.TabularDataByFilterResponse, error) {
test.That(t, in.DataRequest, test.ShouldResemble, dataRequestToProto(dataRequest))
test.That(t, in.CountOnly, test.ShouldBeTrue)
test.That(t, in.IncludeInternalData, test.ShouldBeTrue)
//nolint:deprecated,staticcheck
return &pb.TabularDataByFilterResponse{
//nolint:deprecated,staticcheck
Data: []*pb.TabularData{tabularDataPb},
Count: pbCount,
Last: last,
Expand Down Expand Up @@ -695,6 +700,7 @@ func TestDataSyncClient(t *testing.T) {
t.Run("TabularDataCaptureUpload", func(t *testing.T) {
uploadMetadata.Type = DataTypeTabularSensor
dataStruct, _ := utils.StructToStructPb(data)
//nolint:deprecated,staticcheck
tabularDataPb := &pb.TabularData{
Data: dataStruct,
MetadataIndex: 0,
Expand Down
105 changes: 71 additions & 34 deletions cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,13 @@ const (
dataFlagAliasRobotName = "robot-name"
dataFlagPartName = "part-name"
dataFlagComponentType = "component-type"
dataFlagResourceSubtype = "resource-subtype"
dataFlagComponentName = "component-name"
dataFlagResourceName = "resource-name"
dataFlagMethod = "method"
dataFlagMimeTypes = "mime-types"
dataFlagStart = "start"
dataFlagEnd = "end"
dataFlagChunkLimit = "chunk-limit"
dataFlagParallelDownloads = "parallel"
dataFlagTags = "tags"
dataFlagBboxLabels = "bbox-labels"
Expand Down Expand Up @@ -483,42 +484,78 @@ var app = &cli.App{
HideHelpCommand: true,
Subcommands: []*cli.Command{
{
Name: "export",
Usage: "download data from Viam cloud",
UsageText: createUsageText("data export", []string{dataFlagDestination, dataFlagDataType}, true),
Flags: append([]cli.Flag{
&cli.PathFlag{
Name: dataFlagDestination,
Required: true,
Usage: "output directory for downloaded data",
},
&cli.UintFlag{
Name: dataFlagChunkLimit,
Usage: "maximum number of results per download request (tabular data only)",
Value: 100000,
},
&cli.UintFlag{
Name: dataFlagParallelDownloads,
Usage: "number of download requests to make in parallel (binary data only)",
Value: 100,
},
&cli.StringSliceFlag{
Name: dataFlagTags,
Usage: "tags filter. " +
"accepts tagged for all tagged data, untagged for all untagged data, or a list of tags for all data matching any of the tags",
},
&cli.StringFlag{
Name: dataFlagDataType,
Usage: "type of data to download. can be binary or tabular",
Name: "export",
Usage: "download data from Viam cloud",
Subcommands: []*cli.Command{
{
Name: "binary",
Usage: "download binary data",
UsageText: createUsageText("data export binary", []string{dataFlagDestination}, true),
Flags: append([]cli.Flag{
&cli.PathFlag{
Name: dataFlagDestination,
Required: true,
Usage: "output directory for downloaded data",
},
&cli.UintFlag{
Name: dataFlagParallelDownloads,
Usage: "number of download requests to make in parallel",
Value: 100,
},
&cli.UintFlag{
Name: dataFlagTimeout,
Usage: "number of seconds to wait for large file downloads",
Value: 30,
},
&cli.StringSliceFlag{
Name: dataFlagTags,
Usage: "tags filter. accepts tagged for all tagged data, untagged for all untagged data, or a list of tags",
},
}, commonFilterFlags...),
Action: DataExportBinaryAction,
},
&cli.UintFlag{
Name: dataFlagTimeout,
Usage: "number of seconds to wait for large file downloads",
Value: 30,
{
Name: "tabular",
Usage: "download tabular data",
UsageText: createUsageText("data export tabular", []string{dataFlagDestination, "part-id", "component-name", "method"}, true),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] resource name here too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh!!!

Flags: []cli.Flag{
&cli.PathFlag{
Name: dataFlagDestination,
Required: true,
Usage: "output directory for downloaded data",
},
&cli.StringFlag{
Name: "part-id",
Required: true,
Usage: "part id",
},
&cli.StringFlag{
Name: "resource-name",
Required: true,
Usage: "resource name (sometimes called 'component name')",
},
&cli.StringFlag{
Name: "resource-subtype",
Required: true,
Usage: "resource subtype (sometimes called 'component type')",
},
&cli.StringFlag{
Name: dataFlagMethod,
Required: true,
Usage: "method name",
},
&cli.StringFlag{
Name: "start",
Usage: "ISO-8601 timestamp indicating the start of the interval",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be RFC 3339? I see thats what we're using for the time layout. (I know both are hella similar)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't change what we were already using, but I suspect that we chose it to be a little more flexible with user input

},
&cli.StringFlag{
Name: "end",
Usage: "ISO-8601 timestamp indicating the end of the interval",
},
},
Action: DataExportTabularAction,
},
},
commonFilterFlags...),
Action: DataExportAction,
},
{
Name: "delete",
Expand Down
164 changes: 110 additions & 54 deletions cli/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package cli

import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"io/fs"
"maps"
"os"
Expand Down Expand Up @@ -103,7 +105,6 @@ func setup(

if dataClient != nil {
// these flags are only relevant when testing a dataClient
flags.String(dataFlagDataType, dataTypeTabular, "")
flags.String(dataFlagDestination, utils.ResolveFile(""), "")
}

Expand Down Expand Up @@ -322,71 +323,126 @@ func TestUpdateBillingServiceAction(t *testing.T) {
test.That(t, out.messages[7], test.ShouldContainSubstring, "USA")
}

func TestTabularDataByFilterAction(t *testing.T) {
pbStruct, err := protoutils.StructToStructPb(map[string]interface{}{"bool": true, "string": "true", "float": float64(1)})
test.That(t, err, test.ShouldBeNil)
type mockDataServiceClient struct {
grpc.ClientStream
responses []*datapb.ExportTabularDataResponse
index int
err error
}

// calls to `TabularDataByFilter` will repeat so long as data continue to be returned,
// so we need a way of telling our injected method when data has already been sent so we
// can send an empty response
var dataRequested bool
tabularDataByFilterFunc := func(ctx context.Context, in *datapb.TabularDataByFilterRequest, opts ...grpc.CallOption,
) (*datapb.TabularDataByFilterResponse, error) {
if dataRequested {
return &datapb.TabularDataByFilterResponse{}, nil
}
dataRequested = true
return &datapb.TabularDataByFilterResponse{
Data: []*datapb.TabularData{{Data: pbStruct}},
Metadata: []*datapb.CaptureMetadata{{LocationId: "loc-id"}},
}, nil
func (m *mockDataServiceClient) Recv() (*datapb.ExportTabularDataResponse, error) {
if m.err != nil {
return nil, m.err
}

dsc := &inject.DataServiceClient{
TabularDataByFilterFunc: tabularDataByFilterFunc,
if m.index >= len(m.responses) {
return nil, io.EOF
}

cCtx, ac, out, errOut := setup(&inject.AppServiceClient{}, dsc, nil, nil, nil, "token")
resp := m.responses[m.index]
m.index++

test.That(t, ac.dataExportAction(cCtx), test.ShouldBeNil)
test.That(t, len(errOut.messages), test.ShouldEqual, 0)
test.That(t, len(out.messages), test.ShouldEqual, 4)
test.That(t, out.messages[0], test.ShouldEqual, "Downloading..")
test.That(t, out.messages[1], test.ShouldEqual, ".")
test.That(t, out.messages[2], test.ShouldEqual, ".")
test.That(t, out.messages[3], test.ShouldEqual, "\n")

// expectedDataSize is the expected string length of the data returned by the injected call
expectedDataSize := 98
b := make([]byte, expectedDataSize)

// `data.ndjson` is the standardized name of the file data is written to in the `tabularData` call
filePath := utils.ResolveFile("data/data.ndjson")
file, err := os.Open(filePath)
test.That(t, err, test.ShouldBeNil)
return resp, nil
}

dataSize, err := file.Read(b)
test.That(t, err, test.ShouldBeNil)
test.That(t, dataSize, test.ShouldEqual, expectedDataSize)
func newMockExportStream(responses []*datapb.ExportTabularDataResponse, err error) *mockDataServiceClient {
return &mockDataServiceClient{
responses: responses,
err: err,
}
}

func TestDataExportTabularAction(t *testing.T) {
t.Run("successful case", func(t *testing.T) {
pbStructPayload1, err := protoutils.StructToStructPb(map[string]interface{}{"bool": true, "string": "true", "float": float64(1)})
test.That(t, err, test.ShouldBeNil)

savedData := string(b)
expectedData := "{\"MetadataIndex\":0,\"TimeReceived\":null,\"TimeRequested\":null,\"bool\":true,\"float\":1,\"string\":\"true\"}"
test.That(t, savedData, test.ShouldEqual, expectedData)
pbStructPayload2, err := protoutils.StructToStructPb(map[string]interface{}{"booly": false, "string": "true", "float": float64(1)})
test.That(t, err, test.ShouldBeNil)

expectedMetadataSize := 23
b = make([]byte, expectedMetadataSize)
exportTabularDataFunc := func(ctx context.Context, in *datapb.ExportTabularDataRequest, opts ...grpc.CallOption,
) (datapb.DataService_ExportTabularDataClient, error) {
return newMockExportStream([]*datapb.ExportTabularDataResponse{
{LocationId: "loc-id", Payload: pbStructPayload1},
{LocationId: "loc-id", Payload: pbStructPayload2},
}, nil), nil
}

// metadata is named `0.json` based on its index in the metadata array
filePath = utils.ResolveFile("metadata/0.json")
file, err = os.Open(filePath)
test.That(t, err, test.ShouldBeNil)
dsc := &inject.DataServiceClient{
ExportTabularDataFunc: exportTabularDataFunc,
}

metadataSize, err := file.Read(b)
test.That(t, err, test.ShouldBeNil)
test.That(t, metadataSize, test.ShouldEqual, expectedMetadataSize)
cCtx, ac, out, errOut := setup(&inject.AppServiceClient{}, dsc, nil, nil, nil, "token")

test.That(t, ac.dataExportTabularAction(cCtx), test.ShouldBeNil)
test.That(t, len(errOut.messages), test.ShouldEqual, 0)
test.That(t, len(out.messages), test.ShouldEqual, 3)
test.That(t, strings.Join(out.messages, ""), test.ShouldEqual, "Downloading...\n")

filePath := utils.ResolveFile(dataFileName)

data, err := os.ReadFile(filePath)
test.That(t, err, test.ShouldBeNil)

savedMetadata := string(b)
test.That(t, savedMetadata, test.ShouldEqual, "{\"locationId\":\"loc-id\"}")
// Output is unstable, so parse back into maps before comparing to expected.
var actual []map[string]interface{}
decoder := json.NewDecoder(strings.NewReader(string(data)))
for decoder.More() {
var item map[string]interface{}
err = decoder.Decode(&item)
test.That(t, err, test.ShouldBeNil)
actual = append(actual, item)
}

expectedData := []map[string]interface{}{
{
"locationId": "loc-id",
"payload": map[string]interface{}{
"bool": true,
"float": float64(1),
"string": "true",
},
},
{
"locationId": "loc-id",
"payload": map[string]interface{}{
"booly": false,
"float": float64(1),
"string": "true",
},
},
}

test.That(t, actual, test.ShouldResemble, expectedData)
})

t.Run("error case", func(t *testing.T) {
exportTabularDataFunc := func(ctx context.Context, in *datapb.ExportTabularDataRequest, opts ...grpc.CallOption,
) (datapb.DataService_ExportTabularDataClient, error) {
return newMockExportStream([]*datapb.ExportTabularDataResponse{}, errors.New("whoops")), nil
}

dsc := &inject.DataServiceClient{
ExportTabularDataFunc: exportTabularDataFunc,
}

cCtx, ac, out, errOut := setup(&inject.AppServiceClient{}, dsc, nil, nil, nil, "token")

err := ac.dataExportTabularAction(cCtx)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err, test.ShouldBeError, errors.New("error receiving tabular data: whoops"))
test.That(t, len(errOut.messages), test.ShouldEqual, 0)

// Test that export was retried (total of 5 tries).
test.That(t, len(out.messages), test.ShouldEqual, 7)
test.That(t, strings.Join(out.messages, ""), test.ShouldEqual, "Downloading.......\n")

// Test that the data.ndjson file was removed.
filePath := utils.ResolveFile(dataFileName)
_, err = os.ReadFile(filePath)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err, test.ShouldBeError, fmt.Errorf("open %s: no such file or directory", filePath))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[super-duper nit] I think we only need the test.ShouldBeError, that also handles if the error is nil I believe.

})
}

func TestBaseURLParsing(t *testing.T) {
Expand Down
Loading
Loading