Skip to content
Draft
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,9 @@ paper
# Ignore IDE project files
.idea/
.vscode
AGENTS.md
uv.lock

tmp
test_ba
data
109 changes: 67 additions & 42 deletions atlite/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -983,66 +983,91 @@ def runoff(
def hydro(
cutout,
plants,
hydrobasins,
module="auto",
time=None,
hydrobasins=None,
flowspeed=1,
weight_with_height=False,
show_progress=False,
**kwargs,
):
"""
Compute inflow time-series for `plants` by aggregating over catchment
basins from `hydrobasins`
Get inflow time-series for `plants` by either extracting the discharge time series for
the nearest grid points or by computing runoff-based inflow time series

Parameters
----------
plants : pd.DataFrame
Run-of-river plants or dams with lon, lat columns.
module : str
The method to compute hydro time series. "auto" will prefere discharge but fall back to runoff-based computation
"glofas" will use discharge directly, "era5" will use runoff-based computation
time : pd.DatetimeIndex, optional
Time index to interpolate the plant inflow onto. Only relevant for
discharge-based computation. Defaults to the cutout's own time index.
hydrobasins : str|gpd.GeoDataFrame
Filename or GeoDataFrame of one level of the HydroBASINS dataset.
Filename or GeoDataFrame of one level of the HydroBASINS dataset. Only required
for runoff-based computation.
flowspeed : float
Average speed of water flows to estimate the water travel time from
basin to plant (default: 1 m/s).
basin to plant (default: 1 m/s). Only relevant for runoff-based computation.
weight_with_height : bool
Whether surface runoff should be weighted by potential height (probably
better for coarser resolution).
better for coarser resolution). Only relevant for runoff-based computation.
show_progress : bool
Whether to display progressbars.

References
----------
[1] Liu, Hailiang, et al. "A validated high-resolution hydro power
time-series model for energy systems analysis." arXiv preprint
arXiv:1901.08476 (2019).

[2] Lehner, B., Grill G. (2013): Global river hydrography and network
routing: baseline data and new approaches to study the world’s large river
systems. Hydrological Processes, 27(15): 2171–2186. Data is available at
www.hydrosheds.org.

Whether to display progressbars. Only relevant for runoff-based computation.
**kwargs
Additional arguments for runoff-based computation. Only relevant for runoff-based computation.
"""
basins = hydrom.determine_basins(plants, hydrobasins, show_progress=show_progress)

matrix = cutout.indicatormatrix(basins.shapes)
# compute the average surface runoff in each basin
# Fix NaN and Inf values to 0.0 to avoid numerical issues
matrix_normalized = np.nan_to_num(
matrix / matrix.sum(axis=1), nan=0.0, posinf=0.0, neginf=0.0
)
runoff = cutout.runoff(
matrix=matrix_normalized,
index=basins.shapes.index,
weight_with_height=weight_with_height,
show_progress=show_progress,
**kwargs,
)
# The hydrological parameters are in units of "m of water per day" and so
# they should be multiplied by 1000 and the basin area to convert to m3
# d-1 = m3 h-1 / 24
runoff *= xr.DataArray(basins.shapes.to_crs(dict(proj="cea")).area)

return hydrom.shift_and_aggregate_runoff_for_plants(
basins, runoff, flowspeed, show_progress
)
if module.lower() == "auto":
# Check if discharge data is available in cutout, otherwise use runoff
if "discharge" in cutout.data.data_vars:
return hydrom._hydro_from_discharge(
cutout,
plants,
time=time,
)
if hydrobasins is None or "runoff" not in cutout.data.data_vars:
raise ValueError(
"For runoff-based hydro time series, hydrobasins and runoff data must be provided."
)
return hydrom._hydro_from_runoff(
cutout,
plants,
hydrobasins,
flowspeed=flowspeed,
weight_with_height=weight_with_height,
show_progress=show_progress,
**kwargs,
)
elif module.lower() == "glofas":
# Check if discharge data is available in cutout, otherwise raise error
if "discharge" not in cutout.data.data_vars:
raise ValueError(
"For GloFAS-based hydro time series, the cutout must include discharge data."
)
return hydrom._hydro_from_discharge(
cutout,
plants,
time=time,
)
elif module.lower() == "era5":
# Check if hydrobasins is provided, otherwise raise error
if hydrobasins is None:
raise ValueError(
"For ERA5-based hydro time series, the hydrobasins dataset must be provided."
)
return hydrom._hydro_from_runoff(
cutout,
plants,
hydrobasins,
flowspeed=flowspeed,
weight_with_height=weight_with_height,
show_progress=show_progress,
**kwargs,
)
else:
raise ValueError(f'Unknown hydro module option "{module}".')


def convert_line_rating(
Expand Down
4 changes: 2 additions & 2 deletions atlite/datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
atlite datasets.
"""

from atlite.datasets import era5, gebco, sarah
from atlite.datasets import era5, gebco, glofas, sarah

modules = {"era5": era5, "sarah": sarah, "gebco": gebco}
modules = {"era5": era5, "sarah": sarah, "gebco": gebco, "glofas": glofas}
132 changes: 132 additions & 0 deletions atlite/datasets/cds_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# SPDX-FileCopyrightText: Contributors to atlite <https://github.com/pypsa/atlite>
#
# SPDX-License-Identifier: MIT
"""
Shared helpers for datasets downloaded via the Climate Data Store (CDS).

Used by both the era5 and glofas dataset modules.
"""

import logging
import os
import weakref
from pathlib import Path

import xarray as xr

logger = logging.getLogger(__name__)


def noisy_unlink(path):
"""
Delete file at given path.
"""
logger.debug(f"Deleting file {path}")
try:
os.unlink(path)
except PermissionError:
logger.error(f"Unable to delete file {path}, as it is still in use.")


def _area(coords):
# North, West, South, East. Default: global
x0, x1 = coords["x"].min().item(), coords["x"].max().item()
y0, y1 = coords["y"].min().item(), coords["y"].max().item()
return [y1, x0, y0, x1]


def sanitize_chunks(chunks, **dim_mapping):
dim_mapping = dict(time="valid_time", x="longitude", y="latitude") | dim_mapping
if not isinstance(chunks, dict):
# preserve "auto" or None
return chunks

return {
extname: chunks[intname]
for intname, extname in dim_mapping.items()
if intname in chunks
}


def add_finalizer(ds: xr.Dataset, target: str | Path):
logger.debug(f"Adding finalizer for {target}")
weakref.finalize(ds._close.__self__.ds, noisy_unlink, target)


def open_with_grib_conventions(
grib_file: str | Path, chunks=None, tmpdir: str | Path | None = None
) -> xr.Dataset:
"""
Convert a grib file downloaded from the CDS to netcdf conventions locally.

The function does the same thing as the CDS backend does, but locally.
This is needed, as the grib file is the recommended download file type for CDS, with conversion to netcdf locally.
The routine is a reduced version based on the documentation here:
https://confluence.ecmwf.int/display/CKB/GRIB+to+netCDF+conversion+on+new+CDS+and+ADS+systems#GRIBtonetCDFconversiononnewCDSandADSsystems-jupiternotebook

Parameters
----------
grib_file : str | Path
Path to the grib file to be converted.
chunks
Chunks
tmpdir : Path, optional
If None adds a finalizer to the dataset object

Returns
-------
xr.Dataset
"""
#
# Open grib file as dataset
# Options to open different datasets into a datasets of consistent hypercubes which are compatible netCDF
# There are options that might be relevant for e.g. for wave model data, that have been removed here
# to keep the code cleaner and shorter
ds = xr.open_dataset(
grib_file,
engine="cfgrib",
time_dims=["valid_time"],
ignore_keys=["edition"],
coords_as_attributes=[
"surface",
"depthBelowLandLayer",
"entireAtmosphere",
"heightAboveGround",
"meanSea",
],
chunks=sanitize_chunks(chunks),
)
if tmpdir is None:
add_finalizer(ds, grib_file)

def safely_expand_dims(dataset: xr.Dataset, expand_dims: list[str]) -> xr.Dataset:
"""
Expand dimensions in an xarray dataset, ensuring that the new dimensions are not already in the dataset
and that the order of dimensions is preserved.
"""
dims_required = [
c for c in dataset.coords if c in expand_dims + list(dataset.dims)
]
dims_missing = [
(c, i) for i, c in enumerate(dims_required) if c not in dataset.dims
]
dataset = dataset.expand_dims(
dim=[x[0] for x in dims_missing], axis=[x[1] for x in dims_missing]
)
return dataset

logger.debug("Converting grib file to netcdf format")
# Variables and dimensions to rename if they exist in the dataset
rename_vars = {
"time": "forecast_reference_time",
"step": "forecast_period",
"isobaricInhPa": "pressure_level",
"hybrid": "model_level",
}
rename_vars = {k: v for k, v in rename_vars.items() if k in ds}
ds = ds.rename(rename_vars)

# safely expand dimensions in an xarray dataset to ensure that data for the new dimensions are in the dataset
ds = safely_expand_dims(ds, ["valid_time", "pressure_level", "model_level"])

return ds
Loading