Source code for rivia.geo.raster

"""Raster utilities for HEC-RAS mesh rendering and export workflows."""

from __future__ import annotations

import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal

import numpy as np

from rivia.utils import assert_path_writable, log_call, timed

logger = logging.getLogger("rivia.geo")

if TYPE_CHECKING:
    import rasterio.io


[docs] @log_call(logging.INFO) @timed() def rasterize_results( variable: Literal["wse", "water_surface", "depth", "velocity", "velocity_vector"], cell_wse: np.ndarray, cell_min_elevation: np.ndarray, face_min_elevation: np.ndarray, face_cell_indexes: np.ndarray, cell_face_info: np.ndarray, cell_face_values: np.ndarray, face_facepoint_indexes: np.ndarray, fp_coords: np.ndarray, face_normals: np.ndarray, fp_face_info: np.ndarray, fp_face_values: np.ndarray, cell_polygons: list[np.ndarray], face_normal_velocity: np.ndarray | None = None, output_path: str | Path | None = None, *, cell_centers: np.ndarray | None = None, cell_surface_area: np.ndarray | None = None, reference_raster: str | Path | None = None, cell_size: float | None = None, crs: Any | None = None, nodata: float = -9999.0, render_mode: Literal["horizontal", "sloping", "hybrid"] = "sloping", use_depth_weights: bool = False, shallow_to_flat: bool = False, depth_threshold: float = 0.001, tight_extent: bool = True, perimeter: np.ndarray | None = None, use_numba: bool | None = None, ) -> Path | rasterio.io.DatasetReader: """Rasterize HEC-RAS 2D mesh results using the RASMapper-exact algorithm. Implements the pixel-perfect pipeline reverse-engineered from ``RasMapperLib/`` (decompiled C# source, HEC-RAS 6.6). Produces output matching RASMapper's ``"Horizontal"``, ``"Sloping Cell Corners"``, and ``"Sloping Cell Corners + Face Centers"`` render modes. Pipeline summary ---------------- **water_surface / depth — horizontal** (``render_mode="horizontal"``) 1. ``build_cell_id_raster`` — scan-line rasterize wet cell polygons (Numba, mirrors ``RasterizePolygon.ComputeCells``). 2. *(pixel loop)* — write ``cell_wse`` directly; every pixel in a cell gets the same flat value. **water_surface / depth — sloping** (``render_mode="sloping"``) Matches RasMapperLib ``SetSlopingRenderingMode()`` (SharedData.cs:1778): ``CellStencilMethod.JustFacepoints`` + ``ShallowBehavior.None``. GUI label: *"Sloping (Cell Corners)"*. The "Shallow Water reduces to Horizontal" checkbox is a sub-option of WithFaces only and does not apply to this mode. **A.** ``compute_face_wss`` — hydraulic connectivity (``face_connected``), per-face WSE values (``face_value_a``, ``face_value_b``), and full connection classification (``face_hconn``, one of the ``HC_*`` constants). **B.** ``compute_facepoint_wse`` — planar regression fitting a plane through the face midpoint WSE samples; the intercept ``c`` at each facepoint is the corner WSE. **4a.** ``build_cell_id_raster`` — scan-line rasterize wet cell polygons. **4b.** ``sample_terrain_at_facepoints`` — terrain elevation at facepoints for depth rebalancing (only when a DEM is supplied). **4c.** ``rasterize_rasmap`` — per-pixel barycentric interpolation of the facepoint WSE values within each cell's triangles (``with_faces=False``, ``shallow_to_flat=False``). **water_surface / depth — hybrid** (``render_mode="hybrid"``) Matches RasMapperLib ``WithFaces`` path. Same steps as ``sloping`` but calls ``rasterize_rasmap`` with ``with_faces=True``; ``use_depth_weights`` and ``shallow_to_flat`` are user-configurable. **velocity / velocity_vector — sloping / hybrid** **A.** ``compute_face_wss`` — hydraulic connectivity + ``face_hconn`` (same as WSE pipeline). **2.** ``reconstruct_face_velocities`` — C-stencil least-squares reconstruction of full ``(Vx, Vy)`` at each face from the stored face-normal velocity scalar. **3.** ``compute_facepoint_velocities`` — inverse-face-length weighted averaging of face velocity vectors to facepoints. **3.5.** ``replace_face_velocities_sloped`` — replace each face velocity with the average of its two endpoint facepoint velocities. **4a.** ``build_cell_id_raster`` — scan-line rasterize wet cell polygons. **4c.** ``rasterize_rasmap`` — per-pixel barycentric interpolation of facepoint velocity vectors; speed magnitude ``sqrt(Vx²+Vy²)`` computed per pixel. ``"velocity"`` returns the speed band only; ``"velocity_vector"`` returns all four bands. **velocity / velocity_vector — horizontal** (``render_mode="horizontal"``) RASMapper uses its stencil pipeline (``Render2D_8Stencil``) for velocity even in horizontal mode whenever cells are large relative to the pixel size (threshold = ``pixel_size² × 5``, ``MeshFV2D.cs`` line 1431). For typical HEC-RAS 2D meshes all cells exceed this threshold, so this implementation routes horizontal-velocity requests through the same sloping stencil pipeline (Steps A, 2, 3, 3.5, 4) as ``"sloping"`` mode. ``shallow_to_flat`` is always ``False`` for this route. WSE and depth still use the flat per-cell paint path. Parameters ---------- variable: ``"wse"`` / ``"water_surface"`` — water-surface elevation (aliases). ``"depth"`` — water depth (WSE minus terrain); requires *reference_raster* (DEM). ``"velocity"`` — 1-band speed raster ``sqrt(Vx²+Vy²)``; requires *face_normal_velocity*. ``"velocity_vector"`` — 4-band velocity raster ``[Vx, Vy, speed, direction_deg]`` (bands 1–4); requires *face_normal_velocity*. ``direction_deg`` is degrees clockwise from north. cell_wse: ``(n_cells,)`` water-surface elevation per cell. cell_min_elevation: ``(n_cells + n_ghost,)`` minimum bed elevation per cell including ghost (virtual boundary) cell rows. Must be the full unsliced array (:attr:`~rivia.hdf.FlowArea.cell_min_elevation` returns only real cells — pass the raw HDF dataset or append ghost rows). Ghost rows have ``NaN`` in HEC-RAS output. The extra rows are required because :func:`~rivia.geo._rasmap.compute_face_wss` indexes into this array using ``face_cell_indexes`` which contains ghost cell indices on the cellB side of perimeter faces. face_min_elevation: ``(n_faces,)`` minimum bed elevation at each face (:attr:`~rivia.hdf.FlowArea.face_min_elevation`). face_cell_indexes: ``(n_faces, 2)`` — ``[cellA, cellB]`` (:attr:`~rivia.hdf.FlowArea.face_cell_indexes`). cell_face_info: ``(n_cells, 2)`` ``[start, count]`` (first element of :attr:`~rivia.hdf.FlowArea.cell_face_info` tuple). cell_face_values: ``(total, 2)`` ``[face_idx, orientation]`` (second element of :attr:`~rivia.hdf.FlowArea.cell_face_info` tuple). face_facepoint_indexes: ``(n_faces, 2)`` (:attr:`~rivia.hdf.FlowArea.face_facepoint_indexes`). fp_coords: ``(n_fp, 2)`` (:attr:`~rivia.hdf.FlowArea.facepoint_coordinates`). face_normals: ``(n_faces, 3)`` ``[nx, ny, length]`` (:attr:`~rivia.hdf.FlowArea.face_normals`). Columns 0–1 are used as unit normal vectors; column 2 as face lengths. fp_face_info: ``(n_fp, 2)`` angle-sorted CSR start/count from :attr:`~rivia.hdf.FlowArea.facepoint_face_orientation`. fp_face_values: ``(total, 2)`` angle-sorted ``[face_idx, orientation]`` from :attr:`~rivia.hdf.FlowArea.facepoint_face_orientation`. cell_polygons: Per-cell polygon vertex arrays from :attr:`~rivia.hdf.FlowArea.cell_polygons`. face_normal_velocity: ``(n_faces,)`` signed face-normal velocity scalars. Required for ``variable="velocity"``. output_path: Destination ``.tif`` file path. ``None`` returns an open in-memory ``rasterio.DatasetReader``; the caller must close it. cell_centers: ``(n_cells, 2)`` cell-centre XY coordinates (:attr:`~rivia.hdf.FlowArea.cell_centers`). When provided, face application points for the PlanarRegressionZ in Step B are computed as the intersection of the cell-centre-to-cell-centre line with each face chord (the RASMapper-exact ``GetFaceMidSide`` algorithm). When ``None`` the chord midpoint of the two endpoint facepoints is used instead, which is adequate for orthogonal meshes. cell_surface_area: ``(n_cells,)`` plan-view cell areas in model area units (:attr:`~rivia.hdf.FlowArea.cell_surface_area`). Used to split cells into "flat" (area ≤ ``pixel_size² × 5``) and "sloping" groups, matching RASMapper's ``SplitCellsOnThreshold`` / ``PixelRenderingCutoff = 5`` logic. When ``None`` all cells are treated as sloping (current behaviour, correct for typical meshes but incorrect for fine-resolution refinement areas such as channels). Only relevant for velocity variables. reference_raster: Existing GeoTIFF whose transform and CRS are inherited. Also used as the terrain DEM for depth computation and per-pixel wet/dry masking. Mutually exclusive with *cell_size*. cell_size: Output pixel size in model coordinate units. Used when no *reference_raster* is provided; the grid origin is derived from *perimeter* (if given) or from the facepoint bounding box. crs: Output CRS. Inherited from *reference_raster* when ``None``. nodata: Fill value for dry / out-of-domain pixels. render_mode: ``"sloping"`` (default) — RASMapper "Sloping (Cell Corners)" pipeline; N-point barycentric interpolation using corner facepoints only (``with_faces=False``, ``shallow_to_flat=False``). Matches ``SetSlopingRenderingMode()`` (SharedData.cs:1778) and ``store_map(render_mode="sloping")``. ``"hybrid"`` — RASMapper "Sloping Cell Corners + Face Centers" (``with_faces=True``); more accurate near cell edges. ``use_depth_weights`` and ``shallow_to_flat`` are honoured as supplied. Matches ``store_map(render_mode="hybrid")``. ``"horizontal"`` — flat per-cell value painted over each owned pixel; facepoint interpolation is skipped entirely. Matches ``store_map(render_mode="horizontal")``. use_depth_weights: When ``True``, face weights in the ``hybrid`` stencil are proportional to water depth at each face. **Ignored** (forced ``False``) for ``"sloping"`` and ``"horizontal"`` modes. Requires *reference_raster* when ``True``. shallow_to_flat: When ``True``, cells with no hydraulically-connected faces are rendered flat (horizontal). **Ignored** for ``"horizontal"`` and ``"sloping"`` modes (forced ``False`` — "Shallow Water reduces to Horizontal" is a WithFaces-only sub-option in the RASMapper GUI). Only user-configurable for ``"hybrid"`` mode. depth_threshold: Minimum depth for a pixel to be considered wet (default ``0.001``). Matches ``RASResults.MinWSPlotTolerance``. tight_extent: When ``True`` (default), pixels outside *perimeter* are set to *nodata*. Has no effect when *perimeter* is ``None``. perimeter: ``(n_pts, 2)`` boundary polygon used for extent and clipping. Pass :attr:`~rivia.hdf.FlowArea.perimeter` here. When ``None`` the bounding box of *fp_coords* is used for the grid extent and no polygon clipping is applied. use_numba: ``True`` — require Numba JIT (raises ``ImportError`` if absent). ``False`` — force pure-Python fallback. ``None`` (default) — use Numba if available, otherwise fall back silently. *(Numba path not yet implemented; reserved for future.)* Returns ------- Path Written GeoTIFF path when *output_path* is given. rasterio.io.DatasetReader Open in-memory dataset when *output_path* is ``None``. Raises ------ ImportError If ``rasterio`` or ``shapely`` are not installed. ValueError If ``variable="depth"`` and *reference_raster* is ``None``. If ``variable="velocity"`` and *face_normal_velocity* is ``None``. If ``use_depth_weights=True`` and *reference_raster* is ``None``. If both *reference_raster* and *cell_size* are ``None``. """ try: import rasterio from rasterio.crs import CRS from rasterio.transform import from_origin except ImportError as exc: raise ImportError( "rasterize_results requires rasterio. " "Install it with: pip install rivia[geo]" ) from exc from rivia.geo import _rasmapper_pipeline as _rasmap # -- 0. Guards ---------------------------------------------------------- if variable == "wse": variable = "water_surface" # Translate public velocity names to internal rasterize_rasmap names. # "velocity" → "speed" (1-band magnitude output) # "velocity_vector" → "velocity" (4-band Vx/Vy/speed/direction output) if variable == "velocity": variable = "speed" elif variable == "velocity_vector": variable = "velocity" _valid_render_modes = ("horizontal", "sloping", "hybrid") if render_mode not in _valid_render_modes: raise ValueError( f"render_mode={render_mode!r} is not valid. " f"Must be one of {_valid_render_modes}." ) if variable == "depth" and reference_raster is None: raise ValueError( "reference_raster is required when variable='depth'. " "Provide a path to a terrain DEM GeoTIFF." ) if variable in ("speed", "velocity") and face_normal_velocity is None: raise ValueError( "face_normal_velocity is required when variable='velocity' " "or variable='velocity_vector'." ) if reference_raster is None and cell_size is None: raise ValueError( "Provide either reference_raster or cell_size to define the output grid." ) if reference_raster is not None and cell_size is not None: raise ValueError( "Specify either reference_raster or cell_size, not both." ) # Map render_mode to internal pipeline flags per RasMapperLib SharedData defaults. # shallow_to_flat and use_depth_weights are only user-controllable for hybrid; # for other modes the values are dictated by the RasMapperLib implementation. if render_mode == "sloping": # CellStencilMethod.JustFacepoints + ShallowBehavior.None # SetSlopingRenderingMode() in SharedData.cs:1778. # "Shallow Water reduces to Horizontal" is a sub-option of WithFaces only # (GUI: "Sloping Cell Corners + Face Centers") — not available for this mode. shallow_to_flat = False use_depth_weights = False elif render_mode == "horizontal": shallow_to_flat = False use_depth_weights = False # render_mode == "hybrid": keep user-provided shallow_to_flat and use_depth_weights if use_depth_weights and reference_raster is None: raise ValueError( "use_depth_weights=True requires reference_raster (terrain DEM) " "to sample facepoint elevations. Provide a terrain GeoTIFF." ) # -- 1. Resolve output grid and optionally load terrain ----------------- terrain_grid: np.ndarray | None = None out_transform: Any out_width: int out_height: int out_crs: Any if reference_raster is not None: import rasterio.windows as _rwin with rasterio.open(reference_raster) as src: out_crs = crs if crs is not None else src.crs nd = src.nodata if tight_extent and perimeter is not None and len(perimeter) >= 3: # Derive a tight window snapped to the reference pixel grid. # window_transform(win) stays pixel-aligned with the full # reference raster, so terrain values are consistent. px = abs(src.transform.a) # pixel width (model units) py = abs(src.transform.e) # pixel height (model units) x_min = float(perimeter[:, 0].min()) - px x_max = float(perimeter[:, 0].max()) + px y_min = float(perimeter[:, 1].min()) - py y_max = float(perimeter[:, 1].max()) + py _win_f = _rwin.from_bounds( x_min, y_min, x_max, y_max, src.transform ) # Floor offset, ceil size so we never clip the perimeter _col_off = max(0, int(np.floor(_win_f.col_off))) _row_off = max(0, int(np.floor(_win_f.row_off))) _win_w = min(src.width - _col_off, int(np.ceil(_win_f.col_off + _win_f.width)) - _col_off) _win_h = min(src.height - _row_off, int(np.ceil(_win_f.row_off + _win_f.height)) - _row_off) win = _rwin.Window(_col_off, _row_off, _win_w, _win_h) out_transform = src.window_transform(win) out_width = _win_w out_height = _win_h raw = src.read(1, window=win).astype(np.float32) else: out_transform = src.transform out_width = src.width out_height = src.height raw = src.read(1).astype(np.float32) if nd is not None: raw[raw == nd] = np.nan terrain_grid = raw # already float32 from src.read else: # Derive grid from perimeter bbox or facepoint bbox if perimeter is not None and len(perimeter) >= 3: x_min = float(perimeter[:, 0].min()) x_max = float(perimeter[:, 0].max()) y_min = float(perimeter[:, 1].min()) y_max = float(perimeter[:, 1].max()) else: x_min = float(fp_coords[:, 0].min()) x_max = float(fp_coords[:, 0].max()) y_min = float(fp_coords[:, 1].min()) y_max = float(fp_coords[:, 1].max()) west = float(np.floor(x_min / cell_size) * cell_size) # type: ignore[operator] north = float(np.ceil(y_max / cell_size) * cell_size) # type: ignore[operator] out_transform = from_origin(west, north, cell_size, cell_size) east = float(np.ceil(x_max / cell_size) * cell_size) # type: ignore[operator] south = float(np.floor(y_min / cell_size) * cell_size) # type: ignore[operator] out_width = int(round((east - west) / cell_size)) out_height = int(round((north - south) / cell_size)) out_crs = CRS.from_user_input(crs) if crs is not None else None if out_crs is not None and not isinstance(out_crs, CRS): out_crs = CRS.from_user_input(out_crs) n_cells = len(cell_wse) n_faces = len(face_cell_indexes) logger.info( "rasterize_results: variable=%r, render_mode=%r, n_cells=%d, n_faces=%d, " "grid=%dx%d, pixel_size=%.2f", variable, render_mode, n_cells, n_faces, out_width, out_height, abs(out_transform.a), ) logger.info("raster_map: output_path=%r", str(Path(output_path).resolve()) if output_path is not None else None) if output_path is not None: assert_path_writable(output_path) # -- 2. Wet-cell mask (common to flat and sloping) ---------------------- # Number of faces per cell (needed for virtual-cell detection in Step A) _cell_face_count_arr = cell_face_info[:, 1].astype(np.int32) wet_mask = (cell_wse - cell_min_elevation[:n_cells]) > depth_threshold # -- 3a. Flat mode — paint cell values directly ------------------------- # RASMapper uses the stencil pipeline for velocity even in "horizontal" # mode whenever cells are large relative to the pixel size (threshold = # pixel_size² × 5). For typical HEC-RAS meshes all cells exceed this # threshold, so horizontal velocity is rendered with the sloping stencil. # We mirror this by directing velocity to the sloping pipeline below. # WSE and depth keep their flat (per-cell) rendering. _flat_velocity = render_mode == "horizontal" and variable in ("speed", "velocity") if render_mode == "horizontal" and not _flat_velocity: # This path runs for WSE and depth in horizontal model cell_id_grid = _rasmap.build_cell_id_raster( cell_polygons, wet_mask, out_transform, out_height, out_width ) valid_rows, valid_cols = np.where(cell_id_grid > 0) out_arr: np.ndarray = np.full((out_height, out_width), nodata, dtype=np.float32) # ci_arr: 0-based cell index for every valid pixel (vectorised lookup). ci_arr = cell_id_grid[valid_rows, valid_cols] - 1 if variable == "water_surface": wse_vals = cell_wse[ci_arr].astype(np.float32) if terrain_grid is not None: t_vals = terrain_grid[valid_rows, valid_cols] ok = ( ~np.isnan(t_vals) & (t_vals != nodata) & (wse_vals >= t_vals + depth_threshold) ) out_arr[valid_rows[ok], valid_cols[ok]] = wse_vals[ok] else: out_arr[valid_rows, valid_cols] = wse_vals elif variable == "depth": # terrain_grid is guaranteed non-None for depth (guarded at entry). t_vals = terrain_grid[valid_rows, valid_cols] # type: ignore[index] ok = ~np.isnan(t_vals) & (t_vals != nodata) dep = cell_wse[ci_arr[ok]].astype(np.float32) - t_vals[ok] wet = dep > 0 r_wet = valid_rows[ok][wet] c_wet = valid_cols[ok][wet] out_arr[r_wet, c_wet] = dep[wet].astype(np.float32) # -- 3b. Sloping/hybrid mode — full RASMapper stencil pipeline ---------- # Also used for horizontal-mode velocity (see comment above). if render_mode != "horizontal" or _flat_velocity: # Step A: hydraulic connectivity face_value_a, face_value_b, face_hconn = _rasmap.compute_face_wss( cell_wse, cell_min_elevation, face_min_elevation, face_cell_indexes, _cell_face_count_arr, ) face_connected = (face_hconn >= _rasmap.HC_BACKFILL) & (face_hconn <= _rasmap.HC_DOWNHILL_SHALLOW) # Step B: facepoint WSE (for WSE/depth sloping render, and for # velocity wet-pixel masking — velocity wet extent must match the # sloped WSE wet extent, not the coarser horizontal cell-WSE check). fp_wse: np.ndarray | None = None if render_mode != "horizontal": # Precompute face midsides (RASMapper-exact application points for # PlanarRegressionZ) when cell centres are available. Pass an # empty (0, 2) array when not available — compute_facepoint_wse # uses shape[0] > 0 as the "available" sentinel. if cell_centers is not None: face_midsides = _rasmap._compute_face_midsides( fp_coords, face_facepoint_indexes, face_cell_indexes, np.asarray(cell_centers, dtype=np.float64), ) else: face_midsides = np.empty((0, 2), dtype=np.float64) fp_wse = _rasmap.compute_facepoint_wse( fp_coords, fp_face_info, fp_face_values, face_facepoint_indexes, face_value_a, face_value_b, face_connected, face_midsides=face_midsides, ) # Steps 2 / 3 / 3.5: velocity reconstruction (velocity only) fp_vel_data: np.ndarray | None = None face_fp_local_idx: np.ndarray | None = None replaced_face_vel = None face_vel_A = None face_vel_B = None if variable in ("speed", "velocity") and face_normal_velocity is not None: face_normals_2d = face_normals[:, :2] face_vel_A, face_vel_B = _rasmap.reconstruct_face_velocities( np.asarray(face_normal_velocity, dtype=np.float32), face_normals_2d, face_connected, face_cell_indexes, cell_face_info[:n_cells], cell_face_values, ) fp_vel_data, face_fp_local_idx = _rasmap.compute_facepoint_velocities( face_vel_A, face_vel_B, face_connected, face_normals[:, 2], # face lengths face_facepoint_indexes, face_cell_indexes, cell_wse, fp_face_info, fp_face_values, face_value_a, face_value_b, ) replaced_face_vel = _rasmap.replace_face_velocities_sloped( fp_vel_data, fp_face_info, face_fp_local_idx, face_facepoint_indexes, ) # Step 4a: cell-ID raster cell_id_grid = _rasmap.build_cell_id_raster( cell_polygons, wet_mask, out_transform, out_height, out_width ) # Sample terrain at facepoints for depth-weighted rebalancing fp_elev: np.ndarray | None = None if terrain_grid is not None and fp_wse is not None: fp_elev = _rasmap.sample_terrain_at_facepoints( fp_coords, terrain_grid, out_transform ) # -- Flat-cell velocity (SplitCellsOnThreshold / PixelRenderingCutoff) -- # Cells with area ≤ pixel_size² × 5 receive a uniform whole-cell # least-squares velocity instead of the stencil, matching RASMapper's # ComputeFromFacePerpValues(FlatMeshMap) path (Renderer.cs). # NaN sentinel means "use stencil" for that cell. flat_cell_vx: np.ndarray | None = None flat_cell_vy: np.ndarray | None = None if variable in ("speed", "velocity") and cell_surface_area is not None: _px = abs(out_transform.a) _threshold = _px * _px * 5.0 _flat_mask = np.asarray(cell_surface_area[:n_cells]) <= _threshold if _flat_mask.any(): flat_cell_vx = np.full(n_cells, np.nan, dtype=np.float32) flat_cell_vy = np.full(n_cells, np.nan, dtype=np.float32) _fvx, _fvy = _rasmap.compute_cell_flat_velocities( cell_face_info[:n_cells], cell_face_values, np.asarray(face_normal_velocity, dtype=np.float32), np.asarray(face_normals[:, :2], dtype=np.float32), wet_mask & _flat_mask, ) flat_cell_vx[_flat_mask] = _fvx[_flat_mask] flat_cell_vy[_flat_mask] = _fvy[_flat_mask] # Step 4b–4e: pixel loop # rasterize_rasmap uses "speed" for 1-band magnitude, "velocity" for # 4-band (Vx, Vy, speed, direction). The public variable="velocity" # returns the 4-band array so callers can access vector components. out_arr = _rasmap.rasterize_rasmap( variable=variable, cell_id_grid=cell_id_grid, transform=out_transform, terrain_grid=terrain_grid, cell_wse=cell_wse, cell_face_info=cell_face_info[:n_cells], cell_face_values=cell_face_values, face_facepoint_indexes=face_facepoint_indexes, face_cell_indexes=face_cell_indexes, face_min_elev=face_min_elevation, fp_coords=fp_coords, fp_wse=fp_wse, face_value_a=face_value_a, face_value_b=face_value_b, fp_vel_data=fp_vel_data, fp_face_info=fp_face_info, face_fp_local_idx=face_fp_local_idx, replaced_face_vel=replaced_face_vel, face_vel_A=face_vel_A, face_vel_B=face_vel_B, fp_elev=fp_elev, face_hconn=face_hconn, nodata=nodata, depth_threshold=depth_threshold, with_faces=(render_mode == "hybrid"), use_depth_weights=use_depth_weights, shallow_to_flat=shallow_to_flat, flat_cell_vx=flat_cell_vx, flat_cell_vy=flat_cell_vy, ) # -- 4. Write output ---------------------------------------------------- # velocity_vector (internal: "velocity") → 4-band; all others → 1-band. n_bands = 4 if variable == "velocity" else 1 profile: dict[str, Any] = dict( driver="GTiff", dtype="float32", count=n_bands, width=out_width, height=out_height, transform=out_transform, nodata=nodata, ) if out_crs is not None: profile["crs"] = out_crs # Apply perimeter mask if requested if tight_extent and perimeter is not None: if n_bands == 1: out_arr = _mask_outside_polygon_array( out_arr, perimeter, out_transform, nodata ) else: for b in range(n_bands): out_arr[b] = _mask_outside_polygon_array( out_arr[b], perimeter, out_transform, nodata ) out_f32 = out_arr.astype(np.float32) # rasterio.write(arr) requires shape (bands, H, W); 1-band arrays need # an explicit band index or a leading dimension. if n_bands == 1: out_f32 = out_f32[np.newaxis, :, :] # (1, H, W) if output_path is None: memfile = rasterio.MemoryFile() with memfile.open(**profile) as dst: dst.write(out_f32) return memfile.open() out_path = Path(output_path).resolve() out_path.parent.mkdir(parents=True, exist_ok=True) try: with rasterio.open(out_path, "w", **profile) as dst: dst.write(out_f32) except Exception as err: msg = str(err).lower() if "permission denied" in msg or "sharing violation" in msg: raise PermissionError( f"Output raster is locked by another application: {out_path}\n" "Close the file (e.g. in ArcGIS, QGIS, or RASMapper) and retry." ) from err raise return out_path
def _mask_outside_polygon_array( arr: np.ndarray, polygon_xy: np.ndarray, transform: Any, nodata: float, ) -> np.ndarray: """Set pixels outside *polygon_xy* to *nodata* in a 2D float array. Thin wrapper around rasterio.features.geometry_mask so the caller doesn't need to manage an in-memory dataset just for masking. """ import rasterio.features from shapely.geometry import Polygon as _Polygon out = arr.copy() poly = _Polygon(polygon_xy) mask = rasterio.features.geometry_mask( [poly], out_shape=arr.shape, transform=transform, invert=False, # True = inside polygon; False = mask outside all_touched=False, ) out[mask] = nodata return out