Files
Reactor-Sim/src/reactor_sim/dashboard.py
2025-11-26 23:09:03 +01:00

877 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Interactive curses dashboard for real-time reactor operation."""
from __future__ import annotations
import curses
import logging
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from . import constants
from .commands import ReactorCommand
from .reactor import Reactor
from .simulation import ReactorSimulation
from .state import PlantState, PumpState
LOGGER = logging.getLogger(__name__)
def _build_numpad_mapping() -> dict[int, float]:
# Use keypad matrix constants when available; skip missing ones to avoid import errors on some terminals.
mapping: dict[int, float] = {}
table = {
"KEY_C1": 0.1, # numpad 1
"KEY_C2": 0.2, # numpad 2
"KEY_C3": 0.3, # numpad 3
"KEY_B1": 0.4, # numpad 4
"KEY_B2": 0.5, # numpad 5
"KEY_B3": 0.6, # numpad 6
"KEY_A1": 0.7, # numpad 7
"KEY_A2": 0.8, # numpad 8
"KEY_A3": 0.9, # numpad 9
# Common keypad aliases when NumLock is on
"KEY_END": 0.1,
"KEY_DOWN": 0.2,
"KEY_NPAGE": 0.3,
"KEY_LEFT": 0.4,
"KEY_B2": 0.5, # center stays 0.5
"KEY_RIGHT": 0.6,
"KEY_HOME": 0.7,
"KEY_UP": 0.8,
"KEY_PPAGE": 0.9,
}
for name, value in table.items():
code = getattr(curses, name, None)
if code is not None:
mapping[code] = value
return mapping
_NUMPAD_ROD_KEYS = _build_numpad_mapping()
@dataclass
class DashboardKey:
    """One row of the dashboard's Controls panel: a key label and its meaning."""

    # Label of the key (or key combination) as printed in the help column.
    key: str
    # Short human-readable text shown next to the key label.
    description: str
class ReactorDashboard:
"""Minimal TUI dashboard allowing user control in real-time."""
def __init__(
    self,
    reactor: Reactor,
    start_state: Optional[PlantState],
    timestep: float = 1.0,
    save_path: Optional[str] = None,
) -> None:
    """Store the simulation inputs and prepare dashboard bookkeeping.

    Args:
        reactor: Reactor instance the dashboard reads from and commands.
        start_state: State handed to the simulation as its starting point,
            or ``None``.
        timestep: Timestep passed to ``ReactorSimulation``.
        save_path: Where the last state is saved on exit; ``None`` disables
            saving.
    """
    self.reactor = reactor
    self.start_state = start_state
    self.timestep = timestep
    self.save_path = save_path
    self.pending_command: Optional[ReactorCommand] = None
    self.sim: Optional[ReactorSimulation] = None
    self.quit_requested = False
    self.reset_requested = False
    self.page = 1  # 1=metrics, 2=schematic (placeholder)
    self._last_state: Optional[PlantState] = None
    # Samples of (time_elapsed, fuel_temperature, power_output_mw).
    self._trend_history: deque[tuple[float, float, float]] = deque(maxlen=120)
    self.log_buffer: deque[str] = deque(maxlen=8)
    self._log_handler: Optional[logging.Handler] = None
    self._previous_handlers: list[logging.Handler] = []
    self._logger = logging.getLogger("reactor_sim")
    # Help text rendered by the Controls panel, grouped by subsystem.
    self.help_sections: list[tuple[str, list[DashboardKey]]] = [
        (
            "Reactor / Safety",
            [
                DashboardKey("q", "Quit & save"),
                DashboardKey("space", "SCRAM"),
                DashboardKey("r", "Reset & clear state"),
                DashboardKey("a", "Toggle auto rod control"),
                DashboardKey("F1/F2", "Metrics / schematic views"),
                DashboardKey("+/-", "Withdraw/insert rods"),
                DashboardKey("1-9 / Numpad", "Set rods to 0.1 … 0.9 (manual)"),
                # The minus sign was missing from the next two descriptions
                # ("/+50 MW", "/+250 MW"); an ASCII hyphen is used here.
                DashboardKey("[/]", "Adjust consumer demand -/+50 MW"),
                DashboardKey("s/d", "Setpoint -/+250 MW"),
                DashboardKey("p", "Maintain core (shutdown required)"),
            ],
        ),
        (
            "Pumps",
            [
                DashboardKey("g/h", "Toggle primary pump 1/2"),
                DashboardKey("j/k", "Toggle secondary pump 1/2"),
                DashboardKey("m/n", "Maintain primary pumps 1/2"),
                DashboardKey(",/.", "Maintain secondary pumps 1/2"),
            ],
        ),
        (
            "Generators",
            [
                DashboardKey("b/v", "Toggle generator 1/2"),
                DashboardKey("x", "Toggle generator auto"),
                DashboardKey("l/;", "Toggle relief primary/secondary"),
                DashboardKey("B/V", "Maintain generator 1/2"),
            ],
        ),
        (
            "Turbines / Grid",
            [
                DashboardKey("t", "Toggle turbine bank"),
                DashboardKey("Shift+1/2/3", "Toggle turbine units 1-3"),
                DashboardKey("y/u/i", "Maintain turbine 1/2/3"),
                DashboardKey("c", "Toggle consumer"),
            ],
        ),
    ]
