Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Fixed

- **Home load history timezone handling**: the optimization loop built its 24h look-back window with a naive `datetime.now()`. This clashed with the timezone-aware (UTC) timestamps returned by Home Assistant and the persistence layer and raised `TypeError: can't compare offset-naive and offset-aware datetimes`. The look-back window, the merged-consumption timestamp and the history purge cut-off now consistently use UTC-aware timestamps (`optimization_service.py`, `home_load_history_service.py`).

### Added

- **Additive history backfill on manual collection**: a manual per-device collection now re-fetches the whole requested look-back window from the provider and merges it into the store (de-duplicated by the `(device_id, timestamp)` primary key), filling internal gaps without dropping existing data. Previously it only ingested incrementally from the last stored point, ignoring `lookback_hours`. `EnergyLoadHistoryProviderPort.get_power_points` gains a `force_refresh` flag; the scheduled collection stays incremental (`ports.py`, `home_assistant_api_history.py`, `home_load_history_service.py`).
- **Forecast retrain after manual collection**: the device history modal prompts the user to retrain the device's forecast model with the freshly collected data, triggering per-device training and refreshing the forecast on success (frontend).
- **Training outcome reporting** (`LoadTrainingResult` value object in `domain/home_load/value_objects.py`): `train_device` now reports whether a model was actually `trained` (with best adapter, MAE and sample count), `skipped` (with a reason, e.g. insufficient history) or `failed` (with a reason, e.g. no model trained successfully). The `training/trigger` endpoint surfaces the outcome and the UI shows a status toast, instead of always reporting a generic "completed".
13 changes: 11 additions & 2 deletions core/edge_mining/adapters/domain/home_load/fast_api/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -925,8 +925,17 @@ async def trigger_training_device(
f"Load Device with ID {device_id} not found in Home Loads Profile {profile_id}"
)

