"""Interactive curses dashboard for real-time reactor operation."""

from __future__ import annotations

import curses
import dataclasses
import logging
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

from . import constants
from .commands import ReactorCommand
from .reactor import Reactor
from .simulation import ReactorSimulation
from .state import PlantState, PumpState

LOGGER = logging.getLogger(__name__)

# Characters that toggle individual turbine units (0-based unit index).
# '"' maps to the same unit as "@" — presumably for keyboard layouts where
# Shift+2 produces a double quote; confirm against the supported layouts.
_TURBINE_HOTKEYS: dict[str, int] = {"!": 0, "@": 1, "#": 2, '"': 1}


def _safe_addstr(win, row: int, col: int, text: str, attr: int = 0) -> bool:
    """Write *text* at (row, col), clipped to the window; never raise.

    Returns True when the text was written without curses reporting an error,
    False when the position is outside the window or curses rejected the write.
    """
    max_y, max_x = win.getmaxyx()
    if not (0 <= row < max_y and 0 <= col < max_x):
        return False
    try:
        win.addstr(row, col, text[: max_x - col], attr)
    except curses.error:
        # Raised e.g. when the write ends in the bottom-right cell.
        return False
    return True


def _build_numpad_mapping() -> dict[int, float]:
    """Map curses keypad key codes to manual rod positions (0.1 .. 0.9).

    Key constants are looked up by name on the ``curses`` module and skipped
    when missing, so the mapping only contains codes this build provides.
    """
    table = {
        # Keypad matrix constants.
        "KEY_C1": 0.1,  # numpad 1
        "KEY_C2": 0.2,  # numpad 2
        "KEY_C3": 0.3,  # numpad 3
        "KEY_B1": 0.4,  # numpad 4
        "KEY_B2": 0.5,  # numpad 5 (center)
        "KEY_B3": 0.6,  # numpad 6
        "KEY_A1": 0.7,  # numpad 7
        "KEY_A2": 0.8,  # numpad 8
        "KEY_A3": 0.9,  # numpad 9
        # Navigation-key codes treated as aliases of the keypad digits.
        "KEY_END": 0.1,
        "KEY_DOWN": 0.2,
        "KEY_NPAGE": 0.3,
        "KEY_LEFT": 0.4,
        "KEY_RIGHT": 0.6,
        "KEY_HOME": 0.7,
        "KEY_UP": 0.8,
        "KEY_PPAGE": 0.9,
    }
    mapping: dict[int, float] = {}
    for name, value in table.items():
        code = getattr(curses, name, None)
        if code is not None:
            mapping[code] = value
    return mapping


_NUMPAD_ROD_KEYS = _build_numpad_mapping()


@dataclass
class DashboardKey:
    """One entry of the on-screen help: a key label and what it does."""

    key: str
    description: str


