Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATA-3443: Add export_tabular_data to data client #800

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ We use [`uv`](https://docs.astral.sh/uv/) to manage our environments and depende

4. When you're done making changes, check that your changes conform to any code formatting requirements and pass any tests.

- When testing, make sure you use the correct virtual environment by running either `uv make test` or `source .venv/bin/activate; make test`
- When testing, make sure you use the correct virtual environment by running either `uv run make test` or `source .venv/bin/activate; make test`

5. Commit your changes and open a pull request.

Expand Down
57 changes: 48 additions & 9 deletions src/viam/app/data_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
BinaryID,
BoundingBoxLabelsByFilterRequest,
BoundingBoxLabelsByFilterResponse,
CaptureInterval,
CaptureMetadata,
ConfigureDatabaseUserRequest,
DataRequest,
Expand All @@ -33,6 +34,8 @@
DeleteBinaryDataByIDsResponse,
DeleteTabularDataRequest,
DeleteTabularDataResponse,
ExportTabularDataRequest,
ExportTabularDataResponse,
Filter,
GetDatabaseConnectionRequest,
GetDatabaseConnectionResponse,
Expand Down Expand Up @@ -254,7 +257,6 @@ async def tabular_data_by_sql(self, organization_id: str, sql_query: str) -> Lis
sql_query="SELECT * FROM readings LIMIT 5"
)


