@@ -22,7 +22,7 @@ class DataFrameRegistry:
2222
2323    def  __init__ (self ) ->  None :
2424        self ._registry : dict [str , Stream ] =  {}
25-         self ._wall_clock_registry : dict [str , Stream ] =  {}
25+         self ._wall_clock_registry : dict [str , tuple [ tuple [ Topic , ...],  Stream ] ] =  {}
2626        self ._topics : list [Topic ] =  []
2727        self ._repartition_origins : set [str ] =  set ()
2828        self ._topics_to_stream_ids : dict [str , set [str ]] =  {}
@@ -74,15 +74,11 @@ def register_wall_clock(
7474        self , dataframe : "StreamingDataFrame" , stream : Stream 
7575    ) ->  None :
7676        """ 
77-         Register a wall clock stream for the given topic. 
77+         Register a wall clock stream root for the given dataframe. 
78+         Stores the Stream itself to be composed later with an optional sink. 
7879        """ 
79-         topics  =  dataframe .topics 
80-         if  len (topics ) >  1 :
81-             raise  ValueError (
82-                 f"Expected a StreamingDataFrame with one topic, got { len (topics )}  
83-             )
84-         topic  =  topics [0 ]
85-         self ._wall_clock_registry [topic .name ] =  stream 
80+         # TODO: What if there are more wall clock streams for the same stream_id? 
81+         self ._wall_clock_registry [dataframe .stream_id ] =  (dataframe .topics , stream )
8682
8783    def  register_groupby (
8884        self ,
@@ -128,28 +124,28 @@ def compose_all(
128124        :param sink: callable to accumulate the results of the execution, optional. 
129125        :return: a {topic_name: composed} dict, where composed is a callable 
130126        """ 
131-         return  self ._compose (registry = self ._registry , sink = sink )
132- 
133-     def  compose_wall_clock (self ) ->  dict [str , VoidExecutor ]:
134-         """ 
135-         Composes all the wall clock streams and returns a dict of format {<topic>: <VoidExecutor>} 
136-         :return: a {topic_name: composed} dict, where composed is a callable 
137-         """ 
138-         return  self ._compose (registry = self ._wall_clock_registry )
139- 
140-     def  _compose (
141-         self , registry : dict [str , Stream ], sink : Optional [VoidExecutor ] =  None 
142-     ) ->  dict [str , VoidExecutor ]:
143127        executors  =  {}
144128        # Go over the registered topics with root Streams and compose them 
145-         for  topic , root_stream  in  registry .items ():
129+         for  topic , root_stream  in  self . _registry .items ():
146130            # If a root stream is connected to other roots, ".compose()" will 
147131            # return them all. 
148132            # Use the registered root Stream to filter them out. 
149133            root_executors  =  root_stream .compose (sink = sink )
150134            executors [topic ] =  root_executors [root_stream ]
151135        return  executors 
152136
137+     def  compose_wall_clock (self ) ->  dict [tuple [str , ...], VoidExecutor ]:
138+         """ 
139+         Compose all wall clock Streams and return executors keyed by stream_id. 
140+         Returns mapping: {stream_id: (topics, executor)} 
141+         """ 
142+         executors  =  {}
143+         for  _ , (topics , root_stream ) in  self ._wall_clock_registry .items ():
144+             root_executors  =  root_stream .compose ()
145+             _topics  =  tuple ({t .name  for  t  in  topics })
146+             executors [_topics ] =  root_executors [root_stream ]
147+         return  executors 
148+ 
153149    def  register_stream_id (self , stream_id : str , topic_names : list [str ]):
154150        """ 
155151        Register a mapping between the stream_id and topic names. 
0 commit comments