diff --git a/AGENTS.md b/AGENTS.md index 98ceaf6..c66e388 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -4,12 +4,15 @@ All source code lives under `src/reactor_sim`. Submodules map to plant systems: `fuel.py` and `neutronics.py` govern fission power, `thermal.py` and `coolant.py` cover heat transfer and pumps, `turbine.py` drives the steam cycle, and `consumer.py` represents external electrical loads. High-level coordination happens in `reactor.py` and `simulation.py`. The convenience runner `run_simulation.py` executes the default scenario; add notebooks or scenario scripts under `experiments/` (create as needed). Keep assets such as plots or exported telemetry inside `artifacts/`. ## Build, Test, and Development Commands -- `python -m reactor_sim.simulation` — run the default 10-minute transient and print JSON snapshots. -- `python run_simulation.py` — same as above, handy for IDE launch configs. -- `python -m pip install -e .[dev]` — install editable package with optional tooling when dev dependencies are defined. +- `.venv/bin/python -m reactor_sim.simulation` — run the default 10-minute transient and print JSON snapshots using the checked-in virtualenv. +- `.venv/bin/python run_simulation.py` — same as above, handy for IDE launch configs. +- `.venv/bin/python -m pip install -e .[dev]` — install editable package with optional tooling when dev dependencies are defined. +- When running pytest or ad-hoc scripts, prefer `.venv/bin/python -m pytest` so everyone uses the shared interpreter and dependency set that ships with the repo. +- State snapshots now auto-save/load from `artifacts/last_state.json`; override the path via `FISSION_STATE_PATH` (or continue to use `FISSION_SAVE_STATE`/`FISSION_LOAD_STATE` for explicit overrides). +- A git remote for `origin` is configured; push changes to `origin/main` once work is complete so dashboards stay in sync. ## Operations & Control Hooks -Manual commands live in `reactor_sim.commands.ReactorCommand`. Pass a `command_provider` callable to `ReactorSimulation` to adjust rods, pumps, turbine, coolant demand, or the attached `ElectricalConsumer`. Use `ReactorCommand.scram_all()` for full shutdown, `ReactorCommand(consumer_online=True, consumer_demand=600)` to hook the grid, or toggle pumps (`primary_pump_on=False`) to simulate faults. Control helpers in `control.py` expose `set_rods`, `increment_rods`, and `scram`, and you can switch `set_manual_mode(True)` to pause the automatic rod controller. For hands-on runs, launch the curses dashboard (`FISSION_DASHBOARD=1 FISSION_REALTIME=1 python run_simulation.py`) and use the on-screen shortcuts (q quit/save, space SCRAM, p/o pumps, t turbine, +/- rods in 0.05 steps, [/ ] consumer demand, s/d setpoint, `a` toggles auto/manual rods). Recommended startup: enable manual rods (`a`), withdraw to ~0.3 before ramping the turbine/consumer, then re-enable auto control when you want closed-loop operation. +Manual commands live in `reactor_sim.commands.ReactorCommand`. Pass a `command_provider` callable to `ReactorSimulation` to adjust rods, pumps, turbine, coolant demand, or the attached `ElectricalConsumer`. Use `ReactorCommand.scram_all()` for full shutdown, `ReactorCommand(consumer_online=True, consumer_demand=600)` to hook the grid, or toggle pumps (`primary_pump_on=False`) to simulate faults. Control helpers in `control.py` expose `set_rods`, `increment_rods`, and `scram`, and you can switch `set_manual_mode(True)` to pause the automatic rod controller. For hands-on runs, launch the curses dashboard (`FISSION_DASHBOARD=1 FISSION_REALTIME=1 python run_simulation.py`) and use the on-screen shortcuts (q quit/save, space SCRAM, p/o pumps, t turbine, 1/2/3 toggle individual turbines, r reset/clear saved state, +/- rods in 0.05 steps, [/ ] consumer demand, s/d setpoint, `a` toggles auto/manual rods). Recommended startup: enable manual rods (`a`), withdraw to ~0.3 before ramping the turbine/consumer, then re-enable auto control when you want closed-loop operation. The plant now boots cold (ambient core temperature, idle pumps); scripts must sequence startup: enable pumps, gradually withdraw rods, connect the consumer after turbine spin-up, and use `ControlSystem.set_power_setpoint` to chase desired output. Set `FISSION_REALTIME=1` to run continuously with real-time pacing; optionally set `FISSION_SIM_DURATION=infinite` for indefinite runs and send SIGINT/Ctrl+C to stop. Use `FISSION_SIM_DURATION=600` (default) for bounded offline batches. ## Coding Style & Naming Conventions @@ -25,4 +28,4 @@ Write commits in imperative mood (`Add turbine moisture separator model`). Squas Sim parameters live in constructors; never hard-code environment-specific paths. When adding new physics, guard unstable calculations with clamps and highlight operating limits in comments. Sensitive experiments (e.g., beyond design basis) should default to disabled flags so scripted studies remain safe by default. ## Reliability & Persistence -Component wear is tracked via `failures.py`; stress from overheating, pump starvation, or turbine imbalance will degrade integrity and eventually disable the affected subsystem with automatic SCRAM for core damage. Persist plant snapshots with `ControlSystem.save_state()`/`load_state()` (used by `Reactor.save_state`/`load_state`) so long-running studies can resume; `FISSION_LOAD_STATE`/`FISSION_SAVE_STATE` env vars wire this into the CLI. +Component wear is tracked via `failures.py`; stress from overheating, pump starvation, or turbine imbalance will degrade integrity and eventually disable the affected subsystem with automatic SCRAM for core damage. Plant state now persists between runs to `artifacts/last_state.json` by default (override via `FISSION_STATE_PATH` or the explicit save/load env vars). In the dashboard, press `r` to clear the saved snapshot and reboot the reactor to a cold, green-field state whenever you need a clean slate. diff --git a/src/reactor_sim/commands.py b/src/reactor_sim/commands.py index 1a7e59d..158395d 100644 --- a/src/reactor_sim/commands.py +++ b/src/reactor_sim/commands.py @@ -20,8 +20,15 @@ class ReactorCommand: consumer_online: bool | None = None consumer_demand: float | None = None rod_manual: bool | None = None + turbine_units: dict[int, bool] | None = None + maintenance_components: tuple[str, ...] = tuple() @classmethod def scram_all(cls) -> "ReactorCommand": """Convenience constructor for an emergency shutdown.""" return cls(scram=True, turbine_on=False, primary_pump_on=True, secondary_pump_on=True) + + @classmethod + def maintain(cls, *components: str) -> "ReactorCommand": + """Request maintenance for the provided component names.""" + return cls(maintenance_components=tuple(components)) diff --git a/src/reactor_sim/dashboard.py b/src/reactor_sim/dashboard.py index fc910c2..227292e 100644 --- a/src/reactor_sim/dashboard.py +++ b/src/reactor_sim/dashboard.py @@ -6,6 +6,7 @@ import curses import logging from collections import deque from dataclasses import dataclass +from pathlib import Path from typing import Optional from .commands import ReactorCommand @@ -37,6 +38,7 @@ class ReactorDashboard: self.pending_command: Optional[ReactorCommand] = None self.sim: Optional[ReactorSimulation] = None self.quit_requested = False + self.reset_requested = False self.log_buffer: deque[str] = deque(maxlen=4) self._log_handler: Optional[logging.Handler] = None self._previous_handlers: list[logging.Handler] = [] @@ -47,7 +49,9 @@ class ReactorDashboard: DashboardKey("p", "Toggle primary pump"), DashboardKey("o", "Toggle secondary pump"), DashboardKey("t", "Toggle turbine"), + DashboardKey("1/2/3", "Toggle turbine units 1-3"), DashboardKey("c", "Toggle consumer"), + DashboardKey("r", "Reset & clear state"), DashboardKey("+/-", "Withdraw/insert rods"), DashboardKey("[/]", "Adjust consumer demand −/+50 MW"), DashboardKey("s", "Setpoint −250 MW"), @@ -68,24 +72,34 @@ class ReactorDashboard: curses.init_pair(4, curses.COLOR_RED, -1) stdscr.nodelay(True) self._install_log_capture() - self.sim = ReactorSimulation( - self.reactor, - timestep=self.timestep, - duration=None, - realtime=True, - command_provider=self._next_command, - ) - self.sim.start_state = self.start_state try: - for state in self.sim.run(): - self._draw(stdscr, state) - self._handle_input(stdscr) + while True: + self.sim = ReactorSimulation( + self.reactor, + timestep=self.timestep, + duration=None, + realtime=True, + command_provider=self._next_command, + ) + self.sim.start_state = self.start_state + try: + for state in self.sim.run(): + self._draw(stdscr, state) + self._handle_input(stdscr) + if self.quit_requested or self.reset_requested: + self.sim.stop() + break + finally: + if self.save_path and self.sim and self.sim.last_state and not self.reset_requested: + self.reactor.save_state(self.save_path, self.sim.last_state) if self.quit_requested: - self.sim.stop() break + if self.reset_requested: + self._clear_saved_state() + self._reset_to_greenfield() + continue + break finally: - if self.save_path and self.sim and self.sim.last_state: - self.reactor.save_state(self.save_path, self.sim.last_state) self._restore_logging() def _handle_input(self, stdscr: "curses._CursesWindow") -> None: @@ -104,6 +118,9 @@ class ReactorDashboard: self._queue_command(ReactorCommand(secondary_pump_on=not self.reactor.secondary_pump_active)) elif ch in (ord("t"), ord("T")): self._queue_command(ReactorCommand(turbine_on=not self.reactor.turbine_active)) + elif ord("1") <= ch <= ord("9"): + idx = ch - ord("1") + self._toggle_turbine_unit(idx) elif ch in (ord("+"), ord("=")): self._queue_command(ReactorCommand(rod_position=self._clamped_rod(-0.05))) elif ch == ord("-"): @@ -117,6 +134,8 @@ class ReactorDashboard: elif ch in (ord("c"), ord("C")): online = not (self.reactor.consumer.online if self.reactor.consumer else False) self._queue_command(ReactorCommand(consumer_online=online)) + elif ch in (ord("r"), ord("R")): + self._request_reset() elif ch in (ord("s"), ord("S")): self._queue_command(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw - 250.0)) elif ch in (ord("d"), ord("D")): @@ -143,6 +162,35 @@ class ReactorDashboard: if self.pending_command.rod_position is not None and self.pending_command.rod_manual is None: self.pending_command.rod_manual = True + def _toggle_turbine_unit(self, index: int) -> None: + if index < 0 or index >= len(self.reactor.turbine_unit_active): + return + current = self.reactor.turbine_unit_active[index] + self._queue_command(ReactorCommand(turbine_units={index + 1: not current})) + + def _request_reset(self) -> None: + self.reset_requested = True + if self.sim: + self.sim.stop() + + def _clear_saved_state(self) -> None: + if not self.save_path: + return + path = Path(self.save_path) + try: + path.unlink() + LOGGER.info("Cleared saved state at %s", path) + except FileNotFoundError: + LOGGER.info("No saved state to clear at %s", path) + + def _reset_to_greenfield(self) -> None: + LOGGER.info("Resetting reactor to initial state") + self.reactor = Reactor.default() + self.start_state = None + self.pending_command = None + self.reset_requested = False + self.log_buffer.clear() + def _next_command(self, _: float, __: PlantState) -> Optional[ReactorCommand]: cmd = self.pending_command @@ -230,9 +278,9 @@ class ReactorDashboard: y, "Turbine / Grid", [ - ("Turbine", "ON" if self.reactor.turbine_active else "OFF"), - ("Electrical", f"{state.turbine.electrical_output_mw:7.1f} MW"), - ("Load", f"{state.turbine.load_supplied_mw:7.1f}/{state.turbine.load_demand_mw:7.1f} MW"), + ("Turbines", " ".join(self._turbine_status_lines())), + ("Electrical", f"{state.total_electrical_output():7.1f} MW"), + ("Load", f"{self._total_load_supplied(state):7.1f}/{self._total_load_demand(state):7.1f} MW"), ("Consumer", f"{consumer_status}"), ("Demand", f"{consumer_demand:7.1f} MW"), ], @@ -251,6 +299,8 @@ class ReactorDashboard: tips = [ "Start pumps before withdrawing rods.", "Bring turbine and consumer online after thermal stabilization.", + "Toggle turbine units (1/2/3) for staggered maintenance.", + "Press 'r' to reset/clear state if you want a cold start.", "Watch component health to avoid automatic trips.", ] for idx, tip in enumerate(tips, start=y + 2): @@ -259,11 +309,12 @@ class ReactorDashboard: def _draw_status_panel(self, win: "curses._CursesWindow", state: PlantState) -> None: win.erase() win.hline(0, 0, curses.ACS_HLINE, win.getmaxyx()[1]) + turbine_text = " ".join(self._turbine_status_lines()) msg = ( f"Time {state.time_elapsed:7.1f}s | Rods {self.reactor.control.rod_fraction:.3f} | " f"Primary {'ON' if self.reactor.primary_pump_active else 'OFF'} | " f"Secondary {'ON' if self.reactor.secondary_pump_active else 'OFF'} | " - f"Turbine {'ON' if self.reactor.turbine_active else 'OFF'}" + f"Turbines {turbine_text}" ) win.addstr(1, 1, msg, curses.color_pair(3)) if self.reactor.health_monitor.failure_log: @@ -306,6 +357,19 @@ class ReactorDashboard: row += 1 return row + 1 + def _turbine_status_lines(self) -> list[str]: + if not self.reactor.turbine_unit_active: + return ["n/a"] + return [ + f"{idx + 1}:{'ON' if active else 'OFF'}" for idx, active in enumerate(self.reactor.turbine_unit_active) + ] + + def _total_load_supplied(self, state: PlantState) -> float: + return sum(t.load_supplied_mw for t in state.turbines) + + def _total_load_demand(self, state: PlantState) -> float: + return sum(t.load_demand_mw for t in state.turbines) + def _draw_health_bar(self, win: "curses._CursesWindow", start_y: int) -> None: height, width = win.getmaxyx() if start_y >= height - 2: diff --git a/src/reactor_sim/failures.py b/src/reactor_sim/failures.py index 453d566..07760ce 100644 --- a/src/reactor_sim/failures.py +++ b/src/reactor_sim/failures.py @@ -30,6 +30,15 @@ class ComponentHealth: self.failed = True LOGGER.error("Component %s has failed", self.name) + def restore(self, amount: float) -> None: + if amount <= 0.0: + return + previous = self.integrity + self.integrity = min(1.0, self.integrity + amount) + if self.integrity > 0.05: + self.failed = False + LOGGER.info("Maintenance on %s: %.3f -> %.3f", self.name, previous, self.integrity) + def snapshot(self) -> dict: return asdict(self) @@ -46,8 +55,10 @@ class HealthMonitor: "core": ComponentHealth("core"), "primary_pump": ComponentHealth("primary_pump"), "secondary_pump": ComponentHealth("secondary_pump"), - "turbine": ComponentHealth("turbine"), } + for idx in range(3): + name = f"turbine_{idx + 1}" + self.components[name] = ComponentHealth(name) self.failure_log: list[str] = [] def component(self, name: str) -> ComponentHealth: @@ -58,10 +69,11 @@ class HealthMonitor: state: PlantState, primary_active: bool, secondary_active: bool, - turbine_active: bool, + turbine_active: Iterable[bool], dt: float, ) -> List[str]: events: list[str] = [] + turbine_flags = list(turbine_active) core = self.component("core") core_temp = state.core.fuel_temperature temp_stress = max(0.0, (core_temp - 900.0) / (constants.MAX_CORE_TEMPERATURE - 900.0)) @@ -82,17 +94,23 @@ class HealthMonitor: else: self.component("secondary_pump").degrade(0.0005 * dt) - if turbine_active: - electrical = state.turbine.electrical_output_mw - load_ratio = ( - 0.0 - if state.turbine.load_demand_mw <= 0 - else min(1.0, electrical / max(1e-6, state.turbine.load_demand_mw)) - ) - stress = 0.0002 + abs(1 - load_ratio) * 0.003 - self.component("turbine").degrade(stress * dt) - else: - self.component("turbine").degrade(0.0001 * dt) + turbines = state.turbines if hasattr(state, "turbines") else [] + for idx, active in enumerate(turbine_flags): + name = f"turbine_{idx + 1}" + component = self.component(name) + if active and idx < len(turbines): + turbine_state = turbines[idx] + demand = turbine_state.load_demand_mw + supplied = turbine_state.load_supplied_mw + if demand <= 0: + load_ratio = 0.0 + else: + load_ratio = min(1.0, supplied / max(1e-6, demand)) + mismatch = abs(1 - load_ratio) + stress = 0.00005 + mismatch * 0.0006 + else: + stress = 0.00002 + component.degrade(stress * dt) for name, component in self.components.items(): if component.failed and name not in self.failure_log: @@ -105,6 +123,14 @@ class HealthMonitor: def load_snapshot(self, data: dict) -> None: for name, comp_data in data.items(): - if name in self.components: - self.components[name] = ComponentHealth.from_snapshot(comp_data) + mapped = "turbine_1" if name == "turbine" else name + if mapped in self.components: + self.components[mapped] = ComponentHealth.from_snapshot(comp_data) + def maintain(self, component: str, amount: float = 0.05) -> bool: + comp = self.components.get(component) + if not comp: + LOGGER.warning("Maintenance requested for unknown component %s", component) + return False + comp.restore(amount) + return True diff --git a/src/reactor_sim/neutronics.py b/src/reactor_sim/neutronics.py index 02c9fab..01692ae 100644 --- a/src/reactor_sim/neutronics.py +++ b/src/reactor_sim/neutronics.py @@ -15,12 +15,12 @@ LOGGER = logging.getLogger(__name__) def temperature_feedback(temp: float) -> float: """Negative coefficient: higher temperature lowers reactivity.""" reference = 900.0 - coefficient = -5e-5 + coefficient = -2.5e-5 return coefficient * (temp - reference) def xenon_poisoning(flux: float) -> float: - return min(0.05, 1e-8 * flux) + return min(0.015, 2e-9 * flux) @dataclass diff --git a/src/reactor_sim/reactor.py b/src/reactor_sim/reactor.py index 7520c19..837b650 100644 --- a/src/reactor_sim/reactor.py +++ b/src/reactor_sim/reactor.py @@ -30,15 +30,23 @@ class Reactor: secondary_pump: Pump thermal: ThermalSolver steam_generator: SteamGenerator - turbine: Turbine + turbines: list[Turbine] atomic_model: AtomicPhysics consumer: ElectricalConsumer | None = None health_monitor: HealthMonitor = field(default_factory=HealthMonitor) primary_pump_active: bool = True secondary_pump_active: bool = True turbine_active: bool = True + turbine_unit_active: list[bool] = field(default_factory=lambda: [True, True, True]) shutdown: bool = False + def __post_init__(self) -> None: + if not self.turbines: + self.turbines = [Turbine()] + if not self.turbine_unit_active or len(self.turbine_unit_active) != len(self.turbines): + self.turbine_unit_active = [True] * len(self.turbines) + self.turbine_active = any(self.turbine_unit_active) + @classmethod def default(cls) -> "Reactor": atomic_model = AtomicPhysics() @@ -50,7 +58,7 @@ class Reactor: secondary_pump=Pump(nominal_flow=16_000.0, efficiency=0.85), thermal=ThermalSolver(), steam_generator=SteamGenerator(), - turbine=Turbine(), + turbines=[Turbine() for _ in range(3)], atomic_model=atomic_model, consumer=ElectricalConsumer(name="Grid", demand_mw=800.0, online=False), health_monitor=HealthMonitor(), @@ -79,15 +87,18 @@ class Reactor: mass_flow_rate=0.0, steam_quality=0.0, ) - turbine = TurbineState( - steam_enthalpy=2_000.0, - shaft_power_mw=0.0, - electrical_output_mw=0.0, - condenser_temperature=ambient, - load_demand_mw=0.0, - load_supplied_mw=0.0, - ) - return PlantState(core=core, primary_loop=primary, secondary_loop=secondary, turbine=turbine) + turbine_states = [ + TurbineState( + steam_enthalpy=2_000.0, + shaft_power_mw=0.0, + electrical_output_mw=0.0, + condenser_temperature=ambient, + load_demand_mw=0.0, + load_supplied_mw=0.0, + ) + for _ in self.turbines + ] + return PlantState(core=core, primary_loop=primary, secondary_loop=secondary, turbines=turbine_states) def step(self, state: PlantState, dt: float, command: ReactorCommand | None = None) -> None: if self.shutdown: @@ -126,19 +137,13 @@ class Reactor: transferred = heat_transfer(state.primary_loop, state.secondary_loop, total_power) self.thermal.step_secondary(state.secondary_loop, transferred) - if self.turbine_active: - self.turbine.step(state.secondary_loop, state.turbine, self.consumer, steam_power_mw=transferred) - else: - state.turbine.shaft_power_mw = 0.0 - state.turbine.electrical_output_mw = 0.0 - if self.consumer: - self.consumer.update_power_received(0.0) + self._step_turbine_bank(state, transferred) failures = self.health_monitor.evaluate( state, self.primary_pump_active, self.secondary_pump_active, - self.turbine_active, + self.turbine_unit_active, dt, ) for failure in failures: @@ -157,11 +162,54 @@ class Reactor: prompt_power, fission_rate, state.primary_loop.temperature_out, - state.turbine.electrical_output_mw, - state.turbine.load_supplied_mw, - state.turbine.load_demand_mw, + state.total_electrical_output(), + sum(t.load_supplied_mw for t in state.turbines), + sum(t.load_demand_mw for t in state.turbines), ) + def _step_turbine_bank(self, state: PlantState, steam_power_mw: float) -> None: + if not state.turbines: + return + active_indices = [ + idx for idx, active in enumerate(self.turbine_unit_active) if active and idx < len(state.turbines) + ] + power_per_unit = steam_power_mw / len(active_indices) if active_indices else 0.0 + for idx, turbine in enumerate(self.turbines): + if idx >= len(state.turbines): + break + turbine_state = state.turbines[idx] + if idx in active_indices: + turbine.step(state.secondary_loop, turbine_state, steam_power_mw=power_per_unit) + else: + self._reset_turbine_state(turbine_state) + self._dispatch_consumer_load(state, active_indices) + + def _reset_turbine_state(self, turbine_state: TurbineState) -> None: + turbine_state.shaft_power_mw = 0.0 + turbine_state.electrical_output_mw = 0.0 + turbine_state.load_demand_mw = 0.0 + turbine_state.load_supplied_mw = 0.0 + + def _dispatch_consumer_load(self, state: PlantState, active_indices: list[int]) -> None: + total_electrical = sum(state.turbines[idx].electrical_output_mw for idx in active_indices) + if self.consumer: + demand = self.consumer.request_power() + supplied = min(total_electrical, demand) + self.consumer.update_power_received(supplied) + else: + demand = 0.0 + supplied = 0.0 + demand_per_unit = demand / len(active_indices) if active_indices else 0.0 + total_for_share = total_electrical if total_electrical > 0 else 1.0 + for idx, turbine_state in enumerate(state.turbines): + if idx not in active_indices: + turbine_state.load_demand_mw = 0.0 + turbine_state.load_supplied_mw = 0.0 + continue + share = 0.0 if total_electrical <= 0 else supplied * (turbine_state.electrical_output_mw / total_for_share) + turbine_state.load_demand_mw = demand_per_unit + turbine_state.load_supplied_mw = share if demand_per_unit <= 0 else min(share, demand_per_unit) + def _handle_failure(self, component: str) -> None: if component == "core": LOGGER.critical("Core failure detected. Initiating SCRAM.") @@ -171,8 +219,9 @@ class Reactor: self._set_primary_pump(False) elif component == "secondary_pump": self._set_secondary_pump(False) - elif component == "turbine": - self._set_turbine_state(False) + elif component.startswith("turbine"): + idx = self._component_index(component) + self._set_turbine_state(False, index=idx) def _apply_command(self, command: ReactorCommand, state: PlantState) -> dict[str, float]: overrides: dict[str, float] = {} @@ -196,12 +245,18 @@ class Reactor: self._set_secondary_pump(command.secondary_pump_on) if command.turbine_on is not None: self._set_turbine_state(command.turbine_on) + if command.turbine_units: + for key, state_flag in command.turbine_units.items(): + idx = key - 1 + self._set_turbine_state(state_flag, index=idx) if command.consumer_online is not None and self.consumer: self.consumer.set_online(command.consumer_online) if command.consumer_demand is not None and self.consumer: self.consumer.set_demand(command.consumer_demand) if command.coolant_demand is not None: overrides["coolant_demand"] = max(0.0, min(1.0, command.coolant_demand)) + for component in command.maintenance_components: + self._perform_maintenance(component) return overrides def _set_primary_pump(self, active: bool) -> None: @@ -214,10 +269,46 @@ class Reactor: self.secondary_pump_active = active LOGGER.info("Secondary pump %s", "enabled" if active else "stopped") - def _set_turbine_state(self, active: bool) -> None: - if self.turbine_active != active: - self.turbine_active = active - LOGGER.info("Turbine %s", "started" if active else "stopped") + def _set_turbine_state(self, active: bool, index: int | None = None) -> None: + if index is None: + for idx in range(len(self.turbine_unit_active)): + self._set_turbine_state(active, index=idx) + return + if index < 0 or index >= len(self.turbine_unit_active): + LOGGER.warning("Ignoring turbine index %s", index) + return + if self.turbine_unit_active[index] != active: + self.turbine_unit_active[index] = active + LOGGER.info("Turbine %d %s", index + 1, "started" if active else "stopped") + self.turbine_active = any(self.turbine_unit_active) + + def _component_index(self, name: str) -> int: + if name == "turbine": + return 0 + try: + return int(name.split("_")[1]) - 1 + except (IndexError, ValueError): + return -1 + + def _perform_maintenance(self, component: str) -> None: + if component == "core" and not self.shutdown: + LOGGER.warning("Cannot maintain core while reactor is running") + return + if component == "primary_pump" and self.primary_pump_active: + LOGGER.warning("Stop primary pump before maintenance") + return + if component == "secondary_pump" and self.secondary_pump_active: + LOGGER.warning("Stop secondary pump before maintenance") + return + if component.startswith("turbine"): + idx = self._component_index(component) + if idx < 0 or idx >= len(self.turbine_unit_active): + LOGGER.warning("Unknown turbine maintenance target %s", component) + return + if self.turbine_unit_active[idx]: + LOGGER.warning("Stop turbine %d before maintenance", idx + 1) + return + self.health_monitor.maintain(component) def attach_consumer(self, consumer: ElectricalConsumer) -> None: self.consumer = consumer @@ -233,6 +324,7 @@ class Reactor: "primary_pump_active": self.primary_pump_active, "secondary_pump_active": self.secondary_pump_active, "turbine_active": self.turbine_active, + "turbine_units": self.turbine_unit_active, "shutdown": self.shutdown, "consumer": { "online": self.consumer.online if self.consumer else False, @@ -246,7 +338,10 @@ class Reactor: plant, metadata, health = self.control.load_state(filepath) self.primary_pump_active = metadata.get("primary_pump_active", self.primary_pump_active) self.secondary_pump_active = metadata.get("secondary_pump_active", self.secondary_pump_active) - self.turbine_active = metadata.get("turbine_active", self.turbine_active) + unit_states = metadata.get("turbine_units") + if unit_states: + self.turbine_unit_active = list(unit_states) + self.turbine_active = metadata.get("turbine_active", any(self.turbine_unit_active)) self.shutdown = metadata.get("shutdown", self.shutdown) consumer_cfg = metadata.get("consumer") if consumer_cfg: @@ -262,4 +357,17 @@ class Reactor: if health: self.health_monitor.load_snapshot(health) LOGGER.info("Reactor state restored from %s", filepath) + if len(plant.turbines) < len(self.turbines): + ambient = constants.ENVIRONMENT_TEMPERATURE + while len(plant.turbines) < len(self.turbines): + plant.turbines.append( + TurbineState( + steam_enthalpy=2_000.0, + shaft_power_mw=0.0, + electrical_output_mw=0.0, + condenser_temperature=ambient, + load_demand_mw=0.0, + load_supplied_mw=0.0, + ) + ) return plant diff --git a/src/reactor_sim/simulation.py b/src/reactor_sim/simulation.py index c4e6e7e..ac52d83 100644 --- a/src/reactor_sim/simulation.py +++ b/src/reactor_sim/simulation.py @@ -7,6 +7,7 @@ import json import logging import os from dataclasses import dataclass, field +from pathlib import Path from threading import Event import time from typing import Callable, Iterable, Optional @@ -21,6 +22,11 @@ LOGGER = logging.getLogger(__name__) CommandProvider = Callable[[float, PlantState], Optional[ReactorCommand]] +def _default_state_path() -> Path: + custom = os.getenv("FISSION_STATE_PATH") + return Path(custom) if custom else Path("artifacts/last_state.json") + + @dataclass class ReactorSimulation: reactor: Reactor @@ -80,8 +86,11 @@ def main() -> None: timestep = 1.0 if dashboard_mode or realtime else 5.0 sim = ReactorSimulation(reactor, timestep=timestep, duration=duration, realtime=realtime) + state_path = _default_state_path() load_path = os.getenv("FISSION_LOAD_STATE") - save_path = os.getenv("FISSION_SAVE_STATE") + if not load_path and state_path.exists(): + load_path = str(state_path) + save_path = os.getenv("FISSION_SAVE_STATE") or str(state_path) if load_path: sim.start_state = reactor.load_state(load_path) if dashboard_mode: diff --git a/src/reactor_sim/state.py b/src/reactor_sim/state.py index 1ea79bd..b180588 100644 --- a/src/reactor_sim/state.py +++ b/src/reactor_sim/state.py @@ -49,7 +49,7 @@ class PlantState: core: CoreState primary_loop: CoolantLoopState secondary_loop: CoolantLoopState - turbine: TurbineState + turbines: list[TurbineState] time_elapsed: float = field(default=0.0) def snapshot(self) -> dict[str, float]: @@ -60,18 +60,27 @@ class PlantState: "neutron_flux": self.core.neutron_flux, "primary_outlet_temp": self.primary_loop.temperature_out, "secondary_pressure": self.secondary_loop.pressure, - "turbine_electric": self.turbine.electrical_output_mw, + "turbine_electric": self.total_electrical_output(), } + def total_electrical_output(self) -> float: + return sum(t.electrical_output_mw for t in self.turbines) + def to_dict(self) -> dict: return asdict(self) @classmethod def from_dict(cls, data: dict) -> "PlantState": + turbines_blob = data.get("turbines") + if turbines_blob is None: + # Compatibility with previous single-turbine snapshots. + old_turbine = data.get("turbine") + turbines_blob = [old_turbine] if old_turbine else [] + turbines = [TurbineState(**t) for t in turbines_blob] return cls( core=CoreState(**data["core"]), primary_loop=CoolantLoopState(**data["primary_loop"]), secondary_loop=CoolantLoopState(**data["secondary_loop"]), - turbine=TurbineState(**data["turbine"]), + turbines=turbines, time_elapsed=data.get("time_elapsed", 0.0), ) diff --git a/src/reactor_sim/turbine.py b/src/reactor_sim/turbine.py index 2202b25..2b19b23 100644 --- a/src/reactor_sim/turbine.py +++ b/src/reactor_sim/turbine.py @@ -6,10 +6,7 @@ from dataclasses import dataclass import logging from . import constants -from typing import Optional - from .state import CoolantLoopState, TurbineState -from .consumer import ElectricalConsumer LOGGER = logging.getLogger(__name__) @@ -33,7 +30,6 @@ class Turbine: self, loop: CoolantLoopState, state: TurbineState, - consumer: Optional[ElectricalConsumer] = None, steam_power_mw: float = 0.0, ) -> None: enthalpy = 2_700.0 + loop.steam_quality * 600.0 @@ -41,31 +37,14 @@ class Turbine: available_power = max(steam_power_mw, (enthalpy * mass_flow / 1_000.0) / 1_000.0) shaft_power_mw = available_power * self.mechanical_efficiency electrical = shaft_power_mw * self.generator_efficiency - if consumer: - load_demand = consumer.request_power() - supplied = min(electrical, load_demand) - consumer.update_power_received(supplied) - LOGGER.debug( - "Consumer %s demand %.1f -> supplied %.1f MW", - consumer.name, - load_demand, - supplied, - ) - else: - load_demand = 0.0 - supplied = 0.0 condenser_temp = max(305.0, loop.temperature_in - 20.0) state.steam_enthalpy = enthalpy state.shaft_power_mw = shaft_power_mw state.electrical_output_mw = electrical state.condenser_temperature = condenser_temp - state.load_demand_mw = load_demand - state.load_supplied_mw = supplied LOGGER.debug( - "Turbine output: shaft=%.1fMW electrical=%.1fMW condenser=%.1fK load %.1f/%.1f", + "Turbine output: shaft=%.1fMW electrical=%.1fMW condenser=%.1fK", shaft_power_mw, electrical, condenser_temp, - supplied, - load_demand, ) diff --git a/tests/test_neutronics.py b/tests/test_neutronics.py new file mode 100644 index 0000000..6cda69e --- /dev/null +++ b/tests/test_neutronics.py @@ -0,0 +1,26 @@ +from reactor_sim.neutronics import NeutronDynamics +from reactor_sim.state import CoreState + + +def _core_state( + temperature: float = 950.0, + flux: float = 2e7, + burnup: float = 0.01, + power: float = 500.0, +) -> CoreState: + return CoreState( + fuel_temperature=temperature, + neutron_flux=flux, + reactivity_margin=0.0, + power_output_mw=power, + burnup=burnup, + ) + + +def test_reactivity_increases_with_rod_withdrawal(): + dynamics = NeutronDynamics() + state = _core_state() + rho_full_out = dynamics.reactivity(state, control_fraction=0.0) + rho_half = dynamics.reactivity(state, control_fraction=0.5) + assert rho_full_out > 0.0 + assert rho_full_out > rho_half diff --git a/tests/test_simulation.py b/tests/test_simulation.py index c0d3ee7..fa52b95 100644 --- a/tests/test_simulation.py +++ b/tests/test_simulation.py @@ -4,6 +4,7 @@ from pathlib import Path import pytest from reactor_sim import constants +from reactor_sim.failures import HealthMonitor from reactor_sim.reactor import Reactor from reactor_sim.simulation import ReactorSimulation @@ -13,7 +14,7 @@ def test_reactor_initial_state_is_cold(): state = reactor.initial_state() assert state.core.fuel_temperature == constants.ENVIRONMENT_TEMPERATURE assert state.primary_loop.mass_flow_rate == 0.0 - assert state.turbine.electrical_output_mw == 0.0 + assert state.total_electrical_output() == 0.0 def test_state_save_and_load_roundtrip(tmp_path: Path): @@ -36,7 +37,18 @@ def test_health_monitor_flags_core_failure(): reactor = Reactor.default() state = reactor.initial_state() state.core.fuel_temperature = constants.MAX_CORE_TEMPERATURE - failures = reactor.health_monitor.evaluate(state, True, True, True, dt=200.0) + failures = reactor.health_monitor.evaluate(state, True, True, [True, True, True], dt=200.0) assert "core" in failures reactor._handle_failure("core") assert reactor.shutdown is True + + +def test_maintenance_recovers_component_health(): + monitor = HealthMonitor() + pump = monitor.component("secondary_pump") + pump.integrity = 0.3 + pump.fail() + restored = monitor.maintain("secondary_pump", amount=0.5) + assert restored is True + assert pump.integrity == pytest.approx(0.8) + assert pump.failed is False