Source code for rivia.model.unsteady_flow

"""Read/write HEC-RAS unsteady flow files (.u**).

:class:`UnsteadyFlow` — structured editor.  Boundary conditions are
parsed into typed dataclass objects and may be sorted by river station.
``save()`` reconstructs the boundary section from the objects; trailing
meteorological / non-Newtonian lines are still written verbatim.

Convention
----------
``get_*`` methods return ``None`` when the requested item is not found.
``set_*`` methods raise :exc:`KeyError` when the target does not exist.
"""

from __future__ import annotations

import datetime as dt
import logging
from dataclasses import dataclass, field
from math import ceil
from pathlib import Path
from typing import Literal

from rivia.utils import parse_hec_datetime, parse_interval

logger = logging.getLogger("rivia.model")


def _parse_window(
    use_fixed_start: bool,
    fixed_start: str,
    interval: str,
    n_values: int,
) -> tuple[dt.datetime, dt.datetime] | None:
    """Return ``(start, end)`` datetimes for a boundary with a fixed start.

    Returns ``None`` when *use_fixed_start* is ``False``.

    Parameters
    ----------
    use_fixed_start:
        Mirror of the boundary's ``use_fixed_start`` flag.
    fixed_start:
        ``"DDMONYYYY,HHMM"`` or ``"DDMONYYYY,HHMMSS"`` string stored on the
        boundary (e.g. ``"01JAN2020,0600"``).
    interval:
        HEC-RAS interval string (e.g. ``"5MIN"``); parsed by
        :func:`rivia.utils.parse_interval`.
    n_values:
        Number of time-series values; determines the end date.
    """
    if not use_fixed_start:
        return None
    start = parse_hec_datetime(fixed_start)
    if n_values == 0:
        return start, start
    end = start + (n_values - 1) * parse_interval(interval)
    return start, end

# Type alias for the ``values`` argument of the hydrograph / gate-opening
# ``set_*`` methods: either a list of numbers or a single number.
# ``_coerce_values`` broadcasts a bare float/int to fill the current
# time-series length; a list is used as given.
_Values = list[float | int] | float | int


def _coerce_values(values: _Values, count: int) -> list[float]:
    """Return *values* as a list of floats of length *count*.

    If *values* is a scalar it is broadcast to *count* elements.  If *values*
    is already a sequence its length is used as-is (``count`` is ignored).
    """
    if isinstance(values, (int, float)):
        return [float(values)] * count
    return [float(v) for v in values]


# ---------------------------------------------------------------------------
# Formatting helpers (shared)
# ---------------------------------------------------------------------------

_COL_WIDTH = 8
_COLS_PER_ROW = 10


def _fit_width(value: float, width: int = _COL_WIDTH) -> str:
    """Right-justify *value* inside *width* characters.

    Tries integer, then progressively fewer decimal places, then scientific
    notation.  Truncates as last resort.
    """
    # Integer shortcut
    if isinstance(value, int) or (
        isinstance(value, float)
        and value == int(value)
        and len(str(int(value))) <= width
    ):
        s = str(int(value))
        if len(s) <= width:
            return s.rjust(width)

    s = repr(value)
    if len(s) <= width:
        return s.rjust(width)

    fv = float(value)
    for decimals in range(6, -1, -1):
        s = f"{fv:.{decimals}f}"
        if len(s) <= width:
            return s.rjust(width)

    for decimals in range(2, -1, -1):
        s = f"{fv:.{decimals}E}"
        if len(s) <= width:
            return s.rjust(width)

    return repr(value)[:width]


def _format_data_block(
    values: list[float], cols: int = _COLS_PER_ROW, width: int = _COL_WIDTH
) -> list[str]:
    """Lay *values* out as fixed-width rows.

    Parameters
    ----------
    values:
        Numbers to write, in order.
    cols:
        Maximum number of fields per row.
    width:
        Character width of each field (see :func:`_fit_width`).

    Returns
    -------
    list[str]
        One string per row, without a trailing newline.  The final row may
        hold fewer than *cols* fields; an empty input yields no rows.
    """
    return [
        "".join(_fit_width(number, width) for number in values[start : start + cols])
        for start in range(0, len(values), cols)
    ]


def _parse_data_block(
    lines: list[str], count: int, width: int = _COL_WIDTH
) -> list[float]:
    """Read up to *count* numbers from fixed-width *lines*.

    Each line is cut into consecutive *width*-character cells.  Blank cells
    are skipped (they do not count towards *count*); a non-blank cell that
    ``float()`` cannot parse is recorded as ``0.0``.

    Parameters
    ----------
    lines:
        Data rows, in file order.
    count:
        Maximum number of values to return.
    width:
        Character width of each cell.

    Returns
    -------
    list[float]
        At most *count* values; fewer when the rows run out first.
    """
    parsed: list[float] = []
    for row in lines:
        offset = 0
        row_len = len(row)
        while offset < row_len and len(parsed) < count:
            cell = row[offset : offset + width].strip()
            offset += width
            if not cell:
                continue
            try:
                number = float(cell)
            except ValueError:
                # Best-effort: keep the slot so later values stay aligned.
                number = 0.0
            parsed.append(number)
    return parsed[:count]


def _data_line_count(n: int, cols: int = _COLS_PER_ROW) -> int:
    """Return how many rows of *cols* fields are needed to hold *n* values.

    Zero or a negative *n* needs no rows.
    """
    if n <= 0:
        return 0
    return ceil(n / cols)


# ---------------------------------------------------------------------------
# Shared boundary dataclasses
# ---------------------------------------------------------------------------


@dataclass
class InitialFlowLoc:
    """Initial flow at a river / reach / station."""

    river: str
    reach: str
    river_station: str
    flow: float

    @classmethod
    def _from_raw(cls, raw: str) -> "InitialFlowLoc":
        """Build an instance from the comma-separated text after ``=``.

        Missing name fields become ``""`` and a missing flow field becomes
        ``0.0``.  A flow field that is present but not numeric raises
        :exc:`ValueError`.
        """
        tokens = raw.split(",")
        # Pad the three name columns so short lines still unpack cleanly.
        names = [token.strip() for token in tokens[:3]]
        river, reach, station = (names + ["", "", ""])[:3]
        flow = float(tokens[3].strip()) if len(tokens) > 3 else 0.0
        return cls(river=river, reach=reach, river_station=station, flow=flow)

    def _to_raw(self) -> str:
        """Serialise to the comma-separated form read by :meth:`_from_raw`.

        River and reach are padded to 16 characters and the river station to
        8; the flow is written with Python's default float formatting.
        """
        columns = (
            f"{self.river:16}",
            f"{self.reach:16}",
            f"{self.river_station:8}",
            f"{self.flow}",
        )
        return ",".join(columns)