def run(self) -> None:
    """Start the dashboard by handing ``_main`` to ``curses.wrapper``."""
    entry_point = self._main
    curses.wrapper(entry_point)
def _main(self, stdscr: "curses._CursesWindow") -> None:  # type: ignore[name-defined]
    """Curses entry point: configure the screen, then run simulations until quit.

    Each pass of the outer loop builds a fresh ``ReactorSimulation``, draws
    every yielded state, and polls the keyboard. A reset request clears the
    saved state and starts another pass; a quit request (or the simulation
    ending by itself) leaves the loop. Log capture is installed for the
    whole session and removed again on the way out.
    """
    curses.curs_set(0)
    curses.start_color()
    curses.use_default_colors()
    # Color pairs used by the panels: 1=cyan, 2=yellow, 3=green, 4=red,
    # each on the terminal's default background (-1).
    curses.init_pair(1, curses.COLOR_CYAN, -1)
    curses.init_pair(2, curses.COLOR_YELLOW, -1)
    curses.init_pair(3, curses.COLOR_GREEN, -1)
    curses.init_pair(4, curses.COLOR_RED, -1)
    # Non-blocking getch() so drawing is never stalled waiting for a key.
    stdscr.nodelay(True)
    stdscr.keypad(True)
    self._install_log_capture()
    try:
        while True:
            self.sim = ReactorSimulation(
                self.reactor,
                timestep=self.timestep,
                duration=None,
                realtime=True,
                command_provider=self._next_command,
            )
            self.sim.start_state = self.start_state
            try:
                for state in self.sim.run():
                    self._last_state = state
                    self._draw(stdscr, state)
                    self._handle_input(stdscr)
                    if self.quit_requested or self.reset_requested:
                        # Persist the latest state if we are exiting early.
                        if self.sim:
                            self.sim.last_state = state
                            self.sim.stop()
                        break
            finally:
                # Save on every exit path except a reset, which discards state.
                if self.save_path and self.sim and self.sim.last_state and not self.reset_requested:
                    self.reactor.save_state(self.save_path, self.sim.last_state)
            if self.quit_requested:
                break
            if self.reset_requested:
                self._clear_saved_state()
                self._reset_to_greenfield()
                continue
            # Simulation finished without a quit or reset request.
            break
    finally:
        self._restore_logging()
