From 6f7071f8a3c63b7378aa2b2ab239fd1fb56a5ba0 Mon Sep 17 00:00:00 2001 From: Tom Date: Wed, 6 Mar 2024 15:50:13 +0000 Subject: [PATCH] Fix pandas resample bug --- src/ionbeam/aggregators/by_time.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/ionbeam/aggregators/by_time.py b/src/ionbeam/aggregators/by_time.py index 4dcc5cc..37e2113 100644 --- a/src/ionbeam/aggregators/by_time.py +++ b/src/ionbeam/aggregators/by_time.py @@ -101,13 +101,15 @@ def add_message(self, msg: TabularMessage) -> None: # Incoming messages can cover multiple time slices # So we have to split them up - for period, resampled_data in msg.data.resample( - on="time", rule=self.granularity, kind="period", origin="epoch" - ): + for timestamp, resampled_data in msg.data.resample(on="time", rule=self.granularity, origin="epoch"): message_slice = TabularMessage( data=resampled_data, metadata=msg.metadata, ) + + assert isinstance(timestamp, pd.Timestamp) + period = timestamp.to_period(freq=self.granularity) + if period not in self.buckets: self.buckets[period] = TimeSliceBucket(self.source, self.observation_variable, period) @@ -163,7 +165,6 @@ class TimeAggregator(Aggregator): (source, observed_variable). The BucketContainer checks when each timeslice is reader to emit. """ - "How much time a data granule represents." granularity: str = "1H" @@ -194,7 +195,6 @@ def init(self, globals): self.emit_after_hours = self.globals.ingestion_time_constants.emit_after_hours self.time_direction = self.globals.ingestion_time_constants.time_direction - def emit_message(self, msg, bucket): msg = dataclasses.replace(msg, metadata=self.generate_metadata(msg)) msg = self.tag_message(msg, bucket.representative_previous_message) @@ -232,7 +232,7 @@ def process(self, message: TabularMessage | FinishMessage) -> Iterable[TabularMe f"Messages has timedelta = {message_timedelta} which is too close to the emit_after_hours setting ({self.emit_after_hours}H)!" ) - for period, data_chunk in message.data.resample(self.granularity, on="time", kind="period"): + for period, data_chunk in message.data.resample(self.granularity, on="time"): if data_chunk.empty: continue chunked_message = dataclasses.replace(message, data=data_chunk) @@ -246,7 +246,6 @@ def process(self, message: TabularMessage | FinishMessage) -> Iterable[TabularMe key = ( message.metadata.source, message.metadata.observation_variable, - # period, ) # Find the bucket container that manages this key and give the message to it