Allow for separate Opower costs and consumption data #132133
base: dev
@@ -1,6 +1,7 @@
"""Coordinator to handle Opower connections."""

from datetime import datetime, timedelta
from enum import Enum
import logging
from types import MappingProxyType
from typing import Any, cast

@@ -35,6 +36,17 @@
_LOGGER = logging.getLogger(__name__)


class StatisticType(Enum):
    """Statistic types included."""

    COST = "COST"
    CONSUMPTION = "CONSUMPTION"

    def __str__(self) -> str:
        """Return the value of the enum."""
        return self.value


class OpowerCoordinator(DataUpdateCoordinator[dict[str, Forecast]]):
    """Handle fetching Opower data, updating sensors and inserting statistics."""

@@ -113,14 +125,15 @@
            )
            if not last_stat:
                _LOGGER.debug("Updating statistic for the first time")
                cost_reads = await self._async_get_cost_reads(
                cost_reads, cost_reads_billing = await self._async_get_cost_reads(
                    account, self.api.utility.timezone()
                )
                cost_sum = 0.0
                consumption_sum = 0.0
                last_stats_time = None
                last_stats_time_billing = None
            else:
                cost_reads = await self._async_get_cost_reads(
                cost_reads, cost_reads_billing = await self._async_get_cost_reads(
                    account,
                    self.api.utility.timezone(),
                    last_stat[consumption_statistic_id][0]["start"],

@@ -156,32 +169,100 @@
                cost_sum = cast(float, stats[cost_statistic_id][0]["sum"])
                consumption_sum = cast(float, stats[consumption_statistic_id][0]["sum"])
                last_stats_time = stats[consumption_statistic_id][0]["start"]
                if cost_reads_billing:
                    start = cost_reads_billing[0].start_time
                    _LOGGER.debug("Getting cost statistics at: %s", start)
                    # In the common case there should be a previous statistic at start time
                    # so we only need to fetch one statistic. If there isn't any, fetch all.
                    for end in (start + timedelta(seconds=1), None):
                        stats = await get_instance(self.hass).async_add_executor_job(
                            statistics_during_period,
                            self.hass,
                            start,
                            end,
                            {cost_statistic_id},
                            "hour",
                            None,
                            {"sum"},
                        )
                        if stats:
                            break
                        if end:
                            _LOGGER.debug(
                                "Not found. Trying to find the oldest statistic after %s",
                                start,
                            )
                    # We are in this code path only if get_last_statistics found a stat
                    # so statistics_during_period should also have found at least one.
                    assert stats
                    cost_sum = cast(float, stats[cost_statistic_id][0]["sum"])
                    last_stats_time_billing = stats[cost_statistic_id][0]["start"]

            cost_statistics = []
            consumption_statistics = []
            await self._insert_statistics_cost_read(
                account,
                cost_statistic_id,
                consumption_statistic_id,
                cost_sum,
                consumption_sum,
                last_stats_time,
                cost_reads,
                [StatisticType.COST, StatisticType.CONSUMPTION]
Review comment: The code is now hard to maintain with all the branching. I feel it could be simplified if you kept two lists of cost reads: one for usage, one for cost. For accounts like mine the two lists will be the same; for accounts like yours the second list will hold the monthly data. Then you can fetch and update statistics in the recorder separately. This unfortunately means the recorder interactions will be slower for accounts like mine, but I think it's better to have less complex code. Could you try that in a separate PR to see how the code would look? (A rough sketch of this two-list idea follows the end of this hunk.)
                if not cost_reads_billing
                else [StatisticType.CONSUMPTION],
            )

            for cost_read in cost_reads:
                start = cost_read.start_time
                if last_stats_time is not None and start.timestamp() <= last_stats_time:
                    continue
                cost_sum += cost_read.provided_cost
                consumption_sum += cost_read.consumption
            if cost_reads_billing:
                await self._insert_statistics_cost_read(
                    account,
                    cost_statistic_id,
                    consumption_statistic_id,
                    cost_sum,
                    consumption_sum,
                    last_stats_time_billing,
                    cost_reads_billing,
                    [StatisticType.COST],
                )

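A rough sketch of what the reviewer's two-list suggestion could look like in the update path. This is illustrative only and not part of this PR: the name usage_reads is made up, and it assumes _async_get_cost_reads would be changed to always return two populated lists.

    # Hypothetical: _async_get_cost_reads always returns two lists.
    # For most accounts they are identical; for accounts whose fine-grained
    # reads carry no cost, the second list holds the coarser billing reads.
    usage_reads, cost_reads = await self._async_get_cost_reads(
        account, self.api.utility.timezone()
    )
    # Consumption statistics come from the fine-grained list...
    await self._insert_statistics_cost_read(
        account,
        cost_statistic_id,
        consumption_statistic_id,
        cost_sum,
        consumption_sum,
        last_stats_time,
        usage_reads,
        [StatisticType.CONSUMPTION],
    )
    # ...and cost statistics from the (possibly coarser) cost list,
    # so neither call needs to branch on what the other list contains.
    await self._insert_statistics_cost_read(
        account,
        cost_statistic_id,
        consumption_statistic_id,
        cost_sum,
        consumption_sum,
        last_stats_time_billing,
        cost_reads,
        [StatisticType.COST],
    )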
    async def _insert_statistics_cost_read(
        self,
        account: Account,
        cost_statistic_id: str,
        consumption_statistic_id: str,
        cost_sum: float,
        consumption_sum: float,
        last_stats_time: float | None,
        cost_reads: list[CostRead],
        statistic_types: list[StatisticType],
    ) -> None:
        """Insert Opower statistics for a single cost read list."""
        cost_statistics = []
        consumption_statistics = []

        for cost_read in cost_reads:
            start = cost_read.start_time
            if last_stats_time is not None and start.timestamp() <= last_stats_time:
                continue
            cost_sum += cost_read.provided_cost
            consumption_sum += cost_read.consumption

            if StatisticType.COST in statistic_types:
                cost_statistics.append(
                    StatisticData(
                        start=start, state=cost_read.provided_cost, sum=cost_sum
                    )
                )
            if StatisticType.CONSUMPTION in statistic_types:
                consumption_statistics.append(
                    StatisticData(
                        start=start, state=cost_read.consumption, sum=consumption_sum
                    )
                )

            name_prefix = (
                f"Opower {self.api.utility.subdomain()} "
                f"{account.meter_type.name.lower()} {account.utility_account_id}"
            )
        name_prefix = (
            f"Opower {self.api.utility.subdomain()} "
            f"{account.meter_type.name.lower()} {account.utility_account_id}"
        )
        if StatisticType.COST in statistic_types:
            cost_metadata = StatisticMetaData(
                has_mean=False,
                has_sum=True,

@@ -190,6 +271,7 @@
                statistic_id=cost_statistic_id,
                unit_of_measurement=None,
            )
        if StatisticType.CONSUMPTION in statistic_types:
            consumption_metadata = StatisticMetaData(
                has_mean=False,
                has_sum=True,

@@ -201,12 +283,14 @@
                else UnitOfVolume.CENTUM_CUBIC_FEET,
            )

        if StatisticType.COST in statistic_types:
            _LOGGER.debug(
                "Adding %s statistics for %s",
                len(cost_statistics),
                cost_statistic_id,
            )
            async_add_external_statistics(self.hass, cost_metadata, cost_statistics)
        if StatisticType.CONSUMPTION in statistic_types:
            _LOGGER.debug(
                "Adding %s statistics for %s",
                len(consumption_statistics),

@@ -218,7 +302,7 @@

    async def _async_get_cost_reads(
        self, account: Account, time_zone_str: str, start_time: float | None = None
    ) -> list[CostRead]:
    ) -> tuple[list[CostRead], list[CostRead] | None]:
Review comment: Update function doc. (A sketch of possible wording follows the docstring lines below.)
"""Get cost reads. | ||
|
||
If start_time is None, get cost reads since account activation, | ||
|
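In response to the review ask above, one possible updated docstring for the new tuple return (a sketch; the wording is illustrative, not the author's, and it only describes behaviour visible in this diff):

    """Get cost reads.

    If start_time is None, get cost reads since account activation.

    Returns a tuple (cost_reads, cost_reads_billing). The first list drives
    consumption (and, when available, cost) statistics. The second is None
    unless the finer-resolution reads are missing cost data, in which case it
    holds the coarser reads to use for cost statistics instead.
    """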
@@ -245,6 +329,31 @@
                    break
            cost_reads += finer_cost_reads

        def _check_is_cost_missing(
Review comment: Can this just be:

def _check_is_cost_missing(cost_reads: list[CostRead]) -> bool:
    if not cost_reads:
        return False
    return sum(cost_read.provided_cost for cost_read in cost_reads) == 0

And only call it with daily_cost_reads and hourly_cost_reads? You will also set:

            cost_reads: list[CostRead], finer_cost_reads: list[CostRead]
        ) -> bool:
            if not cost_reads or not finer_cost_reads:
                return False
            sum_cost_reads = 0
            sum_finer_cost_reads = 0
            for _, cost_read in enumerate(cost_reads):
                # Skip to first bill read fully represented in fine reads
                if cost_read.start_time < finer_cost_reads[0].start_time:
                    continue
                sum_cost_reads += cost_read.provided_cost
            for _, finer_cost_read in enumerate(finer_cost_reads):
                # End if beyond rough cost read window
                if cost_reads[-1].end_time < finer_cost_read.start_time:
                    break
                sum_finer_cost_reads += finer_cost_read.provided_cost
            _LOGGER.debug(
                "Calculated cost sums rough: %s fine: %s",
                sum_cost_reads,
                sum_finer_cost_reads,
            )
            return sum_cost_reads > 0 and sum_finer_cost_reads == 0

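If the reviewer's one-argument variant above were adopted, the call sites further down could look roughly like this (a sketch only; it keeps the fallback assignments exactly as in the current diff and does not decide whether a copy is needed):

    if _check_is_cost_missing(daily_cost_reads):
        _LOGGER.debug(
            "Daily data seems to be missing cost data, falling back to bill view for costs"
        )
        cost_reads_billing = cost_reads.copy()
    # ...and similarly with hourly_cost_reads after the hourly fetch.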
        cost_reads_billing = None
        tz = await dt_util.async_get_time_zone(time_zone_str)
        if start_time is None:
            start = None

@@ -257,7 +366,7 @@
        )
        _LOGGER.debug("Got %s monthly cost reads", len(cost_reads))
        if account.read_resolution == ReadResolution.BILLING:
            return cost_reads
            return cost_reads, None

        if start_time is None:
            start = end - timedelta(days=3 * 365)

@@ -271,9 +380,14 @@
            account, AggregateType.DAY, start, end
        )
        _LOGGER.debug("Got %s daily cost reads", len(daily_cost_reads))
        if _check_is_cost_missing(cost_reads, daily_cost_reads):
            _LOGGER.debug(
                "Daily data seems to be missing cost data, falling back to bill view for costs"
            )
            cost_reads_billing = cost_reads.copy()
        _update_with_finer_cost_reads(cost_reads, daily_cost_reads)
        if account.read_resolution == ReadResolution.DAY:
            return cost_reads
            return cost_reads, cost_reads_billing

        if start_time is None:
            start = end - timedelta(days=2 * 30)

@@ -285,6 +399,11 @@
            account, AggregateType.HOUR, start, end
        )
        _LOGGER.debug("Got %s hourly cost reads", len(hourly_cost_reads))
        if _check_is_cost_missing(daily_cost_reads, hourly_cost_reads):
            _LOGGER.debug(
                "Hourly data seems to be missing cost data, falling back to daily view for costs"
            )
            cost_reads_billing = daily_cost_reads.copy()
        _update_with_finer_cost_reads(cost_reads, hourly_cost_reads)
        _LOGGER.debug("Got %s cost reads", len(cost_reads))
        return cost_reads
        return cost_reads, cost_reads_billing
Review comment: The code below is very similar to the code above. Can you avoid duplicate code by extracting it to a function? (A rough sketch of one way to do that follows.)
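One possible way to factor out the duplicated check-and-fallback pattern flagged above (a sketch, not the author's implementation; the helper name _fallback_costs_if_missing and its log wording are made up for illustration, and it assumes the helper sits next to _check_is_cost_missing inside _async_get_cost_reads):

        def _fallback_costs_if_missing(
            coarser_reads: list[CostRead],
            finer_reads: list[CostRead],
            fallback_label: str,
        ) -> list[CostRead] | None:
            # Return a copy of the coarser reads to use for cost statistics when
            # the finer reads appear to carry no cost data; otherwise return None.
            if not _check_is_cost_missing(coarser_reads, finer_reads):
                return None
            _LOGGER.debug("Falling back to %s view for costs", fallback_label)
            return coarser_reads.copy()

        # Hypothetical call sites; "or cost_reads_billing" keeps any fallback
        # that was already chosen at an earlier resolution.
        cost_reads_billing = (
            _fallback_costs_if_missing(cost_reads, daily_cost_reads, "bill")
            or cost_reads_billing
        )
        cost_reads_billing = (
            _fallback_costs_if_missing(daily_cost_reads, hourly_cost_reads, "daily")
            or cost_reads_billing
        )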