@dataclass
class InitialStorageElev:
    """Initial water surface elevation for a storage area."""

    name: str
    elevation: float

    @classmethod
    def _from_raw(cls, raw: str) -> "InitialStorageElev":
        """Build an instance from ``"name,elevation"`` text.

        A missing elevation field becomes ``0.0``; a present but non-numeric
        one raises :exc:`ValueError`.
        """
        tokens = raw.split(",")
        # ``str.split`` always yields at least one token, so index 0 is safe.
        name = tokens[0].strip()
        elevation = 0.0
        if len(tokens) > 1:
            elevation = float(tokens[1].strip())
        return cls(name=name, elevation=elevation)

    def _to_raw(self) -> str:
        """Serialise to ``"name,elevation"`` (no column padding)."""
        return "{},{}".format(self.name, self.elevation)
@dataclass
class InitialRainfallRunoffElev:
    """Initial water surface elevation for a reservoir / RRR."""

    river: str
    reach: str
    river_station: str
    elevation: float

    @classmethod
    def _from_raw(cls, raw: str) -> "InitialRainfallRunoffElev":
        """Build an instance from the comma-separated text after ``=``.

        Missing name fields become ``""`` and a missing elevation field
        becomes ``0.0``.  An elevation field that is present but not numeric
        raises :exc:`ValueError`.
        """
        tokens = raw.split(",")
        # Pad the three name columns so short lines still unpack cleanly.
        names = [token.strip() for token in tokens[:3]]
        river, reach, station = (names + ["", "", ""])[:3]
        elevation = float(tokens[3].strip()) if len(tokens) > 3 else 0.0
        return cls(
            river=river, reach=reach, river_station=station, elevation=elevation
        )

    def _to_raw(self) -> str:
        """Serialise to the comma-separated form read by :meth:`_from_raw`.

        River and reach are padded to 16 characters and the river station to
        8; the elevation is written with Python's default float formatting.
        """
        columns = (
            f"{self.river:16}",
            f"{self.reach:16}",
            f"{self.river_station:8}",
            f"{self.elevation}",
        )
        return ",".join(columns)
# ---- boundary base --------------------------------------------------------- @dataclass class _Boundary: """Base class for all boundary condition types.""" river: str reach: str river_station: str # Raw comma-separated tail of the Boundary Location= line after the first # three fields (preserved verbatim for roundtrip fidelity). _location_tail: str = field(default="", repr=False) def _location_line(self) -> str: return ( f"Boundary Location={self.river:16},{self.reach:16}," f"{self.river_station:8},{self._location_tail}" ) def location( self, *, rs_float: bool = False ) -> tuple[str, str, str] | tuple[str, str, float]: """Return ``(river, reach, river_station)``. Parameters ---------- rs_float: If ``True``, river_station is returned as ``float`` (strips trailing ``'*'``); otherwise as ``str`` (default). """ if rs_float: return (self.river, self.reach, self._rs_float()) return (self.river, self.reach, self.river_station) def _rs_float(self) -> float: """River station as float for sorting (strips trailing '*').""" try: return float(self.river_station.rstrip("*").strip()) except ValueError: return float("-inf")
@dataclass
class FlowHydrograph(_Boundary):
    """Upstream / internal flow hydrograph boundary."""

    interval: str = "1HOUR"
    values: list[float] = field(default_factory=list)
    flow_hydrograph_slope: str | None = None
    stage_tw_check: int = 0
    dss_file: str = ""
    dss_path: str = ""
    use_dss: bool = False
    use_fixed_start: bool = False
    fixed_start: str = ","
    is_critical: bool = False
    critical_boundary_flow: str = ""
    # Extra lines between the standard fields and next Boundary Location
    # that we don't model explicitly (e.g. CWMS InputPosition).
    _extra_lines: list[str] = field(default_factory=list, repr=False)

    @property
    def window(self) -> tuple[dt.datetime, dt.datetime] | None:
        """Return ``(start, end)`` as Python datetimes, or ``None``.

        ``None`` is returned when ``use_fixed_start`` is ``False``.

        The first value is stamped at ``start``, so the end date is the
        timestamp of the last value:
        ``start + (len(values) - 1) * parse_interval(interval)``.
        With no values, ``end == start``.
        """
        return _parse_window(
            self.use_fixed_start, self.fixed_start, self.interval, len(self.values)
        )
@dataclass
class LateralInflow(_Boundary):
    """Lateral or uniform lateral inflow hydrograph."""

    interval: str = "1HOUR"
    values: list[float] = field(default_factory=list)
    # True for a "Uniform Lateral Inflow Hydrograph" block, False for a
    # plain "Lateral Inflow Hydrograph" block.
    is_uniform: bool = False
    dss_file: str = ""
    dss_path: str = ""
    use_dss: bool = False
    use_fixed_start: bool = False
    fixed_start: str = ","
    is_critical: bool = False
    critical_boundary_flow: str = ""
    # Lines of the block that are not modelled as fields.
    _extra_lines: list[str] = field(default_factory=list, repr=False)

    @property
    def window(self) -> tuple[dt.datetime, dt.datetime] | None:
        """Return ``(start, end)`` as Python datetimes, or ``None``.

        ``None`` is returned when ``use_fixed_start`` is ``False``.

        The first value is stamped at ``start``, so the end date is the
        timestamp of the last value:
        ``start + (len(values) - 1) * parse_interval(interval)``.
        With no values, ``end == start``.
        """
        return _parse_window(
            self.use_fixed_start, self.fixed_start, self.interval, len(self.values)
        )
@dataclass
class StageHydrograph(_Boundary):
    """Stage (water-surface) hydrograph boundary."""

    interval: str = "1HOUR"
    values: list[float] = field(default_factory=list)
    dss_path: str = ""
    use_dss: bool = False
    use_fixed_start: bool = False
    fixed_start: str = ","
    # Lines of the block that are not modelled as fields.
    _extra_lines: list[str] = field(default_factory=list, repr=False)

    @property
    def window(self) -> tuple[dt.datetime, dt.datetime] | None:
        """Return ``(start, end)`` as Python datetimes, or ``None``.

        ``None`` is returned when ``use_fixed_start`` is ``False``.

        The first value is stamped at ``start``, so the end date is the
        timestamp of the last value:
        ``start + (len(values) - 1) * parse_interval(interval)``.
        With no values, ``end == start``.
        """
        return _parse_window(
            self.use_fixed_start, self.fixed_start, self.interval, len(self.values)
        )
@dataclass
class RatingCurve(_Boundary):
    """Rating-curve downstream boundary."""

    # Consecutive value pairs read from the ``Rating Curve=`` data rows, in
    # file order.
    pairs: list[tuple[float, float]] = field(default_factory=list)
    dss_path: str = ""
    use_dss: bool = False
    use_fixed_start: bool = False
    fixed_start: str = ","
    is_critical: bool = False
    critical_boundary_flow: str = ""
    # Lines of the block that are not modelled as fields.
    _extra_lines: list[str] = field(default_factory=list, repr=False)