await training_service.train_device(device_id, weeks_lookback=weeks_lookback)
return {"status": "completed", "detail": f"Training completed for device '{device.name}'."}
result = await training_service.train_device(device_id, weeks_lookback=weeks_lookback)
if result.status == "trained" and result.best_adapter is not None:
detail = (
f"Model retrained for '{result.device_name}': best={result.best_adapter.value} "
f"MAE={result.best_mae:.1f} ({result.samples_used} samples)."
)
Comment on lines +929 to +933
elif result.status == "skipped":
detail = f"Training skipped for '{result.device_name}': {result.reason}."
else:
detail = f"Training failed for '{result.device_name}': {result.reason}."
return {"status": result.status, "detail": detail}
except HomeLoadsProfileNotFoundError as e:
raise HTTPException(status_code=404, detail=str(e)) from e
except HomeLoadsProfileDeviceNotFoundError as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,14 @@ def __init__(
self._history_repo = history_repo
self._logger = logger

async def get_power_points(self, start: Timestamp, end: Timestamp) -> List[HomeLoadPowerPoint]:
"""Return cached power points for this device in [start, end)."""
async def get_power_points(
    self, start: Timestamp, end: Timestamp, force_refresh: bool = False
) -> List[HomeLoadPowerPoint]:
    """Return cached power points for this device in [start, end).

    ``force_refresh`` has no effect here: the dummy provider has no upstream
    source to re-fetch from, it only serves what is already in the repo.

    Args:
        start: Start of the requested window, passed through to the repository.
        end: End of the requested window, passed through to the repository.
        force_refresh: Accepted but never read by this implementation.

    Returns:
        Whatever the history repository returns for this device and window.
    """
    # The logger is optional, so only emit the debug trace when one is set.
    if self._logger:
        self._logger.debug(f"DummyEnergyLoadHistoryProvider: get_power_points({self.device_id}, [{start}, {end}))")
    # Pure pass-through: no fetching and no persistence happen here.
    return self._history_repo.get_power_points(self.device_id, start, end)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,27 @@ def __init__(
if self._logger:
self._logger.debug(f"HA history adapter bound to device {device_id} (entity='{entity_power}')")

async def get_power_points(self, start: Timestamp, end: Timestamp) -> List[HomeLoadPowerPoint]:
async def get_power_points(
self, start: Timestamp, end: Timestamp, force_refresh: bool = False
) -> List[HomeLoadPowerPoint]:
"""Return power points for the bound device in [start, end).

Hits the cache first; fetches missing or stale tail from Home Assistant.
When ``force_refresh`` is True the whole window is re-fetched from Home
Assistant (bounded by HA's own recorder retention) and merged with the
cache, so internal gaps get backfilled. New points are persisted; the
composite primary key on (device_id, timestamp) de-duplicates existing
ones.
"""
if start >= end:
return []

if force_refresh:
fetched = await self._fetch_from_home_assistant(start, end)
if fetched:
self._history_repo.add_power_points(self.device_id, fetched)
return self._history_repo.get_power_points(self.device_id, start, end)
Comment on lines +138 to +142

cached = self._history_repo.get_power_points(self.device_id, start, end)

latest_cached: Optional[Timestamp] = max((p.timestamp for p in cached), default=None)
Expand Down
10 changes: 6 additions & 4 deletions core/edge_mining/application/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
EnergyLoadHistoryProviderAdapter,
)
from edge_mining.domain.home_load.ports import EnergyLoadForecastProviderPort, EnergyLoadHistoryProviderPort
from edge_mining.domain.home_load.value_objects import HomeLoadPowerPoint
from edge_mining.domain.home_load.value_objects import HomeLoadPowerPoint, LoadTrainingResult
from edge_mining.domain.miner.aggregate_roots import Miner
from edge_mining.domain.miner.common import MinerControllerAdapter, MinerFeatureType
from edge_mining.domain.miner.entities import MinerController
Expand Down Expand Up @@ -155,7 +155,9 @@ async def collect_all(self, lookback_hours: int = 24) -> None:
"""Collect power points from all history providers for all enabled devices."""

@abstractmethod
async def collect_devices(self, device_ids: List[EntityId], lookback_hours: int = 24) -> None:
async def collect_devices(
self, device_ids: List[EntityId], lookback_hours: int = 24, force_full_window: bool = True
) -> None:
"""Collect power points for the specified devices only."""

@abstractmethod
Expand All @@ -179,8 +181,8 @@ async def train_all(self, weeks_lookback: int = 8) -> None:
"""Train models for every device that has sufficient history."""

@abstractmethod
async def train_device(self, device_id: EntityId, weeks_lookback: int = 8) -> None:
"""Train models for a single device."""
async def train_device(self, device_id: EntityId, weeks_lookback: int = 8) -> LoadTrainingResult:
"""Train models for a single device and return the outcome."""

@abstractmethod
def get_models(self, device_id: Optional[EntityId] = None) -> List[LoadConsumptionModel]:
Expand Down
33 changes: 25 additions & 8 deletions core/edge_mining/application/services/home_load_history_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,16 @@ async def _collect_for_device(
device_name: str,
provider_id: EntityId,
lookback_hours: int = 24,
force_full_window: bool = False,
) -> None:
"""Collect power points for a single device from its history provider."""
"""Collect power points for a single device from its history provider.

By default this is incremental: it fetches only what is newer than the
latest stored point. When ``force_full_window`` is True it re-fetches the
whole ``lookback_hours`` window from the provider (additive backfill),
which lets a manual collection fill internal gaps without losing already
stored data.
"""
history_provider = await self.adapter_service.get_home_load_history_provider(provider_id, device_id)
if not history_provider:
if self.logger:
Expand All @@ -80,13 +88,13 @@ async def _collect_for_device(

now = Timestamp(datetime.now(timezone.utc))
last_ts = self.home_load_history_repo.get_latest_timestamp(device_id)
if last_ts is not None:
start = last_ts
else:
if force_full_window or last_ts is None:
start = Timestamp(now - timedelta(hours=lookback_hours))
else:
start = last_ts

try:
power_points = await history_provider.get_power_points(start, now)
power_points = await history_provider.get_power_points(start, now, force_refresh=force_full_window)
except Exception as e:
if self.logger:
self.logger.error(
Expand Down Expand Up @@ -116,7 +124,7 @@ async def purge_all(self, retention_days: int = 90) -> None:
Iterates all profiles and their devices, purging historical data that
exceeds the retention window.
"""
cutoff = Timestamp(datetime.now() - timedelta(days=retention_days))
cutoff = Timestamp(datetime.now(timezone.utc) - timedelta(days=retention_days))
profiles = self.home_loads_repo.get_all()
if not profiles:
return
Expand Down Expand Up @@ -156,8 +164,16 @@ def clear_device_history(self, device_id: EntityId) -> int:
self.logger.info(f"Cleared {removed} power points for device {device_id}.")
return removed

async def collect_devices(self, device_ids: List[EntityId], lookback_hours: int = 24) -> None:
"""Collect power points for the specified devices only."""
async def collect_devices(
self, device_ids: List[EntityId], lookback_hours: int = 24, force_full_window: bool = True
) -> None:
"""Collect power points for the specified devices only.

This is the manual entry point (e.g. the "collect" button): it defaults
to an additive backfill of the whole ``lookback_hours`` window so an
explicit request honours the requested look-back even when data already
exists, filling internal gaps without dropping stored points.
"""
profiles = self.home_loads_repo.get_all()
if not profiles:
return
Expand All @@ -176,4 +192,5 @@ async def collect_devices(self, device_ids: List[EntityId], lookback_hours: int
device_name=device.name,
provider_id=device.energy_load_history_provider_id,
lookback_hours=lookback_hours,
force_full_window=force_full_window,
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
HomeLoadsProfileRepository,
LoadConsumptionModelRepository,
)
from edge_mining.domain.home_load.value_objects import LoadEnergyConsumption
from edge_mining.domain.home_load.value_objects import LoadEnergyConsumption, LoadTrainingResult
from edge_mining.shared.logging.port import LoggerPort


Expand Down Expand Up @@ -61,7 +61,7 @@ async def train_all(self, weeks_lookback: int = 8) -> None:
if self._logger:
self._logger.error(f"Training failed for device '{device.name}': {exc}")

async def train_device(self, device_id: EntityId, weeks_lookback: int = 8) -> None:
async def train_device(self, device_id: EntityId, weeks_lookback: int = 8) -> LoadTrainingResult:
"""Train models for a single device identified by device_id."""
profiles = self._home_loads_repo.get_all()
device_name: Optional[str] = None
Expand All @@ -76,9 +76,9 @@ async def train_device(self, device_id: EntityId, weeks_lookback: int = 8) -> No
if device_name is None:
if self._logger:
self._logger.warning(f"Device {device_id} not found in any profile. Skipping training.")
return
return LoadTrainingResult(device_name=str(device_id), status="failed", reason="device not found")

await self._train_for_device(device_id, device_name, weeks_lookback)
return await self._train_for_device(device_id, device_name, weeks_lookback)

def get_models(self, device_id: Optional[EntityId] = None) -> List[LoadConsumptionModel]:
"""Retrieve trained models, optionally filtered by device."""
Expand All @@ -93,18 +93,17 @@ async def _train_for_device(
device_id: EntityId,
device_name: str,
weeks_lookback: int,
) -> None:
) -> LoadTrainingResult:
"""Train HW + XGBoost models for one device, promote the better one."""
now = Timestamp(datetime.now(timezone.utc))
lookback_start = Timestamp(now - timedelta(weeks=weeks_lookback))

power_points = self._history_repo.get_power_points(device_id, lookback_start, now)
if len(power_points) < 48 * 2: # at least 48 hours of data for train+holdout
reason = f"insufficient history ({len(power_points)} points, need at least 96)"
if self._logger:
self._logger.debug(
f"Insufficient history for device '{device_name}' ({len(power_points)} points). Skipping training."
)
return
self._logger.debug(f"{reason.capitalize()} for device '{device_name}'. Skipping training.")
return LoadTrainingResult(device_name=device_name, status="skipped", reason=reason)

# Build LoadEnergyConsumption from power points
intervals = group_power_points_into_intervals(power_points)
Expand All @@ -116,9 +115,10 @@ async def _train_for_device(
holdout_consumption = consumption.in_window(holdout_start, now)

if len(train_consumption.intervals) < 48 or len(holdout_consumption.intervals) < 12:
reason = "not enough data after train/holdout split (need 48h train + 12h holdout)"
if self._logger:
self._logger.debug(f"Not enough data after split for device '{device_name}'. Skipping.")
return
self._logger.debug(f"{reason.capitalize()} for device '{device_name}'. Skipping.")
return LoadTrainingResult(device_name=device_name, status="skipped", reason=reason)

hw_model = self._train_hw(train_consumption, holdout_consumption, device_id, device_name)
xgb_model = self._train_xgb(train_consumption, holdout_consumption, device_id, device_name)
Expand All @@ -129,7 +129,7 @@ async def _train_for_device(
if not candidates:
if self._logger:
self._logger.warning(f"No model trained successfully for device '{device_name}'.")
return
return LoadTrainingResult(device_name=device_name, status="failed", reason="no model trained successfully")

best = min(candidates, key=lambda m: m.mae) # type: ignore[arg-type]
best.is_active = True
Expand All @@ -154,6 +154,14 @@ async def _train_for_device(
f"Trained models for device '{device_name}': best={best.adapter_type.value} MAE={best.mae:.2f}"
)

return LoadTrainingResult(
device_name=device_name,
status="trained",
best_adapter=best.adapter_type,
best_mae=best.mae,
samples_used=best.samples_used,
)

def _train_hw(
self,
train: LoadEnergyConsumption,
Expand Down
8 changes: 4 additions & 4 deletions core/edge_mining/application/services/optimization_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"""

import asyncio
from datetime import datetime, timedelta
from datetime import timedelta
from typing import Dict, List, Optional

from edge_mining.application.interfaces import (
Expand All @@ -17,7 +17,7 @@
OptimizationServiceInterface,
SunFactoryInterface,
)
from edge_mining.domain.common import EntityId, Timestamp, WattHours
from edge_mining.domain.common import EntityId, Timestamp, WattHours, utc_now_timestamp
from edge_mining.domain.energy.entities import EnergySource
from edge_mining.domain.energy.events import EnergyStateSnapshotUpdatedEvent
from edge_mining.domain.energy.ports import EnergyMonitorPort, EnergySourceRepository
Expand Down Expand Up @@ -111,7 +111,7 @@ def __init__(
@staticmethod
def _sum_consumptions(consumptions: List[LoadEnergyConsumption]) -> LoadEnergyConsumption:
"""Sum a list of LoadEnergyConsumption by matching (start, end) intervals."""
now_ts = Timestamp(datetime.now())
now_ts = utc_now_timestamp()
if not consumptions:
return LoadEnergyConsumption(timestamp=now_ts, intervals=[])

Expand Down Expand Up @@ -151,7 +151,7 @@ async def _build_home_loads_consumption(
if home_loads_profile is None:
return None

now = Timestamp(datetime.now())
now = utc_now_timestamp()
window_start = Timestamp(now - timedelta(hours=24))
empty_consumption = LoadEnergyConsumption(timestamp=now, intervals=[])

Expand Down
11 changes: 9 additions & 2 deletions core/edge_mining/domain/home_load/ports.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,15 @@ def __init__(self, device_id: EntityId, provider_type: EnergyLoadHistoryProvider
self.provider_type = provider_type

@abstractmethod
async def get_power_points(self, start: Timestamp, end: Timestamp) -> List[HomeLoadPowerPoint]:
"""Retrieve raw power points for this device in the window [start, end)."""
async def get_power_points(
self, start: Timestamp, end: Timestamp, force_refresh: bool = False
) -> List[HomeLoadPowerPoint]:
"""Retrieve raw power points for this device in the window [start, end).

When ``force_refresh`` is True the provider re-fetches the whole window
from its source (ignoring any incremental/cache optimisation), so callers
can backfill internal gaps. Persisted duplicates are de-duplicated.
"""
raise NotImplementedError

@abstractmethod
Expand Down
19 changes: 18 additions & 1 deletion core/edge_mining/domain/home_load/value_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import List, Optional

from edge_mining.domain.common import EntityId, Timestamp, ValueObject, WattHours, Watts
from edge_mining.domain.home_load.common import LoadDeviceCategory
from edge_mining.domain.home_load.common import EnergyLoadForecastProviderAdapter, LoadDeviceCategory


@dataclass(frozen=True)
Expand All @@ -16,6 +16,23 @@ class HomeLoadPowerPoint(ValueObject):
power: Watts


@dataclass(frozen=True)
class LoadTrainingResult(ValueObject):
    """Outcome of a forecast-model (re)training run for a single LoadDevice.

    ``status`` is one of "trained", "skipped" or "failed". On "trained" the best
    model metadata is filled; on "skipped"/"failed" ``reason`` explains why, so
    callers can surface it to the user.
    """

    # Human-readable device name used in user-facing messages. When the device
    # cannot be resolved, callers may pass the stringified device id instead.
    device_name: str
    # Plain string, one of "trained", "skipped" or "failed"; it is not
    # validated here, so producers must stick to these three values.
    status: str
    # Explanation for a "skipped" or "failed" run; left as None on "trained".
    reason: Optional[str] = None
    # Adapter type of the model selected as best; only set on "trained".
    best_adapter: Optional[EnergyLoadForecastProviderAdapter] = None
    # MAE of the selected model; only set on "trained".
    best_mae: Optional[float] = None
    # Sample count reported by the selected model; only set on "trained".
    samples_used: Optional[int] = None


@dataclass(frozen=True)
class HomeLoadEnergyInterval(ValueObject):
"""
Expand Down
Loading
Loading