class ReactorDashboard:
    """Minimal TUI dashboard allowing user control in real-time."""

    def __init__(
        self,
        reactor: Reactor,
        start_state: Optional[PlantState],
        timestep: float = 1.0,
        save_path: Optional[str] = None,
    ) -> None:
        """Store the reactor, initial state and options; build the help table."""
        self.reactor = reactor
        self.start_state = start_state
        self.timestep = timestep
        self.save_path = save_path
        self.pending_command: Optional[ReactorCommand] = None
        self.sim: Optional[ReactorSimulation] = None
        self.quit_requested = False
        self.reset_requested = False
        self.page = 1  # 1=metrics, 2=schematic (placeholder)
        self._last_state: Optional[PlantState] = None
        # (time_elapsed, fuel_temperature, power_output_mw) samples.
        self._trend_history: deque[tuple[float, float, float]] = deque(maxlen=120)
        self.log_buffer: deque[str] = deque(maxlen=8)
        self._log_handler: Optional[logging.Handler] = None
        self._previous_handlers: list[logging.Handler] = []
        self._previous_propagate = True
        self._logger = logging.getLogger("reactor_sim")
        self.help_sections: list[tuple[str, list[DashboardKey]]] = [
            (
                "Reactor / Safety",
                [
                    DashboardKey("q", "Quit & save"),
                    DashboardKey("space", "SCRAM"),
                    DashboardKey("r", "Reset & clear state"),
                    DashboardKey("a", "Toggle auto rod control"),
                    DashboardKey("F1/F2", "Metrics / schematic views"),
                    DashboardKey("+/-", "Withdraw/insert rods"),
                    DashboardKey("1-9 / Numpad", "Set rods to 0.1 … 0.9 (manual)"),
                    DashboardKey("[/]", "Adjust consumer demand −/+50 MW"),
                    DashboardKey("s/d", "Setpoint −/+250 MW"),
                    DashboardKey("p", "Maintain core (shutdown required)"),
                ],
            ),
            (
                "Pumps",
                [
                    DashboardKey("g/h", "Toggle primary pump 1/2"),
                    DashboardKey("j/k", "Toggle secondary pump 1/2"),
                    DashboardKey("m/n", "Maintain primary pumps 1/2"),
                    DashboardKey(",/.", "Maintain secondary pumps 1/2"),
                ],
            ),
            (
                "Generators",
                [
                    DashboardKey("b/v", "Toggle generator 1/2"),
                    DashboardKey("x", "Toggle generator auto"),
                    DashboardKey("l/;", "Toggle relief primary/secondary"),
                    DashboardKey("B/V", "Maintain generator 1/2"),
                ],
            ),
            (
                "Turbines / Grid",
                [
                    DashboardKey("t", "Toggle turbine bank"),
                    DashboardKey("Shift+1/2/3", "Toggle turbine units 1-3"),
                    DashboardKey("y/u/i", "Maintain turbine 1/2/3"),
                    DashboardKey("c", "Toggle consumer"),
                ],
            ),
        ]

    def run(self) -> None:
        """Run the dashboard inside ``curses.wrapper`` until the user quits."""
        curses.wrapper(self._main)

    def _main(self, stdscr: "curses._CursesWindow") -> None:  # type: ignore[name-defined]
        """Set up curses, then run simulations until quit (restarting on reset)."""
        try:
            curses.curs_set(0)
        except curses.error:
            # Some terminals cannot change cursor visibility; not fatal.
            pass
        curses.start_color()
        curses.use_default_colors()
        curses.init_pair(1, curses.COLOR_CYAN, -1)
        curses.init_pair(2, curses.COLOR_YELLOW, -1)
        curses.init_pair(3, curses.COLOR_GREEN, -1)
        curses.init_pair(4, curses.COLOR_RED, -1)
        stdscr.nodelay(True)
        stdscr.keypad(True)
        self._install_log_capture()
        try:
            while True:
                self.sim = ReactorSimulation(
                    self.reactor,
                    timestep=self.timestep,
                    duration=None,
                    realtime=True,
                    command_provider=self._next_command,
                )
                self.sim.start_state = self.start_state
                try:
                    for state in self.sim.run():
                        self._last_state = state
                        self._draw(stdscr, state)
                        self._handle_input(stdscr)
                        if self.quit_requested or self.reset_requested:
                            # Persist the latest state if we are exiting early.
                            if self.sim:
                                self.sim.last_state = state
                                self.sim.stop()
                            break
                finally:
                    # A reset deliberately discards the snapshot instead of saving it.
                    if self.save_path and self.sim and self.sim.last_state and not self.reset_requested:
                        self.reactor.save_state(self.save_path, self.sim.last_state)
                if self.quit_requested:
                    break
                if self.reset_requested:
                    self._clear_saved_state()
                    self._reset_to_greenfield()
                    continue
                break
        finally:
            self._restore_logging()

    def _handle_input(self, stdscr: "curses._CursesWindow") -> None:
        """Drain pending key presses and translate them into queued commands."""
        while True:
            ch = stdscr.getch()
            if ch == -1:
                break
            try:
                keyname = curses.keyname(ch)
            except (curses.error, ValueError):
                keyname = None
            key_label = keyname.decode(errors="ignore") if keyname else ""
            turbine_index = _TURBINE_HOTKEYS.get(key_label)
            if turbine_index is None and 0 <= ch < 256:
                turbine_index = _TURBINE_HOTKEYS.get(chr(ch))

            if ch in (ord("q"), ord("Q")):
                self.quit_requested = True
                return
            if ch == ord(" "):
                self._queue_command(ReactorCommand.scram_all())
            elif ch in (ord("o"), ord("O")):
                continue  # explicitly ignored key
            elif ch in (ord("g"), ord("G")):
                self._toggle_primary_pump_unit(0)
            elif ch in (ord("h"), ord("H")):
                self._toggle_primary_pump_unit(1)
            elif ch in (ord("j"), ord("J")):
                self._toggle_secondary_pump_unit(0)
            elif ch in (ord("k"), ord("K")):
                self._toggle_secondary_pump_unit(1)
            elif ch in (ord("p"), ord("P")):
                self._queue_command(ReactorCommand.maintain("core"))
            # Lowercase only: uppercase B/V are the generator maintenance keys
            # handled further down (previously shadowed and unreachable).
            elif ch == ord("b"):
                self._toggle_generator_unit(0)
            elif ch == ord("v"):
                self._toggle_generator_unit(1)
            elif ch == ord("l"):
                self._queue_command(ReactorCommand(primary_relief=not self.reactor.primary_relief_open))
            elif ch == ord(";"):
                self._queue_command(ReactorCommand(secondary_relief=not self.reactor.secondary_relief_open))
            elif ch in (ord("x"), ord("X")):
                self._queue_command(ReactorCommand(generator_auto=not self.reactor.generator_auto))
            elif ch in (ord("t"), ord("T")):
                self._queue_command(ReactorCommand(turbine_on=not self.reactor.turbine_active))
            elif turbine_index is not None:
                self._toggle_turbine_unit(turbine_index)
            elif keyname and keyname.startswith(b"KP_") and keyname[-1:] in b"123456789":
                target = (keyname[-1] - ord("0")) / 10.0
                self._queue_command(ReactorCommand(rod_position=target, rod_manual=True))
            elif ord("1") <= ch <= ord("9"):
                target = (ch - ord("0")) / 10.0
                self._queue_command(ReactorCommand(rod_position=target, rod_manual=True))
            elif ch in _NUMPAD_ROD_KEYS:
                self._queue_command(ReactorCommand(rod_position=_NUMPAD_ROD_KEYS[ch], rod_manual=True))
            elif curses.KEY_F1 <= ch <= curses.KEY_F9:
                # NOTE(review): the help panel advertises F1/F2 as view switches,
                # but F1..F9 set rod positions 0.1..0.9 here and self.page is
                # never changed. Confirm which behavior is intended.
                target = (ch - curses.KEY_F1 + 1) / 10.0
                self._queue_command(ReactorCommand(rod_position=target, rod_manual=True))
            elif ch in (ord("+"), ord("=")):
                # Increases the rod fraction by one manual step.
                # NOTE(review): help text lists "+/-" as "Withdraw/insert" —
                # confirm the direction convention of rod_fraction.
                self._queue_command(ReactorCommand(rod_position=self._clamped_rod(constants.ROD_MANUAL_STEP)))
            elif ch == ord("-"):
                # Decreases the rod fraction by one manual step.
                self._queue_command(ReactorCommand(rod_position=self._clamped_rod(-constants.ROD_MANUAL_STEP)))
            elif ch == ord("["):
                demand = self._current_demand() - 50.0
                self._queue_command(ReactorCommand(consumer_demand=max(0.0, demand)))
            elif ch == ord("]"):
                demand = self._current_demand() + 50.0
                self._queue_command(ReactorCommand(consumer_demand=demand))
            elif ch in (ord("c"), ord("C")):
                online = not (self.reactor.consumer.online if self.reactor.consumer else False)
                self._queue_command(ReactorCommand(consumer_online=online))
            elif ch in (ord("r"), ord("R")):
                self._request_reset()
            elif ch in (ord("s"), ord("S")):
                self._queue_command(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw - 250.0))
            elif ch in (ord("d"), ord("D")):
                self._queue_command(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw + 250.0))
            elif ch in (ord("a"), ord("A")):
                self._queue_command(ReactorCommand(rod_manual=not self.reactor.control.manual_control))
            elif ch in (ord("m"), ord("M")):
                self._queue_command(ReactorCommand.maintain("primary_pump_1"))
            elif ch in (ord("n"), ord("N")):
                self._queue_command(ReactorCommand.maintain("primary_pump_2"))
            elif ch == ord(","):
                self._queue_command(ReactorCommand.maintain("secondary_pump_1"))
            elif ch == ord("."):
                self._queue_command(ReactorCommand.maintain("secondary_pump_2"))
            elif ch == ord("B"):
                self._queue_command(ReactorCommand.maintain("generator_1"))
            elif ch == ord("V"):
                self._queue_command(ReactorCommand.maintain("generator_2"))
            elif ch in (ord("y"), ord("Y")):
                self._queue_command(ReactorCommand.maintain("turbine_1"))
            elif ch in (ord("u"), ord("U")):
                self._queue_command(ReactorCommand.maintain("turbine_2"))
            elif ch in (ord("i"), ord("I")):
                self._queue_command(ReactorCommand.maintain("turbine_3"))

    @staticmethod
    def _is_field_default(spec: dataclasses.Field, value: object) -> bool:
        """Return True when *value* equals the declared default of field *spec*."""
        if spec.default is not dataclasses.MISSING:
            default = spec.default
        elif spec.default_factory is not dataclasses.MISSING:
            default = spec.default_factory()
        else:
            return False
        # Compare types too so that e.g. False is not mistaken for a 0 default.
        return type(value) is type(default) and value == default

    def _queue_command(self, command: ReactorCommand) -> None:
        """Queue *command*, merging it into any command already pending.

        Only fields that were actually set (not ``None`` and not equal to the
        field's declared default) overwrite the pending command, so an explicit
        ``False`` on a field whose default is ``None`` is preserved. ``rod_step``
        values accumulate and dict-valued fields are merged key by key.
        """
        pending = self.pending_command
        if pending is None:
            pending = self.pending_command = command
        else:
            for spec in dataclasses.fields(command):
                name = spec.name
                value = getattr(command, name)
                if value is None or self._is_field_default(spec, value):
                    continue
                current = getattr(pending, name)
                if name == "rod_step" and current is not None:
                    setattr(pending, name, current + value)
                elif isinstance(value, dict) and isinstance(current, dict):
                    # Keep earlier per-unit toggles queued in the same frame.
                    setattr(pending, name, {**current, **value})
                else:
                    setattr(pending, name, value)
        # An explicit rod position implies manual control unless stated otherwise.
        if pending.rod_position is not None and pending.rod_manual is None:
            pending.rod_manual = True

    def _toggle_turbine_unit(self, index: int) -> None:
        """Queue a toggle for turbine unit *index* (0-based); ignore bad indices."""
        if index < 0 or index >= len(self.reactor.turbine_unit_active):
            return
        current = self.reactor.turbine_unit_active[index]
        self._queue_command(ReactorCommand(turbine_units={index + 1: not current}))

    def _toggle_primary_pump_unit(self, index: int) -> None:
        """Queue a toggle for primary pump *index* (0-based); ignore bad indices."""
        if index < 0 or index >= len(self.reactor.primary_pump_units):
            return
        current = self.reactor.primary_pump_units[index]
        self._queue_command(ReactorCommand(primary_pumps={index + 1: not current}))

    def _toggle_secondary_pump_unit(self, index: int) -> None:
        """Queue a toggle for secondary pump *index* (0-based); ignore bad indices."""
        if index < 0 or index >= len(self.reactor.secondary_pump_units):
            return
        current = self.reactor.secondary_pump_units[index]
        self._queue_command(ReactorCommand(secondary_pumps={index + 1: not current}))

    def _toggle_generator_unit(self, index: int) -> None:
        """Queue a toggle for generator *index* based on the last drawn state.

        A generator counts as "on" when it is running or starting; with no
        state available it is assumed off.
        """
        current = False
        if self._last_state and index < len(self._last_state.generators):
            gen = self._last_state.generators[index]
            current = gen.running or gen.starting
        self._queue_command(ReactorCommand(generator_units={index + 1: not current}))

    def _request_reset(self) -> None:
        """Flag a reset and stop the running simulation, if any."""
        self.reset_requested = True
        if self.sim:
            self.sim.stop()

    def _clear_saved_state(self) -> None:
        """Delete the saved-state file, if a save path is configured."""
        if not self.save_path:
            return
        path = Path(self.save_path)
        try:
            path.unlink()
            LOGGER.info("Cleared saved state at %s", path)
        except FileNotFoundError:
            LOGGER.info("No saved state to clear at %s", path)
        except OSError as exc:
            # Do not take the whole UI down because the file could not be removed.
            LOGGER.warning("Could not clear saved state at %s: %s", path, exc)

    def _reset_to_greenfield(self) -> None:
        """Replace the reactor with a default one and clear per-run UI state."""
        LOGGER.info("Resetting reactor to initial state")
        self.reactor = Reactor.default()
        self.start_state = None
        self.pending_command = None
        self._last_state = None
        self.reset_requested = False
        self.log_buffer.clear()
        # Samples from the previous run would produce meaningless deltas.
        self._trend_history.clear()

    def _next_command(self, _: float, __: PlantState) -> Optional[ReactorCommand]:
        """Command provider for the simulation: hand over and clear the pending command."""
        cmd = self.pending_command
        self.pending_command = None
        return cmd

    def _draw(self, stdscr: "curses._CursesWindow", state: PlantState) -> None:
        """Lay out and draw the data, help and status panels for *state*."""
        stdscr.erase()
        height, width = stdscr.getmaxyx()
        min_status = 6
        min_height = min_status + 12
        min_width = 70
        if height < min_height or width < min_width:
            notice = f"Terminal too small; try >={min_width}x{min_height} or reduce font size."
            _safe_addstr(stdscr, 0, 0, notice.ljust(width), curses.color_pair(4))
            stdscr.refresh()
            return

        log_rows = max(2, min(len(self.log_buffer) + 2, 8))
        status_height = min(height - 1, max(min_status, min_status + log_rows))
        data_height = max(1, height - status_height)
        gap = 2
        right_width = max(28, width // 3)
        left_width = width - right_width - gap
        if left_width < 50:
            # Favor the data panel when the terminal is narrow.
            left_width = min(50, width - (18 + gap))
            right_width = width - left_width - gap
        left_width = max(1, left_width)
        right_width = max(1, right_width)

        data_win = stdscr.derwin(data_height, left_width, 0, 0)
        help_win = stdscr.derwin(data_height, right_width, 0, left_width + gap)
        status_win = stdscr.derwin(status_height, width, data_height, 0)
        self._draw_data_panel(data_win, state)
        self._draw_help_panel(help_win)
        self._draw_status_panel(status_win, state)
        stdscr.refresh()

    def _core_lines(self, state: PlantState) -> list[tuple[str, str]]:
        """Rows for the "Core" section."""
        core = state.core
        control = self.reactor.control
        return [
            (
                "Fuel Temp",
                f"{core.fuel_temperature:6.1f} K (Max {constants.CORE_MELTDOWN_TEMPERATURE:4.0f})",
            ),
            (
                "Core Power",
                f"{core.power_output_mw:6.1f} MW (Nom {constants.NORMAL_CORE_POWER_MW:4.0f}/Max {constants.TEST_MAX_POWER_MW:4.0f})",
            ),
            ("Neutron Flux", f"{core.neutron_flux:10.2e}"),
            ("Rods", f"{control.rod_fraction:.3f}"),
            ("Rod Mode", "AUTO" if not control.manual_control else "MANUAL"),
            ("Setpoint", f"{control.setpoint_mw:7.0f} MW"),
            ("Reactivity", f"{core.reactivity_margin:+.4f}"),
        ]

    def _primary_loop_lines(self, state: PlantState) -> list[tuple[str, str]]:
        """Rows for the "Primary Loop" section."""
        loop = state.primary_loop
        nominal_flow = self.reactor.primary_pump.nominal_flow * len(self.reactor.primary_pump_units)
        return [
            ("Pump1", self._pump_status(state.primary_pumps, 0)),
            ("Pump2", self._pump_status(state.primary_pumps, 1)),
            ("Flow", f"{loop.mass_flow_rate:7.0f}/{nominal_flow:.0f} kg/s"),
            ("Level", f"{loop.level*100:6.1f}%"),
            ("Inlet Temp", f"{loop.temperature_in:7.1f} K"),
            ("Outlet Temp", f"{loop.temperature_out:7.1f} K (Target {constants.PRIMARY_OUTLET_TARGET_K:4.0f})"),
            ("Pressure", f"{loop.pressure:5.2f}/{constants.MAX_PRESSURE:4.1f} MPa"),
            ("Pressurizer", f"{self.reactor.pressurizer_level*100:6.1f}% @ {constants.PRIMARY_PRESSURIZER_SETPOINT_MPA:4.1f} MPa"),
            ("Relief", "OPEN" if self.reactor.primary_relief_open else "CLOSED"),
        ]

    def _secondary_loop_lines(self, state: PlantState) -> list[tuple[str, str]]:
        """Rows for the "Secondary Loop" section."""
        loop = state.secondary_loop
        nominal_flow = self.reactor.secondary_pump.nominal_flow * len(self.reactor.secondary_pump_units)
        steam_flow = loop.mass_flow_rate * max(0.0, loop.steam_quality)
        return [
            ("Pump1", self._pump_status(state.secondary_pumps, 0)),
            ("Pump2", self._pump_status(state.secondary_pumps, 1)),
            ("Flow", f"{loop.mass_flow_rate:7.0f}/{nominal_flow:.0f} kg/s"),
            ("Level", f"{loop.level*100:6.1f}% (Target {constants.SECONDARY_INVENTORY_TARGET*100:4.0f}%)"),
            ("Feedwater", f"valve {self.reactor.feedwater_valve*100:5.1f}% steam {steam_flow:6.0f} kg/s"),
            # NOTE(review): the inlet row shows the *outlet* target constant —
            # confirm whether a dedicated inlet target should be displayed.
            ("Inlet Temp", f"{loop.temperature_in:7.1f} K (Target {constants.SECONDARY_OUTLET_TARGET_K:4.0f})"),
            ("Outlet Temp", f"{loop.temperature_out:7.1f} K (Target {constants.SECONDARY_OUTLET_TARGET_K:4.0f})"),
            ("Pressure", f"{loop.pressure:5.2f}/{constants.MAX_PRESSURE:4.1f} MPa"),
            ("Steam Quality", f"{loop.steam_quality:5.2f}/1.00"),
            ("Relief", "OPEN" if self.reactor.secondary_relief_open else "CLOSED"),
        ]

    def _turbine_grid_lines(self, state: PlantState) -> list[tuple[str, str]]:
        """Rows for the "Turbine / Grid" section; "n/a" where no turbine data exists."""
        consumer = self.reactor.consumer
        if consumer:
            consumer_status = "ONLINE" if consumer.online else "OFF"
            consumer_demand = consumer.demand_mw
        else:
            consumer_status = "n/a"
            consumer_demand = 0.0

        turbines = self.reactor.turbines
        # Rated output assumes all units share the first unit's rating.
        rated_mw = len(turbines) * turbines[0].rated_output_mw if turbines else 0.0

        steam_text = units_text = governor_text = condenser_text = "n/a"
        if state.turbines:
            lead = state.turbines[0]
            steam_flow = state.secondary_loop.mass_flow_rate * max(0.0, state.secondary_loop.steam_quality)
            steam_text = (
                f"h={lead.steam_enthalpy:5.0f} kJ/kg avail {self._steam_available_power(state):6.1f} MW "
                f"flow {steam_flow:6.0f} kg/s"
            )
            units_text = " ".join(f"{t.electrical_output_mw:6.1f}MW" for t in state.turbines)
            if turbines:
                governor_text = (
                    f"thr {turbines[0].throttle:4.2f}→{self._desired_throttle(lead):4.2f} "
                    f"ΔP {(lead.load_demand_mw - lead.electrical_output_mw):6.1f} MW"
                )
            condenser_text = (
                f"P={lead.condenser_pressure:4.2f}/{constants.CONDENSER_MAX_PRESSURE_MPA:4.2f} MPa "
                f"T={lead.condenser_temperature:6.1f}K Foul={lead.fouling_penalty*100:4.1f}%"
            )

        return [
            ("Turbines", " ".join(self._turbine_status_lines())),
            ("Rated Elec", f"{rated_mw:7.1f} MW"),
            ("Steam", steam_text),
            ("Units Elec", units_text),
            ("Governor", governor_text),
            ("Condenser", condenser_text),
            (
                "Electrical",
                f"{state.total_electrical_output():7.1f} MW | Load {self._total_load_supplied(state):6.1f}/{self._total_load_demand(state):6.1f} MW",
            ),
            ("Consumer", f"{consumer_status} demand {consumer_demand:6.1f} MW"),
        ]

    def _draw_data_panel(self, win: "curses._CursesWindow", state: PlantState) -> None:
        """Draw the two-column plant overview and record a trend sample."""
        win.erase()
        win.box()
        _safe_addstr(win, 0, 2, " Plant Overview ", curses.color_pair(1) | curses.A_BOLD)
        self._update_trends(state)

        height, width = win.getmaxyx()
        inner_height = height - 2
        inner_width = width - 2
        left_width = max(28, inner_width // 2)
        right_width = inner_width - left_width
        left_win = win.derwin(inner_height, left_width, 1, 1)
        right_win = win.derwin(inner_height, right_width, 1, 1 + left_width)
        # Erase the sub-windows BEFORE drawing the dividers: sub-windows share
        # the parent's cells, so erasing afterwards would wipe the dividers.
        left_win.erase()
        right_win.erase()
        for row in range(1, height - 1):
            win.addch(row, 1 + left_width, curses.ACS_VLINE)
            if left_width + 1 < width - 1:
                win.addch(row, 1 + left_width + 1, curses.ACS_VLINE)

        left_y = 0
        left_y = self._draw_section(left_win, left_y, "Core", self._core_lines(state))
        left_y = self._draw_section(left_win, left_y, "Trends", self._trend_lines())
        left_y = self._draw_section(left_win, left_y, "Key Poisons / Emitters", self._poison_lines(state))
        left_y = self._draw_section(left_win, left_y, "Primary Loop", self._primary_loop_lines(state))
        self._draw_section(left_win, left_y, "Secondary Loop", self._secondary_loop_lines(state))

        right_y = 0
        right_y = self._draw_section(right_win, right_y, "Turbine / Grid", self._turbine_grid_lines(state))
        right_y = self._draw_section(right_win, right_y, "Generators", self._generator_lines(state))
        right_y = self._draw_section(right_win, right_y, "Power Stats", self._power_lines(state))
        right_y = self._draw_section(right_win, right_y, "Heat Exchanger", self._heat_exchanger_lines(state))
        right_y = self._draw_section(right_win, right_y, "Protections / Warnings", self._protection_lines(state))
        right_y = self._draw_section(right_win, right_y, "Maintenance", self._maintenance_lines())
        self._draw_health_bars(right_win, right_y)

    def _draw_help_panel(self, win: "curses._CursesWindow") -> None:
        """Draw the key reference and tips; stop as soon as the panel is full."""

        def _add_safe(row: int, col: int, text: str, attr: int = 0) -> bool:
            # Keeps clear of the bottom/right border; False means "no room".
            max_y, max_x = win.getmaxyx()
            if row >= max_y - 1 or col >= max_x - 1:
                return False
            clipped = text[: max(0, max_x - col - 1)]
            try:
                win.addstr(row, col, clipped, attr)
            except curses.error:
                return False
            return True

        win.erase()
        win.box()
        _add_safe(0, 2, " Controls ", curses.color_pair(1) | curses.A_BOLD)
        y = 2
        for title, entries in self.help_sections:
            if not _add_safe(y, 2, title, curses.color_pair(1) | curses.A_BOLD):
                return
            y += 1
            for entry in entries:
                if not _add_safe(y, 4, f"{entry.key:<8} {entry.description}"):
                    return
                y += 1
            y += 1  # blank row between sections
        if not _add_safe(y, 2, "Tips:", curses.color_pair(2) | curses.A_BOLD):
            return
        tips = [
            "Start pumps before withdrawing rods.",
            "Bring turbine and consumer online after thermal stabilization.",
            "Toggle turbine units (1/2/3) for staggered maintenance.",
            "Use m/n/,/. for pump maintenance; B/V for generators.",
            "Press 'r' to reset/clear state if you want a cold start.",
            "Watch component health, DNB margin, and subcooling to avoid automatic trips.",
        ]
        for idx, tip in enumerate(tips, start=y + 2):
            if not _add_safe(idx, 4, f"- {tip}"):
                break

    def _draw_status_panel(self, win: "curses._CursesWindow", state: PlantState) -> None:
        """Draw the summary line, any failures, captured log lines and the quit hint."""
        win.erase()
        height, width = win.getmaxyx()
        win.hline(0, 0, curses.ACS_HLINE, width)
        turbine_text = " ".join(self._turbine_status_lines())
        msg = (
            f"Time {state.time_elapsed:7.1f}s | Rods {self.reactor.control.rod_fraction:.3f} | "
            f"Primary {'ON' if self.reactor.primary_pump_active else 'OFF'} | "
            f"Secondary {'ON' if self.reactor.secondary_pump_active else 'OFF'} | "
            f"Turbines {turbine_text} | Page {'Metrics' if self.page == 1 else 'Schematic'}"
        )
        # The summary may need two rows on narrow terminals; row 2 is reserved for it.
        usable = max(1, width - 2)
        _safe_addstr(win, 1, 1, msg[:usable], curses.color_pair(3))
        if len(msg) > usable:
            _safe_addstr(win, 2, 1, msg[usable : 2 * usable], curses.color_pair(3))

        if self.reactor.health_monitor.failure_log:
            failures = f"Failures: {', '.join(self.reactor.health_monitor.failure_log)}"
            _safe_addstr(win, 3, 1, failures[:usable], curses.color_pair(4) | curses.A_BOLD)
            log_y = 4
        else:
            log_y = 3
        for record in list(self.log_buffer):
            if log_y >= height - 1:
                break
            _safe_addstr(win, log_y, 1, record[:usable], curses.color_pair(2))
            log_y += 1
        _safe_addstr(
            win,
            log_y,
            1,
            "Press 'q' to exit and persist the current snapshot.",
            curses.color_pair(2),
        )

    def _draw_section(
        self,
        win: "curses._CursesWindow",
        start_y: int,
        title: str,
        lines: list[tuple[str, str] | tuple[str, str, int] | str],
    ) -> int:
        """Draw a titled list of rows starting at *start_y*.

        Each entry is ``(label, value)``, ``(label, value, attr)`` or a plain
        string. Returns the row where the next section should start (one blank
        row after the last row drawn), or ``height - 2`` when there was no room.
        """
        height, width = win.getmaxyx()
        inner_width = max(0, width - 4)
        if start_y >= height - 2:
            return height - 2
        _safe_addstr(win, start_y, 2, title[:inner_width], curses.A_BOLD | curses.color_pair(1))
        row = start_y + 1
        for line in lines:
            if row >= height - 1:
                break
            attr = 0
            if isinstance(line, tuple):
                if len(line) == 3:
                    label, value, attr = line
                else:
                    label, value = line
                text = f"{label:<18}: {value}"
            else:
                text = line
            _safe_addstr(win, row, 4, text[:inner_width], attr)
            row += 1
        return row + 1

    def _flow_arrow(self, flow: float) -> str:
        """Return an ASCII arrow whose length grows with *flow*."""
        if flow > 15000:
            return "====>"
        if flow > 5000:
            return "===>"
        if flow > 500:
            return "->"
        return "--"

    def _pump_glyph(self, pump_state: PumpState | None) -> str:
        """Return a one-character glyph for the pump's ``status`` attribute."""
        if pump_state is None:
            return "·"
        status = getattr(pump_state, "status", "OFF")
        if status == "RUN":
            return "▶"
        if status == "CAV":
            return "!"
        if status == "STARTING":
            return ">"
        if status == "STOPPING":
            return "-"
        return "·"

    def _turbine_status_lines(self) -> list[str]:
        """Return ``"<n>:<status>"`` per turbine unit, or ``["n/a"]`` if none.

        The status reported by the last drawn state wins over the plain
        ON/OFF derived from the reactor's unit flags.
        """
        if not self.reactor.turbine_unit_active:
            return ["n/a"]
        turbine_states = getattr(self._last_state, "turbines", [])
        lines: list[str] = []
        for idx, active in enumerate(self.reactor.turbine_unit_active):
            status = "ON" if active else "OFF"
            if idx < len(turbine_states):
                status = getattr(turbine_states[idx], "status", status)
            lines.append(f"{idx + 1}:{status}")
        return lines

    def _total_load_supplied(self, state: PlantState) -> float:
        """Sum of ``load_supplied_mw`` over all turbines in *state*."""
        return sum(t.load_supplied_mw for t in state.turbines)

    def _total_load_demand(self, state: PlantState) -> float:
        """Sum of ``load_demand_mw`` over all turbines in *state*."""
        return sum(t.load_demand_mw for t in state.turbines)

    def _poison_lines(self, state: PlantState) -> list[tuple[str, str]]:
        """Rows for poison inventories and emitted particles.

        A label gets a " !" suffix when its quantity reaches the threshold
        configured in ``constants.KEY_POISON_THRESHOLDS``.
        """
        inventory = state.core.fission_product_inventory or {}
        particles = state.core.emitted_particles or {}
        lines: list[tuple[str, str]] = []

        def fmt(symbol: str, label: str, qty: float) -> tuple[str, str]:
            threshold = constants.KEY_POISON_THRESHOLDS.get(symbol)
            flag = " !" if threshold is not None and qty >= threshold else ""
            return (f"{label}{flag}", f"{qty:9.2e}")

        lines.append(fmt("Xe", "Xe (xenon)", getattr(state.core, "xenon_inventory", 0.0)))
        lines.append(fmt("Sm", "Sm (samarium)", inventory.get("Sm", 0.0)))
        lines.append(fmt("I", "I (iodine)", getattr(state.core, "iodine_inventory", 0.0)))
        try:
            xe_penalty = -self.reactor.neutronics.xenon_penalty(state.core)
            lines.append(("Xe Δρ", f"{xe_penalty:+.4f}"))
        except Exception:
            # Best effort: the row is simply omitted when the penalty cannot
            # be computed; a display helper must not break the frame.
            pass
        lines.append(("Neutrons (src)", f"{particles.get('n', 0.0):9.2e}"))
        lines.append(("Gammas", f"{particles.get('gamma', 0.0):9.2e}"))
        lines.append(("Alphas", f"{particles.get('alpha', 0.0):9.2e}"))
        return lines

    def _health_lines(self) -> list[tuple[str, str]]:
        """Rows of component name -> integrity percentage or "FAILED"."""
        lines: list[tuple[str, str]] = []
        for name, comp in self.reactor.health_monitor.components.items():
            pct = f"{comp.integrity*100:5.1f}%"
            lines.append((name, "FAILED" if comp.failed else pct))
        return lines

    def _maintenance_lines(self) -> list[tuple[str, str]]:
        """Rows listing components under maintenance (sorted), or a "None" row."""
        if not self.reactor.maintenance_active:
            return [("Active", "None")]
        return [(comp, "IN PROGRESS") for comp in sorted(self.reactor.maintenance_active)]

    def _generator_lines(self, state: PlantState) -> list[tuple[str, str]]:
        """Rows for generator control mode, per-unit output and battery."""
        if not state.generators:
            return [("Status", "n/a")]
        lines: list[tuple[str, str]] = []
        control = "AUTO" if self.reactor.generator_auto else "MANUAL"
        lines.append(("Control", control))
        for idx, gen in enumerate(state.generators):
            status = "RUN" if gen.running else "START" if gen.starting else "OFF"
            spool = f" spool {gen.spool_remaining:4.1f}s" if gen.starting else ""
            lines.append(
                (
                    f"Gen{idx + 1}",
                    f"{status} {gen.power_output_mw:6.1f}/{self.reactor.generators[idx].rated_output_mw:4.0f} MW{spool}",
                )
            )
            lines.append((" Battery", f"{gen.battery_charge*100:5.1f}% out {gen.battery_output_mw:4.1f} MW"))
        return lines

    def _power_lines(self, state: PlantState) -> list[tuple[str, str]]:
        """Rows for auxiliary power draws read from ``state.aux_draws``."""
        draws = getattr(state, "aux_draws", {}) or {}
        primary_nom = constants.PUMP_POWER_MW * len(self.reactor.primary_pump_units)
        secondary_nom = constants.PUMP_POWER_MW * len(self.reactor.secondary_pump_units)
        return [
            ("Base Aux", f"{draws.get('base', 0.0):6.1f}/{constants.BASE_AUX_LOAD_MW:4.1f} MW"),
            ("Primary Aux", f"{draws.get('primary_pumps', 0.0):6.1f}/{primary_nom:4.1f} MW"),
            ("Secondary Aux", f"{draws.get('secondary_pumps', 0.0):6.1f}/{secondary_nom:4.1f} MW"),
            ("Aux Demand", f"{draws.get('total_demand', 0.0):6.1f} MW"),
            ("Aux Supplied", f"{draws.get('supplied', 0.0):6.1f} MW"),
            ("Gen Output", f"{draws.get('generator_output', 0.0):6.1f} MW"),
            ("Turbine Elec", f"{draws.get('turbine_output', 0.0):6.1f} MW"),
        ]

    def _heat_exchanger_lines(self, state: PlantState) -> list[tuple[str, str]]:
        """Rows for heat-exchanger figures; missing attributes display as 0."""
        delta_t = getattr(state, "primary_to_secondary_delta_t", 0.0)
        eff = getattr(state, "heat_exchanger_efficiency", 0.0)
        hx_fouling = getattr(state, "hx_fouling", 0.0)
        return [
            ("ΔT (pri-sec)", f"{delta_t:6.1f} K"),
            ("HX Eff", f"{eff*100:6.1f}%"),
            (
                "Chem/Foul",
                f"O2 {getattr(state, 'dissolved_oxygen_ppm', 0.0):5.1f} ppm Na {getattr(state, 'sodium_ppm', 0.0):5.1f} ppm Foul {hx_fouling*100:5.1f}%",
            ),
        ]

    def _protection_lines(self, state: PlantState) -> list[tuple[str, str] | tuple[str, str, int]]:
        """Rows for protection status; 3-tuples carry a curses attribute."""
        lines: list[tuple[str, str] | tuple[str, str, int]] = []
        lines.append(
            (
                "SCRAM",
                "ACTIVE" if self.reactor.shutdown else "CLEAR",
                curses.color_pair(4) if self.reactor.shutdown else 0,
            )
        )
        if self.reactor.meltdown:
            lines.append(("Meltdown", "IN PROGRESS", curses.color_pair(4) | curses.A_BOLD))

        sec_flow_low = state.secondary_loop.mass_flow_rate <= 1.0 or not self.reactor.secondary_pump_active
        heat_sink_risk = sec_flow_low and state.core.power_output_mw > 50.0
        if heat_sink_risk:
            heat_text = "TRIPPED low secondary flow >50 MW"
            heat_attr = curses.color_pair(4) | curses.A_BOLD
        elif sec_flow_low:
            heat_text = "ARMED (secondary off/low flow)"
            heat_attr = curses.color_pair(2) | curses.A_BOLD
        else:
            heat_text = "OK"
            heat_attr = curses.color_pair(3)
        lines.append(("Heat sink", heat_text, heat_attr))

        draws = getattr(state, "aux_draws", {}) or {}
        demand = draws.get("total_demand", 0.0)
        supplied = draws.get("supplied", 0.0)
        if demand > 0.1 and supplied + 1e-6 < demand:
            aux_text = f"DEFICIT {supplied:5.1f}/{demand:5.1f} MW"
            aux_attr = curses.color_pair(2) | curses.A_BOLD
        elif demand > 0.1:
            aux_text = f"OK {supplied:5.1f}/{demand:5.1f} MW"
            aux_attr = curses.color_pair(3)
        else:
            aux_text = "Idle"
            aux_attr = 0
        lines.append(("Aux power", aux_text, aux_attr))

        reliefs = []
        if self.reactor.primary_relief_open:
            reliefs.append("Primary")
        if self.reactor.secondary_relief_open:
            reliefs.append("Secondary")
        relief_attr = curses.color_pair(2) | curses.A_BOLD if reliefs else 0
        lines.append(("Relief valves", ", ".join(reliefs) if reliefs else "Closed", relief_attr))
        lines.append(
            (
                "SCRAM trips",
                "DNB<0.5 | Subcool<2K | SG lvl<5/>98% | SG P>15.2MPa",
            )
        )
        return lines

    def _steam_available_power(self, state: PlantState) -> float:
        """Estimate steam power as enthalpy x steam flow / 1000; 0 at negligible flow."""
        mass_flow = state.secondary_loop.mass_flow_rate * max(0.0, state.secondary_loop.steam_quality)
        if mass_flow <= 1.0:
            return 0.0
        if state.turbines:
            enthalpy_kjkg = max(0.0, state.turbines[0].steam_enthalpy)
        else:
            enthalpy_kjkg = constants.STEAM_LATENT_HEAT / 1_000.0
        return (enthalpy_kjkg * mass_flow) / 1_000.0

    def _desired_throttle(self, turbine_state) -> float:
        """Return the displayed target throttle for *turbine_state*.

        0.4 with no demand, otherwise 0.4 plus demand over the first turbine's
        rated output, capped at 1.0; 0.0 when the reactor has no turbines.
        """
        if not self.reactor.turbines:
            return 0.0
        turbine = self.reactor.turbines[0]
        demand = turbine_state.load_demand_mw
        if demand <= 0:
            return 0.4
        return min(1.0, 0.4 + demand / max(1e-6, turbine.rated_output_mw))

    def _update_trends(self, state: PlantState) -> None:
        """Append a (time, fuel temperature, core power) sample to the trend history."""
        self._trend_history.append(
            (state.time_elapsed, state.core.fuel_temperature, state.core.power_output_mw)
        )

    def _trend_lines(self) -> list[tuple[str, str]]:
        """Rows with change and rate between the oldest and newest trend samples."""
        if len(self._trend_history) < 2:
            return [("Fuel Temp Δ", "n/a"), ("Core Power Δ", "n/a")]
        start_t, start_temp, start_power = self._trend_history[0]
        end_t, end_temp, end_power = self._trend_history[-1]
        duration = max(1.0, end_t - start_t)  # avoid division by ~0
        temp_delta = end_temp - start_temp
        power_delta = end_power - start_power
        temp_rate = temp_delta / duration
        power_rate = power_delta / duration
        return [
            ("Fuel Temp Δ", f"{end_temp:7.1f} K (Δ{temp_delta:+6.1f} / {duration:4.0f}s, {temp_rate:+5.2f}/s)"),
            ("Core Power Δ", f"{end_power:7.1f} MW (Δ{power_delta:+6.1f} / {duration:4.0f}s, {power_rate:+5.2f}/s)"),
        ]

    def _draw_health_bars(self, win: "curses._CursesWindow", start_y: int) -> int:
        """Draw one integrity bar per component; return the next free row.

        All writes are clipped to the window, so a narrow panel truncates the
        bars instead of raising ``curses.error``.
        """
        height, width = win.getmaxyx()
        inner_width = width - 4
        if start_y >= height - 2:
            return height - 2
        _safe_addstr(win, start_y, 2, "Component Health", curses.A_BOLD | curses.color_pair(1))
        bar_width = max(8, min(inner_width - 18, 40))
        label_width = 16
        bar_start = 4 + label_width + 1
        row = start_y + 1
        for name, comp in self.reactor.health_monitor.components.items():
            if row >= height - 1:
                break
            label = f"{name:<{label_width}}"
            target = 0.0 if comp.failed else comp.integrity
            filled = int(bar_width * max(0.0, min(1.0, target)))
            bar = "#" * filled + "-" * (bar_width - filled)
            color = 3 if comp.integrity > 0.5 else 2 if comp.integrity > 0.2 else 4
            _safe_addstr(win, row, 4, f"{label}:")
            _safe_addstr(win, row, bar_start, bar[:bar_width], curses.color_pair(color))
            percent_text = "FAILED" if comp.failed else f"{comp.integrity*100:5.1f}%"
            percent_x = min(width - len(percent_text) - 2, bar_start + bar_width + 2)
            _safe_addstr(win, row, percent_x, percent_text, curses.color_pair(color))
            row += 1
        return row + 1

    def _pump_status(self, pumps: list, index: int) -> str:
        """Return ``"<status> <flow> kg/s"`` for pump *index*, or "n/a" if absent."""
        if index >= len(pumps):
            return "n/a"
        state = pumps[index]
        status = getattr(state, "status", "ON" if state.active else "OFF")
        return f"{status:<8} {state.flow_rate:6.0f} kg/s"

    def _current_demand(self) -> float:
        """Return the consumer's demand in MW, or 0.0 without a consumer."""
        if self.reactor.consumer:
            return self.reactor.consumer.demand_mw
        return 0.0

    def _clamped_rod(self, delta: float) -> float:
        """Return the current rod fraction moved by *delta*.

        The result is snapped to a multiple of ``constants.ROD_MANUAL_STEP``
        and clamped to [0.0, 0.95].
        """
        new_fraction = self.reactor.control.rod_fraction + delta
        step = constants.ROD_MANUAL_STEP
        quantized = round(new_fraction / step) * step
        return max(0.0, min(0.95, quantized))

    def _install_log_capture(self) -> None:
        """Route the "reactor_sim" logger into the on-screen log buffer.

        Existing handlers and the propagate flag are remembered so that
        ``_restore_logging`` can put them back.
        """
        if self._log_handler:
            return
        self._previous_handlers = list(self._logger.handlers)
        self._previous_propagate = self._logger.propagate
        for handler in self._previous_handlers:
            self._logger.removeHandler(handler)
        handler = _DashboardLogHandler(self.log_buffer)
        handler.setFormatter(logging.Formatter("%(levelname)s | %(message)s"))
        self._logger.addHandler(handler)
        # Keep records away from ancestor handlers while the dashboard captures them.
        self._logger.propagate = False
        self._log_handler = handler

    def _restore_logging(self) -> None:
        """Undo ``_install_log_capture``: handlers and propagation are restored."""
        if not self._log_handler:
            return
        self._logger.removeHandler(self._log_handler)
        for handler in self._previous_handlers:
            self._logger.addHandler(handler)
        self._logger.propagate = self._previous_propagate
        self._log_handler = None
        self._previous_handlers = []


class _DashboardLogHandler(logging.Handler):
    """Logging handler that appends formatted records to a shared deque.

    Consecutive identical messages are collapsed into one entry with an
    ``(xN)`` repeat counter.
    """

    def __init__(self, buffer: deque[str]) -> None:
        super().__init__()
        self.buffer = buffer
        self._last_msg: str | None = None
        self._repeat_count: int = 0

    def emit(self, record: logging.LogRecord) -> None:
        """Append the record, or bump the repeat counter of the last entry."""
        msg = self.format(record)
        if msg != self._last_msg:
            self._last_msg = msg
            self._repeat_count = 0
            self.buffer.append(msg)
            return
        self._repeat_count += 1
        collapsed = f"{msg} (x{self._repeat_count + 1})"
        if self.buffer and self.buffer[-1].startswith(msg):
            self.buffer[-1] = collapsed
        else:
            # The previous entry is gone (e.g. the buffer was cleared).
            self.buffer.append(collapsed)