@dataclass
class FrictionSlope(_Boundary):
    """Normal-depth (friction slope) downstream boundary."""

    # First comma-separated value of the ``Friction Slope=`` line.
    slope: float = 0.0
    # Second comma-separated value of the same line; 0.0 when absent.
    value2: float = 0.0
@dataclass
class NormalDepth(_Boundary):
    """Normal-depth boundary specified as a single slope value."""

    # Value of the ``Normal Depth=`` line.
    slope: float = 0.0
@dataclass
class GateOpening:
    """Time series of openings for one gate."""

    gate_name: str = ""
    dss_path: str = ""
    use_dss: bool = False
    time_interval: str = "1HOUR"
    use_fixed_start: bool = False
    fixed_start: str = ","
    # Opening values, in file order; each instance gets its own list.
    values: list[float] = field(default_factory=list)
@dataclass
class GateBoundary(_Boundary):
    """Inline structure with one or more gated openings."""

    # One entry per ``Gate Name=`` line of the block, in file order.
    gates: list[GateOpening] = field(default_factory=list)
# Type alias for the flat boundary list
BoundaryType = (
    FlowHydrograph
    | LateralInflow
    | StageHydrograph
    | RatingCurve
    | FrictionSlope
    | NormalDepth
    | GateBoundary
)


# ---------------------------------------------------------------------------
# Boundary parser (shared by both classes)
# ---------------------------------------------------------------------------


def _split_key_value(raw: str) -> tuple[str, str]:
    """Split *raw* on its first ``=`` into ``(key, value)``.

    The key is right-stripped; the value only loses a trailing newline.  A
    line without ``=`` is returned whole as the key with an empty value.
    """
    key, sep, value = raw.partition("=")
    if not sep:
        return raw.rstrip("\n"), ""
    return key.rstrip(), value.rstrip("\n")


def _as_flag(text: str) -> bool:
    """Interpret *text* as a boolean: ``True`` only for (case-insensitive) "true"."""
    return text.strip().lower() == "true"


def _as_int(text: str) -> int:
    """Convert stripped *text* to ``int`` (raises ``ValueError`` if not numeric)."""
    return int(text.strip())


# Metadata tables: file key -> (attribute name, converter applied to the value).
_TIME_SERIES_FIELDS = {
    "DSS Path": ("dss_path", str.strip),
    "Use DSS": ("use_dss", _as_flag),
    "Use Fixed Start Time": ("use_fixed_start", _as_flag),
    "Fixed Start Date/Time": ("fixed_start", str.strip),
}
_CRITICAL_FIELDS = {
    "Is Critical Boundary": ("is_critical", _as_flag),
    "Critical Boundary Flow": ("critical_boundary_flow", str.strip),
}
_STAGE_HYDROGRAPH_FIELDS = _TIME_SERIES_FIELDS
_RATING_CURVE_FIELDS = {**_TIME_SERIES_FIELDS, **_CRITICAL_FIELDS}
_LATERAL_INFLOW_FIELDS = {
    "DSS File": ("dss_file", str.strip),
    **_RATING_CURVE_FIELDS,
}
_FLOW_HYDROGRAPH_FIELDS = {
    "Flow Hydrograph Slope": ("flow_hydrograph_slope", str.strip),
    "Stage Hydrograph TW Check": ("stage_tw_check", _as_int),
    **_LATERAL_INFLOW_FIELDS,
}
_GATE_FIELDS = {
    "Gate DSS Path": ("dss_path", str.strip),
    "Gate Use DSS": ("use_dss", _as_flag),
    "Gate Time Interval": ("time_interval", str.strip),
    "Gate Use Fixed Start Time": ("use_fixed_start", _as_flag),
    "Gate Fixed Start Date/Time": ("fixed_start", str.strip),
}

# Metadata parsing of a block stops right after this key (for the boundary
# types that model it).
_TERMINAL_KEY = "Critical Boundary Flow"


def _read_values(lines: list[str], i: int, count: int) -> tuple[list[float], int]:
    """Read *count* fixed-width values starting at ``lines[i]``.

    Returns ``(values, next_index)`` where *next_index* is the first line
    after the data rows.
    """
    stop = i + _data_line_count(count)
    return _parse_data_block(lines[i:stop], count), stop


def _consume_metadata(
    lines: list[str],
    i: int,
    bc,
    fields: dict,
    *,
    terminator: str | None = None,
) -> int:
    """Apply ``key=value`` lines from ``lines[i:]`` to *bc*; return the next index.

    Stops (without consuming) at the next ``Boundary Location`` or at a
    trailing-section key.  Keys listed in *fields* set the mapped attribute;
    every other line is appended verbatim to ``bc._extra_lines``.  When
    *terminator* is given, parsing stops right after that key is consumed.
    """
    n = len(lines)
    while i < n:
        key, value = _split_key_value(lines[i])
        if key == "Boundary Location" or _is_trailing_key(key):
            break
        spec = fields.get(key)
        if spec is None:
            bc._extra_lines.append(lines[i])
        else:
            attr, convert = spec
            setattr(bc, attr, convert(value))
        i += 1
        if key == terminator:
            break
    return i


def _consume_gates(lines: list[str], i: int, bc: GateBoundary) -> int:
    """Fill ``bc.gates`` from ``lines[i:]``; return the next index.

    ``lines[i]`` is expected to be the first ``Gate Name=`` line.  Every
    ``Gate Name`` starts a new :class:`GateOpening`; the other ``Gate *``
    keys update the most recent gate.  Unrecognised lines are skipped.
    """
    n = len(lines)
    while i < n:
        key, value = _split_key_value(lines[i])
        if key == "Boundary Location" or _is_trailing_key(key):
            break
        if key == "Gate Name":
            bc.gates.append(GateOpening(gate_name=value.strip()))
            i += 1
        elif key == "Gate Openings":
            # The data rows are consumed even if no gate has been named yet.
            values, i = _read_values(lines, i + 1, int(value.strip()))
            if bc.gates:
                bc.gates[-1].values = values
        else:
            spec = _GATE_FIELDS.get(key)
            if spec is not None and bc.gates:
                attr, convert = spec
                setattr(bc.gates[-1], attr, convert(value))
            i += 1
    return i


