Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATA-3443: Add export_tabular_data to data client #800

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ We use [`uv`](https://docs.astral.sh/uv/) to manage our environments and depende

4. When you're done making changes, check that your changes conform to any code formatting requirements and pass any tests.

- When testing, make sure you use the correct virtual environment by running either `uv make test` or `source .venv/bin/activate; make test`
- When testing, make sure you use the correct virtual environment by running either `uv run make test` or `source .venv/bin/activate; make test`

5. Commit your changes and open a pull request.

Expand Down
140 changes: 131 additions & 9 deletions src/viam/app/data_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
BinaryID,
BoundingBoxLabelsByFilterRequest,
BoundingBoxLabelsByFilterResponse,
CaptureInterval,
CaptureMetadata,
ConfigureDatabaseUserRequest,
DataRequest,
Expand All @@ -33,6 +34,8 @@
DeleteBinaryDataByIDsResponse,
DeleteTabularDataRequest,
DeleteTabularDataResponse,
ExportTabularDataRequest,
ExportTabularDataResponse,
Filter,
GetDatabaseConnectionRequest,
GetDatabaseConnectionResponse,
Expand Down Expand Up @@ -145,6 +148,69 @@ def __eq__(self, other: object) -> bool:
return str(self) == str(other)
return False

@dataclass
class TabularDataPoint:
    """Represents a tabular data point and its associated metadata."""

    part_id: str
    """The robot part ID"""

    resource_name: str
    """The resource name"""

    resource_subtype: str
    """The resource subtype. Ex: `rdk:component:sensor`"""

    method_name: str
    """The method used for data capture. Ex: `Readings`"""

    time_captured: datetime
    """The time at which the data point was captured"""

    organization_id: str
    """The organization ID"""

    location_id: str
    """The location ID"""

    robot_name: str
    """The robot name"""

    robot_id: str
    """The robot ID"""

    part_name: str
    """The robot part name"""

    method_parameters: Mapping[str, Any]
    """Additional parameters associated with the data capture method"""

    tags: List[str]
    """A list of tags associated with the data point"""

    payload: Mapping[str, Any]
    """The captured data. Values are decoded from a protobuf ``Struct``."""

    def __str__(self) -> str:
        return (
            f"TabularDataPoint("
            f"robot='{self.robot_name}' (id={self.robot_id}), "
            f"part='{self.part_name}' (id={self.part_id}), "
            f"resource='{self.resource_name}' ({self.resource_subtype}), "
            f"method='{self.method_name}', "
            f"org={self.organization_id}, "
            f"location={self.location_id}, "
            f"time='{self.time_captured.isoformat()}', "
            f"params={self.method_parameters}, "
            f"tags={self.tags}, "
            f"payload={self.payload})"
        )

    def __eq__(self, other: object) -> bool:
        # Compare via the string representation, matching the convention used by
        # the sibling TabularData class in this file.
        if isinstance(other, DataClient.TabularDataPoint):
            return str(self) == str(other)
        return False

def __init__(self, channel: Channel, metadata: Mapping[str, str]):
"""Create a `DataClient` that maintains a connection to app.

Expand Down Expand Up @@ -254,7 +320,6 @@ async def tabular_data_by_sql(self, organization_id: str, sql_query: str) -> Lis
sql_query="SELECT * FROM readings LIMIT 5"
)


Args:
organization_id (str): The ID of the organization that owns the data.
You can obtain your organization ID from the Viam app's organization settings page.
Expand Down Expand Up @@ -284,7 +349,6 @@ async def tabular_data_by_mql(self, organization_id: str, mql_binary: List[bytes

print(f"Tabular Data: {tabular_data}")


Args:
organization_id (str): The ID of the organization that owns the data.
You can obtain your organization ID from the Viam app's organization settings page.
Expand All @@ -307,13 +371,12 @@ async def get_latest_tabular_data(

::

time_captured, time_synced, payload = await data_client.get_latest_tabular_data(
part_id="<PART-ID>",
resource_name="<RESOURCE-NAME>",
resource_subtype="<RESOURCE-SUBTYPE>",
method_name="<METHOD-NAME>"
)

time_captured, time_synced, payload = await data_client.get_latest_tabular_data(
part_id="<PART-ID>",
resource_name="<RESOURCE-NAME>",
resource_subtype="<RESOURCE-SUBTYPE>",
method_name="<METHOD-NAME>"
)

Args:
part_id (str): The ID of the part that owns the data.
Expand All @@ -327,6 +390,7 @@ async def get_latest_tabular_data(
datetime: The time captured,
datetime: The time synced,
Dict[str, ValueTypes]: The latest tabular data captured from the specified data source.

For more information, see `Data Client API <https://docs.viam.com/appendix/apis/data-client/>`_.
"""

Expand All @@ -338,6 +402,64 @@ async def get_latest_tabular_data(
return None
return response.time_captured.ToDatetime(), response.time_synced.ToDatetime(), struct_to_dict(response.payload)

async def export_tabular_data(
    self,
    part_id: str,
    resource_name: str,
    resource_subtype: str,
    method_name: str,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
) -> List[TabularDataPoint]:
    """Obtain unified tabular data and metadata from the specified data source.

    ::

        tabular_data = await data_client.export_tabular_data(
            part_id="<PART-ID>",
            resource_name="<RESOURCE-NAME>",
            resource_subtype="<RESOURCE-SUBTYPE>",
            method_name="<METHOD-NAME>",
            start_time="<START_TIME>",
            end_time="<END_TIME>"
        )

        print(f"My data: {tabular_data}")

    Args:
        part_id (str): The ID of the part that owns the data.
        resource_name (str): The name of the requested resource that captured the data.
        resource_subtype (str): The subtype of the requested resource that captured the data.
        method_name (str): The data capture method name.
        start_time (datetime): Optional start time for requesting a specific range of data.
        end_time (datetime): Optional end time for requesting a specific range of data.

    Returns:
        List[TabularDataPoint]: The unified tabular data and metadata.

    For more information, see `Data Client API <https://docs.viam.com/appendix/apis/data-client/>`_.
    """
    # NOTE(review): assumes datetime_to_timestamp(None) returns None so that an
    # omitted bound yields an open-ended interval — confirm against viam.utils.
    interval = CaptureInterval(start=datetime_to_timestamp(start_time), end=datetime_to_timestamp(end_time))
    request = ExportTabularDataRequest(
        part_id=part_id, resource_name=resource_name, resource_subtype=resource_subtype, method_name=method_name, interval=interval
    )
    # ExportTabularData is a server-streaming RPC: one response message per data point.
    response: List[ExportTabularDataResponse] = await self._data_client.ExportTabularData(request, metadata=self._metadata)

    return [
        DataClient.TabularDataPoint(
            part_id=resp.part_id,
            resource_name=resp.resource_name,
            resource_subtype=resp.resource_subtype,
            method_name=resp.method_name,
            time_captured=resp.time_captured.ToDatetime(),
            organization_id=resp.organization_id,
            location_id=resp.location_id,
            robot_name=resp.robot_name,
            robot_id=resp.robot_id,
            part_name=resp.part_name,
            method_parameters=struct_to_dict(resp.method_parameters),
            tags=list(resp.tags),
            payload=struct_to_dict(resp.payload),
        )
        for resp in response
    ]


async def binary_data_by_filter(
self,
filter: Optional[Filter] = None,
Expand Down
14 changes: 14 additions & 0 deletions tests/mocks/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@
DeleteBinaryDataByIDsResponse,
DeleteTabularDataRequest,
DeleteTabularDataResponse,
ExportTabularDataRequest,
ExportTabularDataResponse,
GetDatabaseConnectionRequest,
GetDatabaseConnectionResponse,
GetLatestTabularDataRequest,
Expand Down Expand Up @@ -767,6 +769,7 @@ class MockData(UnimplementedDataServiceBase):
def __init__(
self,
tabular_response: List[DataClient.TabularData],
tabular_export_response: List[ExportTabularDataResponse],
tabular_query_response: List[Dict[str, Union[ValueTypes, datetime]]],
binary_response: List[BinaryData],
delete_remove_response: int,
Expand All @@ -775,6 +778,7 @@ def __init__(
hostname_response: str,
):
self.tabular_response = tabular_response
self.tabular_export_response = tabular_export_response
self.tabular_query_response = tabular_query_response
self.binary_response = binary_response
self.delete_remove_response = delete_remove_response
Expand Down Expand Up @@ -975,6 +979,16 @@ async def GetLatestTabularData(self, stream: Stream[GetLatestTabularDataRequest,
data = dict_to_struct(self.tabular_response[0].data)
await stream.send_message(GetLatestTabularDataResponse(time_captured=timestamp, time_synced=timestamp, payload=data))

async def ExportTabularData(self, stream: Stream[ExportTabularDataRequest, ExportTabularDataResponse]) -> None:
    """Mock of the server-streaming ExportTabularData RPC.

    Records the request fields on the mock (so tests can assert on what the
    client sent) and replays the canned ``tabular_export_response`` messages.
    """
    request = await stream.recv_message()
    assert request is not None
    # Stash each request field on the mock for later inspection by tests.
    for field in ("part_id", "resource_name", "resource_subtype", "method_name", "interval"):
        setattr(self, field, getattr(request, field))
    for response in self.tabular_export_response:
        await stream.send_message(response)

class MockDataset(DatasetServiceBase):
def __init__(self, create_response: str, datasets_response: Sequence[Dataset]):
Expand Down
46 changes: 44 additions & 2 deletions tests/test_data_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
from typing import List

import pytest
from google.protobuf.struct_pb2 import Struct
from google.protobuf.timestamp_pb2 import Timestamp
from grpclib.testing import ChannelFor

from viam.app.data_client import DataClient
from viam.proto.app.data import Annotations, BinaryData, BinaryID, BinaryMetadata, BoundingBox, CaptureMetadata, Filter, Order
from viam.utils import create_filter
from viam.proto.app.data import Annotations, BinaryData, BinaryID, BinaryMetadata, BoundingBox, CaptureInterval, CaptureMetadata, ExportTabularDataResponse, Filter, Order
from viam.utils import create_filter, dict_to_struct

from .mocks.services import MockData

Expand Down Expand Up @@ -56,6 +57,7 @@
bbox_labels=BBOX_LABELS,
dataset_id=DATASET_ID,
)
# Interval matching the START_TS/END_TS bounds sent by export_tabular_data tests.
INTERVAL = CaptureInterval(start=START_TS, end=END_TS)

FILE_ID = "file_id"
BINARY_ID = BinaryID(file_id=FILE_ID, organization_id=ORG_ID, location_id=LOCATION_ID)
Expand Down Expand Up @@ -101,6 +103,20 @@
)

TABULAR_RESPONSE = [DataClient.TabularData(TABULAR_DATA, TABULAR_METADATA, START_DATETIME, END_DATETIME)]
# Canned streaming response for MockData.ExportTabularData, mirroring TABULAR_METADATA.
TABULAR_EXPORT_RESPONSE = [
    ExportTabularDataResponse(
        part_id=TABULAR_METADATA.part_id,
        resource_name=TABULAR_METADATA.component_name,
        resource_subtype=TABULAR_METADATA.component_type,
        time_captured=END_TS,
        organization_id=TABULAR_METADATA.organization_id,
        location_id=TABULAR_METADATA.location_id,
        robot_name=TABULAR_METADATA.robot_name,
        robot_id=TABULAR_METADATA.robot_id,
        part_name=TABULAR_METADATA.part_name,
        method_parameters=Struct(),
        tags=TABULAR_METADATA.tags,
        payload=dict_to_struct(TABULAR_DATA),
    )
]
TABULAR_QUERY_RESPONSE = [
{"key1": START_DATETIME, "key2": "2", "key3": [1, 2, 3], "key4": {"key4sub1": END_DATETIME}},
]
Expand All @@ -117,6 +133,7 @@
def service() -> MockData:
return MockData(
tabular_response=TABULAR_RESPONSE,
tabular_export_response=TABULAR_EXPORT_RESPONSE,
tabular_query_response=TABULAR_QUERY_RESPONSE,
binary_response=BINARY_RESPONSE,
delete_remove_response=DELETE_REMOVE_RESPONSE,
Expand Down Expand Up @@ -179,6 +196,31 @@ async def test_get_latest_tabular_data(self, service: MockData):
assert time_captured == time
assert time_synced == time

async def test_export_tabular_data(self, service: MockData):
    async with ChannelFor([service]) as channel:
        client = DataClient(channel, DATA_SERVICE_METADATA)
        exported = await client.export_tabular_data(PART_ID, COMPONENT_NAME, COMPONENT_TYPE, METHOD, START_DATETIME, END_DATETIME)
        assert exported is not None
        # Every returned point should mirror the canned export response.
        expected_fields = (
            TABULAR_METADATA.part_id,
            TABULAR_METADATA.component_name,
            TABULAR_METADATA.component_type,
            END_DATETIME,
            TABULAR_METADATA.organization_id,
            TABULAR_METADATA.location_id,
            TABULAR_METADATA.robot_name,
            TABULAR_METADATA.robot_id,
            TABULAR_METADATA.part_name,
        )
        for point in exported:
            assert point is not None
            assert (
                point.part_id,
                point.resource_name,
                point.resource_subtype,
                point.time_captured,
                point.organization_id,
                point.location_id,
                point.robot_name,
                point.robot_id,
                point.part_name,
            ) == expected_fields
            assert point.method_parameters == TABULAR_METADATA.method_parameters
            assert point.tags == TABULAR_METADATA.tags
            assert point.payload == TABULAR_DATA
        # The mock records the request it received; confirm the client forwarded it faithfully.
        assert service.part_id == PART_ID
        assert service.resource_name == COMPONENT_NAME
        assert service.resource_subtype == COMPONENT_TYPE
        assert service.method_name == METHOD
        assert service.interval == INTERVAL

async def test_binary_data_by_filter(self, service: MockData):
async with ChannelFor([service]) as channel:
client = DataClient(channel, DATA_SERVICE_METADATA)
Expand Down
Loading