Skip to content

Commit

Permalink
Do not add producer that has been added by saver (#901)
Browse files Browse the repository at this point in the history
* Do not add producer that has been added by saver

* Bug fix, only register the plugins not in loaders
  • Loading branch information
dachengx authored Oct 13, 2024
1 parent dca3545 commit 8843247
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
5 changes: 3 additions & 2 deletions strax/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ def _get_end_targets(plugins: dict) -> ty.Tuple[str]:
"""Get the datatype that is provided by a plugin but not depended on by any other plugin."""
provides = [prov for p in plugins.values() for prov in strax.to_str_tuple(p.provides)]
depends_on = [dep for p in plugins.values() for dep in strax.to_str_tuple(p.depends_on)]
uniques = list(set(provides) ^ set(depends_on))
uniques = list(set(provides) - set(depends_on))
return strax.to_str_tuple(uniques)

@property
Expand Down Expand Up @@ -1310,7 +1310,8 @@ def concat_loader(*args, **kwargs):
if len(intersec):
raise RuntimeError(f"{intersec} both computed and loaded?!")
if len(targets) > 1:
final_plugin = [t for t in targets if t in self._get_end_targets(plugins)][:1]
pendants = set(targets) & set(self._get_end_targets(plugins))
final_plugin = tuple(pendants - set(loaders))[:1]
self.log.warning(
"Multiple targets detected! This is only suitable for mass "
f"producing dataypes since only {final_plugin} will be "
Expand Down
10 changes: 8 additions & 2 deletions strax/processors/post_office.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,12 @@ def state(self):
result.append(f"Total time spent: {sum(self.time_spent.values())}")
return "\n".join(result)

def register_producer(self, iterator: ty.Iterator[ty.Any], topic: ty.Union[str, ty.Tuple[str]]):
def register_producer(
self,
iterator: ty.Iterator[ty.Any],
topic: ty.Union[str, ty.Tuple[str]],
registered: ty.Tuple[str, ...] = tuple(),
):
"""Register iterator as the source of messages for topic.
If topic is a tuple of strings, the iterator should produce (topic -> message) dicts, with
Expand All @@ -128,7 +133,8 @@ def register_producer(self, iterator: ty.Iterator[ty.Any], topic: ty.Union[str,
# Multi-output producer, recurse
for sub_topic in topic:
self._multi_output_topics[sub_topic] = topic
self.register_producer(iterator, sub_topic)
if sub_topic not in registered:
self.register_producer(iterator, sub_topic)
return
assert isinstance(topic, str)
if topic in self._producers:
Expand Down
3 changes: 3 additions & 0 deletions strax/processors/single_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ def __init__(
continue
plugins_seen.append(p)

# Some data_types might be already saved and can be loaded;
# remove them from the list of provides
self.post_office.register_producer(
p.iter(iters={dep: self.post_office.get_iter(dep, d) for dep in p.depends_on}),
topic=strax.to_str_tuple(p.provides),
registered=tuple(components.loaders),
)

dtypes_built = {d: p for p in components.plugins.values() for d in p.provides}
Expand Down

0 comments on commit 8843247

Please sign in to comment.