feat: add realtime dashboard control

This commit is contained in:
Andrii Prokhorov
2025-11-21 17:22:02 +02:00
parent 15f2930b0c
commit 146e8fd26b
4 changed files with 201 additions and 1 deletions

View File

@@ -9,7 +9,7 @@ All source code lives under `src/reactor_sim`. Submodules map to plant systems:
- `python -m pip install -e .[dev]` — install editable package with optional tooling when dev dependencies are defined. - `python -m pip install -e .[dev]` — install editable package with optional tooling when dev dependencies are defined.
## Operations & Control Hooks ## Operations & Control Hooks
Manual commands live in `reactor_sim.commands.ReactorCommand`. Pass a `command_provider` callable to `ReactorSimulation` to adjust rods, pumps, turbine, coolant demand, or the attached `ElectricalConsumer`. Use `ReactorCommand.scram_all()` for full shutdown, `ReactorCommand(consumer_online=True, consumer_demand=600)` to hook the grid, or toggle pumps (`primary_pump_on=False`) to simulate faults. Control helpers in `control.py` expose `set_rods`, `increment_rods`, and `scram`. Manual commands live in `reactor_sim.commands.ReactorCommand`. Pass a `command_provider` callable to `ReactorSimulation` to adjust rods, pumps, turbine, coolant demand, or the attached `ElectricalConsumer`. Use `ReactorCommand.scram_all()` for full shutdown, `ReactorCommand(consumer_online=True, consumer_demand=600)` to hook the grid, or toggle pumps (`primary_pump_on=False`) to simulate faults. Control helpers in `control.py` expose `set_rods`, `increment_rods`, and `scram`. For hands-on runs, launch the curses dashboard (`FISSION_DASHBOARD=1 FISSION_REALTIME=1 python run_simulation.py`) and use the on-screen shortcuts (q quit/save, space SCRAM, p/o pumps, t turbine, +/- rods, [/ ] consumer demand, s/d setpoint).
The plant now boots cold (ambient core temperature, idle pumps); scripts must sequence startup: enable pumps, gradually withdraw rods, connect the consumer after turbine spin-up, and use `ControlSystem.set_power_setpoint` to chase desired output. Set `FISSION_REALTIME=1` to run continuously with real-time pacing; optionally set `FISSION_SIM_DURATION=infinite` for indefinite runs and send SIGINT/Ctrl+C to stop. Use `FISSION_SIM_DURATION=600` (default) for bounded offline batches. The plant now boots cold (ambient core temperature, idle pumps); scripts must sequence startup: enable pumps, gradually withdraw rods, connect the consumer after turbine spin-up, and use `ControlSystem.set_power_setpoint` to chase desired output. Set `FISSION_REALTIME=1` to run continuously with real-time pacing; optionally set `FISSION_SIM_DURATION=infinite` for indefinite runs and send SIGINT/Ctrl+C to stop. Use `FISSION_SIM_DURATION=600` (default) for bounded offline batches.
## Coding Style & Naming Conventions ## Coding Style & Naming Conventions

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
from .atomic import Atom, AtomicPhysics, FissionEvent from .atomic import Atom, AtomicPhysics, FissionEvent
from .commands import ReactorCommand from .commands import ReactorCommand
from .consumer import ElectricalConsumer from .consumer import ElectricalConsumer
from .dashboard import ReactorDashboard
from .failures import HealthMonitor from .failures import HealthMonitor
from .logging_utils import configure_logging from .logging_utils import configure_logging
from .state import CoreState, CoolantLoopState, TurbineState, PlantState from .state import CoreState, CoolantLoopState, TurbineState, PlantState
@@ -24,5 +25,6 @@ __all__ = [
"ReactorCommand", "ReactorCommand",
"ElectricalConsumer", "ElectricalConsumer",
"HealthMonitor", "HealthMonitor",
"ReactorDashboard",
"configure_logging", "configure_logging",
] ]

View File

