"""WLS cell-centre velocity reconstruction from HEC-RAS face normal velocities.
This module contains pure-numpy functions for cell-centre WLS velocity
reconstruction. They are called by ``FlowAreaResults`` which provides the
mesh geometry arrays. Face and facepoint velocity reconstruction uses the
RASMapper-exact pipeline in ``rivia.geo._rasmap``.
Background
----------
HEC-RAS 2-D solves for *face normal velocities* (velocity component
perpendicular to each cell face) as primary unknowns. Cell-centre velocity
vectors are a derived quantity reconstructed via the weighted least-squares
(WLS) method described in the HEC-RAS Technical Reference Manual
(Section: 2D Unsteady Flow - Numerical Methods - Cell Velocity).
The 2x2 WLS system (per cell, summing over its k faces):
[a11 a12] [u] [b1]
[a12 a22] [v] = [b2]
a11 = sum(w_k * nx_k**2)
a22 = sum(w_k * ny_k**2)
a12 = sum(w_k * nx_k * ny_k)
b1 = sum(w_k * V_n,k * nx_k)
b2 = sum(w_k * V_n,k * ny_k)
Solved via Cramer's rule:
det = a11*a22 - a12**2
u = (a22*b1 - a12*b2) / det
v = (a11*b2 - a12*b1) / det
Weights *w_k* are either:
- ``"area_weighted"`` : wetted face flow areas interpolated from the
per-face hydraulic property tables at the estimated face WSE. This
matches HEC-RAS's internal reconstruction.
- ``"length_weighted"``: face plan-view lengths (column 2 of
``Faces NormalUnitVector and Length``). Simpler; no table lookup.
Sign convention note
--------------------
The stored face normal ``n_hat_stored`` points from the *left* cell to the
*right* cell (``Faces Cell Indexes`` col 0 -> col 1). Orientation (+1/-1)
records whether ``n_hat_stored`` is *outward* (+1) or *inward* (-1) relative
to the current cell. Orientation cancels in the WLS product
``V_n * n_hat``, so the stored values are used directly.
"""
from __future__ import annotations
import logging
import numpy as np
logger = logging.getLogger("rivia.hdf")
# ---------------------------------------------------------------------------
# Single-cell helpers (used internally and exposed for testing)
# ---------------------------------------------------------------------------
def _wls_velocity(
vn: np.ndarray,
weights: np.ndarray,
normals: np.ndarray,
) -> np.ndarray:
"""Solve the 2x2 WLS system for one cell.
Parameters
----------
vn : shape ``(n_faces,)``
Signed face normal velocities.
weights : shape ``(n_faces,)``
Per-face weights (flow areas or lengths).
normals : shape ``(n_faces, 2)``
Unit normal vectors ``[nx, ny]``.
Returns
-------
np.ndarray, shape ``(2,)``
``[u, v]`` cell-centre velocity components.
"""
nx = normals[:, 0]
ny = normals[:, 1]
a11 = np.dot(weights, nx * nx)
a22 = np.dot(weights, ny * ny)
a12 = np.dot(weights, nx * ny)
b1 = np.dot(weights * vn, nx)
b2 = np.dot(weights * vn, ny)
det = a11 * a22 - a12 * a12
# Ill-conditioning guard — condition-number relative threshold.
#
# Background
# ----------
# An absolute threshold (e.g. 1e-30) is dangerously permissive.
# The 2×2 WLS matrix can have a positive det yet be so ill-conditioned
# that the solution is physically meaningless. Two distinct scenarios
# arise in practice:
#
# Scenario A — nearly anti-parallel wet faces (area_weighted):
# A cell may have only two wet faces whose normals are nearly anti-
# parallel (≈180° apart, e.g. n1 ≈ −n2). Both faces measure the
# same flow component but at different magnitudes (e.g. vn₁ = −26.4
# vs vn₂ = +15.5 m/s for n₁ ≈ −n₂). The WLS must reconcile this
# inconsistency by assigning a huge perpendicular velocity (~828 m/s)
# to satisfy both equations simultaneously, even though det > 0 (the
# faces are not exactly anti-parallel).
#
# Scenario B — nearly parallel high-weight faces + tiny-weight outlier:
# Most faces deep (area ≈ 100, nearly parallel normals); one barely-
# wet face (area ≈ 1e-8) is the only constraint for the perpendicular
# direction. det ≈ (high_w)(low_w)(sin θ)² can be ~1e-11; a 0.001
# m/s discrepancy in that face's vₙ is amplified to hundreds of m/s.
#
# The 2×2 condition number approximates to
# κ ≈ (a11 + a22)² / det (for large κ)
# Requiring κ ≤ MAX_COND is equivalent to
# det ≥ (a11 + a22)² / MAX_COND
# which scales correctly with the weight magnitude.
#
# MAX_COND = 1e4 in practice:
# Two 90°-separated faces (equal weight): κ ≈ 1 → passes ✓
# Two 5°-separated faces (equal weight): κ ≈ 530 → passes ✓
# Two 1°-separated faces (equal weight): κ ≈ 13000 → rejected ✓
# Cell with two anti-parallel wet faces: κ ≈ 1e5 → rejected ✓
_MAX_COND = 1e4
_trace = a11 + a22
if _trace == 0.0 or abs(det) * _MAX_COND < _trace ** 2:
return np.zeros(2)
u = (a22 * b1 - a12 * b2) / det
v = (a11 * b2 - a12 * b1) / det
return np.array([u, v])
def _interpolate_face_flow_area(
face_idx: int,
wse: float,
ae_info: np.ndarray,
ae_values: np.ndarray,
) -> float:
"""Interpolate flow area for one face from its hydraulic property table.
Parameters
----------
face_idx : int
0-based face index.
wse : float
Water-surface elevation at the face.
ae_info : ndarray, shape ``(n_faces, 2)``
``[start_index, count]`` for each face into *ae_values*.
ae_values : ndarray, shape ``(total, 4)``
Columns: ``[elevation, flow_area, wetted_perimeter, mannings_n]``.
Returns
-------
float
Wetted cross-sectional flow area at the given WSE.
"""
start = int(ae_info[face_idx, 0])
count = int(ae_info[face_idx, 1])
table = ae_values[start : start + count]
elevs = table[:, 0]
areas = table[:, 1]
if wse <= elevs[0]:
return 0.0
if wse >= elevs[-1]:
return float(areas[-1])
return float(np.interp(wse, elevs, areas))
def _estimate_face_wse_average(
face_cell_indexes: np.ndarray,
cell_wse: np.ndarray,
) -> np.ndarray:
"""Estimate face WSE as the simple average of the two adjacent cell WSEs.
For boundary faces (one neighbour index is -1 or beyond the length of
*cell_wse*), the single available cell's WSE is used. Passing an
extended *cell_wse* that includes ghost-cell rows (indices
``n_cells_real .. n_total-1``) makes boundary faces use the
ghost-cell WSE instead of falling back to the inner cell only.
Parameters
----------
face_cell_indexes : ndarray, shape ``(n_faces, 2)``
Left and right cell index per face; -1 = no neighbour.
cell_wse : ndarray, shape ``(n_total,)``
Water-surface elevation indexed by cell index. May be real cells
only (length ``n_cells``) or extended with ghost cells
(length ``n_cells + n_ghost``).
Returns
-------
ndarray, shape ``(n_faces,)``
"""
n_total = len(cell_wse)
n_faces = face_cell_indexes.shape[0]
face_wse = np.zeros(n_faces)
left = face_cell_indexes[:, 0].astype(int)
right = face_cell_indexes[:, 1].astype(int)
l_real = (left >= 0) & (left < n_total)
r_real = (right >= 0) & (right < n_total)
both = l_real & r_real
face_wse[both] = 0.5 * (cell_wse[left[both]] + cell_wse[right[both]])
left_only = l_real & ~r_real
face_wse[left_only] = cell_wse[left[left_only]]
right_only = ~l_real & r_real
face_wse[right_only] = cell_wse[right[right_only]]
return face_wse
def _estimate_face_wse_max(
face_cell_indexes: np.ndarray,
cell_wse: np.ndarray,
) -> np.ndarray:
"""Estimate face WSE as the maximum of the two adjacent cell WSEs.
For boundary faces (one neighbour index is -1 or beyond the length of
*cell_wse*), the single available cell's WSE is used.
Parameters
----------
face_cell_indexes : ndarray, shape ``(n_faces, 2)``
Left and right cell index per face; -1 = no neighbour.
cell_wse : ndarray, shape ``(n_total,)``
Water-surface elevation indexed by cell index. May include ghost cells.
Returns
-------
ndarray, shape ``(n_faces,)``
"""
n_total = len(cell_wse)
n_faces = face_cell_indexes.shape[0]
face_wse = np.zeros(n_faces)
left = face_cell_indexes[:, 0].astype(int)
right = face_cell_indexes[:, 1].astype(int)
l_real = (left >= 0) & (left < n_total)
r_real = (right >= 0) & (right < n_total)
both = l_real & r_real
face_wse[both] = np.maximum(cell_wse[left[both]], cell_wse[right[both]])
left_only = l_real & ~r_real
face_wse[left_only] = cell_wse[left[left_only]]
right_only = ~l_real & r_real
face_wse[right_only] = cell_wse[right[right_only]]
return face_wse
def _estimate_face_wse_sloped(
face_cell_indexes: np.ndarray,
cell_wse: np.ndarray,
cell_centers: np.ndarray,
face_velocity_coords: np.ndarray,
) -> np.ndarray:
"""Estimate face WSE via distance-weighted linear interpolation.
The face WSE is placed at the face's actual position along the line
connecting the two adjacent cell centres:
t = d_left / (d_left + d_right)
wse_face = (1 - t) * wse_left + t * wse_right
where *d_left* (*d_right*) is the Euclidean distance from the face
velocity position to the left (right) cell centre. When the two
distances sum to nearly zero (degenerate geometry), the simple average
is used as a fallback.
For boundary faces (one neighbour index is -1 or beyond the length of
*cell_wse*), the single available cell's WSE is used unchanged.
Passing extended *cell_wse* and *cell_centers* that include ghost cells
makes boundary faces use distance-weighted interpolation like interior
faces.
Parameters
----------
face_cell_indexes : ndarray, shape ``(n_faces, 2)``
Left and right cell index per face; -1 = no neighbour.
cell_wse : ndarray, shape ``(n_total,)``
Water-surface elevation indexed by cell index. May include ghost
cells (length ``n_cells + n_ghost``).
cell_centers : ndarray, shape ``(n_total, 2)``
X, Y coordinates of cell centres. Must have the same length as
*cell_wse* so ghost-cell coordinates are accessible.
face_velocity_coords : ndarray, shape ``(n_faces, 2)``
X, Y position of the face normal velocity measurement point.
Typically either the face centroid or the face normal intercept
(where the cell-centre connecting line crosses the face).
Returns
-------
ndarray, shape ``(n_faces,)``
"""
n_total = len(cell_wse)
n_faces = face_cell_indexes.shape[0]
face_wse = np.zeros(n_faces)
left = face_cell_indexes[:, 0].astype(int)
right = face_cell_indexes[:, 1].astype(int)
l_real = (left >= 0) & (left < n_total)
r_real = (right >= 0) & (right < n_total)
# Interior faces: interpolate at the face's position between cell centres
both = l_real & r_real
if np.any(both):
fi = np.where(both)[0]
l_idx = left[fi]
r_idx = right[fi]
d_left = np.linalg.norm(face_velocity_coords[fi] - cell_centers[l_idx], axis=1)
d_right = np.linalg.norm(face_velocity_coords[fi] - cell_centers[r_idx], axis=1)
total = d_left + d_right
# fallback to 0.5 for degenerate geometry (cells at same location).
# Use a safe denominator to avoid division-by-zero in the numpy
# expression before np.where applies its mask.
safe_total = np.where(total > 1e-12, total, 1.0)
t = np.where(total > 1e-12, d_left / safe_total, 0.5)
face_wse[fi] = (1.0 - t) * cell_wse[l_idx] + t * cell_wse[r_idx]
# Boundary faces: use the single real neighbour's WSE
left_only = l_real & ~r_real
face_wse[left_only] = cell_wse[left[left_only]]
right_only = ~l_real & r_real
face_wse[right_only] = cell_wse[right[right_only]]
return face_wse
# ---------------------------------------------------------------------------
# Vectorised batch computation (main entry point)
# ---------------------------------------------------------------------------
[docs]
def compute_all_cell_velocities(
n_cells: int,
cell_face_info: np.ndarray,
cell_face_values: np.ndarray,
face_normals: np.ndarray,
face_cell_indexes: np.ndarray,
face_ae_info: np.ndarray,
face_ae_values: np.ndarray,
face_normal_velocity: np.ndarray,
cell_wse: np.ndarray,
method: str = "area_weighted",
face_flow: np.ndarray | None = None,
wse_interp: str = "average",
cell_centers: np.ndarray | None = None,
face_velocity_coords: np.ndarray | None = None,
) -> np.ndarray:
"""Compute WLS cell-centre velocity vectors for real cells.
Ghost cells are not reconstructed: HEC-RAS stores no face normal
velocities for ghost-only faces, so the WLS system for a ghost cell
would be severely underdetermined (one face, two unknowns). Ghost-cell
WSE values in *cell_wse* are still used by the face-WSE estimators to
improve accuracy at boundary faces of real cells.
Parameters
----------
n_cells : int
Number of real computational cells.
cell_face_info : ndarray, shape ``(>= n_cells, 2)``
``[start_index, count]`` into *cell_face_values*.
cell_face_values : ndarray, shape ``(total, 2)``
``[face_index, orientation]`` for each cell-face pair.
face_normals : ndarray, shape ``(n_faces, 3)``
``[nx, ny, face_length]``.
face_cell_indexes : ndarray, shape ``(n_faces, 2)``
Left and right cell indices per face.
face_ae_info : ndarray, shape ``(n_faces, 2)``
Area-elevation table index for each face.
face_ae_values : ndarray, shape ``(total, 4)``
Area-elevation table values.
face_normal_velocity : ndarray, shape ``(n_faces,)``
Signed face normal velocities for this timestep.
cell_wse : ndarray, shape ``(n_cells,)`` or ``(n_cells + n_ghost,)``
Cell-centre water-surface elevations. Ghost-cell WSEs (indices
``>= n_cells``) are used by the face-WSE estimators but are not
reconstructed as velocity vectors.
method : str
``"area_weighted"`` (default, matches HEC-RAS),
``"length_weighted"`` (simpler, no table lookup), or
``"flow_ratio"`` (requires *face_flow*; back-calculates flow area
as \|Q\|/\|V_n\|, exactly as HEC-RAS computed internally).
face_flow : ndarray, shape ``(n_faces,)``, optional
Volumetric face flows. Required when ``method="flow_ratio"``.
wse_interp : str
How to estimate face WSE when ``method="area_weighted"``.
``"average"`` (default) — simple mean of the two adjacent cell WSEs.
``"sloped"`` — distance-weighted linear interpolation at the face's
actual position between the two cell centres (requires *cell_centers*
and *face_velocity_coords*).
``"max"`` — maximum of the two adjacent cell WSEs (conservative;
tends to increase flow area at partially-wet faces).
cell_centers : ndarray, shape ``(n_cells + n_ghost, 2)``, optional
X, Y coordinates of cell centres. Must cover ghost rows when
``wse_interp="sloped"`` so boundary faces are interpolated
correctly. Required when ``wse_interp="sloped"``.
face_velocity_coords : ndarray, shape ``(n_faces, 2)``, optional
X, Y position of the face normal velocity measurement point.
Typically the face centroid or the face normal intercept.
Required when ``wse_interp="sloped"``.
Returns
-------
ndarray, shape ``(n_cells, 2)``
``[Vx, Vy]`` depth-averaged velocity at each real cell centre.
"""
if method not in {"area_weighted", "length_weighted", "flow_ratio"}:
raise ValueError(
f"method must be 'area_weighted', 'length_weighted', or "
f"'flow_ratio'; got {method!r}"
)
if method == "flow_ratio" and face_flow is None:
raise ValueError("face_flow is required when method='flow_ratio'")
if wse_interp not in {"average", "sloped", "max"}:
raise ValueError(
f"wse_interp must be 'average', 'sloped', or 'max'; got {wse_interp!r}"
)
if wse_interp == "sloped" and (
cell_centers is None or face_velocity_coords is None
):
raise ValueError(
"cell_centers and face_velocity_coords are required "
"when wse_interp='sloped'"
)
# Pre-compute face WSE for area_weighted (used once, shared across cells).
# cell_wse may include ghost rows; the estimators use len(cell_wse) as the
# bound so ghost-cell WSE contributes to boundary face estimates.
if method == "area_weighted":
if wse_interp == "sloped":
face_wse = _estimate_face_wse_sloped(
face_cell_indexes, cell_wse, cell_centers, face_velocity_coords
)
elif wse_interp == "max":
face_wse = _estimate_face_wse_max(face_cell_indexes, cell_wse)
else:
face_wse = _estimate_face_wse_average(face_cell_indexes, cell_wse)
else:
face_wse = None
velocities = np.zeros((n_cells, 2), dtype=np.float64)
for c in range(n_cells):
start = int(cell_face_info[c, 0])
count = int(cell_face_info[c, 1])
vals = cell_face_values[start : start + count]
face_idxs = vals[:, 0].astype(int)
normals = face_normals[face_idxs, :2] # (k, 2)
lengths = face_normals[face_idxs, 2] # (k,)
vn = face_normal_velocity[face_idxs] # (k,)
if method == "area_weighted":
weights = np.array(
[
_interpolate_face_flow_area(
fi, face_wse[fi], face_ae_info, face_ae_values
)
for fi in face_idxs
]
)
elif method == "length_weighted":
weights = lengths
else: # flow_ratio
qf = face_flow[face_idxs]
weights = np.where(np.abs(vn) > 1e-10, np.abs(qf / vn), 0.0)
velocities[c] = _wls_velocity(vn, weights, normals)
return velocities