Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create high-level classes and functions to build and parse DriftProtocol messages #22

Draft
wants to merge 2 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@ lib/
build/
dist/
python/pkg/drift_protocol.egg-info/
python/pkg/drift_protocol
!python/pkg/drift_protocol/
python/pkg/drift_protocol/*
!python/pkg/drift_protocol/easy/
!python/pkg/drift_protocol/__init__.py.in
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ repos:
name: black
entry: black
language: python
language_version: python3.8
args:
- .
- --check
4 changes: 2 additions & 2 deletions cpp/test_package/conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ def requirements(self):
def build(self):
cmake = CMake(self)
cmake.configure()
cmake.build()
cmake.build_from_msg()

def layout(self):
cmake_layout(self)

def test(self):
if can_run(self):
cmd = os.path.join(self.cpp.build.bindirs[0], "test_package")
cmd = os.path.join(self.cpp.build_from_msg.bindirs[0], "test_package")
self.run(cmd, env="conanrun")
3 changes: 3 additions & 0 deletions python/pkg/drift_protocol/easy/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from drift_protocol.easy.package import DriftPackage, StatusCode

from drift_protocol.easy.trigger import TriggerMessage
Empty file.
190 changes: 190 additions & 0 deletions python/pkg/drift_protocol/easy/package.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
"""Drfit Package"""
import time
from typing import Optional, Dict, Tuple

from google.protobuf.any_pb2 import Any
from drift_protocol.common import (
DriftPackage as ProtoDriftPackage,
StatusCode,
DataPayload,
)
from drift_bytes import Variant, OutputBuffer, InputBuffer

from drift_protocol.meta import TypedDataInfo, MetaInfo


class DriftPackage:
def __init__(self):
self._proto = ProtoDriftPackage()

@staticmethod
def parse(message):
"""
Parse drift package

Args:
message: Message as bytes

Returns:
Drift package
"""
package = DriftPackage()
package._proto.ParseFromString(message)
return package

@staticmethod
def build_from_msg(
message,
pkg_id: Optional[int | float] = None,
status: StatusCode = StatusCode.GOOD,
**kwargs
) -> "DriftPackage":
"""
Build drift package

Args:
message: Message to be packed
pkg_id: ID as timestamp as float in seconds or int in milliseconds. If None, current time is used.
status: Status code

Keyword Args:
publish_timestamp (int | float): Publish timestamp as float in seconds or int in milliseconds.
If None, current time is used.
source_timestamp (int | float): Source timestamp as float in seconds or int in milliseconds.
If None, current time is used.
Returns:
Drift package
"""
package = DriftPackage()
if pkg_id is None:
pkg_id = time.time()

pkg_id = int(pkg_id * 1000) if isinstance(pkg_id, float) else pkg_id
package._proto.id = pkg_id
package._proto.status = status

publish_timestamp = kwargs.get("publish_timestamp", time.time())
if isinstance(publish_timestamp, float):
publish_timestamp = int(publish_timestamp * 1000)
package._proto.publish_timestamp.FromMilliseconds(publish_timestamp)

source_timestamp = kwargs.get("source_timestamp", time.time())
if isinstance(source_timestamp, float):
source_timestamp = int(source_timestamp * 1000)
package._proto.source_timestamp.FromMilliseconds(source_timestamp)

msg = Any()

if hasattr(message, "_proto"):
msg.Pack(message._proto)
else:
msg.Pack(message)

package._proto.data.append(msg)

return package

@staticmethod
def build_typed_data(
values: Dict[str, Variant.SUPPORTED_TYPES],
statuses: Optional[Dict[str, int]] = None,
pkg_id: Optional[int | float] = None,
status: StatusCode = StatusCode.GOOD,
**kwargs
) -> "DriftPackage":
"""
Build drift package from typed data
"""
statuses = statuses or {}

info = TypedDataInfo()

buffer = OutputBuffer(len(values))
for idx, entry in enumerate(values.items()):
name, value = entry
buffer[idx] = value

item = TypedDataInfo.Item()
item.name = name
item.status = statuses.get(name, StatusCode.GOOD)
info.items.append(item)

data_payload = DataPayload()
data_payload.data = buffer.bytes()

package = DriftPackage.build_from_msg(
data_payload, pkg_id=pkg_id, status=status, **kwargs
)
package._proto.meta.type = MetaInfo.TYPED_DATA
package._proto.meta.typed_data_info.CopyFrom(info)

return package

@property
def id(self) -> int:
"""
Package ID
"""
return self._proto.id

@property
def status(self) -> StatusCode:
"""
Package status
"""
return self._proto.status

@property
def publish_timestamp(self) -> float:
"""
Publish timestamp in seconds
"""
return self._proto.publish_timestamp.ToMilliseconds() / 1000

@property
def source_timestamp(self) -> float:
"""
Source timestamp in seconds
"""
return self._proto.source_timestamp.ToMilliseconds() / 1000

@property
def data_type(self):
"""
Data type

Returns:
Data type or None if there is no meta info
"""
if self._proto.meta:
return self._proto.meta.type
return None

def as_typed_data(self) -> Optional[Dict[str, Tuple[Variant.SUPPORTED_TYPES, int]]]:
"""
Get typed data

Returns:
Typed data or None if there is no meta info
"""
if self._proto.meta and self._proto.meta.type == MetaInfo.TYPED_DATA:
data = {}
payload = DataPayload()
self._proto.data[0].Unpack(payload)

buffer = InputBuffer(payload.data)

for idx, item in enumerate(self._proto.meta.typed_data_info.items):
data[item.name] = (buffer[idx].value, item.status)
return data

return None

def unpack(self, msg):
"""
Unpack message it must be either a protobuf message or an "easy" message
"""
if hasattr(msg, "_proto"):
self._proto.data[0].Unpack(msg._proto)
else:
self._proto.data[0].Unpack(msg)
55 changes: 55 additions & 0 deletions python/pkg/drift_protocol/easy/trigger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Trigger messages"""
import time
from typing import Dict, Optional

from drift_protocol.trigger_service.trigger_message_pb2 import (
TriggerMessage as ProtoTriggerMessage,
)


class TriggerMessage:
"""Trigger message base class"""

def __init__(self) -> None:
self._proto = ProtoTriggerMessage()

@staticmethod
def parse(message: bytes) -> "TriggerMessage":
"""
Parse trigger message

Args:
message: Message as bytes

Returns:
Trigger message
"""
trigger = TriggerMessage()
trigger._proto.ParseFromString(message)
return trigger

@staticmethod
def build(timestamp: Optional[int | float] = None) -> "TriggerMessage":
"""
Build trigger message

Args:
timestamp: Timestamp as float in seconds or int in milliseconds. If None, current time is used.

Returns:
Trigger message
"""
trigger = TriggerMessage()
if timestamp is None:
timestamp = time.time()

timestamp = int(timestamp * 1000) if isinstance(timestamp, float) else timestamp
trigger._proto.timestamp.FromMilliseconds(timestamp)
return trigger

@property
def timestamp(self) -> float:
"""
Timestamp in seconds
"""
return self._proto.timestamp.ToMilliseconds() / 1000
2 changes: 1 addition & 1 deletion python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,5 +144,5 @@ def packages(self):
package_dir={"": "pkg"},
packages=LazyPackageFinder(finder=partial(find_packages, where="pkg")),
python_requires=">=3.7",
install_requires=[f"protobuf>={PROTOBUF_VERSION}, <=3.20.3"],
install_requires=[f"protobuf>={PROTOBUF_VERSION}, <=3.20.3", "drift-bytes>=0.3.0"],
)
77 changes: 77 additions & 0 deletions python/tests/easy/test_package.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""Unit test for package.py"""

from drift_protocol.common import DriftPackage as ProtoDriftPackage
from drift_protocol.easy import DriftPackage, TriggerMessage, StatusCode
from drift_protocol.meta import MetaInfo


def test_build_package():
"""Test building package"""
message = TriggerMessage.build(1_000_000)
package = DriftPackage.build_from_msg(
message,
pkg_id=1_000_000,
status=StatusCode.GOOD,
source_timestamp=0,
publish_timestamp=2_000_000,
)

assert package.id > 0
assert package.status == StatusCode.GOOD
assert package.source_timestamp == 0
assert package.publish_timestamp == 2000.0

new_message = TriggerMessage()
package.unpack(new_message)

assert new_message.timestamp == 1000.0


def test_parse_package():
"""Should parse a protobuf message"""
proto = ProtoDriftPackage()
proto.id = 1_000_000

message = proto.SerializeToString()
package = DriftPackage.parse(message)

assert package.id == 1_000_000


def test_build_from_typed_data():
"""Should build a package from typed data"""
values = {
"int": 100,
"float": 25.9,
"string": "test",
"bool": True,
"list": [1, 2, 3],
}

statuses = {
"string": StatusCode.BAD,
"bool": StatusCode.GOOD,
}

package = DriftPackage.build_typed_data(
values,
statuses,
pkg_id=1_000_000,
status=StatusCode.GOOD,
source_timestamp=0,
publish_timestamp=2_000_000,
)

assert package.id > 0
assert package.status == StatusCode.GOOD
assert package.source_timestamp == 0
assert package.publish_timestamp == 2000.0

assert package.data_type == MetaInfo.TYPED_DATA
assert package.as_typed_data() == {
"int": (100, StatusCode.GOOD),
"float": (25.9, StatusCode.GOOD),
"string": ("test", StatusCode.BAD),
"bool": (True, StatusCode.GOOD),
"list": ([1, 2, 3], StatusCode.GOOD),
}
24 changes: 24 additions & 0 deletions python/tests/easy/test_trigger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Unit tests for trigger.py"""

from drift_protocol.easy import TriggerMessage


def test_build_trigger_message():
"""Test building trigger message"""
trigger_message = TriggerMessage.build(1_000_000)
assert trigger_message.timestamp == 1000.0

trigger_message = TriggerMessage.build(1000.0)
assert trigger_message.timestamp == 1000.0

trigger_message = TriggerMessage.build()
assert trigger_message.timestamp > 0


def test_parse_trigger_message():
"""Test parsing trigger message"""
trigger_message = TriggerMessage.build(1_000_000)
message = trigger_message._proto.SerializeToString()

parsed_trigger_message = TriggerMessage.parse(message)
assert parsed_trigger_message.timestamp == 1000.0
Loading