"""Read/write HEC-RAS geometry files (.g**).
:class:`Geometry` — verbatim-line editor for HEC-RAS 1-D geometry files
(.g01, .g02, …). All lines are stored verbatim. Typed access is provided
for:
- File metadata (``title``, ``program_version``, ``viewing_rectangle``)
- Reach / node inventory (``reaches``, ``junctions``, ``node_rs_list``)
- Cross-section data: ``#Sta/Elev``, ``#Mann``, ``Bank Sta``,
``#XS Ineff``, ``Exp/Cntr``, ``Levee``, ``#Block Obstruct``,
``XS HTab Starting El and Incr``
(read via :meth:`get_cross_section`, write via targeted setters)
- Structure nodes (bridge, culvert, inline/lateral structure) preserved
verbatim and accessible via :meth:`get_node_lines`
``save()`` is byte-faithful for every unmodified line.
Node type codes in ``Type RM Length L Ch R = TYPE, ...``::
1 — Cross Section
2 — Culvert (single or twin-pipe)
3 — Bridge
4 — Multiple Opening
5 — Inline Structure
6 — Lateral Structure
Cross-section fixed-width format (8-char columns):
.. code-block:: text
#Sta/Elev= N alternating station/elevation pairs, 10 per row
#Mann= N,t,a triplets (station, n-value, variation), 9 per row
#XS Ineff= N triplets (x_start, x_end, elevation), 9 per row
followed by Permanent Ineff= flags (8-char, 10/row)
#Block Obstruct= N,t triplets (x_start, x_end, elevation), 9 per row
Levee=lf,ls,le,rf,rs,re[,lfe,rfe]
left/right flag(-1=active), station, elevation,
optional failure elevations
XS HTab Starting El and Incr=el,incr,count
Vertical (depth/flow-varying) Manning's n — appears between ``XS Rating Curve=``
and ``Exp/Cntr=`` when active:
.. code-block:: text
Vertical n Elevations= N N WSE or flow breakpoints (8-char cols, 10/row)
Vertical n for Station=S per-station entry; N n-values follow (8-char, 10/row)
...
Vertical n Flow= F F=0 → WSE breakpoints; F=-1 → flow breakpoints
When vertical n is active, ``#Mann= N , 0 , 0`` stores zone boundary stations
only; n-values in that block are all zero (placeholders).
Convention
----------
``get_*`` methods return ``None`` when the requested item is not found.
``set_*`` methods raise :exc:`KeyError` when the target does not exist.
"""
from __future__ import annotations
import contextlib
import logging
from dataclasses import dataclass, field
from enum import IntEnum
from math import ceil, isnan, nan
from pathlib import Path
from typing import Generic, TypeVar, overload
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
_COL = 8 # fixed-width column for numerical data
_COLS_STAE = 10 # values per row in #Sta/Elev blocks (5 pairs)
_COLS_MANN = 9 # values per row in #Mann / #XS Ineff (3 triplets)
_COLS_FLAGS = 10 # flags per row in Permanent Ineff blocks
[docs]
class NodeType(IntEnum):
"""Node type codes from the ``Type RM Length L Ch R = TYPE, ...`` line."""
CROSS_SECTION = 1
CULVERT = 2
BRIDGE = 3
MULTIPLE_OPENING = 4
INLINE_STRUCTURE = 5
LATERAL_STRUCTURE = 6
_NODE_TYPE_NAMES: dict[NodeType, str] = {
NodeType.CROSS_SECTION: "CrossSection",
NodeType.CULVERT: "Culvert",
NodeType.BRIDGE: "Bridge",
NodeType.MULTIPLE_OPENING: "MultipleOpening",
NodeType.INLINE_STRUCTURE: "InlineStructure",
NodeType.LATERAL_STRUCTURE: "LateralStructure",
}
#: Culvert shape codes from ``Culvert=`` data line (index 0).
_CULVERT_SHAPES: dict[int, str] = {
1: "Circular",
2: "Box",
3: "Pipe Arch",
4: "Ellipse",
5: "Arch",
6: "Semi-Circle",
7: "Low Profile Arch",
8: "High Profile Arch",
9: "Con Span",
}
_KEY_NODE = "Type RM Length L Ch R"
_KEY_REACH = "River Reach"
_KEY_JUNCT = "Junct Name"
_T = TypeVar("_T")
# ---------------------------------------------------------------------------
# Formatting helpers (same algorithm as flow_steady)
# ---------------------------------------------------------------------------
def _fit_width(value: float, width: int = _COL) -> str:
"""Right-justify *value* inside *width* chars.
Tries integer, then progressively fewer decimal places, then scientific
notation. Truncates as a last resort.
"""
if isinstance(value, int) or (
isinstance(value, float)
and value == int(value)
and len(str(int(value))) <= width
):
s = str(int(value))
if len(s) <= width:
return s.rjust(width)
s = repr(value)
if len(s) <= width:
return s.rjust(width)
fv = float(value)
for decimals in range(6, -1, -1):
s = f"{fv:.{decimals}f}"
if len(s) <= width:
return s.rjust(width)
for decimals in range(2, -1, -1):
s = f"{fv:.{decimals}E}"
if len(s) <= width:
return s.rjust(width)
return repr(value)[:width]
def _format_block(values: list[float], cols: int, width: int = _COL) -> list[str]:
"""Return fixed-width data rows (no trailing newline)."""
rows: list[str] = []
for i in range(0, len(values), cols):
rows.append("".join(_fit_width(v, width) for v in values[i : i + cols]))
return rows
def _parse_block(lines: list[str], count: int, width: int = _COL) -> list[float]:
"""Parse up to *count* fixed-width values from *lines*, skipping blanks."""
values: list[float] = []
for line in lines:
pos = 0
while pos < len(line) and len(values) < count:
token = line[pos : pos + width].strip()
if token:
try:
values.append(float(token))
except ValueError:
values.append(0.0)
pos += width
return values[:count]
def _parse_block_fixed(
lines: list[str], count: int, width: int = _COL
) -> list[float]:
"""Read exactly *count* fixed-width positions; blank fields become 0.0.
Unlike :func:`_parse_block`, blank columns are NOT skipped — they
contribute a ``0.0``. Required for ``#Block Obstruct`` data where
absent endpoints are left blank rather than omitted.
"""
values: list[float] = []
for line in lines:
pos = 0
while pos + width <= len(line) and len(values) < count:
token = line[pos : pos + width].strip()
try:
values.append(float(token) if token else 0.0)
except ValueError:
values.append(0.0)
pos += width
# Pad if line is shorter than expected
while len(values) < count:
values.append(0.0)
return values[:count]
def _row_count(n: int, cols: int) -> int:
return ceil(n / cols) if n > 0 else 0
def _fmt_levee_val(v: float | None) -> str:
"""Format a levee field value: integer when whole, else decimal, else ''."""
if v is None:
return ""
return str(int(v)) if v == int(v) else str(v)
def _fmt_levee_line(left: LeveeData | None, right: LeveeData | None) -> str:
"""Build the ``Levee=`` line from left/right :class:`LeveeData`."""
if left is not None:
lf = f"-1,{_fmt_levee_val(left.station)},{_fmt_levee_val(left.elevation)}"
l_fail = _fmt_levee_val(left.failure_elevation)
else:
lf, l_fail = "0,,", ""
if right is not None:
rf = f"-1,{_fmt_levee_val(right.station)},{_fmt_levee_val(right.elevation)}"
r_fail = _fmt_levee_val(right.failure_elevation)
else:
rf, r_fail = "0,,", ""
return f"Levee={lf},{rf},{l_fail},{r_fail}"
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
[docs]
@dataclass
class ManningEntry:
"""One horizontal Manning's n zone.
Attributes
----------
station:
Left boundary station of this n zone.
n_value:
Manning's roughness coefficient.
variation:
Third column in the HEC-RAS file. Usually ``0``; used
for vertical-n or alternative-n assignments.
"""
station: float
n_value: float
variation: float = 0.0
def __post_init__(self) -> None:
if self.n_value <= 0:
raise ValueError(f"n_value must be > 0, got {self.n_value!r}")
[docs]
@dataclass
class IneffArea:
"""One ineffective flow area interval.
Attributes
----------
x_start:
Left boundary station.
x_end:
Right boundary station.
elevation:
Activation elevation (area is ineffective below this).
permanent:
``True`` if always active (``T`` flag), ``False`` if
elevation-triggered (``F`` flag).
"""
x_start: float
x_end: float
elevation: float
permanent: bool = False
[docs]
@dataclass
class LeveeData:
"""Levee definition for one bank of a cross section.
Encoded on the ``Levee=`` line as a ``-1`` active flag followed by
station and elevation.
Attributes
----------
station:
Lateral station of the levee crest.
elevation:
Levee crest elevation.
failure_elevation:
Elevation at which the levee fails; ``None`` if
not specified.
"""
station: float
elevation: float
failure_elevation: float | None = None
[docs]
@dataclass
class BlockedObstruction:
"""One blocked-obstruction interval in a cross section (``#Block Obstruct``).
Flow area between *x_start* and *x_end* is blocked up to *elevation*.
Use a very large elevation (e.g. ``999``) for a permanently blocked zone.
Attributes
----------
x_start:
Left boundary station.
x_end:
Right boundary station.
elevation:
Elevation ceiling of the blocked zone.
"""
x_start: float
x_end: float
elevation: float
[docs]
@dataclass
class VerticalNStation:
"""Manning's n values at one cross-section station, varying by depth or flow.
Attributes
----------
station:
Lateral station (same coordinate system as ``#Sta/Elev``).
n_values:
n-value at each breakpoint in :class:`VerticalN`.
"""
station: float
n_values: list[float]
[docs]
@dataclass
class VerticalN:
"""Vertical (depth/flow-varying) Manning's n for a cross section.
HEC-RAS stores this block between ``XS Rating Curve=`` and
``Exp/Cntr=`` when vertical variation is active. The ``#Mann``
block in the same XS still defines zone boundaries but carries
placeholder zero n-values.
Attributes
----------
breakpoints:
Water-surface elevations (``by_flow=False``) or
flow values (``by_flow=True``) at which n is
tabulated. Length N.
by_flow:
``True`` when breakpoints are flows
(``Vertical n Flow=-1``); ``False`` when they
are WSE (``Vertical n Flow= 0``).
stations:
Per-station n-value curves, each with N entries.
"""
breakpoints: list[float]
by_flow: bool
stations: list[VerticalNStation]
[docs]
@dataclass
class CrossSection:
"""Parsed data for one HEC-RAS cross section (node type 1).
Returned by :meth:`Geometry.get_cross_section`. Write changes back
with the targeted setters on :class:`Geometry` (``set_mannings``,
``set_stations``, ``set_bank_stations``, ``set_exp_cntr``).
Attributes
----------
river:
River name.
reach:
Reach name.
rs:
River station string (normalised: no trailing
whitespace or ``*`` interpolation marker).
description:
Node description from ``BEGIN/END DESCRIPTION``.
stations:
Station values from ``#Sta/Elev``.
elevations:
Elevation values from ``#Sta/Elev``.
mann_entries:
Manning's n zones from ``#Mann``.
mann_type:
Type flag from ``#Mann= N , type , alt`` header.
mann_alt:
Alt flag from ``#Mann= N , type , alt`` header.
bank_left:
Left bank station (``Bank Sta``).
bank_right:
Right bank station (``Bank Sta``).
ineff_areas:
Ineffective flow areas (``#XS Ineff``).
expansion:
Expansion loss coefficient (``Exp/Cntr``).
contraction:
Contraction loss coefficient (``Exp/Cntr``).
left_length:
Left overbank reach length from node header.
channel_length:
Channel reach length from node header.
right_length:
Right overbank reach length from node header.
interpolated:
``True`` if the RS string had a trailing ``*``
(HEC-RAS interpolated cross section).
vertical_n:
Vertical (depth/flow-varying) Manning's n, or
``None`` if the cross section uses flat n-values.
levee_left:
Left-bank levee (``Levee=`` line), or ``None``.
levee_right:
Right-bank levee (``Levee=`` line), or ``None``.
blocked_obstructions:
Blocked-obstruction intervals
(``#Block Obstruct``).
htab_starting_elevation:
Starting elevation for the hydraulic table
(``XS HTab Starting El and Incr``).
htab_increment:
Elevation increment for the hydraulic table.
htab_count:
Number of entries in the hydraulic table.
"""
river: str
reach: str
rs: str
description: str = ""
stations: list[float] = field(default_factory=list)
elevations: list[float] = field(default_factory=list)
mann_entries: list[ManningEntry] = field(default_factory=list)
mann_type: int = 0
mann_alt: int = 0
bank_left: float | None = None
bank_right: float | None = None
ineff_areas: list[IneffArea] = field(default_factory=list)
expansion: float = 0.3
contraction: float = 0.1
left_length: float | None = None
channel_length: float | None = None
right_length: float | None = None
interpolated: bool = False
vertical_n: VerticalN | None = None
levee_left: LeveeData | None = None
levee_right: LeveeData | None = None
blocked_obstructions: list[BlockedObstruction] = field(default_factory=list)
htab_starting_elevation: float | None = None
htab_increment: float | None = None
htab_count: int | None = None
# ---------------------------------------------------------------------------
# Structure dataclasses (inline structures from text geometry files)
# ---------------------------------------------------------------------------
[docs]
@dataclass
class GateOpening:
"""One gate opening within a gate group.
Attributes
----------
name:
Opening name from ``IW Gate Opening=`` line; empty string
when the line is absent.
station:
Lateral station of the gate opening (8-char fixed-width column).
gis:
GIS (x, y) coordinate pairs from the data line following the
``IW Gate Opening=`` line (16-char fixed-width columns).
Empty list when the GIS point count is zero.
"""
name: str
station: float
gis: list[tuple[float, float]] = field(default_factory=list)
_GATE_TYPE_NAMES: dict[int, str] = {
0: "sluice",
1: "radial",
2: "overflow_closed_top",
3: "overflow_open",
4: "user_defined_curves",
}
[docs]
@dataclass
class GateGroup:
"""One gate group in an inline structure (``IW Gate Name`` block).
Field order follows the comma-delimited ``IW Gate Name`` data line::
name,Wd,H,Inv,GCoef,Exp_T,Exp_O,Exp_H,Type,WCoef,Is_Ogee,
SpillHt,DesHd,#Openings,trunnion_height,orifice_coef,
head_ref,radial_coef,,is_sharp_crested,weir_p1,weir_p2,weir_p3
Attributes
----------
name:
Group name (index 0).
width:
Gate width — ``Wd`` (index 1).
height:
Gate height — ``H`` (index 2).
invert:
Gate invert elevation — ``Inv`` (index 3).
gate_coefficient:
Gate discharge coefficient (applies to all
gate types) — ``GCoef`` (index 4). The HDF
version names this field
``sluice_coefficient``.
trunnion_exponent:
Trunnion arm exponent — ``Exp_T`` (index 5).
opening_exponent:
Gate opening exponent — ``Exp_O`` (index 6).
height_exponent:
Gate height exponent — ``Exp_H`` (index 7).
gate_type:
Gate type string — ``Type`` (index 8):
``'sluice'``, ``'radial'``,
``'overflow_closed_top'``,
``'overflow_open'``, or
``'user_defined_curves'``.
weir_coefficient:
Overflow weir coefficient — ``WCoef``
(index 9).
is_ogee:
``True`` when ``Is_Ogee`` (index 10) is
``-1`` (ogee crest shape).
spillway_approach_height:
Spillway approach height — ``SpillHt``
(index 11).
design_energy_head:
Design energy head — ``DesHd`` (index 12).
trunnion_height:
Trunnion height (index 14).
orifice_coefficient:
Orifice coefficient (index 15).
head_reference:
Head reference point — 0 = sill,
1 = centre of opening (index 16).
radial_coefficient:
Radial (Tainter) gate discharge coefficient
(index 17).
is_sharp_crested:
``True`` when index 19 is ``-1``.
use_weir_param1:
``True`` when index 20 is ``-1``.
use_weir_param2:
``True`` when index 21 is ``-1``.
use_weir_param3:
``True`` when index 22 is ``-1``.
openings:
Individual gate openings (stations + names).
"""
name: str
width: float
height: float
invert: float
gate_coefficient: float
trunnion_exponent: float
opening_exponent: float
height_exponent: float
gate_type: str
weir_coefficient: float
is_ogee: bool
spillway_approach_height: float
design_energy_head: float
trunnion_height: float
orifice_coefficient: float
head_reference: int
radial_coefficient: float
is_sharp_crested: bool
use_weir_param1: bool
use_weir_param2: bool
use_weir_param3: bool
openings: list[GateOpening] = field(default_factory=list)
@property
def weir_shape(self) -> str:
"""Shape of the overflow weir crest.
Returns ``'Ogee'``, ``'Sharp Crested'``, or ``'Broad Crested'``
based on the ``is_ogee`` and ``is_sharp_crested`` flags.
"""
if self.is_ogee:
return "Ogee"
if self.is_sharp_crested:
return "Sharp Crested"
return "Broad Crested"
@property
def Cu(self) -> float:
"""Unsubmerged discharge coefficient (alias for ``gate_coefficient``).
Typical range 0.5–0.7 for sluice and radial gates.
"""
return self.gate_coefficient
@property
def Cs(self) -> float:
"""Submerged discharge coefficient (alias for ``orifice_coefficient``).
Typical value ~0.8; applied when tailwater submerges the gate opening.
"""
return self.orifice_coefficient
[docs]
@dataclass
class Weir:
"""Overflow weir parameters from the ``IW Dist`` block.
Mirrors :class:`rivia.hdf.geometry.Weir` in field names. Key
differences vs the HDF version:
- Always populated when an ``IW Dist`` line exists, even when *mode* is
empty (the HDF version returns ``None`` for ``mode=''``).
- ``us_slope``, ``ds_slope``: not present in the ``IW Dist`` text line;
stored as ``nan``.
.. TODO: Check whether HEC-RAS stores US/DS slope elsewhere in the
text geometry file (e.g. a secondary ``IW Dist`` continuation line
or a ``Spillway`` block) and parse them if found.
- ``use_water_surface``: not stored in the text geometry format; always
``False``.
.. TODO: Identify whether this flag is persisted anywhere in the
``.g**`` file and add parsing when found.
Attributes
----------
width:
Weir width (``WD``, index 1).
coefficient:
Weir discharge coefficient (``Coef``, index 2).
shape:
``'Broad Crested'`` or ``'Ogee'``
(``Is_Ogee``, index 6).
max_submergence:
Maximum submergence ratio (``MaxSub``,
index 4).
min_elevation:
Minimum weir crest elevation (``Min_El``,
index 5); ``nan`` when blank.
skew:
Weir skew angle in degrees (``Skew``,
index 3).
dist:
Distance from the upstream XS to the weir
face (index 0 of the ``IW Dist`` data line);
``nan`` for bridges and laterals (not stored
in those formats).
spillway_approach_height:
Spillway approach height — ``SpillHt``
(index 7 of the ``IW Dist`` data line);
``nan`` for bridges and laterals.
design_energy_head:
Design energy head — ``DesHd`` (index 8 of
the ``IW Dist`` data line); ``nan`` for
bridges and laterals.
us_slope:
Upstream face slope H:V; ``nan`` — not in
text format.
ds_slope:
Downstream face slope H:V; ``nan`` — not in
text format.
use_water_surface:
Use water surface as weir reference head;
always ``False`` — not in text format.
"""
width: float
coefficient: float
shape: str
max_submergence: float
min_elevation: float
skew: float
dist: float = nan
spillway_approach_height: float = nan
design_energy_head: float = nan
us_slope: float = nan
ds_slope: float = nan
use_water_surface: bool = False
[docs]
@dataclass
class CulvertGroup:
"""One culvert barrel definition from a ``Culvert=`` line.
Field order matches the comma-separated data on the ``Culvert=`` line::
shape, span, rise, length, n, Ke, exit_loss, inlet_type, outlet_type,
us_invert, us_station, ds_invert, ds_station, name, ?, chart_number
Attributes
----------
name:
Culvert identifier (index 13), e.g. ``'Culvert # 1'``.
shape_code:
Shape code (index 0) — see :data:`_CULVERT_SHAPES`.
shape_name:
Human-readable shape, e.g. ``'Circular'``, ``'Box'``.
span:
Width or diameter (index 1).
rise:
Height (index 2); ``0.0`` when blank.
length:
Barrel length (index 3).
n_top:
Manning's roughness coefficient (index 4).
entrance_loss:
Entrance loss coefficient Ke (index 5).
exit_loss:
Exit loss coefficient (index 6).
inlet_type:
Inlet control type code (index 7).
outlet_type:
Outlet control type code (index 8).
upstream_invert:
Upstream invert elevation (index 9).
upstream_station:
Upstream station location (index 10).
downstream_invert:
Downstream invert elevation (index 11).
downstream_station:
Downstream station location (index 12).
chart_number:
Inlet control chart number (index 15).
num_barrels:
Number of barrels — from ``BC Culvert Barrel=`` line;
``1`` when that line is absent.
n_bottom:
Bottom Manning's n — from ``Culvert Bottom n=``;
``None`` when absent.
depth_n_bottom:
Bottom fill depth — from ``Culvert Bottom Depth=``;
``None`` when absent.
.. TODO: ``Multiple Barrel Culv=`` uses a different field layout
(no ``us_station`` / ``ds_station`` columns; barrel stations appear on
the next line as ``num_barrels`` upstream + ``num_barrels`` downstream
values in 8-char fixed-width columns). These station values are
currently not parsed.
"""
name: str
shape_code: int
shape_name: str
span: float
rise: float
length: float
n_top: float
entrance_loss: float
exit_loss: float
inlet_type: int
outlet_type: int
upstream_invert: float
upstream_station: float
downstream_invert: float
downstream_station: float
chart_number: int
num_barrels: int = 1
n_bottom: float | None = None
depth_n_bottom: float | None = None
[docs]
@dataclass
class Pier:
"""One pier group from a ``Pier Skew, UpSta & Num, DnSta & Num=`` block.
The header line carries the skew, station, and count; four fixed-width
blocks (8-char columns, up to 10 per row) immediately follow::
upstream_count widths
upstream_count elevations
downstream_count widths
downstream_count elevations
Attributes
----------
skew:
Pier skew angle in degrees; ``0.0`` when blank.
upstream_station:
Station of the upstream pier face.
upstream_count:
Number of upstream pier elements.
downstream_station:
Station of the downstream pier face.
downstream_count:
Number of downstream pier elements.
upstream_widths:
Width of each upstream pier element.
upstream_elevations:
Top-of-pier elevation for each upstream element.
downstream_widths:
Width of each downstream pier element.
downstream_elevations:
Top-of-pier elevation for each downstream element.
"""
skew: float
upstream_station: float
upstream_count: int
downstream_station: float
downstream_count: int
upstream_widths: list[float]
upstream_elevations: list[float]
downstream_widths: list[float]
downstream_elevations: list[float]
[docs]
@dataclass
class Roadway:
"""Deck/roadway geometry from the ``Deck Dist Width WeirC ...`` block.
The header ``Deck Dist Width WeirC Skew NumUp NumDn MinLoCord MaxHiCord
MaxSubmerge Is_Ogee`` is followed by a comma-separated data line and then
six fixed-width blocks (8-char columns, up to 10 values per row):
- *numUp* upstream deck stations
- *numUp* upstream high-chord (top of deck) elevations
- *numUp* upstream low-chord (soffit) elevations
- *numDn* downstream versions of the above (same layout)
``lo_chord_up`` / ``lo_chord_dn`` will be an empty list when the
corresponding block is all-blank (some culvert files omit low-chord data).
Attributes
----------
dist:
Distance from the upstream cross section to the
bridge face (index 0).
width:
Deck/roadway width (index 1).
weir_coefficient:
Overflow weir discharge coefficient (index 2).
skew:
Bridge skew angle in degrees (index 3).
max_submergence:
Maximum submergence ratio (index 8); ``nan`` when
blank.
shape:
Overflow weir crest shape — ``'Broad Crested'`` or
``'Ogee'`` (from Is_Ogee flag, index 9).
min_lo_chord:
Minimum low-chord elevation (index 6); ``nan`` when
blank.
max_hi_chord:
Maximum high-chord elevation (index 7); ``nan`` when
blank.
stations_up:
Upstream deck station values.
hi_chord_up:
Upstream high-chord (top of deck) elevations.
lo_chord_up:
Upstream low-chord (soffit) elevations; empty list
when the block is all-blank.
stations_dn:
Downstream deck station values.
hi_chord_dn:
Downstream high-chord elevations.
lo_chord_dn:
Downstream low-chord elevations; empty list when
all-blank.
.. TODO: The ``Bridge Culvert-`` flags line (immediately before
``Deck Dist``) encodes open-deck / culvert-only options and is not yet
parsed.
.. TODO: ``BR Coef=`` (bridge loss coefficients — momentum/energy method,
Yarnell K factor, etc.) is not yet parsed.
.. TODO: ``WSPro=`` (water surface profile method options) is not yet
parsed.
.. TODO: ``BC Design=`` (design flow parameters) is not yet parsed.
"""
dist: float
width: float
weir_coefficient: float
skew: float
max_submergence: float
shape: str
min_lo_chord: float
max_hi_chord: float
stations_up: list[float] = field(default_factory=list)
hi_chord_up: list[float] = field(default_factory=list)
lo_chord_up: list[float] = field(default_factory=list)
stations_dn: list[float] = field(default_factory=list)
hi_chord_dn: list[float] = field(default_factory=list)
lo_chord_dn: list[float] = field(default_factory=list)
[docs]
@dataclass
class Structure:
"""Base class for one HEC-RAS structure parsed from a text geometry file.
Mirrors :class:`rivia.hdf.geometry.Structure` in field names. Key
difference:
- ``centerline``: the text geometry file carries no GIS centreline
coordinates for structures; always an empty list. The HDF version
stores an ``(n_pts, 2)`` numpy array.
.. TODO: Investigate whether centreline coordinates can be recovered
from the ``*.rasmap`` file or the HDF geometry file alongside the
text file, and populate this field when available.
Attributes
----------
mode:
HEC-RAS mode string (e.g. ``'Weir/Gate/Culverts'``).
Always ``''`` for inline structures — not stored in
the text format.
.. TODO: Check whether ``Mode=`` or equivalent is
written to the ``.g**`` file for any structure type
and parse it when found.
upstream_type:
Connection type on the upstream side (``'XS'``,
``'SA'``, ``'2D'``, or ``'--'``). Always ``'XS'``
for inline structures.
downstream_type:
Connection type on the downstream side.
centerline:
GIS centreline as ``[(x, y), ...]``; always ``[]``
from text files.
"""
mode: str
upstream_type: str
downstream_type: str
centerline: list[tuple[float, float]] = field(default_factory=list)
[docs]
@dataclass
class InlineStructure(Structure):
"""Inline structure (node type 5) parsed from a ``.g**`` text geometry file.
Mirrors :class:`rivia.hdf.geometry.InlineStructure`. Inherits ``mode``,
``upstream_type``, ``downstream_type``, and ``centerline`` from
:class:`Structure`.
Differences vs the HDF version:
- ``upstream_node`` / ``downstream_node``: use ``("", "", "")`` as the
no-value sentinel (same as HDF), populated by walking ``node_rs_list``
to find the nearest flanking cross sections.
- ``weir``: populated from ``IW Dist`` even when ``mode`` is ``''``
(HDF returns ``None`` for ``mode=''``).
.. TODO: Align weir-presence logic with HDF once ``mode`` parsing
is implemented (see :class:`Structure` TODO).
- ``description``: text-file-specific field from the
``BEGIN/END DESCRIPTION`` block; no equivalent in HDF.
- ``pilot_flow``: minimum flow through the structure when all gates are
fully closed — from ``IW Pilot Flow=``; no equivalent in HDF.
- ``crest_profile``: station/elevation pairs defining the weir crest
geometry — from the ``#Inline Weir SE=`` block; no equivalent in HDF.
Attributes
----------
location:
``(river, reach, rs)`` of this structure.
upstream_node:
``(river, reach, rs)`` of the nearest upstream XS;
``("", "", "")`` when none found.
downstream_node:
``(river, reach, rs)`` of the nearest downstream XS;
``("", "", "")`` when none found.
weir:
Overflow weir data from ``IW Dist`` block; ``None``
if no ``IW Dist`` line is present.
gate_groups:
Gate group definitions from ``IW Gate Name`` blocks.
description:
Node description from ``BEGIN/END DESCRIPTION``.
pilot_flow:
Minimum (pilot) flow through the structure —
``IW Pilot Flow=``; ``0.0`` when absent.
crest_profile:
Weir crest station/elevation pairs from the
``#Inline Weir SE=`` block; empty list when absent.
"""
location: tuple[str, str, str] = ("", "", "")
upstream_node: tuple[str, str, str] = ("", "", "")
downstream_node: tuple[str, str, str] = ("", "", "")
weir: Weir | None = None
gate_groups: list[GateGroup] = field(default_factory=list)
description: str = ""
pilot_flow: float = 0.0
crest_profile: list[tuple[float, float]] = field(default_factory=list)
[docs]
@dataclass
class Bridge(Structure):
"""Bridge or culvert (node types 3 / 2) parsed from a ``.g**`` text geometry file.
Mirrors :class:`rivia.hdf.geometry.Bridge`. Inherits ``mode``,
``upstream_type``, ``downstream_type``, and ``centerline`` from
:class:`Structure`.
Differences vs the HDF version:
- ``weir``: populated from the ``Deck Dist`` data line scalar fields
(width, coefficient, skew, max_submergence, shape); ``us_slope``,
``ds_slope``, and ``use_water_surface`` are always ``nan`` / ``False``
— not stored in the text format. Redundant with ``roadway`` scalars.
- ``roadway``: full deck geometry including upstream/downstream station,
high-chord, and low-chord arrays; ``None`` when no ``Deck Dist`` line
is present.
- ``culvert_groups``: barrel definitions from ``Culvert=`` lines.
- ``piers``: pier groups from ``Pier Skew, ...`` blocks.
- ``gate_groups``: always ``[]`` — bridge/culvert format uses
``Bridge Culvert`` hydraulics, not ``IW Gate Name`` blocks.
- ``description``: text-file-specific field from
``BEGIN/END DESCRIPTION``; no equivalent in HDF.
Attributes
----------
location:
``(river, reach, rs)`` of this structure.
upstream_node:
``(river, reach, rs)`` of the nearest upstream XS;
``("", "", "")`` when none found.
downstream_node:
``(river, reach, rs)`` of the nearest downstream XS;
``("", "", "")`` when none found.
weir:
Overflow weir scalars from the ``Deck Dist`` line;
``None`` if the line is absent.
gate_groups:
Always ``[]``.
description:
Node description from ``BEGIN/END DESCRIPTION``.
roadway:
Full deck geometry (stations, hi/lo chords) from the
``Deck Dist`` block; ``None`` when absent.
culvert_groups:
Barrel definitions from ``Culvert=`` lines.
piers:
Pier groups from ``Pier Skew, ...`` blocks.
node_name:
Node name from ``Node Name=``; empty string when
absent.
htab_hw_max:
Max headwater elevation from ``BC HTab HWMax=``;
``None`` when absent.
htab_tw_max:
Max tailwater elevation from ``BC HTab TWMax=``;
``None`` when absent.
htab_max_flow:
Max flow from ``BC HTab MaxFlow=``; ``None`` when
absent.
"""
location: tuple[str, str, str] = ("", "", "")
upstream_node: tuple[str, str, str] = ("", "", "")
downstream_node: tuple[str, str, str] = ("", "", "")
weir: Weir | None = None
gate_groups: list[GateGroup] = field(default_factory=list)
description: str = ""
roadway: Roadway | None = None
culvert_groups: list[CulvertGroup] = field(default_factory=list)
piers: list[Pier] = field(default_factory=list)
node_name: str = ""
htab_hw_max: float | None = None
htab_tw_max: float | None = None
htab_max_flow: float | None = None
[docs]
@dataclass
class LateralStructure(Structure):
"""Lateral structure (node type 6) parsed from a ``.g**`` text geometry file.
Mirrors :class:`rivia.hdf.geometry.LateralStructure`. Inherits ``mode``,
``upstream_type``, ``downstream_type``, and ``centerline`` from
:class:`Structure`.
Differences vs the HDF version:
- ``downstream_node``: HDF stores the name of the connected Storage Area
or 2-D Flow Area. The text format stores a connected river+reach via
``Lateral Weir End=river,reach,rs,...``; this field holds
``"river reach"`` (stripped, space-joined). Empty string when the
``Lateral Weir End=`` line is absent.
- ``weir``: built from individual ``Lateral Weir WD=``,
``Lateral Weir Coef=``, and ``Lateral Weir WSCriteria=`` lines;
``skew=0``, ``min_elevation=nan``, ``us_slope/ds_slope=nan``.
- ``gate_groups``: always ``[]`` — lateral structures in the text format
use ``Lateral Weir`` hydraulics, not ``IW Gate Name`` blocks.
- ``description``: text-file-specific field from
``BEGIN/END DESCRIPTION``; no equivalent in HDF.
Attributes
----------
location:
``(river, reach, rs)`` of this structure.
upstream_node:
``(river, reach, rs)`` of the nearest upstream XS;
``("", "", "")`` when none found.
downstream_node:
Connected river+reach as ``"river reach"``
(from ``Lateral Weir End=``); empty string when
absent.
weir:
Overflow weir data from ``Lateral Weir`` lines;
``None`` if no ``Lateral Weir WD=`` line is present.
gate_groups:
Always ``[]``.
description:
Node description from ``BEGIN/END DESCRIPTION``.
pos:
Bank side of the weir — ``0`` = left, ``1`` = right
(from ``Lateral Weir Pos=``).
distance:
Setback distance from the upstream cross section
(from ``Lateral Weir Distance=``); ``nan`` when
absent.
crest_profile:
Weir crest station/elevation pairs from the
``Lateral Weir SE=`` block; empty list when absent.
flap_gates:
``True`` when ``Lateral Weir Flap Gates= -1`` or
``1``; ``False`` when ``0`` or absent.
tw_multiple_xs:
``True`` when tailwater uses multiple XS averaging
(``Lateral Weir TW Multiple XS=`` non-zero).
.. TODO: ``Lateral Weir Hagers EQN=`` (Hager's weir equation on/off flag
and six coefficients) is not yet parsed.
.. TODO: ``Lateral Weir SS=`` (upstream and downstream side slopes) is not
yet parsed.
.. TODO: ``Lateral Weir Connection Pos and Dist=`` (connection-point
position code and distance) is not yet parsed.
.. TODO: ``Lateral Weir Centerline=`` (GIS centreline point count and
coordinate block) is not yet parsed.
.. TODO: ``LW Div RC=`` (diversion rating-curve flag and label) is not yet
parsed.
"""
location: tuple[str, str, str] = ("", "", "")
upstream_node: tuple[str, str, str] = ("", "", "")
downstream_node: str = ""
weir: Weir | None = None
gate_groups: list[GateGroup] = field(default_factory=list)
description: str = ""
pos: int = 0
distance: float = nan
crest_profile: list[tuple[float, float]] = field(default_factory=list)
flap_gates: bool = False
tw_multiple_xs: bool = False
# ---------------------------------------------------------------------------
# Structure containers
# ---------------------------------------------------------------------------
[docs]
class StructureIndex(Generic[_T]):
"""Read-only ordered mapping from a string key to a structure object.
Supports both string key (``"River Reach RS"``) and integer positional
index. Mirrors the interface of :class:`~rivia.hdf.geometry.StructureIndex`.
"""
def __init__(self, items: list[tuple[str, _T]]) -> None:
self._keys: list[str] = [k for k, _ in items]
self._values: list[_T] = [v for _, v in items]
self._map: dict[str, _T] = {k: v for k, v in items}
def __len__(self) -> int:
return len(self._keys)
def __iter__(self):
return iter(self._keys)
def __contains__(self, key: object) -> bool:
if isinstance(key, int):
return 0 <= key < len(self._values)
return key in self._map
@overload
def __getitem__(self, key: str) -> _T: ...
@overload
def __getitem__(self, key: int) -> _T: ...
def __getitem__(self, key): # type: ignore[override]
if isinstance(key, int):
return self._values[key]
return self._map[key]
def __repr__(self) -> str:
return (
f"{self.__class__.__name__}"
f"({list(zip(self._keys, self._values, strict=True))})"
)
[docs]
def keys(self) -> list[str]:
"""Ordered list of string keys."""
return list(self._keys)
[docs]
def values(self) -> list[_T]:
"""Ordered list of structure objects."""
return list(self._values)
[docs]
def items(self) -> list[tuple[str, _T]]:
"""Ordered ``(key, structure)`` pairs."""
return list(zip(self._keys, self._values, strict=True))
[docs]
class StructureCollection:
"""Structure collection parsed from a HEC-RAS text geometry file.
Covers inline structures (node type 5), bridges/culverts (types 3/2),
and lateral structures (type 6).
Access structures via the typed properties:
.. code-block:: python
g = Geometry("model.g01")
for key, iw in g.structures.inlines.items():
print(key, iw.gate_groups)
for key, br in g.structures.bridges.items():
print(key, br.weir)
for key, lat in g.structures.laterals.items():
print(key, lat.downstream_node)
"""
def __init__(
self,
inlines: list[tuple[str, InlineStructure]],
bridges: list[tuple[str, Bridge]],
laterals: list[tuple[str, LateralStructure]],
) -> None:
self._inlines: StructureIndex[InlineStructure] = StructureIndex(inlines)
self._bridges: StructureIndex[Bridge] = StructureIndex(bridges)
self._laterals: StructureIndex[LateralStructure] = StructureIndex(laterals)
@property
def inlines(self) -> StructureIndex[InlineStructure]:
"""All inline structures (type 5) keyed by ``'River Reach RS'``."""
return self._inlines
@property
def bridges(self) -> StructureIndex[Bridge]:
"""All bridges and culverts (types 3 and 2) keyed by ``'River Reach RS'``."""
return self._bridges
@property
def laterals(self) -> StructureIndex[LateralStructure]:
"""All lateral structures (type 6) keyed by ``'River Reach RS'``."""
return self._laterals
@property
def summary(self) -> dict[str, int]:
"""Count of each parsed structure type."""
return {
"inlines": len(self._inlines),
"bridges": len(self._bridges),
"laterals": len(self._laterals),
}
def __repr__(self) -> str:
return (
f"StructureCollection("
f"inlines={len(self._inlines)}, "
f"bridges={len(self._bridges)}, "
f"laterals={len(self._laterals)})"
)
logger = logging.getLogger("rivia.model")
# ---------------------------------------------------------------------------
# Geometry
# ---------------------------------------------------------------------------
[docs]
class Geometry:
"""Verbatim-line editor for HEC-RAS geometry files (.g**).
All lines are loaded verbatim. Structured cross-section data can be
read (:meth:`get_cross_section`, :meth:`cross_sections`) and written
(:meth:`set_mannings`, :meth:`set_stations`, :meth:`set_bank_stations`,
:meth:`set_exp_cntr`). Structure nodes (bridges, culverts, etc.) are
accessible as raw lines via :meth:`get_node_lines`.
``save()`` writes all in-memory lines back to disk; a no-op parse+save
produces a byte-identical file.
"""
def __init__(self, path: str | Path) -> None:
self._path = Path(path)
if not self._path.is_file():
raise FileNotFoundError(f"Geometry file not found: {self._path}")
with open(self._path, encoding="utf-8", errors="replace") as fh:
self._lines: list[str] = fh.readlines()
self._modified: bool = False
self._structures_cache: StructureCollection | None = None
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _get(self, key: str) -> str | None:
prefix = key + "="
for line in self._lines:
if line.startswith(prefix):
value = line[len(prefix) :].strip()
return value if value else None
return None
def _set(self, key: str, raw_value: str) -> None:
prefix = key + "="
for i, line in enumerate(self._lines):
if line.startswith(prefix):
self._lines[i] = f"{prefix}{raw_value}\n"
self._modified = True
return
raise KeyError(f"Key not found in geometry file: {key!r}")
def _splice(self, start: int, old_count: int, new_lines: list[str]) -> None:
"""Replace *old_count* lines beginning at *start* with *new_lines*."""
self._lines[start : start + old_count] = [
(ln if ln.endswith("\n") else ln + "\n") for ln in new_lines
]
self._modified = True
# ------------------------------------------------------------------
# Modification state
# ------------------------------------------------------------------
@property
def is_modified(self) -> bool:
"""``True`` if any value has been changed since the last :meth:`save`."""
return self._modified
# ------------------------------------------------------------------
# Static parsing helpers
# ------------------------------------------------------------------
@staticmethod
def _normalize_rs(rs: str) -> str:
"""Strip whitespace and trailing ``*`` (interpolated XS marker)."""
return rs.strip().rstrip("*").strip()
@staticmethod
def _parse_node_header(
line: str,
) -> tuple[int, str, float | None, float | None, float | None] | None:
"""Parse ``Type RM Length L Ch R = TYPE,RS,L,Ch,R``.
Returns ``(node_type, rs_normalised, left, ch, right)`` or ``None``.
"""
prefix = _KEY_NODE + " ="
if not line.startswith(prefix):
return None
tail = line[len(prefix) :].strip()
parts = tail.split(",", 4)
if len(parts) < 2:
return None
try:
node_type = int(parts[0].strip())
except ValueError:
return None
rs = Geometry._normalize_rs(parts[1]) if len(parts) > 1 else ""
def _opt_float(s: str) -> float | None:
s = s.strip()
return float(s) if s else None
left = _opt_float(parts[2]) if len(parts) > 2 else None
ch = _opt_float(parts[3]) if len(parts) > 3 else None
right = _opt_float(parts[4]) if len(parts) > 4 else None
return node_type, rs, left, ch, right
@staticmethod
def _parse_reach_header(line: str) -> tuple[str, str] | None:
"""Parse ``River Reach=river,reach``. Returns ``(river, reach)``."""
prefix = _KEY_REACH + "="
if not line.startswith(prefix):
return None
parts = line[len(prefix) :].split(",", 1)
if len(parts) != 2:
return None
return parts[0].strip(), parts[1].strip()
# ------------------------------------------------------------------
# Node location
# ------------------------------------------------------------------
def _find_node_start(self, river: str, reach: str, rs: str) -> int | None:
"""Return the line index of the matching node header, or ``None``."""
prefix = _KEY_NODE + " ="
river_l = river.strip().lower()
reach_l = reach.strip().lower()
rs_norm = self._normalize_rs(rs)
in_reach = False
for i, line in enumerate(self._lines):
if line.startswith(_KEY_REACH + "="):
rh = self._parse_reach_header(line)
if rh:
in_reach = rh[0].lower() == river_l and rh[1].lower() == reach_l
if in_reach and line.startswith(prefix):
parsed = self._parse_node_header(line)
if parsed and self._normalize_rs(parsed[1]) == rs_norm:
return i
return None
def _find_node_end(self, start: int) -> int:
"""Return the index of the first line *after* the node block at *start*.
A new node begins with ``Type RM Length L Ch R =``. A new reach
begins with ``River Reach=`` or ``Junct Name=``.
"""
prefix = _KEY_NODE + " ="
n = len(self._lines)
i = start + 1
while i < n:
line = self._lines[i]
if (
line.startswith(prefix)
or line.startswith(_KEY_REACH + "=")
or line.startswith(_KEY_JUNCT + "=")
):
return i
i += 1
return n
# ------------------------------------------------------------------
# Generic escape hatch
# ------------------------------------------------------------------
[docs]
def get(self, key: str) -> str | None:
"""Return the raw stripped value for *key*, or ``None`` if absent."""
return self._get(key)
[docs]
def set(self, key: str, value: str) -> None:
"""Set *key* to *value* verbatim. Raises ``KeyError`` if absent."""
self._set(key, value)
# ------------------------------------------------------------------
# Metadata properties
# ------------------------------------------------------------------
@property
def title(self) -> str | None:
"""Geometry title (``Geom Title=``)."""
return self._get("Geom Title")
@title.setter
def title(self, value: str) -> None:
self._set("Geom Title", value)
@property
def program_version(self) -> str | None:
"""HEC-RAS version that wrote this file (``Program Version=``).
Treat as read-only; HEC-RAS manages this field.
"""
return self._get("Program Version")
@property
def viewing_rectangle(self) -> tuple[float, float, float, float] | None:
"""Map viewport as ``(min_x, min_y, max_x, max_y)``, or ``None``.
HEC-RAS writes this as ``Viewing Rectangle= x1 , y1 , x2 , y2``.
"""
raw = self._get("Viewing Rectangle")
if raw is None:
return None
parts = [p.strip() for p in raw.split(",")]
if len(parts) < 4:
return None
try:
return (
float(parts[0]),
float(parts[1]),
float(parts[2]),
float(parts[3]),
)
except ValueError:
return None
@viewing_rectangle.setter
def viewing_rectangle(
self, value: tuple[float, float, float, float]
) -> None:
self._set("Viewing Rectangle", " , ".join(str(v) for v in value))
# ------------------------------------------------------------------
# Reach / node inventory
# ------------------------------------------------------------------
@property
def reaches(self) -> list[tuple[str, str]]:
"""Ordered list of ``(river, reach)`` pairs in file order."""
result: list[tuple[str, str]] = []
seen: set[tuple[str, str]] = set()
for line in self._lines:
if line.startswith(_KEY_REACH + "="):
rh = self._parse_reach_header(line)
if rh and rh not in seen:
result.append(rh)
seen.add(rh)
return result
@property
def junctions(self) -> list[str]:
"""Junction names defined in the file, in order of appearance."""
result: list[str] = []
prefix = _KEY_JUNCT + "="
for line in self._lines:
if line.startswith(prefix):
result.append(line[len(prefix) :].strip())
return result
[docs]
def node_rs_list(self, river: str, reach: str) -> list[tuple[int, str]]:
"""Return ``(node_type, rs)`` pairs for every node in *reach*.
Useful for surveying what cross sections and structures exist.
Results are in file order (upstream to downstream for standard
HEC-RAS convention).
"""
result: list[tuple[int, str]] = []
prefix = _KEY_NODE + " ="
river_l = river.strip().lower()
reach_l = reach.strip().lower()
in_reach = False
for line in self._lines:
if line.startswith(_KEY_REACH + "="):
rh = self._parse_reach_header(line)
if rh:
in_reach = rh[0].lower() == river_l and rh[1].lower() == reach_l
if in_reach and line.startswith(prefix):
parsed = self._parse_node_header(line)
if parsed:
result.append((parsed[0], parsed[1]))
return result
# ------------------------------------------------------------------
# Cross-section parsing
# ------------------------------------------------------------------
def _parse_xs_from_lines(
self,
river: str,
reach: str,
start: int,
end: int,
) -> CrossSection:
"""Parse a cross section from ``self._lines[start:end]``."""
header_line = self._lines[start]
parsed_hdr = self._parse_node_header(header_line)
# Detect interpolated XS (RS had trailing '*')
raw_rs = header_line.split("=", 1)[1].split(",")[1] if parsed_hdr else ""
interpolated = raw_rs.strip().endswith("*")
rs = parsed_hdr[1] if parsed_hdr else ""
left_len = parsed_hdr[2] if parsed_hdr else None
ch_len = parsed_hdr[3] if parsed_hdr else None
right_len = parsed_hdr[4] if parsed_hdr else None
xs = CrossSection(
river=river,
reach=reach,
rs=rs,
left_length=left_len,
channel_length=ch_len,
right_length=right_len,
interpolated=interpolated,
)
# Collect block lines (stripped of newline for easy processing)
block = [ln.rstrip("\n") for ln in self._lines[start + 1 : end]]
# --- Pass 1: extract description ---
desc_lines: list[str] = []
in_desc = False
for ln in block:
stripped = ln.strip()
if stripped == "BEGIN DESCRIPTION:":
in_desc = True
continue
if stripped == "END DESCRIPTION:":
in_desc = False
continue
if in_desc:
desc_lines.append(ln)
xs.description = "\n".join(desc_lines)
# --- Pass 2: extract keyed fields ---
i = 0
while i < len(block):
ln = block[i]
# #Sta/Elev= N
if ln.startswith("#Sta/Elev="):
n_pts = int(ln.split("=", 1)[1].strip())
n_rows = _row_count(n_pts * 2, _COLS_STAE)
flat = _parse_block(block[i + 1 : i + 1 + n_rows], n_pts * 2)
xs.stations = flat[0::2]
xs.elevations = flat[1::2]
i += 1 + n_rows
continue
# #Mann= N , type , alt
if ln.startswith("#Mann="):
parts = ln.split("=", 1)[1].split(",")
n_zones = int(parts[0].strip())
xs.mann_type = int(parts[1].strip()) if len(parts) > 1 else 0
xs.mann_alt = int(parts[2].strip()) if len(parts) > 2 else 0
n_rows = _row_count(n_zones * 3, _COLS_MANN)
flat = _parse_block(block[i + 1 : i + 1 + n_rows], n_zones * 3)
xs.mann_entries = [
ManningEntry(
station=flat[j],
n_value=flat[j + 1],
variation=flat[j + 2],
)
for j in range(0, len(flat), 3)
]
i += 1 + n_rows
continue
# #XS Ineff= N , type
if ln.startswith("#XS Ineff="):
parts = ln.split("=", 1)[1].split(",")
n_ineff = int(parts[0].strip())
n_rows = _row_count(n_ineff * 3, _COLS_MANN)
flat = _parse_block(block[i + 1 : i + 1 + n_rows], n_ineff * 3)
areas: list[IneffArea] = [
IneffArea(
x_start=flat[j],
x_end=flat[j + 1],
elevation=flat[j + 2],
)
for j in range(0, len(flat), 3)
]
i += 1 + n_rows
# Permanent Ineff= flags (marker line + flag lines)
if i < len(block) and block[i].startswith("Permanent Ineff="):
i += 1 # skip marker
n_flag_rows = _row_count(n_ineff, _COLS_FLAGS)
flags: list[str] = []
for _ in range(n_flag_rows):
if i < len(block):
# Flags are 8-char right-justified; use split() safely
flags.extend(block[i].split())
i += 1
for k, area in enumerate(areas):
if k < len(flags):
area.permanent = flags[k].strip().upper() == "T"
xs.ineff_areas = areas
continue
# Bank Sta=LB,RB
if ln.startswith("Bank Sta="):
parts = ln.split("=", 1)[1].split(",")
if len(parts) >= 2:
try:
xs.bank_left = float(parts[0].strip())
xs.bank_right = float(parts[1].strip())
except ValueError:
pass
i += 1
continue
# Exp/Cntr=exp,cntr
if ln.startswith("Exp/Cntr="):
parts = ln.split("=", 1)[1].split(",")
if len(parts) >= 2:
try:
xs.expansion = float(parts[0].strip())
xs.contraction = float(parts[1].strip())
except ValueError:
pass
i += 1
continue
# Levee=L_flag,L_sta,L_elev,R_flag,R_sta,R_elev[,L_fail,R_fail]
if ln.startswith("Levee="):
lp = ln.split("=", 1)[1].split(",")
def _gp(p: list[str], idx: int) -> str:
return p[idx].strip() if idx < len(p) else ""
if _gp(lp, 0) == "-1" and _gp(lp, 1):
sta = _gp(lp, 1)
elv = _gp(lp, 2)
fai = _gp(lp, 6)
if sta:
xs.levee_left = LeveeData(
station=float(sta),
elevation=float(elv) if elv else 0.0,
failure_elevation=float(fai) if fai else None,
)
if _gp(lp, 3) == "-1" and _gp(lp, 4):
sta = _gp(lp, 4)
elv = _gp(lp, 5)
fai = _gp(lp, 7)
if sta:
xs.levee_right = LeveeData(
station=float(sta),
elevation=float(elv) if elv else 0.0,
failure_elevation=float(fai) if fai else None,
)
i += 1
continue
# #Block Obstruct= N , type
if ln.startswith("#Block Obstruct="):
parts = ln.split("=", 1)[1].split(",")
n_obs = int(parts[0].strip())
n_rows = _row_count(n_obs * 3, _COLS_MANN)
# Use fixed-position reader: blank columns = 0.0 (absent
# endpoints are left blank in interpolated XS).
flat = _parse_block_fixed(
block[i + 1 : i + 1 + n_rows], n_obs * 3
)
xs.blocked_obstructions = [
BlockedObstruction(
x_start=flat[j],
x_end=flat[j + 1],
elevation=flat[j + 2],
)
for j in range(0, len(flat), 3)
]
i += 1 + n_rows
continue
# XS HTab Starting El and Incr=el,incr,count
if ln.startswith("XS HTab Starting El and Incr="):
parts = ln.split("=", 1)[1].split(",")
try:
xs.htab_starting_elevation = float(parts[0].strip())
if len(parts) > 1:
xs.htab_increment = float(parts[1].strip())
if len(parts) > 2:
xs.htab_count = int(parts[2].strip())
except (ValueError, IndexError):
pass
i += 1
continue
# Vertical n Elevations= N
if ln.startswith("Vertical n Elevations="):
n_bp = int(ln.split("=", 1)[1].strip())
n_bp_rows = _row_count(n_bp, _COLS_STAE)
breakpoints = _parse_block(
block[i + 1 : i + 1 + n_bp_rows], n_bp
)
i += 1 + n_bp_rows
vn_stations: list[VerticalNStation] = []
by_flow = False
while i < len(block):
sln = block[i]
if sln.startswith("Vertical n for Station="):
sta = float(sln.split("=", 1)[1].strip())
n_val_rows = _row_count(n_bp, _COLS_STAE)
n_vals = _parse_block(
block[i + 1 : i + 1 + n_val_rows], n_bp
)
vn_stations.append(
VerticalNStation(station=sta, n_values=n_vals)
)
i += 1 + n_val_rows
elif sln.startswith("Vertical n Flow="):
flow_val = int(sln.split("=", 1)[1].strip())
by_flow = flow_val == -1
i += 1
break
else:
i += 1
xs.vertical_n = VerticalN(
breakpoints=breakpoints,
by_flow=by_flow,
stations=vn_stations,
)
continue
i += 1
return xs
# ------------------------------------------------------------------
# Cross-section access
# ------------------------------------------------------------------
[docs]
def get_cross_section(self, river: str, reach: str, rs: str) -> CrossSection | None:
"""Parse and return the cross section at *(river, reach, rs)*.
Returns ``None`` if not found or if the node is not a cross section
(type != 1).
"""
start = self._find_node_start(river, reach, rs)
if start is None:
return None
parsed = self._parse_node_header(self._lines[start])
if parsed is None or parsed[0] != NodeType.CROSS_SECTION:
return None
end = self._find_node_end(start)
return self._parse_xs_from_lines(river, reach, start, end)
[docs]
def all_cross_sections(self) -> list[CrossSection]:
"""Return every cross section in the geometry, in file order.
Iterates all reaches and collects cross sections from each.
Structure nodes (bridges, culverts, etc.) are skipped.
"""
result: list[CrossSection] = []
for river, reach in self.reaches:
result.extend(self.cross_sections(river, reach))
return result
[docs]
def cross_sections(self, river: str, reach: str) -> list[CrossSection]:
"""Return all cross sections in *reach*, in file order.
Structure nodes (bridges, culverts, etc.) are skipped.
"""
prefix = _KEY_NODE + " ="
river_l = river.strip().lower()
reach_l = reach.strip().lower()
result: list[CrossSection] = []
in_reach = False
i = 0
n = len(self._lines)
while i < n:
line = self._lines[i]
if line.startswith(_KEY_REACH + "="):
rh = self._parse_reach_header(line)
if rh:
in_reach = rh[0].lower() == river_l and rh[1].lower() == reach_l
if in_reach and line.startswith(prefix):
parsed = self._parse_node_header(line)
if parsed and parsed[0] == NodeType.CROSS_SECTION:
end = self._find_node_end(i)
xs = self._parse_xs_from_lines(river, reach, i, end)
result.append(xs)
i = end
continue
i += 1
return result
# ------------------------------------------------------------------
# Cross-section write helpers
# ------------------------------------------------------------------
def _find_key_in_block(self, start: int, end: int, key: str) -> int | None:
"""Return index of first line in ``[start, end)`` starting with *key*."""
for i in range(start, end):
if self._lines[i].startswith(key):
return i
return None
[docs]
def set_mannings(
self,
river: str,
reach: str,
rs: str,
entries: list[ManningEntry],
mann_type: int | None = None,
mann_alt: int | None = None,
) -> None:
"""Replace the Manning's n data for the given cross section.
The ``#Mann=`` header and data rows are rebuilt from *entries*.
If *mann_type* or *mann_alt* are ``None``, the existing values from
the file are preserved.
Parameters
----------
river:
River name (case-insensitive).
reach:
Reach name (case-insensitive).
rs:
River station string (leading/trailing whitespace and
trailing ``*`` are ignored in comparisons).
entries:
New Manning's n zones (station, n_value, variation).
mann_type:
Type flag for the ``#Mann=`` header. ``None`` keeps
the existing value.
mann_alt:
Alt flag for the ``#Mann=`` header. ``None`` keeps
the existing value.
Raises
------
KeyError
No matching node or no ``#Mann=`` line found.
"""
start = self._find_node_start(river, reach, rs)
if start is None:
raise KeyError(f"No node found for {river!r}, {reach!r}, {rs!r}")
end = self._find_node_end(start)
mann_i = self._find_key_in_block(start, end, "#Mann=")
if mann_i is None:
raise KeyError(f"No #Mann= line found for {river!r}, {reach!r}, {rs!r}")
# Preserve existing type/alt if caller did not supply them
existing = self._lines[mann_i]
parts = existing.split("=", 1)[1].split(",")
existing_type = int(parts[1].strip()) if len(parts) > 1 else 0
existing_alt = int(parts[2].strip()) if len(parts) > 2 else 0
if mann_type is None:
mann_type = existing_type
if mann_alt is None:
mann_alt = existing_alt
n_old_zones = int(parts[0].strip())
n_old_rows = _row_count(n_old_zones * 3, _COLS_MANN)
n = len(entries)
header = f"#Mann= {n} , {mann_type} , {mann_alt} "
flat = [v for e in entries for v in (e.station, e.n_value, e.variation)]
new_lines = [header] + _format_block(flat, _COLS_MANN)
self._splice(mann_i, 1 + n_old_rows, new_lines)
[docs]
def set_stations(
self,
river: str,
reach: str,
rs: str,
stations: list[float],
elevations: list[float],
) -> None:
"""Replace the station/elevation data for the given cross section.
Parameters
----------
river:
River name (case-insensitive).
reach:
Reach name (case-insensitive).
rs:
River station string.
stations:
New station values.
elevations:
New elevation values (must match length of *stations*).
Raises
------
ValueError
*stations* and *elevations* have different lengths.
KeyError
No matching node or no ``#Sta/Elev=`` line found.
"""
if len(stations) != len(elevations):
raise ValueError(
f"stations ({len(stations)}) and elevations ({len(elevations)})"
" must have the same length"
)
start = self._find_node_start(river, reach, rs)
if start is None:
raise KeyError(f"No node found for {river!r}, {reach!r}, {rs!r}")
end = self._find_node_end(start)
sta_i = self._find_key_in_block(start, end, "#Sta/Elev=")
if sta_i is None:
raise KeyError(f"No #Sta/Elev= line found for {river!r}, {reach!r}, {rs!r}")
n_old = int(self._lines[sta_i].split("=", 1)[1].strip())
n_old_rows = _row_count(n_old * 2, _COLS_STAE)
n = len(stations)
header = f"#Sta/Elev= {n} "
flat = [v for pair in zip(stations, elevations) for v in pair]
new_lines = [header] + _format_block(flat, _COLS_STAE)
self._splice(sta_i, 1 + n_old_rows, new_lines)
[docs]
def set_bank_stations(
self,
river: str,
reach: str,
rs: str,
left: float,
right: float,
) -> None:
"""Set the left and right bank stations for the given cross section.
Raises
------
KeyError
No matching node or no ``Bank Sta=`` line found.
"""
start = self._find_node_start(river, reach, rs)
if start is None:
raise KeyError(f"No node found for {river!r}, {reach!r}, {rs!r}")
end = self._find_node_end(start)
bank_i = self._find_key_in_block(start, end, "Bank Sta=")
if bank_i is None:
raise KeyError(f"No Bank Sta= line found for {river!r}, {reach!r}, {rs!r}")
self._splice(bank_i, 1, [f"Bank Sta={left},{right}"])
[docs]
def set_exp_cntr(
self,
river: str,
reach: str,
rs: str,
expansion: float,
contraction: float,
) -> None:
"""Set the expansion/contraction loss coefficients.
Raises
------
KeyError
No matching node or no ``Exp/Cntr=`` line found.
"""
start = self._find_node_start(river, reach, rs)
if start is None:
raise KeyError(f"No node found for {river!r}, {reach!r}, {rs!r}")
end = self._find_node_end(start)
ec_i = self._find_key_in_block(start, end, "Exp/Cntr=")
if ec_i is None:
raise KeyError(f"No Exp/Cntr= line found for {river!r}, {reach!r}, {rs!r}")
self._splice(ec_i, 1, [f"Exp/Cntr={expansion},{contraction}"])
[docs]
def set_vertical_n(
self,
river: str,
reach: str,
rs: str,
vertical_n: VerticalN | None,
) -> None:
"""Replace or remove the vertical n block for the given cross section.
When *vertical_n* is not ``None`` the existing block (if any) is
replaced in-place; if none exists, the block is inserted after the
``XS Rating Curve=`` line. Passing ``None`` removes any existing
block.
The caller is responsible for ensuring the ``#Mann`` block still
contains valid zone boundary stations. When vertical n is active
HEC-RAS expects those n-values to be ``0`` (placeholders).
Parameters
----------
river:
River name (case-insensitive).
reach:
Reach name (case-insensitive).
rs:
River station string.
vertical_n:
New vertical n data, or ``None`` to remove.
Raises
------
KeyError
No matching cross-section node found.
"""
start = self._find_node_start(river, reach, rs)
if start is None:
raise KeyError(f"No node found for {river!r}, {reach!r}, {rs!r}")
end = self._find_node_end(start)
vn_elev_i = self._find_key_in_block(
start, end, "Vertical n Elevations="
)
if vn_elev_i is not None:
vn_flow_i = self._find_key_in_block(
vn_elev_i, end, "Vertical n Flow="
)
old_count = (
(vn_flow_i - vn_elev_i + 1) if vn_flow_i is not None else 0
)
insert_at = vn_elev_i
else:
rc_i = self._find_key_in_block(start, end, "XS Rating Curve=")
insert_at = (rc_i + 1) if rc_i is not None else end
old_count = 0
if vertical_n is None:
if old_count > 0:
self._splice(insert_at, old_count, [])
return
n_bp = len(vertical_n.breakpoints)
new_lines: list[str] = [f"Vertical n Elevations= {n_bp} "]
new_lines += _format_block(vertical_n.breakpoints, _COLS_STAE)
for vs in vertical_n.stations:
sta_val = vs.station
sta_str = (
str(int(sta_val))
if sta_val == int(sta_val)
else str(sta_val)
)
new_lines.append(f"Vertical n for Station={sta_str}")
new_lines += _format_block(vs.n_values, _COLS_STAE)
flow_val = -1 if vertical_n.by_flow else 0
new_lines.append(f"Vertical n Flow={flow_val} ")
self._splice(insert_at, old_count, new_lines)
[docs]
def set_levee(
self,
river: str,
reach: str,
rs: str,
left: LeveeData | None,
right: LeveeData | None,
) -> None:
"""Set or remove levee data for the given cross section.
Pass ``None`` for both *left* and *right* to remove any existing
``Levee=`` line. The line is replaced in-place when it already
exists; otherwise it is inserted before ``#XS Ineff=`` or
``Bank Sta=``.
Parameters
----------
river:
River name (case-insensitive).
reach:
Reach name (case-insensitive).
rs:
River station string.
left:
Left-bank levee definition, or ``None`` to clear.
right:
Right-bank levee definition, or ``None`` to clear.
Raises
------
KeyError
No matching cross-section node found.
"""
start = self._find_node_start(river, reach, rs)
if start is None:
raise KeyError(f"No node found for {river!r}, {reach!r}, {rs!r}")
end = self._find_node_end(start)
levee_i = self._find_key_in_block(start, end, "Levee=")
if left is None and right is None:
if levee_i is not None:
self._splice(levee_i, 1, [])
return
line = _fmt_levee_line(left, right)
if levee_i is not None:
self._splice(levee_i, 1, [line])
else:
insert_at = end
for key in ("#XS Ineff=", "Bank Sta="):
idx = self._find_key_in_block(start, end, key)
if idx is not None:
insert_at = idx
break
self._splice(insert_at, 0, [line])
[docs]
def set_blocked_obstructions(
self,
river: str,
reach: str,
rs: str,
obstructions: list[BlockedObstruction],
) -> None:
"""Replace or remove the blocked-obstruction block for the given XS.
Pass an empty list to remove any existing ``#Block Obstruct=`` block.
When no block exists, the new block is inserted before ``Bank Sta=``.
Parameters
----------
river:
River name (case-insensitive).
reach:
Reach name (case-insensitive).
rs:
River station string.
obstructions:
New obstruction list (empty = remove).
Raises
------
KeyError
No matching cross-section node found.
"""
start = self._find_node_start(river, reach, rs)
if start is None:
raise KeyError(f"No node found for {river!r}, {reach!r}, {rs!r}")
end = self._find_node_end(start)
bo_i = self._find_key_in_block(start, end, "#Block Obstruct=")
if bo_i is not None:
n_old = int(
self._lines[bo_i].split("=", 1)[1].split(",")[0].strip()
)
old_count = 1 + _row_count(n_old * 3, _COLS_MANN)
else:
old_count = 0
if not obstructions:
if bo_i is not None:
self._splice(bo_i, old_count, [])
return
n = len(obstructions)
flat = [v for o in obstructions for v in (o.x_start, o.x_end, o.elevation)]
new_lines = [f"#Block Obstruct= {n} , 0 "] + _format_block(
flat, _COLS_MANN
)
if bo_i is not None:
self._splice(bo_i, old_count, new_lines)
else:
bank_i = self._find_key_in_block(start, end, "Bank Sta=")
insert_at = bank_i if bank_i is not None else end
self._splice(insert_at, 0, new_lines)
[docs]
def set_htab(
self,
river: str,
reach: str,
rs: str,
starting_elevation: float,
increment: float,
count: int,
) -> None:
"""Set the hydraulic-table parameters for the given cross section.
Replaces the ``XS HTab Starting El and Incr=`` line in-place, or
inserts it after ``XS Rating Curve=`` if absent.
Parameters
----------
river:
River name (case-insensitive).
reach:
Reach name (case-insensitive).
rs:
River station string.
starting_elevation:
First elevation in the hydraulic table.
increment:
Elevation increment between table entries.
count:
Number of entries in the hydraulic table.
Raises
------
KeyError
No matching cross-section node found.
"""
start = self._find_node_start(river, reach, rs)
if start is None:
raise KeyError(f"No node found for {river!r}, {reach!r}, {rs!r}")
end = self._find_node_end(start)
htab_i = self._find_key_in_block(
start, end, "XS HTab Starting El and Incr="
)
line = (
f"XS HTab Starting El and Incr="
f"{starting_elevation},{increment}, {count} "
)
if htab_i is not None:
self._splice(htab_i, 1, [line])
else:
rc_i = self._find_key_in_block(start, end, "XS Rating Curve=")
insert_at = (rc_i + 1) if rc_i is not None else end
self._splice(insert_at, 0, [line])
# ------------------------------------------------------------------
# Structure parsing (inline structures)
# ------------------------------------------------------------------
@property
def structures(self) -> StructureCollection:
"""Structure collection parsed from this geometry file.
Returns a :class:`StructureCollection` whose ``inlines`` property
holds all inline structures keyed by ``'River Reach RS'``.
The result is cached; invalidated automatically on :meth:`save`.
Example::
g = Geometry("model.g01")
for key, iw in g.structures.inlines.items():
print(key, [gg.name for gg in iw.gate_groups])
"""
if self._structures_cache is None:
self._structures_cache = self._build_structures()
return self._structures_cache
def _build_structures(self) -> StructureCollection:
"""Scan all reaches and build the :class:`StructureCollection`."""
inlines: list[tuple[str, InlineStructure]] = []
bridges: list[tuple[str, Bridge]] = []
laterals: list[tuple[str, LateralStructure]] = []
for river, reach in self.reaches:
for node_type, rs in self.node_rs_list(river, reach):
if node_type not in (
NodeType.CULVERT,
NodeType.BRIDGE,
NodeType.INLINE_STRUCTURE,
NodeType.LATERAL_STRUCTURE,
):
continue
start = self._find_node_start(river, reach, rs)
if start is None:
continue
end = self._find_node_end(start)
upstream_node, downstream_node = self._adjacent_xs_nodes(
river, reach, rs
)
key = f"{river} {reach} {rs}"
if node_type == NodeType.INLINE_STRUCTURE:
iw = self._parse_inline_structure(
river, reach, rs, start, end, upstream_node, downstream_node
)
inlines.append((key, iw))
elif node_type in (NodeType.CULVERT, NodeType.BRIDGE):
br = self._parse_bridge(
river, reach, rs, start, end, upstream_node, downstream_node
)
bridges.append((key, br))
elif node_type == NodeType.LATERAL_STRUCTURE:
lat = self._parse_lateral(
river, reach, rs, start, end, upstream_node
)
laterals.append((key, lat))
return StructureCollection(inlines, bridges, laterals)
def _adjacent_xs_nodes(
self, river: str, reach: str, rs: str
) -> tuple[tuple[str, str, str], tuple[str, str, str]]:
"""Return the nearest upstream and downstream XS nodes flanking *rs*.
HEC-RAS lists nodes from upstream (high RS) to downstream (low RS).
Walking backward in the list finds the upstream XS; forward finds the
downstream XS.
Returns ``(upstream, downstream)`` tuples of ``(river, reach, rs)``.
Each tuple is ``("", "", "")`` when no adjacent XS exists on that side.
"""
_empty: tuple[str, str, str] = ("", "", "")
nodes = self.node_rs_list(river, reach)
rs_norm = self._normalize_rs(rs)
idx = next(
(
i
for i, (nt, r) in enumerate(nodes)
if self._normalize_rs(r) == rs_norm
),
None,
)
if idx is None:
return _empty, _empty
upstream: tuple[str, str, str] = _empty
for i in range(idx - 1, -1, -1):
if nodes[i][0] == NodeType.CROSS_SECTION:
upstream = (river, reach, nodes[i][1])
break
downstream: tuple[str, str, str] = _empty
for i in range(idx + 1, len(nodes)):
if nodes[i][0] == NodeType.CROSS_SECTION:
downstream = (river, reach, nodes[i][1])
break
return upstream, downstream
def _parse_inline_structure(
self,
river: str,
reach: str,
rs: str,
start: int,
end: int,
upstream_node: tuple[str, str, str],
downstream_node: tuple[str, str, str],
) -> InlineStructure:
"""Parse one inline-structure node block into an :class:`InlineStructure`.
Parameters
----------
river, reach, rs:
Node identity.
start, end:
Line-index range of the node block (``self._lines[start:end]``).
upstream_node, downstream_node:
Adjacent XS tuples from :meth:`_adjacent_xs_nodes`.
"""
lines = [ln.rstrip("\n") for ln in self._lines[start:end]]
# -- description ---------------------------------------------------
description = ""
desc_s = next(
(i for i, ln in enumerate(lines) if ln.strip() == "BEGIN DESCRIPTION:"),
None,
)
desc_e = next(
(i for i, ln in enumerate(lines) if ln.strip() == "END DESCRIPTION:"),
None,
)
if desc_s is not None and desc_e is not None and desc_e > desc_s:
description = "\n".join(lines[desc_s + 1 : desc_e]).strip()
# -- pilot flow (IW Pilot Flow=) -----------------------------------
pilot_flow = 0.0
for ln in lines:
if ln.startswith("IW Pilot Flow="):
val = ln[len("IW Pilot Flow="):].strip()
try:
pilot_flow = float(val)
except ValueError:
pass
break
# -- weir crest profile (#Inline Weir SE=) -------------------------
crest_profile: list[tuple[float, float]] = []
iw_se_i = next(
(i for i, ln in enumerate(lines) if ln.startswith("#Inline Weir SE=")),
None,
)
if iw_se_i is not None and iw_se_i + 1 < len(lines):
count_str = lines[iw_se_i][len("#Inline Weir SE="):].strip()
try:
n_pairs = int(count_str)
except ValueError:
n_pairs = 0
if n_pairs > 0:
flat = _parse_block(lines[iw_se_i + 1:], n_pairs * 2, _COL)
crest_profile = [
(flat[j], flat[j + 1]) for j in range(0, len(flat) - 1, 2)
]
# -- helper --------------------------------------------------------
def _cf(parts: list[str], idx: int, default: float = 0.0) -> float:
"""Parse comma-split field at *idx* to float; return *default* on blank."""
if idx >= len(parts):
return default
s = parts[idx].strip()
if not s:
return default
try:
return float(s)
except ValueError:
return default
def _cf_nan(parts: list[str], idx: int) -> float:
"""Like ``_cf`` but returns ``nan`` for blank fields."""
if idx >= len(parts):
return nan
s = parts[idx].strip()
if not s:
return nan
try:
return float(s)
except ValueError:
return nan
# -- overflow weir (IW Dist) ---------------------------------------
weir: Weir | None = None
iw_dist_i = next(
(i for i, ln in enumerate(lines) if ln.startswith("IW Dist,")), None
)
if iw_dist_i is not None and iw_dist_i + 1 < len(lines):
parts = lines[iw_dist_i + 1].split(",")
is_ogee = int(_cf(parts, 6)) == -1
weir = Weir(
width=_cf(parts, 1),
coefficient=_cf(parts, 2),
skew=_cf(parts, 3),
max_submergence=_cf(parts, 4),
min_elevation=_cf_nan(parts, 5),
shape="Ogee" if is_ogee else "Broad Crested",
dist=_cf(parts, 0),
spillway_approach_height=_cf(parts, 7),
design_energy_head=_cf(parts, 8),
)
# -- gate openings (one IW Gate Opening= line per opening) ------------
# Format: IW Gate Opening=<index>,<name>,<n_gis_points>
# When n_gis_points > 0 the next line holds n_gis_points (x, y) pairs
# in 16-char fixed-width columns.
_GIS_COL = 16
all_openings: list[tuple[str, list[tuple[float, float]]]] = []
for i, ln in enumerate(lines):
if ln.startswith("IW Gate Opening="):
op_parts = ln[len("IW Gate Opening="):].split(",")
oname = op_parts[1].strip() if len(op_parts) >= 2 else ""
n_gis = int(op_parts[2].strip()) if len(op_parts) >= 3 and op_parts[2].strip() else 0
gis: list[tuple[float, float]] = []
if n_gis > 0 and i + 1 < len(lines):
flat = _parse_block([lines[i + 1]], n_gis * 2, _GIS_COL)
gis = [(flat[j], flat[j + 1]) for j in range(0, len(flat) - 1, 2)]
all_openings.append((oname, gis))
# -- gate groups (IW Gate Name blocks) -----------------------------
gate_groups: list[GateGroup] = []
gate_header_indices = [
i for i, ln in enumerate(lines) if ln.startswith("IW Gate Name")
]
opening_iter = iter(all_openings)
for gate_i in gate_header_indices:
if gate_i + 1 >= len(lines):
continue
data_parts = lines[gate_i + 1].split(",")
name = data_parts[0].strip() if data_parts else ""
n_openings = int(_cf(data_parts, 13))
# Station line: fixed-width 8-char columns, immediately after data line
stations: list[float] = []
if n_openings > 0 and gate_i + 2 < len(lines):
stations = _parse_block([lines[gate_i + 2]], n_openings, _COL)
openings: list[GateOpening] = []
for k in range(n_openings):
oname, ogis = next(opening_iter, ("", []))
st = stations[k] if k < len(stations) else 0.0
openings.append(GateOpening(name=oname, station=st, gis=ogis))
gate_type_code = int(_cf(data_parts, 8))
gate_groups.append(
GateGroup(
name=name,
width=_cf(data_parts, 1),
height=_cf(data_parts, 2),
invert=_cf(data_parts, 3),
gate_coefficient=_cf(data_parts, 4),
trunnion_exponent=_cf(data_parts, 5),
opening_exponent=_cf(data_parts, 6),
height_exponent=_cf(data_parts, 7),
gate_type=_GATE_TYPE_NAMES.get(gate_type_code, str(gate_type_code)),
weir_coefficient=_cf(data_parts, 9),
is_ogee=int(_cf(data_parts, 10)) == -1,
spillway_approach_height=_cf(data_parts, 11),
design_energy_head=_cf(data_parts, 12),
trunnion_height=_cf(data_parts, 14),
orifice_coefficient=_cf(data_parts, 15),
head_reference=int(_cf(data_parts, 16)),
radial_coefficient=_cf(data_parts, 17),
is_sharp_crested=int(_cf(data_parts, 19)) == -1,
use_weir_param1=int(_cf(data_parts, 20)) == -1,
use_weir_param2=int(_cf(data_parts, 21)) == -1,
use_weir_param3=int(_cf(data_parts, 22)) == -1,
openings=openings,
)
)
_empty: tuple[str, str, str] = ("", "", "")
return InlineStructure(
mode="",
upstream_type="XS" if upstream_node != _empty else "",
downstream_type="XS" if downstream_node != _empty else "",
location=(river, reach, rs),
upstream_node=upstream_node,
downstream_node=downstream_node,
weir=weir,
gate_groups=gate_groups,
description=description,
pilot_flow=pilot_flow,
crest_profile=crest_profile,
)
def _parse_bridge(
self,
river: str,
reach: str,
rs: str,
start: int,
end: int,
upstream_node: tuple[str, str, str],
downstream_node: tuple[str, str, str],
) -> Bridge:
"""Parse one bridge or culvert node block into a :class:`Bridge`.
Parameters
----------
river, reach, rs:
Node identity.
start, end:
Line-index range of the node block (``self._lines[start:end]``).
upstream_node, downstream_node:
Adjacent XS tuples from :meth:`_adjacent_xs_nodes`.
"""
lines = [ln.rstrip("\n") for ln in self._lines[start:end]]
# Shared float-parsing helper (takes the current parts list as arg
# to avoid closure rebinding issues across multiple blocks).
def _f(parts: list[str], idx: int, default: float = 0.0) -> float:
if idx >= len(parts):
return default
s = parts[idx].strip()
try:
return float(s) if s else default
except ValueError:
return default
def _fn(parts: list[str], idx: int) -> float:
"""Like _f but returns nan for blank/missing."""
if idx >= len(parts):
return nan
s = parts[idx].strip()
try:
return float(s)
except (ValueError, TypeError):
return nan
# -- description ---------------------------------------------------
description = ""
desc_s = next(
(i for i, ln in enumerate(lines) if ln.strip() == "BEGIN DESCRIPTION:"),
None,
)
desc_e = next(
(i for i, ln in enumerate(lines) if ln.strip() == "END DESCRIPTION:"),
None,
)
if desc_s is not None and desc_e is not None and desc_e > desc_s:
description = "\n".join(lines[desc_s + 1 : desc_e]).strip()
# -- node name (Node Name= line, if present) -----------------------
node_name = ""
for ln in lines:
if ln.startswith("Node Name="):
node_name = ln[len("Node Name="):].strip()
break
# -- roadway / deck geometry (Deck Dist ... block) -----------------
# Header: "Deck Dist Width WeirC Skew NumUp NumDn MinLoCord MaxHiCord
# MaxSubmerge Is_Ogee"
# Data line columns (0-based):
# 0=dist, 1=width, 2=coef, 3=skew, 4=numUp, 5=numDn,
# 6=minLoCord, 7=maxHiCord, 8=maxSubmerge, 9=is_ogee
# After the data line: numUp stations, numUp hi-chords, numUp lo-chords,
# then numDn stations, numDn hi-chords, numDn lo-chords
# (all 8-char fixed-width columns, up to 10 per row).
roadway: Roadway | None = None
weir: Weir | None = None
deck_hdr_i = next(
(i for i, ln in enumerate(lines) if ln.startswith("Deck Dist")),
None,
)
if deck_hdr_i is not None and deck_hdr_i + 1 < len(lines):
dp = lines[deck_hdr_i + 1].split(",")
dist = _f(dp, 0)
width = _f(dp, 1)
coef = _f(dp, 2)
skew = _f(dp, 3)
num_up = int(_f(dp, 4))
num_dn = int(_f(dp, 5))
min_lo = _fn(dp, 6)
max_hi = _fn(dp, 7)
max_sub = _fn(dp, 8)
is_ogee = int(_f(dp, 9)) == -1
shape = "Ogee" if is_ogee else "Broad Crested"
row_off = deck_hdr_i + 2 # first fixed-width row
rows_up = _row_count(num_up, _COLS_STAE)
stations_up = _parse_block(lines[row_off: row_off + rows_up], num_up)
row_off += rows_up
hi_up = _parse_block(lines[row_off: row_off + rows_up], num_up)
row_off += rows_up
lo_up = _parse_block(lines[row_off: row_off + rows_up], num_up)
row_off += rows_up
rows_dn = _row_count(num_dn, _COLS_STAE)
stations_dn = _parse_block(lines[row_off: row_off + rows_dn], num_dn)
row_off += rows_dn
hi_dn = _parse_block(lines[row_off: row_off + rows_dn], num_dn)
row_off += rows_dn
lo_dn = _parse_block(lines[row_off: row_off + rows_dn], num_dn)
roadway = Roadway(
dist=dist,
width=width,
weir_coefficient=coef,
skew=skew,
max_submergence=max_sub,
shape=shape,
min_lo_chord=min_lo,
max_hi_chord=max_hi,
stations_up=stations_up,
hi_chord_up=hi_up,
lo_chord_up=lo_up,
stations_dn=stations_dn,
hi_chord_dn=hi_dn,
lo_chord_dn=lo_dn,
)
weir = Weir(
width=width,
coefficient=coef,
skew=skew,
max_submergence=max_sub,
min_elevation=nan,
shape=shape,
)
# -- culvert_groups (Culvert= and Multiple Barrel Culv= lines) ------
culvert_groups: list[CulvertGroup] = []
i = 0
while i < len(lines):
ln = lines[i]
if ln.startswith("Culvert="):
cp = ln[len("Culvert="):].split(",")
shape_code = int(_f(cp, 0)) if cp[0].strip() else 0
sname = _CULVERT_SHAPES.get(shape_code, f"Unknown ({shape_code})")
chart = (
int(_f(cp, 15))
if len(cp) > 15 and cp[15].strip()
else 0
)
culvert_group = CulvertGroup(
name=cp[13].strip() if len(cp) > 13 else "",
shape_code=shape_code,
shape_name=sname,
span=_f(cp, 1),
rise=_f(cp, 2),
length=_f(cp, 3),
n_top=_f(cp, 4),
entrance_loss=_f(cp, 5),
exit_loss=_f(cp, 6),
inlet_type=int(_f(cp, 7)),
outlet_type=int(_f(cp, 8)),
upstream_invert=_f(cp, 9),
upstream_station=_f(cp, 10),
downstream_invert=_f(cp, 11),
downstream_station=_f(cp, 12),
chart_number=chart,
)
# Consume immediately-following optional parameter lines
j = i + 1
while j < i + 5 and j < len(lines):
nxt = lines[j]
if nxt.startswith("Culvert Bottom n="):
with contextlib.suppress(ValueError):
culvert_group.n_bottom = float(
nxt[len("Culvert Bottom n="):].strip()
)
j += 1
elif nxt.startswith("Culvert Bottom Depth="):
with contextlib.suppress(ValueError):
culvert_group.depth_n_bottom = float(
nxt[len("Culvert Bottom Depth="):].strip()
)
j += 1
elif nxt.startswith("BC Culvert Barrel="):
bp = nxt[len("BC Culvert Barrel="):].split(",")
with contextlib.suppress(ValueError, IndexError):
culvert_group.num_barrels = int(bp[0].strip())
j += 1
else:
break
culvert_groups.append(culvert_group)
i = j
elif ln.startswith("Multiple Barrel Culv="):
# Field layout differs from Culvert=: no us_station/ds_station
# columns; num_barrels is at index 11; the next line contains
# num_barrels upstream + num_barrels downstream station values
# in 8-char fixed-width columns (not yet parsed — see TODO in
# CulvertGroup docstring).
mp = ln[len("Multiple Barrel Culv="):].split(",")
shape_code = int(_f(mp, 0)) if mp[0].strip() else 0
sname = _CULVERT_SHAPES.get(shape_code, f"Unknown ({shape_code})")
num_barrels_m = (
int(_f(mp, 11)) if len(mp) > 11 and mp[11].strip() else 1
)
chart = (
int(_f(mp, 14)) if len(mp) > 14 and mp[14].strip() else 0
)
culvert_group = CulvertGroup(
name=mp[12].strip() if len(mp) > 12 else "",
shape_code=shape_code,
shape_name=sname,
span=_f(mp, 1),
rise=_f(mp, 2),
length=_f(mp, 3),
n_top=_f(mp, 4),
entrance_loss=_f(mp, 5),
exit_loss=_f(mp, 6),
inlet_type=int(_f(mp, 7)),
outlet_type=int(_f(mp, 8)),
upstream_invert=_f(mp, 9),
upstream_station=0.0, # station data is on next line
downstream_invert=_f(mp, 10),
downstream_station=0.0, # station data is on next line
chart_number=chart,
num_barrels=num_barrels_m,
)
# Skip the station line (num_barrels * 2 values) and optional
# Culvert Bottom n= line
j = i + 1
non_data = ("Culvert", "BC ", "Pier ", "BR ")
if j < len(lines) and not lines[j].startswith(non_data):
j += 1 # station data line
while j < i + 5 and j < len(lines):
nxt = lines[j]
if nxt.startswith("Culvert Bottom n="):
with contextlib.suppress(ValueError):
culvert_group.n_bottom = float(
nxt[len("Culvert Bottom n="):].strip()
)
j += 1
elif nxt.startswith("Culvert Bottom Depth="):
with contextlib.suppress(ValueError):
culvert_group.depth_n_bottom = float(
nxt[len("Culvert Bottom Depth="):].strip()
)
j += 1
else:
break
culvert_groups.append(culvert_group)
i = j
else:
i += 1
# -- piers (Pier Skew, UpSta & Num, DnSta & Num= blocks) ----------
piers: list[Pier] = []
i = 0
while i < len(lines):
ln = lines[i]
if ln.startswith("Pier Skew, UpSta & Num, DnSta & Num="):
pp = ln[ln.index("=") + 1:].split(",")
skew_p = _f(pp, 0)
up_sta = _f(pp, 1)
up_num = int(_f(pp, 2))
dn_sta = _f(pp, 3)
dn_num = int(_f(pp, 4))
row_i = i + 1
rows_up = _row_count(up_num, _COLS_STAE)
up_widths = _parse_block(lines[row_i: row_i + rows_up], up_num)
row_i += rows_up
up_elev = _parse_block(lines[row_i: row_i + rows_up], up_num)
row_i += rows_up
rows_dn = _row_count(dn_num, _COLS_STAE)
dn_widths = _parse_block(lines[row_i: row_i + rows_dn], dn_num)
row_i += rows_dn
dn_elev = _parse_block(lines[row_i: row_i + rows_dn], dn_num)
row_i += rows_dn
piers.append(Pier(
skew=skew_p,
upstream_station=up_sta,
upstream_count=up_num,
downstream_station=dn_sta,
downstream_count=dn_num,
upstream_widths=up_widths,
upstream_elevations=up_elev,
downstream_widths=dn_widths,
downstream_elevations=dn_elev,
))
i = row_i
else:
i += 1
# -- HTab parameters -----------------------------------------------
htab_hw_max: float | None = None
htab_tw_max: float | None = None
htab_max_flow: float | None = None
for ln in lines:
if ln.startswith("BC HTab HWMax="):
with contextlib.suppress(ValueError):
htab_hw_max = float(ln[len("BC HTab HWMax="):].strip())
elif ln.startswith("BC HTab TWMax="):
with contextlib.suppress(ValueError):
htab_tw_max = float(ln[len("BC HTab TWMax="):].strip())
elif ln.startswith("BC HTab MaxFlow="):
with contextlib.suppress(ValueError):
htab_max_flow = float(ln[len("BC HTab MaxFlow="):].strip())
_empty: tuple[str, str, str] = ("", "", "")
return Bridge(
mode="",
upstream_type="XS" if upstream_node != _empty else "",
downstream_type="XS" if downstream_node != _empty else "",
location=(river, reach, rs),
upstream_node=upstream_node,
downstream_node=downstream_node,
weir=weir,
description=description,
roadway=roadway,
culvert_groups=culvert_groups,
piers=piers,
node_name=node_name,
htab_hw_max=htab_hw_max,
htab_tw_max=htab_tw_max,
htab_max_flow=htab_max_flow,
)
def _parse_lateral(
self,
river: str,
reach: str,
rs: str,
start: int,
end: int,
upstream_node: tuple[str, str, str],
) -> LateralStructure:
"""Parse one lateral structure node block into a :class:`LateralStructure`.
Parameters
----------
river, reach, rs:
Node identity.
start, end:
Line-index range of the node block (``self._lines[start:end]``).
upstream_node:
Adjacent upstream XS tuple from :meth:`_adjacent_xs_nodes`.
"""
lines = [ln.rstrip("\n") for ln in self._lines[start:end]]
# -- description ---------------------------------------------------
description = ""
desc_s = next(
(i for i, ln in enumerate(lines) if ln.strip() == "BEGIN DESCRIPTION:"),
None,
)
desc_e = next(
(i for i, ln in enumerate(lines) if ln.strip() == "END DESCRIPTION:"),
None,
)
if desc_s is not None and desc_e is not None and desc_e > desc_s:
description = "\n".join(lines[desc_s + 1 : desc_e]).strip()
# -- downstream connection (Lateral Weir End=river,reach,rs,...) ---
downstream_node = ""
for ln in lines:
if ln.startswith("Lateral Weir End="):
parts = ln[len("Lateral Weir End="):].split(",")
if len(parts) >= 2:
lat_river = parts[0].strip()
lat_reach = parts[1].strip()
downstream_node = f"{lat_river} {lat_reach}".strip()
break
# -- scalar Lateral Weir fields ------------------------------------
def _get_float(prefix: str, default: float = nan) -> float:
for ln in lines:
if ln.startswith(prefix):
s = ln[len(prefix):].strip().rstrip(",").strip()
try:
return float(s)
except ValueError:
return default
return default
pos = int(_get_float("Lateral Weir Pos=", default=0.0))
distance = _get_float("Lateral Weir Distance=")
flap_val = _get_float("Lateral Weir Flap Gates=", default=0.0)
flap_gates = not (isnan(flap_val) or flap_val == 0.0)
tw_val = _get_float("Lateral Weir TW Multiple XS=", default=0.0)
tw_multiple_xs = not (isnan(tw_val) or tw_val == 0.0)
# -- weir from individual Lateral Weir lines -----------------------
weir: Weir | None = None
wd = _get_float("Lateral Weir WD=")
coef = _get_float("Lateral Weir Coef=")
if not isnan(wd): # Lateral Weir WD= line was found
ws_criteria = _get_float("Lateral Weir WSCriteria=", default=0.0)
use_ws = (ws_criteria == -1)
# Lateral Weir Type= 0=broad crested, 1=ogee (not commonly set)
lat_type = _get_float("Lateral Weir Type=", default=0.0)
shape = "Ogee" if int(lat_type) == 1 else "Broad Crested"
weir = Weir(
width=wd,
coefficient=coef if not isnan(coef) else 0.0,
skew=0.0,
max_submergence=nan,
min_elevation=nan,
shape=shape,
use_water_surface=use_ws,
)
# -- weir crest profile (Lateral Weir SE= N block) -----------------
# Format: "Lateral Weir SE= N" then N pairs of (station, elevation)
# in 8-char fixed-width columns, up to 10 values per row.
crest_profile: list[tuple[float, float]] = []
for i, ln in enumerate(lines):
if ln.startswith("Lateral Weir SE="):
n_str = ln[len("Lateral Weir SE="):].strip()
try:
n_pairs = int(n_str)
except ValueError:
n_pairs = 0
if n_pairs > 0:
n_rows = _row_count(n_pairs * 2, _COLS_STAE)
flat = _parse_block(lines[i + 1: i + 1 + n_rows], n_pairs * 2)
crest_profile = [
(flat[j], flat[j + 1])
for j in range(0, len(flat) - 1, 2)
]
break
_empty: tuple[str, str, str] = ("", "", "")
return LateralStructure(
mode="",
upstream_type="XS" if upstream_node != _empty else "",
downstream_type="XS" if downstream_node else "",
location=(river, reach, rs),
upstream_node=upstream_node,
downstream_node=downstream_node,
weir=weir,
description=description,
pos=pos,
distance=distance,
crest_profile=crest_profile,
flap_gates=flap_gates,
tw_multiple_xs=tw_multiple_xs,
)
# ------------------------------------------------------------------
# Raw node access (for structures)
# ------------------------------------------------------------------
[docs]
def get_node_lines(self, river: str, reach: str, rs: str) -> list[str] | None:
"""Return the raw lines for a node block (header inclusive).
Useful for inspecting structure nodes (bridges, culverts, etc.) that
are not fully parsed. Returns ``None`` if not found.
"""
start = self._find_node_start(river, reach, rs)
if start is None:
return None
end = self._find_node_end(start)
return [ln.rstrip("\n") for ln in self._lines[start:end]]
[docs]
def inline_gate_groups(self, river: str, reach: str, rs: str) -> list[str]:
"""Return gate group names for an inline structure, in file order.
Parses ``IW Gate Name`` header lines from the node block. The name
is the first comma-separated field on the data line that immediately
follows each header line.
Parameters
----------
river:
River name.
reach:
Reach name.
rs:
River station of the inline structure.
Returns
-------
list[str]
Gate group names, e.g. ``["Left Group", "Center Group", "Right Group"]``.
Raises
------
KeyError
If the node is not found or is not an inline structure (type 5).
"""
lines = self.get_node_lines(river, reach, rs)
if lines is None:
raise KeyError(
f"Node not found: river={river!r}, reach={reach!r}, rs={rs!r}"
)
node_type = self.node_type(river, reach, rs)
if node_type != NodeType.INLINE_STRUCTURE:
raise KeyError(
f"Node at river={river!r}, reach={reach!r}, rs={rs!r} "
f"is not an inline structure (type={node_type!r})"
)
names: list[str] = []
for i, line in enumerate(lines):
if line.startswith("IW Gate Name") and i + 1 < len(lines):
names.append(lines[i + 1].split(",", 1)[0].strip())
return names
[docs]
def node_type(self, river: str, reach: str, rs: str) -> int | None:
"""Return the node type code for *(river, reach, rs)*, or ``None``.
Returns one of :data:`NodeType.CROSS_SECTION`, :data:`NodeType.CULVERT`,
:data:`NodeType.BRIDGE`, :data:`NodeType.MULTIPLE_OPENING`,
:data:`NodeType.INLINE_STRUCTURE`, :data:`NodeType.LATERAL_STRUCTURE`.
"""
start = self._find_node_start(river, reach, rs)
if start is None:
return None
parsed = self._parse_node_header(self._lines[start])
return parsed[0] if parsed else None
# ------------------------------------------------------------------
# Persistence
# ------------------------------------------------------------------
[docs]
def save(self, path: str | Path | None = None) -> None:
"""Write all in-memory lines back to disk.
If *path* is omitted the source file is overwritten.
"""
dest = Path(path) if path is not None else self._path
with open(dest, "w", encoding="utf-8") as fh:
fh.writelines(self._lines)
self._modified = False
self._structures_cache = None