Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 34 additions & 37 deletions apps/predbat/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -950,47 +950,44 @@ def fetch_sensor_data(self):
# Fetch extra load forecast
self.load_forecast, self.load_forecast_array = self.fetch_extra_load_forecast(self.now_utc)

# Load previous load data
if self.get_arg("ge_cloud_data", False):
self.download_ge_data(self.now_utc)
# Load previous load data - now always uses sensor-based approach
# GE Cloud component will populate sensors if enabled
if "load_today" in self.args:
self.load_minutes, self.load_minutes_age = self.minute_data_load(self.now_utc, "load_today", self.max_days_previous, required_unit="kWh", load_scaling=self.load_scaling)
self.log("Found {} load_today datapoints going back {} days".format(len(self.load_minutes), self.load_minutes_age))
self.load_minutes_now = max(self.load_minutes.get(0, 0) - self.load_minutes.get(self.minutes_now, 0), 0)
self.load_last_period = (self.load_minutes.get(0, 0) - self.load_minutes.get(PREDICT_STEP, 0)) * 60 / PREDICT_STEP
else:
# Load data
if "load_today" in self.args:
self.load_minutes, self.load_minutes_age = self.minute_data_load(self.now_utc, "load_today", self.max_days_previous, required_unit="kWh", load_scaling=self.load_scaling)
self.log("Found {} load_today datapoints going back {} days".format(len(self.load_minutes), self.load_minutes_age))
self.load_minutes_now = max(self.load_minutes.get(0, 0) - self.load_minutes.get(self.minutes_now, 0), 0)
self.load_last_period = (self.load_minutes.get(0, 0) - self.load_minutes.get(PREDICT_STEP, 0)) * 60 / PREDICT_STEP
if self.load_forecast:
self.log("Using load forecast from load_forecast sensor")
self.load_minutes_now = self.load_forecast.get(0, 0)
self.load_minutes_age = 0
self.load_last_period = 0
else:
if self.load_forecast:
self.log("Using load forecast from load_forecast sensor")
self.load_minutes_now = self.load_forecast.get(0, 0)
self.load_minutes_age = 0
self.load_last_period = 0
else:
self.log("Error: You have not set load_today or load_forecast, you will have no load data")
self.record_status(message="Error: load_today not set correctly", had_errors=True)
raise ValueError

# Load import today data
if "import_today" in self.args:
self.import_today = self.minute_data_import_export(self.now_utc, "import_today", scale=self.import_export_scaling, required_unit="kWh")
self.import_today_now = max(self.import_today.get(0, 0) - self.import_today.get(self.minutes_now, 0), 0)
else:
self.log("Warn: You have not set import_today in apps.yaml, you will have no previous import data")
self.log("Error: You have not set load_today or load_forecast, you will have no load data")
self.record_status(message="Error: load_today not set correctly", had_errors=True)
raise ValueError

# Load export today data
if "export_today" in self.args:
self.export_today = self.minute_data_import_export(self.now_utc, "export_today", scale=self.import_export_scaling, required_unit="kWh")
self.export_today_now = max(self.export_today.get(0, 0) - self.export_today.get(self.minutes_now, 0), 0)
else:
self.log("Warn: You have not set export_today in apps.yaml, you will have no previous export data")
# Load import today data
if "import_today" in self.args:
self.import_today = self.minute_data_import_export(self.now_utc, "import_today", scale=self.import_export_scaling, required_unit="kWh")
self.import_today_now = max(self.import_today.get(0, 0) - self.import_today.get(self.minutes_now, 0), 0)
else:
self.log("Warn: You have not set import_today in apps.yaml, you will have no previous import data")

# PV today data
if "pv_today" in self.args:
self.pv_today = self.minute_data_import_export(self.now_utc, "pv_today", required_unit="kWh")
self.pv_today_now = max(self.pv_today.get(0, 0) - self.pv_today.get(self.minutes_now, 0), 0)
else:
self.log("Warn: You have not set pv_today in apps.yaml, you will have no previous pv data")
# Load export today data
if "export_today" in self.args:
self.export_today = self.minute_data_import_export(self.now_utc, "export_today", scale=self.import_export_scaling, required_unit="kWh")
self.export_today_now = max(self.export_today.get(0, 0) - self.export_today.get(self.minutes_now, 0), 0)
else:
self.log("Warn: You have not set export_today in apps.yaml, you will have no previous export data")

# PV today data
if "pv_today" in self.args:
self.pv_today = self.minute_data_import_export(self.now_utc, "pv_today", required_unit="kWh")
self.pv_today_now = max(self.pv_today.get(0, 0) - self.pv_today.get(self.minutes_now, 0), 0)
else:
self.log("Warn: You have not set pv_today in apps.yaml, you will have no previous pv data")