def _parse_boundary_blocks(lines: list[str]) -> list[BoundaryType]:
    """Parse all boundary condition blocks from *lines*.

    *lines* should be the slice of the file that contains boundary blocks
    (i.e. from the first ``Boundary Location=`` line to the start of the
    trailing Met / Non-Newtonian section).

    Each ``Boundary Location=`` line opens a block.  The block's type is
    decided by the first recognised key that follows (``Interval`` plus a
    hydrograph key, ``Rating Curve``, ``Friction Slope``, ``Normal Depth`` or
    ``Gate Name``).  A location with no recognised type produces no object.

    Returns
    -------
    list[BoundaryType]
        Parsed boundaries in file order.

    Raises
    ------
    ValueError
        A value count, slope or ``Stage Hydrograph TW Check`` value is not
        numeric.
    """
    boundaries: list[BoundaryType] = []
    n = len(lines)
    i = 0

    while i < n:
        key, val = _split_key_value(lines[i])
        if key != "Boundary Location":
            i += 1
            continue

        # Location fields: river, reach, station, then an opaque tail.
        parts = val.split(",", 3)
        location = {
            "river": parts[0].strip() if len(parts) > 0 else "",
            "reach": parts[1].strip() if len(parts) > 1 else "",
            "river_station": parts[2].strip() if len(parts) > 2 else "",
            "_location_tail": parts[3] if len(parts) > 3 else "",
        }
        i += 1

        # Peek ahead to determine the BC type.
        bc: BoundaryType | None = None
        while i < n:
            key2, val2 = _split_key_value(lines[i])

            # Stop conditions: next block or the trailing section.
            if key2 == "Boundary Location" or _is_trailing_key(key2):
                break

            if key2 == "Interval":
                interval = val2.strip()
                i += 1
                if i >= n:
                    break
                key3, val3 = _split_key_value(lines[i])

                if key3 == "Flow Hydrograph":
                    values, i = _read_values(lines, i + 1, int(val3.strip()))
                    bc = FlowHydrograph(**location, interval=interval, values=values)
                    i = _consume_metadata(
                        lines, i, bc, _FLOW_HYDROGRAPH_FIELDS, terminator=_TERMINAL_KEY
                    )
                    break
                if key3 in (
                    "Lateral Inflow Hydrograph",
                    "Uniform Lateral Inflow Hydrograph",
                ):
                    values, i = _read_values(lines, i + 1, int(val3.strip()))
                    bc = LateralInflow(
                        **location,
                        interval=interval,
                        values=values,
                        is_uniform=(key3 == "Uniform Lateral Inflow Hydrograph"),
                    )
                    i = _consume_metadata(
                        lines, i, bc, _LATERAL_INFLOW_FIELDS, terminator=_TERMINAL_KEY
                    )
                    break
                if key3 == "Stage Hydrograph":
                    values, i = _read_values(lines, i + 1, int(val3.strip()))
                    bc = StageHydrograph(**location, interval=interval, values=values)
                    i = _consume_metadata(lines, i, bc, _STAGE_HYDROGRAPH_FIELDS)
                    break

                # Unknown type after Interval=.  Do not consume the line
                # here: re-examine it at the top of the loop so that a
                # ``Boundary Location`` (or trailing key) directly after
                # ``Interval=`` still ends this block instead of being lost.
                continue

            if key2 == "Rating Curve":
                count = int(val2.strip())
                # Values are stored flat, two per pair.
                flat, i = _read_values(lines, i + 1, count * 2)
                # Pair up neighbours; a dangling last value (odd number of
                # parsed values) is dropped rather than raising IndexError.
                pairs = list(zip(flat[0::2], flat[1::2]))
                bc = RatingCurve(**location, pairs=pairs)
                i = _consume_metadata(
                    lines, i, bc, _RATING_CURVE_FIELDS, terminator=_TERMINAL_KEY
                )
                break

            if key2 == "Friction Slope":
                parts2 = val2.split(",")
                slope = float(parts2[0].strip()) if parts2 else 0.0
                v2 = float(parts2[1].strip()) if len(parts2) > 1 else 0.0
                bc = FrictionSlope(**location, slope=slope, value2=v2)
                i += 1
                break

            if key2 == "Normal Depth":
                bc = NormalDepth(**location, slope=float(val2.strip()))
                i += 1
                break

            if key2 == "Gate Name":
                bc = GateBoundary(**location)
                i = _consume_gates(lines, i, bc)
                break

            # Unknown line within a boundary block — skip
            i += 1

        if bc is not None:
            boundaries.append(bc)

    return boundaries


def _is_trailing_key(key: str) -> bool:
    """Return True for keys that mark the start of the trailing section."""
    return key.startswith(("Met ", "Met BC")) or key in (
        "Met Point Raster Parameters",
        "Non-Newtonian Method",
        "Non-Newtonian Constant Vol Conc",
        "Precipitation Mode",
        "Wind Mode",
        "Air Density Mode",
        "Lava Activation",
    )


