From 7734573933e56ebbaf84e1214d2cc479ed410442 Mon Sep 17 00:00:00 2001
From: chrispreee <117157625+chrispreee@users.noreply.github.com>
Date: Wed, 23 Oct 2024 13:30:17 +0100
Subject: [PATCH] Initial working draft of aggregating serializer and viewset, see #HEA-547

---
 apps/baseline/models.py      |   4 +-
 apps/baseline/serializers.py | 196 ++++++++++++++++++++++++++++
 apps/baseline/viewsets.py    | 225 +++++++++++++++++++++++++++++++++++
 hea/urls.py                  |   2 +
 4 files changed, 425 insertions(+), 2 deletions(-)

diff --git a/apps/baseline/models.py b/apps/baseline/models.py
index 9dabfd3b..0d2fd13c 100644
--- a/apps/baseline/models.py
+++ b/apps/baseline/models.py
@@ -1099,7 +1099,7 @@ class LivelihoodActivity(common_models.Model):
     quantity_sold = models.PositiveIntegerField(blank=True, null=True, verbose_name=_("Quantity Sold/Exchanged"))
     quantity_other_uses = models.PositiveIntegerField(blank=True, null=True, verbose_name=_("Quantity Other Uses"))
     # Can normally be calculated / validated as `quantity_produced + quantity_purchased - quantity_sold - quantity_other_uses`  # NOQA: E501
-    # but there are exceptions, such as MilkProduction, where there is also an amount used for ButterProduction
+    # but there are exceptions, such as MilkProduction, where there is also an amount used for ButterProduction. Is this captured in quantity_other_uses?  # NOQA: E501
     quantity_consumed = models.PositiveIntegerField(blank=True, null=True, verbose_name=_("Quantity Consumed"))
     price = models.FloatField(blank=True, null=True, verbose_name=_("Price"), help_text=_("Price per unit"))
 
@@ -1110,7 +1110,7 @@ class LivelihoodActivity(common_models.Model):
     # of external goods or services.
     expenditure = models.FloatField(blank=True, null=True, help_text=_("Expenditure"))
 
