"""Read/write HEC-RAS steady flow files (.f**).
:class:`SteadyFlow` — verbatim-line editor. All lines are stored
verbatim; only the specific data blocks that are mutated are reformatted.
``save()`` is byte-faithful for every unmodified line.
File format notes:
- Flow data: each ``River Rch & RM=River,Reach,RS`` line is immediately
followed by *N* flow values (one per profile) in 8-char fixed-width
columns, up to 10 per row. No ``Num Of Flows=`` or ``Flow=`` prefix.
- Boundary conditions: one ``Boundary for River Rch & Prof#=River,Reach,N``
block **per reach endpoint per profile**. Up/Dn type codes:
.. code-block:: text
0 — None
1 — Known WS (``Up/Dn Known WS=<value>``)
2 — Critical Depth (no additional data)
3 — Normal Depth (``Up/Dn Slope=<value>``)
4 — Rating Curve (``Up/Dn Nval=<count>`` + interleaved stage/flow pairs)
Derived from format inspection of HEC-RAS 6.6 example files.
Convention
----------
``get_*`` methods return ``None`` when the requested item is not found.
``set_*`` methods raise :exc:`KeyError` when the target does not exist.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from enum import IntEnum
from math import ceil
from pathlib import Path
logger = logging.getLogger("rivia.model")
# ---------------------------------------------------------------------------
# Formatting helpers (same algorithm as flow_unsteady)
# ---------------------------------------------------------------------------
_COL_WIDTH = 8
_COLS_PER_ROW = 10
def _fit_width(value: float, width: int = _COL_WIDTH) -> str:
"""Right-justify *value* inside *width* characters.
Tries integer, then progressively fewer decimal places, then scientific
notation. Truncates as a last resort.
"""
if isinstance(value, int) or (
isinstance(value, float)
and value == int(value)
and len(str(int(value))) <= width
):
s = str(int(value))
if len(s) <= width:
return s.rjust(width)
s = repr(value)
if len(s) <= width:
return s.rjust(width)
fv = float(value)
for decimals in range(6, -1, -1):
s = f"{fv:.{decimals}f}"
if len(s) <= width:
return s.rjust(width)
for decimals in range(2, -1, -1):
s = f"{fv:.{decimals}E}"
if len(s) <= width:
return s.rjust(width)
return repr(value)[:width]
def _format_data_block(
values: list[float],
cols: int = _COLS_PER_ROW,
width: int = _COL_WIDTH,
) -> list[str]:
"""Return fixed-width data lines (no trailing newline)."""
lines: list[str] = []
for i in range(0, len(values), cols):
chunk = values[i : i + cols]
lines.append("".join(_fit_width(v, width) for v in chunk))
return lines
def _parse_data_block(
lines: list[str],
count: int,
width: int = _COL_WIDTH,
) -> list[float]:
"""Parse up to *count* fixed-width values from *lines*."""
values: list[float] = []
for line in lines:
pos = 0
while pos < len(line) and len(values) < count:
token = line[pos : pos + width].strip()
if token:
try:
values.append(float(token))
except ValueError:
values.append(0.0)
pos += width
return values[:count]
def _data_line_count(n: int, cols: int = _COLS_PER_ROW) -> int:
return ceil(n / cols) if n > 0 else 0
# ---------------------------------------------------------------------------
# File-section keys
# ---------------------------------------------------------------------------
_KEY_FLOW_LOC = "River Rch & RM"
_KEY_BOUNDARY = "Boundary for River Rch & Prof#"
# Lines that signal the start of trailing (non-boundary, non-flow) content.
_TRAILING_STARTS = (
"DSS Import",
"Storage Area Elev=",
"Observed WS=",
)
class _BcType(IntEnum):
"""Boundary condition type codes (``Up/Dn Type=`` field in steady flow files)."""
NONE = 0
KNOWN_WS = 1
CRITICAL_DEPTH = 2
NORMAL_DEPTH = 3
RATING_CURVE = 4
# Aliases used internally throughout this module.
_BC_NONE = _BcType.NONE
_BC_KNOWN_WS = _BcType.KNOWN_WS
_BC_CRITICAL_DEPTH = _BcType.CRITICAL_DEPTH
_BC_NORMAL_DEPTH = _BcType.NORMAL_DEPTH
_BC_RATING_CURVE = _BcType.RATING_CURVE
# ---------------------------------------------------------------------------
# Boundary condition dataclass
# ---------------------------------------------------------------------------
[docs]
@dataclass
class SteadyBoundary:
"""Boundary conditions for one river/reach endpoint for one flow profile.
``up_type`` / ``dn_type`` encode the condition type::
0 — None (no BC)
1 — Known WS (``up_known_ws`` / ``dn_known_ws``)
2 — Critical Depth (no additional data required)
3 — Normal Depth (``up_slope`` / ``dn_slope``)
4 — Rating Curve (``up_rating_curve`` / ``dn_rating_curve``)
``profile`` is 1-based and matches the profile index in the steady flow
file (``Boundary for River Rch & Prof#=..., 1``).
"""
river: str
reach: str
profile: int
up_type: int = 0
dn_type: int = 0
up_known_ws: float | None = None
dn_known_ws: float | None = None
up_slope: float | None = None
dn_slope: float | None = None
up_rating_curve: list[tuple[float, float]] = field(default_factory=list)
dn_rating_curve: list[tuple[float, float]] = field(default_factory=list)
# ---------------------------------------------------------------------------
# SteadyFlow — verbatim-line editor
# ---------------------------------------------------------------------------
[docs]
class SteadyFlow:
"""Verbatim-line editor for HEC-RAS steady flow files (.f**).
All lines are loaded into memory verbatim. Targeted edits (e.g.
replacing flow values at a cross-section or updating a boundary
condition) splice new formatted lines into the list while leaving every
other line byte-identical. ``save()`` writes the list back; a no-op
parse+save produces an identical file.
Flow data is accessed by river / reach / river-station string
(``get_flows`` / ``set_flows``). Boundary conditions are accessed by
river / reach / 1-based profile number (``get_boundary`` /
``set_boundary``).
Derived from format inspection of HEC-RAS 6.6 example files.
"""
def __init__(self, path: str | Path) -> None:
self._path = Path(path)
if not self._path.is_file():
raise FileNotFoundError(f"Steady flow file not found: {self._path}")
with open(self._path, encoding="utf-8", errors="replace") as fh:
self._lines: list[str] = fh.readlines()
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _get(self, key: str) -> str | None:
prefix = key + "="
for line in self._lines:
if line.startswith(prefix):
value = line[len(prefix) :].strip()
return value if value else None
return None
def _set(self, key: str, raw_value: str) -> None:
prefix = key + "="
for i, line in enumerate(self._lines):
if line.startswith(prefix):
self._lines[i] = f"{prefix}{raw_value}\n"
return
raise KeyError(f"Key not found in steady flow file: {key!r}")
def _splice(self, start: int, old_count: int, new_lines: list[str]) -> None:
"""Replace *old_count* lines starting at *start* with *new_lines*."""
self._lines[start : start + old_count] = [
(ln if ln.endswith("\n") else ln + "\n") for ln in new_lines
]
@staticmethod
def _match_fields(tail: str, river: str, reach: str, third: str | int) -> bool:
"""Return True if the first three comma-separated fields of *tail*
match (*river*, *reach*, *third*) after stripping and lowercasing."""
parts = tail.split(",", 3)
if len(parts) < 3:
return False
return (
parts[0].strip().lower() == river.strip().lower()
and parts[1].strip().lower() == reach.strip().lower()
and parts[2].strip() == str(third).strip()
)
@staticmethod
def _match_fields_ci(tail: str, river: str, reach: str, third: str) -> bool:
"""Case-insensitive version of _match_fields for RS comparisons."""
parts = tail.split(",", 3)
if len(parts) < 3:
return False
return (
parts[0].strip().lower() == river.strip().lower()
and parts[1].strip().lower() == reach.strip().lower()
and parts[2].strip().lower() == third.strip().lower()
)
def _find_flow_location(self, river: str, reach: str, rs: str) -> int | None:
"""Return the line index of the matching ``River Rch & RM=`` line."""
prefix = _KEY_FLOW_LOC + "="
for i, line in enumerate(self._lines):
if line.startswith(prefix):
if self._match_fields_ci(line[len(prefix) :], river, reach, rs):
return i
return None
def _find_flow_end(self, loc_i: int) -> int:
"""Return the index of the first line *after* the flow data block."""
n = len(self._lines)
i = loc_i + 1
while i < n:
line = self._lines[i]
if line.startswith(_KEY_FLOW_LOC + "=") or line.startswith(
_KEY_BOUNDARY + "="
):
return i
i += 1
return n
def _find_boundary_location(
self, river: str, reach: str, profile: int
) -> int | None:
"""Return the line index of the matching ``Boundary for River Rch & Prof#=``
line for the given profile number."""
prefix = _KEY_BOUNDARY + "="
for i, line in enumerate(self._lines):
if not line.startswith(prefix):
continue
tail = line[len(prefix) :]
parts = tail.split(",", 3)
if len(parts) < 3:
continue
try:
file_profile = int(parts[2].strip())
except ValueError:
continue
if (
parts[0].strip().lower() == river.strip().lower()
and parts[1].strip().lower() == reach.strip().lower()
and file_profile == profile
):
return i
return None
def _find_boundary_end(self, loc_i: int) -> int:
"""Return the index of the first line *after* the boundary block."""
n = len(self._lines)
i = loc_i + 1
while i < n:
line = self._lines[i]
if line.startswith(_KEY_BOUNDARY + "=") or line.startswith(
_KEY_FLOW_LOC + "="
):
return i
if any(line.startswith(t) for t in _TRAILING_STARTS):
return i
i += 1
return n
# ------------------------------------------------------------------
# Generic escape hatch
# ------------------------------------------------------------------
[docs]
def get(self, key: str) -> str | None:
"""Return the raw stripped value for *key*, or ``None`` if absent."""
return self._get(key)
[docs]
def set(self, key: str, value: str) -> None:
"""Set *key* to *value* verbatim. Raises ``KeyError`` if absent."""
self._set(key, value)
# ------------------------------------------------------------------
# Scalar properties
# ------------------------------------------------------------------
@property
def flow_title(self) -> str | None:
"""Flow title (``Flow Title=``)."""
return self._get("Flow Title")
@flow_title.setter
def flow_title(self, value: str) -> None:
self._set("Flow Title", value)
@property
def program_version(self) -> str | None:
"""HEC-RAS version string.
Checks ``Program Version=`` first (modern format), then falls back to
``Version=`` (pre-v4 files such as older WAILUPE-style projects).
Treat as read-only; HEC-RAS manages this field.
"""
v = self._get("Program Version")
return v if v is not None else self._get("Version")
@property
def n_profiles(self) -> int | None:
"""Number of flow profiles (``Number of Profiles=``)."""
raw = self._get("Number of Profiles")
if raw is None:
return None
try:
return int(raw.strip())
except ValueError:
return None
@n_profiles.setter
def n_profiles(self, value: int) -> None:
self._set("Number of Profiles", f" {value} ")
@property
def profile_names(self) -> list[str]:
"""Profile names from the header ``Profile Names=`` line, stripped.
Returns an empty list if the key is absent.
"""
raw = self._get("Profile Names")
if raw is None:
return []
return [name.strip() for name in raw.split(",") if name.strip()]
@profile_names.setter
def profile_names(self, names: list[str]) -> None:
"""Replace the ``Profile Names=`` line.
Raises ``KeyError`` if the key is not present in the file.
"""
self._set("Profile Names", ",".join(names))
# ------------------------------------------------------------------
# Flow data
# ------------------------------------------------------------------
[docs]
def get_flows(self, river: str, reach: str, rs: str) -> list[float] | None:
"""Return flow values (one per profile) at the given location.
Returns ``None`` if no matching ``River Rch & RM`` line is found.
Returns ``[]`` if the location exists but contains no parseable values.
"""
loc_i = self._find_flow_location(river, reach, rs)
if loc_i is None:
return None
end_i = self._find_flow_end(loc_i)
data_lines = [ln.rstrip("\n") for ln in self._lines[loc_i + 1 : end_i]]
n = self.n_profiles
if n is None:
# Parse as many values as the data lines can provide.
n = _COLS_PER_ROW * len(data_lines)
return _parse_data_block(data_lines, n)
[docs]
def set_flows(self, river: str, reach: str, rs: str, values: list[float]) -> None:
"""Replace the flow values at the given location.
The number of values in *values* need not match the current count;
the data lines are replaced wholesale.
Parameters
----------
river:
River name (case-insensitive match).
reach:
Reach name (case-insensitive match).
rs:
River station string (stripped comparison).
values:
New flow values, one per profile.
Raises
------
KeyError
No matching ``River Rch & RM`` line found.
"""
loc_i = self._find_flow_location(river, reach, rs)
if loc_i is None:
raise KeyError(f"No flow location found for {river!r}, {reach!r}, {rs!r}")
end_i = self._find_flow_end(loc_i)
old_line_count = end_i - (loc_i + 1)
self._splice(loc_i + 1, old_line_count, _format_data_block(values))
# ------------------------------------------------------------------
# Boundary conditions
# ------------------------------------------------------------------
[docs]
def get_boundary(
self, river: str, reach: str, profile: int
) -> SteadyBoundary | None:
"""Return the boundary conditions for the given reach endpoint and profile.
Returns ``None`` if no matching ``Boundary for River Rch & Prof#`` line
is found.
"""
loc_i = self._find_boundary_location(river, reach, profile)
if loc_i is None:
return None
bc = SteadyBoundary(river=river, reach=reach, profile=profile)
end_i = self._find_boundary_end(loc_i)
i = loc_i + 1
while i < end_i:
line = self._lines[i]
stripped = line.strip()
if not stripped or "=" not in stripped:
i += 1
continue
key, _, val = stripped.partition("=")
key = key.strip()
val = val.strip()
if key == "Up Type":
bc.up_type = int(val)
elif key == "Dn Type":
bc.dn_type = int(val)
elif key == "Up Known WS":
bc.up_known_ws = float(val)
elif key == "Dn Known WS":
bc.dn_known_ws = float(val)
elif key == "Up Slope":
bc.up_slope = float(val)
elif key == "Dn Slope":
bc.dn_slope = float(val)
elif key == "Up Nval":
count = int(val)
nlines = _data_line_count(count * 2)
data_lines = [
ln.rstrip("\n") for ln in self._lines[i + 1 : i + 1 + nlines]
]
flat = _parse_data_block(data_lines, count * 2)
bc.up_rating_curve = [
(flat[j], flat[j + 1]) for j in range(0, len(flat), 2)
]
i += nlines
elif key == "Dn Nval":
count = int(val)
nlines = _data_line_count(count * 2)
data_lines = [
ln.rstrip("\n") for ln in self._lines[i + 1 : i + 1 + nlines]
]
flat = _parse_data_block(data_lines, count * 2)
bc.dn_rating_curve = [
(flat[j], flat[j + 1]) for j in range(0, len(flat), 2)
]
i += nlines
i += 1
return bc
[docs]
def set_boundary(self, boundary: SteadyBoundary) -> None:
"""Replace the boundary conditions at the given location.
The entire boundary block (from ``Boundary for River Rch & Prof#=``
through its fields) is rebuilt from *boundary*. Fields irrelevant
to the set type are omitted (e.g. ``Dn Slope=`` is not written when
``dn_type != 3``).
Raises
------
KeyError
No matching ``Boundary for River Rch & Prof#`` line found.
"""
river, reach, profile = boundary.river, boundary.reach, boundary.profile
loc_i = self._find_boundary_location(river, reach, profile)
if loc_i is None:
raise KeyError(
f"No boundary found for {river!r}, {reach!r}, profile {profile}"
)
end_i = self._find_boundary_end(loc_i)
# Preserve the original location header line verbatim.
new_lines: list[str] = [self._lines[loc_i].rstrip("\n")]
new_lines.append(f"Up Type= {boundary.up_type} ")
if boundary.up_type == _BC_KNOWN_WS and boundary.up_known_ws is not None:
new_lines.append(f"Up Known WS={boundary.up_known_ws}")
elif boundary.up_type == _BC_NORMAL_DEPTH and boundary.up_slope is not None:
new_lines.append(f"Up Slope={boundary.up_slope}")
elif boundary.up_type == _BC_RATING_CURVE and boundary.up_rating_curve:
count = len(boundary.up_rating_curve)
new_lines.append(f"Up Nval= {count}")
flat = [v for pair in boundary.up_rating_curve for v in pair]
new_lines.extend(_format_data_block(flat))
new_lines.append(f"Dn Type= {boundary.dn_type} ")
if boundary.dn_type == _BC_KNOWN_WS and boundary.dn_known_ws is not None:
new_lines.append(f"Dn Known WS={boundary.dn_known_ws}")
elif boundary.dn_type == _BC_NORMAL_DEPTH and boundary.dn_slope is not None:
new_lines.append(f"Dn Slope={boundary.dn_slope}")
elif boundary.dn_type == _BC_RATING_CURVE and boundary.dn_rating_curve:
count = len(boundary.dn_rating_curve)
new_lines.append(f"Dn Nval= {count}")
flat = [v for pair in boundary.dn_rating_curve for v in pair]
new_lines.extend(_format_data_block(flat))
self._splice(loc_i, end_i - loc_i, new_lines)
[docs]
def get_boundaries(self, river: str, reach: str) -> list[SteadyBoundary]:
"""Return all boundary conditions for a river/reach, sorted by profile.
Returns an empty list if no matching boundaries are found.
"""
prefix = _KEY_BOUNDARY + "="
profiles: list[int] = []
for line in self._lines:
if not line.startswith(prefix):
continue
tail = line[len(prefix) :]
parts = tail.split(",", 3)
if len(parts) < 3:
continue
if (
parts[0].strip().lower() == river.strip().lower()
and parts[1].strip().lower() == reach.strip().lower()
):
try:
profiles.append(int(parts[2].strip()))
except ValueError:
pass
result = []
for p in sorted(profiles):
bc = self.get_boundary(river, reach, p)
if bc is not None:
result.append(bc)
return result
# ------------------------------------------------------------------
# Persistence
# ------------------------------------------------------------------
[docs]
def save(self, path: str | Path | None = None) -> None:
"""Write all in-memory lines back to disk.
If *path* is omitted the source file is overwritten.
"""
dest = Path(path) if path is not None else self._path
with open(dest, "w", encoding="utf-8") as fh:
fh.writelines(self._lines)