def _handle_input(self, stdscr: "curses._CursesWindow") -> None:
while True:
ch = stdscr.getch()
if ch == -1:
break
keyname = None
try:
keyname = curses.keyname(ch)
except curses.error:
keyname = None
if ch in (ord("q"), ord("Q")):
self.quit_requested = True
return
if ch == ord(" "):
self._queue_command(ReactorCommand.scram_all())
elif ch in (ord("o"), ord("O")):
continue
elif ch in (ord("g"), ord("G")):
self._toggle_primary_pump_unit(0)
elif ch in (ord("h"), ord("H")):
self._toggle_primary_pump_unit(1)
elif ch in (ord("j"), ord("J")):
self._toggle_secondary_pump_unit(0)
elif ch in (ord("k"), ord("K")):
self._toggle_secondary_pump_unit(1)
elif ch in (ord("p"), ord("P")):
self._queue_command(ReactorCommand.maintain("core"))
elif ch in (ord("b"), ord("B")):
self._toggle_generator_unit(0)
elif ch in (ord("v"), ord("V")):
self._toggle_generator_unit(1)
elif ch == ord("l"):
self._queue_command(ReactorCommand(primary_relief=not self.reactor.primary_relief_open))
elif ch == ord(";"):
self._queue_command(ReactorCommand(secondary_relief=not self.reactor.secondary_relief_open))
elif ch in (ord("x"), ord("X")):
self._queue_command(ReactorCommand(generator_auto=not self.reactor.generator_auto))
elif ch in (ord("t"), ord("T")):
self._queue_command(ReactorCommand(turbine_on=not self.reactor.turbine_active))
elif keyname and keyname.decode(errors="ignore") in ("!", "@", "#", '"'):
name = keyname.decode(errors="ignore")
turbine_hotkeys = {"!": 0, "@": 1, "#": 2, '"': 1}
self._toggle_turbine_unit(turbine_hotkeys[name])
elif ch in (ord("!"), ord("@"), ord("#"), ord('"')):
turbine_hotkeys = {ord("!"): 0, ord("@"): 1, ord("#"): 2, ord('"'): 1}
self._toggle_turbine_unit(turbine_hotkeys[ch])
elif keyname and keyname.startswith(b"KP_") and keyname[-1:] in b"123456789":
target = (keyname[-1] - ord("0")) / 10.0 # type: ignore[arg-type]
self._queue_command(ReactorCommand(rod_position=target, rod_manual=True))
elif ord("1") <= ch <= ord("9"):
target = (ch - ord("0")) / 10.0
self._queue_command(ReactorCommand(rod_position=target, rod_manual=True))
elif ch in _NUMPAD_ROD_KEYS:
self._queue_command(ReactorCommand(rod_position=_NUMPAD_ROD_KEYS[ch], rod_manual=True))
elif curses.KEY_F1 <= ch <= curses.KEY_F9:
target = (ch - curses.KEY_F1 + 1) / 10.0
self._queue_command(ReactorCommand(rod_position=target, rod_manual=True))
elif ch in (ord("+"), ord("=")):
# Insert rods (increase fraction)
self._queue_command(ReactorCommand(rod_position=self._clamped_rod(constants.ROD_MANUAL_STEP)))
elif ch == ord("-"):
# Withdraw rods (decrease fraction)
self._queue_command(ReactorCommand(rod_position=self._clamped_rod(-constants.ROD_MANUAL_STEP)))
elif ch == ord("["):
demand = self._current_demand() - 50.0
self._queue_command(ReactorCommand(consumer_demand=max(0.0, demand)))
elif ch == ord("]"):
demand = self._current_demand() + 50.0
self._queue_command(ReactorCommand(consumer_demand=demand))
elif ch in (ord("c"), ord("C")):
online = not (self.reactor.consumer.online if self.reactor.consumer else False)
self._queue_command(ReactorCommand(consumer_online=online))
elif ch in (ord("r"), ord("R")):
self._request_reset()
elif ch in (ord("s"), ord("S")):
self._queue_command(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw - 250.0))
elif ch in (ord("d"), ord("D")):
self._queue_command(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw + 250.0))
elif ch in (ord("a"), ord("A")):
self._queue_command(ReactorCommand(rod_manual=not self.reactor.control.manual_control))
elif ch in (ord("m"), ord("M")):
self._queue_command(ReactorCommand.maintain("primary_pump_1"))
elif ch in (ord("n"), ord("N")):
self._queue_command(ReactorCommand.maintain("primary_pump_2"))
elif ch == ord(","):
self._queue_command(ReactorCommand.maintain("secondary_pump_1"))
elif ch == ord("."):
self._queue_command(ReactorCommand.maintain("secondary_pump_2"))
elif ch in (ord("B"),):
self._queue_command(ReactorCommand.maintain("generator_1"))
elif ch in (ord("V"),):
self._queue_command(ReactorCommand.maintain("generator_2"))
elif ch in (ord("y"), ord("Y")):
self._queue_command(ReactorCommand.maintain("turbine_1"))
elif ch in (ord("u"), ord("U")):
self._queue_command(ReactorCommand.maintain("turbine_2"))
elif ch in (ord("i"), ord("I")):
self._queue_command(ReactorCommand.maintain("turbine_3"))
def _queue_command(self, command: ReactorCommand) -> None:
if self.pending_command is None:
self.pending_command = command
else:
for field in command.__dataclass_fields__: # type: ignore[attr-defined]
value = getattr(command, field)
if value is None or value is False:
continue
if field == "rod_step" and getattr(self.pending_command, field) is not None:
setattr(
self.pending_command,
field,
getattr(self.pending_command, field) + value,
)
else:
setattr(self.pending_command, field, value)
if self.pending_command.rod_position is not None and self.pending_command.rod_manual is None:
self.pending_command.rod_manual = True
def _toggle_turbine_unit(self, index: int) -> None:
if index < 0 or index >= len(self.reactor.turbine_unit_active):
return
current = self.reactor.turbine_unit_active[index]
self._queue_command(ReactorCommand(turbine_units={index + 1: not current}))
def _toggle_primary_pump_unit(self, index: int) -> None:
if index < 0 or index >= len(self.reactor.primary_pump_units):
return
current = self.reactor.primary_pump_units[index]
self._queue_command(ReactorCommand(primary_pumps={index + 1: not current}))
def _toggle_secondary_pump_unit(self, index: int) -> None:
if index < 0 or index >= len(self.reactor.secondary_pump_units):
return
current = self.reactor.secondary_pump_units[index]
self._queue_command(ReactorCommand(secondary_pumps={index + 1: not current}))
def _toggle_generator_unit(self, index: int) -> None:
    """Queue a command flipping generator *index* (0-based).

    A generator counts as "on" when the last drawn state reports it as
    running or starting; with no such state it is treated as off.
    """
    snapshot = self._last_state
    is_on = False
    if snapshot and index < len(snapshot.generators):
        unit = snapshot.generators[index]
        is_on = unit.running or unit.starting
    # The command addresses generators by 1-based number.
    self._queue_command(ReactorCommand(generator_units={index + 1: not is_on}))
def _request_reset(self) -> None:
self.reset_requested = True
if self.sim:
self.sim.stop()
def _clear_saved_state(self) -> None:
if not self.save_path:
return
path = Path(self.save_path)
try:
path.unlink()
LOGGER.info("Cleared saved state at %s", path)
except FileNotFoundError:
LOGGER.info("No saved state to clear at %s", path)
def _reset_to_greenfield(self) -> None:
    """Replace the reactor with ``Reactor.default()`` and drop all per-run state."""
    LOGGER.info("Resetting reactor to initial state")
    self.reactor = Reactor.default()
    # Forget anything tied to the previous run.
    self.start_state = self._last_state = None
    self.pending_command = None
    self.log_buffer.clear()
    self.reset_requested = False
