Source code for rivia.hdf.unsteady_plan

"""UnsteadyUnsteadyPlan - read HEC-RAS unsteady plan HDF5 files (.p*.hdf).

Plan HDF files embed the same ``Geometry/`` group as geometry HDF files
*plus* ``Results/Unsteady/...`` time-series and summary output.

``UnsteadyUnsteadyPlan`` inherits ``Geometry`` so all geometry accessors are available.
``FlowAreaResults`` extends ``FlowArea`` with lazy time-series properties,
summary DataFrames, and computed depth / velocity methods.  Raster export
methods delegate to ``rivia.geo`` via a deferred import so this module is
fully usable without rasterio or scipy installed.

"""

from __future__ import annotations

import dataclasses
import datetime as dt
import logging
import math
from collections.abc import Iterator
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, overload

import numpy as np
import pandas as pd

from rivia.utils import log_call, parse_hec_datetime, parse_interval, timed

from ._base import _PlanHdf, _POSTPROC_TS_FMT, _RAS_TS_FMT, _parse_hec_ts_array
from .geometry import (
    _SA_ROOT,
    Bridge,
    CrossSection,
    CrossSectionCollection,
    FlowArea,
    FlowAreaCollection,
    Geometry,
    InlineStructure,
    LateralStructure,
    SA2DConnection,
    StorageArea,
    StorageAreaCollection,
    Structure,
    StructureCollection,
    _decode,
)
from .log import UnsteadyRuntimeLog

if TYPE_CHECKING:
    import h5py
    import rasterio.io

logger = logging.getLogger("rivia.hdf")


# ---------------------------------------------------------------------------
# HDF path constants
# ---------------------------------------------------------------------------
_TS_ROOT = "Results/Unsteady/Output/Output Blocks/Base Output/Unsteady Time Series"
_SUM_ROOT = "Results/Unsteady/Output/Output Blocks/Base Output/Summary Output"
_TS_2D = f"{_TS_ROOT}/2D Flow Areas"
_SUM_2D = f"{_SUM_ROOT}/2D Flow Areas"

_TS_SA = f"{_TS_ROOT}/Storage Areas"
_SUM_SA = f"{_SUM_ROOT}/Storage Areas"
_TIME_DS = f"{_TS_ROOT}/Time"
_TIME_STAMP_DS = f"{_TS_ROOT}/Time Date Stamp"

_DSS_ROOT = (
    "Results/Unsteady/Output/Output Blocks"
    "/DSS Hydrograph Output/Unsteady Time Series"
)
_DSS_SA_CONN = f"{_DSS_ROOT}/SA 2D Area Conn"
_DSS_INLINE = f"{_DSS_ROOT}/Inline Structures"
_DSS_LATERAL = f"{_DSS_ROOT}/Lateral Structures"
_DSS_BRIDGE = f"{_DSS_ROOT}/Bridge"
_DSS_TIME_STAMP_DS = f"{_DSS_ROOT}/Time Date Stamp"

_TS_XS = f"{_TS_ROOT}/Cross Sections"
_DSS_XS = f"{_DSS_ROOT}/Cross Sections"

_RUN_SUM = "Results/Unsteady/Summary"
_VOL_ACC = f"{_RUN_SUM}/Volume Accounting"
_VOL_1D = f"{_VOL_ACC}/Volume Accounting 1D"
_VOL_2D = f"{_VOL_ACC}/Volume Accounting 2D"

_POSTPROC_PROFILE_DATES = (
    "Results/Post Process/Steady/Output/Output Blocks"
    "/Base Output/Post Process/Post Process Profiles/Profile Dates"
)
_POSTPROC_XS = (
    "Results/Post Process/Steady/Output/Output Blocks"
    "/Base Output/Post Process/Post Process Profiles/Cross Sections"
)
_POSTPROC_GEOM_ATTRS = (
    "Results/Post Process/Steady/Output/Geometry Info"
    "/Cross Section Attributes"
)


# ---------------------------------------------------------------------------
# FlowAreaResults - extends FlowArea with plan results
# ---------------------------------------------------------------------------


