Source code for rivia.controller.controller

import logging
import os
import threading
import weakref

import psutil
import pywintypes
import win32com.client

from ._geometry import GeometryBase as _GeometryBase
from ._runtime import Runtime, kill_hecras_version
from ._ver400 import Controller as C400
from ._ver400 import RASEvents as E400
from ._ver500 import Controller as C500
from ._ver500 import RASEvents as E500
from ._ver503 import Controller as C503
from ._ver503 import RASEvents as E503
from .ras import installed_ras_display_name, installed_ras_progid, installed_ras_directory 

logger = logging.getLogger("rivia.controller")

_controllers: "weakref.WeakValueDictionary[int, _ControllerBase]" = weakref.WeakValueDictionary()
_connect_lock = threading.Lock()


class HecRasComputeError(RuntimeError):
    """Raised when a HEC-RAS computation fails or the COM call errors.

    Parameters
    ----------
    message : str
        Human-readable description, used as the exception text.
    messages : tuple[str, ...], optional
        Messages returned by HEC-RAS at the time of failure. Any iterable of
        strings is accepted and stored as a tuple; ``None`` is treated as
        "no messages" and a single string as one message.
    com_error : pywintypes.com_error or None, optional
        The underlying COM exception, if any.

    Attributes
    ----------
    messages : tuple[str, ...]
        Messages returned by HEC-RAS at the time of failure. Empty when the
        version does not expose messages or when the error is COM-level.
    com_error : pywintypes.com_error or None
        The underlying COM exception, if the failure was a COM-level error.
        ``None`` when HEC-RAS returned ``success=False`` without a COM error.
    """

    def __init__(
        self,
        message: str,
        messages: tuple[str, ...] = (),
        com_error=None,
    ):
        super().__init__(message)
        if messages is None:
            messages = ()
        elif isinstance(messages, str):
            # A bare string would otherwise be split into characters below.
            messages = (messages,)
        # Always expose an immutable tuple, whatever sequence type was given.
        self.messages = tuple(messages)
        self.com_error = com_error
def connect(version: str | int):
    """Return a version-appropriate HEC-RAS controller, reusing a live one if possible.

    A controller previously created for the same version is returned as-is
    when it is still alive (``is_alive``) *and* has no project loaded
    (``is_empty``). In every other case ``kill_hecras_version`` is called for
    that version and a new controller is launched and cached.

    Parameters
    ----------
    version : str or int
        HEC-RAS version to connect to. Accepts a version string (e.g.
        ``"6.3"``, ``"5.0.3"``) or an integer version code (e.g. ``6030``).
        Must match an installed HEC-RAS entry in the Windows registry.

    Returns
    -------
    _Controller400 or _Controller500 or _Controller503
        A controller instance connected to the requested HEC-RAS version.
        The exact type depends on the resolved version number:
        ``_Controller400`` for versions below 5000, ``_Controller500`` for
        5000-5029, and ``_Controller503`` for 5030 and above.

    Raises
    ------
    RuntimeError
        If the requested HEC-RAS version is not found in the Windows
        registry, or if it has no controller ProgID to dispatch.
    """
    version_xxxx, info = installed_ras_progid(version)
    geometry_progid = info["geometry"]
    flow_progid = info["flow"]
    controller_progid = info["controller"]

    if controller_progid is None:
        # Fail loudly instead of silently handing ``None`` back to the caller.
        raise RuntimeError(
            f"HEC-RAS {version_xxxx} has no registered controller ProgID; "
            "cannot connect."
        )

    # Pick the event sink / wrapper pair that matches the COM interface.
    if version_xxxx < 5000:
        events_cls, controller_cls = E400, _Controller400
    elif version_xxxx < 5030:
        events_cls, controller_cls = E500, _Controller500
    else:
        events_cls, controller_cls = E503, _Controller503

    # The lock is held across lookup and launch so that concurrent callers
    # cannot both start (or kill) HEC-RAS for the same version.
    with _connect_lock:
        existing = _controllers.get(version_xxxx)
        if existing is not None and existing.is_alive and existing.is_empty:
            logger.debug(
                "Reusing live controller for HEC-RAS %d with pid %r.",
                version_xxxx,
                existing._runtime.parent_pid,
            )
            return existing

        kill_hecras_version(installed_ras_display_name(version_xxxx))

        _rc = _dispatch(controller_progid)
        _geom = _dispatch(geometry_progid)
        # Not every version registers a flow ProgID.
        _flow = _dispatch(flow_progid) if flow_progid is not None else None
        _events = _bind_events(_rc, events_cls)

        rc = controller_cls(_rc, _geom, _flow, _events, version_xxxx)
        _controllers[version_xxxx] = rc
        return rc