# Battery temperature
if "battery_temperature_history" in self.args:
Expand Down
204 changes: 204 additions & 0 deletions apps/predbat/gecloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ def __init__(self, api_key, automatic, base):
self.failures_total = 0
self.last_success_timestamp = None

# Historical data tracking for incremental fetching
self.last_historical_fetch_time = None
self.historical_data_last_timestamp = None

def wait_api_started(self):
"""
Return if the API has started
Expand Down Expand Up @@ -779,6 +783,10 @@ async def start(self):
for device in device_list:
await self.enable_real_time_control(device, self.settings[device])

# Fetch historical data every 30 minutes (1800 seconds)
if seconds % 1800 == 0:
await self.fetch_and_publish_historical_data()

except Exception as e:
self.log("Error: GECloud: Exception in main loop {}".format(e))

Expand All @@ -793,6 +801,202 @@ async def start(self):
await asyncio.sleep(5)
seconds += 5

async def fetch_and_publish_historical_data(self):
    """
    Fetch historical consumption data from GE Cloud and publish it to sensors.

    Uses incremental fetching - only new data since the last fetch is requested.
    State is tracked on two attributes set in __init__:
      - self.last_historical_fetch_time: when we last ran a fetch (UTC datetime)
      - self.historical_data_last_timestamp: newest data-point timestamp seen

    Returns True on success (including "nothing new to fetch"), False on failure.
    """
    if not self.api_key:
        self.log("GECloud: No API key configured for historical data")
        return False

    try:
        # timedelta is used below for day arithmetic; import it here alongside
        # datetime/timezone so the method does not depend on a module-level import
        from datetime import datetime, timedelta, timezone

        now_utc = datetime.now(timezone.utc)

        # Capture this BEFORE the state-tracking attributes are updated at the
        # bottom of this method, otherwise the initial/incremental distinction
        # is lost by the time we log the completion message
        is_initial_fetch = self.last_historical_fetch_time is None

        # Get the first device serial (assuming single inverter for historical data)
        devices_dict = await self.async_get_devices()
        device_list = devices_dict.get("battery", [])
        if not device_list:
            self.log("GECloud: No devices found for historical data")
            return False

        geserial = device_list[0]  # Use first device
        headers = {"Authorization": "Bearer " + self.api_key, "Content-Type": "application/json", "Accept": "application/json"}

        # Determine how much data to fetch based on last fetch time
        if is_initial_fetch:
            # First time - fetch 7 days of data
            max_days_previous = 7
            self.log("GECloud: Initial historical data fetch - getting 7 days")
        else:
            # Incremental fetch - calculate days since last fetch.
            # total_seconds() is used (rather than .seconds, which discards
            # whole days) so the hour figure is always correct.
            time_diff = now_utc - self.last_historical_fetch_time
            days_since_last = time_diff.days
            hours_since_last = time_diff.total_seconds() / 3600

            if days_since_last == 0 and hours_since_last < 2:
                # Less than 2 hours since last fetch - skip this fetch
                self.log("GECloud: Skipping historical fetch - only {} hours since last fetch".format(round(hours_since_last, 1)))
                return True
            elif days_since_last == 0:
                # Same day but >2 hours - just fetch today and yesterday
                max_days_previous = 1
                self.log("GECloud: Incremental fetch - getting 1 day (last fetch {} hours ago)".format(round(hours_since_last, 1)))
            elif days_since_last <= 2:
                # 1-2 days since last fetch - get last 2-3 days
                max_days_previous = days_since_last + 1
                self.log("GECloud: Incremental fetch - getting {} days (last fetch {} days ago)".format(max_days_previous, days_since_last))
            else:
                # More than 2 days - get up to 7 days but warn about gap
                max_days_previous = min(days_since_last + 1, 7)
                self.log("GECloud: Large gap since last fetch ({} days) - getting {} days of data".format(days_since_last, max_days_previous))

        mdata = []
        days_prev_count = 0
        latest_timestamp = None

        # Walk from the oldest requested day up to today, following the API's
        # pagination links within each day
        while days_prev_count <= max_days_previous:
            days_prev = max_days_previous - days_prev_count
            time_value = now_utc - timedelta(days=days_prev)
            datestr = time_value.strftime("%Y-%m-%d")
            url = "https://api.givenergy.cloud/v1/inverter/{}/data-points/{}".format(geserial, datestr)

            while url:
                if "?" in url:
                    url += "&pageSize=8000"
                else:
                    url += "?pageSize=8000"

                # Simple HTTP request (not using the cached version for simplicity)
                # NOTE(review): requests.get is synchronous and blocks the event
                # loop inside this coroutine for up to 30s - consider an async
                # HTTP client or running in an executor
                try:
                    import requests

                    response = requests.get(url, headers=headers, timeout=30)
                    if response.status_code != 200:
                        self.log("GECloud: Historical data request failed: {}".format(response.status_code))
                        break
                    data = response.json()
                except Exception as e:
                    self.log("GECloud: Historical data request error: {}".format(e))
                    break

                darray = data.get("data", None)
                if darray is None:
                    if days_prev == 0:
                        # Today's data may simply not exist yet - not an error
                        self.log("GECloud: No historical data available for today yet")
                        days_prev_count += 1
                        break
                    else:
                        self.log("GECloud: Error downloading historical data for {}".format(datestr))
                        break

                for item in darray:
                    timestamp = item["time"]

                    # Skip data we already have (if doing incremental fetch)
                    if self.historical_data_last_timestamp is not None:
                        from utils import str2time

                        try:
                            item_time = str2time(timestamp)
                            if item_time <= self.historical_data_last_timestamp:
                                continue  # Skip older data we already have
                        except (ValueError, TypeError):
                            pass  # If we can't parse timestamp, include the data

                    consumption = item["total"]["consumption"]
                    dimport = item["total"]["grid"]["import"]
                    dexport = item["total"]["grid"]["export"]
                    dpv = item["total"]["solar"]

                    new_data = {"last_updated": timestamp, "consumption": consumption, "import": dimport, "export": dexport, "pv": dpv}
                    mdata.append(new_data)

                    # Track the latest timestamp (string compare works for ISO-style stamps)
                    if latest_timestamp is None or timestamp > latest_timestamp:
                        latest_timestamp = timestamp

                if not darray:
                    url = None
                else:
                    # Follow pagination; guard against a missing "links" object
                    url = data.get("links", {}).get("next", None)

            days_prev_count += 1

        if not mdata:
            if is_initial_fetch:
                self.log("GECloud: No historical data retrieved on initial fetch")
                return False
            else:
                self.log("GECloud: No new historical data since last fetch")
                return True

        # Process and publish data to sensors
        # Create sensor data arrays compatible with minute_data_load
        load_data = []
        import_data = []
        export_data = []
        pv_data = []

        for item in mdata:
            timestamp = item["last_updated"]
            load_data.append({"last_updated": timestamp, "state": item["consumption"]})
            import_data.append({"last_updated": timestamp, "state": item["import"]})
            export_data.append({"last_updated": timestamp, "state": item["export"]})
            pv_data.append({"last_updated": timestamp, "state": item["pv"]})

        # Publish as sensor entities using the naming convention that automatic config expects
        # Use the first device serial for the sensor names to match automatic config
        entity_prefix = "sensor.predbat_gecloud_" + geserial

        self.base.dashboard_item(
            entity_prefix + "_consumption_today",
            state=load_data,
            attributes={"unit_of_measurement": "kWh", "device_class": "energy", "friendly_name": "Consumption Today (GE Cloud Historical)", "ge_cloud_historical": True, "last_fetch": now_utc.isoformat(), "data_points": len(mdata)},
            app="gecloud",
        )

        self.base.dashboard_item(
            entity_prefix + "_grid_import_today",
            state=import_data,
            attributes={"unit_of_measurement": "kWh", "device_class": "energy", "friendly_name": "Grid Import Today (GE Cloud Historical)", "ge_cloud_historical": True, "last_fetch": now_utc.isoformat(), "data_points": len(mdata)},
            app="gecloud",
        )

        self.base.dashboard_item(
            entity_prefix + "_grid_export_today",
            state=export_data,
            attributes={"unit_of_measurement": "kWh", "device_class": "energy", "friendly_name": "Grid Export Today (GE Cloud Historical)", "ge_cloud_historical": True, "last_fetch": now_utc.isoformat(), "data_points": len(mdata)},
            app="gecloud",
        )

        self.base.dashboard_item(
            entity_prefix + "_solar_today",
            state=pv_data,
            attributes={"unit_of_measurement": "kWh", "device_class": "energy", "friendly_name": "Solar Today (GE Cloud Historical)", "ge_cloud_historical": True, "last_fetch": now_utc.isoformat(), "data_points": len(mdata)},
            app="gecloud",
        )

        # Update state tracking for next incremental fetch
        self.last_historical_fetch_time = now_utc
        if latest_timestamp:
            try:
                from utils import str2time

                self.historical_data_last_timestamp = str2time(latest_timestamp)
            except (ValueError, TypeError):
                self.log("GECloud: Could not parse latest timestamp: {}".format(latest_timestamp))

        # Use the flag captured before state was updated; the original check
        # compared against the freshly-assigned values and always said "Incremental"
        fetch_type = "Initial" if is_initial_fetch else "Incremental"
        self.log("GECloud: {} fetch completed - {} historical data points published to sensors".format(fetch_type, len(mdata)))
        return True

    except Exception as e:
        self.log("GECloud: Error fetching historical data: {}".format(e))
        return False

async def stop(self):
self.stop_cloud = True

Expand Down