Files
Reactor-Sim/src/reactor_sim/dashboard.py
2025-11-21 17:26:39 +02:00

285 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Interactive curses dashboard for real-time reactor operation."""
from __future__ import annotations
import curses
from dataclasses import dataclass
from typing import Optional
from .commands import ReactorCommand
from .reactor import Reactor
from .simulation import ReactorSimulation
from .state import PlantState
@dataclass
class DashboardKey:
key: str
description: str
class ReactorDashboard:
"""Minimal TUI dashboard allowing user control in real-time."""
def __init__(
self,
reactor: Reactor,
start_state: Optional[PlantState],
timestep: float = 1.0,
save_path: Optional[str] = None,
) -> None:
self.reactor = reactor
self.start_state = start_state
self.timestep = timestep
self.save_path = save_path
self.pending_command: Optional[ReactorCommand] = None
self.sim: Optional[ReactorSimulation] = None
self.quit_requested = False
self.keys = [
DashboardKey("q", "Quit & save"),
DashboardKey("space", "SCRAM"),
DashboardKey("p", "Toggle primary pump"),
DashboardKey("o", "Toggle secondary pump"),
DashboardKey("t", "Toggle turbine"),
DashboardKey("c", "Toggle consumer"),
DashboardKey("+/-", "Withdraw/insert rods"),
DashboardKey("[/]", "Adjust consumer demand /+50 MW"),
DashboardKey("s", "Setpoint 250 MW"),
DashboardKey("d", "Setpoint +250 MW"),
]
def run(self) -> None:
curses.wrapper(self._main)
def _main(self, stdscr: "curses._CursesWindow") -> None: # type: ignore[name-defined]
curses.curs_set(0)
curses.start_color()
curses.use_default_colors()
curses.init_pair(1, curses.COLOR_CYAN, -1)
curses.init_pair(2, curses.COLOR_YELLOW, -1)
curses.init_pair(3, curses.COLOR_GREEN, -1)
curses.init_pair(4, curses.COLOR_RED, -1)
stdscr.nodelay(True)
self.sim = ReactorSimulation(
self.reactor,
timestep=self.timestep,
duration=None,
realtime=True,
command_provider=self._next_command,
)
self.sim.start_state = self.start_state
try:
for state in self.sim.run():
self._draw(stdscr, state)
self._handle_input(stdscr)
if self.quit_requested:
self.sim.stop()
break
finally:
if self.save_path and self.sim and self.sim.last_state:
self.reactor.save_state(self.save_path, self.sim.last_state)
def _handle_input(self, stdscr: "curses._CursesWindow") -> None:
while True:
ch = stdscr.getch()
if ch == -1:
break
if ch in (ord("q"), ord("Q")):
self.quit_requested = True
return
if ch == ord(" "):
self._queue_command(ReactorCommand.scram_all())
elif ch in (ord("p"), ord("P")):
self._queue_command(ReactorCommand(primary_pump_on=not self.reactor.primary_pump_active))
elif ch in (ord("o"), ord("O")):
self._queue_command(ReactorCommand(secondary_pump_on=not self.reactor.secondary_pump_active))
elif ch in (ord("t"), ord("T")):
self._queue_command(ReactorCommand(turbine_on=not self.reactor.turbine_active))
elif ch in (ord("+"), ord("=")):
self._queue_command(ReactorCommand(rod_step=-0.02))
elif ch == ord("-"):
self._queue_command(ReactorCommand(rod_step=0.02))
elif ch == ord("["):
demand = self._current_demand() - 50.0
self._queue_command(ReactorCommand(consumer_demand=max(0.0, demand)))
elif ch == ord("]"):
demand = self._current_demand() + 50.0
self._queue_command(ReactorCommand(consumer_demand=demand))
elif ch in (ord("c"), ord("C")):
online = not (self.reactor.consumer.online if self.reactor.consumer else False)
self._queue_command(ReactorCommand(consumer_online=online))
elif ch in (ord("s"), ord("S")):
self._queue_command(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw - 250.0))
elif ch in (ord("d"), ord("D")):
self._queue_command(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw + 250.0))
def _queue_command(self, command: ReactorCommand) -> None:
if self.pending_command is None:
self.pending_command = command
return
for field in command.__dataclass_fields__: # type: ignore[attr-defined]
value = getattr(command, field)
if value is None or value is False:
continue
if field == "rod_step" and getattr(self.pending_command, field) is not None:
setattr(
self.pending_command,
field,
getattr(self.pending_command, field) + value,
)
else:
setattr(self.pending_command, field, value)
def _next_command(self, _: float, __: PlantState) -> Optional[ReactorCommand]:
cmd = self.pending_command
self.pending_command = None
return cmd
def _draw(self, stdscr: "curses._CursesWindow", state: PlantState) -> None:
stdscr.erase()
height, width = stdscr.getmaxyx()
if height < 24 or width < 90:
stdscr.addstr(
0,
0,
"Terminal window too small. Resize to at least 90x24.".ljust(width),
curses.color_pair(4),
)
stdscr.refresh()
return
data_height = height - 6
left_width = min(max(width - 32, 60), width - 20)
right_width = width - left_width
data_win = stdscr.derwin(data_height, left_width, 0, 0)
help_win = stdscr.derwin(data_height, right_width, 0, left_width)
status_win = stdscr.derwin(6, width, data_height, 0)
self._draw_data_panel(data_win, state)
self._draw_help_panel(help_win)
self._draw_status_panel(status_win, state)
stdscr.refresh()
def _draw_data_panel(self, win: "curses._CursesWindow", state: PlantState) -> None:
win.erase()
win.box()
win.addstr(0, 2, " Plant Overview ", curses.color_pair(1) | curses.A_BOLD)
y = 2
y = self._draw_section(
win,
y,
"Core",
[
f"Tfuel {state.core.fuel_temperature:8.1f} K Power {state.core.power_output_mw:8.1f} MW",
f"Neutron Flux {state.core.neutron_flux:10.2e} Rods {self.reactor.control.rod_fraction:.3f}",
f"Setpoint {self.reactor.control.setpoint_mw:7.0f} MW Reactivity {state.core.reactivity_margin:+.4f}",
],
)
y = self._draw_section(
win,
y,
"Primary Loop",
[
f"Pump {'ON ' if self.reactor.primary_pump_active else 'OFF'} "
f"Flow {state.primary_loop.mass_flow_rate:7.0f} kg/s",
f"T_in {state.primary_loop.temperature_in:7.1f} K "
f"T_out {state.primary_loop.temperature_out:7.1f} K",
f"Pressure {state.primary_loop.pressure:5.2f} MPa",
],
)
y = self._draw_section(
win,
y,
"Secondary Loop",
[
f"Pump {'ON ' if self.reactor.secondary_pump_active else 'OFF'} "
f"Flow {state.secondary_loop.mass_flow_rate:7.0f} kg/s",
f"T_in {state.secondary_loop.temperature_in:7.1f} K "
f"Pressure {state.secondary_loop.pressure:5.2f} MPa",
f"Steam quality {state.secondary_loop.steam_quality:5.2f}",
],
)
consumer_status = "n/a"
consumer_demand = 0.0
if self.reactor.consumer:
consumer_status = "ONLINE" if self.reactor.consumer.online else "OFF"
consumer_demand = self.reactor.consumer.demand_mw
y = self._draw_section(
win,
y,
"Turbine / Grid",
[
f"Turbine {'ON ' if self.reactor.turbine_active else 'OFF'} "
f"Electrical {state.turbine.electrical_output_mw:7.1f} MW",
f"Load {state.turbine.load_supplied_mw:7.1f}/{state.turbine.load_demand_mw:7.1f} MW",
f"Consumer {consumer_status} demand {consumer_demand:7.1f} MW",
],
)
self._draw_health_bar(win, y + 1)
def _draw_help_panel(self, win: "curses._CursesWindow") -> None:
win.erase()
win.box()
win.addstr(0, 2, " Controls ", curses.color_pair(1) | curses.A_BOLD)
y = 2
for entry in self.keys:
win.addstr(y, 2, f"{entry.key:<8} {entry.description}")
y += 1
win.addstr(y + 1, 2, "Tips:", curses.color_pair(2) | curses.A_BOLD)
tips = [
"Start pumps before withdrawing rods.",
"Bring turbine and consumer online after thermal stabilization.",
"Watch component health to avoid automatic trips.",
]
for idx, tip in enumerate(tips, start=y + 2):
win.addstr(idx, 4, f"- {tip}")
def _draw_status_panel(self, win: "curses._CursesWindow", state: PlantState) -> None:
win.erase()
win.hline(0, 0, curses.ACS_HLINE, win.getmaxyx()[1])
msg = (
f"Time {state.time_elapsed:7.1f}s | Rods {self.reactor.control.rod_fraction:.3f} | "
f"Primary {'ON' if self.reactor.primary_pump_active else 'OFF'} | "
f"Secondary {'ON' if self.reactor.secondary_pump_active else 'OFF'} | "
f"Turbine {'ON' if self.reactor.turbine_active else 'OFF'}"
)
win.addstr(1, 1, msg, curses.color_pair(3))
if self.reactor.health_monitor.failure_log:
win.addstr(
3,
1,
f"Failures: {', '.join(self.reactor.health_monitor.failure_log)}",
curses.color_pair(4) | curses.A_BOLD,
)
win.addstr(4, 1, "Press 'q' to exit and persist the current snapshot.", curses.color_pair(2))
def _draw_section(
self,
win: "curses._CursesWindow",
start_y: int,
title: str,
lines: list[str],
) -> int:
width = win.getmaxyx()[1] - 4
win.addstr(start_y, 2, title, curses.A_BOLD | curses.color_pair(1))
for idx, line in enumerate(lines, start=start_y + 1):
win.addstr(idx, 4, line[:width])
return start_y + len(lines) + 2
def _draw_health_bar(self, win: "curses._CursesWindow", start_y: int) -> None:
win.addstr(start_y, 2, "Component Health", curses.A_BOLD | curses.color_pair(1))
bar_width = win.getmaxyx()[1] - 10
for idx, (name, comp) in enumerate(self.reactor.health_monitor.components.items(), start=1):
filled = int(bar_width * comp.integrity)
bar = "" * filled + "-" * (bar_width - filled)
color = 3 if comp.integrity > 0.5 else 2 if comp.integrity > 0.2 else 4
win.addstr(start_y + idx, 4, f"{name:<12}", curses.A_BOLD)
win.addstr(start_y + idx, 16, bar, curses.color_pair(color))
win.addstr(start_y + idx, 18 + bar_width, f"{comp.integrity*100:5.1f}%", curses.color_pair(color))
def _current_demand(self) -> float:
if self.reactor.consumer:
return self.reactor.consumer.demand_mw
return 0.0