Files
Reactor-Sim/src/reactor_sim/textual_dashboard.py

472 lines
22 KiB
Python

"""Interactive Textual-based dashboard mirroring the curses layout."""
from __future__ import annotations
import logging
import os
from collections import deque
from pathlib import Path
from typing import Optional
from textual.app import App, ComposeResult
from textual.containers import Horizontal, VerticalScroll
from textual.widgets import Static, Header, Footer
from textual.timer import Timer
from . import constants
from .reactor import Reactor
from .simulation import ReactorSimulation
from .state import PlantState
from .commands import ReactorCommand
from .snapshot import snapshot_lines
LOGGER = logging.getLogger(__name__)
def _bar(label: str, value: float, width: int = 24) -> str:
filled = int(max(0.0, min(1.0, value)) * width)
return f"{label:<14} [{'#'*filled}{'-'*(width-filled)}] {value*100:5.1f}%"
class TextualDashboard(App):
"""Textual dashboard with controls and sections similar to the curses view."""
CSS = """
Screen { layout: vertical; }
.col { width: 1fr; }
.panel { padding: 0 1; border: solid gray; }
"""
BINDINGS = [
("q", "quit", "Quit"),
("space", "scram", "SCRAM"),
("g", "toggle_primary_p1", "P1"),
("h", "toggle_primary_p2", "P2"),
("j", "toggle_secondary_p1", "S1"),
("k", "toggle_secondary_p2", "S2"),
("b", "toggle_generator1", "Gen1"),
("v", "toggle_generator2", "Gen2"),
("t", "toggle_turbine_bank", "Turbines"),
("1", "toggle_turbine_unit('1')", "T1"),
("2", "toggle_turbine_unit('2')", "T2"),
("3", "toggle_turbine_unit('3')", "T3"),
("+", "rods_insert", "+ rods"),
("-", "rods_withdraw", "- rods"),
("[", "demand_down", "Demand -"),
("]", "demand_up", "Demand +"),
("s", "setpoint_down", "SP -"),
("d", "setpoint_up", "SP +"),
("l", "toggle_primary_relief", "Relief pri"),
(";", "toggle_secondary_relief", "Relief sec"),
("c", "toggle_consumer", "Consumer"),
("a", "toggle_auto_rods", "Auto rods"),
("f12", "snapshot", "Snapshot"),
]
timestep: float = 1.0
def __init__(
self,
reactor: Reactor,
start_state: Optional[PlantState],
timestep: float = 1.0,
save_path: Optional[str] = None,
) -> None:
super().__init__()
self.reactor = reactor
self.state = start_state or self.reactor.initial_state()
self.timestep = timestep
self.save_path = save_path
self._pending: deque[ReactorCommand] = deque()
self._timer: Optional[Timer] = None
self._trend_history: deque[tuple[float, float, float]] = deque(maxlen=120)
self.log_buffer: deque[str] = deque(maxlen=8)
self._log_handler: Optional[logging.Handler] = None
self._previous_handlers: list[logging.Handler] = []
snap_at_env = os.getenv("FISSION_SNAPSHOT_AT")
self.snapshot_at = float(snap_at_env) if snap_at_env else None
self.snapshot_done = False
self.snapshot_path = Path(os.getenv("FISSION_SNAPSHOT_PATH", "artifacts/textual_snapshot.txt"))
# Panels
self.core_panel = Static(classes="panel")
self.trend_panel = Static(classes="panel")
self.poison_panel = Static(classes="panel")
self.primary_panel = Static(classes="panel")
self.secondary_panel = Static(classes="panel")
self.turbine_panel = Static(classes="panel")
self.generator_panel = Static(classes="panel")
self.power_panel = Static(classes="panel")
self.hx_panel = Static(classes="panel")
self.protection_panel = Static(classes="panel")
self.maintenance_panel = Static(classes="panel")
self.health_panel = Static(classes="panel")
self.help_panel = Static(classes="panel")
self.log_panel = Static(classes="panel")
self.status_panel = Static(classes="panel")
def compose(self) -> ComposeResult:
yield Header()
with Horizontal():
with VerticalScroll(classes="col"):
yield self.core_panel
yield self.trend_panel
yield self.poison_panel
yield self.primary_panel
yield self.secondary_panel
with VerticalScroll(classes="col"):
yield self.turbine_panel
yield self.generator_panel
yield self.power_panel
yield self.hx_panel
yield self.protection_panel
yield self.maintenance_panel
yield self.health_panel
yield self.log_panel
yield self.help_panel
yield self.status_panel
yield Footer()
def on_mount(self) -> None:
self._install_log_capture()
self._timer = self.set_interval(self.timestep, self._tick, pause=False)
self._render_panels()
def action_quit(self) -> None:
if self._timer:
self._timer.pause()
if self.save_path and self.state:
self.reactor.save_state(self.save_path, self.state)
self._restore_logging()
self.exit()
# Command helpers
def _enqueue(self, cmd: ReactorCommand) -> None:
self._pending.append(cmd)
def action_scram(self) -> None:
self._enqueue(ReactorCommand.scram_all())
def action_toggle_primary_p1(self) -> None:
self._enqueue(ReactorCommand(primary_pumps={1: not self.reactor.primary_pump_units[0]}))
def action_toggle_primary_p2(self) -> None:
self._enqueue(ReactorCommand(primary_pumps={2: not self.reactor.primary_pump_units[1]}))
def action_toggle_secondary_p1(self) -> None:
self._enqueue(ReactorCommand(secondary_pumps={1: not self.reactor.secondary_pump_units[0]}))
def action_toggle_secondary_p2(self) -> None:
self._enqueue(ReactorCommand(secondary_pumps={2: not self.reactor.secondary_pump_units[1]}))
def action_toggle_generator1(self) -> None:
self._enqueue(ReactorCommand(generator_units={1: True}))
def action_toggle_generator2(self) -> None:
self._enqueue(ReactorCommand(generator_units={2: True}))
def action_toggle_turbine_bank(self) -> None:
self._enqueue(ReactorCommand(turbine_on=not self.reactor.turbine_active))
def action_toggle_turbine_unit(self, unit: str) -> None:
idx = int(unit)
current = self.reactor.turbine_unit_active[idx - 1] if idx - 1 < len(self.reactor.turbine_unit_active) else False
self._enqueue(ReactorCommand(turbine_units={idx: not current}))
def action_rods_insert(self) -> None:
self._enqueue(ReactorCommand(rod_position=min(0.95, self.reactor.control.rod_fraction + constants.ROD_MANUAL_STEP)))
def action_rods_withdraw(self) -> None:
self._enqueue(ReactorCommand(rod_position=max(0.0, self.reactor.control.rod_fraction - constants.ROD_MANUAL_STEP)))
def action_demand_down(self) -> None:
if self.reactor.consumer:
self._enqueue(ReactorCommand(consumer_demand=max(0.0, self.reactor.consumer.demand_mw - 50.0)))
def action_demand_up(self) -> None:
if self.reactor.consumer:
self._enqueue(ReactorCommand(consumer_demand=self.reactor.consumer.demand_mw + 50.0))
def action_setpoint_down(self) -> None:
self._enqueue(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw - 250.0))
def action_setpoint_up(self) -> None:
self._enqueue(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw + 250.0))
def action_toggle_primary_relief(self) -> None:
self._enqueue(ReactorCommand(primary_relief=not self.reactor.primary_relief_open))
def action_toggle_secondary_relief(self) -> None:
self._enqueue(ReactorCommand(secondary_relief=not self.reactor.secondary_relief_open))
def action_toggle_consumer(self) -> None:
if self.reactor.consumer:
self._enqueue(ReactorCommand(consumer_online=not self.reactor.consumer.online))
def action_toggle_auto_rods(self) -> None:
self._enqueue(ReactorCommand(rod_manual=not self.reactor.control.manual_control))
def action_snapshot(self) -> None:
self._save_snapshot(auto=False)
def _merge_commands(self) -> Optional[ReactorCommand]:
if not self._pending:
return None
cmd = ReactorCommand()
while self._pending:
nxt = self._pending.popleft()
for field in nxt.__dataclass_fields__: # type: ignore[attr-defined]
val = getattr(nxt, field)
if val is None or val is False:
continue
setattr(cmd, field, val)
return cmd
def _tick(self) -> None:
cmd = self._merge_commands()
self.reactor.step(self.state, self.timestep, cmd)
self._render_panels()
if self.snapshot_at is not None and not self.snapshot_done and self.state.time_elapsed >= self.snapshot_at:
self._save_snapshot(auto=True)
def _render_panels(self) -> None:
self._trend_history.append((self.state.time_elapsed, self.state.core.fuel_temperature, self.state.core.power_output_mw))
self.core_panel.update(
f"[bold cyan]Core[/bold cyan]\\n"
f"Power {self.state.core.power_output_mw:6.1f} MW (Nom {constants.NORMAL_CORE_POWER_MW:4.0f}/Max {constants.TEST_MAX_POWER_MW:4.0f})\\n"
f"Fuel {self.state.core.fuel_temperature:6.1f} K Rods {self.reactor.control.rod_fraction:.3f} ({'AUTO' if not self.reactor.control.manual_control else 'MAN'})\\n"
f"Setpoint {self.reactor.control.setpoint_mw:5.0f} MW Reactivity {self.state.core.reactivity_margin:+.4f}\\n"
f"DNB {self.state.core.dnb_margin:4.2f} Subcool {self.state.core.subcooling_margin:4.1f}K"
)
self.trend_panel.update(self._trend_text())
self.poison_panel.update(self._poison_text())
self.primary_panel.update(
f"[bold cyan]Primary Loop[/bold cyan]\\n"
f"Flow {self.state.primary_loop.mass_flow_rate:7.0f}/{self.reactor.primary_pump.nominal_flow * len(self.reactor.primary_pump_units):.0f} kg/s\\n"
f"Level {self.state.primary_loop.level*100:6.1f}%\\n"
f"Tin {self.state.primary_loop.temperature_in:7.1f} K Tout {self.state.primary_loop.temperature_out:7.1f} K (Target {constants.PRIMARY_OUTLET_TARGET_K:4.0f})\\n"
f"P {self.state.primary_loop.pressure:5.2f}/{constants.MAX_PRESSURE:4.1f} MPa Pressurizer {self.reactor.pressurizer_level*100:6.1f}% @ {constants.PRIMARY_PRESSURIZER_SETPOINT_MPA:4.1f} MPa\\n"
f"Relief {'OPEN' if self.reactor.primary_relief_open else 'CLOSED'} Pumps {[p.status for p in self.state.primary_pumps]}"
)
self.secondary_panel.update(
f"[bold cyan]Secondary Loop[/bold cyan]\\n"
f"Flow {self.state.secondary_loop.mass_flow_rate:7.0f}/{self.reactor.secondary_pump.nominal_flow * len(self.reactor.secondary_pump_units):.0f} kg/s\\n"
f"Level {self.state.secondary_loop.level*100:6.1f}%\\n"
f"Tin {self.state.secondary_loop.temperature_in:7.1f} K Tout {self.state.secondary_loop.temperature_out:7.1f} K (Target {constants.SECONDARY_OUTLET_TARGET_K:4.0f})\\n"
f"P {self.state.secondary_loop.pressure:5.2f}/{constants.MAX_PRESSURE:4.1f} MPa q {self.state.secondary_loop.steam_quality:5.2f}/1.00\\n"
f"Relief {'OPEN' if self.reactor.secondary_relief_open else 'CLOSED'} Pumps {[p.status for p in self.state.secondary_pumps]}"
)
self.turbine_panel.update(self._turbine_text())
self.generator_panel.update(self._generator_text())
self.power_panel.update(self._power_text())
self.hx_panel.update(
f"[bold cyan]Heat Exchanger[/bold cyan]\\n"
f"ΔT (pri-sec) {self.state.primary_to_secondary_delta_t:4.0f} K\\n"
f"Efficiency {self.state.heat_exchanger_efficiency*100:5.1f}%"
)
self.protection_panel.update(self._protection_text())
self.maintenance_panel.update(self._maintenance_text())
self.health_panel.update(self._health_text())
self.log_panel.update(self._log_text())
self.help_panel.update(self._help_text())
failures = ", ".join(self.reactor.health_monitor.failure_log) if self.reactor.health_monitor.failure_log else "None"
self.status_panel.update(
f"[bold cyan]Status[/bold cyan]\\n"
f"Time {self.state.time_elapsed:6.1f}s\\n"
f"Consumer {'ON' if (self.reactor.consumer and self.reactor.consumer.online) else 'OFF'} Demand {self.reactor.consumer.demand_mw if self.reactor.consumer else 0.0:5.1f} MW\\n"
f"Failures: {failures}"
)
def _trend_text(self) -> str:
if len(self._trend_history) < 2:
return "[bold cyan]Trends[/bold cyan]\\nFuel Temp Δ n/a\\nCore Power Δ n/a"
start_t, start_temp, start_power = self._trend_history[0]
end_t, end_temp, end_power = self._trend_history[-1]
duration = max(1.0, end_t - start_t)
temp_delta = end_temp - start_temp
power_delta = end_power - start_power
temp_rate = temp_delta / duration
power_rate = power_delta / duration
return (
"[bold cyan]Trends[/bold cyan]\\n"
f"Fuel Temp Δ {end_temp:7.1f} K (Δ{temp_delta:+6.1f} / {duration:4.0f}s, {temp_rate:+5.2f}/s)\\n"
f"Core Power Δ {end_power:7.1f} MW (Δ{power_delta:+6.1f} / {duration:4.0f}s, {power_rate:+5.2f}/s)"
)
def _poison_text(self) -> str:
inventory = self.state.core.fission_product_inventory or {}
particles = self.state.core.emitted_particles or {}
xe = getattr(self.state.core, "xenon_inventory", 0.0)
sm = inventory.get("Sm", 0.0)
iodine = inventory.get("I", 0.0)
xe_drho = getattr(self.state.core, "reactivity_margin", 0.0)
return (
"[bold cyan]Key Poisons / Emitters[/bold cyan]\\n"
f"Xe (xenon): {xe:9.2e}\\n"
f"Sm (samarium): {sm:9.2e}\\n"
f"I (iodine): {iodine:9.2e}\\n"
f"Xe Δρ: {xe_drho:+.4f}\\n"
f"Neutrons (src): {particles.get('neutrons', 0.0):.2e}"
)
def _turbine_text(self) -> str:
steam_avail = self._steam_available_power(self.state)
enthalpy = self.state.turbines[0].steam_enthalpy if self.state.turbines else 0.0
cond = ""
if self.state.turbines:
cond = (
f"Cond P {self.state.turbines[0].condenser_pressure:4.2f}/{constants.CONDENSER_MAX_PRESSURE_MPA:4.2f} MPa "
f"T {self.state.turbines[0].condenser_temperature:6.1f}K"
)
lines = [
"[bold cyan]Turbine / Grid[/bold cyan]",
f"Turbines {' '.join(self._turbine_status_lines()) if self._turbine_status_lines() else 'n/a'}",
f"Rated Elec {len(self.reactor.turbines)*self.reactor.turbines[0].rated_output_mw:7.1f} MW" if self.reactor.turbines else "Rated Elec n/a",
f"Steam Avail {steam_avail:5.1f} MW h={enthalpy:5.0f} kJ/kg {cond}",
]
if self.state.turbines:
lines.append("Unit Elec " + " ".join([f"{t.electrical_output_mw:6.1f}MW" for t in self.state.turbines]))
lines.append(f"Throttle {self.reactor.turbines[0].throttle:5.2f}" if self.reactor.turbines else "Throttle n/a")
lines.append(f"Electrical {self.state.total_electrical_output():7.1f} MW Load {self._total_load_supplied(self.state):7.1f}/{self._total_load_demand(self.state):7.1f} MW")
if self.reactor.consumer:
lines.append(f"Consumer {'ONLINE' if self.reactor.consumer.online else 'OFF'} Demand {self.reactor.consumer.demand_mw:7.1f} MW")
return "\\n".join(lines)
def _generator_text(self) -> str:
lines = ["[bold cyan]Generators[/bold cyan]"]
for idx, g in enumerate(self.state.generators):
status = "RUN" if g.running else "OFF"
if g.starting:
status = "START"
lines.append(f"Gen{idx+1}: {status:5} {g.power_output_mw:5.1f} MW batt {g.battery_charge*100:5.1f}%")
return "\\n".join(lines)
def _power_text(self) -> str:
draws = getattr(self.state, "aux_draws", {}) or {}
base = draws.get("base", 0.0)
prim = draws.get("primary_pumps", 0.0)
sec = draws.get("secondary_pumps", 0.0)
demand = draws.get("total_demand", 0.0)
supplied = draws.get("supplied", 0.0)
gen_out = draws.get("generator_output", 0.0)
turb_out = draws.get("turbine_output", 0.0)
return (
"[bold cyan]Power Stats[/bold cyan]\\n"
f"Base Aux {base:5.1f} MW Prim Aux {prim:5.1f} MW Sec Aux {sec:5.1f} MW\\n"
f"Aux demand {demand:5.1f} MW supplied {supplied:5.1f} MW\\n"
f"Gen out {gen_out:5.1f} MW Turbine out {turb_out:5.1f} MW"
)
def _protection_text(self) -> str:
lines = ["[bold cyan]Protections / Warnings[/bold cyan]"]
lines.append(f"SCRAM {'ACTIVE' if self.reactor.shutdown else 'CLEAR'}")
if self.reactor.meltdown:
lines.append("Meltdown IN PROGRESS")
sec_flow_low = self.state.secondary_loop.mass_flow_rate <= 1.0 or not self.reactor.secondary_pump_active
heat_sink_risk = sec_flow_low and self.state.core.power_output_mw > 50.0
if heat_sink_risk:
heat_text = "TRIPPED low secondary flow >50 MW"
elif sec_flow_low:
heat_text = "ARMED (secondary off/low flow)"
else:
heat_text = "OK"
lines.append(f"Heat sink {heat_text}")
lines.append(f"DNB margin {self.state.core.dnb_margin:4.2f}")
lines.append(f"Subcooling {self.state.core.subcooling_margin:5.1f} K")
lines.append(f"Reliefs pri={'OPEN' if self.reactor.primary_relief_open else 'CLOSED'} sec={'OPEN' if self.reactor.secondary_relief_open else 'CLOSED'}")
return "\\n".join(lines)
def _maintenance_text(self) -> str:
active = list(self.reactor.maintenance_active)
return "[bold cyan]Maintenance[/bold cyan]\\nActive: " + (", ".join(active) if active else "None")
def _health_text(self) -> str:
lines = ["[bold cyan]Component Health[/bold cyan]"]
for name, comp in self.reactor.health_monitor.components.items():
lines.append(_bar(name, comp.integrity))
return "\\n".join(lines)
def _help_text(self) -> str:
tips = [
"Start pumps before withdrawing rods.",
"Bring turbine/consumer online after thermal stabilization.",
"Toggle turbine units 1/2/3 individually.",
"Use m/n/,/. in curses; mapped to j/k etc here.",
"F12 saves a snapshot; set FISSION_SNAPSHOT_AT for auto.",
]
return "[bold cyan]Controls & Tips[/bold cyan]\\n" + "\\n".join(f"- {t}" for t in tips)
def _log_text(self) -> str:
lines = ["[bold cyan]Logs[/bold cyan]"]
if not self.log_buffer:
lines.append("No recent logs.")
else:
lines.extend(list(self.log_buffer))
return "\\n".join(lines)
def _turbine_status_lines(self) -> list[str]:
if not self.reactor.turbine_unit_active:
return []
lines: list[str] = []
for idx, active in enumerate(self.reactor.turbine_unit_active):
label = f"{idx + 1}:"
status = "ON" if active else "OFF"
if idx < len(getattr(self.state, "turbines", [])):
t_state = self.state.turbines[idx]
status = getattr(t_state, "status", status)
lines.append(f"{label}{status}")
return lines
def _total_load_supplied(self, state: PlantState) -> float:
return sum(t.load_supplied_mw for t in state.turbines)
def _total_load_demand(self, state: PlantState) -> float:
return sum(t.load_demand_mw for t in state.turbines)
def _steam_available_power(self, state: PlantState) -> float:
mass_flow = state.secondary_loop.mass_flow_rate * max(0.0, state.secondary_loop.steam_quality)
if mass_flow <= 1.0:
return 0.0
enthalpy = state.turbines[0].steam_enthalpy if state.turbines else (constants.STEAM_LATENT_HEAT / 1_000.0)
return (enthalpy * mass_flow) / 1_000.0
def _snapshot_lines(self) -> list[str]:
return snapshot_lines(self.reactor, self.state)
def _save_snapshot(self, auto: bool = False) -> None:
try:
self.snapshot_path.parent.mkdir(parents=True, exist_ok=True)
self.snapshot_path.write_text("\\n".join(self._snapshot_lines()))
self.snapshot_done = True
LOGGER.info("Saved dashboard snapshot to %s%s", self.snapshot_path, " (auto)" if auto else "")
except Exception as exc: # pragma: no cover
LOGGER.error("Failed to save snapshot: %s", exc)
def _install_log_capture(self) -> None:
logger = logging.getLogger("reactor_sim")
self._previous_handlers = list(logger.handlers)
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
def emit(record: logging.LogRecord) -> None:
msg = handler.format(record)
self.log_buffer.append(msg)
handler.emit = emit # type: ignore[assignment]
logger.handlers = [handler]
logger.setLevel(logging.INFO)
self._log_handler = handler
def _restore_logging(self) -> None:
logger = logging.getLogger("reactor_sim")
if self._previous_handlers:
logger.handlers = self._previous_handlers
if self._log_handler and self._log_handler in logger.handlers:
logger.removeHandler(self._log_handler)
def run_textual_dashboard(reactor: Reactor, start_state: Optional[PlantState], timestep: float, save_path: Optional[str]) -> None:
app = TextualDashboard(reactor, start_state, timestep=timestep, save_path=save_path)
app.run()