diff --git a/pyproject.toml b/pyproject.toml index a932256..c884aea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,11 @@ authors = [ dependencies = [] [project.optional-dependencies] -dev = ["pytest>=7.0", "rich>=13.7.0"] +dev = ["pytest>=7.0"] +dashboard = [ + "rich>=13.7.0", + "textual>=0.50.0", +] [build-system] requires = ["setuptools>=61"] diff --git a/src/reactor_sim/simulation.py b/src/reactor_sim/simulation.py index ef211ad..9528365 100644 --- a/src/reactor_sim/simulation.py +++ b/src/reactor_sim/simulation.py @@ -120,17 +120,16 @@ def main() -> None: if dashboard_mode: if alternate_dashboard: try: - from .rich_dashboard import RichDashboard + from .textual_dashboard import run_textual_dashboard except ImportError as exc: # pragma: no cover - optional dependency - LOGGER.error("Rich dashboard requested but 'rich' is not installed: %s", exc) + LOGGER.error("Textual dashboard requested but dependency missing: %s", exc) return - dashboard = RichDashboard( + run_textual_dashboard( reactor, start_state=sim.start_state, timestep=sim.timestep, save_path=save_path, ) - dashboard.run() return else: from .dashboard import ReactorDashboard diff --git a/src/reactor_sim/textual_dashboard.py b/src/reactor_sim/textual_dashboard.py new file mode 100644 index 0000000..4722451 --- /dev/null +++ b/src/reactor_sim/textual_dashboard.py @@ -0,0 +1,243 @@ +"""Interactive Textual-based dashboard (optional).""" + +from __future__ import annotations + +import logging +from collections import deque +from typing import Optional + +from textual.app import App, ComposeResult +from textual.widgets import Static, Header, Footer +from textual.containers import Horizontal, Vertical +from textual.reactive import reactive +from textual.timer import Timer + +from . import constants +from .reactor import Reactor +from .simulation import ReactorSimulation +from .state import PlantState +from .commands import ReactorCommand + +LOGGER = logging.getLogger(__name__) + + +class TextualDashboard(App): + """A lightweight Textual UI with keybindings mapped to reactor commands.""" + + CSS = """ + Screen { layout: vertical; } + .row { width: 1fr; } + .panel { padding: 0 1; } + """ + + BINDINGS = [ + ("q", "quit", "Quit"), + ("space", "scram", "SCRAM"), + ("g", "toggle_primary_p1", "P1"), + ("h", "toggle_primary_p2", "P2"), + ("j", "toggle_secondary_p1", "S1"), + ("k", "toggle_secondary_p2", "S2"), + ("b", "toggle_generator1", "Gen1"), + ("v", "toggle_generator2", "Gen2"), + ("t", "toggle_turbine_bank", "Turbines"), + ("1", "toggle_turbine_unit('1')", "T1"), + ("2", "toggle_turbine_unit('2')", "T2"), + ("3", "toggle_turbine_unit('3')", "T3"), + ("+", "rods_insert", "+ rods"), + ("-", "rods_withdraw", "- rods"), + ("[", "demand_down", "Demand -"), + ("]", "demand_up", "Demand +"), + ("s", "setpoint_down", "SP -"), + ("d", "setpoint_up", "SP +"), + ("l", "toggle_primary_relief", "Relief pri"), + (";", "toggle_secondary_relief", "Relief sec"), + ("c", "toggle_consumer", "Consumer"), + ("a", "toggle_auto_rods", "Auto rods"), + ] + + timestep: float = 1.0 + + def __init__( + self, + reactor: Reactor, + start_state: Optional[PlantState], + timestep: float = 1.0, + save_path: Optional[str] = None, + ) -> None: + super().__init__() + self.reactor = reactor + self.state = start_state or self.reactor.initial_state() + self.timestep = timestep + self.save_path = save_path + self._sim: Optional[ReactorSimulation] = None + self._pending: deque[ReactorCommand] = deque() + self._timer: Optional[Timer] = None + + self.core_panel = Static(classes="panel") + self.loop_panel = Static(classes="panel") + self.turbine_panel = Static(classes="panel") + self.status_panel = Static(classes="panel") + + def compose(self) -> ComposeResult: + yield Header() + with Vertical(): + with Horizontal(classes="row"): + yield self.core_panel + yield self.loop_panel + with Horizontal(classes="row"): + yield self.turbine_panel + yield self.status_panel + yield Footer() + + def on_mount(self) -> None: + self._timer = self.set_interval(self.timestep, self._tick, pause=False) + self._render_panels() + + def action_quit(self) -> None: + if self._timer: + self._timer.pause() + if self.save_path and self.state: + self.reactor.save_state(self.save_path, self.state) + self.exit() + + # Command helpers + def _enqueue(self, cmd: ReactorCommand) -> None: + self._pending.append(cmd) + + def action_scram(self) -> None: + self._enqueue(ReactorCommand.scram_all()) + + def action_toggle_primary_p1(self) -> None: + self._enqueue(ReactorCommand(primary_pumps={1: not self.reactor.primary_pump_units[0]})) + + def action_toggle_primary_p2(self) -> None: + self._enqueue(ReactorCommand(primary_pumps={2: not self.reactor.primary_pump_units[1]})) + + def action_toggle_secondary_p1(self) -> None: + self._enqueue(ReactorCommand(secondary_pumps={1: not self.reactor.secondary_pump_units[0]})) + + def action_toggle_secondary_p2(self) -> None: + self._enqueue(ReactorCommand(secondary_pumps={2: not self.reactor.secondary_pump_units[1]})) + + def action_toggle_generator1(self) -> None: + self._enqueue(ReactorCommand(generator_units={1: True})) + + def action_toggle_generator2(self) -> None: + self._enqueue(ReactorCommand(generator_units={2: True})) + + def action_toggle_turbine_bank(self) -> None: + self._enqueue(ReactorCommand(turbine_on=not self.reactor.turbine_active)) + + def action_toggle_turbine_unit(self, unit: str) -> None: + idx = int(unit) + current = self.reactor.turbine_unit_active[idx - 1] if idx - 1 < len(self.reactor.turbine_unit_active) else False + self._enqueue(ReactorCommand(turbine_units={idx: not current})) + + def action_rods_insert(self) -> None: + self._enqueue(ReactorCommand(rod_position=min(0.95, self.reactor.control.rod_fraction + constants.ROD_MANUAL_STEP))) + + def action_rods_withdraw(self) -> None: + self._enqueue(ReactorCommand(rod_position=max(0.0, self.reactor.control.rod_fraction - constants.ROD_MANUAL_STEP))) + + def action_demand_down(self) -> None: + if self.reactor.consumer: + self._enqueue(ReactorCommand(consumer_demand=max(0.0, self.reactor.consumer.demand_mw - 50.0))) + + def action_demand_up(self) -> None: + if self.reactor.consumer: + self._enqueue(ReactorCommand(consumer_demand=self.reactor.consumer.demand_mw + 50.0)) + + def action_setpoint_down(self) -> None: + self._enqueue(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw - 250.0)) + + def action_setpoint_up(self) -> None: + self._enqueue(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw + 250.0)) + + def action_toggle_primary_relief(self) -> None: + self._enqueue(ReactorCommand(primary_relief=not self.reactor.primary_relief_open)) + + def action_toggle_secondary_relief(self) -> None: + self._enqueue(ReactorCommand(secondary_relief=not self.reactor.secondary_relief_open)) + + def action_toggle_consumer(self) -> None: + if self.reactor.consumer: + self._enqueue(ReactorCommand(consumer_online=not self.reactor.consumer.online)) + + def action_toggle_auto_rods(self) -> None: + self._enqueue(ReactorCommand(rod_manual=not self.reactor.control.manual_control)) + + def _merge_commands(self) -> Optional[ReactorCommand]: + if not self._pending: + return None + # Take the last command to approximate latest intent. + cmd = ReactorCommand() + while self._pending: + nxt = self._pending.popleft() + for field in nxt.__dataclass_fields__: # type: ignore[attr-defined] + val = getattr(nxt, field) + if val is None or val is False: + continue + setattr(cmd, field, val) + return cmd + + def _tick(self) -> None: + cmd = self._merge_commands() + self.reactor.step(self.state, self.timestep, cmd) + self._render_panels() + + def _render_panels(self) -> None: + core = self.state.core + prim = self.state.primary_loop + sec = self.state.secondary_loop + t0 = self.state.turbines[0] if self.state.turbines else None + self.core_panel.update( + f"[bold cyan]Core[/bold cyan]\\n" + f"Power {core.power_output_mw:6.1f} MW Fuel {core.fuel_temperature:6.1f} K\\n" + f"Rods {self.reactor.control.rod_fraction:.3f} ({'AUTO' if not self.reactor.control.manual_control else 'MAN'})\\n" + f"Setpoint {self.reactor.control.setpoint_mw:5.0f} MW Reactivity {core.reactivity_margin:+.4f}\\n" + f"DNB {core.dnb_margin:4.2f} Subcool {core.subcooling_margin:4.1f}K" + ) + self.loop_panel.update( + f"[bold cyan]Loops[/bold cyan]\\n" + f"Primary P {prim.pressure:4.1f}/{constants.MAX_PRESSURE:4.1f} MPa Tin {prim.temperature_in:6.1f}K Tout {prim.temperature_out:6.1f}K\\n" + f"Flow {prim.mass_flow_rate:6.0f} kg/s Pressurizer {self.reactor.pressurizer_level*100:5.1f}%\\n" + f"Secondary P {sec.pressure:4.1f}/{constants.MAX_PRESSURE:4.1f} MPa Tin {sec.temperature_in:6.1f}K Tout {sec.temperature_out:6.1f}K q {sec.steam_quality:4.2f}\\n" + f"HX ΔT {self.state.primary_to_secondary_delta_t:4.0f}K Eff {self.state.heat_exchanger_efficiency*100:5.1f}%" + ) + if t0: + condenser = f"P={t0.condenser_pressure:4.2f}/{constants.CONDENSER_MAX_PRESSURE_MPA:4.2f}MPa T={t0.condenser_temperature:6.1f}K" + else: + condenser = "n/a" + turbine_lines = [ + f"[bold cyan]Turbines/Grid[/bold cyan]", + f"Steam avail {getattr(self, '_steam_available_power', lambda s:0.0)(self.state):5.1f} MW h={t0.steam_enthalpy if t0 else 0:5.0f} kJ/kg Cond {condenser}", + " ".join([f"T{idx+1}:{t.electrical_output_mw:5.1f}MW" for idx, t in enumerate(self.state.turbines)]) if self.state.turbines else "Turbines n/a", + f"Electrical {self.state.total_electrical_output():5.1f} MW Load {self._total_load_supplied(self.state):5.1f}/{self._total_load_demand(self.state):5.1f} MW", + ] + self.turbine_panel.update("\\n".join(turbine_lines)) + failures = ", ".join(self.reactor.health_monitor.failure_log) if self.reactor.health_monitor.failure_log else "None" + self.status_panel.update( + f"[bold cyan]Status[/bold cyan]\\n" + f"Time {self.state.time_elapsed:6.1f}s\\n" + f"Pumps pri {[p.status for p in self.state.primary_pumps]} sec {[p.status for p in self.state.secondary_pumps]}\\n" + f"Reliefs pri={'OPEN' if self.reactor.primary_relief_open else 'CLOSED'} sec={'OPEN' if self.reactor.secondary_relief_open else 'CLOSED'}\\n" + f"Failures: {failures}" + ) + + def _total_load_supplied(self, state: PlantState) -> float: + return sum(t.load_supplied_mw for t in state.turbines) + + def _total_load_demand(self, state: PlantState) -> float: + return sum(t.load_demand_mw for t in state.turbines) + + def _steam_available_power(self, state: PlantState) -> float: + mass_flow = state.secondary_loop.mass_flow_rate * max(0.0, state.secondary_loop.steam_quality) + if mass_flow <= 1.0: + return 0.0 + enthalpy = state.turbines[0].steam_enthalpy if state.turbines else (constants.STEAM_LATENT_HEAT / 1_000.0) + return (enthalpy * mass_flow) / 1_000.0 + + +def run_textual_dashboard(reactor: Reactor, start_state: Optional[PlantState], timestep: float, save_path: Optional[str]) -> None: + app = TextualDashboard(reactor, start_state, timestep=timestep, save_path=save_path) + app.run()