From 55c4c69b71c601432f2bc3334a93240d21aa5fa4 Mon Sep 17 00:00:00 2001 From: Marco Mancino Date: Sun, 28 Jun 2026 23:15:54 +0200 Subject: [PATCH 1/5] fix(home-load): UTC-aware history datetimes and additive backfill on manual collection The load-history pipeline mixed naive and timezone-aware datetimes, raising "can't compare offset-naive and offset-aware datetimes" while building the per-device consumption in the optimization loop. The 24h look-back window, the merged-consumption timestamp and the purge cut-off now use UTC-aware timestamps consistently. Manual collection also ignored `lookback_hours` whenever data already existed: it only fetched incrementally from the last stored point, so requesting e.g. 30 days returned just the few most recent points and never filled internal gaps. A manual `collect_devices` call now performs an additive full-window backfill -- `get_power_points` gains a `force_refresh` flag that re-fetches the whole window from Home Assistant and merges it into the store (de-duplicated by the (device_id, timestamp) primary key) without dropping existing data. The scheduled `collect_all` stays incremental. - optimization_service: UTC-aware look-back window and consumption timestamp - home_load_history_service: UTC-aware purge cut-off; force_full_window backfill - history provider port + HA/dummy adapters: force_refresh on get_power_points --- .../home_load/history_providers/dummy.py | 10 ++++-- .../home_assistant_api_history.py | 15 ++++++++- core/edge_mining/application/interfaces.py | 4 ++- .../services/home_load_history_service.py | 33 ++++++++++++++----- .../services/optimization_service.py | 8 ++--- core/edge_mining/domain/home_load/ports.py | 11 +++++-- 6 files changed, 63 insertions(+), 18 deletions(-) diff --git a/core/edge_mining/adapters/domain/home_load/history_providers/dummy.py b/core/edge_mining/adapters/domain/home_load/history_providers/dummy.py index 198aa1c..b1b9b3e 100644 --- a/core/edge_mining/adapters/domain/home_load/history_providers/dummy.py +++ b/core/edge_mining/adapters/domain/home_load/history_providers/dummy.py @@ -27,8 +27,14 @@ def __init__( self._history_repo = history_repo self._logger = logger - async def get_power_points(self, start: Timestamp, end: Timestamp) -> List[HomeLoadPowerPoint]: - """Return cached power points for this device in [start, end).""" + async def get_power_points( + self, start: Timestamp, end: Timestamp, force_refresh: bool = False + ) -> List[HomeLoadPowerPoint]: + """Return cached power points for this device in [start, end). + + ``force_refresh`` has no effect here: the dummy provider has no upstream + source to re-fetch from, it only serves what is already in the repo. + """ if self._logger: self._logger.debug(f"DummyEnergyLoadHistoryProvider: get_power_points({self.device_id}, [{start}, {end}))") return self._history_repo.get_power_points(self.device_id, start, end) diff --git a/core/edge_mining/adapters/domain/home_load/history_providers/home_assistant_api_history.py b/core/edge_mining/adapters/domain/home_load/history_providers/home_assistant_api_history.py index 006c720..d902ac5 100644 --- a/core/edge_mining/adapters/domain/home_load/history_providers/home_assistant_api_history.py +++ b/core/edge_mining/adapters/domain/home_load/history_providers/home_assistant_api_history.py @@ -120,14 +120,27 @@ def __init__( if self._logger: self._logger.debug(f"HA history adapter bound to device {device_id} (entity='{entity_power}')") - async def get_power_points(self, start: Timestamp, end: Timestamp) -> List[HomeLoadPowerPoint]: + async def get_power_points( + self, start: Timestamp, end: Timestamp, force_refresh: bool = False + ) -> List[HomeLoadPowerPoint]: """Return power points for the bound device in [start, end). Hits the cache first; fetches missing or stale tail from Home Assistant. + When ``force_refresh`` is True the whole window is re-fetched from Home + Assistant (bounded by HA's own recorder retention) and merged with the + cache, so internal gaps get backfilled. New points are persisted; the + composite primary key on (device_id, timestamp) de-duplicates existing + ones. """ if start >= end: return [] + if force_refresh: + fetched = await self._fetch_from_home_assistant(start, end) + if fetched: + self._history_repo.add_power_points(self.device_id, fetched) + return self._history_repo.get_power_points(self.device_id, start, end) + cached = self._history_repo.get_power_points(self.device_id, start, end) latest_cached: Optional[Timestamp] = max((p.timestamp for p in cached), default=None) diff --git a/core/edge_mining/application/interfaces.py b/core/edge_mining/application/interfaces.py index c41c9cb..76cb174 100644 --- a/core/edge_mining/application/interfaces.py +++ b/core/edge_mining/application/interfaces.py @@ -155,7 +155,9 @@ async def collect_all(self, lookback_hours: int = 24) -> None: """Collect power points from all history providers for all enabled devices.""" @abstractmethod - async def collect_devices(self, device_ids: List[EntityId], lookback_hours: int = 24) -> None: + async def collect_devices( + self, device_ids: List[EntityId], lookback_hours: int = 24, force_full_window: bool = True + ) -> None: """Collect power points for the specified devices only.""" @abstractmethod diff --git a/core/edge_mining/application/services/home_load_history_service.py b/core/edge_mining/application/services/home_load_history_service.py index 4bfb0ce..fd7f5e7 100644 --- a/core/edge_mining/application/services/home_load_history_service.py +++ b/core/edge_mining/application/services/home_load_history_service.py @@ -70,8 +70,16 @@ async def _collect_for_device( device_name: str, provider_id: EntityId, lookback_hours: int = 24, + force_full_window: bool = False, ) -> None: - """Collect power points for a single device from its history provider.""" + """Collect power points for a single device from its history provider. + + By default this is incremental: it fetches only what is newer than the + latest stored point. When ``force_full_window`` is True it re-fetches the + whole ``lookback_hours`` window from the provider (additive backfill), + which lets a manual collection fill internal gaps without losing already + stored data. + """ history_provider = await self.adapter_service.get_home_load_history_provider(provider_id, device_id) if not history_provider: if self.logger: @@ -80,13 +88,13 @@ async def _collect_for_device( now = Timestamp(datetime.now(timezone.utc)) last_ts = self.home_load_history_repo.get_latest_timestamp(device_id) - if last_ts is not None: - start = last_ts - else: + if force_full_window or last_ts is None: start = Timestamp(now - timedelta(hours=lookback_hours)) + else: + start = last_ts try: - power_points = await history_provider.get_power_points(start, now) + power_points = await history_provider.get_power_points(start, now, force_refresh=force_full_window) except Exception as e: if self.logger: self.logger.error( @@ -116,7 +124,7 @@ async def purge_all(self, retention_days: int = 90) -> None: Iterates all profiles and their devices, purging historical data that exceeds the retention window. """ - cutoff = Timestamp(datetime.now() - timedelta(days=retention_days)) + cutoff = Timestamp(datetime.now(timezone.utc) - timedelta(days=retention_days)) profiles = self.home_loads_repo.get_all() if not profiles: return @@ -156,8 +164,16 @@ def clear_device_history(self, device_id: EntityId) -> int: self.logger.info(f"Cleared {removed} power points for device {device_id}.") return removed - async def collect_devices(self, device_ids: List[EntityId], lookback_hours: int = 24) -> None: - """Collect power points for the specified devices only.""" + async def collect_devices( + self, device_ids: List[EntityId], lookback_hours: int = 24, force_full_window: bool = True + ) -> None: + """Collect power points for the specified devices only. + + This is the manual entry point (e.g. the "collect" button): it defaults + to an additive backfill of the whole ``lookback_hours`` window so an + explicit request honours the requested look-back even when data already + exists, filling internal gaps without dropping stored points. + """ profiles = self.home_loads_repo.get_all() if not profiles: return @@ -176,4 +192,5 @@ async def collect_devices(self, device_ids: List[EntityId], lookback_hours: int device_name=device.name, provider_id=device.energy_load_history_provider_id, lookback_hours=lookback_hours, + force_full_window=force_full_window, ) diff --git a/core/edge_mining/application/services/optimization_service.py b/core/edge_mining/application/services/optimization_service.py index 08bf729..2753b2c 100644 --- a/core/edge_mining/application/services/optimization_service.py +++ b/core/edge_mining/application/services/optimization_service.py @@ -8,7 +8,7 @@ """ import asyncio -from datetime import datetime, timedelta +from datetime import timedelta from typing import Dict, List, Optional from edge_mining.application.interfaces import ( @@ -17,7 +17,7 @@ OptimizationServiceInterface, SunFactoryInterface, ) -from edge_mining.domain.common import EntityId, Timestamp, WattHours +from edge_mining.domain.common import EntityId, Timestamp, WattHours, utc_now_timestamp from edge_mining.domain.energy.entities import EnergySource from edge_mining.domain.energy.events import EnergyStateSnapshotUpdatedEvent from edge_mining.domain.energy.ports import EnergyMonitorPort, EnergySourceRepository @@ -111,7 +111,7 @@ def __init__( @staticmethod def _sum_consumptions(consumptions: List[LoadEnergyConsumption]) -> LoadEnergyConsumption: """Sum a list of LoadEnergyConsumption by matching (start, end) intervals.""" - now_ts = Timestamp(datetime.now()) + now_ts = utc_now_timestamp() if not consumptions: return LoadEnergyConsumption(timestamp=now_ts, intervals=[]) @@ -151,7 +151,7 @@ async def _build_home_loads_consumption( if home_loads_profile is None: return None - now = Timestamp(datetime.now()) + now = utc_now_timestamp() window_start = Timestamp(now - timedelta(hours=24)) empty_consumption = LoadEnergyConsumption(timestamp=now, intervals=[]) diff --git a/core/edge_mining/domain/home_load/ports.py b/core/edge_mining/domain/home_load/ports.py index 91ef4f2..e7b8d62 100644 --- a/core/edge_mining/domain/home_load/ports.py +++ b/core/edge_mining/domain/home_load/ports.py @@ -83,8 +83,15 @@ def __init__(self, device_id: EntityId, provider_type: EnergyLoadHistoryProvider self.provider_type = provider_type @abstractmethod - async def get_power_points(self, start: Timestamp, end: Timestamp) -> List[HomeLoadPowerPoint]: - """Retrieve raw power points for this device in the window [start, end).""" + async def get_power_points( + self, start: Timestamp, end: Timestamp, force_refresh: bool = False + ) -> List[HomeLoadPowerPoint]: + """Retrieve raw power points for this device in the window [start, end). + + When ``force_refresh`` is True the provider re-fetches the whole window + from its source (ignoring any incremental/cache optimisation), so callers + can backfill internal gaps. Persisted duplicates are de-duplicated. + """ raise NotImplementedError @abstractmethod From 2c107962cf2c9011d65a2c1967ae278c0df63582 Mon Sep 17 00:00:00 2001 From: Marco Mancino Date: Sun, 28 Jun 2026 23:24:49 +0200 Subject: [PATCH 2/5] feat(frontend): offer forecast retrain after manual history collection After a manual per-device history collection completes, prompt the user to retrain that device's forecast model with the freshly collected data. The prompt only appears when the device has a forecast provider configured; on confirmation it triggers per-device training and refreshes the forecast panel. A small toast reports progress and surfaces training errors. Wires a new trainDevice call through the home-loads service and store onto the existing training-trigger endpoint. Scheduled nightly training is unchanged. --- .../homeLoads/LoadDeviceHistoryModal.vue | 45 +++++++++++++++++++ .../core/services/homeLoadsProfileService.ts | 11 +++++ .../src/core/stores/homeLoadsProfileStore.ts | 9 ++++ 3 files changed, 65 insertions(+) diff --git a/frontend/src/components/homeLoads/LoadDeviceHistoryModal.vue b/frontend/src/components/homeLoads/LoadDeviceHistoryModal.vue index d700c20..21b7058 100644 --- a/frontend/src/components/homeLoads/LoadDeviceHistoryModal.vue +++ b/frontend/src/components/homeLoads/LoadDeviceHistoryModal.vue @@ -37,6 +37,9 @@ const collecting = ref(false); const clearing = ref(false); const showClearConfirm = ref(false); const showCollectDialog = ref(false); +const showRetrainConfirm = ref(false); +const retraining = ref(false); +const retrainError = ref(null); const lookbackHours = ref(24); const selectedRange = ref<"24h" | "7d" | "30d">("24h"); @@ -128,6 +131,11 @@ async function collectHistory() { try { await profileStore.collectDeviceHistory(props.profileId, props.device.id, lookbackHours.value); await fetchHistory(); + // Ask the user whether to retrain the forecast model with the freshly + // collected data (only meaningful if the device has a forecast provider). + if (props.device.energy_load_forecast_provider_id) { + showRetrainConfirm.value = true; + } } catch (e) { console.error("Failed to collect device history:", e); } finally { @@ -135,6 +143,23 @@ async function collectHistory() { } } +async function retrainModel() { + showRetrainConfirm.value = false; + if (!props.profileId || !props.device?.id) return; + retraining.value = true; + retrainError.value = null; + try { + await profileStore.trainDevice(props.profileId, props.device.id); + // Refresh the forecast so the panel reflects the newly trained model. + await fetchForecast(); + } catch (e: any) { + console.error("Failed to retrain model:", e); + retrainError.value = e?.response?.data?.detail || e?.message || "Unknown error"; + } finally { + retraining.value = false; + } +} + async function clearHistory() { if (!props.profileId || !props.device?.id) return; showClearConfirm.value = false; @@ -541,6 +566,26 @@ function formatWh(v: number): string { @cancel="showClearConfirm = false" /> + + + +
+
+ Retraining forecast model… +
+
+ Retrain failed: {{ retrainError }} + +
+
+