[docs] class FlowAreaResults(FlowArea): """Geometry *and* time-series results for one named 2-D flow area. Inherits all geometry properties from :class:`FlowArea`. Time-series properties return raw ``h5py.Dataset`` objects so the caller controls how much data is loaded:: area.water_surface[10] # one timestep -> ndarray (n_cells,) area.water_surface[10:20] # slice -> ndarray (10, n_cells) area.water_surface[:] # all -> ndarray (n_t, n_cells) Parameters ---------- geom_group: ``h5py.Group`` at ``Geometry/2D Flow Areas/<name>``. ts_group: ``h5py.Group`` at the time-series result path for this area. sum_group: ``h5py.Group`` at the summary result path for this area. name: Flow area name. n_cells: Number of real computational cells. """ def __init__( self, geom_group: "h5py.Group", ts_group: "h5py.Group", sum_group: "h5py.Group", name: str, n_cells: int, ) -> None: super().__init__(geom_group, name, n_cells) self._ts = ts_group self._sum = sum_group def __repr__(self) -> str: return ( f"FlowAreaResults({self.name!r}," f" cells={self.n_cells}, faces={self.n_faces})" ) # ------------------------------------------------------------------ # Lazy time-series (h5py.Dataset - slice to control memory) # ------------------------------------------------------------------ @property def water_surface(self) -> "h5py.Dataset": """Water-surface elevation time series. ``h5py.Dataset``, shape ``(n_timesteps, n_cells + n_ghost)``. HEC-RAS stores ghost cell WSE (boundary condition stages) in the trailing columns. Slice with ``[:self.n_cells]`` for real cells only, or ``[:]`` for all including ghost cells. Slice to read: ``area.water_surface[10]``. """ return self._ts["Water Surface"] @property def face_velocity(self) -> "h5py.Dataset": """Signed face-normal velocity time series. ``h5py.Dataset``, shape ``(n_timesteps, n_faces)``. """ return self._ts["Face Velocity"] @property def face_flow(self) -> "h5py.Dataset | None": """Volumetric face-flow time series, or ``None`` if not output. ``h5py.Dataset``, shape ``(n_timesteps, n_faces)``. """ return self._ts.get("Face Flow") @property def cell_velocity(self) -> "h5py.Dataset | None": """HEC-RAS cell-velocity *speed* scalar, or ``None`` if not output. ``h5py.Dataset``, shape ``(n_timesteps, n_cells)``. This is the optional output enabled in the HDF Write Parameters; see :meth:`cell_velocity_vectors` for the derived vector field. """ return self._ts.get("Cell Velocity") # ------------------------------------------------------------------ # Eager summary results (small arrays, loaded once per access) # ------------------------------------------------------------------ def _load_summary(self, key: str, n: int | None = None) -> pd.DataFrame: """Load a ``(2, n_*)`` summary dataset as a tidy DataFrame. The HDF summary datasets have shape ``(2, n_elements)`` where ``[0, :]`` = maximum/minimum values and ``[1, :]`` = elapsed-time (in days) at which the extremum occurred. HEC-RAS stores entries for ghost cells as well; pass *n* to clip to the first *n* entries. Returns a DataFrame with columns ``['value', 'time']`` and integer index corresponding to cell or face index. """ raw = np.array(self._sum[key]) # shape (2, n_elements) if n is not None: raw = raw[:, :n] return pd.DataFrame( {"value": raw[0], "time": raw[1]}, ) @property def max_water_surface(self) -> pd.DataFrame: """Maximum WSE per cell. DataFrame with columns ``['value', 'time']``. ``value``: maximum water-surface elevation (model units). ``time``: elapsed simulation time (days) when max occurred. Index: 0-based cell index. Real cells only (ghost rows excluded). """ return self._load_summary("Maximum Water Surface", n=self.n_cells) @property def _max_water_surface(self) -> pd.DataFrame: """Maximum WSE including ghost cell rows. Same layout as :attr:`max_water_surface` but shape ``(n_cells + n_ghost,)``. Required when indexing with raw ``face_cell_indexes`` values. """ return self._load_summary("Maximum Water Surface") @property def min_water_surface(self) -> pd.DataFrame: """Minimum WSE per cell. Same column layout as :attr:`max_water_surface`. Real cells only (ghost rows excluded). """ return self._load_summary("Minimum Water Surface", n=self.n_cells) @property def _min_water_surface(self) -> pd.DataFrame: """Minimum WSE including ghost cell rows. Same layout as :attr:`min_water_surface` but shape ``(n_cells + n_ghost,)``. Required when indexing with raw ``face_cell_indexes`` values. """ return self._load_summary("Minimum Water Surface") @property def max_face_velocity(self) -> pd.DataFrame: """Maximum face velocity per face. DataFrame with columns ``['value', 'time']``. Index: 0-based face index. """ return self._load_summary("Maximum Face Velocity", n=self.n_faces) # ------------------------------------------------------------------ # Computed results - pure numpy, no geo dependency # ------------------------------------------------------------------
[docs] def wse(self, timestep: int) -> np.ndarray: """Water-surface elevation at each real cell for one timestep. Parameters ---------- timestep: 0-based index into the time dimension. Returns ------- ndarray, shape ``(n_cells,)`` Water-surface elevation in model units. Real cells only. """ return np.array(self.water_surface[timestep, : self.n_cells])
def _wse(self, timestep: int) -> np.ndarray: """Water-surface elevation including ghost cell rows for one timestep. Shape ``(n_cells + n_ghost,)``. Required when indexing with raw ``face_cell_indexes`` values which contain ghost cell indices. """ return np.array(self.water_surface[timestep, :])
[docs] def depth(self, timestep: int) -> np.ndarray: """Water depth at each cell centre for one timestep. Depth = max(0, WSE - bed_elevation), where bed elevation is the cell minimum elevation from the geometry. Parameters ---------- timestep: 0-based index into the time dimension. Returns ------- ndarray, shape ``(n_cells,)`` Real cells only (ghost rows excluded). """ wse = np.array(self.water_surface[timestep, : self.n_cells]) return np.maximum(0.0, wse - self.cell_min_elevation)
[docs] def max_depth(self) -> pd.DataFrame: """Maximum depth per cell using the time of maximum WSE. ``value = max(0, max_WSE - bed_elevation)``. ``time`` is the elapsed time of maximum WSE (days); this is an approximation - maximum depth may not coincide with maximum WSE. Returns ------- DataFrame with columns ``['value', 'time']``, index = cell index. """ df = self.max_water_surface df["value"] = np.maximum(0.0, df["value"].to_numpy() - self.cell_min_elevation) return df
[docs] def cell_velocity_vectors( self, timestep: int, method: Literal[ "area_weighted", "length_weighted", "flow_ratio" ] = "area_weighted", wse_interp: Literal["average", "sloped", "max"] = "average", face_velocity_location: Literal[ "centroid", "normal_intercept" ] = "normal_intercept", ) -> np.ndarray: """Reconstruct cell-centre velocity vectors via weighted least-squares. Uses the WLS method prescribed by the HEC-RAS Technical Reference Manual (Section: 2D Unsteady Flow - Cell Velocity). Parameters ---------- timestep: 0-based index into the time dimension. method: ``"area_weighted"`` (default, matches HEC-RAS): weights are wetted face flow areas from the hydraulic property tables. ``"length_weighted"``: weights are face plan-view lengths. ``"flow_ratio"``: requires ``Face Flow`` output; back-calculates flow area as \|Q\|/\|V_n\|. wse_interp: How to estimate face WSE when ``method="area_weighted"``. ``"average"`` (default): simple mean of the two adjacent cell WSEs. ``"sloped"``: distance-weighted interpolation at the face's actual position - see *face_velocity_location*. ``"max"``: maximum of the two adjacent cell WSEs. face_velocity_location: Position used as the face normal velocity measurement point when ``wse_interp="sloped"``. ``"normal_intercept"`` (default): the point where the cell-centre connecting line crosses the face polyline, matching how HEC-RAS locates its finite-difference gradient. ``"centroid"``: the geometric centroid of the face polyline. Has no effect when ``wse_interp`` is ``"average"`` or ``"max"``. Returns ------- ndarray, shape ``(n_cells, 2)`` ``[Vx, Vy]`` depth-averaged velocity components for real cells. """ from .velocity import compute_all_cell_velocities if method == "flow_ratio" and self.face_flow is None: raise KeyError( "Face Flow is not present in this HDF file. " "Enable 'Face Flow' in HEC-RAS HDF5 Write Parameters " "before running the simulation, or use a different method." ) face_normal_velocity = np.array(self.face_velocity[timestep, :]) # Read all rows (real + ghost) so boundary face WSE benefits from # ghost-cell WSE (boundary condition stage). cell_wse = np.array(self.water_surface[timestep, :]) face_flow = ( np.array(self.face_flow[timestep, :]) if method == "flow_ratio" else None ) cell_face_info, cell_face_values = self.cell_face_info face_ae_info, face_ae_values = self.face_area_elevation if wse_interp == "sloped": # Stack real + ghost cell coordinates so the sloped face-WSE # estimator can distance-weight boundary faces correctly. cell_centers = np.vstack([self.cell_centers, self.ghost_cell_centers]) face_velocity_coords = ( self.face_normal_intercept if face_velocity_location == "normal_intercept" else self.face_centroids ) else: cell_centers = None face_velocity_coords = None return compute_all_cell_velocities( n_cells=self.n_cells, cell_face_info=cell_face_info, cell_face_values=cell_face_values, face_normals=self.face_normals, face_cell_indexes=self.face_cell_indexes, face_ae_info=face_ae_info, face_ae_values=face_ae_values, face_normal_velocity=face_normal_velocity, cell_wse=cell_wse, method=method, face_flow=face_flow, wse_interp=wse_interp, cell_centers=cell_centers, face_velocity_coords=face_velocity_coords, )
[docs] def cell_speed( self, timestep: int, method: Literal[ "area_weighted", "length_weighted", "flow_ratio" ] = "area_weighted", wse_interp: Literal["average", "sloped", "max"] = "average", face_velocity_location: Literal[ "centroid", "normal_intercept" ] = "normal_intercept", ) -> np.ndarray: """Velocity magnitude at each cell centre for one timestep. Parameters ---------- timestep: 0-based index into the time dimension. method: Passed to :meth:`cell_velocity_vectors`. wse_interp: Passed to :meth:`cell_velocity_vectors`. face_velocity_location: Passed to :meth:`cell_velocity_vectors`. Returns ------- ndarray, shape ``(n_cells,)`` """ vecs = self.cell_velocity_vectors( timestep, method=method, wse_interp=wse_interp, face_velocity_location=face_velocity_location, ) return np.sqrt(vecs[:, 0] ** 2 + vecs[:, 1] ** 2)
[docs] def cell_velocity_angle( self, timestep: int, method: Literal[ "area_weighted", "length_weighted", "flow_ratio" ] = "area_weighted", wse_interp: Literal["average", "sloped", "max"] = "average", face_velocity_location: Literal[ "centroid", "normal_intercept" ] = "normal_intercept", ) -> np.ndarray: """Flow direction at each cell centre for one timestep. Angle is measured in degrees clockwise from north (the conventional "flow-to" bearing used in hydraulics and GIS). Parameters ---------- timestep: 0-based index into the time dimension. method: Passed to :meth:`cell_velocity_vectors`. wse_interp: Passed to :meth:`cell_velocity_vectors`. face_velocity_location: Passed to :meth:`cell_velocity_vectors`. Returns ------- ndarray, shape ``(n_cells,)`` Direction the flow is heading in degrees clockwise from north (0 = north, 90 = east, 180 = south, 270 = west). Cells whose speed is below 1e-10 return ``nan``. """ vecs = self.cell_velocity_vectors( timestep, method=method, wse_interp=wse_interp, face_velocity_location=face_velocity_location, ) vx = vecs[:, 0] vy = vecs[:, 1] speed = np.sqrt(vx**2 + vy**2) angle = (90.0 - np.degrees(np.arctan2(vy, vx))) % 360.0 angle[speed < 1e-10] = np.nan return angle
[docs] def face_velocity_vectors(self, timestep: int) -> np.ndarray: """Full 2D velocity ``[Vx, Vy]`` at each face midpoint. Implements the RASMapper-exact C-stencil least-squares reconstruction (Step A + Step 2 from ``geo/_rasmapper_pipeline.py``): * **Step A** — hydraulic connectivity: determines which faces are actively conveying flow based on adjacent-cell WSE and bed elevation. * **Step 2** — for each face, a 3-face C-stencil (the face itself plus its clockwise and counter-clockwise neighbors within each adjacent cell) solves a 2×2 WLS system to recover the full ``(Vx, Vy)`` vector from the stored face-normal scalar. The result from each adjacent cell is averaged to give a single face vector. Replicates ``ReconstructFaceVelocitiesLeastSquares`` from ``RasMapperLib/MeshFV2D.cs``. Parameters ---------- timestep: 0-based index into the time dimension. Returns ------- ndarray, shape ``(n_faces, 2)`` ``[Vx, Vy]`` velocity at each face midpoint. Disconnected (dry) faces receive ``[0, 0]``. """ from rivia.geo import _rasmapper_pipeline as _rasmap cell_wse = np.array(self.water_surface[timestep, :]) face_normal_vel = np.array(self.face_velocity[timestep, :]) cell_face_info, cell_face_values = self.cell_face_info _cell_face_count = cell_face_info[:, 1].astype(np.int32) _, _, face_hconn = _rasmap.compute_face_wss( cell_wse, self._cell_min_elevation, self.face_min_elevation, self.face_cell_indexes, _cell_face_count, ) face_connected = (face_hconn >= _rasmap.HC_BACKFILL) & (face_hconn <= _rasmap.HC_DOWNHILL_SHALLOW) face_vel_A, _ = _rasmap.reconstruct_face_velocities( face_normal_vel, self.face_normals[:, :2], face_connected, self.face_cell_indexes, cell_face_info, cell_face_values, ) return face_vel_A
[docs] def facepoint_velocity_vectors(self, timestep: int) -> np.ndarray: """Full 2D velocity ``[Vx, Vy]`` at each mesh facepoint (corner). Implements the RASMapper-exact pipeline (Steps A + 2 + 3): * **Step A** — hydraulic connectivity (:func:`~rivia.geo._rasmap.compute_face_wss`). * **Step 2** — C-stencil face velocity reconstruction (:func:`~rivia.geo._rasmap.reconstruct_face_velocities`). * **Step 3** — inverse-face-length weighted facepoint averaging (:func:`~rivia.geo._rasmap.compute_facepoint_velocities`). Each facepoint has one arc-context velocity vector per adjacent face; these are averaged to produce a single ``[Vx, Vy]`` per facepoint. Replicates ``ComputeVertexVelocities`` from ``RasMapperLib/MeshFV2D.cs``. Parameters ---------- timestep: 0-based index into the time dimension. Returns ------- ndarray, shape ``(n_facepoints, 2)`` ``[Vx, Vy]`` velocity at each mesh corner. Facepoints adjacent only to dry faces receive ``[0, 0]``. """ from rivia.geo import _rasmapper_pipeline as _rasmap cell_wse = np.array(self.water_surface[timestep, :]) face_normal_vel = np.array(self.face_velocity[timestep, :]) cell_face_info, cell_face_values = self.cell_face_info _cell_face_count = cell_face_info[:, 1].astype(np.int32) face_value_a, face_value_b, face_hconn = _rasmap.compute_face_wss( cell_wse, self._cell_min_elevation, self.face_min_elevation, self.face_cell_indexes, _cell_face_count, ) face_connected = (face_hconn >= _rasmap.HC_BACKFILL) & (face_hconn <= _rasmap.HC_DOWNHILL_SHALLOW) face_vel_A, face_vel_B = _rasmap.reconstruct_face_velocities( face_normal_vel, self.face_normals[:, :2], face_connected, self.face_cell_indexes, cell_face_info, cell_face_values, ) fp_face_info, fp_face_values = self.facepoint_face_orientation fp_velocities, _ = _rasmap.compute_facepoint_velocities( face_vel_A, face_vel_B, face_connected, self.face_lengths, self.face_facepoint_indexes, self.face_cell_indexes, cell_wse, fp_face_info, fp_face_values, face_value_a, face_value_b, ) # Collapse one vector per adjacent-face context → single vector per facepoint. n_fp = len(fp_velocities) result = np.zeros((n_fp, 2), dtype=np.float64) for fp in range(n_fp): vecs = fp_velocities[fp] if len(vecs): result[fp] = vecs.mean(axis=0) return result
[docs] def velocity_plot( self, timestep: int, cell_index: int, *, render_mode: Literal["horizontal", "sloping", "hybrid"] = "sloping", buffer: int = 1, reference_raster: str | Path | None = None, use_depth_weights: bool = False, shallow_to_flat: bool = False, pixel_size: float | None = None, n_arrows: int = 200, ax: Any | None = None, ) -> Any: """Quiver plot of rasterized velocity vectors around a target cell. Rasterizes the neighbourhood of *cell_index* using the RASMapper-exact pipeline (``rasterize_results`` with ``variable="velocity"``), then draws quiver arrows at the pixel centres of wet pixels. Arrows show the final, fully interpolated ``[Vx, Vy]`` value at each pixel — the same values that appear in RASMapper's velocity map. Mesh polygon outlines and cell index labels are drawn as context. Requires ``matplotlib`` (``pip install matplotlib``). Parameters ---------- timestep: 0-based time index. cell_index: Target cell. The neighbourhood is expanded by BFS from this cell. render_mode: ``"horizontal"``, ``"sloping"`` (default), or ``"hybrid"`` — passed to :meth:`export_raster`. buffer: Number of face-adjacency hops to expand from *cell_index*. ``1`` = immediate neighbours; ``2`` = two rings out. reference_raster: Optional path to a terrain DEM GeoTIFF. When supplied, the pixel size and CRS are inherited from the DEM and ``use_depth_weights=True`` becomes available. When ``None`` (default), the pixel size is auto-derived from the median face length in the neighbourhood. use_depth_weights: Passed to :meth:`export_raster`. ``hybrid`` mode only. Requires *reference_raster*. shallow_to_flat: Passed to :meth:`export_raster`. ``hybrid`` mode only. pixel_size: Raster pixel size in model coordinate units. Smaller values produce a finer grid and more potential arrow positions. Ignored when *reference_raster* is supplied (the DEM pixel size is used instead). Defaults to ``local_cell_size / 3``, giving ~9 pixels per face-length unit so *n_arrows* has room to work. n_arrows: Target number of quiver arrows. Wet pixels are subsampled with stride ``ceil(sqrt(n_wet / n_arrows))`` so the actual count is approximately *n_arrows*. Increase for denser plots (e.g. ``n_arrows=800``); set to a very large number to show every wet pixel. Default ``200``. ax: Existing ``matplotlib.Axes`` to draw on. If ``None`` a new figure/axes is created. Returns ------- matplotlib.axes.Axes """ try: import matplotlib.pyplot as plt except ImportError as exc: raise ImportError( "matplotlib is required for velocity_plot(). " "Install it with: pip install matplotlib" ) from exc from rivia.geo import raster as _raster # -- 1. BFS neighbourhood (with ring tracking) ------------------- cell_face_info, cell_face_values = self.cell_face_info face_cell_indexes = self.face_cell_indexes n_cells = self.n_cells # ring_of[c] = BFS hop distance from cell_index (0 = focus cell) ring_of: dict[int, int] = {cell_index: 0} neighbors: set[int] = {cell_index} frontier: set[int] = {cell_index} for hop in range(buffer): next_frontier: set[int] = set() for c in frontier: start = int(cell_face_info[c, 0]) count = int(cell_face_info[c, 1]) for k in range(count): fi = int(cell_face_values[start + k, 0]) for nb in ( int(face_cell_indexes[fi, 0]), int(face_cell_indexes[fi, 1]), ): if 0 <= nb < n_cells and nb not in neighbors: ring_of[nb] = hop + 1 next_frontier.add(nb) neighbors |= next_frontier frontier = next_frontier # -- 2. Bounding box and auto cell_size -------------------------- cell_polys = self.cell_polygons cell_centers = self.cell_centers all_verts = np.vstack([ cell_polys[c] for c in neighbors if len(cell_polys[c]) > 0 ]) x_min = float(all_verts[:, 0].min()) x_max = float(all_verts[:, 0].max()) y_min = float(all_verts[:, 1].min()) y_max = float(all_verts[:, 1].max()) # Local cell size: median face length in the neighbourhood. # Always computed — used for arrow sizing and (when no reference_raster) # as the output pixel size. nb_face_idx: list[int] = [] for c in neighbors: start = int(cell_face_info[c, 0]) count = int(cell_face_info[c, 1]) for k in range(count): nb_face_idx.append(int(cell_face_values[start + k, 0])) face_lengths = self.face_normals[:, 2] local_cell_size = float(np.median(face_lengths[sorted(set(nb_face_idx))])) if reference_raster is None: # pixel_size overrides the auto value; default is local_cell_size / 3 # so each cell contains ~9 pixels and n_arrows has room to work. _cell_size: float | None = ( pixel_size if pixel_size is not None else local_cell_size / 3.0 ) else: _cell_size = None # reference_raster provides the pixel grid _margin = local_cell_size # Rectangular perimeter with a small margin bbox_perim = np.array([ [x_min - _margin, y_min - _margin], [x_max + _margin, y_min - _margin], [x_max + _margin, y_max + _margin], [x_min - _margin, y_max + _margin], ]) # -- 3. Rasterize velocity via full rasmap pipeline --------------- # reference_raster enables pixel-level dry masking (WSE < terrain → # nodata) in addition to the coarser cell-level wet check. # "velocity_vector" → 4-band [Vx, Vy, speed, direction]; quiver # needs all four bands. fp_face_info, fp_face_values = self.facepoint_face_orientation ds = _raster.rasterize_results( variable="velocity_vector", cell_wse=np.array(self.water_surface[timestep, :]), cell_min_elevation=self._cell_min_elevation, face_min_elevation=self.face_min_elevation, face_cell_indexes=face_cell_indexes, cell_face_info=cell_face_info, cell_face_values=cell_face_values, face_facepoint_indexes=self.face_facepoint_indexes, fp_coords=self.facepoint_coordinates, face_normals=self.face_normals, fp_face_info=fp_face_info, fp_face_values=fp_face_values, cell_polygons=cell_polys, face_normal_velocity=np.array(self.face_velocity[timestep, :]), output_path=None, cell_centers=cell_centers, cell_surface_area=self.cell_surface_area, reference_raster=reference_raster, cell_size=_cell_size, render_mode=render_mode, use_depth_weights=use_depth_weights, shallow_to_flat=shallow_to_flat, tight_extent=False, perimeter=bbox_perim, ) # -- 4. Read bands and pixel coordinates ------------------------- from matplotlib.colors import Normalize from matplotlib.path import Path as _MPath nodata_val = float(ds.nodata) vx_grid = ds.read(1).astype(np.float64) vy_grid = ds.read(2).astype(np.float64) speed_grid = ds.read(3).astype(np.float64) transform = ds.transform ds.close() height, width = vx_grid.shape rows_idx, cols_idx = np.mgrid[0:height, 0:width] qx_grid = transform.c + (cols_idx + 0.5) * transform.a qy_grid = transform.f + (rows_idx + 0.5) * transform.e # e < 0 wet = (speed_grid > 0) & (speed_grid != nodata_val) # Assign each pixel to the innermost neighbourhood ring it belongs to. # Pixels outside all neighbourhood cells are excluded from quiver. pts = np.column_stack([qx_grid.ravel(), qy_grid.ravel()]) pixel_ring = np.full(height * width, buffer + 1, dtype=np.int32) for c in neighbors: poly = cell_polys[c] if len(poly) < 3: continue ring = ring_of.get(c, buffer) inside = _MPath(poly).contains_points(pts) # Lower ring number = closer to focus cell → higher priority pixel_ring = np.where(inside & (ring < pixel_ring), ring, pixel_ring) pixel_ring = pixel_ring.reshape(height, width) wet = wet & (pixel_ring <= buffer) # -- 5. Arrow scaling and subsampling ---------------------------- # Arrow length is mapped linearly from [sp_min, sp_max] speed to # [0.30, 0.85] × local_cell_size so every arrow is visible. # Outer rings are scaled down slightly to emphasise the focus area. wet_r, wet_c = np.where(wet) if len(wet_r) == 0: # No wet pixels in neighbourhood — draw polygons only if ax is None: _, ax = plt.subplots() else: speeds = speed_grid[wet_r, wet_c] rings = pixel_ring[wet_r, wet_c] sp_min, sp_max = float(speeds.min()), float(speeds.max()) arrow_min = local_cell_size * 0.30 arrow_max = local_cell_size * 0.85 if sp_max > sp_min + 1e-12: t = (speeds - sp_min) / (sp_max - sp_min) arrow_len = arrow_min + t * (arrow_max - arrow_min) else: arrow_len = np.full(len(speeds), arrow_max) # Outer rings get 15 % shorter arrows per hop from the focus cell ring_weight = np.maximum(0.55, 1.0 - rings * 0.15) arrow_len *= ring_weight # Unit direction × scaled length (data-coordinate arrows) eps = 1e-12 u_norm = np.where(speeds > eps, vx_grid[wet_r, wet_c] / speeds, 0.0) v_norm = np.where(speeds > eps, vy_grid[wet_r, wet_c] / speeds, 0.0) u_draw = u_norm * arrow_len v_draw = v_norm * arrow_len # Subsample: target ~200 arrows (subsampled from neighbourhood only) stride = max(1, int(np.ceil(np.sqrt(len(wet_r) / n_arrows)))) if stride > 1: sel = np.arange(0, len(wet_r), stride) wet_r, wet_c = wet_r[sel], wet_c[sel] u_draw, v_draw, speeds = u_draw[sel], v_draw[sel], speeds[sel] norm = Normalize(vmin=sp_min, vmax=sp_max) # -- 6. Draw ------------------------------------------------- if ax is None: fig, ax = plt.subplots() else: fig = ax.get_figure() for c in neighbors: poly = cell_polys[c] if len(poly) == 0: continue is_target = c == cell_index ring_x = np.append(poly[:, 0], poly[0, 0]) ring_y = np.append(poly[:, 1], poly[0, 1]) fc = "steelblue" if is_target else "lightgray" lw = 1.8 if is_target else 0.8 ax.fill(poly[:, 0], poly[:, 1], fc=fc, alpha=0.18, ec="none") ax.plot(ring_x, ring_y, color="dimgray", lw=lw) cx, cy = float(cell_centers[c, 0]), float(cell_centers[c, 1]) ax.text(cx, cy, str(c), ha="center", va="center", fontsize=7, color="black", clip_on=True) Q = ax.quiver( qx_grid[wet_r, wet_c], qy_grid[wet_r, wet_c], u_draw, v_draw, speeds, cmap="plasma", norm=norm, scale=1.0, scale_units="xy", angles="xy", pivot="middle", ) fig.colorbar(Q, ax=ax, label="Speed") ax.set_aspect("equal") ax.set_title( f"{self.name} t={timestep} cell={cell_index} " f"mode={render_mode} buf={buffer}" ) ax.set_xlabel("X") ax.set_ylabel("Y") return ax # Fallback: no wet pixels — still draw polygons if ax is None: _, ax = plt.subplots() for c in neighbors: poly = cell_polys[c] if len(poly) == 0: continue is_target = c == cell_index ring_x = np.append(poly[:, 0], poly[0, 0]) ring_y = np.append(poly[:, 1], poly[0, 1]) fc = "steelblue" if is_target else "lightgray" lw = 1.8 if is_target else 0.8 ax.fill(poly[:, 0], poly[:, 1], fc=fc, alpha=0.18, ec="none") ax.plot(ring_x, ring_y, color="dimgray", lw=lw) cx, cy = float(cell_centers[c, 0]), float(cell_centers[c, 1]) ax.text(cx, cy, str(c), ha="center", va="center", fontsize=7, color="black", clip_on=True) ax.set_aspect("equal") ax.set_title( f"{self.name} t={timestep} cell={cell_index} " f"mode={render_mode} buf={buffer} [dry]" ) ax.set_xlabel("X") ax.set_ylabel("Y") return ax
# ------------------------------------------------------------------ # Derived results helpers (computed from HDF result arrays) # ------------------------------------------------------------------
[docs] def water_surface_at_facepoints(self, timestep: int) -> np.ndarray: """WSE interpolated to facepoints for one timestep. Convenience wrapper around :meth:`~FlowArea.wse_at_facepoints` that reads the water-surface time series internally. Parameters ---------- timestep: 0-based index into the time dimension. Returns ------- ndarray, shape ``(n_facepoints,)`` Facepoint WSE. ``nan`` where all adjacent cells are dry. """ # Include ghost-cell WSE so perimeter facepoints receive a value # from the adjacent boundary cell rather than returning NaN. cell_wse = np.array(self.water_surface[timestep, :]) return self.wse_at_facepoints(cell_wse)
[docs] def wet_cells(self, timestep: int, depth_min: float = 0.0) -> np.ndarray: """Boolean mask of wet cells for one timestep. A cell is wet when ``WSE - cell_min_elevation > depth_min``. Parameters ---------- timestep: 0-based index into the time dimension. depth_min: Minimum depth threshold in model units. Default ``0.0``. Returns ------- ndarray, shape ``(n_cells,)``, dtype bool """ wse = np.array(self.water_surface[timestep, : self.n_cells]) return (wse - self.cell_min_elevation) > depth_min
[docs] def wet_faces(self, timestep: int, depth_min: float = 0.0) -> np.ndarray: """Boolean mask of wet faces for one timestep. A face is wet when at least one of its adjacent cells is wet (see :meth:`wet_cells`). Boundary faces are wet when their single adjacent real cell is wet. Parameters ---------- timestep: 0-based index into the time dimension. depth_min: Minimum cell depth threshold passed to :meth:`wet_cells`. Returns ------- ndarray, shape ``(n_faces,)``, dtype bool """ wc = self.wet_cells(timestep, depth_min) fci = self.face_cell_indexes # (n_faces, 2) n = self.n_cells c0_wet = np.where(fci[:, 0] >= 0, wc[np.clip(fci[:, 0], 0, n - 1)], False) c1_wet = np.where(fci[:, 1] >= 0, wc[np.clip(fci[:, 1], 0, n - 1)], False) return c0_wet | c1_wet
# ------------------------------------------------------------------ # Raster export - delegates to rivia.geo (deferred import) # ------------------------------------------------------------------
[docs] @log_call(logging.INFO) @timed() def export_raster( self, variable: Literal["wse", "water_surface", "depth", "velocity", "velocity_vector"], timestep: int | None = None, output_path: str | Path | None = None, *, reference_raster: str | Path | None = None, cell_size: float | None = None, crs: Any | None = None, nodata: float = -9999.0, render_mode: Literal["horizontal", "sloping", "hybrid"] = "sloping", use_depth_weights: bool = False, shallow_to_flat: bool = False, depth_threshold: float = 0.001, tight_extent: bool = True, ) -> Path | rasterio.io.DatasetReader: """Rasterize a result variable using the RASMapper-exact algorithm. Implements the pixel-perfect pipeline reverse-engineered from ``RasMapperLib/`` (decompiled C# source, HEC-RAS 6.6), validated against RASMapper VRT exports — median ``diff`` = 0.000000. Parameters ---------- variable: ``"wse"`` / ``"water_surface"`` — water-surface elevation. ``"depth"`` — water depth (WSE minus terrain); requires *reference_raster*. ``"velocity"`` — 1-band speed raster ``sqrt(Vx²+Vy²)``; requires an explicit *timestep*. ``"velocity_vector"`` — 4-band raster ``[Vx, Vy, speed, direction_deg]``; requires an explicit *timestep*. timestep: 0-based time index. Pass ``None`` to use the time of maximum water-surface elevation (``"wse"``/``"water_surface"`` and ``"depth"`` only; raises ``ValueError`` for ``"velocity"``). output_path: Destination ``.tif`` file path. ``None`` returns an open in-memory ``rasterio.DatasetReader``; the caller must close it. reference_raster: Existing GeoTIFF whose transform and CRS are inherited. Also used as the terrain DEM for depth computation. **Required** when ``variable="depth"``. Mutually exclusive with *cell_size*. cell_size: Output pixel size in model coordinate units. Used when no *reference_raster* is supplied; grid origin is derived from the flow-area perimeter bounding box. Mutually exclusive with *reference_raster*. crs: Output CRS. Inherited from *reference_raster* when ``None``. nodata: Fill value for dry / out-of-domain pixels (default ``-9999``). render_mode: ``"sloping"`` (default) — RASMapper "Sloping Cell Corners"; uses corner facepoints only. RasMapperLib hardcodes ``shallow_to_flat=True`` for this mode; the user-supplied value is overridden. Matches ``store_map(render_mode="sloping")``. ``"hybrid"`` — "Sloping Cell Corners + Face Centers"; ``use_depth_weights`` and ``shallow_to_flat`` are honoured. Matches ``store_map(render_mode="hybrid")``. ``"horizontal"`` — flat per-cell value; facepoint interpolation is skipped. Matches ``store_map(render_mode="horizontal")``. use_depth_weights: Weight face contributions by water depth. **``hybrid`` only**; forced ``False`` for other modes. Requires *reference_raster*. shallow_to_flat: Render cells with no hydraulically-connected faces flat. **``hybrid`` only** (user-configurable); forced ``True`` for ``"sloping"`` per RasMapperLib, ``False`` for ``"horizontal"``. depth_threshold: Minimum depth for a pixel to be considered wet (default ``0.001``). Matches ``RASResults.MinWSPlotTolerance``. tight_extent: When ``True`` (default), pixels outside the flow-area boundary polygon are set to *nodata*. Returns ------- Path Written GeoTIFF path when *output_path* is given. rasterio.io.DatasetReader Open in-memory dataset when *output_path* is ``None``. Raises ------ ImportError If ``rasterio`` or ``shapely`` are not installed. ValueError If ``variable="depth"`` and *reference_raster* is not provided. If ``variable="velocity"`` or ``variable="velocity_vector"`` and ``timestep=None``. If neither *reference_raster* nor *cell_size* is provided. """ from rivia.geo import raster as _raster if variable in ("velocity", "velocity_vector") and timestep is None: raise ValueError( "timestep=None is not supported for velocity. " "Provide an explicit timestep index." ) if reference_raster is None and cell_size is None: # Default: median face length (same heuristic as export_raster) cell_size = float(np.median(self.face_normals[:, 2])) # ---- Read HDF arrays ------------------------------------------------ if timestep is None: cell_wse = self._max_water_surface["value"].to_numpy() else: cell_wse = self._wse(timestep) face_normal_velocity: np.ndarray | None = None if variable in ("velocity", "velocity_vector"): face_normal_velocity = np.array(self.face_velocity[timestep, :]) cell_face_info, cell_face_values = self.cell_face_info fp_face_info, fp_face_values = self.facepoint_face_orientation # ---- Delegate to rasterize_results ------------------------------------- return _raster.rasterize_results( variable=variable, cell_wse=cell_wse, cell_min_elevation=self._cell_min_elevation, face_min_elevation=self.face_min_elevation, face_cell_indexes=self.face_cell_indexes, cell_face_info=cell_face_info, cell_face_values=cell_face_values, face_facepoint_indexes=self.face_facepoint_indexes, fp_coords=self.facepoint_coordinates, face_normals=self.face_normals, fp_face_info=fp_face_info, fp_face_values=fp_face_values, cell_polygons=self.cell_polygons, face_normal_velocity=face_normal_velocity, output_path=output_path, cell_centers=self.cell_centers, cell_surface_area=self.cell_surface_area, reference_raster=reference_raster, cell_size=cell_size, crs=crs, nodata=nodata, render_mode=render_mode, use_depth_weights=use_depth_weights, shallow_to_flat=shallow_to_flat, depth_threshold=depth_threshold, tight_extent=tight_extent, perimeter=self.perimeter, )
[docs] @log_call(logging.INFO) @timed() def export_hydraulic_rasters( self, timestep: int, reference_raster: str | Path, *, wse_path: str | Path | None = None, depth_path: str | Path | None = None, velocity_path: str | Path | None = None, nodata: float = -9999.0, render_mode: Literal["horizontal", "sloping", "hybrid"] = "sloping", use_depth_weights: bool = False, shallow_to_flat: bool = False, depth_threshold: float = 0.001, tight_extent: bool = True, ) -> dict[str, Path | "rasterio.io.DatasetReader"]: """Export water-surface elevation, depth, and velocity rasters in one call. Convenience wrapper that calls :meth:`export_raster` three times — once each for ``"water_surface"``, ``"depth"``, and ``"velocity"`` — sharing the same render settings. Parameters ---------- timestep: 0-based time index. Required — all three outputs need a specific timestep (velocity has no max-value fallback). reference_raster: Path to the terrain DEM GeoTIFF. Required — used to derive depth (WSE minus DEM) and to inherit the output CRS and transform. wse_path: Destination ``.tif`` for the water-surface elevation raster. ``None`` returns an open in-memory ``rasterio.DatasetReader``. depth_path: Destination ``.tif`` for the depth raster. ``None`` returns an open in-memory ``rasterio.DatasetReader``. velocity_path: Destination ``.tif`` for the velocity magnitude raster. ``None`` returns an open in-memory ``rasterio.DatasetReader``. nodata: Fill value for dry / out-of-domain pixels (default ``-9999``). render_mode: Passed to :meth:`export_raster` for all three variables. use_depth_weights: Passed to :meth:`export_raster`. ``hybrid`` only. shallow_to_flat: Passed to :meth:`export_raster`. ``hybrid`` only. depth_threshold: Minimum depth for a pixel to be considered wet (default ``0.001``). tight_extent: When ``True`` (default), pixels outside the flow-area boundary are set to *nodata*. Returns ------- dict with keys ``"water_surface"``, ``"depth"``, ``"velocity"``. Each value is the written ``Path`` (when the corresponding ``*_path`` argument is given) or an open in-memory ``rasterio.DatasetReader`` (when ``None``). The caller must close any in-memory datasets. """ common = dict( timestep=timestep, reference_raster=reference_raster, nodata=nodata, render_mode=render_mode, use_depth_weights=use_depth_weights, shallow_to_flat=shallow_to_flat, depth_threshold=depth_threshold, tight_extent=tight_extent, ) return { "water_surface": self.export_raster( "water_surface", output_path=wse_path, **common ), "depth": self.export_raster( "depth", output_path=depth_path, **common ), "velocity": self.export_raster( "velocity", output_path=velocity_path, **common ), }
# --------------------------------------------------------------------------- # FlowAreaResultsCollection # ---------------------------------------------------------------------------
[docs] class FlowAreaResultsCollection(FlowAreaCollection): """Collection of :class:`FlowAreaResults` objects backed by a plan HDF file. Overrides :class:`FlowAreaCollection` to return ``FlowAreaResults`` instead of plain ``FlowArea`` instances. """ def __getitem__(self, name: str) -> FlowAreaResults: if name not in self._cache: root = self._hdf.get("Geometry/2D Flow Areas") if root is None or name not in root: raise KeyError( f"2D flow area {name!r} not found. Available: {self.names}" ) n_cells = self._get_real_cell_count(name) # Time-series group for this area ts_path = f"{_TS_2D}/{name}" if ts_path not in self._hdf: raise KeyError( f"No time-series results found for flow area {name!r} " f"at '{ts_path}'. Has the plan been computed?" ) # Summary group (may be absent for steady-flow plans) sum_path = f"{_SUM_2D}/{name}" sum_group = self._hdf.get(sum_path) if sum_group is None: raise KeyError( f"No summary results found for flow area {name!r} at '{sum_path}'." ) self._cache[name] = FlowAreaResults( geom_group=root[name], ts_group=self._hdf[ts_path], sum_group=sum_group, name=name, n_cells=n_cells, ) return self._cache[name] # type: ignore[return-value]
# --------------------------------------------------------------------------- # StorageAreaResults - extends StorageArea geometry with plan results # ---------------------------------------------------------------------------
[docs] class StorageAreaResults(StorageArea): """Geometry *and* time-series results for one storage area. Inherits all geometry properties from :class:`~rivia.hdf.StorageArea` (:attr:`boundary`, :attr:`volume_elevation`, :meth:`volume_at_elevation`, etc.). Time-series properties return ``numpy`` arrays (storage areas are scalar entities so their result datasets are small and eager loading is appropriate). Parameters ---------- sa: Parent geometry object whose fields are copied into this instance. sa_index: 0-based column index of this SA in the flat ``(n_t, n_sa)`` datasets (``Water Surface``, ``Flow``) stored under ``Storage Areas/``. ts_sa_group: ``h5py.Group`` at ``-/Unsteady Time Series/Storage Areas``, or ``None`` when the plan has no SA results. sum_sa_group: ``h5py.Group`` at ``-/Summary Output/Storage Areas``, or ``None``. """ def __init__( self, sa: StorageArea, sa_index: int, ts_sa_group: "h5py.Group | None", sum_sa_group: "h5py.Group | None", ) -> None: super().__init__( name=sa.name, mode=sa.mode, boundary=sa.boundary, volume_elevation=sa.volume_elevation, ) self._i = sa_index self._ts = ts_sa_group self._sum = sum_sa_group # per-SA subgroup: -/Storage Areas/<name>/ self._sub = ts_sa_group.get(sa.name) if ts_sa_group else None self._cache: dict[str, np.ndarray] = {} # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _load_flat(self, key: str) -> np.ndarray: """Load column *i* from a flat ``(n_t, n_sa)`` dataset.""" if key not in self._cache: if self._ts is None: raise KeyError( f"No time-series results for storage area {self.name!r}. " "Has the plan been computed?" ) self._cache[key] = np.array(self._ts[key])[:, self._i] return self._cache[key] def _load_vars(self) -> np.ndarray: """Load and cache the ``(n_t, 6)`` Storage Area Variables array.""" if "_vars" not in self._cache: if self._sub is None or "Storage Area Variables" not in self._sub: raise KeyError( f"'Storage Area Variables' not found for storage area " f"{self.name!r}." ) self._cache["_vars"] = np.array(self._sub["Storage Area Variables"]) return self._cache["_vars"] # ------------------------------------------------------------------ # Flat time-series (one value per timestep) # ------------------------------------------------------------------ @property def wse(self) -> np.ndarray: """Water-surface elevation time series. Shape ``(n_t,)``.""" return self._load_flat("Water Surface") @property def flow(self) -> np.ndarray: """Net inflow rate (positive = into SA). Shape ``(n_t,)``.""" return self._load_flat("Flow") # ------------------------------------------------------------------ # Storage Area Variables columns (WSE, flows, area, volume) # ------------------------------------------------------------------ @property def inflow_net(self) -> np.ndarray: """Net inflow rate. Shape ``(n_t,)``.""" return self._load_vars()[:, 1] @property def inflow(self) -> np.ndarray: """Total inflow rate (sum of all inflow sources). Shape ``(n_t,)``.""" return self._load_vars()[:, 2] @property def outflow(self) -> np.ndarray: """Total outflow rate (sum of all outflow sinks). Shape ``(n_t,)``.""" return self._load_vars()[:, 3] @property def surface_area(self) -> np.ndarray: """Water-surface area time series (model area units). Shape ``(n_t,)``.""" return self._load_vars()[:, 4] @property def volume(self) -> np.ndarray: """Stored volume time series (model volume units). Shape ``(n_t,)``.""" return self._load_vars()[:, 5] # ------------------------------------------------------------------ # Connection inflows # ------------------------------------------------------------------ @property def connections(self) -> np.ndarray | None: """Inflow from each named connection. Shape ``(n_t, n_conns)``, or ``None`` when no connection data is stored. Column names are in :attr:`connection_names`. """ if "_conns" not in self._cache: if self._sub is None: return None ds = self._sub.get("Connections to Storage Area") if ds is None: return None self._cache["_conns"] = np.array(ds) return self._cache["_conns"] @property def connection_names(self) -> list[str]: """Names of the inflow connection sources (from HDF ``Connections`` attribute). Falls back to index-based names if the attribute is absent. """ if self._sub is None: return [] ds = self._sub.get("Connections to Storage Area") if ds is None: return [] attr = ds.attrs.get("Connections") if attr is None: n = ds.shape[1] if ds.ndim > 1 else 1 return [f"connection_{i}" for i in range(n)] return [_decode(v) for v in attr] # ------------------------------------------------------------------ # Summary results # ------------------------------------------------------------------ def _load_summary(self, key: str) -> pd.DataFrame: """Load a ``(2, n_sa)`` summary dataset as a single-row DataFrame.""" if self._sum is None: raise KeyError( f"No summary results for storage area {self.name!r}. " "Has the plan been computed?" ) raw = np.array(self._sum[key]) # shape (2, n_sa) return pd.DataFrame({"value": [float(raw[0, self._i])], "time": [float(raw[1, self._i])]}) @property def max_wse(self) -> pd.DataFrame: """Maximum WSE. DataFrame with columns ``['value', 'time']``. ``value``: maximum WSE in model units. ``time``: elapsed simulation time (days) when maximum occurred. """ return self._load_summary("Maximum Water Surface") @property def min_wse(self) -> pd.DataFrame: """Minimum WSE. Same column layout as :attr:`max_wse`.""" return self._load_summary("Minimum Water Surface")
# --------------------------------------------------------------------------- # StorageAreaResultsCollection # ---------------------------------------------------------------------------
[docs] class StorageAreaResultsCollection(StorageAreaCollection): """Collection of :class:`StorageAreaResults` backed by a plan HDF file. Overrides :class:`~rivia.hdf.StorageAreaCollection` to return ``StorageAreaResults`` with both geometry *and* plan results. """ def _load(self) -> dict[str, StorageAreaResults]: # type: ignore[override] if self._items is not None: return self._items # type: ignore[return-value] if _SA_ROOT not in self._hdf: self._items = {} return self._items # type: ignore[return-value] # Re-read geometry flat arrays (same logic as StorageAreaCollection._load) root = self._hdf[_SA_ROOT] attrs = np.array(root["Attributes"]) poly_info = np.array(root["Polygon Info"]) poly_pts = np.array(root["Polygon Points"]) ve_info = np.array(root["Volume Elevation Info"]) ve_vals = np.array(root["Volume Elevation Values"]) ts_sa_group = self._hdf.get(_TS_SA) sum_sa_group = self._hdf.get(_SUM_SA) items: dict[str, StorageAreaResults] = {} for i, row in enumerate(attrs): name = _decode(row["Name"]) mode = _decode(row["Mode"]) start_pt = int(poly_info[i, 0]) n_pts = int(poly_info[i, 1]) boundary = poly_pts[start_pt : start_pt + n_pts].astype(float) ve_start = int(ve_info[i, 0]) ve_count = int(ve_info[i, 1]) vol_elev = ( ve_vals[ve_start : ve_start + ve_count].astype(float) if ve_count > 0 else np.empty((0, 2), dtype=float) ) sa = StorageArea( name=name, mode=mode, boundary=boundary, volume_elevation=vol_elev ) items[name] = StorageAreaResults( sa=sa, sa_index=i, ts_sa_group=ts_sa_group, sum_sa_group=sum_sa_group, ) self._items = items # type: ignore[assignment] return items def __getitem__(self, name: str) -> StorageAreaResults: # type: ignore[override] items = self._load() if name not in items: raise KeyError( f"Storage area {name!r} not found. Available: {self.names}" ) return items[name]
# --------------------------------------------------------------------------- # _StructureResultsMixin - shared HDF result access for all structure types # --------------------------------------------------------------------------- class _StructureResultsMixin: """Mixin that adds ``Structure Variables`` HDF access to geometry dataclasses. All four structure result classes inherit this so ``variable_names``, ``flow_total``, ``stage_hw``, ``stage_tw``, ``weir_variables``, and ``flow_gate`` are implemented once and shared. Concrete subclasses must set ``self._g`` (the result ``h5py.Group``) and ``self._cache`` (empty ``dict``) in their ``__init__``. """ if TYPE_CHECKING: _g: h5py.Group _cache: dict[str, np.ndarray] # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _load(self, key: str) -> np.ndarray: if key not in self._cache: self._cache[key] = np.array(self._g[key]) return self._cache[key] def _col_index(self, *candidates: str) -> int: """Index of first column whose name contains any candidate (case-insensitive). Candidates are tried in order; the first column whose lowercased name contains a lowercased candidate wins. Raises ``KeyError`` if nothing matches. """ names_lower = [n.lower() for n in self.variable_names] for cand in candidates: cand_l = cand.lower() for i, n in enumerate(names_lower): if cand_l in n: return i raise KeyError( f"No column matching {candidates!r} in {self.variable_names!r}" ) # ------------------------------------------------------------------ # Structure Variables # ------------------------------------------------------------------ @property def variable_names(self) -> list[str]: """Column names from the ``Structure Variables`` ``Variable_Unit`` attribute. Falls back to ``col_0``, ``col_1``, ... when the attribute is absent. """ ds = self._g["Structure Variables"] attr = ds.attrs.get("Variable_Unit") if attr is None: attr = ds.attrs.get("Variables") if attr is not None: return [_decode(v[0]) for v in attr] return [f"col_{i}" for i in range(ds.shape[1])] @property def structure_variables(self) -> "h5py.Dataset": """All structure variables as a lazy ``h5py.Dataset``, shape ``(n_t, n_vars)``. Column names are in :attr:`variable_names`. """ return self._g["Structure Variables"] @property def flow_total(self) -> np.ndarray: """Total flow through the structure. Shape ``(n_t,)``.""" return self._load("Structure Variables")[ :, self._col_index("total flow", "flow") ] @property def stage_hw(self) -> np.ndarray: """Headwater stage (upstream side). Shape ``(n_t,)``.""" return self._load("Structure Variables")[ :, self._col_index("stage hw", "hw", "headwater") ] @property def stage_tw(self) -> np.ndarray: """Tailwater stage (downstream side). Shape ``(n_t,)``.""" return self._load("Structure Variables")[ :, self._col_index("stage tw", "tw", "tailwater") ] @property def flow_gate_total(self) -> np.ndarray | None: """Sum of flow through all gate groups. Shape ``(n_t,)``. Returns ``None`` when no ``Total Gate Flow`` column is present in ``Structure Variables`` (i.e. the structure has no gate groups). """ try: col = self._col_index("total gate flow", "gate flow") except KeyError: return None return self._load("Structure Variables")[:, col] @property def flow_weir(self) -> np.ndarray | None: """Weir overflow component. Shape ``(n_t,)``. Returns ``None`` when no ``Weir Flow`` column is present in ``Structure Variables`` (i.e. the structure has no weir). """ try: col = self._col_index("weir flow") except KeyError: return None return self._load("Structure Variables")[:, col] # ------------------------------------------------------------------ # Optional datasets (shared by Inline, Lateral, Bridge, SA2DConnection) # ------------------------------------------------------------------ @property def weir_variables(self) -> "h5py.Dataset | None": """Detailed weir hydraulics time series, or ``None`` if absent.""" return self._g.get("Weir Variables") def flow_gate(self, gate_number: int) -> "h5py.Dataset": """Gate operation dataset for gate *gate_number* (1-based). Returns a lazy ``h5py.Dataset``, shape ``(n_t, n_vars)``. Raises ------ KeyError If the gate number does not exist for this structure. """ path = f"Gate Groups/Gate #{gate_number}" if path not in self._g: gates_grp = self._g.get("Gate Groups") available = list(gates_grp.keys()) if gates_grp is not None else [] raise KeyError( f"Gate #{gate_number} not found. Available: {available}" ) return self._g[path] # --------------------------------------------------------------------------- # SA2DConnectionResults - one connection between two hydraulic areas # ---------------------------------------------------------------------------
[docs] class SA2DConnectionResults(_StructureResultsMixin, SA2DConnection): """Geometry *and* time-series results for one HEC-RAS SA/2D connection. Inherits geometry from :class:`~rivia.hdf.SA2DConnection` and shared HDF result access (``structure_variables``, ``total_flow``, ``stage_hw``, ``stage_tw``, ``weir_variables``, ``flow_gate``) from :class:`_StructureResultsMixin`. Parameters ---------- geom: Geometry object from :class:`~rivia.hdf.StructureCollection`. group: ``h5py.Group`` at ``-/SA 2D Area Conn/<plan_name>``. """ def __init__(self, geom: SA2DConnection, group: h5py.Group) -> None: SA2DConnection.__init__( self, mode=geom.mode, upstream_type=geom.upstream_type, downstream_type=geom.downstream_type, centerline=geom.centerline, name=geom.name, upstream_node=geom.upstream_node, downstream_node=geom.downstream_node, ) # Plan result group name may differ from geometry name for 2D↔2D connections # (HEC-RAS prefixes the flow-area name, e.g. "Lower Levee" → # "BaldEagleCr Lower Levee"). self._plan_name: str = group.name.split("/")[-1] self._g = group self._cache: dict[str, np.ndarray] = {} # ------------------------------------------------------------------ # SA2D-specific result datasets # ------------------------------------------------------------------ @property def breaching_variables(self) -> "h5py.Dataset | None": """Breach geometry and flow time series, or ``None`` if not breach-capable. Lazy ``h5py.Dataset``, shape ``(n_t, 10)``. Columns: Stage HW, Stage TW, Bottom Width, Bottom Elevation, Left Side Slope, Right Side Slope, Breach Flow, Breach Velocity, Breach Flow Area, Top Elevation. """ return self._g.get("Breaching Variables") @property def headwater_cells(self) -> np.ndarray | None: """2-D mesh cell indices on the headwater side, or ``None`` if absent. Shape ``(n_faces,)``. """ if "Headwater Cells" not in self._g: return None return self._load("Headwater Cells") @property def tailwater_cells(self) -> np.ndarray | None: """2-D mesh cell indices on the tailwater side, or ``None`` if absent. For 2D↔2D connections (levees) these are stored as a flat ``int32`` dataset at the group root. For SA↔2D connections (e.g. a dam with a storage-area headwater) they are stored as fixed-width byte strings in ``HW TW Segments/Tailwater Cells`` and are decoded here. Shape ``(n_cells,)``. """ # 2D↔2D: flat int32 at group root if "Tailwater Cells" in self._g: return self._load("Tailwater Cells") # SA↔2D: string-encoded cell indices in HW TW Segments subgroup seg = self._g.get("HW TW Segments") if seg is not None and "Tailwater Cells" in seg: raw = seg["Tailwater Cells"][:] return np.array( [int(v.decode().strip()) for v in raw], dtype=np.int32 ) return None
# --------------------------------------------------------------------------- # InlineResults - inline structure geometry + plan results # ---------------------------------------------------------------------------
[docs] class InlineResults(_StructureResultsMixin, InlineStructure): """Geometry *and* time-series results for one HEC-RAS inline structure. Inherits geometry from :class:`~rivia.hdf.InlineStructure` and shared HDF result access from :class:`_StructureResultsMixin`. The HDF group is at ``Results/.../DSS Hydrograph Output/.../Inline Structures/<river reach rs>``. Parameters ---------- geom: Geometry object from :class:`~rivia.hdf.StructureCollection`. group: ``h5py.Group`` at the inline structure result path. """ def __init__(self, geom: InlineStructure, group: h5py.Group) -> None: InlineStructure.__init__( self, mode=geom.mode, upstream_type=geom.upstream_type, downstream_type=geom.downstream_type, centerline=geom.centerline, location=geom.location, upstream_node=geom.upstream_node, downstream_node=geom.downstream_node, weir=geom.weir, gate_groups=geom.gate_groups, ) self._g = group self._cache: dict[str, np.ndarray] = {}
# --------------------------------------------------------------------------- # LateralResults - lateral structure geometry + plan results # ---------------------------------------------------------------------------
[docs] class LateralResults(_StructureResultsMixin, LateralStructure): """Geometry *and* time-series results for one HEC-RAS lateral structure. Inherits geometry from :class:`~rivia.hdf.LateralStructure` and shared HDF result access from :class:`_StructureResultsMixin`. The HDF group is at ``Results/.../DSS Hydrograph Output/.../Lateral Structures/<river reach rs>``. ``downstream_node`` is the name of the connected Storage Area or 2-D Flow Area, or an empty string when flow exits the system. Parameters ---------- geom: Geometry object from :class:`~rivia.hdf.StructureCollection`. group: ``h5py.Group`` at the lateral structure result path. """ def __init__(self, geom: LateralStructure, group: h5py.Group) -> None: LateralStructure.__init__( self, mode=geom.mode, upstream_type=geom.upstream_type, downstream_type=geom.downstream_type, centerline=geom.centerline, location=geom.location, upstream_node=geom.upstream_node, downstream_node=geom.downstream_node, weir=geom.weir, gate_groups=geom.gate_groups, ) self._g = group self._cache: dict[str, np.ndarray] = {} # ------------------------------------------------------------------ # Lateral-specific reach-split variables # ------------------------------------------------------------------ @property def flow_hw_us(self) -> np.ndarray: """Flow at the upstream bounding cross section. Shape ``(n_t,)``.""" return self._load("Structure Variables")[ :, self._col_index("flow hw us") ] @property def flow_hw_ds(self) -> np.ndarray: """Flow at the downstream bounding cross section. Shape ``(n_t,)``.""" return self._load("Structure Variables")[ :, self._col_index("flow hw ds") ] @property def stage_hw_us(self) -> np.ndarray: """Stage at the upstream bounding cross section. Shape ``(n_t,)``.""" return self._load("Structure Variables")[ :, self._col_index("stage hw us") ] @property def stage_hw_ds(self) -> np.ndarray: """Stage at the downstream bounding cross section. Shape ``(n_t,)``.""" return self._load("Structure Variables")[ :, self._col_index("stage hw ds") ]
# --------------------------------------------------------------------------- # BridgeResults - bridge geometry + plan results # ---------------------------------------------------------------------------
[docs] class BridgeResults(_StructureResultsMixin, Bridge): """Geometry *and* time-series results for one HEC-RAS bridge structure. Inherits geometry from :class:`~rivia.hdf.Bridge` and shared HDF result access from :class:`_StructureResultsMixin`. The HDF group is at ``Results/.../DSS Hydrograph Output/.../Bridge/<river reach rs>``. Parameters ---------- geom: Geometry object from :class:`~rivia.hdf.StructureCollection`. group: ``h5py.Group`` at the bridge result path. """ def __init__(self, geom: Bridge, group: h5py.Group) -> None: Bridge.__init__( self, mode=geom.mode, upstream_type=geom.upstream_type, downstream_type=geom.downstream_type, centerline=geom.centerline, location=geom.location, upstream_node=geom.upstream_node, downstream_node=geom.downstream_node, weir=geom.weir, gate_groups=geom.gate_groups, ) self._g = group self._cache: dict[str, np.ndarray] = {}
# --------------------------------------------------------------------------- # StructureResultsCollection - plan-enriched StructureCollection # ---------------------------------------------------------------------------
[docs] class StructureResultsCollection(StructureCollection): """Plan-enriched structure collection: all structure types with results. Overrides :class:`~rivia.hdf.StructureCollection` so each item carries both geometry attributes *and* time-series result access: * :class:`SA2DConnection` → :class:`SA2DConnectionResults` (Base Output ``SA 2D Area Conn``) * :class:`InlineStructure` → :class:`InlineResults` (DSS Hydrograph Output ``Inline Structures``) * :class:`LateralStructure` → :class:`LateralResults` (DSS Hydrograph Output ``Lateral Structures``) * :class:`Bridge` → :class:`BridgeResults` (DSS Hydrograph Output ``Bridge``) When no plan result group is found for a structure (e.g. DSS output was not requested for that type), the plain geometry object is kept unchanged. """ def _load(self) -> dict[str, Structure]: # type: ignore[override] if self._items is not None: return self._items import h5py as _h5 # Build geometry items first (parent caches in self._items). geom_items = StructureCollection._load(self) # Helper: collect sub-groups from an HDF path (returns {} when absent). def _groups(path: str) -> dict[str, h5py.Group]: root = self._hdf.get(path) if root is None: return {} return {k: v for k, v in root.items() if isinstance(v, _h5.Group)} conn_groups = _groups(_DSS_SA_CONN) inline_groups = _groups(_DSS_INLINE) lateral_groups = _groups(_DSS_LATERAL) bridge_groups = _groups(_DSS_BRIDGE) items: dict[str, Structure] = {} for key, geom in geom_items.items(): if isinstance(geom, SA2DConnection): # Derive plan result group name from geometry fields: # 2D↔2D (levee): "{upstream_2d_area} {connection}" # SA↔2D / SA↔SA (one end is SA or '--'): Connection name if ( geom.upstream_type == "2D" and geom.downstream_type == "2D" ): plan_key = f"{geom.upstream_node} {geom.name}" else: plan_key = geom.name grp = conn_groups.get(plan_key) items[key] = ( SA2DConnectionResults(geom, grp) if grp is not None else geom ) elif isinstance(geom, InlineStructure): plan_key = " ".join(geom.location) grp = inline_groups.get(plan_key) items[key] = InlineResults(geom, grp) if grp is not None else geom elif isinstance(geom, LateralStructure): plan_key = " ".join(geom.location) grp = lateral_groups.get(plan_key) items[key] = LateralResults(geom, grp) if grp is not None else geom elif isinstance(geom, Bridge): plan_key = " ".join(geom.location) grp = bridge_groups.get(plan_key) items[key] = BridgeResults(geom, grp) if grp is not None else geom else: items[key] = geom self._items = items return self._items
# --------------------------------------------------------------------------- # CrossSectionResults / CrossSectionResultsCollection # --------------------------------------------------------------------------- class _CrossSectionResultsBase(CrossSection): """Private base for all three cross-section result variants. Holds the HDF handle, column index, and result-group root; provides the shared ``_load`` helper, ``water_surface``, and ``flow`` properties that are present in every output block. Concrete subclasses add the properties that are specific to their output block (:class:`CrossSectionResults`, :class:`CrossSectionResultsDSS`, :class:`CrossSectionResultsInstantaneous`). """ def __init__( self, geom: CrossSection, hdf: "h5py.File", index: int, root: str, ) -> None: CrossSection.__init__( self, river=geom.river, reach=geom.reach, rs=geom.rs, name=geom.name, left_bank=geom.left_bank, right_bank=geom.right_bank, len_left=geom.len_left, len_channel=geom.len_channel, len_right=geom.len_right, contraction=geom.contraction, expansion=geom.expansion, station_elevation=geom.station_elevation, mannings_n=geom.mannings_n, cut_line=geom.cut_line, centerline_polyline=geom.centerline_polyline, ) self._hdf = hdf self._index = index self._root = root self._cache: dict[str, np.ndarray] = {} def _load(self, dataset: str) -> np.ndarray: """Load column ``self._index`` from ``{root}/{dataset}``, cached.""" if dataset not in self._cache: ds = self._hdf.get(f"{self._root}/{dataset}") if ds is None: raise KeyError( f"Dataset '{dataset}' not found at '{self._root}'." ) self._cache[dataset] = np.array(ds[:, self._index]) return self._cache[dataset] @property def wse(self) -> np.ndarray: """Water surface elevation time series. Shape ``(n_t,)``.""" return self._load("Water Surface") @property def flow(self) -> np.ndarray: """Flow time series. Shape ``(n_t,)``.""" return self._load("Flow")
[docs] class CrossSectionResults(_CrossSectionResultsBase): """Geometry *and* results for one XS from the **Base Output** block. Corresponds to :attr:`UnsteadyPlan.cross_sections` (mapping output interval). All variables written by HEC-RAS at the mapping interval are exposed. Parameters ---------- geom: Geometry object from :class:`CrossSectionCollection`. hdf: Open ``h5py.File`` — kept alive by the parent ``UnsteadyPlan`` context. index: Column index of this XS in the ``(n_t, n_xs)`` result datasets. root: HDF path prefix — ``_TS_XS``. """ @property def flow_cumulative(self) -> np.ndarray: """Cumulative flow volume time series. Shape ``(n_t,)``.""" return self._load("Flow Volume Cumulative") @property def flow_lateral(self) -> np.ndarray: """Lateral flow time series. Shape ``(n_t,)``.""" return self._load("Flow Lateral") @property def velocity_channel(self) -> np.ndarray: """Channel velocity time series. Shape ``(n_t,)``.""" return self._load("Velocity Channel") @property def velocity_total(self) -> np.ndarray: """Total velocity time series. Shape ``(n_t,)``.""" return self._load("Velocity Total")
[docs] class CrossSectionResultsDSS(_CrossSectionResultsBase): """Geometry *and* results for one XS from the **DSS Hydrograph Output** block. Corresponds to :attr:`UnsteadyPlan.cross_sections_output` (hydrograph output interval). Available datasets: ``wse``, ``flow``, ``flow_cumulative``. Parameters ---------- geom: Geometry object from :class:`CrossSectionCollection`. hdf: Open ``h5py.File`` — kept alive by the parent ``UnsteadyPlan`` context. index: Column index of this XS in the ``(n_t, n_xs)`` result datasets. root: HDF path prefix — ``_DSS_XS``. """ @property def flow_cumulative(self) -> np.ndarray: """Cumulative flow volume time series. Shape ``(n_t,)``.""" return self._load("Flow Volume Cumulative")
[docs] class CrossSectionResultsInstantaneous(_CrossSectionResultsBase): """Geometry *and* results for one XS from the **Post Process Profiles** block. Corresponds to :attr:`UnsteadyPlan.cross_sections_instantaneous`. All result arrays have shape ``(n_profiles,)`` where index ``0`` is the **Max WS** profile and indices ``1:`` are the instantaneous profiles written at the instantaneous interval. Use :attr:`UnsteadyPlan.instantaneous_timestamps` for the datetime index of indices ``1:``. Available top-level datasets: ``water_surface``, ``flow``, ``energy_grade``. All ``Additional Variables`` sub-group datasets are exposed as named properties (e.g. :attr:`flow_total`, :attr:`velocity_channel`) and are also reachable via :meth:`additional_variable` for programmatic access. Parameters ---------- geom: Geometry object from :class:`CrossSectionCollection`. hdf: Open ``h5py.File`` — kept alive by the parent ``UnsteadyPlan`` context. index: Column index of this XS in the ``(n_profiles, n_xs)`` result datasets. root: HDF path prefix — ``_POSTPROC_XS``. """ # ------------------------------------------------------------------ # Top-level datasets # ------------------------------------------------------------------ @property def energy_grade(self) -> np.ndarray: """Energy grade line elevation (m). Shape ``(n_profiles,)``. Index 0 = Max WS profile; indices 1: = instantaneous profiles. """ return self._load("Energy Grade") # ------------------------------------------------------------------ # Additional Variables — generic accessor # ------------------------------------------------------------------
[docs] def additional_variable(self, name: str) -> np.ndarray: """Load one column from the ``Additional Variables`` sub-group. Parameters ---------- name: Dataset name inside ``Additional Variables/``, e.g. ``"Flow Total"``, ``"Velocity Channel"``, ``"Conveyance Total"``. Returns ------- ndarray, shape ``(n_profiles,)`` Index 0 = Max WS profile; indices 1: = instantaneous profiles. Raises ------ KeyError If *name* is not present in ``Additional Variables/``. """ return self._load(f"Additional Variables/{name}")
# ------------------------------------------------------------------ # Additional Variables — explicit properties # Each delegates to additional_variable() so caching is shared. # ------------------------------------------------------------------ @property def alpha(self) -> np.ndarray: """Velocity-head correction factor α. Shape ``(n_profiles,)``.""" return self.additional_variable("Alpha") @property def beta(self) -> np.ndarray: """Momentum correction factor β. Shape ``(n_profiles,)``.""" return self.additional_variable("Beta") @property def flow_area_channel(self) -> np.ndarray: """Channel flow area (m²). Shape ``(n_profiles,)``.""" return self.additional_variable("Area Flow Channel") @property def flow_area_left_ob(self) -> np.ndarray: """Left overbank flow area (m²). Shape ``(n_profiles,)``.""" return self.additional_variable("Area Flow Left OB") @property def flow_area_right_ob(self) -> np.ndarray: """Right overbank flow area (m²). Shape ``(n_profiles,)``.""" return self.additional_variable("Area Flow Right OB") @property def flow_area_total(self) -> np.ndarray: """Total flow area (m²). Shape ``(n_profiles,)``.""" return self.additional_variable("Area Flow Total") @property def ineffective_area_channel(self) -> np.ndarray: """Channel area including ineffective zones (m²). Shape ``(n_profiles,)``.""" return self.additional_variable("Area including Ineffective Channel") @property def ineffective_area_left_ob(self) -> np.ndarray: """Left overbank area including ineffective zones (m²). Shape ``(n_profiles,)``.""" return self.additional_variable("Area including Ineffective Left OB") @property def ineffective_area_right_ob(self) -> np.ndarray: """Right overbank area including ineffective zones (m²). Shape ``(n_profiles,)``.""" return self.additional_variable("Area including Ineffective Right OB") @property def ineffective_area_total(self) -> np.ndarray: """Total area including ineffective zones (m²). Shape ``(n_profiles,)``.""" return self.additional_variable("Area including Ineffective Total") @property def conveyance_channel(self) -> np.ndarray: """Channel conveyance (m³/s). Shape ``(n_profiles,)``.""" return self.additional_variable("Conveyance Channel") @property def conveyance_left_ob(self) -> np.ndarray: """Left overbank conveyance (m³/s). Shape ``(n_profiles,)``.""" return self.additional_variable("Conveyance Left OB") @property def conveyance_right_ob(self) -> np.ndarray: """Right overbank conveyance (m³/s). Shape ``(n_profiles,)``.""" return self.additional_variable("Conveyance Right OB") @property def conveyance_total(self) -> np.ndarray: """Total conveyance (m³/s). Shape ``(n_profiles,)``.""" return self.additional_variable("Conveyance Total") @property def critical_energy_grade(self) -> np.ndarray: """Critical energy grade line elevation (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Critical Energy Grade") @property def critical_water_surface(self) -> np.ndarray: """Critical water surface elevation (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Critical Water Surface") @property def energy_grade_slope(self) -> np.ndarray: """Energy grade slope (m/m). Shape ``(n_profiles,)``.""" return self.additional_variable("EG Slope") @property def friction_slope(self) -> np.ndarray: """Friction slope (m/m). Shape ``(n_profiles,)``.""" return self.additional_variable("Friction Slope") @property def flow_channel(self) -> np.ndarray: """Channel flow (m³/s). Shape ``(n_profiles,)``.""" return self.additional_variable("Flow Channel") @property def flow_left_ob(self) -> np.ndarray: """Left overbank flow (m³/s). Shape ``(n_profiles,)``.""" return self.additional_variable("Flow Left OB") @property def flow_right_ob(self) -> np.ndarray: """Right overbank flow (m³/s). Shape ``(n_profiles,)``.""" return self.additional_variable("Flow Right OB") @property def flow_total(self) -> np.ndarray: """Total flow (m³/s). Shape ``(n_profiles,)``.""" return self.additional_variable("Flow Total") @property def hydraulic_depth_channel(self) -> np.ndarray: """Channel hydraulic depth (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Hydraulic Depth Channel") @property def hydraulic_depth_left_ob(self) -> np.ndarray: """Left overbank hydraulic depth (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Hydraulic Depth Left OB") @property def hydraulic_depth_right_ob(self) -> np.ndarray: """Right overbank hydraulic depth (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Hydraulic Depth Right OB") @property def hydraulic_depth_total(self) -> np.ndarray: """Total hydraulic depth (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Hydraulic Depth Total") @property def hydraulic_radius_channel(self) -> np.ndarray: """Channel hydraulic radius (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Hydraulic Radius Channel") @property def hydraulic_radius_left_ob(self) -> np.ndarray: """Left overbank hydraulic radius (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Hydraulic Radius Left OB") @property def hydraulic_radius_right_ob(self) -> np.ndarray: """Right overbank hydraulic radius (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Hydraulic Radius Right OB") @property def hydraulic_radius_total(self) -> np.ndarray: """Total hydraulic radius (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Hydraulic Radius Total") @property def mannings_n_channel(self) -> np.ndarray: """Weighted/composite channel Manning's n. Shape ``(n_profiles,)``.""" return self.additional_variable("Manning n Channel") @property def mannings_n_left_ob(self) -> np.ndarray: """Left overbank Manning's n. Shape ``(n_profiles,)``.""" return self.additional_variable("Manning n Left OB") @property def mannings_n_right_ob(self) -> np.ndarray: """Right overbank Manning's n. Shape ``(n_profiles,)``.""" return self.additional_variable("Manning n Right OB") @property def mannings_n_total(self) -> np.ndarray: """Total weighted Manning's n. Shape ``(n_profiles,)``.""" return self.additional_variable("Manning n Total") @property def max_depth_total(self) -> np.ndarray: """Total maximum water depth (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Maximum Depth Total") @property def shear(self) -> np.ndarray: """Bed shear stress (N/m²). Shape ``(n_profiles,)``.""" return self.additional_variable("Shear") @property def top_width_channel(self) -> np.ndarray: """Channel top width (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Top Width Channel") @property def top_width_channel_with_ineffective(self) -> np.ndarray: """Channel top width including ineffective areas (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Top Width Channel including Ineffective") @property def top_width_left_ob(self) -> np.ndarray: """Left overbank top width (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Top Width Left OB") @property def top_width_left_ob_with_ineffective(self) -> np.ndarray: """Left overbank top width including ineffective areas (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Top Width Left OB including Ineffective") @property def top_width_right_ob(self) -> np.ndarray: """Right overbank top width (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Top Width Right OB") @property def top_width_right_ob_with_ineffective(self) -> np.ndarray: """Right overbank top width including ineffective areas (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Top Width Right OB including Ineffective") @property def top_width_total(self) -> np.ndarray: """Total top width (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Top Width Total") @property def top_width_total_with_ineffective(self) -> np.ndarray: """Total top width including ineffective areas (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Top Width Total including Ineffective") @property def velocity_channel(self) -> np.ndarray: """Channel velocity (m/s). Shape ``(n_profiles,)``.""" return self.additional_variable("Velocity Channel") @property def velocity_left_ob(self) -> np.ndarray: """Left overbank velocity (m/s). Shape ``(n_profiles,)``.""" return self.additional_variable("Velocity Left OB") @property def velocity_right_ob(self) -> np.ndarray: """Right overbank velocity (m/s). Shape ``(n_profiles,)``.""" return self.additional_variable("Velocity Right OB") @property def velocity_total(self) -> np.ndarray: """Total velocity (m/s). Shape ``(n_profiles,)``.""" return self.additional_variable("Velocity Total") @property def wse_total(self) -> np.ndarray: """Total stage / water surface elevation (m). Shape ``(n_profiles,)``. Sourced from ``Additional Variables/Water Surface Total``. Equivalent to :attr:`wse` for most configurations. """ return self.additional_variable("Water Surface Total") @property def wetted_perimeter_channel(self) -> np.ndarray: """Channel wetted perimeter (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Wetted Perimeter Channel") @property def wetted_perimeter_left_ob(self) -> np.ndarray: """Left overbank wetted perimeter (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Wetted Perimeter Left OB") @property def wetted_perimeter_right_ob(self) -> np.ndarray: """Right overbank wetted perimeter (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Wetted Perimeter Right OB") @property def wetted_perimeter_total(self) -> np.ndarray: """Total wetted perimeter (m). Shape ``(n_profiles,)``.""" return self.additional_variable("Wetted Perimeter Total")
[docs] class CrossSectionResultsCollection(CrossSectionCollection): """Plan-enriched cross section collection with time-series results. Parameterised over the concrete result class and the HDF path used to map cross sections to column indices, so one implementation serves all three output blocks. Parameters ---------- hdf: Open ``h5py.File`` handle. root: HDF path to the cross section result group: ``_TS_XS``, ``_DSS_XS``, or ``_POSTPROC_XS``. result_cls: Concrete result class to instantiate per cross section — :class:`CrossSectionResults`, :class:`CrossSectionResultsDSS`, or :class:`CrossSectionResultsInstantaneous`. attrs_path: HDF path to the ``Cross Section Attributes`` structured array used to map ``(river, reach, station)`` → column index. Defaults to ``f"{root}/Cross Section Attributes"``, which is correct for Base Output and DSS blocks. Pass ``_POSTPROC_GEOM_ATTRS`` for the Post Process block, where attributes live outside the XS result group. """ def __init__( self, hdf: "h5py.File", root: str, result_cls: type[_CrossSectionResultsBase] = CrossSectionResults, attrs_path: str | None = None, ) -> None: super().__init__(hdf) self._root = root self._result_cls = result_cls self._attrs_path = attrs_path or f"{root}/Cross Section Attributes" self._result_items: dict[str, _CrossSectionResultsBase] | None = None def _load_results(self) -> dict[str, _CrossSectionResultsBase]: if self._result_items is not None: return self._result_items geom_items = CrossSectionCollection._load(self) attrs_ds = self._hdf.get(self._attrs_path) if attrs_ds is None: self._result_items = {} return self._result_items result_attrs = np.array(attrs_ds) fn = attrs_ds.dtype.names result_index: dict[tuple[str, str, str], int] = {} for i, row in enumerate(result_attrs): r = _decode(row["River"]) if "River" in fn else "" rc = _decode(row["Reach"]) if "Reach" in fn else "" st = _decode(row["Station"]) if "Station" in fn else "" result_index[(r, rc, st)] = i items: dict[str, _CrossSectionResultsBase] = {} for key, geom in geom_items.items(): idx = result_index.get((geom.river, geom.reach, geom.rs)) if idx is not None: items[key] = self._result_cls(geom, self._hdf, idx, self._root) self._result_items = items return self._result_items @overload def __getitem__(self, key: int) -> _CrossSectionResultsBase: ... @overload def __getitem__(self, key: str) -> _CrossSectionResultsBase: ... @overload def __getitem__(self, key: tuple[str, str, str]) -> _CrossSectionResultsBase: ... def __getitem__( self, key: int | str | tuple[str, str, str] ) -> _CrossSectionResultsBase: items = self._load_results() if isinstance(key, int): keys = list(items) try: return items[keys[key]] except IndexError: raise IndexError( f"Index {key} out of range (n={len(items)})" ) from None if isinstance(key, tuple): str_key = self._loc_index.get(key) if str_key is None: raise KeyError(f"Cross section {key!r} not found.") if str_key not in items: raise KeyError( f"Cross section {key!r} has no results in this plan." ) return items[str_key] if key not in items: raise KeyError( f"Cross section {key!r} not found. Available: {self.names}" ) return items[key] def __len__(self) -> int: return len(self._load_results()) def __iter__(self) -> Iterator[_CrossSectionResultsBase]: return iter(self._load_results().values()) @property def names(self) -> list[str]: """Cross section keys available in this result block. Returns ------- list[str] Keys of the form ``"<river> <reach> <station>"``. """ return list(self._load_results().keys())
# --------------------------------------------------------------------------- # UnsteadyPlan - public entry point # --------------------------------------------------------------------------- # --------------------------------------------------------------------------- # Simulation summary dataclasses # ---------------------------------------------------------------------------
[docs] @dataclasses.dataclass class RunStatus: """Overall run metadata from ``Results/Unsteady/Summary``. Attributes ---------- solution: HEC-RAS solution status string, e.g. ``"Unsteady Finished Successfully"`` or ``"Unsteady Went Unstable"``. run_window: Wall-clock window during which the simulation ran, as a raw string, e.g. ``"26NOV2025 15:58:44 to 26NOV2025 16:03:41"``. compute_time_total: Total wall-clock computation time in ``"HH:MM:SS"`` format. compute_time_dss: Time spent writing DSS output in ``"HH:MM:SS"`` format. max_cores: Maximum number of CPU cores used. max_wsel_error: Maximum water-surface elevation error (model units), or ``None`` when the simulation went unstable before convergence. time_unstable: Elapsed simulation time (days) when the solution went unstable, or ``None`` if the run finished successfully. timestamp_unstable: HEC-RAS date/time string for the instability, or ``None`` if the run finished successfully (HEC-RAS writes ``"Not Applicable"``). """ solution: str run_window: str compute_time_total: str compute_time_dss: str max_cores: int max_wsel_error: float | None time_unstable: float | None timestamp_unstable: str | None
[docs] def to_dict(self) -> dict[str, Any]: """Return a dict with short, meaningful keys.""" return { "solution": self.solution, "run_window": self.run_window, "compute_time_total": self.compute_time_total, "compute_time_dss": self.compute_time_dss, "max_cores": self.max_cores, "max_wsel_error": self.max_wsel_error, "time_unstable": self.time_unstable, "timestamp_unstable": self.timestamp_unstable, }
[docs] def parse_run_window( self, ) -> tuple[dt.datetime, dt.datetime] | None: """Parse :attr:`run_window` into ``(start, end)`` datetimes. Splits the raw string on ``" to "`` and parses each half with the HEC-RAS timestamp format ``"%d%b%Y %H:%M:%S"``. Returns ------- tuple[datetime, datetime] or None ``(start, end)`` as timezone-naive :class:`datetime.datetime` objects, or ``None`` when the string is missing, malformed, or cannot be parsed. Examples -------- :: s = hdf.compute_summary() window = s.run.parse_run_window() if window: start, end = window print(end - start) # wall-clock duration """ try: left, right = self.run_window.split(" to ", maxsplit=1) return ( parse_hec_datetime(left.strip(), fmt=_RAS_TS_FMT), parse_hec_datetime(right.strip(), fmt=_RAS_TS_FMT), ) except (ValueError, AttributeError): return None
[docs] @dataclasses.dataclass class VolumeAccounting: """Overall volume accounting from ``Results/Unsteady/Summary/Volume Accounting``. Covers the full model (1D + 2D combined). Attributes ---------- units: Volume units string written by HEC-RAS, e.g. ``"Acre Feet"`` or ``"1000 m^3"``. vol_start: Total storage volume at the start of the simulation. vol_end: Total storage volume at the end of the simulation. inflow: Total boundary flux of water into the model. outflow: Total boundary flux of water out of the model. error: Volume balance error (``vol_start + inflow - outflow - vol_end``). error_pct: Volume balance error as a percentage of total inflow. """ units: str vol_start: float vol_end: float inflow: float outflow: float error: float error_pct: float
[docs] def to_dict(self) -> dict[str, Any]: """Return a dict with short, meaningful keys.""" return { "units": self.units, "vol_start": self.vol_start, "vol_end": self.vol_end, "inflow": self.inflow, "outflow": self.outflow, "error": self.error, "error_pct": self.error_pct, }
[docs] @dataclasses.dataclass class VolumeAccounting1D: """1-D component volume accounting from ``Volume Accounting 1D``. Attributes ---------- units: Volume units string, e.g. ``"Acre Feet"`` or ``"1000 m^3"``. reach_vol_start: Total 1-D reach storage at simulation start. reach_vol_end: Total 1-D reach storage at simulation end. sa_vol_start: Storage-area volume at simulation start. sa_vol_end: Storage-area volume at simulation end. flow_us_in: Cumulative upstream inflow across all reaches. flow_ds_out: Cumulative downstream outflow across all reaches. hydro_lat: Cumulative lateral hydrograph exchange (positive = into model). hydro_sa: Cumulative storage-area hydrograph exchange. diversions: Cumulative diversions (negative = water removed). groundwater: Cumulative groundwater exchange. precip_excess: Cumulative precipitation excess (rainfall-runoff applied to reaches). """ units: str reach_vol_start: float reach_vol_end: float sa_vol_start: float sa_vol_end: float flow_us_in: float flow_ds_out: float hydro_lat: float hydro_sa: float diversions: float groundwater: float precip_excess: float
[docs] def to_dict(self) -> dict[str, Any]: """Return a dict with short, meaningful keys.""" return { "units": self.units, "reach_vol_start": self.reach_vol_start, "reach_vol_end": self.reach_vol_end, "sa_vol_start": self.sa_vol_start, "sa_vol_end": self.sa_vol_end, "flow_us_in": self.flow_us_in, "flow_ds_out": self.flow_ds_out, "hydro_lat": self.hydro_lat, "hydro_sa": self.hydro_sa, "diversions": self.diversions, "groundwater": self.groundwater, "precip_excess": self.precip_excess, }
[docs] @dataclasses.dataclass class VolumeAccounting2DArea: """Volume accounting for one named 2-D flow area. One instance per child group under ``Results/Unsteady/Summary/Volume Accounting/Volume Accounting 2D``. Attributes ---------- units: Volume units string, e.g. ``"Acre Feet"`` or ``"1000 m^3"``. vol_start: 2-D flow area storage at simulation start. vol_end: 2-D flow area storage at simulation end. cum_inflow: Cumulative inflow into this 2-D area. cum_outflow: Cumulative outflow out of this 2-D area. error: Volume balance error for this area. error_pct: Volume balance error as a percentage of cumulative inflow. """ units: str vol_start: float vol_end: float cum_inflow: float cum_outflow: float error: float error_pct: float
[docs] def to_dict(self) -> dict[str, Any]: """Return a dict with short, meaningful keys.""" return { "units": self.units, "vol_start": self.vol_start, "vol_end": self.vol_end, "cum_inflow": self.cum_inflow, "cum_outflow": self.cum_outflow, "error": self.error, "error_pct": self.error_pct, }
[docs] @dataclasses.dataclass class ComputeSummary: """Full unsteady simulation summary for a plan HDF file. Aggregates all summary groups under ``Results/Unsteady/Summary``. Attributes ---------- run: Overall run metadata (solution status, timing, stability). volume: Model-wide volume accounting (1D + 2D combined). volume_1d: Volume accounting for 1-D components only. volume_2d: Per-area volume accounting for each 2-D flow area. Empty dict when the model has no 2-D flow areas. """ run: RunStatus volume: VolumeAccounting volume_1d: VolumeAccounting1D volume_2d: dict[str, VolumeAccounting2DArea]
[docs] def to_dict(self) -> dict[str, Any]: """Return a nested dict with short, meaningful keys. Top-level keys: ``"run"``, ``"volume"``, ``"volume_1d"``, ``"volume_2d"``. The ``"volume_2d"`` value is a dict keyed by 2-D flow area name. """ return { "run": self.run.to_dict(), "volume": self.volume.to_dict(), "volume_1d": self.volume_1d.to_dict(), "volume_2d": { name: area.to_dict() for name, area in self.volume_2d.items() }, }
[docs] def errors(self) -> dict[str, float]: """Return key convergence and volume-balance error metrics. Derives four scalar error measures from this summary that are useful for quick quality-control checks after a run. Returns ------- dict[str, float] A flat dictionary with the following keys: ``"wse"`` Maximum water-surface elevation error across all cells (model units — feet or metres). ``None`` when the simulation went unstable before the run converged. ``"volume_error_pct"`` Overall volume balance error as a percentage of total inflow, read directly from the HEC-RAS ``Results/Unsteady/Summary`` volume accounting block. ``"volume_1d_error_pct"`` 1-D volume balance error (%) computed from the individual 1-D flux terms:: error = -(flow_us_in + net_other_fluxes + storage_change) supply = reach_vol_start + sa_vol_start + flow_us_in + hydro_lat + hydro_sa + diversions + groundwater + precip_excess error_pct = error / supply * 100 where *net_other_fluxes* = ``-flow_ds_out + hydro_lat + hydro_sa + diversions + groundwater + precip_excess``. ``supply`` is the total 1-D water budget (initial storage plus all non-DS-outflow fluxes); for a 1D-only model this matches the HEC-RAS overall ``"Error Percent"`` attribute. Returns ``nan`` when total 1-D supply is zero. ``"volume_2d_error_pct"`` Largest-magnitude 2-D volume balance error (%) across all 2-D flow areas, preserving the sign of the worst-case area. Zero when the model has no 2-D flow areas. See Also -------- ok : Quality-control pass/fail check with threshold logging. """ v1d = self.volume_1d outflow = ( v1d.flow_ds_out - v1d.hydro_lat - v1d.hydro_sa - v1d.diversions - v1d.groundwater - v1d.precip_excess ) storage_change = ( (v1d.reach_vol_start + v1d.sa_vol_start) - (v1d.reach_vol_end + v1d.sa_vol_end) ) error_1d = outflow - v1d.flow_us_in - storage_change supply_1d = ( v1d.reach_vol_start + v1d.sa_vol_start + v1d.flow_us_in + v1d.hydro_lat + v1d.hydro_sa + v1d.diversions + v1d.groundwater + v1d.precip_excess ) volume_1d_error_pct = ( error_1d / supply_1d * 100 if supply_1d != 0 else float("nan") ) if (self.volume_2d and not np.isclose(error_1d, 0.0) and np.isclose(v1d.flow_ds_out, 0.0)): logger.warning( "Model has 2-D flow areas but zero 1-D downstream outflow; " "1-D error percentages may be unreliable. Setting 1-D error to zero." ) volume_1d_error_pct = 0.0 max_2d_error_pct = 0.0 for area in self.volume_2d.values(): if abs(area.error_pct) > abs(max_2d_error_pct): max_2d_error_pct = area.error_pct return { "wse": self.run.max_wsel_error, "volume_error_pct": self.volume.error_pct, "volume_1d_error_pct": volume_1d_error_pct, "volume_2d_error_pct": max_2d_error_pct, }
[docs] def ok( self, wse_threshold: float = 0.5, volume_error_pct_threshold: float = 1.0, volume_1d_error_pct_threshold: float = 1.0, volume_2d_error_pct_threshold: float = 1.0, ) -> bool: """Return ``True`` if the simulation completed stably and all error metrics are within their thresholds. Checks stability first, then each error metric against its threshold. All failures are collected and logged before returning so the caller sees the complete picture in one pass. If the simulation went unstable every failure is logged at ``CRITICAL``; otherwise threshold violations are logged at ``WARNING``. Parameters ---------- wse_threshold: Maximum allowable water-surface elevation error in model units (feet or metres). Default: ``0.5``. volume_error_pct_threshold: Maximum allowable overall volume balance error (%). Default: ``1.0``. volume_1d_error_pct_threshold: Maximum allowable 1-D volume balance error (%). Default: ``1.0``. volume_2d_error_pct_threshold: Maximum allowable 2-D volume balance error (%) per 2-D flow area. Default: ``1.0``. Returns ------- bool ``True`` if the simulation is stable and all metrics are within bounds; ``False`` otherwise. See Also -------- errors : Raw error metric values. """ failures: list[str] = [] unstable = self.run.time_unstable is not None if unstable: failures.append( f"simulation went unstable at t={self.run.time_unstable}" ) errs = self.errors() wse = errs["wse"] if wse is None or math.isnan(wse) or abs(wse) > wse_threshold: failures.append( f"WSE error {wse!r} exceeds threshold {wse_threshold}" ) vol_err = errs["volume_error_pct"] if math.isnan(vol_err) or abs(vol_err) > volume_error_pct_threshold: failures.append( f"overall volume error {vol_err:.4f}% exceeds threshold " f"{volume_error_pct_threshold}%" ) vol_1d_err = errs["volume_1d_error_pct"] if math.isnan(vol_1d_err) or abs(vol_1d_err) > volume_1d_error_pct_threshold: vol_1d_str = "nan" if math.isnan(vol_1d_err) else f"{vol_1d_err:.4f}%" failures.append( f"1D volume error {vol_1d_str} exceeds threshold " f"{volume_1d_error_pct_threshold}%" ) for area_name, area in self.volume_2d.items(): exceeds = ( math.isnan(area.error_pct) or abs(area.error_pct) > volume_2d_error_pct_threshold ) if exceeds: failures.append( f"2D area '{area_name}' volume error {area.error_pct:.4f}% " f"exceeds threshold {volume_2d_error_pct_threshold}%" ) if not failures: return True log = logger.critical if unstable else logger.warning for msg in failures: log("compute_ok: %s", msg) return False
[docs] class UnsteadyPlan(_PlanHdf, Geometry): """Read HEC-RAS plan HDF5 output files (``*.p*.hdf``). A plan HDF file contains the same ``Geometry/`` data as a geometry HDF file, *plus* ``Results/Unsteady/...`` time-series and summary output. Parameters ---------- filename: Path to the plan HDF file. The ``.hdf`` suffix is appended automatically if absent. Examples -------- :: with UnsteadyPlan("MyModel.p01") as hdf: ts = hdf.mapping_timestamps area = hdf.flow_areas["spillway"] wse = area.water_surface[10] # one timestep depth = area.depth(10) speed = area.cell_speed(10) max_d = area.max_depth() # requires rasterio + scipy: area.export_raster("depth", "depth.tif", timestep=None, cell_size=5.0, crs="EPSG:26910") # or get an in-memory dataset: ds = area.export_raster("depth", timestep=None, cell_size=5.0) arr = ds.read(1) ds.close() """ def __init__(self, filename: str | Path, program_directory: str | Path | None = None) -> None: super().__init__(filename) self._program_directory = Path(program_directory) if program_directory else None self._geom_view: Geometry | None = None self._plan_flow_areas: FlowAreaResultsCollection | None = None self._plan_storage_areas: StorageAreaResultsCollection | None = None self._plan_structures: StructureResultsCollection | None = None self._plan_cross_sections: CrossSectionResultsCollection | None = None self._plan_cross_sections_output: CrossSectionResultsCollection | None = None self._plan_cross_sections_instantaneous: ( CrossSectionResultsCollection | None ) = None # ------------------------------------------------------------------ # Runtime log # ------------------------------------------------------------------
[docs] def runtime_log(self) -> UnsteadyRuntimeLog: """Read the runtime compute log from ``Results/Summary/``. Returns ------- UnsteadyRuntimeLog Log container with the full text/RTF compute messages, the compute-process table, and unsteady-specific parsing methods such as :meth:`~UnsteadyRuntimeLog.max_iterations` and :meth:`~UnsteadyRuntimeLog.adaptive_timesteps`. Raises ------ KeyError If ``Results/Summary`` is absent from the HDF file. """ return UnsteadyRuntimeLog(*self._runtime_log_raw())
# ------------------------------------------------------------------ # File metadata # ------------------------------------------------------------------
[docs] def compute_summary(self) -> ComputeSummary: """Return the full unsteady simulation summary. Reads all groups under ``Results/Unsteady/Summary`` and returns a :class:`ComputeSummary` dataclass aggregating run metadata, overall volume accounting, 1-D volume accounting, and per-area 2-D volume accounting. Call :meth:`ComputeSummary.to_dict` on the result for a nested dict with short, meaningful keys. Returns ------- ComputeSummary Raises ------ KeyError If ``Results/Unsteady/Summary`` is absent — e.g. the file is a steady-flow plan or the simulation has not been run yet. Examples -------- :: with UnsteadyPlan("MyModel.p01") as hdf: s = hdf.compute_summary() print(s.run.solution) print(s.volume.error_pct) d = s.to_dict() """ grp = self._hdf.get(_RUN_SUM) if grp is None: raise KeyError( f"'{_RUN_SUM}' not found. " "Ensure this is an unsteady-flow plan HDF file that has been run." ) def _str(attrs: Any, key: str) -> str: v = attrs[key] return v.decode() if isinstance(v, (bytes, np.bytes_)) else str(v) def _float_or_none(attrs: Any, key: str) -> float | None: v = float(attrs[key]) return None if np.isnan(v) else v # -- RunStatus --------------------------------------------------- a = grp.attrs unstable_ts = _str(a, "Time Stamp Solution Went Unstable") run = RunStatus( solution=_str(a, "Solution"), run_window=_str(a, "Run Time Window"), compute_time_total=_str(a, "Computation Time Total"), compute_time_dss=_str(a, "Computation Time DSS"), max_cores=int(a["Maximum number of cores"]), max_wsel_error=_float_or_none(a, "Maximum WSEL Error"), time_unstable=_float_or_none(a, "Time Solution Went Unstable"), timestamp_unstable=( None if unstable_ts == "Not Applicable" else unstable_ts ), ) # -- VolumeAccounting (overall) ----------------------------------- va_grp = grp["Volume Accounting"] a = va_grp.attrs volume = VolumeAccounting( units=_str(a, "Vol Accounting in"), vol_start=float(a["Volume Starting"]), vol_end=float(a["Volume Ending"]), inflow=float(a["Total Boundary Flux of Water In"]), outflow=float(a["Total Boundary Flux of Water Out"]), error=float(a["Error"]), error_pct=float(a["Error Percent"]), ) # -- VolumeAccounting1D ------------------------------------------ a = va_grp["Volume Accounting 1D"].attrs precip_key = next(k for k in a if k.startswith("Precip Excess")) volume_1d = VolumeAccounting1D( units=_str(a, "Vol Accounting in"), reach_vol_start=float(a["Reach Start 1D"]), reach_vol_end=float(a["Reach Final 1D"]), sa_vol_start=float(a["SA Starting"]), sa_vol_end=float(a["SA Final"]), flow_us_in=float(a["Flow US In"]), flow_ds_out=float(a["Flow DS Out"]), hydro_lat=float(a["Hydro Lat"]), hydro_sa=float(a["Hydro SA"]), diversions=float(a["Diversions"]), groundwater=float(a["Groundwater"]), precip_excess=float(a[precip_key]), ) # -- VolumeAccounting2D (optional) -------------------------------- volume_2d: dict[str, VolumeAccounting2DArea] = {} vd_grp = va_grp.get("Volume Accounting 2D") if vd_grp is not None: for area_name, area_grp in vd_grp.items(): a = area_grp.attrs volume_2d[area_name] = VolumeAccounting2DArea( units=_str(a, "Vol Accounting in"), vol_start=float(a["Vol Starting"]), vol_end=float(a["Vol Ending"]), cum_inflow=float(a["Cum Inflow"]), cum_outflow=float(a["Cum Outflow"]), error=float(a["Error"]), error_pct=float(a["Error Percent"]), ) return ComputeSummary( run=run, volume=volume, volume_1d=volume_1d, volume_2d=volume_2d, )
[docs] def compute_errors(self) -> dict[str, float]: """Return key convergence and volume-balance error metrics. Convenience wrapper: calls :meth:`compute_summary` and delegates to :meth:`ComputeSummary.errors`. See that method for full documentation. See Also -------- ComputeSummary.errors : Full documentation and return-value details. compute_ok : Quality-control pass/fail check with threshold logging. """ return self.compute_summary().errors()
[docs] def compute_ok( self, wse_threshold: float = 0.5, volume_error_pct_threshold: float = 1.0, volume_1d_error_pct_threshold: float = 1.0, volume_2d_error_pct_threshold: float = 1.0, ) -> bool: """Return ``True`` if the simulation completed stably and all error metrics are within their thresholds. Convenience wrapper: calls :meth:`compute_summary` and delegates to :meth:`ComputeSummary.ok`. See that method for full documentation, including parameter descriptions and logging behaviour. Examples -------- Quick pass/fail check after a run:: if not hdf.compute_ok(): raise RuntimeError("Simulation quality check failed.") Custom thresholds:: if not hdf.compute_ok(wse_threshold=0.1, volume_error_pct_threshold=0.5): logger.warning("Tight-tolerance check failed.") See Also -------- ComputeSummary.ok : Full documentation and parameter descriptions. compute_errors : Raw error metric values. """ return self.compute_summary().ok( wse_threshold=wse_threshold, volume_error_pct_threshold=volume_error_pct_threshold, volume_1d_error_pct_threshold=volume_1d_error_pct_threshold, volume_2d_error_pct_threshold=volume_2d_error_pct_threshold, )
@property def ras_version(self) -> str: """HEC-RAS version string from the plan HDF root attribute. Returns the ``File Version`` root attribute, e.g. ``'HEC-RAS 6.6 September 2024'``. """ raw = self._hdf.attrs["File Version"] return raw.decode() if isinstance(raw, (bytes, np.bytes_)) else str(raw) @property def computation_interval(self) -> dt.timedelta | None: """Base computation time step, or ``None`` if absent. Read from ``Plan Data/Plan Information`` attribute ``Computation Time Step Base``. HEC-RAS stores the value as a concatenated number-unit string, e.g. ``'20SEC'``, ``'5MIN'``, ``'1HR'``, ``'1HOUR'``, ``'1DAY'``. """ raw = self._plan_info_attr("Computation Time Step Base") return None if raw is None else parse_interval(raw) @property def mapping_interval(self) -> dt.timedelta | None: """Mapping output interval, or ``None`` if absent. Read from ``Plan Data/Plan Information`` attribute ``Base Output Interval``. HEC-RAS stores the value as a concatenated number-unit string, e.g. ``'5MIN'``. """ raw = self._plan_info_attr("Base Output Interval") return None if raw is None else parse_interval(raw) # ------------------------------------------------------------------ # Time stamps # ------------------------------------------------------------------ @property def mapping_timestamps(self) -> pd.DatetimeIndex: """Simulation output time stamps as a ``pd.DatetimeIndex``. Parsed from the ``Time Date Stamp`` dataset written by HEC-RAS. Format: ``DD Mon YYYY HH:MM:SS`` (e.g. ``03Jan2000 00:00:00``). """ ds = self._hdf.get(_TIME_STAMP_DS) if ds is None: raise KeyError( f"Time Date Stamp dataset not found at '{_TIME_STAMP_DS}'. " "Ensure this is an unsteady-flow plan HDF file." ) raw = np.array(ds).astype(str) return _parse_hec_ts_array(raw, _RAS_TS_FMT) @property def output_timestamps(self) -> pd.DatetimeIndex: """DSS hydrograph output time stamps as a ``pd.DatetimeIndex``. Parsed from ``Results/.../DSS Hydrograph Output/Unsteady Time Series/Time Date Stamp``. Format: ``DD Mon YYYY HH:MM:SS``. """ ds = self._hdf.get(_DSS_TIME_STAMP_DS) if ds is None: raise KeyError( f"Time Date Stamp dataset not found at '{_DSS_TIME_STAMP_DS}'. " "Ensure DSS hydrograph output was written for this plan." ) raw = np.array(ds).astype(str) return _parse_hec_ts_array(raw, _RAS_TS_FMT) @property def output_interval(self) -> dt.timedelta | None: """DSS hydrograph output interval, or ``None`` if absent. Derived from the difference between the first two hydrograph time stamps. """ ds = self._hdf.get(_DSS_TIME_STAMP_DS) if ds is None or len(ds) < 2: return None raw = np.array(ds[:2]).astype(str) ts = _parse_hec_ts_array(raw, _RAS_TS_FMT) return ts[1] - ts[0] @property def instantaneous_interval(self) -> dt.timedelta | None: """Instantaneous profile output interval, or ``None`` if absent. Derived from the difference between the first two actual profile timestamps in ``Profile Dates`` (i.e. entries at indices 1 and 2, skipping the ``"Max WS"`` entry at index 0). """ ds = self._hdf.get(_POSTPROC_PROFILE_DATES) if ds is None or len(ds) < 3: return None t1 = parse_hec_datetime(ds[1].decode(), fmt=_POSTPROC_TS_FMT) t2 = parse_hec_datetime(ds[2].decode(), fmt=_POSTPROC_TS_FMT) return t2 - t1 @property def instantaneous_timestamps(self) -> pd.DatetimeIndex: """Instantaneous profile output timestamps as a ``pd.DatetimeIndex``. Parsed from ``Profile Dates`` in the Post Process Profiles group, skipping the first entry (``"Max WS"``). Format: ``"DDMONYYYY HHMM"`` (e.g. ``"01JAN2026 0002"``). Use ``cross_sections_instantaneous[xs].wse[1:]`` (or ``[1:n+1]``) to align result arrays with this index; index ``0`` of the result arrays holds the Max WS profile value. Raises ------ KeyError If the Post Process Profiles group is absent from this HDF file. """ ds = self._hdf.get(_POSTPROC_PROFILE_DATES) if ds is None: raise KeyError( f"Profile Dates not found at '{_POSTPROC_PROFILE_DATES}'. " "Ensure this plan has Post Process steady results." ) raw = np.array(ds[1:]).astype(str) return _parse_hec_ts_array(raw, _POSTPROC_TS_FMT) @property def n_mapping_timestamps(self) -> int | None: """Number of mapping output time steps, or ``None`` for steady-flow plans. Reads only the dataset shape — no timestamp data is loaded. """ ds = self._hdf.get(_TIME_STAMP_DS) return None if ds is None else len(ds) @property def n_output_timestamps(self) -> int | None: """Number of DSS hydrograph output time steps, or ``None`` if absent. Reads only the dataset shape — no timestamp data is loaded. """ ds = self._hdf.get(_DSS_TIME_STAMP_DS) return None if ds is None else len(ds) @property def n_instantaneous_timestamps(self) -> int | None: """Number of instantaneous profile time steps, or ``None`` if absent. Reads only the dataset shape — no timestamp data is loaded. The ``"Max WS"`` entry at index 0 is excluded from the count. """ ds = self._hdf.get(_POSTPROC_PROFILE_DATES) return None if ds is None else max(0, len(ds) - 1) # ------------------------------------------------------------------ # Collections (override Geometry equivalents with results-aware types) # ------------------------------------------------------------------ @property def flow_areas(self) -> FlowAreaResultsCollection: """Access 2-D flow areas with both geometry and results data.""" if self._plan_flow_areas is None: self._plan_flow_areas = FlowAreaResultsCollection(self._hdf) return self._plan_flow_areas @property def storage_areas(self) -> StorageAreaResultsCollection: """Access storage areas with both geometry and plan results data.""" if self._plan_storage_areas is None: self._plan_storage_areas = StorageAreaResultsCollection(self._hdf) return self._plan_storage_areas @property def structures(self) -> StructureResultsCollection: """Access all structures with geometry *and* plan results. Returns a :class:`StructureResultsCollection` where each item is upgraded to the matching results class when plan output is present: * :class:`SA2DConnectionResults` — SA/2D connections * :class:`InlineResults` — inline structures * :class:`LateralResults` — lateral structures * :class:`BridgeResults` — bridge structures Use :attr:`~rivia.hdf.StructureCollection.connections`, :attr:`~rivia.hdf.StructureCollection.inlines`, :attr:`~rivia.hdf.StructureCollection.laterals`, and :attr:`~rivia.hdf.StructureCollection.bridges` for filtered access. """ if self._plan_structures is None: self._plan_structures = StructureResultsCollection(self._hdf) return self._plan_structures @property def cross_sections(self) -> CrossSectionResultsCollection: """1-D cross sections with geometry and Base Output results. Results are at the mapping output interval (:attr:`mapping_timestamps`). All variables are available: ``water_surface``, ``flow``, ``flow_lateral``, ``velocity_channel``, ``velocity_total``, ``flow_volume_cumulative``. See also :attr:`cross_sections_output` for the DSS hydrograph interval (fewer variables but finer timestep when DSS output was enabled). """ if self._plan_cross_sections is None: self._plan_cross_sections = CrossSectionResultsCollection( self._hdf, _TS_XS, result_cls=CrossSectionResults, ) return self._plan_cross_sections @property def cross_sections_output(self) -> CrossSectionResultsCollection: """1-D cross sections with geometry and DSS Hydrograph Output results. Items are :class:`CrossSectionResultsDSS` instances. Results are at the hydrograph output interval (:attr:`output_timestamps`). Available variables: ``water_surface``, ``flow``, ``flow_volume_cumulative``. """ if self._plan_cross_sections_output is None: self._plan_cross_sections_output = CrossSectionResultsCollection( self._hdf, _DSS_XS, result_cls=CrossSectionResultsDSS, ) return self._plan_cross_sections_output @property def cross_sections_instantaneous(self) -> CrossSectionResultsCollection: """1-D cross sections with geometry and Post Process Profiles results. Items are :class:`CrossSectionResultsInstantaneous` instances. Results are at the instantaneous profile interval (:attr:`instantaneous_timestamps`). Each result array has shape ``(n_profiles,)`` where index ``0`` is the **Max WS** profile and indices ``1:`` are the instantaneous profiles. Available variables: ``water_surface``, ``flow``, ``energy_grade``, plus any ``Additional Variables`` dataset via :meth:`~CrossSectionResultsInstantaneous.additional_variable`. """ if self._plan_cross_sections_instantaneous is None: self._plan_cross_sections_instantaneous = CrossSectionResultsCollection( self._hdf, _POSTPROC_XS, result_cls=CrossSectionResultsInstantaneous, attrs_path=_POSTPROC_GEOM_ATTRS, ) return self._plan_cross_sections_instantaneous @property def sa2d_connections(self) -> dict[str, SA2DConnection]: """SA/2D hydraulic connections keyed by geometry name. Convenience alias for ``hdf.structures.connections``. Items are :class:`SA2DConnectionResults` when plan output is present, plain :class:`SA2DConnection` otherwise. """ return self.structures.connections