@@ -0,0 +1,186 @@
"""Interactive curses dashboard for real-time reactor operation."""
from __future__ import annotations
import curses
from dataclasses import dataclass
from typing import Optional
from .commands import ReactorCommand
from .reactor import Reactor
from .simulation import ReactorSimulation
from .state import PlantState
@dataclass
class DashboardKey:
key: str
description: str
class ReactorDashboard:
"""Minimal TUI dashboard allowing user control in real-time."""
def __init__(
self,
reactor: Reactor,
start_state: Optional[PlantState],
timestep: float = 1.0,
save_path: Optional[str] = None,
) -> None:
self.reactor = reactor
self.start_state = start_state
self.timestep = timestep
self.save_path = save_path
self.pending_command: Optional[ReactorCommand] = None
self.sim: Optional[ReactorSimulation] = None
self.quit_requested = False
self.keys = [
DashboardKey("q", "Quit & save"),
DashboardKey("space", "SCRAM"),
DashboardKey("p", "Toggle primary pump"),
DashboardKey("o", "Toggle secondary pump"),
DashboardKey("t", "Toggle turbine"),
DashboardKey("c", "Toggle consumer"),
DashboardKey("+/-", "Withdraw/insert rods"),
DashboardKey("[/]", "Adjust consumer demand /+50 MW"),
DashboardKey("s", "Setpoint 250 MW"),
DashboardKey("d", "Setpoint +250 MW"),
]
def run(self) -> None:
curses.wrapper(self._main)
def _main(self, stdscr: "curses._CursesWindow") -> None: # type: ignore[name-defined]
curses.curs_set(0)
stdscr.nodelay(True)
self.sim = ReactorSimulation(
self.reactor,
timestep=self.timestep,
duration=None,
realtime=True,
command_provider=self._next_command,
)
self.sim.start_state = self.start_state
try:
for state in self.sim.run():
self._draw(stdscr, state)
self._handle_input(stdscr, state)
if self.quit_requested:
self.sim.stop()
break
finally:
if self.save_path and self.sim and self.sim.last_state:
self.reactor.save_state(self.save_path, self.sim.last_state)
def _handle_input(self, stdscr: "curses._CursesWindow", state: PlantState) -> None:
while True:
ch = stdscr.getch()
if ch == -1:
break
if ch in (ord("q"), ord("Q")):
self.quit_requested = True
return
if ch == ord(" "):
self._queue_command(ReactorCommand.scram_all())
elif ch in (ord("p"), ord("P")):
self._queue_command(ReactorCommand(primary_pump_on=not self.reactor.primary_pump_active))
elif ch in (ord("o"), ord("O")):
self._queue_command(ReactorCommand(secondary_pump_on=not self.reactor.secondary_pump_active))
elif ch in (ord("t"), ord("T")):
self._queue_command(ReactorCommand(turbine_on=not self.reactor.turbine_active))
elif ch in (ord("+"), ord("=")):
self._queue_command(ReactorCommand(rod_step=-0.02))
elif ch == ord("-"):
self._queue_command(ReactorCommand(rod_step=0.02))
elif ch == ord("["):
demand = self._current_demand() - 50.0
self._queue_command(ReactorCommand(consumer_demand=max(0.0, demand)))
elif ch == ord("]"):
demand = self._current_demand() + 50.0
self._queue_command(ReactorCommand(consumer_demand=demand))
elif ch in (ord("c"), ord("C")):
online = not (self.reactor.consumer.online if self.reactor.consumer else False)
self._queue_command(ReactorCommand(consumer_online=online))
elif ch in (ord("s"), ord("S")):
self._queue_command(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw - 250.0))
elif ch in (ord("d"), ord("D")):
self._queue_command(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw + 250.0))
def _queue_command(self, command: ReactorCommand) -> None:
if self.pending_command is None:
self.pending_command = command
else:
for field in command.__dataclass_fields__: # type: ignore[attr-defined]
value = getattr(command, field)
if value is None or value is False:
continue
if field == "rod_step" and getattr(self.pending_command, field) is not None:
setattr(
self.pending_command,
field,
getattr(self.pending_command, field) + value,
)
else:
setattr(self.pending_command, field, value)
def _next_command(self, sim_time: float, _: PlantState) -> Optional[ReactorCommand]:
cmd = self.pending_command
self.pending_command = None
return cmd
def _draw(self, stdscr: "curses._CursesWindow", state: PlantState) -> None:
stdscr.erase()
stdscr.addstr(0, 0, "Realtime Reactor Dashboard".ljust(60))
stdscr.addstr(1, 0, f"Time: {state.time_elapsed:8.1f}s Mode: realtime")
stdscr.addstr(
3,
0,
f"Core Temp: {state.core.fuel_temperature:8.1f} K Power: {state.core.power_output_mw:8.1f} MW "
f"Flux: {state.core.neutron_flux:10.2e}",
)
stdscr.addstr(
4,
0,
f"Rod fraction: {self.reactor.control.rod_fraction:.3f} Setpoint: {self.reactor.control.setpoint_mw:.0f} MW",
)
stdscr.addstr(
6,
0,
f"Primary Loop: {'ON ' if self.reactor.primary_pump_active else 'OFF'} "
f"Flow {state.primary_loop.mass_flow_rate:7.0f} kg/s Outlet {state.primary_loop.temperature_out:6.1f} K",
)
stdscr.addstr(
7,
0,
f"Secondary Loop: {'ON ' if self.reactor.secondary_pump_active else 'OFF'} "
f"Flow {state.secondary_loop.mass_flow_rate:7.0f} kg/s Pressure {state.secondary_loop.pressure:4.1f} MPa",
)
stdscr.addstr(
9,
0,
f"Turbine: {'ON ' if self.reactor.turbine_active else 'OFF'} Electrical {state.turbine.electrical_output_mw:7.1f} MW "
f"Load {state.turbine.load_supplied_mw:7.1f}/{state.turbine.load_demand_mw:7.1f} MW",
)
if self.reactor.consumer:
stdscr.addstr(
10,
0,
f"Consumer: {'ONLINE ' if self.reactor.consumer.online else 'OFFLINE'} "
f"Demand {self.reactor.consumer.demand_mw:7.1f} MW",
)
health = self.reactor.health_monitor.components
stdscr.addstr(
12,
0,
"Health: " + " ".join(f"{name}:{comp.integrity*100:5.1f}%" for name, comp in health.items()),
)
stdscr.addstr(14, 0, "Controls:")
for idx, key in enumerate(self.keys, start=15):
stdscr.addstr(idx, 0, f"{key.key.ljust(8)} - {key.description}")
stdscr.refresh()
def _current_demand(self) -> float:
if self.reactor.consumer:
return self.reactor.consumer.demand_mw
return 0.0

View File

@@ -75,8 +75,20 @@ def main() -> None:
sim = ReactorSimulation(reactor, timestep=5.0, duration=duration, realtime=realtime) sim = ReactorSimulation(reactor, timestep=5.0, duration=duration, realtime=realtime)
load_path = os.getenv("FISSION_LOAD_STATE") load_path = os.getenv("FISSION_LOAD_STATE")
save_path = os.getenv("FISSION_SAVE_STATE") save_path = os.getenv("FISSION_SAVE_STATE")
dashboard_mode = os.getenv("FISSION_DASHBOARD", "0") == "1"
if load_path: if load_path:
sim.start_state = reactor.load_state(load_path) sim.start_state = reactor.load_state(load_path)
if dashboard_mode:
from .dashboard import ReactorDashboard
dashboard = ReactorDashboard(
reactor,
start_state=sim.start_state,
timestep=sim.timestep,
save_path=save_path,
)
dashboard.run()
return
try: try:
if realtime: if realtime:
LOGGER.info("Running in real-time mode (Ctrl+C to stop)...") LOGGER.info("Running in real-time mode (Ctrl+C to stop)...")