# ---------------------------------------------------------------------------
# UnsteadyFlow — structured, sortable editor
# ---------------------------------------------------------------------------
class UnsteadyFlow:
    """Structured editor for HEC-RAS unsteady flow files (.u**).

    Boundary conditions are parsed into typed dataclass objects stored in
    :attr:`boundaries`.  Boundaries may be sorted by river station (useful for
    workflows that address gates or lateral inflows by index).

    ``save()`` reconstructs the boundary section from the objects; the
    header, initial conditions, and trailing meteorological / Non-Newtonian
    lines are preserved verbatim.

    .. note::
        ``save()`` is **not** byte-identical to the original when boundaries
        are reordered or values are changed, because the file is
        reconstructed from the parsed objects.
    """

    def __init__(self, path: str | Path) -> None:
        """Read and parse the file at *path*.

        Raises
        ------
        FileNotFoundError
            *path* does not point to an existing file.
        """
        self._path = Path(path)
        if not self._path.is_file():
            raise FileNotFoundError(f"Unsteady flow file not found: {self._path}")
        # Undecodable bytes are replaced rather than aborting the load.
        with self._path.open(encoding="utf-8", errors="replace") as fh:
            self._all_lines: list[str] = list(fh)
        self._parse()
        self._modified: bool = False

    # ------------------------------------------------------------------
    # Parsing
    # ------------------------------------------------------------------

    def _parse(self) -> None:
        """Split the raw lines into sections and build the typed objects.

        The file is cut into four consecutive sections:

        1. header lines (before the first Initial/Boundary key),
        2. initial condition lines,
        3. boundary lines,
        4. trailing lines (Met BC, Non-Newtonian, etc.).

        Sections only ever advance; once a later section has started, no
        line is assigned to an earlier one.
        """
        initial_keys = ("Initial Flow Loc", "Initial Storage Elev", "Initial RRR Elev")

        self._header_lines: list[str] = []
        self._initial_lines: list[str] = []
        self._trailing_lines: list[str] = []
        boundary_lines: list[str] = []

        section: Literal["header", "initial", "boundary", "trailing"] = "header"
        for line in self._all_lines:
            key = line.split("=", 1)[0].rstrip() if "=" in line else line.rstrip("\n")

            if section == "trailing":
                self._trailing_lines.append(line)
            elif section == "boundary":
                if _is_trailing_key(key):
                    section = "trailing"
                    self._trailing_lines.append(line)
                else:
                    boundary_lines.append(line)
            elif key == "Boundary Location":
                # header or initial -> boundary
                section = "boundary"
                boundary_lines.append(line)
            elif section == "initial" or key in initial_keys:
                # Once the initial block has started, every line up to the
                # first boundary (blank lines, unknown keys) stays in it.
                section = "initial"
                self._initial_lines.append(line)
            else:
                self._header_lines.append(line)

        # Parse initial conditions into typed objects
        self.initial_flow_locs: list[InitialFlowLoc] = []
        self.initial_storage_elevs: list[InitialStorageElev] = []
        self.initial_rainfall_runoff_elevs: list[InitialRainfallRunoffElev] = []
        readers = (
            ("Initial Flow Loc=", InitialFlowLoc, self.initial_flow_locs),
            ("Initial Storage Elev=", InitialStorageElev, self.initial_storage_elevs),
            (
                "Initial RRR Elev=",
                InitialRainfallRunoffElev,
                self.initial_rainfall_runoff_elevs,
            ),
        )
        for line in self._initial_lines:
            for prefix, record_type, bucket in readers:
                if line.startswith(prefix):
                    bucket.append(record_type._from_raw(line[len(prefix) :].strip()))
                    break

        # Parse boundaries
        self.boundaries: list[BoundaryType] = _parse_boundary_blocks(
            [text.rstrip("\n") for text in boundary_lines]
        )

    # ------------------------------------------------------------------
    # Modification state
    # ------------------------------------------------------------------

    @property
    def is_modified(self) -> bool:
        """``True`` if any value has been changed since the last :meth:`save`."""
        return self._modified

    # ------------------------------------------------------------------
    # Scalar properties (read from header_lines, write back in-place)
    # ------------------------------------------------------------------

    def _header_get(self, key: str) -> str | None:
        """Return the stripped value of the first ``key=`` header line.

        ``None`` is returned when the key is absent or its value is blank.
        """
        prefix = f"{key}="
        hit = next((ln for ln in self._header_lines if ln.startswith(prefix)), None)
        if hit is None:
            return None
        return hit[len(prefix) :].strip() or None

    def _header_set(self, key: str, raw_value: str) -> None:
        """Overwrite the first ``key=`` header line with *raw_value*.

        Raises
        ------
        KeyError
            No header line starts with ``key=``.
        """
        prefix = f"{key}="
        for pos, existing in enumerate(self._header_lines):
            if existing.startswith(prefix):
                self._header_lines[pos] = f"{prefix}{raw_value}\n"
                self._modified = True
                break
        else:
            raise KeyError(f"Key not found in header: {key!r}")

    @property
    def flow_title(self) -> str | None:
        """Flow file title (``Flow Title=``), or ``None`` if absent."""
        return self._header_get("Flow Title")

    @flow_title.setter
    def flow_title(self, value: str) -> None:
        self._header_set("Flow Title", value)

    @property
    def program_version(self) -> str | None:
        """HEC-RAS version string (``Program Version=``), or ``None`` if absent."""
        return self._header_get("Program Version")

    @property
    def write_ic_file(self) -> bool | None:
        """Whether to write an initial conditions file at the end of simulation.

        Returns ``True`` if ``Write IC File at Sim End=-1``, ``False`` for
        any other integer (normally ``0``), or ``None`` if the key is absent
        (older files).
        """
        raw = self._header_get("Write IC File at Sim End")
        return None if raw is None else int(raw) == -1

    @write_ic_file.setter
    def write_ic_file(self, value: bool) -> None:
        self._header_set("Write IC File at Sim End", "-1" if value else "0")

    def _header_set_restart_filename(self, filename: str) -> None:
        """Set ``Restart Filename=``, inserting it after ``Use Restart=`` if new.

        Raises
        ------
        KeyError
            Neither ``Restart Filename=`` nor ``Use Restart=`` is present.
        """

        def first_index(prefix: str) -> int | None:
            return next(
                (
                    pos
                    for pos, existing in enumerate(self._header_lines)
                    if existing.startswith(prefix)
                ),
                None,
            )

        new_line = f"Restart Filename={filename}\n"
        at = first_index("Restart Filename=")
        if at is not None:
            self._header_lines[at] = new_line
        else:
            anchor = first_index("Use Restart=")
            if anchor is None:
                raise KeyError(
                    "'Use Restart' not found in header; cannot insert 'Restart Filename'"
                )
            self._header_lines.insert(anchor + 1, new_line)
        self._modified = True

    @property
    def restart(self) -> tuple[int, str | None]:
        """Return ``(flag, filename)`` for the restart configuration.

        *flag* is the ``Use Restart`` value (``0`` = disabled, ``1`` = enabled),
        or ``0`` when the key is absent.  *filename* is the ``Restart Filename``
        value, or ``None`` if absent.

        Assigning a ``str`` stores it as the restart filename and enables the
        flag; assigning any other value sets the flag from its truthiness
        (``None`` disables).
        """
        raw = self._header_get("Use Restart")
        flag = 0 if raw is None else int(raw.strip())
        return (flag, self._header_get("Restart Filename"))

    @restart.setter
    def restart(self, value: int | bool | str | None) -> None:
        if isinstance(value, str):
            # A filename always enables the restart, even an empty one.
            self._header_set_restart_filename(value)
            enabled = True
        else:
            enabled = bool(value)
        self._header_set("Use Restart", " 1 " if enabled else " 0 ")

    # ------------------------------------------------------------------
    # Typed boundary views
    # ------------------------------------------------------------------

    @property
    def flow_hydrographs(self) -> list[FlowHydrograph]:
        """All :class:`FlowHydrograph` boundaries, in file order."""
        return [bc for bc in self.boundaries if isinstance(bc, FlowHydrograph)]

    @property
    def lateral_inflows(self) -> list[LateralInflow]:
        """All :class:`LateralInflow` boundaries, in file order."""
        return [bc for bc in self.boundaries if isinstance(bc, LateralInflow)]

    @property
    def gate_boundaries(self) -> list[GateBoundary]:
        """All :class:`GateBoundary` boundaries, in file order."""
        return [bc for bc in self.boundaries if isinstance(bc, GateBoundary)]

    @property
    def friction_slopes(self) -> list[FrictionSlope]:
        """All :class:`FrictionSlope` boundaries, in file order."""
        return [bc for bc in self.boundaries if isinstance(bc, FrictionSlope)]

    # ------------------------------------------------------------------
    # Sorting
    # ------------------------------------------------------------------

    def _sort_type(self, bc_type: type, *, descending: bool) -> None:
        """Sort boundaries of *bc_type* by river station within each
        (river, reach) group.

        Only the list slots already occupied by *bc_type* objects are
        rewritten, so every other boundary keeps its position.  Groups are
        laid out in order of first appearance.
        """
        slots = [
            pos for pos, bc in enumerate(self.boundaries) if isinstance(bc, bc_type)
        ]

        # Group in first-appearance order (dict preserves insertion order).
        by_reach: dict[tuple[str, str], list] = {}
        for pos in slots:
            bc = self.boundaries[pos]
            by_reach.setdefault((bc.river, bc.reach), []).append(bc)

        # Sort by RS within each group, then flatten preserving group order.
        ordered: list = []
        for members in by_reach.values():
            ordered.extend(
                sorted(members, key=lambda bc: bc._rs_float(), reverse=descending)
            )

        for pos, bc in zip(slots, ordered):
            self.boundaries[pos] = bc
