From 44e0d560861a9a020d572206e97f2aa34d399ebe Mon Sep 17 00:00:00 2001 From: Antonis Christofides Date: Fri, 10 Jan 2025 09:20:32 +0200 Subject: [PATCH] When aggregating, recalculate last record if partial (fixes DEV-62) Until now, when aggregating, it ignored and did not store the last record of the aggregated time series if it contained the MISS flag. This was because more source time series data might become available later. However it also meant that if more source time series data never became available, the last aggregated record would never be calculated. In addition, we sometimes want to get an estimation of the current daily value a bit before midnight. Now the last record is calculated alright given enough data (i.e. if the missing data are up to max_missing), and it is recalculated when more source data become available. This commit also upgrades pthelma to 2.1, and, correspondingly, changes the missing flag from "MISS" to "MISSING3", where, in this example, 3 is the number of missing values in the source data. This will not migrate existing records. This lack of migration should not cause any problems with aggregation. --- enhydris/autoprocess/models.py | 46 +++---- .../tests/test_models/test_aggregation.py | 126 +++++++++++++++++- enhydris/models/timeseries.py | 5 +- requirements.txt | 2 +- 4 files changed, 149 insertions(+), 30 deletions(-) diff --git a/enhydris/autoprocess/models.py b/enhydris/autoprocess/models.py index 7afeb370..2fbfd8ab 100644 --- a/enhydris/autoprocess/models.py +++ b/enhydris/autoprocess/models.py @@ -463,6 +463,27 @@ def save(self, force_insert=False, force_update=False, *args, **kwargs): self._check_resulting_timestamp_offset() super().save(force_insert, force_update, *args, **kwargs) + def _get_start_date(self): + if self._last_target_timeseries_record_needs_recalculation(): + # NOTE: + # Running ...latest().delete() won't work. Maybe because currently + # TimeseriesRecord has some primary key hacks. + adate = self.target_timeseries.timeseriesrecord_set.latest().timestamp + self.target_timeseries.timeseriesrecord_set.filter(timestamp=adate).delete() + self.target_timeseries.save() + return super()._get_start_date() + + def _last_target_timeseries_record_needs_recalculation(self): + # No recalculation needed if it didn't have the "MISSING" flag. + if "MISS" not in self.target_timeseries.get_last_record_as_string(): + return False + + # Technically we should examine the number of missing values (given in the flag) + # and whether more data has become available for that date. But this would be + # quite complicated so we won't do it. The worst that can happen is that the + # last aggregated record gets unnecessarily recalculated for a few times. + return True + def _check_resulting_timestamp_offset(self): if not self.resulting_timestamp_offset: return @@ -489,7 +510,7 @@ def process_timeseries(self): logging.getLogger("enhydris.autoprocess").error(str(e)) return HTimeseries() aggregated = self._aggregate_time_series(regularized) - return self._trim_last_record_if_not_complete(aggregated) + return aggregated def _regularize_time_series(self, source_htimeseries): mode = self.method == "mean" and RM.INSTANTANEOUS or RM.INTERVAL @@ -509,6 +530,7 @@ def _aggregate_time_series(self, source_htimeseries): self.method, min_count=min_count, target_timestamp_offset=self.resulting_timestamp_offset or None, + missing_flag="MISSING{}", ) def _get_source_step(self, source_htimeseries): @@ -524,25 +546,3 @@ def _divide_target_step_by_source_step(self, source_step, target_step): return int( pd.Timedelta(target_step) / pd.tseries.frequencies.to_offset(source_step) ) - - def _trim_last_record_if_not_complete(self, ahtimeseries): - # If the very last record of the time series has the "MISS" flag, it means it - # was derived with one or more missing values in the source. We don't want to - # leave such a record at the end of the target time series, or it won't be - # re-calculated when more data becomes available, because processing begins at - # the record following the last existing one. - if self._last_target_record_needs_trimming(ahtimeseries): - ahtimeseries.data = ahtimeseries.data[:-1] - return ahtimeseries - - def _last_target_record_needs_trimming(self, ahtimeseries): - if len(ahtimeseries.data.index) == 0: - return False - last_target_record = ahtimeseries.data.iloc[-1] - last_target_record_date = last_target_record.name + pd.Timedelta( - self.resulting_timestamp_offset - ) - return ( - "MISS" in last_target_record["flags"] - and self.source_end_date < last_target_record_date - ) diff --git a/enhydris/autoprocess/tests/test_models/test_aggregation.py b/enhydris/autoprocess/tests/test_models/test_aggregation.py index 087799b3..c106a50f 100644 --- a/enhydris/autoprocess/tests/test_models/test_aggregation.py +++ b/enhydris/autoprocess/tests/test_models/test_aggregation.py @@ -2,7 +2,7 @@ from unittest import mock from django.db import IntegrityError -from django.test import TestCase +from django.test import TestCase, override_settings import numpy as np import pandas as pd @@ -10,8 +10,9 @@ from htimeseries import HTimeseries from model_bakery import baker -from enhydris.autoprocess.models import Aggregation +from enhydris.autoprocess.models import Aggregation, AutoProcess from enhydris.models import Station, Timeseries, TimeseriesGroup, Variable +from enhydris.tests.test_models.test_timeseries import get_tzinfo class AggregationTestCase(TestCase): @@ -197,7 +198,7 @@ class AggregationProcessTimeseriesTestCase(TestCase): ) expected_result_for_max_missing_one = pd.DataFrame( - data={"value": [56.0, 157.0], "flags": ["", "MISS"]}, + data={"value": [56.0, 157.0], "flags": ["", "MISSING1"]}, columns=["value", "flags"], index=[ dt.datetime(2019, 5, 21, 10, 59, tzinfo=dt.timezone.utc), @@ -206,12 +207,16 @@ class AggregationProcessTimeseriesTestCase(TestCase): ) expected_result_for_max_missing_five = pd.DataFrame( - data={"value": [2.0, 56.0, 157.0], "flags": ["MISS", "", "MISS"]}, + data={ + "value": [2.0, 56.0, 157.0, 202.0], + "flags": ["MISSING5", "", "MISSING1", "MISSING2"], + }, columns=["value", "flags"], index=[ dt.datetime(2019, 5, 21, 9, 59, tzinfo=dt.timezone.utc), dt.datetime(2019, 5, 21, 10, 59, tzinfo=dt.timezone.utc), dt.datetime(2019, 5, 21, 11, 59, tzinfo=dt.timezone.utc), + dt.datetime(2019, 5, 21, 12, 59, tzinfo=dt.timezone.utc), ], ) @@ -352,3 +357,116 @@ def test_max(self, mock_regularize, mock_haggregate): self.aggregation.method = "max" self.aggregation.process_timeseries() self.assertEqual(mock_regularize.call_args.kwargs["mode"], RM.INTERVAL) + + +@override_settings( + CACHES={"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}} +) +class AggregationRecalculatesLastValueIfNeededTestCase(TestCase): + """ + This test case uses this source time series: + 2019-05-21 17:00:00+02:00 0.0 + 2019-05-21 17:10:00+02:00 1.0 + 2019-05-21 17:20:00+02:00 2.0 + 2019-05-21 17:30:00+02:00 3.0 + 2019-05-21 17:40:00+02:00 4.0 + 2019-05-21 17:50:00+02:00 5.0 + 2019-05-21 18:00:00+02:00 6.0 + 2019-05-21 18:10:00+02:00 7.0 + 2019-05-21 18:20:00+02:00 8.0 + 2019-05-21 18:30:00+02:00 9.0 + 2019-05-21 18:40:00+02:00 10.0 + + It makes aggregation to hourly, and the last aggregated record (19:00) has two + missing values in the source time series, and therefore the MISSING2 flag. + Subsequently these two records are added: + 2019-05-21 18:50:00+02:00 11.0 + 2019-05-21 19:00:00+02:00 12.0 + + Then aggregation is repeated, and it is verified that the aggregated record at + 19:00 is recalculated as needed. + """ + + def setUp(self): + station = baker.make(Station, name="Hobbiton", display_timezone="Etc/GMT-2") + timeseries_group = baker.make( + TimeseriesGroup, + gentity=station, + variable__descr="h", + precision=0, + ) + source_timeseries = baker.make( + Timeseries, + timeseries_group=timeseries_group, + type=Timeseries.CHECKED, + time_step="10min", + ) + start_date = dt.datetime(2019, 5, 21, 17, 0, tzinfo=get_tzinfo("Etc/GMT-2")) + index = [start_date + dt.timedelta(minutes=i) for i in range(0, 110, 10)] + values = [float(x) for x in range(11)] + flags = [""] * 11 + source_timeseries.set_data( + pd.DataFrame( + data={"value": values, "flags": flags}, + columns=["value", "flags"], + index=index, + ) + ) + aggregation = Aggregation( + timeseries_group=timeseries_group, + target_time_step="1h", + method="sum", + max_missing=2, + resulting_timestamp_offset="", + ) + super(AutoProcess, aggregation).save() # Avoid triggering a celery task + self.aggregation_id = aggregation.id + + def test_initial_target_timeseries(self): + aggregation = Aggregation.objects.get(id=self.aggregation_id) + aggregation.execute() + actual_data = aggregation.target_timeseries.get_data().data + expected_data = pd.DataFrame( + data={"value": [21.0, 34.0], "flags": ["", "MISSING2"]}, + columns=["value", "flags"], + index=[ + dt.datetime(2019, 5, 21, 18, 0, tzinfo=get_tzinfo("Etc/GMT-2")), + dt.datetime(2019, 5, 21, 19, 0, tzinfo=get_tzinfo("Etc/GMT-2")), + ], + ) + expected_data.index.name = "date" + pd.testing.assert_frame_equal(actual_data, expected_data) + + def test_updated_target_timeseries(self): + Aggregation.objects.get(id=self.aggregation_id).execute() + + self._extend_source_timeseries() + aggregation = Aggregation.objects.get(id=self.aggregation_id) + aggregation.execute() + + ahtimeseries = aggregation.target_timeseries.get_data() + expected_data = pd.DataFrame( + data={"value": [21.0, 57.0], "flags": ["", ""]}, + columns=["value", "flags"], + index=[ + dt.datetime(2019, 5, 21, 18, 0, tzinfo=get_tzinfo("Etc/GMT-2")), + dt.datetime(2019, 5, 21, 19, 0, tzinfo=get_tzinfo("Etc/GMT-2")), + ], + ) + expected_data.index.name = "date" + pd.testing.assert_frame_equal(ahtimeseries.data, expected_data) + + def _extend_source_timeseries(self): + aggregation = Aggregation.objects.get(id=self.aggregation_id) + source_timeseries = aggregation.source_timeseries + new_values = [11.0, 12.0] + new_flags = ["", ""] + end_date = source_timeseries.end_date + delta = dt.timedelta + new_dates = [end_date + delta(minutes=10), end_date + delta(minutes=20)] + new_data = pd.DataFrame( + data={"value": new_values, "flags": new_flags}, + columns=["value", "flags"], + index=new_dates, + ) + source_timeseries.append_data(new_data) diff --git a/enhydris/models/timeseries.py b/enhydris/models/timeseries.py index d2103dfe..d637eece 100644 --- a/enhydris/models/timeseries.py +++ b/enhydris/models/timeseries.py @@ -348,13 +348,14 @@ def save(self, force_insert=False, force_update=False, *args, **kwargs): class TimeseriesRecord(models.Model): - # Ugly primary key hack. + # Ugly primary key hack - FIX ME in Django 5.2. # Django does not allow composite primary keys, whereas timescaledb can't work # without them. Our composite primary key in this case is (timeseries, timestamp). # What we do is set managed=False, so that Django won't create the table itself; # we create it with migrations.RunSQL(). We also set "primary_key=True" in one of # the fields. While technically this is wrong, it fools Django into not expecting - # an "id" field to exist, and it doesn't affect querying functionality. + # an "id" field to exist, and it doesn't affect querying functionality (except in + # one case in autoprocess.models.Aggregation._get_start_date(), see comment there). timeseries = models.ForeignKey(Timeseries, on_delete=models.CASCADE) timestamp = models.DateTimeField(primary_key=True, verbose_name=_("Timestamp")) value = models.FloatField(blank=True, null=True, verbose_name=_("Value")) diff --git a/requirements.txt b/requirements.txt index 437d23df..404bc5e3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,5 +16,5 @@ django-simple-captcha>=0.5.14,<1 django-bootstrap4>=2,<4 requests>=2.25,<3 defusedxml>=0.7.1,<1 -pthelma[all]>=2,<3 +pthelma[all]>=2.1,<3 matplotlib>=3,<3.7