Source code for rivia.hdf.geometry

"""Geometry — read HEC-RAS geometry HDF5 files (.g*.hdf).

Provides structured access to 2-D flow-area mesh data:
cell centres, face connectivity, hydraulic property tables, etc.

Also provides access to storage areas and boundary condition lines.

"""

from __future__ import annotations

import datetime as dt
import logging
from collections.abc import Iterator, Mapping
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Generic, TypeVar, overload

import numpy as np
import pandas as pd

from rivia.utils import parse_hec_datetime

from ._base import _HdfFile, _RAS_TS_FMT

if TYPE_CHECKING:
    import h5py

logger = logging.getLogger("rivia.hdf")

# ---------------------------------------------------------------------------
# HDF path constants
# ---------------------------------------------------------------------------
_GEOM_ROOT = "Geometry"
_GEOM_2D_ROOT = "Geometry/2D Flow Areas"
_GEOM_2D_ATTRS = f"{_GEOM_2D_ROOT}/Attributes"
_SA_ROOT = "Geometry/Storage Areas"
_BC_ROOT = "Geometry/Boundary Condition Lines"
_STRUCT_ROOT = "Geometry/Structures"
_XS_ROOT = "Geometry/Cross Sections"
_RIVER_CL_ROOT = "Geometry/River Centerlines"
_RIVER_BANK_ROOT = "Geometry/River Bank Lines"
_RIVER_EDGE_ROOT = "Geometry/River Edge Lines"


# ---------------------------------------------------------------------------
# Private geometry utilities
# ---------------------------------------------------------------------------


def _point_in_polygon(px: float, py: float, polygon: np.ndarray) -> bool:
    """Ray-casting point-in-polygon test for a simple polygon.

    Parameters
    ----------
    px, py:
        Query point coordinates.
    polygon:
        Ordered vertices, shape ``(n, 2)``.
    """
    n = len(polygon)
    inside = False
    j = n - 1
    for i in range(n):
        xi, yi = float(polygon[i, 0]), float(polygon[i, 1])
        xj, yj = float(polygon[j, 0]), float(polygon[j, 1])
        if ((yi > py) != (yj > py)) and px < (xj - xi) * (py - yi) / (yj - yi) + xi:
            inside = not inside
        j = i
    return inside


# ---------------------------------------------------------------------------
# FlowArea — geometry for a single 2-D flow area
# ---------------------------------------------------------------------------


