"""Terrain HDF reading and GeoTIFF/VRT export.
Provides :func:`export_terrain` — reads a RasMapper terrain HDF5 file,
mosaics the source GeoTIFFs by priority, applies any Levee-type ground-line
modifications stored in the same HDF, and writes the result to a GeoTIFF or
GDAL VRT.
Derived from analysis of:
RasMapperLib/RasMapperLib/TerrainLayer.cs
RasMapperLib/RasMapperLib.Terrain/RasterFileInfo.cs
RasMapperLib/RasMapperLib/GroundLineModificationLayer.cs
RasMapperLib/RasMapperLib/ElevationModificationGroup.cs
"""
from __future__ import annotations
import logging
import shutil
import xml.dom.minidom
from pathlib import Path
from typing import Any
from xml.etree.ElementTree import Element, SubElement, tostring
import numpy as np
logger = logging.getLogger("rivia.hdf")
__all__ = ["export_terrain"]
# NoData sentinel used by RasMapper terrain TIFFs.
_NODATA = -9999.0
# Mapping from rasterio dtype strings to GDAL VRT dataType attribute values.
_GDAL_DTYPE: dict[str, str] = {
"float32": "Float32",
"float64": "Float64",
"int16": "Int16",
"int32": "Int32",
"uint8": "Byte",
"uint16": "UInt16",
"uint32": "UInt32",
}
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
[docs]
def export_terrain(
hdf_path: str | Path,
raster_path: str | Path,
copy: bool = False,
) -> Path:
"""Export a RasMapper terrain HDF to a GeoTIFF or GDAL VRT.
Reads all source GeoTIFFs from the ``Terrain/`` group of *hdf_path*,
mosaics them in priority order (lowest ``@Priority`` value wins), applies
any ``Levee``-type ground-line modifications from the ``Modifications/``
group, and writes the result to *raster_path*.
When *raster_path* has a ``.vrt`` extension the output is a GDAL VRT that
references the original source TIFFs directly rather than re-encoding
pixels. If modifications are present a sidecar
``<stem>_mods.tif`` is written beside the VRT and included as the
top-most layer.
Parameters
----------
hdf_path:
Path to a RasMapper terrain HDF file (``File Type = "HEC Terrain"``).
raster_path:
Destination path. Use a ``.tif`` extension for a merged GeoTIFF or a
``.vrt`` extension for a GDAL VRT. Parent directories are created
automatically.
copy:
Only relevant when *raster_path* is a ``.vrt``. When ``True`` each
source TIFF is copied into the VRT's parent directory and the VRT uses
relative paths. When ``False`` (default) the VRT uses absolute paths
and source files are not copied.
Returns
-------
Path
Resolved absolute path of the written file.
Raises
------
FileNotFoundError
If *hdf_path* does not exist, or any source TIFF referenced in the
HDF is missing.
KeyError
If the HDF contains no ``Terrain/`` group or no source TIFF entries.
"""
import h5py
import rasterio
from rasterio.merge import merge
hdf_path = Path(hdf_path)
raster_path = Path(raster_path)
if not hdf_path.exists():
raise FileNotFoundError(f"Terrain HDF not found: {hdf_path}")
hdf_dir = hdf_path.parent
# ------------------------------------------------------------------
# 1. Collect source TIFFs and modification data from HDF
# ------------------------------------------------------------------
tiff_entries: list[tuple[int, Path]] = []
modifications: list[dict[str, Any]] = []
with h5py.File(hdf_path, "r") as f:
terrain_grp = f.get("Terrain")
if terrain_grp is None:
raise KeyError(f"No 'Terrain' group in {hdf_path}")
for name in terrain_grp:
child = terrain_grp[name]
if not isinstance(child, h5py.Group):
continue
if "File" not in child.attrs:
continue
rel = child.attrs["File"]
if isinstance(rel, bytes):
rel = rel.decode()
priority = int(child.attrs.get("Priority", 999))
tiff_path = (hdf_dir / rel).resolve()
if not tiff_path.exists():
raise FileNotFoundError(
f"Source TIFF not found: {tiff_path}\n"
f" (referenced from {hdf_path})"
)
tiff_entries.append((priority, tiff_path))
if not tiff_entries:
raise KeyError(
f"No source TIFF entries found in Terrain/ group of {hdf_path}"
)
mod_grp = f.get("Modifications")
if mod_grp is not None:
modifications = _read_modifications(mod_grp)
# ------------------------------------------------------------------
# 2. Mosaic source TIFFs (lowest Priority number = highest priority
# = first in list = wins overlapping pixels in rasterio.merge)
# ------------------------------------------------------------------
tiff_entries.sort(key=lambda e: e[0])
tiff_paths = [p for _, p in tiff_entries]
datasets = [rasterio.open(p) for p in tiff_paths]
try:
mosaic, transform = merge(datasets)
profile = datasets[0].profile.copy()
profile.update(
driver="GTiff",
height=mosaic.shape[1],
width=mosaic.shape[2],
transform=transform,
compress="deflate",
)
finally:
for ds in datasets:
ds.close()
is_vrt = raster_path.suffix.lower() == ".vrt"
# ------------------------------------------------------------------
# 3. Apply modifications (if any)
# ------------------------------------------------------------------
mod_sidecar_path: Path | None = None
if modifications:
nodata = float(profile.get("nodata") or _NODATA)
merged_unmodified = mosaic.copy()
mosaic = _apply_modifications(mosaic, transform, modifications, nodata)
if is_vrt:
# Write a sidecar TIF containing only the pixels that changed so
# that the VRT can reference it as a top-most layer.
changed = mosaic[0] != merged_unmodified[0]
mod_only = np.full_like(mosaic, nodata)
mod_only[0][changed] = mosaic[0][changed]
mod_sidecar_path = raster_path.with_name(raster_path.stem + "_mods.tif")
mod_profile = profile.copy()
mod_profile["nodata"] = nodata
raster_path.parent.mkdir(parents=True, exist_ok=True)
with rasterio.open(mod_sidecar_path, "w", **mod_profile) as dst:
dst.write(mod_only)
logger.debug("export_terrain: wrote mod sidecar %s", mod_sidecar_path)
# ------------------------------------------------------------------
# 4. Write output
# ------------------------------------------------------------------
raster_path.parent.mkdir(parents=True, exist_ok=True)
if is_vrt:
# VRT: sources in descending priority order so that the lowest
# Priority number (highest precedence) is listed last and wins
# overlapping pixels (GDAL VRT last-source-wins semantics).
vrt_sources = list(reversed(tiff_paths))
if mod_sidecar_path is not None:
vrt_sources.append(mod_sidecar_path)
_write_vrt(raster_path, vrt_sources, profile, transform, copy=copy)
else:
with rasterio.open(raster_path, "w", **profile) as dst:
dst.write(mosaic)
logger.info("export_terrain: wrote %s", raster_path)
return raster_path.resolve()
# ---------------------------------------------------------------------------
# VRT writer
# ---------------------------------------------------------------------------
def _write_vrt(
vrt_path: Path,
source_paths: list[Path],
vrt_profile: dict[str, Any],
vrt_transform: Any,
copy: bool,
) -> None:
"""Write a GDAL VRT mosaic referencing *source_paths*.
Parameters
----------
vrt_path:
Destination ``.vrt`` file.
source_paths:
Ordered list of source TIFFs. Sources listed later overwrite earlier
ones in overlapping pixels (GDAL last-source-wins semantics), so the
highest-precedence source should be last.
vrt_profile:
rasterio profile for the VRT canvas (must include ``width``,
``height``, ``crs``, ``dtype``).
vrt_transform:
Affine transform for the VRT canvas.
copy:
When ``True``, copy each source into the VRT's parent directory and
use relative ``SourceFilename`` paths. When ``False``, use absolute
paths and leave source files in place.
"""
import rasterio
vrt_dir = vrt_path.parent
W = vrt_profile["width"]
H = vrt_profile["height"]
nodata = float(vrt_profile.get("nodata") or _NODATA)
dtype = str(vrt_profile.get("dtype", "float32")).lower()
gdal_dtype = _GDAL_DTYPE.get(dtype, "Float32")
crs = vrt_profile.get("crs")
crs_wkt = crs.to_wkt() if crs else ""
gt = vrt_transform
gt_str = f" {gt.c}, {gt.a}, {gt.b}, {gt.f}, {gt.d}, {gt.e}"
root = Element("VRTDataset", rasterXSize=str(W), rasterYSize=str(H))
if crs_wkt:
SubElement(root, "SRS").text = crs_wkt
SubElement(root, "GeoTransform").text = gt_str
band_el = SubElement(root, "VRTRasterBand", dataType=gdal_dtype, band="1")
SubElement(band_el, "NoDataValue").text = str(nodata)
for src_path in source_paths:
with rasterio.open(src_path) as src:
src_w = src.width
src_h = src.height
src_t = src.transform
block_shapes = src.block_shapes
block_w, block_h = block_shapes[0] if block_shapes else (256, 256)
# Pixel offset of this source within the VRT canvas.
dst_x_off = round((src_t.c - gt.c) / gt.a)
dst_y_off = round((src_t.f - gt.f) / gt.e)
# Destination size scaled for any resolution difference.
dst_x_size = round(src_w * src_t.a / gt.a)
dst_y_size = round(src_h * abs(src_t.e) / abs(gt.e))
if copy:
if src_path.parent != vrt_dir:
dest = vrt_dir / src_path.name
shutil.copy2(src_path, dest)
file_ref = src_path.name
relative_to_vrt = "1"
else:
file_ref = src_path.as_posix()
relative_to_vrt = "0"
# ComplexSource (vs SimpleSource) is required so that nodata pixels in
# the source are masked (transparent) when compositing. With
# SimpleSource, nodata values are copied as-is and would overwrite
# valid pixels from underlying sources — critically wrong for the
# modification sidecar TIF which is nodata everywhere except where
# modifications were applied.
src_el = SubElement(band_el, "ComplexSource")
SubElement(
src_el, "SourceFilename", relativeToVRT=relative_to_vrt
).text = file_ref
SubElement(src_el, "SourceBand").text = "1"
SubElement(
src_el,
"SourceProperties",
RasterXSize=str(src_w),
RasterYSize=str(src_h),
DataType=gdal_dtype,
BlockXSize=str(block_w),
BlockYSize=str(block_h),
)
SubElement(
src_el,
"SrcRect",
xOff="0",
yOff="0",
xSize=str(src_w),
ySize=str(src_h),
)
SubElement(
src_el,
"DstRect",
xOff=str(dst_x_off),
yOff=str(dst_y_off),
xSize=str(dst_x_size),
ySize=str(dst_y_size),
)
SubElement(src_el, "NODATA").text = str(nodata)
xml_str = xml.dom.minidom.parseString(tostring(root)).toprettyxml(indent=" ")
vrt_path.write_text(xml_str, encoding="utf-8")
# ---------------------------------------------------------------------------
# Modification reading
# ---------------------------------------------------------------------------
def _read_modifications(mod_grp: Any) -> list[dict[str, Any]]:
"""Return a list of parsed modification dicts from an HDF Modifications/ group.
``Levee`` and ``Channel`` subtypes are parsed; others emit a warning and
are skipped. The returned list is sorted so lower-priority modifications
come first (they are applied first and can be overwritten by
higher-priority ones).
"""
import h5py
_SUPPORTED_SUBTYPES = {"Levee", "Channel"}
mods: list[dict[str, Any]] = []
for name, grp in mod_grp.items():
if not isinstance(grp, h5py.Group):
continue
subtype = grp.attrs.get("Subtype", b"")
if isinstance(subtype, bytes):
subtype = subtype.decode()
subtype = subtype.strip("\x00")
if subtype not in _SUPPORTED_SUBTYPES:
logger.warning(
"Modification %r has unsupported subtype %r — skipped "
"(only 'Levee' and 'Channel' are currently implemented)",
name,
subtype,
)
continue
attrs_ds = grp.get("Attributes")
if attrs_ds is None:
logger.warning("Modification %r has no Attributes dataset — skipped", name)
continue
attrs = attrs_ds[0]
elev_type = attrs["Elevation Type"]
if isinstance(elev_type, bytes):
elev_type = elev_type.decode()
elev_type = elev_type.strip("\x00")
top_width = float(attrs["Top Width"])
left_slope = float(attrs["Left Slope"])
right_slope = float(attrs["Right Slope"])
max_reach = float(attrs["Max Reach"])
# Slope normalisation: NaN → 3.0 (archive default); 0 → 0.001 (near-
# vertical wall, matching RasMapper's GetUsableSlope behaviour).
if np.isnan(left_slope):
left_slope = 3.0
if np.isnan(right_slope):
right_slope = 3.0
if left_slope == 0.0:
left_slope = 0.001
if right_slope == 0.0:
right_slope = 0.001
pts_ds = grp.get("Polyline Points")
if pts_ds is None:
logger.warning("Modification %r has no Polyline Points — skipped", name)
continue
pts = pts_ds[:, :2].astype(np.float64) # (N, 2) XY only
profile_ds = grp.get("Profile Values")
if profile_ds is None or len(profile_ds) < 2:
logger.warning(
"Modification %r has no usable Profile Values — skipped", name
)
continue
profile = profile_ds[:].astype(np.float64) # (M, 2): [station, elevation]
priority = int(grp.attrs.get("Priority", 0))
mods.append(
{
"name": name,
"subtype": subtype,
"elev_type": elev_type,
"top_width": top_width,
"left_slope": left_slope,
"right_slope": right_slope,
"max_reach": max_reach,
"pts": pts,
"profile": profile,
"priority": priority,
}
)
# Lower priority value = applied last = wins. Apply high-number (low-priority)
# modifications first so low-number (high-priority) ones overwrite them.
mods.sort(key=lambda m: -m["priority"])
return mods
# ---------------------------------------------------------------------------
# Modification application
# ---------------------------------------------------------------------------
def _apply_modifications(
mosaic: np.ndarray,
transform: Any,
modifications: list[dict[str, Any]],
nodata: float,
) -> np.ndarray:
"""Apply levee/channel modifications to *mosaic* and return the updated array.
Parameters
----------
mosaic:
Float32 array of shape ``(1, H, W)`` from ``rasterio.merge``.
transform:
Affine transform mapping pixel coordinates to map coordinates.
modifications:
Parsed modification dicts from :func:`_read_modifications`.
nodata:
NoData sentinel value in the mosaic.
Returns
-------
numpy.ndarray
Updated mosaic array of the same shape and dtype.
"""
from rasterio.transform import rowcol, xy
data = mosaic[0].copy() # (H, W)
H, W = data.shape
for mod in modifications:
pts = mod["pts"]
profile = mod["profile"]
max_reach = mod["max_reach"]
top_width = mod["top_width"]
left_slope = mod["left_slope"]
right_slope = mod["right_slope"]
elev_type = mod["elev_type"]
subtype = mod["subtype"]
if max_reach <= 0:
continue
# Max Reach in the HDF is the total edge-to-edge width of the
# modification footprint; the distance from the centerline to the
# outer edge is therefore half that value.
half_reach = max_reach / 2.0
# -- Bounding box of modification footprint in map coordinates ------
minx = pts[:, 0].min() - half_reach
maxx = pts[:, 0].max() + half_reach
miny = pts[:, 1].min() - half_reach
maxy = pts[:, 1].max() + half_reach
# Convert to pixel row/col (rowcol expects xs then ys)
(r0, r1), (c0, c1) = rowcol(
transform,
[minx, maxx],
[maxy, miny], # maxy → smaller row (north-up), miny → larger row
)
r0 = max(0, int(r0))
c0 = max(0, int(c0))
r1 = min(H - 1, int(r1))
c1 = min(W - 1, int(c1))
if r0 > r1 or c0 > c1:
continue # footprint outside raster extent
# -- Build pixel coordinate grid for the bounding box ---------------
rows = np.arange(r0, r1 + 1)
cols = np.arange(c0, c1 + 1)
col_grid, row_grid = np.meshgrid(cols, rows) # each (nrows, ncols)
px, py = xy(transform, row_grid.ravel(), col_grid.ravel(), offset="center")
px = np.asarray(px, dtype=np.float64)
py = np.asarray(py, dtype=np.float64)
# -- Project pixels onto polyline centerline ------------------------
signed_perp, stations = _project_onto_polyline(px, py, pts)
# -- Interpolate crest elevation from profile -----------------------
crest = np.interp(stations, profile[:, 0], profile[:, 1])
# -- Cross-section elevation ----------------------------------------
# For a Levee the flat crest is the high point; sides slope *down*
# away from it (crest − drop). For a Channel the flat bottom is the
# low point; sides slope *up* away from it (crest + drop).
sign = 1.0 if subtype == "Channel" else -1.0
abs_perp = np.abs(signed_perp)
half_w = top_width / 2.0
mod_elev = np.full(len(px), np.nan, dtype=np.float64)
# Flat bottom/crest: |perp| <= half_width
on_top = abs_perp <= half_w
mod_elev[on_top] = crest[on_top]
# Left side (signed_perp > 0) sloping zone
left_zone = (signed_perp > half_w) & (abs_perp <= half_reach)
if np.any(left_zone):
drop = (abs_perp[left_zone] - half_w) / left_slope
mod_elev[left_zone] = crest[left_zone] + sign * drop
# Right side (signed_perp < 0) sloping zone
right_zone = (signed_perp < -half_w) & (abs_perp <= half_reach)
if np.any(right_zone):
drop = (abs_perp[right_zone] - half_w) / right_slope
mod_elev[right_zone] = crest[right_zone] + sign * drop
# -- Apply elevation type -------------------------------------------
valid = ~np.isnan(mod_elev)
if not np.any(valid):
continue
vr = row_grid.ravel()[valid]
vc = col_grid.ravel()[valid]
orig = data[vr, vc].astype(np.float64)
mval = mod_elev[valid]
if elev_type in ("SetIfHigher", "TakeHigher"):
new = np.maximum(orig, mval)
elif elev_type in ("SetIfLower", "TakeLower"):
new = np.minimum(orig, mval)
elif elev_type in ("SetValue", "FixedElevation"):
new = mval
elif elev_type == "AddValue":
new = orig + mval
else:
logger.warning(
"Modification %r has unknown elevation type %r — skipped",
mod["name"],
elev_type,
)
continue
# NoData pixels in original terrain always receive the modification value.
nodata_mask = orig == nodata
new[nodata_mask] = mval[nodata_mask]
data[vr, vc] = new.astype(data.dtype)
logger.debug(
"Applied modification %r (%s) to %d pixels",
mod["name"],
elev_type,
int(valid.sum()),
)
result = mosaic.copy()
result[0] = data
return result
# ---------------------------------------------------------------------------
# Geometry helpers
# ---------------------------------------------------------------------------
def _project_onto_polyline(
px: np.ndarray,
py: np.ndarray,
pts: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
"""Project query points onto a polyline, returning signed perpendicular
distance and cumulative station for each point.
Parameters
----------
px, py:
1-D arrays of query point X and Y coordinates.
pts:
``(N, 2)`` array of polyline vertex coordinates.
Returns
-------
signed_perp:
Signed perpendicular distance from the centerline.
Positive = left side of the line in the direction of travel.
stations:
Cumulative distance along the polyline to the closest point.
"""
seg_dx = np.diff(pts[:, 0])
seg_dy = np.diff(pts[:, 1])
seg_len = np.sqrt(seg_dx**2 + seg_dy**2)
cum_len = np.concatenate([[0.0], np.cumsum(seg_len)])
n_pts = len(px)
best_dist = np.full(n_pts, np.inf)
best_station = np.zeros(n_pts)
best_signed = np.zeros(n_pts)
for i, (length, dx, dy) in enumerate(zip(seg_len, seg_dx, seg_dy)):
if length < 1e-10:
continue
ax, ay = pts[i]
# Parameter t for closest point on this segment, clamped to [0, 1]
t = ((px - ax) * dx + (py - ay) * dy) / (length**2)
t = np.clip(t, 0.0, 1.0)
cx = ax + t * dx
cy = ay + t * dy
dist = np.sqrt((px - cx) ** 2 + (py - cy) ** 2)
# Signed distance: cross product of unit-direction and offset vector.
# Positive → left of the line in direction of travel.
ux, uy = dx / length, dy / length
signed = ux * (py - cy) - uy * (px - cx)
station = cum_len[i] + t * length
closer = dist < best_dist
best_dist[closer] = dist[closer]
best_station[closer] = station[closer]
best_signed[closer] = signed[closer]
return best_signed, best_station