"""Interactive Textual-based dashboard (optional).""" from __future__ import annotations import logging from collections import deque from typing import Optional from textual.app import App, ComposeResult from textual.widgets import Static, Header, Footer from textual.containers import Horizontal, Vertical from textual.reactive import reactive from textual.timer import Timer from . import constants from .reactor import Reactor from .simulation import ReactorSimulation from .state import PlantState from .commands import ReactorCommand LOGGER = logging.getLogger(__name__) class TextualDashboard(App): """A lightweight Textual UI with keybindings mapped to reactor commands.""" CSS = """ Screen { layout: vertical; } .row { width: 1fr; } .panel { padding: 0 1; } """ BINDINGS = [ ("q", "quit", "Quit"), ("space", "scram", "SCRAM"), ("g", "toggle_primary_p1", "P1"), ("h", "toggle_primary_p2", "P2"), ("j", "toggle_secondary_p1", "S1"), ("k", "toggle_secondary_p2", "S2"), ("b", "toggle_generator1", "Gen1"), ("v", "toggle_generator2", "Gen2"), ("t", "toggle_turbine_bank", "Turbines"), ("1", "toggle_turbine_unit('1')", "T1"), ("2", "toggle_turbine_unit('2')", "T2"), ("3", "toggle_turbine_unit('3')", "T3"), ("+", "rods_insert", "+ rods"), ("-", "rods_withdraw", "- rods"), ("[", "demand_down", "Demand -"), ("]", "demand_up", "Demand +"), ("s", "setpoint_down", "SP -"), ("d", "setpoint_up", "SP +"), ("l", "toggle_primary_relief", "Relief pri"), (";", "toggle_secondary_relief", "Relief sec"), ("c", "toggle_consumer", "Consumer"), ("a", "toggle_auto_rods", "Auto rods"), ] timestep: float = 1.0 def __init__( self, reactor: Reactor, start_state: Optional[PlantState], timestep: float = 1.0, save_path: Optional[str] = None, ) -> None: super().__init__() self.reactor = reactor self.state = start_state or self.reactor.initial_state() self.timestep = timestep self.save_path = save_path self._sim: Optional[ReactorSimulation] = None self._pending: deque[ReactorCommand] = deque() self._timer: Optional[Timer] = None self.core_panel = Static(classes="panel") self.loop_panel = Static(classes="panel") self.turbine_panel = Static(classes="panel") self.status_panel = Static(classes="panel") def compose(self) -> ComposeResult: yield Header() with Vertical(): with Horizontal(classes="row"): yield self.core_panel yield self.loop_panel with Horizontal(classes="row"): yield self.turbine_panel yield self.status_panel yield Footer() def on_mount(self) -> None: self._timer = self.set_interval(self.timestep, self._tick, pause=False) self._render_panels() def action_quit(self) -> None: if self._timer: self._timer.pause() if self.save_path and self.state: self.reactor.save_state(self.save_path, self.state) self.exit() # Command helpers def _enqueue(self, cmd: ReactorCommand) -> None: self._pending.append(cmd) def action_scram(self) -> None: self._enqueue(ReactorCommand.scram_all()) def action_toggle_primary_p1(self) -> None: self._enqueue(ReactorCommand(primary_pumps={1: not self.reactor.primary_pump_units[0]})) def action_toggle_primary_p2(self) -> None: self._enqueue(ReactorCommand(primary_pumps={2: not self.reactor.primary_pump_units[1]})) def action_toggle_secondary_p1(self) -> None: self._enqueue(ReactorCommand(secondary_pumps={1: not self.reactor.secondary_pump_units[0]})) def action_toggle_secondary_p2(self) -> None: self._enqueue(ReactorCommand(secondary_pumps={2: not self.reactor.secondary_pump_units[1]})) def action_toggle_generator1(self) -> None: self._enqueue(ReactorCommand(generator_units={1: True})) def action_toggle_generator2(self) -> None: self._enqueue(ReactorCommand(generator_units={2: True})) def action_toggle_turbine_bank(self) -> None: self._enqueue(ReactorCommand(turbine_on=not self.reactor.turbine_active)) def action_toggle_turbine_unit(self, unit: str) -> None: idx = int(unit) current = self.reactor.turbine_unit_active[idx - 1] if idx - 1 < len(self.reactor.turbine_unit_active) else False self._enqueue(ReactorCommand(turbine_units={idx: not current})) def action_rods_insert(self) -> None: self._enqueue(ReactorCommand(rod_position=min(0.95, self.reactor.control.rod_fraction + constants.ROD_MANUAL_STEP))) def action_rods_withdraw(self) -> None: self._enqueue(ReactorCommand(rod_position=max(0.0, self.reactor.control.rod_fraction - constants.ROD_MANUAL_STEP))) def action_demand_down(self) -> None: if self.reactor.consumer: self._enqueue(ReactorCommand(consumer_demand=max(0.0, self.reactor.consumer.demand_mw - 50.0))) def action_demand_up(self) -> None: if self.reactor.consumer: self._enqueue(ReactorCommand(consumer_demand=self.reactor.consumer.demand_mw + 50.0)) def action_setpoint_down(self) -> None: self._enqueue(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw - 250.0)) def action_setpoint_up(self) -> None: self._enqueue(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw + 250.0)) def action_toggle_primary_relief(self) -> None: self._enqueue(ReactorCommand(primary_relief=not self.reactor.primary_relief_open)) def action_toggle_secondary_relief(self) -> None: self._enqueue(ReactorCommand(secondary_relief=not self.reactor.secondary_relief_open)) def action_toggle_consumer(self) -> None: if self.reactor.consumer: self._enqueue(ReactorCommand(consumer_online=not self.reactor.consumer.online)) def action_toggle_auto_rods(self) -> None: self._enqueue(ReactorCommand(rod_manual=not self.reactor.control.manual_control)) def _merge_commands(self) -> Optional[ReactorCommand]: if not self._pending: return None # Take the last command to approximate latest intent. cmd = ReactorCommand() while self._pending: nxt = self._pending.popleft() for field in nxt.__dataclass_fields__: # type: ignore[attr-defined] val = getattr(nxt, field) if val is None or val is False: continue setattr(cmd, field, val) return cmd def _tick(self) -> None: cmd = self._merge_commands() self.reactor.step(self.state, self.timestep, cmd) self._render_panels() def _render_panels(self) -> None: core = self.state.core prim = self.state.primary_loop sec = self.state.secondary_loop t0 = self.state.turbines[0] if self.state.turbines else None self.core_panel.update( f"[bold cyan]Core[/bold cyan]\\n" f"Power {core.power_output_mw:6.1f} MW Fuel {core.fuel_temperature:6.1f} K\\n" f"Rods {self.reactor.control.rod_fraction:.3f} ({'AUTO' if not self.reactor.control.manual_control else 'MAN'})\\n" f"Setpoint {self.reactor.control.setpoint_mw:5.0f} MW Reactivity {core.reactivity_margin:+.4f}\\n" f"DNB {core.dnb_margin:4.2f} Subcool {core.subcooling_margin:4.1f}K" ) self.loop_panel.update( f"[bold cyan]Loops[/bold cyan]\\n" f"Primary P {prim.pressure:4.1f}/{constants.MAX_PRESSURE:4.1f} MPa Tin {prim.temperature_in:6.1f}K Tout {prim.temperature_out:6.1f}K\\n" f"Flow {prim.mass_flow_rate:6.0f} kg/s Pressurizer {self.reactor.pressurizer_level*100:5.1f}%\\n" f"Secondary P {sec.pressure:4.1f}/{constants.MAX_PRESSURE:4.1f} MPa Tin {sec.temperature_in:6.1f}K Tout {sec.temperature_out:6.1f}K q {sec.steam_quality:4.2f}\\n" f"HX ΔT {self.state.primary_to_secondary_delta_t:4.0f}K Eff {self.state.heat_exchanger_efficiency*100:5.1f}%" ) if t0: condenser = f"P={t0.condenser_pressure:4.2f}/{constants.CONDENSER_MAX_PRESSURE_MPA:4.2f}MPa T={t0.condenser_temperature:6.1f}K" else: condenser = "n/a" turbine_lines = [ f"[bold cyan]Turbines/Grid[/bold cyan]", f"Steam avail {getattr(self, '_steam_available_power', lambda s:0.0)(self.state):5.1f} MW h={t0.steam_enthalpy if t0 else 0:5.0f} kJ/kg Cond {condenser}", " ".join([f"T{idx+1}:{t.electrical_output_mw:5.1f}MW" for idx, t in enumerate(self.state.turbines)]) if self.state.turbines else "Turbines n/a", f"Electrical {self.state.total_electrical_output():5.1f} MW Load {self._total_load_supplied(self.state):5.1f}/{self._total_load_demand(self.state):5.1f} MW", ] self.turbine_panel.update("\\n".join(turbine_lines)) failures = ", ".join(self.reactor.health_monitor.failure_log) if self.reactor.health_monitor.failure_log else "None" self.status_panel.update( f"[bold cyan]Status[/bold cyan]\\n" f"Time {self.state.time_elapsed:6.1f}s\\n" f"Pumps pri {[p.status for p in self.state.primary_pumps]} sec {[p.status for p in self.state.secondary_pumps]}\\n" f"Reliefs pri={'OPEN' if self.reactor.primary_relief_open else 'CLOSED'} sec={'OPEN' if self.reactor.secondary_relief_open else 'CLOSED'}\\n" f"Failures: {failures}" ) def _total_load_supplied(self, state: PlantState) -> float: return sum(t.load_supplied_mw for t in state.turbines) def _total_load_demand(self, state: PlantState) -> float: return sum(t.load_demand_mw for t in state.turbines) def _steam_available_power(self, state: PlantState) -> float: mass_flow = state.secondary_loop.mass_flow_rate * max(0.0, state.secondary_loop.steam_quality) if mass_flow <= 1.0: return 0.0 enthalpy = state.turbines[0].steam_enthalpy if state.turbines else (constants.STEAM_LATENT_HEAT / 1_000.0) return (enthalpy * mass_flow) / 1_000.0 def run_textual_dashboard(reactor: Reactor, start_state: Optional[PlantState], timestep: float, save_path: Optional[str]) -> None: app = TextualDashboard(reactor, start_state, timestep=timestep, save_path=save_path) app.run()