Args:
organization_id (str): The ID of the organization that owns the data.
You can obtain your organization ID from the Viam app's organization settings page.
Expand Down Expand Up @@ -284,7 +286,6 @@ async def tabular_data_by_mql(self, organization_id: str, mql_binary: List[bytes

print(f"Tabular Data: {tabular_data}")


Args:
organization_id (str): The ID of the organization that owns the data.
You can obtain your organization ID from the Viam app's organization settings page.
Expand All @@ -307,13 +308,12 @@ async def get_latest_tabular_data(

::

time_captured, time_synced, payload = await data_client.get_latest_tabular_data(
part_id="<PART-ID>",
resource_name="<RESOURCE-NAME>",
resource_subtype="<RESOURCE-SUBTYPE>",
method_name="<METHOD-NAME>"
)

time_captured, time_synced, payload = await data_client.get_latest_tabular_data(
part_id="<PART-ID>",
resource_name="<RESOURCE-NAME>",
resource_subtype="<RESOURCE-SUBTYPE>",
method_name="<METHOD-NAME>"
)

Args:
part_id (str): The ID of the part that owns the data.
Expand All @@ -327,6 +327,7 @@ async def get_latest_tabular_data(
datetime: The time captured,
datetime: The time synced,
Dict[str, ValueTypes]: The latest tabular data captured from the specified data source.

For more information, see `Data Client API <https://docs.viam.com/appendix/apis/data-client/>`_.
"""

Expand All @@ -338,6 +339,44 @@ async def get_latest_tabular_data(
return None
return response.time_captured.ToDatetime(), response.time_synced.ToDatetime(), struct_to_dict(response.payload)

async def export_tabular_data(
    self,
    part_id: str,
    resource_name: str,
    resource_subtype: str,
    method_name: str,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
) -> List[ExportTabularDataResponse]:
    """Obtain unified tabular data and metadata from the specified data source.

    ::

        tabular_data = await data_client.export_tabular_data(
            part_id="<PART-ID>",
            resource_name="<RESOURCE-NAME>",
            resource_subtype="<RESOURCE-SUBTYPE>",
            method_name="<METHOD-NAME>",
            start_time="<START_TIME>",
            end_time="<END_TIME>",
        )

        print(f"My data: {tabular_data}")

    Args:
        part_id (str): The ID of the part that owns the data.
        resource_name (str): The name of the requested resource that captured the data.
        resource_subtype (str): The subtype of the requested resource that captured the data.
        method_name (str): The data capture method name.
        start_time (datetime): Optional start time for requesting a specific range of data.
        end_time (datetime): Optional end time for requesting a specific range of data.

    Returns:
        List[ExportTabularDataResponse]: The unified tabular data and metadata.

    For more information, see `Data Client API <https://docs.viam.com/appendix/apis/data-client/>`_.
    """

    # NOTE(review): assumes datetime_to_timestamp maps None -> None so that an
    # omitted start/end yields an unbounded interval — confirm against viam.utils.
    interval = CaptureInterval(start=datetime_to_timestamp(start_time), end=datetime_to_timestamp(end_time))
    request = ExportTabularDataRequest(
        part_id=part_id,
        resource_name=resource_name,
        resource_subtype=resource_subtype,
        method_name=method_name,
        interval=interval,
    )
    return await self._data_client.ExportTabularData(request, metadata=self._metadata)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExportTabularDataResponse has a lot of data in it that is maybe not needed? Which fields on the response would the user actually care about? Is it only the payload? Or also tags?

payload is a proto.Struct, which has really bad UI. We should avoid using that where we can, so we should determine what's actually important to return

Copy link
Member Author

@katiepeters katiepeters Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a note in the scope to optionally only return the payload in the future. As for the comment regarding proto.Struct, probably best for us to talk in person, so I'll wander over to your area again at some point!


async def binary_data_by_filter(
self,
filter: Optional[Filter] = None,
Expand Down
14 changes: 14 additions & 0 deletions tests/mocks/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@
DeleteBinaryDataByIDsResponse,
DeleteTabularDataRequest,
DeleteTabularDataResponse,
ExportTabularDataRequest,
ExportTabularDataResponse,
GetDatabaseConnectionRequest,
GetDatabaseConnectionResponse,
GetLatestTabularDataRequest,
Expand Down Expand Up @@ -767,6 +769,7 @@ class MockData(UnimplementedDataServiceBase):
def __init__(
self,
tabular_response: List[DataClient.TabularData],
tabular_export_response: List[ExportTabularDataResponse],
tabular_query_response: List[Dict[str, Union[ValueTypes, datetime]]],
binary_response: List[BinaryData],
delete_remove_response: int,
Expand All @@ -775,6 +778,7 @@ def __init__(
hostname_response: str,
):
self.tabular_response = tabular_response
self.tabular_export_response = tabular_export_response
self.tabular_query_response = tabular_query_response
self.binary_response = binary_response
self.delete_remove_response = delete_remove_response
Expand Down Expand Up @@ -975,6 +979,16 @@ async def GetLatestTabularData(self, stream: Stream[GetLatestTabularDataRequest,
data = dict_to_struct(self.tabular_response[0].data)
await stream.send_message(GetLatestTabularDataResponse(time_captured=timestamp, time_synced=timestamp, payload=data))

async def ExportTabularData(self, stream: Stream[ExportTabularDataRequest, ExportTabularDataResponse]) -> None:
    """Record the incoming request's fields on the mock, then stream back the canned export responses."""
    req = await stream.recv_message()
    assert req is not None
    # Mirror each request field onto the mock so tests can assert on what the client sent.
    for field in ("part_id", "resource_name", "resource_subtype", "method_name", "interval"):
        setattr(self, field, getattr(req, field))
    for response in self.tabular_export_response:
        await stream.send_message(response)

class MockDataset(DatasetServiceBase):
def __init__(self, create_response: str, datasets_response: Sequence[Dataset]):
Expand Down
46 changes: 44 additions & 2 deletions tests/test_data_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
from typing import List

import pytest
from google.protobuf.struct_pb2 import Struct
from google.protobuf.timestamp_pb2 import Timestamp
from grpclib.testing import ChannelFor

from viam.app.data_client import DataClient
from viam.proto.app.data import Annotations, BinaryData, BinaryID, BinaryMetadata, BoundingBox, CaptureMetadata, Filter, Order
from viam.utils import create_filter
from viam.proto.app.data import Annotations, BinaryData, BinaryID, BinaryMetadata, BoundingBox, CaptureInterval, CaptureMetadata, ExportTabularDataResponse, Filter, Order
from viam.utils import create_filter, dict_to_struct

from .mocks.services import MockData

Expand Down Expand Up @@ -56,6 +57,7 @@
bbox_labels=BBOX_LABELS,
dataset_id=DATASET_ID,
)
# Capture interval matching the start/end timestamps the export test sends.
INTERVAL = CaptureInterval(start=START_TS, end=END_TS)

FILE_ID = "file_id"
BINARY_ID = BinaryID(file_id=FILE_ID, organization_id=ORG_ID, location_id=LOCATION_ID)
Expand Down Expand Up @@ -101,6 +103,20 @@
)

TABULAR_RESPONSE = [DataClient.TabularData(TABULAR_DATA, TABULAR_METADATA, START_DATETIME, END_DATETIME)]
# Canned server response mirroring TABULAR_METADATA, streamed back by MockData.ExportTabularData.
TABULAR_EXPORT_RESPONSE = [
    ExportTabularDataResponse(
        part_id=TABULAR_METADATA.part_id,
        resource_name=TABULAR_METADATA.component_name,
        resource_subtype=TABULAR_METADATA.component_type,
        time_captured=END_TS,
        organization_id=TABULAR_METADATA.organization_id,
        location_id=TABULAR_METADATA.location_id,
        robot_name=TABULAR_METADATA.robot_name,
        robot_id=TABULAR_METADATA.robot_id,
        part_name=TABULAR_METADATA.part_name,
        method_parameters=Struct(),
        tags=TABULAR_METADATA.tags,
        payload=dict_to_struct(TABULAR_DATA),
    )
]
TABULAR_QUERY_RESPONSE = [
{"key1": START_DATETIME, "key2": "2", "key3": [1, 2, 3], "key4": {"key4sub1": END_DATETIME}},
]
Expand All @@ -117,6 +133,7 @@
def service() -> MockData:
return MockData(
tabular_response=TABULAR_RESPONSE,
tabular_export_response=TABULAR_EXPORT_RESPONSE,
tabular_query_response=TABULAR_QUERY_RESPONSE,
binary_response=BINARY_RESPONSE,
delete_remove_response=DELETE_REMOVE_RESPONSE,
Expand Down Expand Up @@ -179,6 +196,31 @@ async def test_get_latest_tabular_data(self, service: MockData):
assert time_captured == time
assert time_synced == time

async def test_export_tabular_data(self, service: MockData):
    """export_tabular_data forwards all request fields and surfaces every streamed response."""
    async with ChannelFor([service]) as channel:
        client = DataClient(channel, DATA_SERVICE_METADATA)
        tabular_data = await client.export_tabular_data(PART_ID, COMPONENT_NAME, COMPONENT_TYPE, METHOD, START_DATETIME, END_DATETIME)
        assert tabular_data is not None
        for tabular_datum in tabular_data:
            assert tabular_datum is not None
            assert tabular_datum.part_id == TABULAR_METADATA.part_id
            assert tabular_datum.resource_name == TABULAR_METADATA.component_name
            assert tabular_datum.resource_subtype == TABULAR_METADATA.component_type
            assert tabular_datum.time_captured == END_TS
            assert tabular_datum.organization_id == TABULAR_METADATA.organization_id
            assert tabular_datum.location_id == TABULAR_METADATA.location_id
            assert tabular_datum.robot_name == TABULAR_METADATA.robot_name
            assert tabular_datum.robot_id == TABULAR_METADATA.robot_id
            assert tabular_datum.part_name == TABULAR_METADATA.part_name
            assert tabular_datum.method_parameters == TABULAR_METADATA.method_parameters
            assert tabular_datum.tags == TABULAR_METADATA.tags
            # payload is a protobuf Struct; a Struct never compares equal to a plain
            # dict, so compare against the Struct form the mock was populated with.
            assert tabular_datum.payload == dict_to_struct(TABULAR_DATA)
        # The mock records the request fields; verify the client sent what we passed.
        assert service.part_id == PART_ID
        assert service.resource_name == COMPONENT_NAME
        assert service.resource_subtype == COMPONENT_TYPE
        assert service.method_name == METHOD
        assert service.interval == INTERVAL

async def test_binary_data_by_filter(self, service: MockData):
async with ChannelFor([service]) as channel:
client = DataClient(channel, DATA_SERVICE_METADATA)
Expand Down
Loading