def sort_flow_hydrographs(self, *, descending: bool = True) -> None:
    """Sort :class:`FlowHydrograph` entries by river station.

    Sorting happens within each (river, reach) group; boundaries of other
    types keep their positions.

    Parameters
    ----------
    descending:
        If ``True`` (default), highest station first (upstream →
        downstream for standard RAS numbering).  Pass ``False`` for
        ascending order (lowest station first).
    """
    target_type = FlowHydrograph
    self._sort_type(target_type, descending=descending)
def sort_gate_boundaries(self, *, descending: bool = True) -> None:
    """Sort :class:`GateBoundary` entries by river station.

    Sorting happens within each (river, reach) group.  Other boundary types
    remain at their original positions.

    Parameters
    ----------
    descending:
        If ``True`` (default), highest station first (upstream →
        downstream for standard RAS numbering).  Pass ``False`` for
        ascending order (lowest station first).
    """
    target_type = GateBoundary
    self._sort_type(target_type, descending=descending)
def sort_lateral_inflows(self, *, descending: bool = True) -> None:
    """Sort :class:`LateralInflow` entries by river station.

    Sorting happens within each (river, reach) group; boundaries of other
    types keep their positions.

    Parameters
    ----------
    descending:
        If ``True`` (default), highest station first (upstream →
        downstream for standard RAS numbering).  Pass ``False`` for
        ascending order (lowest station first).
    """
    target_type = LateralInflow
    self._sort_type(target_type, descending=descending)
# ------------------------------------------------------------------ # Set by index (works naturally after sorting) # ------------------------------------------------------------------
def set_flow_hydrograph(self, index: int, values: _Values) -> None:
    """Set flow hydrograph values by position in :attr:`flow_hydrographs`.

    Parameters
    ----------
    index:
        Position in the filtered flow-hydrograph list.
    values:
        New flow values.  A scalar is broadcast to the length of the
        existing time series; a list replaces the series as given.

    Raises
    ------
    IndexError
        *index* is out of range.
    """
    target = self.flow_hydrographs[index]
    current_length = len(target.values)
    target.values = _coerce_values(values, current_length)
    self._modified = True
def set_lateral_inflow(self, index: int, values: _Values) -> None:
    """Set lateral inflow values by position in :attr:`lateral_inflows`.

    Parameters
    ----------
    index:
        Position in the filtered lateral-inflow list.
    values:
        New flow values.  A scalar is broadcast to the length of the
        existing time series; a list replaces the series as given.

    Raises
    ------
    IndexError
        *index* is out of range.
    """
    target = self.lateral_inflows[index]
    current_length = len(target.values)
    target.values = _coerce_values(values, current_length)
    self._modified = True
def set_all_lateral_inflows(self, values: list[float | list[float]]) -> None:
    """Set lateral inflow values across all :class:`LateralInflow` boundaries.

    Parameters
    ----------
    values:
        One entry per lateral inflow (in file order).  Each entry is
        either a scalar ``float`` (broadcast to the boundary's existing
        time-series length) or a ``list[float]`` (used as-is).
        If ``values`` is shorter than the total number of lateral inflows,
        the remaining boundaries are left unchanged; surplus entries are
        ignored.

    Notes
    -----
    The object is marked as modified even when no boundary was updated.
    """
    # Non-strict zip: stop at whichever of the two sequences ends first.
    pairs = zip(self.lateral_inflows, values, strict=False)
    for inflow, new_values in pairs:
        inflow.values = _coerce_values(new_values, len(inflow.values))
    self._modified = True
def set_gate_opening(
    self, index: int, values: _Values, gate_index: int = 0
) -> None:
    """Set gate opening values by position in :attr:`gate_boundaries`.

    Parameters
    ----------
    index:
        Position in the filtered gate-boundary list.
    values:
        New opening values.  A scalar is broadcast to the length of the
        existing gate opening time series; a list replaces it as given.
    gate_index:
        Which gate within the boundary (default 0).

    Raises
    ------
    IndexError
        *index* or *gate_index* is out of range.
    """
    boundary = self.gate_boundaries[index]
    target = boundary.gates[gate_index]
    target.values = _coerce_values(values, len(target.values))
    self._modified = True
def set_all_gate_openings(self, values: list[float | list[float]]) -> None:
    """Set gate opening values across all gates in all :class:`GateBoundary`.

    Parameters
    ----------
    values:
        One entry per gate (in order across all boundaries).  Each entry
        is either a scalar ``float`` (broadcast to the gate's existing
        time-series length) or a ``list[float]`` (used as-is).
        If ``values`` is shorter than the total number of gates, the
        remaining gates are left unchanged; surplus entries are ignored.

    Notes
    -----
    The object is marked as modified even when no gate was updated.
    """
    # Flatten: every gate of every gate boundary, in boundary order.
    flat_gates = []
    for boundary in self.gate_boundaries:
        flat_gates.extend(boundary.gates)

    # Non-strict zip: stop at whichever of the two sequences ends first.
    for target, new_values in zip(flat_gates, values, strict=False):
        target.values = _coerce_values(new_values, len(target.values))
    self._modified = True
# ------------------------------------------------------------------ # Set by location (river / reach / rs) # ------------------------------------------------------------------ def _find_boundary(self, river: str, reach: str, rs: str) -> BoundaryType | None: r = river.strip().lower() rc = reach.strip().lower() s = str(rs).strip().lower() for b in self.boundaries: if ( b.river.lower() == r and b.reach.lower() == rc and b.river_station.lower() == s ): return b return None
def set_flow_hydrograph_at(
    self, river: str, reach: str, rs: str, values: _Values
) -> None:
    """Set flow hydrograph values by location.

    Parameters
    ----------
    river, reach, rs:
        Location of the boundary, as matched by ``_find_boundary``.
    values:
        A scalar is broadcast to the existing time-series length; a list
        replaces the series as given.

    Raises
    ------
    KeyError
        The first boundary found at that location is missing or is not a
        :class:`FlowHydrograph`.
    """
    target = self._find_boundary(river, reach, rs)
    if isinstance(target, FlowHydrograph):
        target.values = _coerce_values(values, len(target.values))
        self._modified = True
        return
    raise KeyError(f"No FlowHydrograph at {river!r}, {reach!r}, {rs!r}")
