Skip to content

Commit

Permalink
Fix pandas resample bug
Browse files Browse the repository at this point in the history
  • Loading branch information
TomHodson committed Mar 6, 2024
1 parent 2872a50 commit 6f7071f
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions src/ionbeam/aggregators/by_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,15 @@ def add_message(self, msg: TabularMessage) -> None:

# Incoming messages can cover multiple time slices
# So we have to split them up
for period, resampled_data in msg.data.resample(
on="time", rule=self.granularity, kind="period", origin="epoch"
):
for timestamp, resampled_data in msg.data.resample(on="time", rule=self.granularity, origin="epoch"):
message_slice = TabularMessage(
data=resampled_data,
metadata=msg.metadata,
)

assert isinstance(timestamp, pd.Timestamp)
period = timestamp.to_period(freq=self.granularity)

if period not in self.buckets:
self.buckets[period] = TimeSliceBucket(self.source, self.observation_variable, period)

Expand Down Expand Up @@ -163,7 +165,6 @@ class TimeAggregator(Aggregator):
(source, observed_variable). The BucketContainer checks when each timeslice is reader to emit.
"""


"How much time a data granule represents."
granularity: str = "1H"

Expand Down Expand Up @@ -194,7 +195,6 @@ def init(self, globals):
self.emit_after_hours = self.globals.ingestion_time_constants.emit_after_hours
self.time_direction = self.globals.ingestion_time_constants.time_direction


def emit_message(self, msg, bucket):
msg = dataclasses.replace(msg, metadata=self.generate_metadata(msg))
msg = self.tag_message(msg, bucket.representative_previous_message)
Expand Down Expand Up @@ -232,7 +232,7 @@ def process(self, message: TabularMessage | FinishMessage) -> Iterable[TabularMe
f"Messages has timedelta = {message_timedelta} which is too close to the emit_after_hours setting ({self.emit_after_hours}H)!"
)

for period, data_chunk in message.data.resample(self.granularity, on="time", kind="period"):
for period, data_chunk in message.data.resample(self.granularity, on="time"):
if data_chunk.empty:
continue
chunked_message = dataclasses.replace(message, data=data_chunk)
Expand All @@ -246,7 +246,6 @@ def process(self, message: TabularMessage | FinishMessage) -> Iterable[TabularMe
key = (
message.metadata.source,
message.metadata.observation_variable,
# period,
)

# Find the bucket container that manages this key and give the message to it
Expand Down

0 comments on commit 6f7071f

Please sign in to comment.