33from time import time
44from typing import Any , Dict , List , Optional
55from grpclib import GRPCError , Status
6+ from google .protobuf .struct_pb2 import Struct
67
78from grpclib .client import Channel
89import viam
1920 StreamEventsResponse ,
2021 TriggerEventRequest ,
2122)
23+ from viam .utils import dict_to_struct
2224
2325from .input import Control , ControlFunction , Controller , Event , EventType
2426
@@ -36,19 +38,29 @@ def __init__(self, name: str, channel: Channel):
3638 self ._stream_lock = Lock ()
3739 self ._is_streaming = False
3840 self ._is_stream_ready = False
41+ self ._callback_extra : Struct = dict_to_struct ({})
3942 super ().__init__ (name )
4043
41- async def get_controls (self , * , timeout : Optional [float ] = None ) -> List [Control ]:
42- request = GetControlsRequest (controller = self .name )
44+ async def get_controls (self , * , extra : Optional [Dict [str , Any ]] = None , timeout : Optional [float ] = None ) -> List [Control ]:
45+ if extra is None :
46+ extra = {}
47+ request = GetControlsRequest (controller = self .name , extra = dict_to_struct (extra ))
4348 response : GetControlsResponse = await self .client .GetControls (request , timeout = timeout )
4449 return [Control (control ) for control in response .controls ]
4550
46- async def get_events (self , * , timeout : Optional [float ] = None ) -> Dict [Control , Event ]:
47- request = GetEventsRequest (controller = self .name )
51+ async def get_events (self , * , extra : Optional [Dict [str , Any ]] = None , timeout : Optional [float ] = None ) -> Dict [Control , Event ]:
52+ if extra is None :
53+ extra = {}
54+ request = GetEventsRequest (controller = self .name , extra = dict_to_struct (extra ))
4855 response : GetEventsResponse = await self .client .GetEvents (request , timeout = timeout )
4956 return {Control (event .control ): Event .from_proto (event ) for (event ) in response .events }
5057
51- def register_control_callback (self , control : Control , triggers : List [EventType ], function : Optional [ControlFunction ]):
58+ def register_control_callback (
59+ self , control : Control , triggers : List [EventType ], function : Optional [ControlFunction ], extra : Optional [Dict [str , Any ]] = None
60+ ):
61+ if extra is None :
62+ extra = {}
63+ self ._callback_extra = dict_to_struct (extra )
5264 with self ._lock :
5365 callbacks = self .callbacks .get (control , {})
5466 for trigger in triggers :
@@ -71,8 +83,10 @@ def handle_task_result(task: asyncio.Task):
7183 task = asyncio .create_task (self ._stream_events (), name = f"{ viam ._TASK_PREFIX } -input_stream_events" )
7284 task .add_done_callback (handle_task_result )
7385
74- async def trigger_event (self , event : Event , * , timeout : Optional [float ] = None ):
75- request = TriggerEventRequest (controller = self .name , event = event .proto )
86+ async def trigger_event (self , event : Event , * , extra : Optional [Dict [str , Any ]] = None , timeout : Optional [float ] = None ):
87+ if extra is None :
88+ extra = {}
89+ request = TriggerEventRequest (controller = self .name , event = event .proto , extra = dict_to_struct (extra ))
7690 try :
7791 await self .client .TriggerEvent (request , timeout = timeout )
7892 except GRPCError as e :
@@ -88,7 +102,7 @@ async def _stream_events(self):
88102 if not self .callbacks :
89103 return
90104
91- request = StreamEventsRequest (controller = self .name , events = [])
105+ request = StreamEventsRequest (controller = self .name , events = [], extra = self . _callback_extra )
92106 with self ._lock :
93107 for (control , callbacks ) in self .callbacks .items ():
94108 event = StreamEventsRequest .Events (
0 commit comments