"""Interactive curses dashboard for real-time reactor operation.""" from __future__ import annotations import curses import logging from collections import deque from dataclasses import dataclass from pathlib import Path from typing import Optional from . import constants from .commands import ReactorCommand from .reactor import Reactor from .simulation import ReactorSimulation from .state import PlantState LOGGER = logging.getLogger(__name__) @dataclass class DashboardKey: key: str description: str class ReactorDashboard: """Minimal TUI dashboard allowing user control in real-time.""" def __init__( self, reactor: Reactor, start_state: Optional[PlantState], timestep: float = 1.0, save_path: Optional[str] = None, ) -> None: self.reactor = reactor self.start_state = start_state self.timestep = timestep self.save_path = save_path self.pending_command: Optional[ReactorCommand] = None self.sim: Optional[ReactorSimulation] = None self.quit_requested = False self.reset_requested = False self.log_buffer: deque[str] = deque(maxlen=4) self._log_handler: Optional[logging.Handler] = None self._previous_handlers: list[logging.Handler] = [] self._logger = logging.getLogger("reactor_sim") self.keys = [ DashboardKey("q", "Quit & save"), DashboardKey("space", "SCRAM"), DashboardKey("p", "Toggle primary pump"), DashboardKey("o", "Toggle secondary pump"), DashboardKey("g", "Toggle primary pump 1"), DashboardKey("h", "Toggle primary pump 2"), DashboardKey("j", "Toggle secondary pump 1"), DashboardKey("k", "Toggle secondary pump 2"), DashboardKey("t", "Toggle turbine"), DashboardKey("1/2/3", "Toggle turbine units 1-3"), DashboardKey("y/u/i", "Maintain turbine 1/2/3"), DashboardKey("c", "Toggle consumer"), DashboardKey("r", "Reset & clear state"), DashboardKey("m", "Maintain primary pump"), DashboardKey("n", "Maintain secondary pump"), DashboardKey("k", "Maintain core (requires shutdown)"), DashboardKey("+/-", "Withdraw/insert rods"), DashboardKey("[/]", "Adjust consumer demand −/+50 MW"), DashboardKey("s", "Setpoint −250 MW"), DashboardKey("d", "Setpoint +250 MW"), DashboardKey("a", "Toggle auto rod control"), ] def run(self) -> None: curses.wrapper(self._main) def _main(self, stdscr: "curses._CursesWindow") -> None: # type: ignore[name-defined] curses.curs_set(0) curses.start_color() curses.use_default_colors() curses.init_pair(1, curses.COLOR_CYAN, -1) curses.init_pair(2, curses.COLOR_YELLOW, -1) curses.init_pair(3, curses.COLOR_GREEN, -1) curses.init_pair(4, curses.COLOR_RED, -1) stdscr.nodelay(True) self._install_log_capture() try: while True: self.sim = ReactorSimulation( self.reactor, timestep=self.timestep, duration=None, realtime=True, command_provider=self._next_command, ) self.sim.start_state = self.start_state try: for state in self.sim.run(): self._draw(stdscr, state) self._handle_input(stdscr) if self.quit_requested or self.reset_requested: # Persist the latest state if we are exiting early. if self.sim: self.sim.last_state = state self.sim.stop() break finally: if self.save_path and self.sim and self.sim.last_state and not self.reset_requested: self.reactor.save_state(self.save_path, self.sim.last_state) if self.quit_requested: break if self.reset_requested: self._clear_saved_state() self._reset_to_greenfield() continue break finally: self._restore_logging() def _handle_input(self, stdscr: "curses._CursesWindow") -> None: while True: ch = stdscr.getch() if ch == -1: break if ch in (ord("q"), ord("Q")): self.quit_requested = True return if ch == ord(" "): self._queue_command(ReactorCommand.scram_all()) elif ch in (ord("p"), ord("P")): self._queue_command(ReactorCommand(primary_pump_on=not self.reactor.primary_pump_active)) elif ch in (ord("o"), ord("O")): self._queue_command(ReactorCommand(secondary_pump_on=not self.reactor.secondary_pump_active)) elif ch in (ord("g"), ord("G")): self._toggle_primary_pump_unit(0) elif ch in (ord("h"), ord("H")): self._toggle_primary_pump_unit(1) elif ch in (ord("j"), ord("J")): self._toggle_secondary_pump_unit(0) elif ch in (ord("k"), ord("K")): self._toggle_secondary_pump_unit(1) elif ch in (ord("t"), ord("T")): self._queue_command(ReactorCommand(turbine_on=not self.reactor.turbine_active)) elif ord("1") <= ch <= ord("9"): idx = ch - ord("1") self._toggle_turbine_unit(idx) elif ch in (ord("+"), ord("=")): # Insert rods (increase fraction) self._queue_command(ReactorCommand(rod_position=self._clamped_rod(0.05))) elif ch == ord("-"): # Withdraw rods (decrease fraction) self._queue_command(ReactorCommand(rod_position=self._clamped_rod(-0.05))) elif ch == ord("["): demand = self._current_demand() - 50.0 self._queue_command(ReactorCommand(consumer_demand=max(0.0, demand))) elif ch == ord("]"): demand = self._current_demand() + 50.0 self._queue_command(ReactorCommand(consumer_demand=demand)) elif ch in (ord("c"), ord("C")): online = not (self.reactor.consumer.online if self.reactor.consumer else False) self._queue_command(ReactorCommand(consumer_online=online)) elif ch in (ord("r"), ord("R")): self._request_reset() elif ch in (ord("s"), ord("S")): self._queue_command(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw - 250.0)) elif ch in (ord("d"), ord("D")): self._queue_command(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw + 250.0)) elif ch in (ord("a"), ord("A")): self._queue_command(ReactorCommand(rod_manual=not self.reactor.control.manual_control)) elif ch in (ord("m"), ord("M")): self._queue_command(ReactorCommand.maintain("primary_pump")) elif ch in (ord("n"), ord("N")): self._queue_command(ReactorCommand.maintain("secondary_pump")) elif ch in (ord("k"), ord("K")): self._queue_command(ReactorCommand.maintain("core")) elif ch in (ord("y"), ord("Y")): self._queue_command(ReactorCommand.maintain("turbine_1")) elif ch in (ord("u"), ord("U")): self._queue_command(ReactorCommand.maintain("turbine_2")) elif ch in (ord("i"), ord("I")): self._queue_command(ReactorCommand.maintain("turbine_3")) def _queue_command(self, command: ReactorCommand) -> None: if self.pending_command is None: self.pending_command = command else: for field in command.__dataclass_fields__: # type: ignore[attr-defined] value = getattr(command, field) if value is None or value is False: continue if field == "rod_step" and getattr(self.pending_command, field) is not None: setattr( self.pending_command, field, getattr(self.pending_command, field) + value, ) else: setattr(self.pending_command, field, value) if self.pending_command.rod_position is not None and self.pending_command.rod_manual is None: self.pending_command.rod_manual = True def _toggle_turbine_unit(self, index: int) -> None: if index < 0 or index >= len(self.reactor.turbine_unit_active): return current = self.reactor.turbine_unit_active[index] self._queue_command(ReactorCommand(turbine_units={index + 1: not current})) def _request_reset(self) -> None: self.reset_requested = True if self.sim: self.sim.stop() def _clear_saved_state(self) -> None: if not self.save_path: return path = Path(self.save_path) try: path.unlink() LOGGER.info("Cleared saved state at %s", path) except FileNotFoundError: LOGGER.info("No saved state to clear at %s", path) def _reset_to_greenfield(self) -> None: LOGGER.info("Resetting reactor to initial state") self.reactor = Reactor.default() self.start_state = None self.pending_command = None self.reset_requested = False self.log_buffer.clear() def _next_command(self, _: float, __: PlantState) -> Optional[ReactorCommand]: cmd = self.pending_command self.pending_command = None return cmd def _draw(self, stdscr: "curses._CursesWindow", state: PlantState) -> None: stdscr.erase() height, width = stdscr.getmaxyx() min_status = 4 if height < min_status + 12 or width < 70: stdscr.addstr( 0, 0, "Terminal too small; try >=70x16 or reduce font size.".ljust(width), curses.color_pair(4), ) stdscr.refresh() return status_height = min_status data_height = height - status_height right_width = max(28, width // 3) left_width = width - right_width if left_width < 50: left_width = min(50, width - 18) right_width = width - left_width data_height = max(1, data_height) left_width = max(1, left_width) right_width = max(1, right_width) data_win = stdscr.derwin(data_height, left_width, 0, 0) help_win = stdscr.derwin(data_height, right_width, 0, left_width) status_win = stdscr.derwin(status_height, width, data_height, 0) self._draw_data_panel(data_win, state) self._draw_help_panel(help_win) self._draw_status_panel(status_win, state) stdscr.refresh() def _draw_data_panel(self, win: "curses._CursesWindow", state: PlantState) -> None: win.erase() win.box() win.addstr(0, 2, " Plant Overview ", curses.color_pair(1) | curses.A_BOLD) y = 2 y = self._draw_section( win, y, "Core", [ ("Fuel Temp", f"{state.core.fuel_temperature:8.1f} K"), ("Core Power", f"{state.core.power_output_mw:8.1f} MW"), ("Neutron Flux", f"{state.core.neutron_flux:10.2e}"), ("Rods", f"{self.reactor.control.rod_fraction:.3f}"), ("Rod Mode", "AUTO" if not self.reactor.control.manual_control else "MANUAL"), ("Setpoint", f"{self.reactor.control.setpoint_mw:7.0f} MW"), ("Reactivity", f"{state.core.reactivity_margin:+.4f}"), ], ) y = self._draw_section(win, y, "Key Poisons / Emitters", self._poison_lines(state)) y = self._draw_section( win, y, "Primary Loop", [ ("Pump1", self._pump_status(state.primary_pumps, 0)), ("Pump2", self._pump_status(state.primary_pumps, 1)), ("Flow", f"{state.primary_loop.mass_flow_rate:7.0f} kg/s"), ("Inlet Temp", f"{state.primary_loop.temperature_in:7.1f} K"), ("Outlet Temp", f"{state.primary_loop.temperature_out:7.1f} K"), ("Pressure", f"{state.primary_loop.pressure:5.2f} MPa"), ], ) y = self._draw_section( win, y, "Secondary Loop", [ ("Pump1", self._pump_status(state.secondary_pumps, 0)), ("Pump2", self._pump_status(state.secondary_pumps, 1)), ("Flow", f"{state.secondary_loop.mass_flow_rate:7.0f} kg/s"), ("Inlet Temp", f"{state.secondary_loop.temperature_in:7.1f} K"), ("Pressure", f"{state.secondary_loop.pressure:5.2f} MPa"), ("Steam Quality", f"{state.secondary_loop.steam_quality:5.2f}"), ], ) consumer_status = "n/a" consumer_demand = 0.0 if self.reactor.consumer: consumer_status = "ONLINE" if self.reactor.consumer.online else "OFF" consumer_demand = self.reactor.consumer.demand_mw y = self._draw_section( win, y, "Turbine / Grid", [ ("Turbines", " ".join(self._turbine_status_lines())), ("Unit1 Elec", f"{state.turbines[0].electrical_output_mw:7.1f} MW" if state.turbines else "n/a"), ( "Unit2 Elec", f"{state.turbines[1].electrical_output_mw:7.1f} MW" if len(state.turbines) > 1 else "n/a", ), ( "Unit3 Elec", f"{state.turbines[2].electrical_output_mw:7.1f} MW" if len(state.turbines) > 2 else "n/a", ), ("Electrical", f"{state.total_electrical_output():7.1f} MW"), ("Load", f"{self._total_load_supplied(state):7.1f}/{self._total_load_demand(state):7.1f} MW"), ("Consumer", f"{consumer_status}"), ("Demand", f"{consumer_demand:7.1f} MW"), ], ) y = self._draw_section(win, y, "Maintenance", self._maintenance_lines()) y = self._draw_section(win, y, "Component Health", self._health_lines()) def _draw_help_panel(self, win: "curses._CursesWindow") -> None: win.erase() win.box() win.addstr(0, 2, " Controls ", curses.color_pair(1) | curses.A_BOLD) y = 2 for entry in self.keys: win.addstr(y, 2, f"{entry.key:<8} {entry.description}") y += 1 win.addstr(y + 1, 2, "Tips:", curses.color_pair(2) | curses.A_BOLD) tips = [ "Start pumps before withdrawing rods.", "Bring turbine and consumer online after thermal stabilization.", "Toggle turbine units (1/2/3) for staggered maintenance.", "Use m/n/k/y/u/i to request maintenance (stop equipment first).", "Press 'r' to reset/clear state if you want a cold start.", "Watch component health to avoid automatic trips.", ] for idx, tip in enumerate(tips, start=y + 2): win.addstr(idx, 4, f"- {tip}") def _draw_status_panel(self, win: "curses._CursesWindow", state: PlantState) -> None: win.erase() win.hline(0, 0, curses.ACS_HLINE, win.getmaxyx()[1]) turbine_text = " ".join(self._turbine_status_lines()) msg = ( f"Time {state.time_elapsed:7.1f}s | Rods {self.reactor.control.rod_fraction:.3f} | " f"Primary {'ON' if self.reactor.primary_pump_active else 'OFF'} | " f"Secondary {'ON' if self.reactor.secondary_pump_active else 'OFF'} | " f"Turbines {turbine_text}" ) win.addstr(1, 1, msg, curses.color_pair(3)) if self.reactor.health_monitor.failure_log: win.addstr( 3, 1, f"Failures: {', '.join(self.reactor.health_monitor.failure_log)}", curses.color_pair(4) | curses.A_BOLD, ) log_y = 3 for record in list(self.log_buffer): if log_y >= win.getmaxyx()[0] - 1: break win.addstr(log_y, 1, record[: win.getmaxyx()[1] - 2], curses.color_pair(2)) log_y += 1 win.addstr(log_y, 1, "Press 'q' to exit and persist the current snapshot.", curses.color_pair(2)) def _draw_section( self, win: "curses._CursesWindow", start_y: int, title: str, lines: list[tuple[str, str] | str], ) -> int: height, width = win.getmaxyx() inner_width = width - 4 if start_y >= height - 2: return height - 2 win.addstr(start_y, 2, title[:inner_width], curses.A_BOLD | curses.color_pair(1)) row = start_y + 1 for line in lines: if row >= height - 1: break if isinstance(line, tuple): label, value = line text = f"{label:<18}: {value}" else: text = line win.addstr(row, 4, text[:inner_width]) row += 1 return row + 1 def _turbine_status_lines(self) -> list[str]: if not self.reactor.turbine_unit_active: return ["n/a"] return [ f"{idx + 1}:{'ON' if active else 'OFF'}" for idx, active in enumerate(self.reactor.turbine_unit_active) ] def _total_load_supplied(self, state: PlantState) -> float: return sum(t.load_supplied_mw for t in state.turbines) def _total_load_demand(self, state: PlantState) -> float: return sum(t.load_demand_mw for t in state.turbines) def _poison_lines(self, state: PlantState) -> list[tuple[str, str]]: inventory = state.core.fission_product_inventory or {} particles = state.core.emitted_particles or {} lines: list[tuple[str, str]] = [] def fmt(symbol: str, label: str) -> tuple[str, str]: qty = inventory.get(symbol, 0.0) threshold = constants.KEY_POISON_THRESHOLDS.get(symbol) flag = " !" if threshold is not None and qty >= threshold else "" return (f"{label}{flag}", f"{qty:9.2e}") lines.append(fmt("Xe", "Xe (xenon)")) lines.append(fmt("Sm", "Sm (samarium)")) lines.append(fmt("I", "I (iodine)")) lines.append(("Neutrons (src)", f"{particles.get('n', 0.0):9.2e}")) lines.append(("Gammas", f"{particles.get('gamma', 0.0):9.2e}")) lines.append(("Alphas", f"{particles.get('alpha', 0.0):9.2e}")) return lines def _health_lines(self) -> list[tuple[str, str]]: comps = self.reactor.health_monitor.components lines: list[tuple[str, str]] = [] for name, comp in comps.items(): pct = f"{comp.integrity*100:5.1f}%" state = "FAILED" if comp.failed else pct lines.append((name, state)) return lines def _maintenance_lines(self) -> list[tuple[str, str]]: if not self.reactor.maintenance_active: return [("Active", "None")] return [(comp, "IN PROGRESS") for comp in sorted(self.reactor.maintenance_active)] def _pump_status(self, pumps: list, index: int) -> str: if index >= len(pumps): return "n/a" state = pumps[index] return f"{'ON ' if state.active else 'OFF'} {state.flow_rate:6.0f} kg/s" def _current_demand(self) -> float: if self.reactor.consumer: return self.reactor.consumer.demand_mw return 0.0 def _clamped_rod(self, delta: float) -> float: new_fraction = self.reactor.control.rod_fraction + delta return max(0.0, min(0.95, new_fraction)) def _install_log_capture(self) -> None: if self._log_handler: return self._previous_handlers = list(self._logger.handlers) for handler in self._previous_handlers: self._logger.removeHandler(handler) handler = _DashboardLogHandler(self.log_buffer) handler.setFormatter(logging.Formatter("%(levelname)s | %(message)s")) self._logger.addHandler(handler) self._logger.propagate = False self._log_handler = handler def _restore_logging(self) -> None: if not self._log_handler: return self._logger.removeHandler(self._log_handler) for handler in self._previous_handlers: self._logger.addHandler(handler) self._log_handler = None self._previous_handlers = [] class _DashboardLogHandler(logging.Handler): def __init__(self, buffer: deque[str]) -> None: super().__init__() self.buffer = buffer def emit(self, record: logging.LogRecord) -> None: msg = self.format(record) self.buffer.append(msg)