"""Interactive curses dashboard for real-time reactor operation.""" from __future__ import annotations import curses import logging from collections import deque from dataclasses import dataclass from pathlib import Path from typing import Optional from . import constants from .commands import ReactorCommand from .reactor import Reactor from .simulation import ReactorSimulation from .state import PlantState LOGGER = logging.getLogger(__name__) @dataclass class DashboardKey: key: str description: str class ReactorDashboard: """Minimal TUI dashboard allowing user control in real-time.""" def __init__( self, reactor: Reactor, start_state: Optional[PlantState], timestep: float = 1.0, save_path: Optional[str] = None, ) -> None: self.reactor = reactor self.start_state = start_state self.timestep = timestep self.save_path = save_path self.pending_command: Optional[ReactorCommand] = None self.sim: Optional[ReactorSimulation] = None self.quit_requested = False self.reset_requested = False self.log_buffer: deque[str] = deque(maxlen=4) self._log_handler: Optional[logging.Handler] = None self._previous_handlers: list[logging.Handler] = [] self._logger = logging.getLogger("reactor_sim") self.keys = [ DashboardKey("q", "Quit & save"), DashboardKey("space", "SCRAM"), DashboardKey("p", "Toggle primary pump"), DashboardKey("o", "Toggle secondary pump"), DashboardKey("t", "Toggle turbine"), DashboardKey("1/2/3", "Toggle turbine units 1-3"), DashboardKey("y/u/i", "Maintain turbine 1/2/3"), DashboardKey("c", "Toggle consumer"), DashboardKey("r", "Reset & clear state"), DashboardKey("m", "Maintain primary pump"), DashboardKey("n", "Maintain secondary pump"), DashboardKey("k", "Maintain core (requires shutdown)"), DashboardKey("+/-", "Withdraw/insert rods"), DashboardKey("[/]", "Adjust consumer demand −/+50 MW"), DashboardKey("s", "Setpoint −250 MW"), DashboardKey("d", "Setpoint +250 MW"), DashboardKey("a", "Toggle auto rod control"), ] def run(self) -> None: curses.wrapper(self._main) def _main(self, stdscr: "curses._CursesWindow") -> None: # type: ignore[name-defined] curses.curs_set(0) curses.start_color() curses.use_default_colors() curses.init_pair(1, curses.COLOR_CYAN, -1) curses.init_pair(2, curses.COLOR_YELLOW, -1) curses.init_pair(3, curses.COLOR_GREEN, -1) curses.init_pair(4, curses.COLOR_RED, -1) stdscr.nodelay(True) self._install_log_capture() try: while True: self.sim = ReactorSimulation( self.reactor, timestep=self.timestep, duration=None, realtime=True, command_provider=self._next_command, ) self.sim.start_state = self.start_state try: for state in self.sim.run(): self._draw(stdscr, state) self._handle_input(stdscr) if self.quit_requested or self.reset_requested: # Persist the latest state if we are exiting early. if self.sim: self.sim.last_state = state self.sim.stop() break finally: if self.save_path and self.sim and self.sim.last_state and not self.reset_requested: self.reactor.save_state(self.save_path, self.sim.last_state) if self.quit_requested: break if self.reset_requested: self._clear_saved_state() self._reset_to_greenfield() continue break finally: self._restore_logging() def _handle_input(self, stdscr: "curses._CursesWindow") -> None: while True: ch = stdscr.getch() if ch == -1: break if ch in (ord("q"), ord("Q")): self.quit_requested = True return if ch == ord(" "): self._queue_command(ReactorCommand.scram_all()) elif ch in (ord("p"), ord("P")): self._queue_command(ReactorCommand(primary_pump_on=not self.reactor.primary_pump_active)) elif ch in (ord("o"), ord("O")): self._queue_command(ReactorCommand(secondary_pump_on=not self.reactor.secondary_pump_active)) elif ch in (ord("t"), ord("T")): self._queue_command(ReactorCommand(turbine_on=not self.reactor.turbine_active)) elif ord("1") <= ch <= ord("9"): idx = ch - ord("1") self._toggle_turbine_unit(idx) elif ch in (ord("+"), ord("=")): # Insert rods (increase fraction) self._queue_command(ReactorCommand(rod_position=self._clamped_rod(0.05))) elif ch == ord("-"): # Withdraw rods (decrease fraction) self._queue_command(ReactorCommand(rod_position=self._clamped_rod(-0.05))) elif ch == ord("["): demand = self._current_demand() - 50.0 self._queue_command(ReactorCommand(consumer_demand=max(0.0, demand))) elif ch == ord("]"): demand = self._current_demand() + 50.0 self._queue_command(ReactorCommand(consumer_demand=demand)) elif ch in (ord("c"), ord("C")): online = not (self.reactor.consumer.online if self.reactor.consumer else False) self._queue_command(ReactorCommand(consumer_online=online)) elif ch in (ord("r"), ord("R")): self._request_reset() elif ch in (ord("s"), ord("S")): self._queue_command(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw - 250.0)) elif ch in (ord("d"), ord("D")): self._queue_command(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw + 250.0)) elif ch in (ord("a"), ord("A")): self._queue_command(ReactorCommand(rod_manual=not self.reactor.control.manual_control)) elif ch in (ord("m"), ord("M")): self._queue_command(ReactorCommand.maintain("primary_pump")) elif ch in (ord("n"), ord("N")): self._queue_command(ReactorCommand.maintain("secondary_pump")) elif ch in (ord("k"), ord("K")): self._queue_command(ReactorCommand.maintain("core")) elif ch in (ord("y"), ord("Y")): self._queue_command(ReactorCommand.maintain("turbine_1")) elif ch in (ord("u"), ord("U")): self._queue_command(ReactorCommand.maintain("turbine_2")) elif ch in (ord("i"), ord("I")): self._queue_command(ReactorCommand.maintain("turbine_3")) def _queue_command(self, command: ReactorCommand) -> None: if self.pending_command is None: self.pending_command = command else: for field in command.__dataclass_fields__: # type: ignore[attr-defined] value = getattr(command, field) if value is None or value is False: continue if field == "rod_step" and getattr(self.pending_command, field) is not None: setattr( self.pending_command, field, getattr(self.pending_command, field) + value, ) else: setattr(self.pending_command, field, value) if self.pending_command.rod_position is not None and self.pending_command.rod_manual is None: self.pending_command.rod_manual = True def _toggle_turbine_unit(self, index: int) -> None: if index < 0 or index >= len(self.reactor.turbine_unit_active): return current = self.reactor.turbine_unit_active[index] self._queue_command(ReactorCommand(turbine_units={index + 1: not current})) def _request_reset(self) -> None: self.reset_requested = True if self.sim: self.sim.stop() def _clear_saved_state(self) -> None: if not self.save_path: return path = Path(self.save_path) try: path.unlink() LOGGER.info("Cleared saved state at %s", path) except FileNotFoundError: LOGGER.info("No saved state to clear at %s", path) def _reset_to_greenfield(self) -> None: LOGGER.info("Resetting reactor to initial state") self.reactor = Reactor.default() self.start_state = None self.pending_command = None self.reset_requested = False self.log_buffer.clear() def _next_command(self, _: float, __: PlantState) -> Optional[ReactorCommand]: cmd = self.pending_command self.pending_command = None return cmd def _draw(self, stdscr: "curses._CursesWindow", state: PlantState) -> None: stdscr.erase() height, width = stdscr.getmaxyx() if height < 24 or width < 90: stdscr.addstr( 0, 0, "Terminal window too small. Resize to at least 90x24.".ljust(width), curses.color_pair(4), ) stdscr.refresh() return data_height = height - 6 right_width = max(32, width // 3) left_width = width - right_width if left_width < 60: left_width = min(60, width - 20) right_width = width - left_width data_win = stdscr.derwin(data_height, left_width, 0, 0) help_win = stdscr.derwin(data_height, right_width, 0, left_width) status_win = stdscr.derwin(6, width, data_height, 0) self._draw_data_panel(data_win, state) self._draw_help_panel(help_win) self._draw_status_panel(status_win, state) stdscr.refresh() def _draw_data_panel(self, win: "curses._CursesWindow", state: PlantState) -> None: win.erase() win.box() win.addstr(0, 2, " Plant Overview ", curses.color_pair(1) | curses.A_BOLD) y = 2 y = self._draw_section( win, y, "Core", [ ("Fuel Temp", f"{state.core.fuel_temperature:8.1f} K"), ("Core Power", f"{state.core.power_output_mw:8.1f} MW"), ("Neutron Flux", f"{state.core.neutron_flux:10.2e}"), ("Rods", f"{self.reactor.control.rod_fraction:.3f}"), ("Rod Mode", "AUTO" if not self.reactor.control.manual_control else "MANUAL"), ("Setpoint", f"{self.reactor.control.setpoint_mw:7.0f} MW"), ("Reactivity", f"{state.core.reactivity_margin:+.4f}"), ], ) y = self._draw_section(win, y, "Key Poisons / Emitters", self._poison_lines(state)) y = self._draw_section( win, y, "Primary Loop", [ ("Pump", "ON" if self.reactor.primary_pump_active else "OFF"), ("Flow", f"{state.primary_loop.mass_flow_rate:7.0f} kg/s"), ("Inlet Temp", f"{state.primary_loop.temperature_in:7.1f} K"), ("Outlet Temp", f"{state.primary_loop.temperature_out:7.1f} K"), ("Pressure", f"{state.primary_loop.pressure:5.2f} MPa"), ], ) y = self._draw_section( win, y, "Secondary Loop", [ ("Pump", "ON" if self.reactor.secondary_pump_active else "OFF"), ("Flow", f"{state.secondary_loop.mass_flow_rate:7.0f} kg/s"), ("Inlet Temp", f"{state.secondary_loop.temperature_in:7.1f} K"), ("Pressure", f"{state.secondary_loop.pressure:5.2f} MPa"), ("Steam Quality", f"{state.secondary_loop.steam_quality:5.2f}"), ], ) consumer_status = "n/a" consumer_demand = 0.0 if self.reactor.consumer: consumer_status = "ONLINE" if self.reactor.consumer.online else "OFF" consumer_demand = self.reactor.consumer.demand_mw y = self._draw_section( win, y, "Turbine / Grid", [ ("Turbines", " ".join(self._turbine_status_lines())), ("Electrical", f"{state.total_electrical_output():7.1f} MW"), ("Load", f"{self._total_load_supplied(state):7.1f}/{self._total_load_demand(state):7.1f} MW"), ("Consumer", f"{consumer_status}"), ("Demand", f"{consumer_demand:7.1f} MW"), ], ) self._draw_health_bar(win, y + 1) def _draw_help_panel(self, win: "curses._CursesWindow") -> None: win.erase() win.box() win.addstr(0, 2, " Controls ", curses.color_pair(1) | curses.A_BOLD) y = 2 for entry in self.keys: win.addstr(y, 2, f"{entry.key:<8} {entry.description}") y += 1 win.addstr(y + 1, 2, "Tips:", curses.color_pair(2) | curses.A_BOLD) tips = [ "Start pumps before withdrawing rods.", "Bring turbine and consumer online after thermal stabilization.", "Toggle turbine units (1/2/3) for staggered maintenance.", "Use m/n/k/y/u/i to request maintenance (stop equipment first).", "Press 'r' to reset/clear state if you want a cold start.", "Watch component health to avoid automatic trips.", ] for idx, tip in enumerate(tips, start=y + 2): win.addstr(idx, 4, f"- {tip}") def _draw_status_panel(self, win: "curses._CursesWindow", state: PlantState) -> None: win.erase() win.hline(0, 0, curses.ACS_HLINE, win.getmaxyx()[1]) turbine_text = " ".join(self._turbine_status_lines()) msg = ( f"Time {state.time_elapsed:7.1f}s | Rods {self.reactor.control.rod_fraction:.3f} | " f"Primary {'ON' if self.reactor.primary_pump_active else 'OFF'} | " f"Secondary {'ON' if self.reactor.secondary_pump_active else 'OFF'} | " f"Turbines {turbine_text}" ) win.addstr(1, 1, msg, curses.color_pair(3)) if self.reactor.health_monitor.failure_log: win.addstr( 3, 1, f"Failures: {', '.join(self.reactor.health_monitor.failure_log)}", curses.color_pair(4) | curses.A_BOLD, ) log_y = 3 for record in list(self.log_buffer): if log_y >= win.getmaxyx()[0] - 1: break win.addstr(log_y, 1, record[: win.getmaxyx()[1] - 2], curses.color_pair(2)) log_y += 1 win.addstr(log_y, 1, "Press 'q' to exit and persist the current snapshot.", curses.color_pair(2)) def _draw_section( self, win: "curses._CursesWindow", start_y: int, title: str, lines: list[tuple[str, str] | str], ) -> int: height, width = win.getmaxyx() inner_width = width - 4 if start_y >= height - 2: return height - 2 win.addstr(start_y, 2, title[:inner_width], curses.A_BOLD | curses.color_pair(1)) row = start_y + 1 for line in lines: if row >= height - 1: break if isinstance(line, tuple): label, value = line text = f"{label:<18}: {value}" else: text = line win.addstr(row, 4, text[:inner_width]) row += 1 return row + 1 def _turbine_status_lines(self) -> list[str]: if not self.reactor.turbine_unit_active: return ["n/a"] return [ f"{idx + 1}:{'ON' if active else 'OFF'}" for idx, active in enumerate(self.reactor.turbine_unit_active) ] def _total_load_supplied(self, state: PlantState) -> float: return sum(t.load_supplied_mw for t in state.turbines) def _total_load_demand(self, state: PlantState) -> float: return sum(t.load_demand_mw for t in state.turbines) def _poison_lines(self, state: PlantState) -> list[tuple[str, str]]: inventory = state.core.fission_product_inventory or {} particles = state.core.emitted_particles or {} lines: list[tuple[str, str]] = [] def fmt(symbol: str, label: str) -> tuple[str, str]: qty = inventory.get(symbol, 0.0) threshold = constants.KEY_POISON_THRESHOLDS.get(symbol) flag = " !" if threshold is not None and qty >= threshold else "" return (f"{label}{flag}", f"{qty:9.2e}") lines.append(fmt("Xe", "Xe (xenon)")) lines.append(fmt("Sm", "Sm (samarium)")) lines.append(fmt("I", "I (iodine)")) lines.append(("Neutrons (src)", f"{particles.get('n', 0.0):9.2e}")) lines.append(("Gammas", f"{particles.get('gamma', 0.0):9.2e}")) lines.append(("Alphas", f"{particles.get('alpha', 0.0):9.2e}")) return lines def _draw_health_bar(self, win: "curses._CursesWindow", start_y: int) -> None: height, width = win.getmaxyx() if start_y >= height - 2: return win.addstr(start_y, 2, "Component Health", curses.A_BOLD | curses.color_pair(1)) bar_width = max(10, min(width - 28, 40)) for idx, (name, comp) in enumerate(self.reactor.health_monitor.components.items(), start=1): filled = int(bar_width * comp.integrity) bar = "█" * filled + "-" * (bar_width - filled) color = 3 if comp.integrity > 0.5 else 2 if comp.integrity > 0.2 else 4 row = start_y + idx if row >= height - 1: break label = f"{name:<12}:" win.addstr(row, 4, label[:14], curses.A_BOLD) bar_start = 4 + max(len(label), 14) + 1 win.addstr(row, bar_start, bar[:bar_width], curses.color_pair(color)) percent_col = min(width - 8, bar_start + bar_width + 2) win.addstr(row, percent_col, f"{comp.integrity*100:5.1f}%", curses.color_pair(color)) def _current_demand(self) -> float: if self.reactor.consumer: return self.reactor.consumer.demand_mw return 0.0 def _clamped_rod(self, delta: float) -> float: new_fraction = self.reactor.control.rod_fraction + delta return max(0.0, min(0.95, new_fraction)) def _install_log_capture(self) -> None: if self._log_handler: return self._previous_handlers = list(self._logger.handlers) for handler in self._previous_handlers: self._logger.removeHandler(handler) handler = _DashboardLogHandler(self.log_buffer) handler.setFormatter(logging.Formatter("%(levelname)s | %(message)s")) self._logger.addHandler(handler) self._logger.propagate = False self._log_handler = handler def _restore_logging(self) -> None: if not self._log_handler: return self._logger.removeHandler(self._log_handler) for handler in self._previous_handlers: self._logger.addHandler(handler) self._log_handler = None self._previous_handlers = [] class _DashboardLogHandler(logging.Handler): def __init__(self, buffer: deque[str]) -> None: super().__init__() self.buffer = buffer def emit(self, record: logging.LogRecord) -> None: msg = self.format(record) self.buffer.append(msg)