def _next_command(self, _: float, __: PlantState) -> Optional[ReactorCommand]:
cmd = self.pending_command
self.pending_command = None
return cmd
def _draw(self, stdscr: "curses._CursesWindow", state: PlantState) -> None:
    """Redraw the whole screen for *state*.

    The screen is split into a data panel (top left), a help panel (top
    right) and a full-width status panel (bottom). When the terminal is
    smaller than the minimum layout, only a warning is shown.
    """
    stdscr.erase()
    height, width = stdscr.getmaxyx()
    min_status = 6
    min_height = min_status + 12
    min_width = 70
    if height < min_height or width < min_width:
        # The hint is built from the limits actually enforced above, and is
        # clipped so it never wraps or touches the last cell of the row.
        notice = f"Terminal too small; try >={min_width}x{min_height} or reduce font size."
        try:
            stdscr.addstr(0, 0, notice[: max(0, width - 1)], curses.color_pair(4))
        except curses.error:
            # Deliberate best effort: the terminal cannot even fit the
            # notice, and crashing the dashboard would not help.
            pass
        stdscr.refresh()
        return
    # The status panel grows with the number of captured log lines.
    log_rows = max(2, min(len(self.log_buffer) + 2, 8))
    status_height = min(height - 1, max(min_status, min_status + log_rows))
    data_height = max(1, height - status_height)
    gap = 2
    right_width = max(28, width // 3)
    left_width = width - right_width - gap
    if left_width < 50:
        # Favor the data panel on narrow terminals.
        left_width = min(50, width - (18 + gap))
        right_width = width - left_width - gap
    left_width = max(1, left_width)
    right_width = max(1, right_width)
    data_win = stdscr.derwin(data_height, left_width, 0, 0)
    help_win = stdscr.derwin(data_height, right_width, 0, left_width + gap)
    status_win = stdscr.derwin(status_height, width, data_height, 0)
    self._draw_data_panel(data_win, state)
    self._draw_help_panel(help_win)
    self._draw_status_panel(status_win, state)
    stdscr.refresh()
def _draw_data_panel(self, win: "curses._CursesWindow", state: PlantState) -> None:
    """Render the "Plant Overview" panel for *state* into *win*.

    The panel is split into two columns. The left column shows core,
    trends, poisons and both coolant loops; the right column shows
    turbine/grid, generators, power, heat exchanger, protections,
    maintenance and component health. Also records a trend sample.
    """
    win.erase()
    win.box()
    win.addstr(0, 2, " Plant Overview ", curses.color_pair(1) | curses.A_BOLD)
    self._update_trends(state)
    height, width = win.getmaxyx()
    inner_height = height - 2
    inner_width = width - 2
    left_width = max(28, inner_width // 2)
    right_width = inner_width - left_width
    left_win = win.derwin(inner_height, left_width, 1, 1)
    right_win = win.derwin(inner_height, right_width, 1, 1 + left_width)
    for row in range(1, height - 1):
        win.addch(row, 1 + left_width, curses.ACS_VLINE)
        if left_width + 1 < width - 1:
            win.addch(row, 1 + left_width + 1, curses.ACS_VLINE)
    left_win.erase()
    right_win.erase()
    left_y = 0
    left_y = self._draw_section(
        left_win,
        left_y,
        "Core",
        [
            (
                "Fuel Temp",
                f"{state.core.fuel_temperature:6.1f} K (Max {constants.CORE_MELTDOWN_TEMPERATURE:4.0f})",
            ),
            (
                "Core Power",
                f"{state.core.power_output_mw:6.1f} MW (Nom {constants.NORMAL_CORE_POWER_MW:4.0f}/Max {constants.TEST_MAX_POWER_MW:4.0f})",
            ),
            ("Neutron Flux", f"{state.core.neutron_flux:10.2e}"),
            ("Rods", f"{self.reactor.control.rod_fraction:.3f}"),
            ("Rod Mode", "AUTO" if not self.reactor.control.manual_control else "MANUAL"),
            ("Setpoint", f"{self.reactor.control.setpoint_mw:7.0f} MW"),
            ("Reactivity", f"{state.core.reactivity_margin:+.4f}"),
        ],
    )
    left_y = self._draw_section(
        left_win,
        left_y,
        "Trends",
        self._trend_lines(),
    )
    left_y = self._draw_section(left_win, left_y, "Key Poisons / Emitters", self._poison_lines(state))
    left_y = self._draw_section(
        left_win,
        left_y,
        "Primary Loop",
        [
            ("Pump1", self._pump_status(state.primary_pumps, 0)),
            ("Pump2", self._pump_status(state.primary_pumps, 1)),
            (
                "Flow",
                f"{state.primary_loop.mass_flow_rate:7.0f}/{self.reactor.primary_pump.nominal_flow * len(self.reactor.primary_pump_units):.0f} kg/s",
            ),
            ("Level", f"{state.primary_loop.level*100:6.1f}%"),
            ("Inlet Temp", f"{state.primary_loop.temperature_in:7.1f} K"),
            ("Outlet Temp", f"{state.primary_loop.temperature_out:7.1f} K (Target {constants.PRIMARY_OUTLET_TARGET_K:4.0f})"),
            ("Pressure", f"{state.primary_loop.pressure:5.2f}/{constants.MAX_PRESSURE:4.1f} MPa"),
            ("Pressurizer", f"{self.reactor.pressurizer_level*100:6.1f}% @ {constants.PRIMARY_PRESSURIZER_SETPOINT_MPA:4.1f} MPa"),
            ("Relief", "OPEN" if self.reactor.primary_relief_open else "CLOSED"),
        ],
    )
    self._draw_section(
        left_win,
        left_y,
        "Secondary Loop",
        [
            ("Pump1", self._pump_status(state.secondary_pumps, 0)),
            ("Pump2", self._pump_status(state.secondary_pumps, 1)),
            (
                "Flow",
                f"{state.secondary_loop.mass_flow_rate:7.0f}/{self.reactor.secondary_pump.nominal_flow * len(self.reactor.secondary_pump_units):.0f} kg/s",
            ),
            ("Level", f"{state.secondary_loop.level*100:6.1f}%"),
            # NOTE(review): the inlet row shows the *outlet* target constant,
            # same as the row below - confirm this is intended.
            ("Inlet Temp", f"{state.secondary_loop.temperature_in:7.1f} K (Target {constants.SECONDARY_OUTLET_TARGET_K:4.0f})"),
            ("Outlet Temp", f"{state.secondary_loop.temperature_out:7.1f} K (Target {constants.SECONDARY_OUTLET_TARGET_K:4.0f})"),
            ("Pressure", f"{state.secondary_loop.pressure:5.2f}/{constants.MAX_PRESSURE:4.1f} MPa"),
            ("Steam Quality", f"{state.secondary_loop.steam_quality:5.2f}/1.00"),
            ("Relief", "OPEN" if self.reactor.secondary_relief_open else "CLOSED"),
        ],
    )
    right_y = 0
    consumer_status = "n/a"
    consumer_demand = 0.0
    if self.reactor.consumer:
        consumer_status = "ONLINE" if self.reactor.consumer.online else "OFF"
        consumer_demand = self.reactor.consumer.demand_mw
    right_y = self._draw_section(
        right_win,
        right_y,
        "Turbine / Grid",
        [
            ("Turbines", " ".join(self._turbine_status_lines())),
            (
                "Rated Elec",
                # Guarded like "Throttle" below so a reactor without turbines
                # shows "n/a" instead of raising IndexError.
                f"{len(self.reactor.turbines)*self.reactor.turbines[0].rated_output_mw:7.1f} MW"
                if self.reactor.turbines
                else "n/a",
            ),
            ("Steam Avail", f"{self._steam_available_power(state):7.1f} MW"),
            ("Unit1 Elec", f"{state.turbines[0].electrical_output_mw:7.1f} MW" if state.turbines else "n/a"),
            (
                "Unit2 Elec",
                f"{state.turbines[1].electrical_output_mw:7.1f} MW" if len(state.turbines) > 1 else "n/a",
            ),
            (
                "Unit3 Elec",
                f"{state.turbines[2].electrical_output_mw:7.1f} MW" if len(state.turbines) > 2 else "n/a",
            ),
            ("Throttle", f"{self.reactor.turbines[0].throttle:5.2f}" if self.reactor.turbines else "n/a"),
            (
                "Condenser",
                (
                    f"P={state.turbines[0].condenser_pressure:4.2f}/{constants.CONDENSER_MAX_PRESSURE_MPA:4.2f} MPa "
                    f"T={state.turbines[0].condenser_temperature:6.1f}/{constants.CONDENSER_COOLING_WATER_TEMP_K:6.1f}K "
                    f"Foul={state.turbines[0].fouling_penalty*100:4.1f}%"
                )
                if state.turbines
                else "n/a",
            ),
            ("Electrical", f"{state.total_electrical_output():7.1f} MW"),
            ("Load", f"{self._total_load_supplied(state):7.1f}/{self._total_load_demand(state):7.1f} MW"),
            ("Consumer", f"{consumer_status}"),
            ("Demand", f"{consumer_demand:7.1f} MW"),
            ("Steam Enthalpy", f"{state.turbines[0].steam_enthalpy:7.0f} kJ/kg" if state.turbines else "n/a"),
            ("Steam Flow", f"{state.secondary_loop.mass_flow_rate * max(0.0, state.secondary_loop.steam_quality):7.0f} kg/s"),
        ],
    )
    right_y = self._draw_section(right_win, right_y, "Generators", self._generator_lines(state))
    right_y = self._draw_section(right_win, right_y, "Power Stats", self._power_lines(state))
    right_y = self._draw_section(right_win, right_y, "Heat Exchanger", self._heat_exchanger_lines(state))
    right_y = self._draw_section(right_win, right_y, "Protections / Warnings", self._protection_lines(state))
    right_y = self._draw_section(right_win, right_y, "Maintenance", self._maintenance_lines())
    self._draw_health_bars(right_win, right_y)
def _draw_help_panel(self, win: "curses._CursesWindow") -> None:
    """Render the "Controls" panel: key bindings per section, then tips.

    Drawing stops silently as soon as a line no longer fits in *win*.
    """

    def put(row: int, col: int, text: str, attr: int = 0) -> bool:
        """Write *text* clipped to the window; return False if it cannot be drawn."""
        rows, cols = win.getmaxyx()
        fits = row < rows - 1 and col < cols - 1
        if not fits:
            return False
        room = max(0, cols - col - 1)
        try:
            win.addstr(row, col, text[:room], attr)
        except curses.error:
            return False
        return True

    win.erase()
    win.box()
    put(0, 2, " Controls ", curses.color_pair(1) | curses.A_BOLD)
    y = 2
    for heading, bindings in self.help_sections:
        if not put(y, 2, heading, curses.color_pair(1) | curses.A_BOLD):
            return
        y += 1
        for binding in bindings:
            if not put(y, 4, f"{binding.key:<8} {binding.description}"):
                return
            y += 1
        # Blank line between sections.
        y += 1
    if not put(y, 2, "Tips:", curses.color_pair(2) | curses.A_BOLD):
        return
    tips = [
        "Start pumps before withdrawing rods.",
        "Bring turbine and consumer online after thermal stabilization.",
        "Toggle turbine units (1/2/3) for staggered maintenance.",
        "Use m/n/,/. for pump maintenance; B/V for generators.",
        "Press 'r' to reset/clear state if you want a cold start.",
        "Watch component health, DNB margin, and subcooling to avoid automatic trips.",
    ]
    tip_row = y + 2
    for tip in tips:
        if not put(tip_row, 4, f"- {tip}"):
            break
        tip_row += 1
def _draw_status_panel(self, win: "curses._CursesWindow", state: PlantState) -> None:
    """Render the bottom status panel: summary line, failures, log lines, footer.

    The failures line is clipped to the window width in the same way as the
    log lines, so a long failure list can no longer spill over the rows
    below it or run past the end of the window.
    """
    win.erase()
    height, width = win.getmaxyx()
    win.hline(0, 0, curses.ACS_HLINE, width)
    turbine_text = " ".join(self._turbine_status_lines())
    msg = (
        f"Time {state.time_elapsed:7.1f}s | Rods {self.reactor.control.rod_fraction:.3f} | "
        f"Primary {'ON' if self.reactor.primary_pump_active else 'OFF'} | "
        f"Secondary {'ON' if self.reactor.secondary_pump_active else 'OFF'} | "
        f"Turbines {turbine_text} | Page {'Metrics' if self.page == 1 else 'Schematic'}"
    )
    win.addstr(1, 1, msg, curses.color_pair(3))
    failures = self.reactor.health_monitor.failure_log
    if failures:
        failure_text = f"Failures: {', '.join(failures)}"
        win.addstr(
            3,
            1,
            failure_text[: max(0, width - 2)],
            curses.color_pair(4) | curses.A_BOLD,
        )
        log_y = 4
    else:
        log_y = 3
    # Iterate over a copy of the log buffer rather than the deque itself.
    for record in list(self.log_buffer):
        if log_y >= height - 1:
            break
        win.addstr(log_y, 1, record[: width - 2], curses.color_pair(2))
        log_y += 1
    win.addstr(log_y, 1, "Press 'q' to exit and persist the current snapshot.", curses.color_pair(2))
def _draw_section(
    self,
    win: "curses._CursesWindow",
    start_y: int,
    title: str,
    lines: list[tuple[str, str] | str],
) -> int:
    """Draw a titled block of rows into *win* starting at *start_y*.

    Tuple entries are rendered as ``label: value`` with the label padded to
    18 columns; plain strings are written as-is. Text is clipped to the
    window width minus four columns.

    Returns:
        The row where the next section should start (one blank row after
        the last drawn line), or ``height - 2`` when there was no room.
    """
    height, width = win.getmaxyx()
    usable = width - 4
    floor = height - 2
    if start_y >= floor:
        return floor
    win.addstr(start_y, 2, title[:usable], curses.A_BOLD | curses.color_pair(1))
    row = start_y + 1
    for entry in lines:
        if row >= height - 1:
            break
        if not isinstance(entry, tuple):
            rendered = entry
        else:
            label, value = entry
            rendered = f"{label:<18}: {value}"
        win.addstr(row, 4, rendered[:usable])
        row += 1
    return row + 1
def _flow_arrow(self, flow: float) -> str:
if flow > 15000:
return "====>"
if flow > 5000:
return "===>"
if flow > 500:
return "->"
return "--"
def _pump_glyph(self, pump_state: PumpState | None) -> str:
if pump_state is None:
return "·"
status = getattr(pump_state, "status", "OFF")
if status == "RUN":
return ""
if status == "CAV":
return "!"
if status == "STARTING":
return ">"
if status == "STOPPING":
return "-"
return "·"
def _turbine_status_lines(self) -> list[str]:
if not self.reactor.turbine_unit_active:
return ["n/a"]
lines: list[str] = []
for idx, active in enumerate(self.reactor.turbine_unit_active):
label = f"{idx + 1}:"
status = "ON" if active else "OFF"
if idx < len(getattr(self._last_state, "turbines", [])):
t_state = self._last_state.turbines[idx]
status = getattr(t_state, "status", status)
lines.append(f"{label}{status}")
return lines
def _total_load_supplied(self, state: PlantState) -> float:
return sum(t.load_supplied_mw for t in state.turbines)
def _total_load_demand(self, state: PlantState) -> float:
return sum(t.load_demand_mw for t in state.turbines)
def _poison_lines(self, state: PlantState) -> list[tuple[str, str]]:
    """Build the "Key Poisons / Emitters" rows for *state*.

    Xenon, samarium and iodine quantities get a `` !`` suffix on their
    label when they reach the threshold in
    ``constants.KEY_POISON_THRESHOLDS``. The xenon reactivity row is
    omitted when computing it raises.
    """
    core = state.core
    inventory = core.fission_product_inventory or {}
    particles = core.emitted_particles or {}

    def poison_row(symbol: str, label: str, qty: float) -> tuple[str, str]:
        limit = constants.KEY_POISON_THRESHOLDS.get(symbol)
        over_limit = limit is not None and qty >= limit
        return (f"{label}{' !' if over_limit else ''}", f"{qty:9.2e}")

    rows: list[tuple[str, str]] = [
        poison_row("Xe", "Xe (xenon)", getattr(core, "xenon_inventory", 0.0)),
        poison_row("Sm", "Sm (samarium)", inventory.get("Sm", 0.0)),
        poison_row("I", "I (iodine)", getattr(core, "iodine_inventory", 0.0)),
    ]
    try:
        xe_penalty = -self.reactor.neutronics.xenon_penalty(state.core)
        rows.append(("Xe Δρ", f"{xe_penalty:+.4f}"))
    except Exception:
        # Best effort: the row is simply left out.
        pass
    for label, key in (("Neutrons (src)", "n"), ("Gammas", "gamma"), ("Alphas", "alpha")):
        rows.append((label, f"{particles.get(key, 0.0):9.2e}"))
    return rows
def _health_lines(self) -> list[tuple[str, str]]:
comps = self.reactor.health_monitor.components
lines: list[tuple[str, str]] = []
for name, comp in comps.items():
pct = f"{comp.integrity*100:5.1f}%"
state = "FAILED" if comp.failed else pct
lines.append((name, state))
return lines
def _maintenance_lines(self) -> list[tuple[str, str]]:
if not self.reactor.maintenance_active:
return [("Active", "None")]
return [(comp, "IN PROGRESS") for comp in sorted(self.reactor.maintenance_active)]
def _generator_lines(self, state: PlantState) -> list[tuple[str, str]]:
if not state.generators:
return [("Status", "n/a")]
lines: list[tuple[str, str]] = []
control = "AUTO" if self.reactor.generator_auto else "MANUAL"
lines.append(("Control", control))
for idx, gen in enumerate(state.generators):
status = "RUN" if gen.running else "START" if gen.starting else "OFF"
spool = f" spool {gen.spool_remaining:4.1f}s" if gen.starting else ""
lines.append((f"Gen{idx + 1}", f"{status} {gen.power_output_mw:6.1f}/{self.reactor.generators[idx].rated_output_mw:4.0f} MW{spool}"))
lines.append((f" Battery", f"{gen.battery_charge*100:5.1f}% out {gen.battery_output_mw:4.1f} MW"))
return lines
def _power_lines(self, state: PlantState) -> list[tuple[str, str]]:
    """Build the "Power Stats" rows from ``state.aux_draws`` (missing keys read as 0.0)."""
    draws = getattr(state, "aux_draws", {}) or {}
    pump_mw = constants.PUMP_POWER_MW
    primary_nominal = pump_mw * len(self.reactor.primary_pump_units)
    secondary_nominal = pump_mw * len(self.reactor.secondary_pump_units)

    def drawn(key: str) -> str:
        return f"{draws.get(key, 0.0):6.1f}"

    return [
        ("Base Aux", f"{drawn('base')}/{constants.BASE_AUX_LOAD_MW:4.1f} MW"),
        ("Primary Aux", f"{drawn('primary_pumps')}/{primary_nominal:4.1f} MW"),
        ("Secondary Aux", f"{drawn('secondary_pumps')}/{secondary_nominal:4.1f} MW"),
        ("Aux Demand", f"{drawn('total_demand')} MW"),
        ("Aux Supplied", f"{drawn('supplied')} MW"),
        ("Gen Output", f"{drawn('generator_output')} MW"),
        ("Turbine Elec", f"{drawn('turbine_output')} MW"),
    ]
def _heat_exchanger_lines(self, state: PlantState) -> list[tuple[str, str]]:
delta_t = getattr(state, "primary_to_secondary_delta_t", 0.0)
eff = getattr(state, "heat_exchanger_efficiency", 0.0)
return [
("ΔT (pri-sec)", f"{delta_t:6.1f} K"),
("HX Eff", f"{eff*100:6.1f}%"),
]
def _protection_lines(self, state: PlantState) -> list[tuple[str, str]]:
lines: list[tuple[str, str]] = []
lines.append(("SCRAM", "ACTIVE" if self.reactor.shutdown else "CLEAR"))
if self.reactor.meltdown:
lines.append(("Meltdown", "IN PROGRESS"))
sec_flow_low = state.secondary_loop.mass_flow_rate <= 1.0 or not self.reactor.secondary_pump_active
heat_sink_risk = sec_flow_low and state.core.power_output_mw > 50.0
if heat_sink_risk:
heat_text = "TRIPPED low secondary flow >50 MW"
elif sec_flow_low:
heat_text = "ARMED (secondary off/low flow)"
else:
heat_text = "OK"
lines.append(("Heat sink", heat_text))
draws = getattr(state, "aux_draws", {}) or {}
demand = draws.get("total_demand", 0.0)
supplied = draws.get("supplied", 0.0)
if demand > 0.1 and supplied + 1e-6 < demand:
aux_text = f"DEFICIT {supplied:5.1f}/{demand:5.1f} MW"
elif demand > 0.1:
aux_text = f"OK {supplied:5.1f}/{demand:5.1f} MW"
else:
aux_text = "Idle"
lines.append(("Aux power", aux_text))
reliefs = []
if self.reactor.primary_relief_open:
reliefs.append("Primary")
if self.reactor.secondary_relief_open:
reliefs.append("Secondary")
lines.append(("Relief valves", ", ".join(reliefs) if reliefs else "Closed"))
return lines
def _steam_available_power(self, state: PlantState) -> float:
mass_flow = state.secondary_loop.mass_flow_rate * max(0.0, state.secondary_loop.steam_quality)
if mass_flow <= 1.0:
return 0.0
if state.turbines:
enthalpy_kjkg = max(0.0, state.turbines[0].steam_enthalpy)
else:
enthalpy_kjkg = (constants.STEAM_LATENT_HEAT / 1_000.0)
return (enthalpy_kjkg * mass_flow) / 1_000.0
def _update_trends(self, state: PlantState) -> None:
self._trend_history.append((state.time_elapsed, state.core.fuel_temperature, state.core.power_output_mw))
def _trend_lines(self) -> list[tuple[str, str]]:
if len(self._trend_history) < 2:
return [("Fuel Temp Δ", "n/a"), ("Core Power Δ", "n/a")]
start_t, start_temp, start_power = self._trend_history[0]
end_t, end_temp, end_power = self._trend_history[-1]
duration = max(1.0, end_t - start_t)
temp_delta = end_temp - start_temp
power_delta = end_power - start_power
temp_rate = temp_delta / duration
power_rate = power_delta / duration
return [
("Fuel Temp Δ", f"{end_temp:7.1f} K (Δ{temp_delta:+6.1f} / {duration:4.0f}s, {temp_rate:+5.2f}/s)"),
("Core Power Δ", f"{end_power:7.1f} MW (Δ{power_delta:+6.1f} / {duration:4.0f}s, {power_rate:+5.2f}/s)"),
]
def _draw_health_bars(self, win: "curses._CursesWindow", start_y: int) -> int:
    """Draw one integrity bar per monitored component, starting at *start_y*.

    Each row is ``label: ####----  pct``. A failed component gets an empty
    bar and the text "FAILED". The color is chosen from the component's
    integrity: pair 3 above 0.5, pair 2 above 0.2, pair 4 otherwise.

    Returns:
        The row after the last bar plus one blank row, or ``height - 2``
        when there was no room for the section.
    """
    height, width = win.getmaxyx()
    usable = width - 4
    if start_y >= height - 2:
        return height - 2
    win.addstr(start_y, 2, "Component Health", curses.A_BOLD | curses.color_pair(1))
    label_width = 16
    bar_width = max(8, min(usable - 18, 40))
    bar_start = 4 + label_width + 1
    row = start_y + 1
    for name, comp in self.reactor.health_monitor.components.items():
        if row >= height - 1:
            break
        level = 0.0 if comp.failed else comp.integrity
        filled = int(bar_width * max(0.0, min(1.0, level)))
        bar = "#" * filled + "-" * (bar_width - filled)
        if comp.integrity > 0.5:
            color = 3
        elif comp.integrity > 0.2:
            color = 2
        else:
            color = 4
        win.addstr(row, 4, f"{name:<{label_width}}:")
        win.addstr(row, bar_start, bar[:bar_width], curses.color_pair(color))
        percent_text = "FAILED" if comp.failed else f"{comp.integrity*100:5.1f}%"
        # Keep the percentage inside the window on narrow layouts.
        percent_x = min(width - len(percent_text) - 2, bar_start + bar_width + 2)
        win.addstr(row, percent_x, percent_text, curses.color_pair(color))
        row += 1
    return row + 1
def _pump_status(self, pumps: list, index: int) -> str:
if index >= len(pumps):
return "n/a"
state = pumps[index]
status = getattr(state, "status", "ON" if state.active else "OFF")
return f"{status:<8} {state.flow_rate:6.0f} kg/s"
def _current_demand(self) -> float:
if self.reactor.consumer:
return self.reactor.consumer.demand_mw
return 0.0
def _clamped_rod(self, delta: float) -> float:
    """Return the current rod fraction plus *delta*, snapped and clamped.

    The sum is rounded to the nearest multiple of
    ``constants.ROD_MANUAL_STEP`` and then limited to the range 0.0 .. 0.95.
    """
    step = constants.ROD_MANUAL_STEP
    requested = self.reactor.control.rod_fraction + delta
    snapped = round(requested / step) * step
    return max(0.0, min(0.95, snapped))
def _install_log_capture(self) -> None:
    """Route the ``reactor_sim`` logger into the on-screen log buffer.

    Existing handlers are detached and remembered, and propagation is
    switched off. The prior ``propagate`` flag is recorded in
    ``_previous_propagate`` so it can be put back later. Does nothing when
    capture is already installed.
    """
    if self._log_handler:
        return
    self._previous_handlers = list(self._logger.handlers)
    # Remember the flag before it is overwritten below.
    self._previous_propagate = self._logger.propagate
    for handler in self._previous_handlers:
        self._logger.removeHandler(handler)
    handler = _DashboardLogHandler(self.log_buffer)
    handler.setFormatter(logging.Formatter("%(levelname)s | %(message)s"))
    self._logger.addHandler(handler)
    self._logger.propagate = False
    self._log_handler = handler
def _restore_logging(self) -> None:
if not self._log_handler:
return
self._logger.removeHandler(self._log_handler)
for handler in self._previous_handlers:
self._logger.addHandler(handler)
self._log_handler = None
self._previous_handlers = []
class _DashboardLogHandler(logging.Handler):
def __init__(self, buffer: deque[str]) -> None:
super().__init__()
self.buffer = buffer
self._last_msg: str | None = None
self._repeat_count: int = 0
def emit(self, record: logging.LogRecord) -> None:
msg = self.format(record)
if msg == self._last_msg:
self._repeat_count += 1
if self._repeat_count > 3:
return
else:
self._last_msg = msg
self._repeat_count = 0
self.buffer.append(msg)