def set_lateral_inflow_at(
    self, river: str, reach: str, rs: str, values: _Values
) -> None:
    """Replace the lateral inflow series at a river/reach/station.

    Parameters
    ----------
    river:
        River name of the boundary location.
    reach:
        Reach name of the boundary location.
    rs:
        River station of the boundary location.
    values:
        New series.  A scalar is broadcast to the existing
        time-series length.

    Raises
    ------
    KeyError
        The boundary found at the location is not a
        :class:`LateralInflow` (this includes finding nothing).
    """
    boundary = self._find_boundary(river, reach, rs)
    if isinstance(boundary, LateralInflow):
        boundary.values = _coerce_values(values, len(boundary.values))
        self._modified = True
        return
    raise KeyError(f"No LateralInflow at {river!r}, {reach!r}, {rs!r}")
def set_gate_opening_at(
    self, river: str, reach: str, rs: str, gate: str | int, values: _Values
) -> None:
    """Replace one gate's opening series, addressed by location and gate.

    Parameters
    ----------
    river:
        River name of the gate boundary location.
    reach:
        Reach name of the gate boundary location.
    rs:
        River station of the gate boundary location.
    gate:
        Either a gate name (compared ignoring case and surrounding
        whitespace; the first match is used) or an integer index into
        the boundary's gate list.
    values:
        New series.  A scalar is broadcast to the existing
        time-series length.

    Raises
    ------
    KeyError
        No :class:`GateBoundary` exists at the location, or no gate
        carries the requested name.
    IndexError
        *gate* is an integer outside the boundary's gate list.
    """
    boundary = self._find_boundary(river, reach, rs)
    if not isinstance(boundary, GateBoundary):
        raise KeyError(f"No GateBoundary at {river!r}, {reach!r}, {rs!r}")

    # Resolve the target gate first; the assignment is shared below.
    if isinstance(gate, int):
        try:
            target = boundary.gates[gate]
        except IndexError as exc:
            raise IndexError(
                f"Gate index {gate} out of range; "
                f"{len(boundary.gates)} gate(s) at {river!r}, {reach!r}, {rs!r}"
            ) from exc
    else:
        wanted = gate.strip().lower()
        target = next(
            (
                candidate
                for candidate in boundary.gates
                if candidate.gate_name.strip().lower() == wanted
            ),
            None,
        )
        if target is None:
            raise KeyError(
                f"Gate {gate!r} not found at {river!r}, {reach!r}, {rs!r}"
            )

    target.values = _coerce_values(values, len(target.values))
    self._modified = True
# ------------------------------------------------------------------ # Initial conditions # ------------------------------------------------------------------
def set_initial_flow(self, index: int, flow: float) -> None:
    """Overwrite the flow of one entry of :attr:`initial_flow_locs`.

    Parameters
    ----------
    index:
        Position of the entry in ``initial_flow_locs``.
    flow:
        Flow value to store on that entry.

    Raises
    ------
    IndexError
        *index* is out of range.
    """
    entry = self.initial_flow_locs[index]
    entry.flow = flow
    self._modified = True
def set_initial_flow_at(self, river: str, reach: str, rs: str, flow: float) -> None:
    """Update the initial flow at the given location.

    Matching ignores letter case and surrounding whitespace on both the
    query and the stored ``river`` / ``reach`` / ``river_station``
    fields.  Only the first matching entry is updated.

    Parameters
    ----------
    river:
        River name.
    reach:
        Reach name.
    rs:
        River station; converted with ``str()`` before comparison.
    flow:
        New flow value.

    Raises
    ------
    KeyError
        No matching ``Initial Flow Loc`` entry found.
    """
    wanted_river = river.strip().lower()
    wanted_reach = reach.strip().lower()
    wanted_station = str(rs).strip().lower()
    for loc in self.initial_flow_locs:
        # Strip the stored side too: a padded stored field would
        # otherwise never equal the already-stripped query.
        if (
            loc.river.strip().lower() == wanted_river
            and loc.reach.strip().lower() == wanted_reach
            and loc.river_station.strip().lower() == wanted_station
        ):
            loc.flow = flow
            self._modified = True
            return
    raise KeyError(f"Initial Flow Loc not found for {river!r}, {reach!r}, {rs!r}")
# ------------------------------------------------------------------ # Get by location # ------------------------------------------------------------------
def get_flow_hydrograph(
    self, river: str, reach: str, rs: str
) -> list[float] | None:
    """Look up the flow hydrograph series at a location.

    Returns
    -------
    list[float] | None
        A new list holding the boundary's values, or ``None`` when the
        boundary found at the location is not a
        :class:`FlowHydrograph` (this includes finding nothing).
    """
    boundary = self._find_boundary(river, reach, rs)
    return list(boundary.values) if isinstance(boundary, FlowHydrograph) else None
def get_lateral_inflow(
    self, river: str, reach: str, rs: str
) -> list[float] | None:
    """Look up the lateral inflow series at a location.

    Returns
    -------
    list[float] | None
        A new list holding the boundary's values, or ``None`` when the
        boundary found at the location is not a
        :class:`LateralInflow` (this includes finding nothing).
    """
    boundary = self._find_boundary(river, reach, rs)
    return list(boundary.values) if isinstance(boundary, LateralInflow) else None
def get_gate_openings(
    self, river: str, reach: str, rs: str, gate_name: str
) -> list[float] | None:
    """Look up the opening series of a named gate at a location.

    The gate name is compared ignoring case and surrounding whitespace;
    the first gate that matches is used.

    Returns
    -------
    list[float] | None
        A new list holding the gate's values, or ``None`` when the
        location holds no :class:`GateBoundary` or no gate there has
        the requested name.
    """
    boundary = self._find_boundary(river, reach, rs)
    if not isinstance(boundary, GateBoundary):
        return None

    wanted = gate_name.strip().lower()
    match = next(
        (
            candidate
            for candidate in boundary.gates
            if candidate.gate_name.strip().lower() == wanted
        ),
        None,
    )
    if match is None:
        return None
    return list(match.values)
def get_initial_flow(self, river: str, reach: str, rs: str) -> float | None:
    """Return the initial flow at the given location, or ``None``.

    Matching ignores letter case and surrounding whitespace on both the
    query and the stored ``river`` / ``reach`` / ``river_station``
    fields.  The first matching entry of ``initial_flow_locs`` wins.

    Parameters
    ----------
    river:
        River name.
    reach:
        Reach name.
    rs:
        River station; converted with ``str()`` before comparison.

    Returns
    -------
    float | None
        The ``flow`` of the matching entry, or ``None`` when no entry
        matches.
    """
    wanted_river = river.strip().lower()
    wanted_reach = reach.strip().lower()
    wanted_station = str(rs).strip().lower()
    for loc in self.initial_flow_locs:
        # Strip the stored side too: a padded stored field would
        # otherwise never equal the already-stripped query.
        if (
            loc.river.strip().lower() == wanted_river
            and loc.reach.strip().lower() == wanted_reach
            and loc.river_station.strip().lower() == wanted_station
        ):
            return loc.flow
    return None