def _dispatch(prog_id: str):
    """Create the COM object for *prog_id* with ``win32com.client.DispatchEx``."""
    return win32com.client.DispatchEx(prog_id)


def _dispatch2(prog_id: str):
    """Create the COM object for *prog_id* with ``win32com.client.Dispatch``."""
    return win32com.client.Dispatch(prog_id)


def _bind_events(com_obj, event_class):
    """Attach *event_class* to *com_obj* with ``win32com.client.WithEvents``."""
    return win32com.client.WithEvents(com_obj, event_class)


class _ControllerBase:
    """Version-independent behaviour shared by the HEC-RAS controller wrappers.

    Several methods read ``self._runtime``, which this class does not create;
    the concrete subclasses assign it in their ``__init__``. ``HECRASVersion``
    and ``Project_Current`` are likewise called on ``self`` but not defined
    here (presumably supplied by the version-specific mixins).

    Parameters
    ----------
    rc
        COM object of the HEC-RAS controller.
    geom
        COM object of the HEC-RAS geometry interface.
    flow
        COM object of the HEC-RAS flow interface, or ``None``.
    events
        Event binding for *rc*; stored so it stays referenced.
    version_xxxx : int
        Integer HEC-RAS version code (e.g. ``5030``).
    """

    def __init__(self, rc, geom, flow, events, version_xxxx):
        self._rc = rc
        self._geometry = geom
        self._flow = flow
        self._events = events
        self._rasver = version_xxxx

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # Parameter names kept as-is for interface stability.
        self.close()

    def pause(self, time: float) -> None:
        """Pause execution for the given number of seconds.

        Parameters
        ----------
        time : float
            Seconds to pause.
        """
        self._runtime.pause(time)

    def runtime(self) -> Runtime:
        """Return the :class:`~rivia.controller._runtime.Runtime` for this session."""
        return self._runtime

    def ras_version(self, descriptive: bool = False) -> str | int:
        """Return the HEC-RAS version for this controller.

        Parameters
        ----------
        descriptive : bool, optional
            If ``True``, return the human-readable version string from the
            COM interface (e.g. ``"HEC-RAS 6.3.1"``). If ``False`` (default),
            return the integer version code (e.g. ``6031``).

        Returns
        -------
        str or int
            Version string when *descriptive* is ``True``; integer code
            otherwise.
        """
        if descriptive:
            return self.HECRASVersion()
        return self._rasver

    @property
    def exe(self) -> str:
        """Absolute path to the HEC-RAS executable for this controller."""
        return os.path.join(installed_ras_directory(self.ras_version()), "Ras.exe")

    @property
    def is_empty(self) -> bool:
        """Return True if the controller is alive but no project file is currently loaded."""
        return self.Project_Current().strip() == "prj"

    @property
    def is_alive(self) -> bool:
        """Return True if the HEC-RAS process and COM server are still responsive."""
        pid = self._runtime.parent_pid
        if pid is None:
            return False

        # First check the operating-system process ...
        try:
            proc = psutil.Process(pid)
            if not proc.is_running() or proc.status() == psutil.STATUS_ZOMBIE:
                return False
        except psutil.NoSuchProcess:
            return False

        # ... then make a cheap COM round trip to prove the server answers.
        try:
            _ = self._rc.HECRASVersion()
            return True
        except pywintypes.com_error:
            return False

    def close(self) -> None:
        """Close the HEC-RAS process if it is still running.

        Safe to call multiple times; does nothing when the process has
        already exited.
        """
        if self.is_alive:
            self._runtime.close()

    # ------------------------------------------------------------------
    # Window visibility
    # ------------------------------------------------------------------

    def show(self) -> None:
        """Display the main HEC-RAS window.

        Works on all supported HEC-RAS versions.
        """
        self._rc.ShowRas()

    def hide(self) -> None:
        """Hide the main HEC-RAS window.

        Notes
        -----
        HEC-RAS 4.x does not expose a window-hide COM method. Calling this on
        a version-4 controller logs a warning and does nothing. For version
        5.0 and above, ``QuitRas()`` is used, which hides (minimises) the
        window.
        """
        if self._rasver < 5000:
            logger.warning(
                "show/hide: HEC-RAS %d does not support window hide via COM; "
                "ignoring hide() call.",
                self._rasver,
            )
            return
        self._rc.QuitRas()

    # ------------------------------------------------------------------
    # Project lifecycle (version-gated)
    # ------------------------------------------------------------------

    def Project_Close(self) -> None:
        """Close the currently open HEC-RAS project.

        Raises
        ------
        NotImplementedError
            If the connected HEC-RAS version is older than 5.0.3 (version
            code 5030), which does not expose ``Project_Close`` via COM.
        """
        if self._rasver < 5030:
            raise NotImplementedError(
                f"Project_Close() requires HEC-RAS 5.0.3+ (version code ≥ 5030); "
                f"connected version is {self._rasver}."
            )
        self._rc.Project_Close()

    # ------------------------------------------------------------------
    # Compute helpers (version-gated)
    # ------------------------------------------------------------------

    def Compute_Complete(self) -> bool:
        """Return ``True`` once an asynchronous computation has finished.

        Notes
        -----
        Only meaningful when ``BlockingMode=False`` was passed to
        :meth:`Compute_CurrentPlan`. Poll this in a loop while waiting.

        Raises
        ------
        NotImplementedError
            For HEC-RAS versions below 5.0, which do not expose this method.
        """
        if self._rasver < 5000:
            raise NotImplementedError(
                f"Compute_Complete() requires HEC-RAS 5.0+ (version code ≥ 5000); "
                f"connected version is {self._rasver}."
            )
        return self._rc.Compute_Complete()

    def Compute_StartedFromController(self) -> bool:
        """Return ``True`` if the current computation was started via COM.

        Notes
        -----
        Requires ``BlockingMode=False`` in :meth:`Compute_CurrentPlan` to be
        meaningful.

        Raises
        ------
        NotImplementedError
            For HEC-RAS versions below 5.0.
        """
        if self._rasver < 5000:
            raise NotImplementedError(
                f"Compute_StartedFromController() requires HEC-RAS 5.0+ "
                f"(version code ≥ 5000); connected version is {self._rasver}."
            )
        started = self._rc.Compute_StartedFromController
        # Attribute access on a COM proxy may yield either the value itself
        # or a bound method; returning the uncalled method would always be
        # truthy, so invoke it when it is callable.
        return started() if callable(started) else started

    def Compute_Cancel(self) -> None:
        """Cancel a running computation.

        Notes
        -----
        Only available in HEC-RAS 4.x. This method was removed in version
        5.0. Use :meth:`compute` with ``blocking=True`` (default) to avoid
        needing cancellation on modern versions.

        Raises
        ------
        NotImplementedError
            For HEC-RAS 5.0 and above, where this COM method no longer exists.
        """
        if self._rasver >= 5000:
            raise NotImplementedError(
                f"Compute_Cancel() was removed in HEC-RAS 5.0; "
                f"connected version is {self._rasver}."
            )
        self._rc.Compute_Cancel()

    def Compute_IsStillComputing(self) -> bool:
        """Return ``True`` if a computation is still running.

        Notes
        -----
        Only available in HEC-RAS 4.x. Removed in version 5.0. On 5.0+ use
        :meth:`Compute_Complete` instead.

        Raises
        ------
        NotImplementedError
            For HEC-RAS 5.0 and above.
        """
        if self._rasver >= 5000:
            raise NotImplementedError(
                f"Compute_IsStillComputing() was removed in HEC-RAS 5.0; "
                f"use Compute_Complete() instead. "
                f"Connected version is {self._rasver}."
            )
        return self._rc.Compute_IsStillComputing()

    @staticmethod
    def _normalize_messages(raw) -> tuple[str, ...]:
        """Coerce the raw COM message payload into a tuple of strings."""
        if raw is None:
            return ()
        if isinstance(raw, (list, tuple)):
            # Skip empty slots and stringify the rest so joining cannot fail.
            return tuple(str(item) for item in raw if item is not None)
        return (str(raw),) if raw else ()

    def Compute_CurrentPlan(  # noqa: N802
        self, BlockingMode: bool = True
    ) -> tuple[bool, tuple[str, ...]]:
        """Compute the current plan, compatible with all HEC-RAS versions.

        Parameters
        ----------
        BlockingMode : bool, optional
            If True (default), block until computation completes. If False,
            return immediately while HEC-RAS computes in the background.
            Ignored for HEC-RAS versions below 5.0 (always blocking).

        Returns
        -------
        success : bool
            True if the computation completed successfully.
        messages : tuple[str, ...]
            Messages returned by HEC-RAS during computation. Empty for
            versions below 5.0.3.

        Raises
        ------
        HecRasComputeError
            If HEC-RAS reports ``success=False`` or a COM-level error occurs
            during computation.
        """
        rc = self._rc
        version = self.ras_version()

        # Only the COM call itself can raise ``com_error``; keep the ``try``
        # limited to it.
        try:
            if version < 5000:
                if not BlockingMode:
                    logger.debug(
                        "compute: non-blocking mode unavailable in HEC-RAS "
                        "version %d",
                        version,
                    )
                res = rc.Compute_CurrentPlan(None, None)
            elif version < 5030:
                res = rc.Compute_CurrentPlan(None, None, BlockingMode)
            else:
                res = rc.Compute_CurrentPlan(None, None, int(BlockingMode))
        except pywintypes.com_error as e:
            raise HecRasComputeError(
                f"COM error during HEC-RAS computation: {e}",
                com_error=e,
            ) from e

        success = res[0]

        if version < 5030:
            logger.debug("compute: version %d returns no messages", version)
            if not success:
                raise HecRasComputeError("HEC-RAS computation failed.")
            return success, ()

        # res layout for v5.0.3+: (status, nmsg, Msg, BlockingMode)
        messages = self._normalize_messages(res[2])
        if not success:
            detail = "; ".join(messages) if messages else "no details available"
            raise HecRasComputeError(
                f"HEC-RAS computation failed: {detail}",
                messages=messages,
            )
        return success, messages

    def __del__(self):
        # Best-effort cleanup: never let an exception escape a finalizer
        # (logging or COM may already be unusable at this point).
        try:
            logger.debug("HEC-RAS Controller destructor called.")
        except Exception:
            pass
        try:
            self.close()
        except Exception:
            pass


class _Controller400(_ControllerBase, C400, _GeometryBase):
    """Controller wrapper used by ``connect`` for version codes below 5000."""

    def __init__(self, rc, geom, flow, events, version_xxxx):
        super().__init__(rc, geom, flow, events, version_xxxx)
        self._runtime = Runtime(self, installed_ras_display_name(version_xxxx))


class _Controller500(_ControllerBase, C500, _GeometryBase):
    """Controller wrapper used by ``connect`` for version codes 5000-5029."""

    def __init__(self, rc, geom, flow, events, version_xxxx):
        super().__init__(rc, geom, flow, events, version_xxxx)
        self._runtime = Runtime(self, installed_ras_display_name(version_xxxx))


class _Controller503(_ControllerBase, C503, _GeometryBase):
    """Controller wrapper used by ``connect`` for version codes 5030 and above."""

    def __init__(self, rc, geom, flow, events, version_xxxx):
        super().__init__(rc, geom, flow, events, version_xxxx)
        self._runtime = Runtime(self, installed_ras_display_name(version_xxxx))


# class _ControllerGeometry(_GeometryBase):
#     def __init__(self, geometry):
#         self._geometry = geometry