[docs] class FlowArea: """Geometry data for one named 2-D flow area. All properties are loaded eagerly on first access and cached; they are small static arrays compared with the time-series results. Parameters ---------- group: The ``h5py.Group`` at ``Geometry/2D Flow Areas/<name>``. name: Human-readable name of the flow area. n_cells: Number of *real* computational cells (ghost cells excluded). """ def __init__(self, group: "h5py.Group", name: str, n_cells: int) -> None: self._g = group self._name = name self._n_cells = n_cells # lazy-loaded cache self._cache: dict[str, np.ndarray] = {} # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _load(self, key: str) -> np.ndarray: if key not in self._cache: self._cache[key] = np.array(self._g[key]) return self._cache[key] # ------------------------------------------------------------------ # Basic info # ------------------------------------------------------------------ @property def name(self) -> str: """Name of the 2-D flow area.""" return self._name @property def n_cells(self) -> int: """Number of real computational cells (ghost cells excluded).""" return self._n_cells @property def n_faces(self) -> int: """Total number of cell faces.""" return self.face_normals.shape[0] # ------------------------------------------------------------------ # Cell geometry # ------------------------------------------------------------------ @property def cell_centers(self) -> np.ndarray: """x, y coordinates of real cell centres. Shape ``(n_cells, 2)``.""" return self._load("Cells Center Coordinate")[: self._n_cells] @property def ghost_cell_centers(self) -> np.ndarray: """x, y coordinates of ghost (boundary) cell centres. Ghost cells occupy indices ``n_cells .. n_total-1`` in the HDF datasets. They hold boundary-condition data and are needed when reconstructing WSE or velocity at perimeter faces. Returns ------- ndarray, shape ``(n_ghost, 2)`` May be empty (shape ``(0, 2)``) if the HDF stores no ghost rows. """ return self._load("Cells Center Coordinate")[self._n_cells :] @property def cell_min_elevation(self) -> np.ndarray: """Minimum bed elevation per cell. Shape ``(n_cells,)``. Real cells only (ghost rows excluded). See :attr:`_cell_min_elevation` for the full array. """ return self._load("Cells Minimum Elevation")[: self._n_cells] @property def _cell_min_elevation(self) -> np.ndarray: """Minimum bed elevation including ghost cell rows. Shape ``(n_cells + n_ghost,)``. Ghost rows contain ``NaN`` — the HDF5 dataset fill value (IEEE 754 float32 ``0x FFC00000``). HEC-RAS leaves ghost rows unwritten so they take the dataset fill value. Note that not all cell arrays behave this way: ``Cells Center Manning's n`` ghost rows hold real Manning's n values copied from adjacent real cells, and ``Cells Surface Area`` ghost rows are ``0.0``. Use this instead of :attr:`cell_min_elevation` when indexing with raw ``face_cell_indexes`` values, which contain ghost cell indices on the cellB side of perimeter faces. """ return self._load("Cells Minimum Elevation") @property def cell_mannings_n(self) -> np.ndarray: """Manning's n at each cell centre. Shape ``(n_cells,)``.""" return self._load("Cells Center Manning's n")[: self._n_cells] @property def cell_surface_area(self) -> np.ndarray: """Plan-view surface area per cell. Shape ``(n_cells,)``.""" return self._load("Cells Surface Area")[: self._n_cells] @property def cell_volume_elevation(self) -> tuple[np.ndarray, np.ndarray]: """Volume-elevation table index and values. Returns ------- info : ndarray, shape ``(n_cells, 2)`` Columns: ``[start_index, count]`` into *values*. values : ndarray, shape ``(total, 2)`` Columns: ``[elevation, volume]``. """ return ( self._load("Cells Volume Elevation Info")[: self._n_cells], self._load("Cells Volume Elevation Values"), ) @property def cell_face_info(self) -> tuple[np.ndarray, np.ndarray]: """Cell-to-face connectivity index and values. Returns ------- info : ndarray, shape ``(n_cells, 2)`` Columns: ``[start_index, count]`` into *values*. values : ndarray, shape ``(total, 2)`` Columns: ``[face_index, orientation]``. The orientation flag identifies which role this cell plays for that face, using the RasMapper ``Face`` object convention (``Face.cs``): every face stores ``cellA`` and ``cellB`` (columns 0 and 1 of :attr:`face_cell_indexes`) and ``fpA`` and ``fpB`` (columns 0 and 1 of :attr:`face_facepoint_indexes`). RasMapper guarantees the following **CCW traversal invariant**: * Walking CCW around **cellA**, the face edge runs ``fpA → fpB`` (fpA is the entry facepoint, fpB is the exit facepoint). * Walking CCW around **cellB**, the edge runs ``fpB → fpA`` (fpB is the entry facepoint, fpA is the exit facepoint). The orientation flag encodes which role *this* cell plays: * ``+1`` — this cell is ``cellA`` for that face. The stored face normal points **outward** (away from this cell toward ``cellB``). The entry facepoint for CCW traversal is ``fpA`` (column 0 of ``face_facepoint_indexes``). * ``-1`` — this cell is ``cellB`` for that face. The stored face normal points **inward** (toward this cell from ``cellA``). The entry facepoint for CCW traversal is ``fpB`` (column 1 of ``face_facepoint_indexes``). To pick the CCW-entry facepoint for a face given its orientation flag *ori*:: fp = face_facepoint_indexes[fi, 0] if ori > 0 \ else face_facepoint_indexes[fi, 1] This is equivalent to ``Face.GetFPPrev(cellIdx)`` in ``RasMapperLib/RasMapperLib.Mesh/Face.cs``. Example ------- Given:: info = np.array([[0, 3], [3, 4]]) values = np.array([ [10, +1], [11, -1], [12, +1], [20, +1], [21, +1], [22, -1], [23, +1], ]) Cell ``0`` uses ``values[0:3]`` (faces ``10, 11, 12``), and cell ``1`` uses ``values[3:7]`` (faces ``20, 21, 22, 23``). """ return ( self._load("Cells Face and Orientation Info"), self._load("Cells Face and Orientation Values"), ) @property def cell_facepoint_indexes(self) -> np.ndarray: """Corner facepoint indices per cell in polygon order. Shape ``(n_cells, 8)``. Each row contains up to 8 facepoint indices (into :attr:`facepoint_coordinates`) in polygon-traversal order; unused slots are padded with ``-1``. Example ------- A triangular cell and a quadrilateral cell:: cell_facepoint_indexes = np.array([ [ 5, 12, 18, -1, -1, -1, -1, -1], # triangle [ 0, 3, 7, 11, -1, -1, -1, -1], # quad ]) """ return self._load("Cells FacePoint Indexes")[: self._n_cells] @property def cell_polygons(self) -> list[np.ndarray]: """Polygon vertices for every cell in counter-clockwise order. Returns a list of length ``n_cells``. Each element is an ``ndarray`` of shape ``(n_vertices, 2)`` containing the ``(x, y)`` coordinates of the cell polygon in **counter-clockwise** winding (GeoJSON / OGC / Shapely exterior-ring convention). For cells that border curved faces the interior perimeter points from :attr:`face_perimeter` are inserted between the corner facepoints, giving an accurate boundary representation. Cells whose face adjacency graph cannot be traversed as a closed cycle emit an empty ``(0, 2)`` array. Computed once and cached. Examples -------- Create Shapely geometries:: from shapely.geometry import Polygon polys = [Polygon(pts) for pts in fa.cell_polygons] Create a GeoDataFrame (requires geopandas):: import geopandas as gpd from shapely.geometry import Polygon gdf = gpd.GeoDataFrame( geometry=[Polygon(pts) for pts in fa.cell_polygons], crs="EPSG:…", ) """ cache_key = "_cell_polygons" if cache_key in self._cache: return self._cache[cache_key] from collections import defaultdict fp_idx = self.face_facepoint_indexes # (n_faces, 2) fp_coords = self.facepoint_coordinates # (n_facepoints, 2) peri_info, peri_vals = self.face_perimeter cell_fp_idx = self.cell_facepoint_indexes # (n_cells, 8) cfi, cfv = self.cell_face_info n = self._n_cells # Cells that border at least one curved face need the slow path so # that interior perimeter points are inserted correctly. curved_face_idxs = np.where(peri_info[:, 1] > 0)[0] if len(curved_face_idxs) > 0: face_cell_idx = self.face_cell_indexes curved_cells: set[int] = set( face_cell_idx[curved_face_idxs].flatten().tolist() ) curved_cells.discard(-1) else: curved_cells = set() polygons: list[np.ndarray] = [np.empty((0, 2), dtype=np.float64) for x in range(n)] for c in range(n): if c not in curved_cells: # Fast path: polygon-order corners are already in HDF. corners = cell_fp_idx[c] valid = corners[corners >= 0] poly: np.ndarray = fp_coords[valid].copy() else: # Slow path: adjacency graph walk, inserting interior points. start = int(cfi[c, 0]) count = int(cfi[c, 1]) face_idxs = cfv[start : start + count, 0] adj: dict[int, list[int]] = defaultdict(list) edge_to_face: dict[tuple[int, int], int] = {} for f in face_idxs: f = int(f) fp0 = int(fp_idx[f, 0]) fp1 = int(fp_idx[f, 1]) adj[fp0].append(fp1) adj[fp1].append(fp0) edge_to_face[(fp0, fp1)] = f edge_to_face[(fp1, fp0)] = f all_pts: list[np.ndarray] = [] fps_list = list(adj.keys()) current = fps_list[0] prev_fp = None for _ in range(count): all_pts.append(fp_coords[current]) nb = adj[current] nxt = nb[1] if nb[0] == prev_fp else nb[0] face_i = edge_to_face.get((current, nxt)) if face_i is not None: pstart = int(peri_info[face_i, 0]) n_int = int(peri_info[face_i, 1]) if n_int > 0: pts = peri_vals[pstart : pstart + n_int] if current != int(fp_idx[face_i, 0]): pts = pts[::-1] all_pts.extend(pts) prev_fp = current current = nxt if current != fps_list[0]: # cycle did not close — malformed continue poly = np.array(all_pts) if len(poly) < 3: continue # Enforce CCW winding (shoelace signed area: positive → CCW). x, y = poly[:, 0], poly[:, 1] signed_area = float( np.dot(x, np.roll(y, -1)) - np.dot(np.roll(x, -1), y) ) if signed_area < 0: poly = poly[::-1] polygons[c] = poly self._cache[cache_key] = polygons return polygons # ------------------------------------------------------------------ # Face geometry # ------------------------------------------------------------------ @property def face_normals(self) -> np.ndarray: """Face unit normal vectors and plan-view lengths. Shape ``(n_faces, 3)``. Columns: ``[nx, ny, length]``. The normal points from the *left* cell to the *right* cell as defined by :attr:`face_cell_indexes`. Example ------- Given:: face_normals = np.array([ [1.0, 0.0, 12.5], [0.0, -1.0, 8.0], ]) Face ``0`` has a unit normal pointing in ``+nx`` with length ``12.5``. Face ``1`` has a unit normal pointing in ``-ny`` with length ``8.0``. """ return self._load("Faces NormalUnitVector and Length") @property def face_cell_indexes(self) -> np.ndarray: """Left and right cell index for each face. Shape ``(n_faces, 2)``. A value of ``-1`` indicates a boundary face with no neighbour on that side. Example ------- Given:: face_cell_indexes = np.array([ [0, 1], [1, 2], [2, -1], [-1, 0], ]) Face ``0`` is interior between cells ``0`` (left) and ``1`` (right). Face ``2`` is a boundary face for cell ``2`` on the left side, and face ``3`` is a boundary face for cell ``0`` on the right side. """ return self._load("Faces Cell Indexes") @property def face_min_elevation(self) -> np.ndarray: """Minimum bed elevation at each face centroid. Shape ``(n_faces,)``.""" return self._load("Faces Minimum Elevation") @property def face_invert_station(self) -> np.ndarray: """Station of the invert centroid of each face. HEC-RAS stores the station distance (from one end of the face) to the centroid of the bottom 5% of the face cross-sectional area — the invert region used internally for flow calculations. Shape ``(n_faces,)``, dtype float32. """ return self._load("Faces Low Elevation Centroid") @property def face_invert_coordinates(self) -> np.ndarray: """(x, y) coordinates of the invert point on each face. Interpolates along each face polyline at the station distance stored in :attr:`face_invert_station` to produce a projected ``(x, y)`` position. Handles both straight faces (two endpoints) and curved faces (with interior perimeter points) correctly. Shape ``(n_faces, 2)``. """ cache_key = "_face_invert_coordinates" if cache_key in self._cache: return self._cache[cache_key] stations = self.face_invert_station # (n_faces,) polylines = self.face_polylines # list of (n_pts, 2) n_faces = len(polylines) coords = np.empty((n_faces, 2), dtype=float) for fi, poly in enumerate(polylines): s = float(stations[fi]) seg_vecs = np.diff(poly, axis=0) # (n_segs, 2) seg_lens = np.linalg.norm(seg_vecs, axis=1) # (n_segs,) cum = np.concatenate([[0.0], np.cumsum(seg_lens)]) # Clamp station to [0, total_length] to guard against float32 overshoot s = float(np.clip(s, 0.0, cum[-1])) seg = int(np.searchsorted(cum, s, side="right")) - 1 seg = min(seg, len(seg_lens) - 1) t = (s - cum[seg]) / seg_lens[seg] if seg_lens[seg] > 0 else 0.0 coords[fi] = poly[seg] + t * seg_vecs[seg] self._cache[cache_key] = coords return coords @property def face_normal_intercept(self) -> np.ndarray: """(x, y) coordinate on each face that lies on the cell-centre connecting line. For each interior face (both neighbours present), this is the point where the straight line through the two adjacent cell centres crosses the face polyline. HEC-RAS uses this intersection implicitly when computing face-normal hydraulic gradients and finite-difference slopes. For boundary faces (one neighbour absent), the face centroid is returned as a fallback. Shape ``(n_faces, 2)``. Computed once and cached. Notes ----- Parametric 2D line–segment intersection: given the infinite line ``P = cell_L + t * (cell_R - cell_L)`` and face segment ``Q = A + s * (B - A)`` the system is solved for *s* (segment parameter, must satisfy ``0 ≤ s ≤ 1``) and the hit point is ``A + s * (B - A)``. For curved faces all segments are tested and the first hit is used. """ cache_key = "_face_normal_intercept" if cache_key in self._cache: return self._cache[cache_key] cc = self.cell_centers # (n_cells, 2) fci = self.face_cell_indexes # (n_faces, 2) polylines = self.face_polylines # list[(n_pts, 2)] centroids = self.face_centroids # (n_faces, 2) — boundary fallback n_cells = len(cc) result = centroids.copy() for fi, poly in enumerate(polylines): left = int(fci[fi, 0]) right = int(fci[fi, 1]) if left < 0 or left >= n_cells or right < 0 or right >= n_cells: continue # boundary face — centroid already set d = cc[right] - cc[left] # direction of cell-to-cell line for k in range(len(poly) - 1): a = poly[k] e = poly[k + 1] - a # face-segment direction # Solve P1 + t*d = A + s*e → det, s via Cramer's rule det = e[0] * d[1] - d[0] * e[1] if abs(det) < 1e-12: continue # parallel — try next segment r = a - cc[left] s = (d[0] * r[1] - r[0] * d[1]) / det if -1e-9 <= s <= 1.0 + 1e-9: s = float(np.clip(s, 0.0, 1.0)) result[fi] = a + s * e break # If no segment intersected (degenerate geometry), centroid stays. self._cache[cache_key] = result return result @property def face_facepoint_indexes(self) -> np.ndarray: """Start and end face-point indices for each face. Shape ``(n_faces, 2)``. Example ------- Given:: face_facepoint_indexes = np.array([ [12, 18], [18, 24], [24, 12], ]) Face ``0`` connects facepoints ``12`` and ``18``; face ``1`` connects ``18`` and ``24``; face ``2`` connects ``24`` and ``12``. """ return self._load("Faces FacePoint Indexes") @property def face_perimeter(self) -> tuple[np.ndarray, np.ndarray]: """Interior perimeter points along curved faces. Most faces are straight lines between their two endpoint facepoints. For curved faces, one or more intermediate ``(x, y)`` coordinate points are stored here. These points are **not** indexed facepoints; they do not appear in :attr:`facepoint_coordinates`. Returns ------- info : ndarray, shape ``(n_faces, 2)`` ``[start_index, count]`` into *values*. ``count == 0`` for straight faces. values : ndarray, shape ``(total_interior_pts, 2)`` ``(x, y)`` coordinates of interior perimeter points in order along the face polyline. """ return ( self._load("Faces Perimeter Info"), self._load("Faces Perimeter Values"), ) @property def face_centroids(self) -> np.ndarray: """Centroid coordinate of each face. Shape ``(n_faces, 2)``. For straight faces (no interior perimeter points) this equals the midpoint of the chord between the two endpoint facepoints. For curved faces this is the **arc midpoint** — the point on the face polyline at exactly half the total arc length — which guarantees the result lies on the face itself. An arc-length-weighted average of segment midpoints is *not* used because it can float off the polyline for non-convex shapes (V-notches, loops). Computed once and cached; subsequent accesses return the same array. """ cache_key = "_face_centroids" if cache_key in self._cache: return self._cache[cache_key] fp_idx = self._load("Faces FacePoint Indexes") # (n_faces, 2) fp_coords = self._load("FacePoints Coordinate") # (n_facepoints, 2) peri_info = self._load("Faces Perimeter Info") # (n_faces, 2) peri_vals = self._load("Faces Perimeter Values") # (total, 2) fp0 = fp_coords[fp_idx[:, 0]] # (n_faces, 2) fp1 = fp_coords[fp_idx[:, 1]] # (n_faces, 2) # Default: straight-face midpoint for all faces. # For a two-point segment the arc midpoint equals the chord midpoint. centroids = 0.5 * (fp0 + fp1) # Override for curved faces (interior perimeter points present). # Use the arc midpoint — the point on the polyline at exactly half the # total arc length. This guarantees the centroid lies ON the face # polyline, unlike an arc-length-weighted average which can float off # the curve for non-convex shapes (V-notches, loops, etc.). curved = np.where(peri_info[:, 1] > 0)[0] for fi in curved: start = int(peri_info[fi, 0]) count = int(peri_info[fi, 1]) pts = np.vstack([fp0[fi], peri_vals[start : start + count], fp1[fi]]) seg_len = np.linalg.norm(pts[1:] - pts[:-1], axis=1) total = seg_len.sum() if total == 0: centroids[fi] = pts.mean(axis=0) continue cumlen = np.concatenate([[0.0], np.cumsum(seg_len)]) half = total / 2.0 seg_idx = int(np.searchsorted(cumlen[1:], half, side="right")) seg_idx = min(seg_idx, len(seg_len) - 1) t = (half - cumlen[seg_idx]) / seg_len[seg_idx] centroids[fi] = pts[seg_idx] + t * (pts[seg_idx + 1] - pts[seg_idx]) self._cache[cache_key] = centroids return centroids @property def facepoint_coordinates(self) -> np.ndarray: """x, y coordinates of all face endpoints. Shape ``(n_facepoints, 2)``.""" return self._load("FacePoints Coordinate") @property def face_area_elevation(self) -> tuple[np.ndarray, np.ndarray]: """Hydraulic property table for each face. Returns ------- info : ndarray, shape ``(n_faces, 2)`` Columns: ``[start_index, count]`` into *values*. values : ndarray, shape ``(total, 4)`` Columns: ``[elevation, flow_area, wetted_perimeter, mannings_n]``. Example ------- Given:: info = np.array([[0, 2], [2, 3]]) values = np.array([ [100.0, 0.0, 10.0, 0.030], [101.0, 5.0, 11.0, 0.030], [100.0, 0.0, 12.0, 0.035], [101.0, 6.0, 13.0, 0.035], [102.0, 9.0, 14.0, 0.035], ]) Face ``0`` uses ``values[0:2]`` and face ``1`` uses ``values[2:5]``. For face ``0`` specifically: ``[100.0, 0.0, 10.0, 0.030]`` means at elevation ``100.0`` the flow area is ``0.0`` (dry threshold), wetted perimeter is ``10.0``, and Manning's ``n`` is ``0.030``; ``[101.0, 5.0, 11.0, 0.030]`` is the next stage row with larger flow area/perimeter at elevation ``101.0``. """ return ( self._load("Faces Area Elevation Info"), self._load("Faces Area Elevation Values"), )
[docs] def face_elevation_area(self, face_index: int) -> tuple[np.ndarray, np.ndarray]: """Elevation and flow-area arrays for a single face. Parameters ---------- face_index : int Zero-based face index. Returns ------- elevation : ndarray, shape ``(n_rows,)`` area : ndarray, shape ``(n_rows,)`` """ info, values = self.face_area_elevation start = int(info[face_index, 0]) count = int(info[face_index, 1]) rows = values[start : start + count] return rows[:, 0], rows[:, 1]
# ------------------------------------------------------------------ # Perimeter # ------------------------------------------------------------------ @property def perimeter(self) -> np.ndarray: """Boundary polygon of the 2-D flow area. Shape ``(n_pts, 2)``.""" return self._load("Perimeter")
[docs] def check_cells( self, *, check_boundary: bool = True, tol: float = 1e-10, ) -> dict: """Check all mesh cells against HEC-RAS geometric validity rules. Calls :func:`rivia.geo.mesh_validation.check_mesh_cells` with the HDF arrays for this flow area. Parameters ---------- check_boundary : When ``True`` (default) each cell centre is also tested against the 2D flow area perimeter polygon (rule 5). tol : Absolute tolerance for near-collinear edge detection. Returns ------- dict Full validation report. Pass to :func:`rivia.geo.mesh_validation.print_mesh_report` for a human-readable summary. """ from rivia.geo.mesh_validation import check_mesh_cells cfi, cfv = self.cell_face_info peri_info, peri_vals = self.face_perimeter return check_mesh_cells( cell_centers=self.cell_centers, facepoint_coordinates=self.facepoint_coordinates, face_facepoint_indexes=self.face_facepoint_indexes, cell_face_info=cfi, cell_face_values=cfv, face_perimeter_info=peri_info, face_perimeter_values=peri_vals, boundary_polygon=self.perimeter if check_boundary else None, tol=tol, )
@property def facepoint_face_orientation_info(self) -> np.ndarray: """CSR start/count index into :attr:`facepoint_face_orientation_values`. Shape ``(n_facepoints, 2)``, dtype ``int32``. Each row is ``[start, count]`` — a slice into :attr:`facepoint_face_orientation_values` for that facepoint. count = number of faces adjacent to that facepoint. HDF source: ``FacePoints Face and Orientation Info``. Examples -------- Using the 5-facepoint mesh from :attr:`facepoint_face_orientation` (fp0 at centre with 4 adjacent faces; fp1–fp4 each with 1):: facepoint_face_orientation_info = np.array([ [0, 4], # fp0: 4 entries starting at slot 0 [4, 1], # fp1: 1 entry starting at slot 4 [5, 1], # fp2: 1 entry starting at slot 5 [6, 1], # fp3: 1 entry starting at slot 6 [7, 1], # fp4: 1 entry starting at slot 7 ], dtype=np.int32) To get the entries for fp0:: start, count = info[0] # 0, 4 fp0_entries = values[start : start + count] # slots 0-3 """ return self._load("FacePoints Face and Orientation Info").astype(np.int32) @property def facepoint_face_orientation_values(self) -> np.ndarray: """Face index and orientation flag for each facepoint→face entry. Shape ``(total_entries, 2)``, dtype ``int32``. Each row is ``[face_idx, orientation]`` where ``orientation = -1`` means this facepoint is ``fpA`` (first endpoint) of that face and ``orientation = +1`` means it is ``fpB`` (second endpoint). Matches the values written by RasMapper (``MeshFV2D.cs``: fpA → ``-1``, fpB → ``+1``). Use :attr:`facepoint_face_orientation_info` to slice per facepoint. HDF source: ``FacePoints Face and Orientation Values``. Examples -------- Using the 5-facepoint mesh from :attr:`facepoint_face_orientation`:: facepoint_face_orientation_values = np.array([ [1, -1], # slot 0 — face 1, fp0 is fpA [3, -1], # slot 1 — face 3, fp0 is fpA [0, -1], # slot 2 — face 0, fp0 is fpA [2, -1], # slot 3 — face 2, fp0 is fpA [3, 1], # slot 4 — face 3, fp1 is fpB [0, 1], # slot 5 — face 0, fp2 is fpB [2, 1], # slot 6 — face 2, fp3 is fpB [1, 1], # slot 7 — face 1, fp4 is fpB ], dtype=np.int32) Note that the face indices for fp0 (slots 0–3) are ``[1, 3, 0, 2]``, not ``[0, 1, 2, 3]`` — they are in counter-clockwise angular order, not creation order. """ return self._load("FacePoints Face and Orientation Values").astype(np.int32) # ------------------------------------------------------------------ # Derived topology and geometry helpers (computed from HDF arrays) # ------------------------------------------------------------------ @property def facepoint_face_orientation(self) -> tuple[np.ndarray, np.ndarray]: """Angle-sorted facepoint→face mapping with orientation flags. Returns ``(fp_face_info, fp_face_values)`` where: * ``fp_face_info``: shape ``(n_facepoints, 2)``, dtype ``int32``. Each row is ``[start, count]`` — a slice into ``fp_face_values`` for that facepoint. * ``fp_face_values``: shape ``(total_entries, 2)``, dtype ``int32``. Each row is ``[face_idx, orientation]`` where ``orientation = 0`` means this facepoint is ``fpA`` (first endpoint) of that face and ``orientation = 1`` means it is ``fpB`` (second endpoint). Entries for each facepoint are sorted in **counter-clockwise** (ascending bearing) angular order around the facepoint coordinate. Bearing is measured from the positive x-axis using ``atan2(dy, dx)``, mapped to ``[0, 2π)`` in RasMapper (``SegmentM.Bearing()``) and ``[-π, π)`` in the Python re-implementation (``np.arctan2``); both produce the same relative ordering. This ordering is required for correct arc traversal in the RASMapper vertex-velocity algorithm (Step 3, ``MeshFV2D.cs``). Uses :attr:`facepoint_face_orientation_info` and :attr:`facepoint_face_orientation_values` when present (production HDF files always include them). Falls back to building the CSR arrays from :attr:`face_facepoint_indexes` for older or minimal test fixtures. The angle sort is always applied. Computed once and cached. Examples -------- Terminology used in this example: * **fp0 … fp4** — facepoint indices (mesh vertices, integer row numbers in :attr:`facepoint_coordinates`). * **fpA / fpB** — the two endpoint roles of a face. Every face has exactly two endpoint facepoints stored as ``[fpA_index, fpB_index]`` in :attr:`face_facepoint_indexes`. fpA is the first column (index 0), fpB is the second column (index 1). Five-facepoint mesh with fp0 at the centre and four faces created in arbitrary order:: fp2 (0,1) | face 0 | face 3 | fp3 -----fp0----- fp1 (-1,0) | (1,0) face 2 | face 1 | fp4 (0,-1) # face_facepoint_indexes — each face stores [fpA_index, fpB_index] face 0: fpA=fp0, fpB=fp2 (North) face 1: fpA=fp0, fpB=fp4 (South) face 2: fpA=fp0, fpB=fp3 (West) face 3: fpA=fp0, fpB=fp1 (East) Bearings from fp0 to the far endpoint of each adjacent face: face 0 → atan2(1, 0) = +π/2 (North) face 1 → atan2(-1, 0) = -π/2 (South) face 2 → atan2(0, -1) = ±π (West) face 3 → atan2(0, 1) = 0 (East) After sorting ascending (counter-clockwise: S → E → N → W): .. code-block:: text fp_face_info fp0: [0, 4] # 4 entries starting at slot 0 fp1: [4, 1] fp2: [5, 1] fp3: [6, 1] fp4: [7, 1] fp_face_values # [face_idx, orientation] (-1=fpA, +1=fpB) slot 0: [1, -1] ← face 1 (South), fp0 is fpA slot 1: [3, -1] ← face 3 (East), fp0 is fpA slot 2: [0, -1] ← face 0 (North), fp0 is fpA slot 3: [2, -1] ← face 2 (West), fp0 is fpA slot 4: [3, 1] ← face 3, fp1 is fpB slot 5: [0, 1] ← face 0, fp2 is fpB slot 6: [2, 1] ← face 2, fp3 is fpB slot 7: [1, 1] ← face 1, fp4 is fpB fp0's faces in angular order: ``fp_face_values[0:4, 0]`` → ``[1, 3, 0, 2]``, not ``[0, 1, 2, 3]``. Notes ----- Derived from ``_build_fp_face_connectivity`` and ``_sort_fp_faces_by_angle`` in ``RasMapperLib/MeshFV2D.cs`` (HEC-RAS 6.6). """ cache_key = "_facepoint_face_orientation" if cache_key in self._cache: return self._cache[cache_key] # type: ignore[return-value] face_fps = self.face_facepoint_indexes # (n_faces, 2) fp_coords = self.facepoint_coordinates # (n_fp, 2) n_fp = len(fp_coords) n_faces = len(face_fps) # Production geometry HDF files always contain these datasets. # Older or minimal test fixtures may not; fall back to building # the CSR arrays from face_facepoint_indexes in that case. if ("FacePoints Face and Orientation Info" in self._g and "FacePoints Face and Orientation Values" in self._g): fp_info = self.facepoint_face_orientation_info.copy() fp_vals = self.facepoint_face_orientation_values.copy() else: # Build CSR-style arrays from face_facepoint_indexes. # orientation -1 = fpA (first endpoint), +1 = fpB (second endpoint) # matching the values written by RasMapper (MeshFV2D.cs). fp_counts = np.zeros(n_fp, dtype=np.int32) for fi in range(n_faces): fp_counts[int(face_fps[fi, 0])] += 1 fp_counts[int(face_fps[fi, 1])] += 1 fp_info = np.zeros((n_fp, 2), dtype=np.int32) offset = 0 for i in range(n_fp): fp_info[i, 0] = offset fp_info[i, 1] = fp_counts[i] offset += fp_counts[i] fp_vals = np.zeros((offset, 2), dtype=np.int32) current = np.zeros(n_fp, dtype=np.int32) for fi in range(n_faces): fpA = int(face_fps[fi, 0]) fpB = int(face_fps[fi, 1]) pos_a = fp_info[fpA, 0] + current[fpA] fp_vals[pos_a, 0] = fi fp_vals[pos_a, 1] = -1 # fpA side current[fpA] += 1 pos_b = fp_info[fpB, 0] + current[fpB] fp_vals[pos_b, 0] = fi fp_vals[pos_b, 1] = 1 # fpB side current[fpB] += 1 # --- Angle-sort entries for each facepoint ------------------------- # Required so that arc traversal in vertex-velocity computation # visits faces in consistent counter-clockwise angular order. for fp in range(n_fp): start = int(fp_info[fp, 0]) count = int(fp_info[fp, 1]) if count <= 1: continue fp_x, fp_y = float(fp_coords[fp, 0]), float(fp_coords[fp, 1]) angles = np.empty(count, dtype=np.float64) for j in range(count): fi = int(fp_vals[start + j, 0]) fpA_idx = int(face_fps[fi, 0]) fpB_idx = int(face_fps[fi, 1]) other = fpB_idx if fpA_idx == fp else fpA_idx ox = float(fp_coords[other, 0]) oy = float(fp_coords[other, 1]) angles[j] = np.arctan2(oy - fp_y, ox - fp_x) order = np.argsort(angles) temp = fp_vals[start : start + count].copy() for j in range(count): fp_vals[start + j] = temp[order[j]] self._cache[cache_key] = (fp_info, fp_vals) # type: ignore[assignment] return fp_info, fp_vals @property def facepoint_to_faces(self) -> list[np.ndarray]: """Face indices adjacent to each facepoint. Returns a list of length ``n_facepoints``. Element ``i`` is an ``int64`` array of face indices whose endpoints include facepoint ``i``. Most interior facepoints are shared by exactly two faces; boundary facepoints may appear in more. Computed once and cached. """ cache_key = "_facepoint_to_faces" if cache_key in self._cache: return self._cache[cache_key] # type: ignore[return-value] fp_idx = self.face_facepoint_indexes # (n_faces, 2) n_fp = len(self.facepoint_coordinates) n_faces = len(fp_idx) buckets: list[list[int]] = [[] for _ in range(n_fp)] for fi in range(n_faces): buckets[int(fp_idx[fi, 0])].append(fi) buckets[int(fp_idx[fi, 1])].append(fi) result = [np.array(b, dtype=np.int64) for b in buckets] self._cache[cache_key] = result # type: ignore[assignment] return result @property def facepoint_to_cells(self) -> list[np.ndarray]: """Real cell indices surrounding each facepoint. Returns a list of length ``n_facepoints``. Element ``i`` is a sorted ``int64`` array of cell indices (``< n_cells``) that include facepoint ``i`` as a polygon corner. Ghost cells and the sentinel value ``-1`` are excluded. Computed once and cached. """ cache_key = "_facepoint_to_cells" if cache_key in self._cache: return self._cache[cache_key] # type: ignore[return-value] fp_to_faces = self.facepoint_to_faces fci = self.face_cell_indexes # (n_faces, 2) n = self._n_cells result: list[np.ndarray] = [] for faces in fp_to_faces: cells: set[int] = set() for fi in faces: c0, c1 = int(fci[fi, 0]), int(fci[fi, 1]) if 0 <= c0 < n: cells.add(c0) if 0 <= c1 < n: cells.add(c1) result.append(np.array(sorted(cells), dtype=np.int64)) self._cache[cache_key] = result # type: ignore[assignment] return result @property def cell_neighbors(self) -> list[np.ndarray]: """Adjacent real cell indices for each cell. Returns a list of length ``n_cells``. Element ``c`` is an ``int64`` array of real cell indices (``< n_cells``) that share a face with cell ``c``. Boundary faces (where the neighbour index is ``-1``) contribute no entry. Order follows the face order in :attr:`cell_face_info`. Computed once and cached. """ cache_key = "_cell_neighbors" if cache_key in self._cache: return self._cache[cache_key] # type: ignore[return-value] cfi, cfv = self.cell_face_info fci = self.face_cell_indexes n = self._n_cells neighbors: list[np.ndarray] = [] for c in range(n): start = int(cfi[c, 0]) count = int(cfi[c, 1]) nb: list[int] = [] for fi in cfv[start : start + count, 0]: c0, c1 = int(fci[fi, 0]), int(fci[fi, 1]) other = c1 if c0 == c else c0 if 0 <= other < n: nb.append(other) neighbors.append(np.array(nb, dtype=np.int64)) self._cache[cache_key] = neighbors # type: ignore[assignment] return neighbors @property def boundary_face_mask(self) -> np.ndarray: """Boolean mask of faces on the 2D flow area perimeter. Shape ``(n_faces,)``. ``True`` for faces where one side has no neighbouring cell (``face_cell_indexes`` value is ``-1``). Computed once and cached. """ cache_key = "_boundary_face_mask" if cache_key in self._cache: return self._cache[cache_key] fci = self.face_cell_indexes mask = (fci[:, 0] < 0) | (fci[:, 1] < 0) self._cache[cache_key] = mask return mask @property def boundary_cell_mask(self) -> np.ndarray: """Boolean mask of cells that touch the flow area perimeter. Shape ``(n_cells,)``. ``True`` for any real cell that has at least one boundary face (see :attr:`boundary_face_mask`). Computed once and cached. """ cache_key = "_boundary_cell_mask" if cache_key in self._cache: return self._cache[cache_key] bfm = self.boundary_face_mask fci = self.face_cell_indexes n = self._n_cells bfi = np.where(bfm)[0] bc = fci[bfi].flatten() bc = bc[(bc >= 0) & (bc < n)] mask = np.zeros(n, dtype=bool) mask[bc] = True self._cache[cache_key] = mask return mask @property def face_polylines(self) -> list[np.ndarray]: """Full vertex sequence for each face. Returns a list of length ``n_faces``. Each element is an ``ndarray`` of shape ``(n_pts, 2)`` in canonical HDF order (facepoint 0 → interior points → facepoint 1). Straight faces yield 2-point arrays; curved faces have 3 or more points. Computed once and cached. """ cache_key = "_face_polylines" if cache_key in self._cache: return self._cache[cache_key] # type: ignore[return-value] fp_idx = self.face_facepoint_indexes # (n_faces, 2) fp_coords = self.facepoint_coordinates # (n_facepoints, 2) peri_info, peri_vals = self.face_perimeter n_faces = len(fp_idx) fp0 = fp_coords[fp_idx[:, 0]] # (n_faces, 2) fp1 = fp_coords[fp_idx[:, 1]] # (n_faces, 2) polylines: list[np.ndarray] = [ np.array([fp0[fi], fp1[fi]]) for fi in range(n_faces) ] for fi in np.where(peri_info[:, 1] > 0)[0]: start = int(peri_info[fi, 0]) n_int = int(peri_info[fi, 1]) polylines[fi] = np.vstack( [fp0[fi], peri_vals[start : start + n_int], fp1[fi]] ) self._cache[cache_key] = polylines # type: ignore[assignment] return polylines @property def face_lengths(self) -> np.ndarray: """Arc length of each face polyline. Shape ``(n_faces,)``. Straight faces: Euclidean chord length (equals column 2 of :attr:`face_normals`). Curved faces: sum of segment lengths along the full polyline from :attr:`face_perimeter`. Computed once and cached. """ cache_key = "_face_lengths" if cache_key in self._cache: return self._cache[cache_key] fp_idx = self.face_facepoint_indexes fp_coords = self.facepoint_coordinates peri_info, peri_vals = self.face_perimeter fp0 = fp_coords[fp_idx[:, 0]] fp1 = fp_coords[fp_idx[:, 1]] lengths = np.linalg.norm(fp1 - fp0, axis=1).copy() for fi in np.where(peri_info[:, 1] > 0)[0]: start = int(peri_info[fi, 0]) n_int = int(peri_info[fi, 1]) pts = np.vstack([fp0[fi], peri_vals[start : start + n_int], fp1[fi]]) lengths[fi] = float(np.linalg.norm(np.diff(pts, axis=0), axis=1).sum()) self._cache[cache_key] = lengths return lengths @property def cell_bbox(self) -> np.ndarray: """Axis-aligned bounding box per cell. Shape ``(n_cells, 4)``. Columns: ``[xmin, ymin, xmax, ymax]``. Derived from the polygon corner facepoints in :attr:`cell_facepoint_indexes`. Interior perimeter points on curved faces are not included; the error is negligible for the near-convex cells HEC-RAS requires. Computed once and cached. """ cache_key = "_cell_bbox" if cache_key in self._cache: return self._cache[cache_key] cell_fp_idx = self.cell_facepoint_indexes # (n_cells, 8) fp_coords = self.facepoint_coordinates # (n_facepoints, 2) valid = cell_fp_idx >= 0 safe = np.where(valid, cell_fp_idx, 0) coords = fp_coords[safe] # (n_cells, 8, 2) cx = np.where(valid, coords[:, :, 0], np.nan) cy = np.where(valid, coords[:, :, 1], np.nan) bbox = np.column_stack([ np.nanmin(cx, axis=1), np.nanmin(cy, axis=1), np.nanmax(cx, axis=1), np.nanmax(cy, axis=1), ]) self._cache[cache_key] = bbox return bbox @property def mesh_bbox(self) -> np.ndarray: """Overall bounding box of the flow area. Shape ``(4,)``. Returns ``[xmin, ymin, xmax, ymax]`` covering all cells. """ b = self.cell_bbox return np.array([b[:, 0].min(), b[:, 1].min(), b[:, 2].max(), b[:, 3].max()]) @property def cell_aspect_ratio(self) -> np.ndarray: """Aspect ratio of each cell. Shape ``(n_cells,)``. Ratio of the longest face to the shortest face (by arc length from :attr:`face_lengths`). Value of ``1.0`` for a regular cell; large values indicate elongated cells. Computed once and cached. """ cache_key = "_cell_aspect_ratio" if cache_key in self._cache: return self._cache[cache_key] face_len = self.face_lengths cfi, cfv = self.cell_face_info n = self._n_cells ratios = np.ones(n) for c in range(n): start = int(cfi[c, 0]) count = int(cfi[c, 1]) lens = face_len[cfv[start : start + count, 0]] mn = float(lens.min()) if mn > 0.0: ratios[c] = float(lens.max()) / mn self._cache[cache_key] = ratios return ratios @property def cell_compactness(self) -> np.ndarray: """Isoperimetric compactness of each cell. Shape ``(n_cells,)``. ``compactness = 4π × area / perimeter²``. A circle scores ``1.0``; elongated or irregular cells score lower. Uses :attr:`cell_surface_area` for area and :attr:`face_lengths` for the perimeter. Computed once and cached. """ cache_key = "_cell_compactness" if cache_key in self._cache: return self._cache[cache_key] area = self.cell_surface_area face_len = self.face_lengths cfi, cfv = self.cell_face_info n = self._n_cells perimeter = np.zeros(n) for c in range(n): start = int(cfi[c, 0]) count = int(cfi[c, 1]) perimeter[c] = face_len[cfv[start : start + count, 0]].sum() with np.errstate(invalid="ignore", divide="ignore"): compactness = np.where( perimeter > 0.0, 4.0 * np.pi * area / perimeter**2, np.nan, ) self._cache[cache_key] = compactness return compactness
[docs] def cells_containing_points(self, xy: np.ndarray) -> np.ndarray: """Return the cell index containing each query point. Parameters ---------- xy : ndarray, shape ``(m, 2)`` or ``(2,)`` Query ``(x, y)`` coordinates. Returns ------- ndarray, shape ``(m,)``, dtype ``int64`` 0-based cell index for each point, or ``-1`` when outside all cells (or the cell polygon is malformed). Notes ----- Uses :attr:`cell_bbox` for a fast bounding-box pre-filter then :attr:`cell_polygons` for exact ray-casting containment. No spatial index is built; for very large query sets consider building an external RTree over :attr:`cell_bbox`. """ xy = np.asarray(xy, dtype=np.float64) scalar = xy.ndim == 1 if scalar: xy = xy[np.newaxis] m = len(xy) bbox = self.cell_bbox # (n_cells, 4) polygons = self.cell_polygons result = np.full(m, -1, dtype=np.int64) for qi in range(m): px, py = float(xy[qi, 0]), float(xy[qi, 1]) candidates = np.where( (px >= bbox[:, 0]) & (px <= bbox[:, 2]) & (py >= bbox[:, 1]) & (py <= bbox[:, 3]) )[0] for ci in candidates: poly = polygons[ci] if len(poly) >= 3 and _point_in_polygon(px, py, poly): result[qi] = int(ci) break return int(result[0]) if scalar else result
# --------------------------------------------------------------------------- # FlowAreaCollection — dict-like access to all flow areas in the HDF file # ---------------------------------------------------------------------------
[docs] class FlowAreaCollection(Mapping[str, "FlowArea"]): """Access all 2-D flow areas stored in an HDF geometry or plan file. Implements the :class:`collections.abc.Mapping` protocol: supports ``len()``, iteration, ``in``, ``[]``, ``.keys()``, ``.values()``, ``.items()``, and ``.get()``. Parameters ---------- hdf: Open ``h5py.File`` handle. """ def __init__(self, hdf: "h5py.File") -> None: self._hdf = hdf self._cache: dict[str, FlowArea] = {} # ------------------------------------------------------------------ # Attributes table # ------------------------------------------------------------------ @property def summary(self) -> pd.DataFrame: """One row per 2-D flow area with geometry attributes. Columns include ``name``, ``cell_count``, and all other fields from the ``Geometry/2D Flow Areas/Attributes`` structured dataset. Raises ------ KeyError If the ``Geometry/2D Flow Areas`` group is absent (e.g. a 1D-only geometry file). """ if _GEOM_2D_ROOT not in self._hdf: raise KeyError( f"No 2D flow areas found in {self._hdf.filename!r}. " "The file may be a 1D-only geometry." ) attrs_ds = self._hdf[_GEOM_2D_ATTRS] data = np.array(attrs_ds) df = pd.DataFrame(data) # Decode byte-string columns for col in df.columns: if df[col].dtype.kind in ("S", "O"): df[col] = df[col].str.decode("utf-8").str.strip() # Normalise the name column to lower-case key 'name' name_col = next((c for c in df.columns if c.lower() == "name"), None) if name_col and name_col != "name": df = df.rename(columns={name_col: "name"}) return df @property def names(self) -> list[str]: """Names of all 2-D flow areas in the file.""" import h5py root = self._hdf.get(_GEOM_2D_ROOT) if root is None: return [] return [ k for k, v in root.items() if isinstance(v, h5py.Group) and k != "Attributes" ] # ------------------------------------------------------------------ # Item access # ------------------------------------------------------------------ def __getitem__(self, name: str) -> FlowArea: if name not in self._cache: root = self._hdf.get(_GEOM_2D_ROOT) if root is None or name not in root: raise KeyError( f"2D flow area {name!r} not found. Available: {self.names}" ) n_cells = self._get_real_cell_count(name) self._cache[name] = FlowArea(root[name], name, n_cells) return self._cache[name] def __contains__(self, name: str) -> bool: return name in self.names def __iter__(self): return iter(self.names) def __len__(self) -> int: return len(self.names) # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _get_real_cell_count(self, area_name: str) -> int: """Return the number of real (non-ghost) cells for *area_name*. Reads from ``Geometry/2D Flow Areas/Attributes`` structured array. Falls back to the Water Surface result shape if Attributes lookup fails, then to the coordinate array length as a last resort. TODO: this may be uncessary single the 'Cell Count' in summary/attributes is reliably populated by HEC-RAS. """ attrs_ds = self._hdf.get(_GEOM_2D_ATTRS) if attrs_ds is not None: attrs = np.array(attrs_ds) dtype_names = [f.lower() for f in attrs_ds.dtype.names] name_idx = next((i for i, f in enumerate(dtype_names) if "name" in f), None) count_idx = next( (i for i, f in enumerate(dtype_names) if "cell" in f and "count" in f), None, ) if name_idx is not None and count_idx is not None: field_name = attrs_ds.dtype.names[name_idx] field_count = attrs_ds.dtype.names[count_idx] for row in attrs: row_name = row[field_name] if isinstance(row_name, bytes): row_name = row_name.decode().strip() if row_name == area_name: return int(row[field_count]) # Fallback: coordinate array length (may include ghost cells) coord = self._hdf[f"{_GEOM_2D_ROOT}/{area_name}/Cells Center Coordinate"] return coord.shape[0]
# --------------------------------------------------------------------------- # StorageArea / StorageAreaCollection # --------------------------------------------------------------------------- def _decode(value: bytes | str) -> str: """Decode a byte-string field from an HDF structured array.""" if isinstance(value, bytes): return value.decode("utf-8").strip() return str(value).strip()
[docs] @dataclass class StorageArea: """Geometry for a single HEC-RAS storage area (reservoir, pond, etc.). Attributes ---------- name: Name of the storage area. mode: Storage mode string from HEC-RAS (e.g. ``"Elev Vol RC"`` for an elevation-volume rating curve, or ``"Normal"`` for a flat-pool). boundary: x, y coordinates of the storage area boundary polygon. Shape ``(n_pts, 2)``. volume_elevation: Elevation-volume rating curve. Shape ``(n_pairs, 2)`` with columns ``[elevation, volume]``. Empty array (shape ``(0, 2)``) when the storage area has no rating curve (e.g. flat-pool mode). """ name: str mode: str boundary: np.ndarray # (n_pts, 2) volume_elevation: np.ndarray # (n_pairs, 2): [elevation, volume] @property def elevations(self) -> np.ndarray: """Elevation column of the rating curve. Shape ``(n_pairs,)``.""" return self.volume_elevation[:, 0] @property def volumes(self) -> np.ndarray: """Volume column of the rating curve. Shape ``(n_pairs,)``.""" return self.volume_elevation[:, 1] @property def centroid(self) -> np.ndarray: """Centroid of the boundary polygon, shape ``(2,)``. Computed using the standard shoelace formula so the result is exact for any simple (non-self-intersecting) polygon and is not biased by uneven vertex spacing along the boundary. Returns ------- ndarray, shape ``(2,)`` ``[x, y]`` of the polygon centroid. Raises ------ ValueError If ``boundary`` has fewer than three points. """ pts = self.boundary if len(pts) < 3: raise ValueError( f"Storage area {self.name!r} boundary has fewer than 3 points " f"({len(pts)}); cannot compute centroid." ) x, y = pts[:, 0], pts[:, 1] # Shoelace cross products: A_i = x_i * y_{i+1} - x_{i+1} * y_i cross = x[:-1] * y[1:] - x[1:] * y[:-1] # Close the polygon (last→first edge) cross_close = x[-1] * y[0] - x[0] * y[-1] signed_area = (cross.sum() + cross_close) / 2.0 if signed_area == 0.0: # Degenerate (collinear points): fall back to vertex mean return pts.mean(axis=0) cx = ((x[:-1] + x[1:]) * cross).sum() + (x[-1] + x[0]) * cross_close cy = ((y[:-1] + y[1:]) * cross).sum() + (y[-1] + y[0]) * cross_close factor = 1.0 / (6.0 * signed_area) return np.array([cx * factor, cy * factor])
[docs] def volume_at_elevation(self, wse: float) -> float: """Return interpolated stored volume at *wse*. Uses linear interpolation via ``numpy.interp``. Values outside the rating-curve range are clamped to the curve endpoints. Raises ------ ValueError If the storage area has no volume-elevation rating curve. """ if len(self.volume_elevation) == 0: raise ValueError( f"Storage area {self.name!r} has no volume-elevation rating curve " f"(mode={self.mode!r})." ) return float(np.interp(wse, self.elevations, self.volumes))
[docs] class StorageAreaCollection(Mapping[str, "StorageArea"]): """Access all storage areas stored in an HDF geometry file. Implements the :class:`collections.abc.Mapping` protocol: supports ``len()``, iteration, ``in``, ``[]``, ``.keys()``, ``.values()``, ``.items()``, and ``.get()``. Parameters ---------- hdf: Open ``h5py.File`` handle. """ def __init__(self, hdf: "h5py.File") -> None: self._hdf = hdf self._items: dict[str, StorageArea] | None = None def _load(self) -> dict[str, StorageArea]: if self._items is not None: return self._items if _SA_ROOT not in self._hdf: self._items = {} return self._items root = self._hdf[_SA_ROOT] attrs = np.array(root["Attributes"]) poly_info = np.array(root["Polygon Info"]) # (n, 4): [start_pt, n_pts, ...] poly_pts = np.array(root["Polygon Points"]) # (total, 2) ve_info = np.array(root["Volume Elevation Info"]) # (n, 2): [start, count] ve_vals = np.array(root["Volume Elevation Values"]) # (total, 2) items: dict[str, StorageArea] = {} for i, row in enumerate(attrs): name = _decode(row["Name"]) mode = _decode(row["Mode"]) start_pt = int(poly_info[i, 0]) n_pts = int(poly_info[i, 1]) boundary = poly_pts[start_pt : start_pt + n_pts].astype(float) ve_start = int(ve_info[i, 0]) ve_count = int(ve_info[i, 1]) if ve_count > 0: vol_elev = ve_vals[ve_start : ve_start + ve_count].astype(float) else: vol_elev = np.empty((0, 2), dtype=float) items[name] = StorageArea( name=name, mode=mode, boundary=boundary, volume_elevation=vol_elev, ) self._items = items return self._items @property def names(self) -> list[str]: """Names of all storage areas in the file.""" return list(self._load().keys()) @property def summary(self) -> pd.DataFrame: """One row per storage area with basic attributes. Columns: ``name``, ``mode``, ``n_boundary_points``, ``n_elev_vol_pairs``. """ rows = [ { "name": sa.name, "mode": sa.mode, "n_boundary_points": len(sa.boundary), "n_elev_vol_pairs": len(sa.volume_elevation), } for sa in self._load().values() ] return pd.DataFrame(rows) def __getitem__(self, name: str) -> StorageArea: items = self._load() if name not in items: raise KeyError( f"Storage area {name!r} not found. Available: {self.names}" ) return items[name] def __contains__(self, name: str) -> bool: return name in self._load() def __iter__(self): return iter(self._load()) def __len__(self) -> int: return len(self._load())
# --------------------------------------------------------------------------- # BoundaryConditionLine / BoundaryConditionCollection # ---------------------------------------------------------------------------
[docs] @dataclass class BoundaryConditionLine: """A single HEC-RAS boundary condition line. Attributes ---------- name: Name of the BC line (e.g. ``"DSNormalDepth1"``). connected_area: Name of the 2-D flow area or storage area this line belongs to (the ``SA-2D`` field in the HDF attributes table). bc_type: Type string (e.g. ``"External"`` or ``"Internal"``). polyline: x, y coordinates of the BC line. Shape ``(n_pts, 2)``. """ name: str connected_area: str bc_type: str polyline: np.ndarray # (n_pts, 2)
[docs] class BoundaryConditionCollection(Mapping[str, "BoundaryConditionLine"]): """Access all boundary condition lines in an HDF geometry file. Implements the :class:`collections.abc.Mapping` protocol: supports ``len()``, iteration, ``in``, ``[]``, ``.keys()``, ``.values()``, ``.items()``, and ``.get()``. Parameters ---------- hdf: Open ``h5py.File`` handle. """ def __init__(self, hdf: "h5py.File") -> None: self._hdf = hdf self._items: dict[str, BoundaryConditionLine] | None = None def _load(self) -> dict[str, BoundaryConditionLine]: if self._items is not None: return self._items if _BC_ROOT not in self._hdf: self._items = {} return self._items root = self._hdf[_BC_ROOT] attrs = np.array(root["Attributes"]) poly_info = np.array(root["Polyline Info"]) # (n, 4): [start_pt, n_pts, ...] poly_pts = np.array(root["Polyline Points"]) # (total, 2) # Resolve the connected-area field name (varies: "SA-2D" or "SA/2D Area") sa_field = next( (f for f in attrs.dtype.names if "2D" in f or "2d" in f), None, ) items: dict[str, BoundaryConditionLine] = {} for i, row in enumerate(attrs): name = _decode(row["Name"]) connected = _decode(row[sa_field]) if sa_field else "" bc_type = _decode(row["Type"]) start_pt = int(poly_info[i, 0]) n_pts = int(poly_info[i, 1]) polyline = poly_pts[start_pt : start_pt + n_pts].astype(float) items[name] = BoundaryConditionLine( name=name, connected_area=connected, bc_type=bc_type, polyline=polyline, ) self._items = items return self._items @property def names(self) -> list[str]: """Names of all boundary condition lines in the file.""" return list(self._load().keys()) @property def summary(self) -> pd.DataFrame: """One row per BC line with basic attributes. Columns: ``name``, ``connected_area``, ``type``, ``n_points``. """ rows = [ { "name": bc.name, "connected_area": bc.connected_area, "type": bc.bc_type, "n_points": len(bc.polyline), } for bc in self._load().values() ] return pd.DataFrame(rows) def __getitem__(self, name: str) -> BoundaryConditionLine: items = self._load() if name not in items: raise KeyError( f"Boundary condition line {name!r} not found. " f"Available: {self.names}" ) return items[name] def __contains__(self, name: str) -> bool: return name in self._load() def __iter__(self): return iter(self._load()) def __len__(self) -> int: return len(self._load())
# --------------------------------------------------------------------------- # Structure / SA2DConnection / StructureCollection # ---------------------------------------------------------------------------
[docs] @dataclass class Weir: """Overflow weir parameters read from ``Geometry/Structures/Attributes``. Present on :class:`Bridge`, :class:`InlineStructure`, and :class:`LateralStructure` structures when *mode* is ``'Weir/Gate/Culverts'``. Attributes ---------- width: Weir crest length perpendicular to flow (HDF ``Weir Width``). coefficient: Discharge coefficient (HDF ``Weir Coef``). shape: Crest shape: ``'Broad Crested'``, ``'Ogee'``, etc. (HDF ``Weir Shape``). max_submergence: Maximum submergence ratio above which flow is fully submerged (HDF ``Weir Max Submergence``). min_elevation: Minimum crest elevation; ``nan`` when not set (HDF ``Weir Min Elevation``). us_slope: Upstream face slope H:V (HDF ``Weir US Slope``). ds_slope: Downstream face slope H:V (HDF ``Weir DS Slope``). skew: Skew angle in degrees (HDF ``Weir Skew``). use_water_surface: When ``True`` the water-surface elevation is used as the weir reference head; when ``False`` the energy grade line is used (HDF ``Use WS for Weir Reference``). """ width: float coefficient: float shape: str max_submergence: float min_elevation: float us_slope: float ds_slope: float skew: float use_water_surface: bool
[docs] @dataclass class GateOpening: """One physical opening within a :class:`GateGroup`. Attributes ---------- name: Opening label (HDF ``Name`` in ``Gate Groups/Openings/Attributes``). station: Lateral station of this opening along the structure centreline (HDF ``Station``). """ name: str station: float
[docs] @dataclass class GateGroup: """One gate group from ``Geometry/Structures/Gate Groups/Attributes``. A gate group defines a set of identical gate openings. Each opening in the group shares the same geometry (width, height, invert, coefficients) but is placed at a different station along the structure. Attributes ---------- name: Gate group label (e.g. ``'Gate #1'``). width: Gate opening width (ft or m). height: Gate opening height (ft or m). invert: Gate sill elevation. sluice_coefficient: Sluice gate discharge coefficient. radial_coefficient: Radial (Tainter) gate discharge coefficient. weir_coefficient: Overflow weir coefficient for the gate crest. spillway_shape: Crest shape used when gate overflows (e.g. ``'Broad Crested'``). openings: Individual openings in this group, one per physical gate bay. """ name: str width: float height: float invert: float sluice_coefficient: float radial_coefficient: float weir_coefficient: float spillway_shape: str openings: list[GateOpening] = field(default_factory=list)
[docs] @dataclass class Structure: """Base class for one HEC-RAS structure from ``Geometry/Structures/Attributes``. Attributes ---------- mode: HDF ``Mode`` field (e.g. ``'Weir/Gate/Culverts'``). Empty string when the field is blank. upstream_type: HDF ``US Type`` field: ``'XS'`` (1-D cross section), ``'SA'`` (storage area), ``'2D'`` (2-D flow area), or ``'--'`` (unspecified / treated as storage area by HEC-RAS). downstream_type: HDF ``DS Type`` field — same vocabulary as *upstream_type*. centerline: x, y coordinates of the structure centreline. Shape ``(n_pts, 2)``. """ mode: str upstream_type: str downstream_type: str centerline: np.ndarray # (n_pts, 2)
[docs] @dataclass class Bridge(Structure): """Bridge structure embedded in a 1-D HEC-RAS reach. Both sides are always ``'XS'``. Attributes ---------- location: ``(river, reach, rs)`` of this bridge in the 1-D geometry (HDF ``River`` / ``Reach`` / ``RS`` fields). upstream_node: ``(river, reach, rs)`` of the adjacent upstream cross section (HDF ``US River`` / ``US Reach`` / ``US RS``). downstream_node: ``(river, reach, rs)`` of the adjacent downstream cross section (HDF ``DS River`` / ``DS Reach`` / ``DS RS``). weir: Weir overflow parameters; ``None`` when *mode* is empty (pure bridge, no overflow weir modelled). gate_groups: Gate groups attached to this structure (empty list when none). """ location: tuple[str, str, str] = ("", "", "") upstream_node: tuple[str, str, str] = ("", "", "") downstream_node: tuple[str, str, str] = ("", "", "") weir: Weir | None = None gate_groups: list[GateGroup] = field(default_factory=list)
[docs] @dataclass class InlineStructure(Structure): """Inline structure (e.g. inline weir/dam) embedded in a 1-D HEC-RAS reach. Both sides are always ``'XS'``. Attributes ---------- location: ``(river, reach, rs)`` of this structure in the 1-D geometry. upstream_node: ``(river, reach, rs)`` of the adjacent upstream cross section. downstream_node: ``(river, reach, rs)`` of the adjacent downstream cross section. weir: Weir overflow parameters; ``None`` when *mode* is empty. gate_groups: Gate groups attached to this structure (empty list when none). """ location: tuple[str, str, str] = ("", "", "") upstream_node: tuple[str, str, str] = ("", "", "") downstream_node: tuple[str, str, str] = ("", "", "") weir: Weir | None = None gate_groups: list[GateGroup] = field(default_factory=list)
[docs] @dataclass class LateralStructure(Structure): """Lateral structure connecting a 1-D reach to a Storage Area or 2-D Flow Area. The upstream side is always ``'XS'`` (the 1-D reach). The downstream side connects to a Storage Area (``'SA'``), a 2-D Flow Area (``'2D'``), or nothing when flow exits the system (empty *downstream_type*). Attributes ---------- location: ``(river, reach, rs)`` of this structure in the 1-D geometry. upstream_node: ``(river, reach, rs)`` of the adjacent upstream cross section. downstream_node: Name of the connected Storage Area or 2-D Flow Area (HDF ``DS SA/2D``). Empty string when flow exits the system. weir: Weir overflow parameters; ``None`` when *mode* is empty. gate_groups: Gate groups attached to this structure (empty list when none). """ location: tuple[str, str, str] = ("", "", "") upstream_node: tuple[str, str, str] = ("", "", "") downstream_node: str = "" weir: Weir | None = None gate_groups: list[GateGroup] = field(default_factory=list)
[docs] @dataclass class SA2DConnection(Structure): """Connection structure linking two Storage Areas or 2-D Flow Areas. Both sides are ``'SA'``, ``'2D'``, or ``'--'`` (treated as SA by HEC-RAS). Common examples: dam breach connection, levee between two 2-D domains, SA-to-SA link. Attributes ---------- name: User-given connection name from the HDF ``Connection`` field (e.g. ``"Dam"``, ``"Lower Levee"``). upstream_node: Name of the upstream Storage Area or 2-D Flow Area (HDF ``US SA/2D``). downstream_node: Name of the downstream Storage Area or 2-D Flow Area (HDF ``DS SA/2D``). Notes ----- Plan-result groups (see :class:`~rivia.hdf.SA2DConnectionResults`) may use a different naming convention: for 2D↔2D connections HEC-RAS prefixes the flow area name (e.g. geometry ``"Lower Levee"`` → plan result ``"BaldEagleCr Lower Levee"``). """ name: str = "" upstream_node: str = "" downstream_node: str = ""
_T = TypeVar("_T")
[docs] class StructureIndex(Generic[_T]): """Ordered mapping of structures supporting string-key *and* integer-index access. Behaves like a read-only ``dict`` but also accepts integer positions:: coll["Lower Levee"] # by name coll[0] # first item (insertion order) coll[-1] # last item len(coll) list(coll.keys()) for name, obj in coll.items(): ... Parameters ---------- items: Ordered ``dict`` of ``{name: structure}`` pairs. """ def __init__(self, items: dict[str, _T]) -> None: self._items = items self._keys: list[str] = list(items) @overload def __getitem__(self, key: int) -> _T: ... @overload def __getitem__(self, key: str) -> _T: ... def __getitem__(self, key: int | str) -> _T: if isinstance(key, int): try: return self._items[self._keys[key]] except IndexError: raise IndexError( f"Index {key} out of range for {len(self._keys)} structures" ) from None if key not in self._items: raise KeyError( f"{key!r} not found. Available: {self._keys}" ) return self._items[key] def __contains__(self, key: object) -> bool: return key in self._items def __iter__(self) -> Iterator[str]: return iter(self._keys) def __len__(self) -> int: return len(self._items) def __repr__(self) -> str: return f"{type(self).__name__}({self._keys!r})"
[docs] def keys(self): """Keys in insertion order.""" return self._items.keys()
[docs] def values(self): """Values in insertion order.""" return self._items.values()
[docs] def items(self): """``(key, value)`` pairs in insertion order.""" return self._items.items()
@property def names(self) -> list[str]: """All keys in insertion order.""" return self._keys
# --------------------------------------------------------------------------- # Structure / SA2DConnection / StructureCollection # ---------------------------------------------------------------------------
[docs] class StructureCollection: """Access all structures stored in ``Geometry/Structures/Attributes``. The collection is keyed by a string identifier: * :class:`SA2DConnection` — the HDF ``Connection`` field (user-given name). * :class:`Bridge`, :class:`InlineStructure`, :class:`LateralStructure` — ``"River Reach RS"`` built from the HDF ``River`` / ``Reach`` / ``RS`` fields. Use the typed filter properties (:attr:`connections`, :attr:`bridges`, :attr:`laterals`, :attr:`inlines`) to narrow by structure subclass. Parameters ---------- hdf: Open ``h5py.File`` handle. """ def __init__(self, hdf: "h5py.File") -> None: self._hdf = hdf self._items: dict[str, Structure] | None = None self._tuple_index: dict[tuple[str, str, str], str] | None = None # ------------------------------------------------------------------ # Internal loader # ------------------------------------------------------------------ def _load_gate_groups(self) -> dict[int, list[GateGroup]]: """Return gate groups keyed by structure row index (0-based).""" gg_root = f"{_STRUCT_ROOT}/Gate Groups" if gg_root not in self._hdf: return {} gg_ds = self._hdf[f"{gg_root}/Attributes"] gg_arr = np.array(gg_ds) gg_fn = gg_ds.dtype.names def _gg(row, f: str) -> str: return _decode(row[f]) if f in gg_fn else "" def _ggf(row, f: str) -> float: return float(row[f]) if f in gg_fn else float("nan") # Build openings dict: (struct_id, gate_group_local_id) → [GateOpening] openings_map: dict[tuple[int, int], list[GateOpening]] = {} op_path = f"{gg_root}/Openings/Attributes" if op_path in self._hdf: op_ds = self._hdf[op_path] op_arr = np.array(op_ds) op_fn = op_ds.dtype.names for op_row in op_arr: sid = int(op_row["Structure ID"]) if "Structure ID" in op_fn else -1 gid = int(op_row["Gate Group ID"]) if "Gate Group ID" in op_fn else 0 name = _decode(op_row["Name"]) if "Name" in op_fn else "" station = float(op_row["Station"]) if "Station" in op_fn else float("nan") key = (sid, gid) openings_map.setdefault(key, []).append(GateOpening(name=name, station=station)) # Build gate groups, tracking local index per structure gate_groups_map: dict[int, list[GateGroup]] = {} local_count: dict[int, int] = {} for gg_row in gg_arr: sid = int(gg_row["Structure ID"]) if "Structure ID" in gg_fn else -1 local_id = local_count.get(sid, 0) local_count[sid] = local_id + 1 gg = GateGroup( name=_gg(gg_row, "Name"), width=_ggf(gg_row, "Width"), height=_ggf(gg_row, "Height"), invert=_ggf(gg_row, "Invert"), sluice_coefficient=_ggf(gg_row, "Sluice Coef"), radial_coefficient=_ggf(gg_row, "Radial Coef"), weir_coefficient=_ggf(gg_row, "Weir Coef"), spillway_shape=_gg(gg_row, "Spillway Shape"), openings=openings_map.get((sid, local_id), []), ) gate_groups_map.setdefault(sid, []).append(gg) return gate_groups_map def _load(self) -> dict[str, Structure]: if self._items is not None: return self._items if _STRUCT_ROOT not in self._hdf: self._items = {} return self._items root = self._hdf[_STRUCT_ROOT] attrs_ds = root["Attributes"] attrs = np.array(attrs_ds) cl_info = np.array(root["Centerline Info"]) # (n, 4): [start, count, ...] cl_pts = np.array(root["Centerline Points"]) # (total, 2) fn = attrs_ds.dtype.names # available field names def _get(row, f: str) -> str: return _decode(row[f]) if f in fn else "" def _getf(row, f: str) -> float: return float(row[f]) if f in fn else float("nan") def _xs_node(r: str, rc: str, rstation: str) -> tuple[str, str, str]: return (r, rc, rstation) def _build_weir(row) -> Weir: return Weir( width=_getf(row, "Weir Width"), coefficient=_getf(row, "Weir Coef"), shape=_get(row, "Weir Shape"), max_submergence=_getf(row, "Weir Max Submergence"), min_elevation=_getf(row, "Weir Min Elevation"), us_slope=_getf(row, "Weir US Slope"), ds_slope=_getf(row, "Weir DS Slope"), skew=_getf(row, "Weir Skew"), use_water_surface=bool(int(row["Use WS for Weir Reference"])) if "Use WS for Weir Reference" in fn else False, ) gate_groups_map = self._load_gate_groups() items: dict[str, Structure] = {} for i, row in enumerate(attrs): typ = _get(row, "Type") mode = _get(row, "Mode") us_t = _get(row, "US Type") ds_t = _get(row, "DS Type") start_pt = int(cl_info[i, 0]) n_pts = int(cl_info[i, 1]) centerline = cl_pts[start_pt : start_pt + n_pts].astype(float) base = dict(mode=mode, upstream_type=us_t, downstream_type=ds_t, centerline=centerline) if typ == "Connection": conn_name = _get(row, "Connection") or f"Connection_{i}" key = conn_name if key in items: key = f"{key}_{i}" items[key] = SA2DConnection( **base, name=conn_name, upstream_node=_get(row, "US SA/2D"), downstream_node=_get(row, "DS SA/2D"), ) else: river = _get(row, "River") reach = _get(row, "Reach") rs = _get(row, "RS") key = f"{river} {reach} {rs}".strip() if (river or reach or rs) else f"{typ}_{i}" if key in items: key = f"{key}_{i}" location = (river, reach, rs) us_xs = _xs_node(_get(row, "US River"), _get(row, "US Reach"), _get(row, "US RS")) weir = _build_weir(row) if mode else None gate_groups = gate_groups_map.get(i, []) if typ == "Bridge": items[key] = Bridge( **base, location=location, upstream_node=us_xs, downstream_node=_xs_node(_get(row, "DS River"), _get(row, "DS Reach"), _get(row, "DS RS")), weir=weir, gate_groups=gate_groups, ) elif typ == "Inline": items[key] = InlineStructure( **base, location=location, upstream_node=us_xs, downstream_node=_xs_node(_get(row, "DS River"), _get(row, "DS Reach"), _get(row, "DS RS")), weir=weir, gate_groups=gate_groups, ) elif typ == "Lateral": items[key] = LateralStructure( **base, location=location, upstream_node=us_xs, downstream_node=_get(row, "DS SA/2D"), weir=weir, gate_groups=gate_groups, ) else: items[key] = Structure(**base) # unknown type, store as base self._items = items # Secondary index: (river, reach, rs) → string key for non-connection structures. self._tuple_index = {} for k, v in items.items(): if isinstance(v, (Bridge, InlineStructure, LateralStructure)): r, rc, rs = v.location normalized = (r.strip(), rc.strip(), rs.strip()) self._tuple_index[normalized] = k return self._items # ------------------------------------------------------------------ # Public interface # ------------------------------------------------------------------ @property def names(self) -> list[str]: """Keys of all structures in the collection.""" return list(self._load().keys()) @property def connections(self) -> StructureIndex[SA2DConnection]: """All :class:`SA2DConnection` instances keyed by connection name.""" return StructureIndex( {k: v for k, v in self._load().items() if isinstance(v, SA2DConnection)} ) @property def bridges(self) -> StructureIndex[Bridge]: """All :class:`Bridge` instances keyed by ``"River Reach RS"``.""" return StructureIndex( {k: v for k, v in self._load().items() if isinstance(v, Bridge)} ) @property def laterals(self) -> StructureIndex[LateralStructure]: """All :class:`LateralStructure` instances keyed by ``"River Reach RS"``.""" return StructureIndex( {k: v for k, v in self._load().items() if isinstance(v, LateralStructure)} ) @property def inlines(self) -> StructureIndex[InlineStructure]: """All :class:`InlineStructure` instances keyed by ``"River Reach RS"``.""" return StructureIndex( {k: v for k, v in self._load().items() if isinstance(v, InlineStructure)} ) @property def summary(self) -> pd.DataFrame: """One row per structure with basic attributes. Columns: ``key``, ``subclass``, ``mode``, ``upstream_type``, ``upstream_node``, ``downstream_type``, ``downstream_node``, ``n_centerline_points``. ``upstream_node`` / ``downstream_node`` are ``(river, reach, rs)`` tuples for :class:`Bridge` and :class:`InlineStructure` sides, and plain strings (area names) for :class:`SA2DConnection` and :class:`LateralStructure` downstream sides. """ rows = [] for key, s in self._load().items(): rows.append({ "key": key, "subclass": type(s).__name__, "mode": s.mode, "upstream_type": s.upstream_type, "upstream_node": getattr(s, "upstream_node", None), "downstream_type": s.downstream_type, "downstream_node": getattr(s, "downstream_node", None), "n_centerline_points": len(s.centerline), }) return pd.DataFrame(rows) def __getitem__(self, key: str | int | tuple[str, str, str]) -> Structure: items = self._load() if isinstance(key, int): keys = list(items) try: return items[keys[key]] except IndexError: raise IndexError( f"Index {key} out of range for {len(keys)} structures" ) from None if isinstance(key, tuple): r, rc, rs = key normalized = (r.strip(), rc.strip(), rs.strip()) str_key = self._tuple_index.get(normalized) # type: ignore[union-attr] if str_key is None: raise KeyError( f"No structure at {key!r}. " "SA2DConnections cannot be looked up by (River, Reach, RS)." ) return items[str_key] if key not in items: raise KeyError(f"Structure {key!r} not found. Available: {self.names}") return items[key] def __contains__(self, key: object) -> bool: items = self._load() if isinstance(key, tuple) and len(key) == 3: r, rc, rs = key normalized = (str(r).strip(), str(rc).strip(), str(rs).strip()) return normalized in self._tuple_index # type: ignore[operator] return key in items def __iter__(self): return iter(self._load()) def __len__(self) -> int: return len(self._load())
# --------------------------------------------------------------------------- # Geometry — public entry point # --------------------------------------------------------------------------- # --------------------------------------------------------------------------- # CrossSection / CrossSectionCollection # --------------------------------------------------------------------------- def _walk_polyline( polyline: np.ndarray, stations: np.ndarray ) -> np.ndarray: """Interpolate (x, y) along a polyline at given cumulative distances. Parameters ---------- polyline: Ordered vertices of the line, shape ``(n_pts, 2)``. stations: Cumulative distances at which to interpolate, shape ``(n,)``. Must be non-negative and not exceed the total polyline length. Returns ------- ndarray, shape ``(n, 2)`` Interpolated ``(x, y)`` coordinates. """ if len(polyline) < 2: return np.tile(polyline[0], (len(stations), 1)) # Segment lengths and cumulative distances along polyline diffs = np.diff(polyline, axis=0) seg_len = np.hypot(diffs[:, 0], diffs[:, 1]) cum_len = np.concatenate([[0.0], np.cumsum(seg_len)]) total = cum_len[-1] # Clamp stations to [0, total] t = np.clip(stations, 0.0, total) # For each query station find which segment it falls in seg_idx = np.searchsorted(cum_len, t, side="right") - 1 seg_idx = np.clip(seg_idx, 0, len(seg_len) - 1) # Fractional position within segment seg_start = cum_len[seg_idx] seg_end = cum_len[seg_idx + 1] denom = seg_end - seg_start # Avoid divide-by-zero for zero-length segments frac = np.where(denom > 0.0, (t - seg_start) / denom, 0.0) frac = frac[:, np.newaxis] return polyline[seg_idx] + frac * diffs[seg_idx] def _polyline_intersect( a: np.ndarray, b: np.ndarray ) -> np.ndarray | None: """Return the first intersection point of two 2-D polylines, or ``None``. Uses the parametric line-segment intersection formula for each pair of segments. Returns the exact intersection coordinates ``(x, y)`` as a shape-``(2,)`` array, or ``None`` when the polylines do not intersect. Parameters ---------- a, b: Ordered vertices of each polyline, shape ``(n, 2)`` and ``(m, 2)`` respectively. Both must have at least two points. """ if len(a) < 2 or len(b) < 2: return None # Vectorise over all pairs of segments between a and b a0 = a[:-1] # (na, 2) segment starts a1 = a[1:] # (na, 2) segment ends b0 = b[:-1] # (nb, 2) b1 = b[1:] da = a1 - a0 # (na, 2) db = b1 - b0 # (nb, 2) # For each (i, j) pair: a0[i] + t*da[i] == b0[j] + u*db[j] # Cross-product denominator: da[i] × db[j] # Broadcast: (na, 1, 2) and (1, nb, 2) da_ = da[:, np.newaxis, :] # (na, 1, 2) db_ = db[np.newaxis, :, :] # (1, nb, 2) a0_ = a0[:, np.newaxis, :] # (na, 1, 2) b0_ = b0[np.newaxis, :, :] # (1, nb, 2) denom = da_[..., 0] * db_[..., 1] - da_[..., 1] * db_[..., 0] # (na, nb) diff = b0_ - a0_ # (na, nb, 2) # Avoid divide-by-zero for parallel segments parallel = np.abs(denom) < 1e-12 denom_safe = np.where(parallel, 1.0, denom) t = (diff[..., 0] * db_[..., 1] - diff[..., 1] * db_[..., 0]) / denom_safe u = (diff[..., 0] * da_[..., 1] - diff[..., 1] * da_[..., 0]) / denom_safe # Valid intersection: t in [0,1], u in [0,1], not parallel valid = (~parallel) & (t >= 0.0) & (t <= 1.0) & (u >= 0.0) & (u <= 1.0) idx = np.argwhere(valid) if len(idx) == 0: return None # Return the first hit (lowest i, then j) i, j = idx[0] return a0[i] + t[i, j] * da[i]
[docs] @dataclass class CrossSection: """One HEC-RAS 1-D cross section from ``Geometry/Cross Sections/Attributes``. Attributes ---------- river: River name (HDF ``River`` field). reach: Reach name (HDF ``Reach`` field). rs: River station string (HDF ``RS`` field). name: Optional user label (HDF ``Name`` field). description: Optional extended description (HDF ``Description`` field). left_bank: Left bank station separating LOB from the channel (HDF ``Left Bank``). right_bank: Right bank station separating the channel from ROB (HDF ``Right Bank``). len_left: Reach length for the left overbank (HDF ``Len Left``). len_channel: Reach length for the main channel (HDF ``Len Channel``). len_right: Reach length for the right overbank (HDF ``Len Right``). contraction: Contraction energy loss coefficient (HDF ``Contr``). expansion: Expansion energy loss coefficient (HDF ``Expan``). friction_mode: Friction mode string, e.g. ``'Basic Mann n'`` (HDF ``Friction Mode``). hp_count: Number of rows in the precomputed hydraulic-property table (HDF ``HP Count``). hp_start_elev: Starting elevation for the hydraulic-property table (HDF ``HP Start Elev``). hp_vert_incr: Vertical elevation increment between hydraulic-property table rows (HDF ``HP Vert Incr``). station_elevation: Cross-section survey points, shape ``(n, 2)``. Columns: ``[station, elevation]``. Source: ``Geometry/Cross Sections/Station Elevation Values``. mannings_n: Manning's *n* breakpoints, shape ``(n, 2)``. Columns: ``[station, n_value]``. Source: ``Geometry/Cross Sections/Manning's n Values``. ineffective_areas: Ineffective-flow block regions, shape ``(n, 3)``. Columns: ``[left_sta, right_sta, elevation]``. Source: ``Geometry/Cross Sections/Ineffective Blocks``. Empty ``(0, 3)`` when no ineffective areas are defined. cut_line: Plan-view GIS cut-line coordinates, shape ``(n, 2)``. Columns: ``[x, y]``. This is the polyline drawn across the floodplain from the left edge to the right edge of the cross section. Use :meth:`station_coordinates` to project survey stations onto this line and obtain 3-D point coordinates. Source: ``Geometry/Cross Sections/Polyline Points``. orthogonal_vector: Unit vector perpendicular to the cut line, shape ``(2,)``: ``[cos θ, sin θ]`` where θ is measured CCW from the x-axis. Source: ``Geometry/Cross Sections/Orthogonal Vectors``. centerline_polyline: Plan-view river centreline polyline for this reach, shape ``(n, 2)``. Populated automatically by :class:`CrossSectionCollection` from ``Geometry/River Centerlines`` by matching on *river* and *reach*. Empty ``(0, 2)`` when no matching centreline is found. Used by :attr:`centerline_coordinates`. """ river: str = "" reach: str = "" rs: str = "" name: str = "" description: str = "" left_bank: float = float("nan") right_bank: float = float("nan") len_left: float = float("nan") len_channel: float = float("nan") len_right: float = float("nan") contraction: float = float("nan") expansion: float = float("nan") friction_mode: str = "" hp_count: int = 0 hp_start_elev: float = float("nan") hp_vert_incr: float = float("nan") station_elevation: np.ndarray = field(default_factory=lambda: np.empty((0, 2))) mannings_n: np.ndarray = field(default_factory=lambda: np.empty((0, 2))) ineffective_areas: np.ndarray = field(default_factory=lambda: np.empty((0, 3))) cut_line: np.ndarray = field(default_factory=lambda: np.empty((0, 2))) orthogonal_vector: np.ndarray = field(default_factory=lambda: np.zeros(2)) centerline_polyline: np.ndarray = field( default_factory=lambda: np.empty((0, 2)), repr=False, compare=False, ) @property def location(self) -> tuple[str, str, str]: """``(river, reach, rs)`` identity tuple for this cross section.""" return (self.river, self.reach, self.rs)
[docs] def station_coordinates(self) -> np.ndarray: """Project each survey station onto the GIS cut line to get 3-D coordinates. Interpolates ``(x, y)`` for every row of :attr:`station_elevation` by walking the :attr:`cut_line` polyline. The leftmost survey station maps to the first cut-line vertex; the rightmost maps to the last. Elevation values are taken directly from :attr:`station_elevation`. Returns ------- ndarray, shape ``(n, 3)`` Columns: ``[x, y, elevation]`` for each station/elevation point. Returns an empty ``(0, 3)`` array when :attr:`cut_line` or :attr:`station_elevation` contains no points. Notes ----- The mapping assumes the cut line runs from the left bank (station = min) to the right bank (station = max) in the same direction as the survey. This matches the HEC-RAS convention stored in the HDF file. """ if len(self.cut_line) < 2 or len(self.station_elevation) == 0: return np.empty((0, 3)) stations = self.station_elevation[:, 0] elevations = self.station_elevation[:, 1] # Map survey stations to cumulative distance along cut line sta_min, sta_max = stations.min(), stations.max() if sta_min == sta_max: # Degenerate XS — all points collapse to cut-line midpoint diffs = np.diff(self.cut_line, axis=0) total = np.sum(np.hypot(diffs[:, 0], diffs[:, 1])) t = np.full(len(stations), total / 2.0) else: diffs_cl = np.diff(self.cut_line, axis=0) total = np.sum(np.hypot(diffs_cl[:, 0], diffs_cl[:, 1])) t = (stations - sta_min) / (sta_max - sta_min) * total xy = _walk_polyline(self.cut_line, t) return np.column_stack([xy, elevations])
@property def left_bank_coordinates(self) -> np.ndarray: """Spatial coordinates of the left bank station, shape ``(3,)``. Interpolates ``[x, y, elevation]`` at :attr:`left_bank` from the output of :meth:`station_coordinates`. Returns ------- ndarray, shape ``(3,)`` ``[x, y, elevation]``. Returns ``array([nan, nan, nan])`` when :meth:`station_coordinates` returns no points or when :attr:`left_bank` is ``NaN``. """ coords = self.station_coordinates() if len(coords) == 0 or np.isnan(self.left_bank): return np.full(3, np.nan) stations = self.station_elevation[:, 0] return np.array([ np.interp(self.left_bank, stations, coords[:, 0]), np.interp(self.left_bank, stations, coords[:, 1]), np.interp(self.left_bank, stations, coords[:, 2]), ]) @property def right_bank_coordinates(self) -> np.ndarray: """Spatial coordinates of the right bank station, shape ``(3,)``. Interpolates ``[x, y, elevation]`` at :attr:`right_bank` from the output of :meth:`station_coordinates`. Returns ------- ndarray, shape ``(3,)`` ``[x, y, elevation]``. Returns ``array([nan, nan, nan])`` when :meth:`station_coordinates` returns no points or when :attr:`right_bank` is ``NaN``. """ coords = self.station_coordinates() if len(coords) == 0 or np.isnan(self.right_bank): return np.full(3, np.nan) stations = self.station_elevation[:, 0] return np.array([ np.interp(self.right_bank, stations, coords[:, 0]), np.interp(self.right_bank, stations, coords[:, 1]), np.interp(self.right_bank, stations, coords[:, 2]), ]) @property def centerline_coordinates(self) -> np.ndarray | None: """Intersection of the cut line with the river centreline, shape ``(2,)``. Computes the point where :attr:`cut_line` crosses :attr:`centerline_polyline`. Returns ``None`` when :attr:`centerline_polyline` is empty (no centreline matched this reach), or when the two polylines do not intersect. Returns ------- ndarray, shape ``(2,)`` or None ``[x, y]`` of the intersection, or ``None``. """ if len(self.centerline_polyline) < 2 or len(self.cut_line) < 2: return None return _polyline_intersect(self.cut_line, self.centerline_polyline)
[docs] class CrossSectionCollection: """Access all 1-D cross sections in ``Geometry/Cross Sections``. Keyed by ``"River Reach RS"`` — the same convention used by :class:`StructureCollection` and the DSS Hydrograph Output group names. Parameters ---------- hdf: Open ``h5py.File`` handle. """ def __init__(self, hdf: h5py.File) -> None: self._hdf = hdf self._items: dict[str, CrossSection] | None = None self._loc_index: dict[tuple[str, str, str], str] = {} def _load(self) -> dict[str, CrossSection]: if self._items is not None: return self._items root = self._hdf.get(_XS_ROOT) if root is None: self._items = {} return self._items attrs_ds = root["Attributes"] attrs = np.array(attrs_ds) fn = attrs_ds.dtype.names def _s(row, f: str) -> str: return _decode(row[f]) if f in fn else "" def _f(row, f: str) -> float: return float(row[f]) if f in fn else float("nan") # Ragged-array helpers: Info shape (n_xs, 2) → [start, count] se_info = np.array(root["Station Elevation Info"]) # (n_xs, 2) se_vals = np.array(root["Station Elevation Values"]) # (total, 2) mn_info = np.array(root["Manning's n Info"]) # (n_xs, 2) mn_vals = np.array(root["Manning's n Values"]) # (total, 2) # Polyline Info (n_xs, 4): col0=pt_start, col1=pt_count, col2=part_start, col3=part_count pl_info = np.array(root["Polyline Info"]) pl_pts = np.array(root["Polyline Points"]) # (total, 2) # Ineffective flow areas: Info (n_xs, 2) → [start, count] into Blocks # These datasets are absent when no XS has ineffective areas defined. _ineff_info_ds = root.get("Ineffective Info") _ineff_blks_ds = root.get("Ineffective Blocks") ineff_info = np.array(_ineff_info_ds) if _ineff_info_ds is not None else np.zeros((len(attrs), 2), dtype=int) ineff_blks = np.array(_ineff_blks_ds) if _ineff_blks_ds is not None else np.empty(0, dtype=[("Left Sta", float), ("Right Sta", float), ("Elevation", float)]) # Orthogonal vectors: one (cos θ, sin θ) unit vector per XS orth_vecs = np.array(root["Orthogonal Vectors"]) # (n_xs, 2) items: dict[str, CrossSection] = {} for i, row in enumerate(attrs): river = _s(row, "River") reach = _s(row, "Reach") rs = _s(row, "RS") key = f"{river} {reach} {rs}".strip() or f"XS_{i}" if key in items: key = f"{key}_{i}" se_start, se_count = int(se_info[i, 0]), int(se_info[i, 1]) mn_start, mn_count = int(mn_info[i, 0]), int(mn_info[i, 1]) pl_start, pl_count = int(pl_info[i, 0]), int(pl_info[i, 1]) ineff_start, ineff_cnt = int(ineff_info[i, 0]), int(ineff_info[i, 1]) # Ineffective areas: extract Left Sta / Right Sta / Elevation as float (n, 3) ineff_slice = ineff_blks[ineff_start : ineff_start + ineff_cnt] if len(ineff_slice): ineff_arr = np.column_stack([ ineff_slice["Left Sta"].astype(float), ineff_slice["Right Sta"].astype(float), ineff_slice["Elevation"].astype(float), ]) else: ineff_arr = np.empty((0, 3)) self._loc_index[(river, reach, rs)] = key items[key] = CrossSection( river=river, reach=reach, rs=rs, name=_s(row, "Name"), description=_s(row, "Description"), left_bank=_f(row, "Left Bank"), right_bank=_f(row, "Right Bank"), len_left=_f(row, "Len Left"), len_channel=_f(row, "Len Channel"), len_right=_f(row, "Len Right"), contraction=_f(row, "Contr"), expansion=_f(row, "Expan"), friction_mode=_s(row, "Friction Mode"), hp_count=int(row["HP Count"]) if "HP Count" in fn else 0, hp_start_elev=_f(row, "HP Start Elev"), hp_vert_incr=_f(row, "HP Vert Incr"), station_elevation=( se_vals[se_start : se_start + se_count].astype(float) ), mannings_n=( mn_vals[mn_start : mn_start + mn_count].astype(float) ), ineffective_areas=ineff_arr, cut_line=( pl_pts[pl_start : pl_start + pl_count].astype(float) ), orthogonal_vector=orth_vecs[i].astype(float), ) # Populate centerline_polyline on each XS by matching river+reach # from Geometry/River Centerlines. Two HDF formats exist: # Newer: "Attributes" structured dataset with "River Name"/"Reach Name" fields # Older: separate "River Names"/"Reach Names" byte-string arrays only cl_root = self._hdf.get(_RIVER_CL_ROOT) if cl_root is not None: cl_polylines = _read_polyline_group(cl_root) cl_map: dict[tuple[str, str], np.ndarray] = {} if "Attributes" in cl_root: cl_attrs = np.array(cl_root["Attributes"]) cl_fn = cl_attrs.dtype.names def _cls(row, f: str) -> str: return _decode(row[f]) if f in cl_fn else "" for ci, crow in enumerate(cl_attrs): cl_river = _cls(crow, "River Name") cl_reach = _cls(crow, "Reach Name") if ci < len(cl_polylines): cl_map[(cl_river, cl_reach)] = cl_polylines[ci] else: cl_rivers = [_decode(v) for v in np.array(cl_root["River Names"])] cl_reaches = [_decode(v) for v in np.array(cl_root["Reach Names"])] for ci, (rv, rc) in enumerate(zip(cl_rivers, cl_reaches, strict=True)): if ci < len(cl_polylines): cl_map[(rv, rc)] = cl_polylines[ci] for xs in items.values(): poly = cl_map.get((xs.river, xs.reach)) if poly is not None: xs.centerline_polyline = poly self._items = items return self._items @property def names(self) -> list[str]: """Keys of all cross sections in the collection.""" return list(self._load().keys()) @overload def __getitem__(self, key: int) -> CrossSection: ... @overload def __getitem__(self, key: str) -> CrossSection: ... @overload def __getitem__(self, key: tuple[str, str, str]) -> CrossSection: ... def __getitem__(self, key: int | str | tuple[str, str, str]) -> CrossSection: items = self._load() if isinstance(key, int): keys = list(items) try: return items[keys[key]] except IndexError: raise IndexError( f"Index {key} out of range (n={len(items)})" ) from None if isinstance(key, tuple): str_key = self._loc_index.get(key) if str_key is None: raise KeyError( f"Cross section {key!r} not found." ) return items[str_key] if key not in items: raise KeyError( f"Cross section {key!r} not found. Available: {self.names}" ) return items[key] def __len__(self) -> int: return len(self._load()) def __iter__(self) -> Iterator[CrossSection]: return iter(self._load().values()) def __repr__(self) -> str: return f"{type(self).__name__}({len(self)} cross sections)"
# --------------------------------------------------------------------------- # River Centerlines / Bank Lines / Edge Lines # --------------------------------------------------------------------------- def _read_polyline_group( root: "h5py.Group", ) -> list[np.ndarray]: """Read a ``Polyline Info / Polyline Points`` pair into a list of arrays. Parameters ---------- root: HDF group containing ``Polyline Info`` and ``Polyline Points`` datasets. Returns ------- list of ndarray, each shape ``(n_pts, 2)`` One array per polyline feature. """ info = np.array(root["Polyline Info"]) # (n, 4): pt_start, pt_count, ... pts = np.array(root["Polyline Points"]) # (total, 2) result = [] for row in info: start, count = int(row[0]), int(row[1]) result.append(pts[start : start + count].astype(float)) return result
[docs] @dataclass class RiverCenterline: """One HEC-RAS river centreline from ``Geometry/River Centerlines``. Attributes ---------- river: River name (HDF ``River Name`` field). reach: Reach name (HDF ``Reach Name`` field). upstream_type: Upstream connection type (HDF ``US Type`` field). Common values: ``'XS'`` (cross section), ``'2D Flow Area'``, ``'Storage Area'``, ``'External'``. upstream_name: Name of the upstream connected element (HDF ``US Name`` field). downstream_type: Downstream connection type (HDF ``DS Type`` field). Same vocabulary as *upstream_type*. downstream_name: Name of the downstream connected element (HDF ``DS Name`` field). junction_to_us_xs: Distance from the upstream junction to the nearest upstream cross section (HDF ``Junction to US XS`` field). ``NaN`` when the upstream connection is not a junction. ds_xs_to_junction: Distance from the nearest downstream cross section to the downstream junction (HDF ``DS XS to Junction`` field). ``NaN`` when the downstream connection is not a junction. polyline: Plan-view centreline coordinates, shape ``(n, 2)``. Columns: ``[x, y]``. Source: ``Geometry/River Centerlines/Polyline Points``. """ river: str reach: str upstream_type: str upstream_name: str downstream_type: str downstream_name: str junction_to_us_xs: float ds_xs_to_junction: float polyline: np.ndarray # (n, 2)
[docs] @dataclass class RiverBankLine: """One HEC-RAS river bank line from ``Geometry/River Bank Lines``. Bank lines are stored as an ordered pair — index 0 is the left bank, index 1 is the right bank (as drawn in RASMapper). Attributes ---------- polyline: Plan-view bank-line coordinates, shape ``(n, 2)``: ``[x, y]``. """ polyline: np.ndarray # (n, 2)
[docs] @dataclass class RiverEdgeLine: """One HEC-RAS river edge line from ``Geometry/River Edge Lines``. Edge lines define the extents of the 1-D floodplain corridor. Attributes ---------- polyline: Plan-view edge-line coordinates, shape ``(n, 2)``: ``[x, y]``. """ polyline: np.ndarray # (n, 2)
[docs] class RiverGeometry: """Lazy-loaded river geometry lines from a HEC-RAS geometry HDF. Provides access to river centrelines, left/right bank lines, and edge lines stored under ``Geometry/`` in geometry or plan HDF files. Parameters ---------- hdf: Open ``h5py.File`` handle. """ def __init__(self, hdf: "h5py.File") -> None: self._hdf = hdf self._centerlines: list[RiverCenterline] | None = None self._bank_lines: tuple[RiverBankLine | None, RiverBankLine | None] | None = None self._edge_lines: tuple[RiverEdgeLine | None, RiverEdgeLine | None] | None = None @property def centerlines(self) -> list[RiverCenterline]: """River centrelines. One :class:`RiverCenterline` per reach. Returns an empty list when no centreline data is present. """ if self._centerlines is not None: return self._centerlines root = self._hdf.get(_RIVER_CL_ROOT) if root is None: self._centerlines = [] return self._centerlines polylines = _read_polyline_group(root) result: list[RiverCenterline] = [] if "Attributes" in root: attrs = np.array(root["Attributes"]) fn = attrs.dtype.names def _s(row, f: str) -> str: return _decode(row[f]) if f in fn else "" def _f(row, f: str) -> float: return float(row[f]) if f in fn else float("nan") for i, row in enumerate(attrs): result.append(RiverCenterline( river=_s(row, "River Name"), reach=_s(row, "Reach Name"), upstream_type=_s(row, "US Type"), upstream_name=_s(row, "US Name"), downstream_type=_s(row, "DS Type"), downstream_name=_s(row, "DS Name"), junction_to_us_xs=_f(row, "Junction to US XS"), ds_xs_to_junction=_f(row, "DS XS to Junction"), polyline=polylines[i] if i < len(polylines) else np.empty((0, 2)), )) else: cl_rivers = [_decode(v) for v in np.array(root["River Names"])] cl_reaches = [_decode(v) for v in np.array(root["Reach Names"])] for i, (rv, rc) in enumerate(zip(cl_rivers, cl_reaches, strict=True)): result.append(RiverCenterline( river=rv, reach=rc, upstream_type="", upstream_name="", downstream_type="", downstream_name="", junction_to_us_xs=float("nan"), ds_xs_to_junction=float("nan"), polyline=polylines[i] if i < len(polylines) else np.empty((0, 2)), )) self._centerlines = result return self._centerlines @property def bank_lines(self) -> tuple[RiverBankLine | None, RiverBankLine | None]: """Left and right bank lines as a ``(left, right)`` tuple. Each element is a :class:`RiverBankLine` or ``None`` when absent. HEC-RAS stores exactly two bank lines (index 0 = left, 1 = right). """ if self._bank_lines is not None: return self._bank_lines root = self._hdf.get(_RIVER_BANK_ROOT) if root is None: self._bank_lines = (None, None) return self._bank_lines polylines = _read_polyline_group(root) left = RiverBankLine(polylines[0]) if len(polylines) > 0 else None right = RiverBankLine(polylines[1]) if len(polylines) > 1 else None self._bank_lines = (left, right) return self._bank_lines @property def edge_lines(self) -> tuple[RiverEdgeLine | None, RiverEdgeLine | None]: """Left and right edge lines as a ``(left, right)`` tuple. Each element is a :class:`RiverEdgeLine` or ``None`` when absent. """ if self._edge_lines is not None: return self._edge_lines root = self._hdf.get(_RIVER_EDGE_ROOT) if root is None: self._edge_lines = (None, None) return self._edge_lines polylines = _read_polyline_group(root) left = RiverEdgeLine(polylines[0]) if len(polylines) > 0 else None right = RiverEdgeLine(polylines[1]) if len(polylines) > 1 else None self._edge_lines = (left, right) return self._edge_lines def __repr__(self) -> str: n_cl = len(self.centerlines) bl, br = self.bank_lines el, er = self.edge_lines n_banks = sum(1 for b in (bl, br) if b is not None) n_edges = sum(1 for e in (el, er) if e is not None) return ( f"{type(self).__name__}(" f"{n_cl} centerline(s), " f"{n_banks} bank line(s), " f"{n_edges} edge line(s))" )
# --------------------------------------------------------------------------- # GeometrySummary # ---------------------------------------------------------------------------
[docs] @dataclass class LayerRef: """Reference to a RASMapper HDF layer. Attributes ---------- filename: Absolute path to the HDF file containing this layer, resolved relative to the project directory at read time. layername: Layer name inside the HDF file. file_date: Timestamp of the HDF file. date_modified: Timestamp when the layer was last modified, or ``None`` when HEC-RAS does not record a modification date. For terrain layers in models with 2-D flow areas this is the terrain HDF mtime recorded by each flow area at preprocessing time (the most recent across all flow areas is used). """ filename: Path layername: str file_date: dt.datetime date_modified: dt.datetime | None
[docs] @dataclass class GeometrySummary: """Metadata from the ``Geometry/`` group attributes. Attributes ---------- title: Geometry title string set in RASMapper. version: Geometry schema version, e.g. ``"1.0.20 (20Sep2024)"``. complete: ``True`` when the geometry was fully preprocessed. si_units: ``True`` when the model uses SI units; ``False`` for US Customary. extents: Bounding box as ``(xmin, xmax, ymin, ymax)`` in model coordinates, or ``None`` when the geometry has no spatial reference. preprocessed_at: Timestamp when RASMapper last preprocessed the geometry, or ``None`` when the geometry has never been preprocessed (HEC-RAS stores the sentinel ``'00JAN0000 00:00:00'`` in that case). terrain: Terrain layer reference, or ``None`` for non-geospatial models. land_cover: Land-cover (Manning's n) layer reference, or ``None`` when absent. infiltration: Infiltration layer reference, or ``None`` when absent. pct_impervious: Percent-impervious layer reference, or ``None`` when absent. """ title: str version: str complete: bool si_units: bool extents: tuple[float, float, float, float] | None preprocessed_at: dt.datetime | None terrain: LayerRef | None land_cover: LayerRef | None infiltration: LayerRef | None pct_impervious: LayerRef | None def __repr__(self) -> str: def _layer(ref: LayerRef | None) -> str: if ref is None: return "\u2014" file_date = ref.file_date.strftime("%Y-%m-%d %H:%M:%S") suffix = f" file -> {file_date}" if ref.date_modified is not None: suffix += f" mod -> {ref.date_modified.strftime('%Y-%m-%d %H:%M:%S')}" return f"{ref.filename.stem} :: {ref.layername}{suffix}" def _extents(e: tuple[float, float, float, float] | None) -> str: if e is None: return "\u2014" return f"x [{e[0]:.2f}, {e[1]:.2f}] y [{e[2]:.2f}, {e[3]:.2f}]" rows = [ ("title", self.title), ("version", self.version), ("complete", str(self.complete)), ("units", "SI" if self.si_units else "US Customary"), ("extents", _extents(self.extents)), ("preprocessed", self.preprocessed_at.strftime("%Y-%m-%d %H:%M:%S") if self.preprocessed_at else "\u2014"), ("terrain", _layer(self.terrain)), ("land_cover", _layer(self.land_cover)), ("infiltration", _layer(self.infiltration)), ("pct_impervious", _layer(self.pct_impervious)), ] width = max(len(k) for k, _ in rows) lines = [type(self).__name__] + [ f" {k:<{width}} : {v}" for k, v in rows ] return "\n".join(lines)
# --------------------------------------------------------------------------- # Geometry — public entry point # ---------------------------------------------------------------------------
[docs] class Geometry(_HdfFile): """Read HEC-RAS geometry HDF5 output files (``*.g*.hdf``). Parameters ---------- filename: Path to the geometry HDF file. The ``.hdf`` suffix is appended automatically if absent. Examples -------- :: with Geometry("MyModel.g01") as g: print(g.flow_areas.summary) centers = g.flow_areas["spillway"].cell_centers """ def __init__(self, filename: str | Path) -> None: super().__init__(filename) self._flow_areas: FlowAreaCollection | None = None self._storage_areas: StorageAreaCollection | None = None self._boundary_condition_lines: BoundaryConditionCollection | None = None self._structures: StructureCollection | None = None self._cross_sections: CrossSectionCollection | None = None self._river_geometry: RiverGeometry | None = None # ------------------------------------------------------------------ # Collections # ------------------------------------------------------------------ @property def flow_areas(self) -> FlowAreaCollection: """Access 2-D flow areas stored in the geometry HDF.""" if self._flow_areas is None: self._flow_areas = FlowAreaCollection(self._hdf) return self._flow_areas @property def storage_areas(self) -> StorageAreaCollection: """Access storage areas (reservoirs, ponds) stored in the geometry HDF.""" if self._storage_areas is None: self._storage_areas = StorageAreaCollection(self._hdf) return self._storage_areas @property def boundary_condition_lines(self) -> BoundaryConditionCollection: """Access boundary condition lines stored in the geometry HDF.""" if self._boundary_condition_lines is None: self._boundary_condition_lines = BoundaryConditionCollection(self._hdf) return self._boundary_condition_lines @property def structures(self) -> StructureCollection: """Access all structures (connections, bridges, laterals, inline weirs).""" if self._structures is None: self._structures = StructureCollection(self._hdf) return self._structures @property def cross_sections(self) -> CrossSectionCollection: """Access all 1-D cross sections stored in the geometry HDF.""" if self._cross_sections is None: self._cross_sections = CrossSectionCollection(self._hdf) return self._cross_sections @property def river_geometry(self) -> RiverGeometry: """Access river centrelines, bank lines, and edge lines.""" if self._river_geometry is None: self._river_geometry = RiverGeometry(self._hdf) return self._river_geometry @property def projection(self) -> str | None: """WKT projection string stored in the HDF root attribute, or ``None``. HEC-RAS writes the model CRS as a WKT string in the root-level ``Projection`` attribute. Returns ``None`` when the attribute is absent (older files or models without a defined projection). The raw WKT string can be converted to a ``pyproj.CRS`` or a ``rasterio.crs.CRS`` object if needed:: import pyproj crs = pyproj.CRS.from_wkt(geom.projection) Returns ------- str or None """ raw = self._hdf.attrs.get("Projection") if raw is None: return None return raw.decode() if isinstance(raw, (bytes, np.bytes_)) else str(raw)
[docs] def geometry_summary(self) -> GeometrySummary: """Return metadata from the ``Geometry/`` group attributes. Reads the top-level attributes of the ``Geometry`` HDF group and returns a :class:`GeometrySummary` dataclass. The method is available on :class:`Geometry`, :class:`~rivia.hdf.SteadyPlan`, and :class:`~rivia.hdf.UnsteadyPlan` because both plan types inherit from :class:`Geometry`. Returns ------- GeometrySummary Raises ------ KeyError If the ``Geometry`` group is absent from the HDF file. Examples -------- :: with Geometry("MyModel.g01") as g: s = g.geometry_summary() print(s.title) print(s.extents) print(s.si_units) """ grp = self._hdf.get(_GEOM_ROOT) if grp is None: raise KeyError( f"'{_GEOM_ROOT}' group not found in {self._filename!r}." ) a = grp.attrs def _str(key: str) -> str: v = a[key] return v.decode() if isinstance(v, (bytes, np.bytes_)) else str(v) def _bool_opt(key: str, default: bool = False) -> bool: raw = a.get(key) if raw is None: logger.warning( "Geometry group attribute %r not found in %r; defaulting to %r", key, self._filename, default, ) return default v = raw.decode() if isinstance(raw, (bytes, np.bytes_)) else str(raw) return v.strip().lower() == "true" def _opt_str(key: str) -> str | None: v = a.get(key) if v is None: return None return v.decode() if isinstance(v, (bytes, np.bytes_)) else str(v) def _parse_ts(s: str) -> dt.datetime: return parse_hec_datetime(s.strip(), fmt=_RAS_TS_FMT) def _parse_ts_opt(s: str | None) -> dt.datetime | None: if not s: return None try: return _parse_ts(s) except ValueError: return None root = Path(self._filename).parent def _layer(prefix: str) -> LayerRef | None: raw = _opt_str(f"{prefix} Filename") if not raw: return None return LayerRef( filename=(root / raw).resolve(), layername=_opt_str(f"{prefix} Layername") or "", file_date=_parse_ts(_opt_str(f"{prefix} File Date") or ""), date_modified=_parse_ts_opt(_opt_str(f"{prefix} Date Last Modified")), ) raw_ext = a.get("Extents") extents: tuple[float, float, float, float] | None = None if raw_ext is not None: ext = np.asarray(raw_ext, dtype=float) extents = (float(ext[0]), float(ext[1]), float(ext[2]), float(ext[3])) terrain = _layer("Terrain") # Terrain Date Last Modified is not stored at the Geometry group level. # For models with 2D flow areas each flow area stores a "Terrain File Date" # equal to the terrain HDF mtime at the time that flow area was preprocessed. # Use the most recent of those as date_modified on the terrain LayerRef. if terrain is not None: fa_grp = self._hdf.get(_GEOM_2D_ROOT) if fa_grp is not None: fa_dates: list[dt.datetime] = [] for fa_name in fa_grp: fa = fa_grp[fa_name] if not hasattr(fa, "attrs"): continue raw_fa = fa.attrs.get("Terrain File Date") if raw_fa is not None: s = raw_fa.decode() if isinstance(raw_fa, (bytes, np.bytes_)) else str(raw_fa) parsed = _parse_ts_opt(s) if parsed is not None: fa_dates.append(parsed) if fa_dates: terrain.date_modified = max(fa_dates) return GeometrySummary( title=_str("Title"), version=_str("Version"), complete=_bool_opt("Complete Geometry"), si_units=_bool_opt("SI Units"), extents=extents, preprocessed_at=_parse_ts_opt(_str("Geometry Time")), terrain=terrain, land_cover=_layer("Land Cover"), infiltration=_layer("Infiltration"), pct_impervious=_layer("Percent Impervious"), )
[docs] def center_coordinates( self, location: tuple[str, str | None, str | None], ) -> np.ndarray: """Return the centre coordinates for a cross section, storage area, or 2-D flow area cell. The *location* tuple is interpreted differently depending on which element type is identified: * **Cross section** — ``(river, reach, rs)`` Both *river* and *reach* must be non-empty. Returns the intersection of the XS cut line with the river centreline polyline (i.e. :attr:`CrossSection.centerline_coordinates <rivia.hdf.geometry.CrossSection.centerline_coordinates>`). * **Storage area** — ``(name, "", "")`` or ``(name, None, None)`` *reach* is empty or ``None``. Returns the area-weighted centroid of the boundary polygon (i.e. :attr:`StorageArea.centroid`). * **2-D flow area cell** — ``(area_name, "", cell_index)`` or ``(area_name, None, cell_index)`` *reach* is empty or ``None``; *cell_index* is the integer cell number as a string. Returns the cell centre coordinate from :attr:`FlowArea.cell_centers`. Parameters ---------- location: Three-element tuple ``(a, b, c)`` described above. Returns ------- ndarray, shape ``(2,)`` ``[x, y]`` of the requested centre point. Raises ------ KeyError If the named cross section, storage area, or flow area is not found in the HDF. ValueError If the cross section has no centreline match (its ``centerline_coordinates`` is ``None``), or *cell_index* is not a valid integer or is out of range for the flow area. Examples -------- :: with Geometry("MyModel.g01") as g: # 1-D cross section xy = g.center_coordinates(("Bald Eagle Cr.", "Lock Haven", "137520")) # Storage area xy = g.center_coordinates(("Reservoir", "", "")) # 2-D flow area cell 42 xy = g.center_coordinates(("Perimeter 1", "", "42")) """ a, b, c = location if b: # --- Cross section (river, reach, rs) --- xs = self.cross_sections[(a, b, c)] coords = xs.centerline_coordinates if coords is None: raise ValueError( f"Cross section ({a!r}, {b!r}, {c!r}) has no river " "centreline match; centerline_coordinates is None." ) return coords if not c: # --- Storage area (name, "", "") --- return self.storage_areas[a].centroid # --- 2-D flow area cell (area_name, "", cell_index) --- try: idx = int(c) except ValueError: raise ValueError( f"Cell index {c!r} is not a valid integer." ) from None centers = self.flow_areas[a].cell_centers n = len(centers) if idx < 0 or idx >= n: raise ValueError( f"Cell index {idx} is out of range for flow area {a!r} " f"(n_cells={n})." ) return centers[idx]