-    # Can normally be calculated / validated as `quantity_consumed` * `kcals_per_unit`
+    # Can normally be calculated / validated as `quantity_consumed` * `livelihoodstrategy__product__kcals_per_unit`
     kcals_consumed = models.PositiveIntegerField(
         blank=True,
         null=True,
diff --git a/apps/baseline/serializers.py b/apps/baseline/serializers.py
index 83298a0e..7b3cc746 100644
--- a/apps/baseline/serializers.py
+++ b/apps/baseline/serializers.py
@@ -1,7 +1,11 @@
+from django.db.models import Sum
+from django.utils import translation
+from rest_framework import fields as rest_framework_fields
 from rest_framework import serializers
 from rest_framework_gis.serializers import GeoFeatureModelSerializer
 
 from common.fields import translation_fields
+from metadata.models import LivelihoodStrategyType
 
 from .models import (
     BaselineLivelihoodActivity,
@@ -1466,3 +1470,195 @@ def get_strategy_label(self, obj):
 
     def get_wealth_group_label(self, obj):
         return str(obj.wealth_group)
+
+
+class DictQuerySetField(rest_framework_fields.SerializerMethodField):
+    """
+    Read-only field whose value comes from a row dict of an aggregated `.values()` queryset.
+
+    The lookup is delegated to the parent serializer's `get_field(row, field_name)` method.
+    """
+
+    def __init__(self, field_name=None, **kwargs):
+        super().__init__(**kwargs)
+        # Field.__init__() resets `self.field_name` and bind() later overwrites it with the attribute name used on
+        # the serializer, so the explicitly passed name must be stored separately to have any effect.
+        self.dict_field_name = field_name
+
+    def to_representation(self, obj):
+        # Fall back to the bound attribute name when no explicit name was given.
+        return self.parent.get_field(obj, self.dict_field_name or self.field_name)
+
+
+class LivelihoodZoneBaselineReportSerializer(serializers.ModelSerializer):
+    """
+    Serializes the row dicts of the aggregated LivelihoodZoneBaseline report queryset.
+    """
+
+    class Meta:
+        model = LivelihoodZoneBaseline
+        fields = (
+            "id",
+            "name",
+            "description",
+            "source_organization",
+            "source_organization_name",
+            "livelihood_zone",
+            "livelihood_zone_name",
+            "country_pk",
+            "country_iso_en_name",
+            "main_livelihood_category",
+            "bss",
+            "currency",
+            "reference_year_start_date",
+            "reference_year_end_date",
+            "valid_from_date",
+            "valid_to_date",  # to display "is latest" / "is historic" in the UI for each ref yr
+            "population_source",
+            "population_estimate",
+            "livelihoodzone_pk",
+            "livelihood_strategy_pk",
+            "strategy_type",
+            "livelihood_activity_pk",
+            "wealth_group_category_code",
+            "slice_sum_kcals_consumed",
+            "sum_kcals_consumed",
+            "kcals_consumed_percent",
+            "product_cpc",
+            "product_common_name",
+        )
+
+    # For each of these aggregates the following calculation columns are added:
+    #   (a) Total at the LZB level (filtered by population, wealth group, etc), eg, sum_kcals_consumed.
+    #   (b) Total for the selected product/strategy type slice, eg, slice_sum_kcals_consumed.
+    #   (c) The percentage the slice represents of the whole, eg, kcals_consumed_percent.
+    # Filters are automatically created, eg, min_kcals_consumed_percent and max_kcals_consumed_percent.
+    # If no ordering is specified by the FilterSet, the results are ordered by percent descending in the order here.
+    aggregates = {
+        "kcals_consumed": Sum,
+    }
+
+    # For each of these pairs, a URL parameter is created "slice_{field}", eg, ?slice_product=
+    # They can appear zero, one or multiple times in the URL, and define a sub-slice of the row-level data.
+    # A slice includes activities with ANY of the products, AND, ANY of the strategy types.
+    # For example: (product=R0 OR product=L0) AND (strategy_type=MilkProd OR strategy_type=CropProd)
+    slice_fields = {
+        "product": "livelihood_strategies__product__cpc__istartswith",
+        "strategy_type": "livelihood_strategies__strategy_type__iexact",
+    }
+
+    livelihood_zone_name = DictQuerySetField("livelihood_zone_name")
+    source_organization_name = DictQuerySetField("source_organization_name")
+    country_pk = DictQuerySetField("country_pk")
+    country_iso_en_name = DictQuerySetField("country_iso_en_name")
+    livelihoodzone_pk = DictQuerySetField("livelihoodzone_pk")
+    livelihood_strategy_pk = DictQuerySetField("livelihood_strategy_pk")
+    livelihood_activity_pk = DictQuerySetField("livelihood_activity_pk")
+    wealth_group_category_code = DictQuerySetField("wealth_group_category_code")
+    id = DictQuerySetField("id")
+    name = DictQuerySetField("name")
+    description = DictQuerySetField("description")
+    source_organization = DictQuerySetField("source_organization")
+    livelihood_zone = DictQuerySetField("livelihood_zone")
+    main_livelihood_category = DictQuerySetField("main_livelihood_category")
+    bss = DictQuerySetField("bss")
+    currency = DictQuerySetField("currency")
+    reference_year_start_date = DictQuerySetField("reference_year_start_date")
+    reference_year_end_date = DictQuerySetField("reference_year_end_date")
+    valid_from_date = DictQuerySetField("valid_from_date")
+    valid_to_date = DictQuerySetField("valid_to_date")
+    population_source = DictQuerySetField("population_source")
+    population_estimate = DictQuerySetField("population_estimate")
+    product_cpc = DictQuerySetField("product_cpc")
+    product_common_name = DictQuerySetField("product_common_name")
+    strategy_type = DictQuerySetField("strategy_type")
+
+    slice_sum_kcals_consumed = DictQuerySetField("slice_sum_kcals_consumed")
+    sum_kcals_consumed = DictQuerySetField("sum_kcals_consumed")
+    kcals_consumed_percent = DictQuerySetField("kcals_consumed_percent")
+
+    def get_fields(self):
+        """
+        Return the declared fields, optionally restricted by the comma-delimited `fields=` URL parameter.
+
+        If the parameter is missing or empty, or there is no request in the context, every declared field is
+        returned. Names that do not match a declared field are ignored.
+
+        The aggregated fields derived from self.aggregates, and any `ordering=` fields, are added regardless of the
+        user field selection.
+        """
+        request = self.context.get("request")
+        field_list = request.query_params.get("fields") if request is not None else None
+        if not field_list:
+            return super().get_fields()
+
+        # User-provided list of fields
+        field_names = {name.strip() for name in field_list.split(",")}
+
+        # Add the aggregates that are always returned
+        for field_name, aggregate in self.aggregates.items():
+            field_names |= {
+                field_name,
+                self.aggregate_field_name(field_name, aggregate),
+                self.slice_aggregate_field_name(field_name, aggregate),
+                self.slice_percent_field_name(field_name, aggregate),
+            }
+
+        # Add the ordering fields if specified. The parameter is comma-delimited and a leading "-" means descending,
+        # so that prefix is stripped to get the field name.
+        ordering = request.query_params.get("ordering")
+        if ordering:
+            field_names.update(term.strip().lstrip("-") for term in ordering.split(","))
+
+        # Keep only the declared fields that were asked for
+        return {k: v for k, v in super().get_fields().items() if k in field_names}
+
+    def get_field(self, obj, field_name):
+        """
+        Aggregated querysets are a list of dicts.
+        This is called by DictQuerySetField to get the value from the row dict.
+        """
+        db_field = self.field_to_database_path(field_name)
+        value = obj.get(db_field, "")
+        # Get the readable, translated string from the choice key.
+        if field_name == "strategy_type" and value:
+            return dict(LivelihoodStrategyType.choices).get(value, value)
+        return value
+
+    @staticmethod
+    def field_to_database_path(field_name):
+        """
+        Convert a user-friendly field name (eg, livelihood_strategy_pk) into the ORM lookup path used as the key in
+        the row dicts (eg, livelihood_strategies__pk). Names without a mapping are returned unchanged.
+        """
+        language_code = translation.get_language()
+        return {
+            "livelihoodzone_pk": "pk",
+            "name": f"name_{language_code}",
+            "description": f"description_{language_code}",
+            "valid_to_date": "valid_to_date",
+            "livelihood_strategy_pk": "livelihood_strategies__pk",
+            "livelihood_activity_pk": "livelihood_strategies__livelihoodactivity__pk",
+            "wealth_group_category_code": "livelihood_strategies__livelihoodactivity__wealth_group__wealth_group_category__code",  # NOQA: E501
+            "kcals_consumed": "livelihood_strategies__livelihoodactivity__kcals_consumed",
+            "livelihood_zone_name": f"livelihood_zone__name_{language_code}",
+            "source_organization_pk": "source_organization__pk",
+            "source_organization_name": "source_organization__name",
+            "country_pk": "livelihood_zone__country__pk",
+            "country_iso_en_name": "livelihood_zone__country__iso_en_name",
+            "product_cpc": "livelihood_strategies__product",
+            "strategy_type": "livelihood_strategies__strategy_type",
+            "product_common_name": f"livelihood_strategies__product__common_name_{language_code}",
+        }.get(field_name, field_name)
+
+    @staticmethod
+    def aggregate_field_name(field_name, aggregate):
+        return f"{aggregate.name.lower()}_{field_name}"  # eg, sum_kcals_consumed
+
+    @staticmethod
+    def slice_aggregate_field_name(field_name, aggregate):
+        return f"slice_{aggregate.name.lower()}_{field_name}"  # eg, slice_sum_kcals_consumed
+
+    @staticmethod
+    def slice_percent_field_name(field_name, aggregate):
+        return f"{field_name}_percent"  # eg, kcals_consumed_percent
diff --git a/apps/baseline/viewsets.py b/apps/baseline/viewsets.py
index 85ad0b10..229a9076 100644
--- a/apps/baseline/viewsets.py
+++ b/apps/baseline/viewsets.py
@@ -1,5 +1,8 @@
 from django.db import models
+from django.db.models import F, OuterRef, Q, Subquery
+from django.db.models.functions import Coalesce, NullIf
 from django_filters import rest_framework as filters
+from rest_framework.viewsets import ModelViewSet
 
 from common.fields import translation_fields
 from common.filters import MultiFieldFilter
@@ -63,6 +66,7 @@
     LivelihoodProductCategorySerializer,
     LivelihoodStrategySerializer,
     LivelihoodZoneBaselineGeoSerializer,
+    LivelihoodZoneBaselineReportSerializer,
     LivelihoodZoneBaselineSerializer,
     LivelihoodZoneSerializer,
     LivestockSaleSerializer,
@@ -1550,3 +1554,224 @@ class CopingStrategyViewSet(BaseModelViewSet):
         "leaders",
         "strategy",
     ]
+
+
+class LivelihoodZoneBaselineReportViewSet(ModelViewSet):
+    """
+    There are two ‘levels’ of filter needed on this endpoint. The standard ones which are already on the LZB endpoint
+    filter the LZBs that are returned (eg, population range and wealth group). Let’s call them ‘global’ filters.
+    Everything needs filtering by wealth group or population, if those filters are active.
+
+    The strategy type and product filters do not remove LZBs from the results by themselves; they only filter the
+    statistics. I suggest we call them data ‘slice’ filters.
+
+    If a user selects Sorghum, that filters the kcals income for our slice. The kcals income for the slice is then
+    divided by the kcals income on the global set for the kcals income percent.
+
+    The global filters are identical to those already on the LZB endpoint (and will always be - it is sharing the
+    code).
+
+    The slice filters are:
+
+      - slice_product (for multiple, repeat the parameter, eg, slice_product=R0&slice_product=B01). These
+        match any CPC code that starts with the value. (The client needs to convert the selected product to CPC.)
+
+      - slice_strategy_type - you can specify multiple, and you need to pass the code not the label (which could be
+        translated). (These are case-insensitive but otherwise must be an exact match.)
+
+    The slice is defined by matching any of the products, AND any of the strategy types (as opposed to OR).
+
+    Translated fields, eg, name, description, are rendered in the currently selected locale if possible. (Except
+    Country, which has different translations following ISO.) This can be selected in the UI or set using eg,
+    &language=pt which overrides the UI selection.
+
+    You select the fields you want using the &fields= parameter in the usual way. If you omit the fields parameter all
+    fields are returned. These are currently the same field list as the normal LZB endpoint, plus the aggregations,
+    called slice_sum_kcals_consumed, sum_kcals_consumed, kcals_consumed_percent, plus product CPC and product common
+    name translated. If you omit a field, the statistics for that field will be aggregated together.
+
+    The ordering code is also shared with the normal LZB endpoint, which uses the standard &ordering= parameter. If
+    none are specified, the results are sorted by the aggregations descending, ie, biggest percentage first.
+
+    Example URL:
+
+        http://localhost:8000/api/livelihoodzonebaselinereport/
+            ?language=pt
+            &slice_product=R09
+            &slice_strategy_type=MilkProduction
+            &fields=id,name,description,source_organization,source_organization_name,livelihood_zone,livelihood_zone_name,
+                country_pk,country_iso_en_name,main_livelihood_category,bss,currency,reference_year_start_date,
+                reference_year_end_date,valid_from_date,valid_to_date,population_source,population_estimate,
+                livelihoodzone_pk,livelihood_strategy_pk,strategy_type,livelihood_activity_pk,wealth_group_category_code,
+                product_cpc,product_common_name
+            &source_organization=1
+            &min_kcals_consumed_percent=52
+            &max_kcals_consumed_percent=99
+
+    The strategy type codes are:
+        MilkProduction
+        ButterProduction
+        MeatProduction
+        LivestockSale
+        CropProduction
+        FoodPurchase
+        PaymentInKind
+        ReliefGiftOther
+        Hunting
+        Fishing
+        WildFoodGathering
+        OtherCashIncome
+        OtherPurchase
+
+    The product hierarchy can be retrieved from the classified product endpoint /api/classifiedproduct/.
+
+    You can then filter by the percentage of your slice. The only value we have data for so far is kcals_consumed,
+    filtered using, eg, &min_kcals_consumed_percent=52&max_kcals_consumed_percent=99.
+
+    The API currently only supports a single slice at a time. For combining please run multiple searches, and add
+    the desired results to the Compare tab.
+    """
+
+    queryset = LivelihoodZoneBaseline.objects.all()
+    serializer_class = LivelihoodZoneBaselineReportSerializer
+    filterset_class = LivelihoodZoneBaselineFilterSet
+
+    def get_queryset(self):
+        """
+        Aggregates the values specified in the serializer.aggregates property, grouping and aggregating by any
+        fields not requested by the user.
+        """
+
+        # Add the global filters, eg, wealth group, population range, that apply to global search results AND slices:
+        # NOTE(review): ListModelMixin.list() calls filter_queryset() again on the queryset returned here, so the
+        # FilterSet is also applied to the grouped result - confirm that the repeated filtering is intended.
+        queryset = self.filter_queryset(super().get_queryset())
+
+        # Add the global aggregations, eg, total consumption filtered by wealth group but not by prod/strategy slice:
+        queryset = queryset.annotate(**self.global_aggregates())
+
+        # Work out the slice aggregates, eg, slice_sum_kcals_consumed for product/strategy slice:
+        slice_aggregates = self.get_slice_aggregates()
+        # Work out the calculations on aggregates, eg,
+        #   kcals_consumed_percent = slice_sum_kcals_consumed * 100 / sum_kcals_consumed
+        calcs_on_aggregates = self.get_calculations_on_aggregates()
+
+        # Extract the model fields from the combined list of model and calculated fields:
+        model_fields = self.get_serializer().get_fields().keys() - slice_aggregates.keys() - calcs_on_aggregates.keys()
+
+        # Convert user-friendly field name (eg, livelihood_strategy_pk) into db field path (livelihood_strategies__pk).
+        obj_field_paths = [self.serializer_class.field_to_database_path(field) for field in model_fields]
+
+        # Get them from the query. The ORM converts this qs.values() call into a SQL `GROUP BY *field_paths` clause.
+        queryset = queryset.values(*obj_field_paths)
+
+        # The ORM converts these annotations into grouped SELECT ..., SUM(lzb.population), SUM(la.kcals_consumed), etc.
+        queryset = queryset.annotate(**slice_aggregates, **calcs_on_aggregates)
+
+        # Add the filters on aggregates, eg, kcals_consumed_percent > 50%
+        queryset = queryset.filter(self.get_filters_on_aggregates())
+
+        # If no ordering has been specified by the FilterSet, order by final value fields descending:
+        if not self.request.query_params.get("ordering"):
+            order_by_value_desc = [
+                f"-{self.serializer_class.slice_percent_field_name(field_name, aggregate)}"
+                for field_name, aggregate in self.serializer_class.aggregates.items()
+            ]
+            queryset = queryset.order_by(*order_by_value_desc)
+
+        return queryset
+
+    def get_filters_on_aggregates(self):
+        """
+        Build the filters on the calculated percentages from the min_{field}_percent and max_{field}_percent URL
+        parameters, eg, .filter(kcals_consumed_percent__gte=params.get("min_kcals_consumed_percent")).
+
+        Empty parameters are ignored. A limit that is not a number raises a ValidationError (HTTP 400 response).
+        """
+        from rest_framework.exceptions import ValidationError
+
+        filters_on_aggregates = Q()
+        for url_param_prefix, orm_expr in (("min", "gte"), ("max", "lte")):
+            for field in self.serializer_class.aggregates:
+                url_param_name = f"{url_param_prefix}_{field}_percent"
+                limit = self.request.query_params.get(url_param_name)
+                if limit is None or limit == "":
+                    continue
+                try:
+                    limit = float(limit)
+                except ValueError:
+                    raise ValidationError({url_param_name: "A valid number is required."}) from None
+                filters_on_aggregates &= Q(**{f"{field}_percent__{orm_expr}": limit})
+        return filters_on_aggregates
+
+    def global_aggregates(self):
+        """
+        Produces a subquery per LZB-wide statistic that we need, eg, kcals_consumed for selected wealth groups for all
+        products and strategies. The kcals_consumed for a specific set of products and strategy types is divided by
+        this figure to obtain a percentage.
+        """
+        global_aggregates = {}
+        for field_name, aggregate in self.serializer_class.aggregates.items():
+            subquery = LivelihoodZoneBaseline.objects.all()
+
+            # The FilterSet applies the global filters, such as Wealth Group Category.
+            # We also need to apply these to the subquery that gets the kcal totals per LZB (eg, the kcal_percent
+            # denominator), to restrict the 100% value by, for example, wealth group.
+            subquery = self.filter_queryset(subquery)
+
+            # Join to outer query
+            subquery = subquery.filter(pk=OuterRef("pk"))
+
+            # Annotate with the aggregate expression, eg, sum_kcals_consumed
+            aggregate_field_name = self.serializer_class.aggregate_field_name(field_name, aggregate)
+            subquery = subquery.annotate(
+                **{aggregate_field_name: aggregate(self.serializer_class.field_to_database_path(field_name))}
+            ).values(aggregate_field_name)[:1]
+
+            global_aggregates[aggregate_field_name] = Subquery(subquery)
+
+        return global_aggregates
+
+    def get_slice_aggregates(self):
+        # Construct the filters for the slice, for example specific products & strategy types, to apply to each measure
+        slice_filter = self.get_slice_filters()
+        # Remove the aggregated fields from the obj field list, and instead add them as sliced aggregate annotations:
+        slice_aggregates = {}
+        required_fields = set(self.get_serializer().get_fields().keys())
+        # The ordering parameter is comma-delimited, with a leading "-" for descending order
+        ordering = self.request.query_params.get("ordering", "")
+        required_fields.update(term.strip().lstrip("-") for term in ordering.split(",") if term.strip())
+        for field_name, aggregate in self.serializer_class.aggregates.items():
+            aggregate_field_name = self.serializer_class.slice_aggregate_field_name(field_name, aggregate)
+            if aggregate_field_name in required_fields:
+                # Annotate the queryset with the aggregate, eg, slice_sum_kcals_consumed, applying the slice filters.
+                # This is then divided by, eg, sum_kcals_consumed for the percentage of the slice.
+                field_path = self.serializer_class.field_to_database_path(field_name)
+                slice_aggregates[aggregate_field_name] = aggregate(field_path, filter=slice_filter, default=0)
+        return slice_aggregates
+
+    def get_slice_filters(self):
+        # Filters to slice the aggregations, to obtain, eg, the kcals for the selected products/strategy types.
+        # This is then divided by the total for the LZB for the slice percentage.
+        slice_filters = Q()
+        for slice_field, slice_expr in self.serializer_class.slice_fields.items():
+            slice_filter = Q()
+            for item in self.request.query_params.getlist(f"slice_{slice_field}"):
+                slice_filter |= Q(**{slice_expr: item})
+            # Slice must match any of the products AND any of the strategy types (if selected)
+            slice_filters &= slice_filter
+        return slice_filters
+
+    def get_calculations_on_aggregates(self):
+        # Aggregate slice percentages
+        # TODO: Add complex kcal income calculations from LIAS
+        calcs_on_aggregates = {}
+        for field_name, aggregate in self.serializer_class.aggregates.items():
+            slice_total = F(self.serializer_class.slice_aggregate_field_name(field_name, aggregate))
+            overall_total = F(self.serializer_class.aggregate_field_name(field_name, aggregate))
+            # NOTE(review): all operands are integers, so the database may truncate the result to a whole percent.
+            expr = slice_total * 100 / NullIf(overall_total, 0)  # Protects against divide by zero
+            expr = Coalesce(expr, 0)  # Zero if no LivActivities found for prod/strategy slice
+            slice_percent_field_name = self.serializer_class.slice_percent_field_name(field_name, aggregate)
+            calcs_on_aggregates[slice_percent_field_name] = expr
+        return calcs_on_aggregates
diff --git a/hea/urls.py b/hea/urls.py
index 1cc6733e..c915de02 100644
--- a/hea/urls.py
+++ b/hea/urls.py
@@ -26,6 +26,7 @@
     LivelihoodActivityViewSet,
     LivelihoodProductCategoryViewSet,
     LivelihoodStrategyViewSet,
+    LivelihoodZoneBaselineReportViewSet,
     LivelihoodZoneBaselineViewSet,
     LivelihoodZoneViewSet,
     LivestockSaleViewSet,
@@ -80,6 +81,7 @@
 router.register(r"sourceorganization", SourceOrganizationViewSet)
 router.register(r"livelihoodzone", LivelihoodZoneViewSet)
 router.register(r"livelihoodzonebaseline", LivelihoodZoneBaselineViewSet)
+router.register(r"livelihoodzonebaselinereport", LivelihoodZoneBaselineReportViewSet, "livelihoodzonebaselinereport")
 router.register(r"livelihoodproductcategory", LivelihoodProductCategoryViewSet)
 router.register(r"community", CommunityViewSet)
 router.register(r"wealthgroup", WealthGroupViewSet)