# ------------------------------------------------------------------
# Serialisation helpers
# ------------------------------------------------------------------
def _boundary_to_lines(self, bc: BoundaryType) -> list[str]:
    """Serialise a single boundary object to a list of text lines.

    Parameters
    ----------
    bc:
        Boundary object to serialise.  The lines emitted after the
        location line depend on its concrete type.

    Returns
    -------
    list[str]
        Newline-terminated lines, starting with the boundary's location
        line.  A boundary of a type not handled below yields only that
        location line.
    """
    # Every boundary starts with its location line.
    out: list[str] = [bc._location_line() + "\n"]
    if isinstance(bc, (FlowHydrograph, LateralInflow)):
        # Both types share the interval / data block / DSS layout and
        # differ only in the keyword and two flow-only lines.
        out.append(f"Interval={bc.interval}\n")
        if isinstance(bc, FlowHydrograph):
            keyword = "Flow Hydrograph"
        else:
            keyword = (
                "Uniform Lateral Inflow Hydrograph"
                if bc.is_uniform
                else "Lateral Inflow Hydrograph"
            )
        # The keyword line carries the value count; the values follow.
        count = len(bc.values)
        out.append(f"{keyword}= {count} \n")
        for dl in _format_data_block(bc.values):
            out.append(dl + "\n")
        if isinstance(bc, FlowHydrograph):
            out.append(f"Stage Hydrograph TW Check={bc.stage_tw_check}\n")
            # The slope line is written only when a slope is set.
            if bc.flow_hydrograph_slope is not None:
                out.append(f"Flow Hydrograph Slope= {bc.flow_hydrograph_slope}\n")
        # "DSS File" is written only when non-empty.
        if bc.dss_file:
            out.append(f"DSS File={bc.dss_file}\n")
        out.append(f"DSS Path={bc.dss_path}\n")
        out.append(f"Use DSS={str(bc.use_dss)}\n")
        out.append(f"Use Fixed Start Time={str(bc.use_fixed_start)}\n")
        out.append(f"Fixed Start Date/Time={bc.fixed_start}\n")
        out.append(f"Is Critical Boundary={str(bc.is_critical)}\n")
        out.append(f"Critical Boundary Flow={bc.critical_boundary_flow}\n")
        # Extra lines are emitted as stored, adding a newline if missing.
        for el in bc._extra_lines:
            out.append(el if el.endswith("\n") else el + "\n")
    elif isinstance(bc, StageHydrograph):
        out.append(f"Interval={bc.interval}\n")
        count = len(bc.values)
        out.append(f"Stage Hydrograph= {count} \n")
        for dl in _format_data_block(bc.values):
            out.append(dl + "\n")
        out.append(f"DSS Path={bc.dss_path}\n")
        out.append(f"Use DSS={str(bc.use_dss)}\n")
        out.append(f"Use Fixed Start Time={str(bc.use_fixed_start)}\n")
        out.append(f"Fixed Start Date/Time={bc.fixed_start}\n")
        for el in bc._extra_lines:
            out.append(el if el.endswith("\n") else el + "\n")
    elif isinstance(bc, RatingCurve):
        # Pairs are flattened into one sequence for the data block, but
        # the count written is the number of pairs, not of values.
        flat = [v for pair in bc.pairs for v in pair]
        count = len(bc.pairs)
        out.append(f"Rating Curve= {count} \n")
        for dl in _format_data_block(flat):
            out.append(dl + "\n")
        out.append(f"DSS Path={bc.dss_path}\n")
        out.append(f"Use DSS={str(bc.use_dss)}\n")
        out.append(f"Use Fixed Start Time={str(bc.use_fixed_start)}\n")
        out.append(f"Fixed Start Date/Time={bc.fixed_start}\n")
        out.append(f"Is Critical Boundary={str(bc.is_critical)}\n")
        out.append(f"Critical Boundary Flow={bc.critical_boundary_flow}\n")
        for el in bc._extra_lines:
            out.append(el if el.endswith("\n") else el + "\n")
    elif isinstance(bc, FrictionSlope):
        # The second field is written as an integer.
        out.append(f"Friction Slope={bc.slope},{int(bc.value2)}\n")
    elif isinstance(bc, NormalDepth):
        out.append(f"Normal Depth={bc.slope}\n")
    elif isinstance(bc, GateBoundary):
        # One settings group plus opening data block per gate, in order.
        for gate in bc.gates:
            out.append(f"Gate Name={gate.gate_name}\n")
            out.append(f"Gate DSS Path={gate.dss_path}\n")
            out.append(f"Gate Use DSS={str(gate.use_dss)}\n")
            out.append(f"Gate Time Interval={gate.time_interval}\n")
            out.append(f"Gate Use Fixed Start Time={str(gate.use_fixed_start)}\n")
            out.append(f"Gate Fixed Start Date/Time={gate.fixed_start}\n")
            count = len(gate.values)
            out.append(f"Gate Openings= {count} \n")
            for dl in _format_data_block(gate.values):
                out.append(dl + "\n")
    return out

# ------------------------------------------------------------------
# Persistence
# ------------------------------------------------------------------
def save(self, path: str | Path | None = None) -> None:
    """Rebuild the unsteady flow file from the current state and write it.

    The output is assembled in this order:

    1. Header lines (verbatim from parse)
    2. Initial condition lines (rebuilt from the objects)
    3. Boundary section (rebuilt from :attr:`boundaries`)
    4. Trailing lines (verbatim from parse)

    Parameters
    ----------
    path:
        Destination path.  When omitted, the file is written to
        ``self._path``.
    """
    target = self._path if path is None else Path(path)

    # 1. Header
    lines: list[str] = list(self._header_lines)

    # 2. Initial conditions
    lines += [
        f"Initial Flow Loc={flow_loc._to_raw()}\n"
        for flow_loc in self.initial_flow_locs
    ]
    lines += [
        f"Initial Storage Elev={storage._to_raw()}\n"
        for storage in self.initial_storage_elevs
    ]
    lines += [
        f"Initial RRR Elev={runoff._to_raw()}\n"
        for runoff in self.initial_rainfall_runoff_elevs
    ]

    # 3. Boundaries
    for boundary in self.boundaries:
        lines += self._boundary_to_lines(boundary)

    # 4. Trailing
    lines += self._trailing_lines

    with open(target, "w", encoding="utf-8") as fh:
        fh.writelines(lines)
    self._modified = False