Improve persistence and reactor dynamics

This commit is contained in:
Andrii Prokhorov
2025-11-21 18:59:11 +02:00
parent 7c8321e3c4
commit d37620ccc1
11 changed files with 340 additions and 97 deletions

View File

@@ -4,12 +4,15 @@
All source code lives under `src/reactor_sim`. Submodules map to plant systems: `fuel.py` and `neutronics.py` govern fission power, `thermal.py` and `coolant.py` cover heat transfer and pumps, `turbine.py` drives the steam cycle, and `consumer.py` represents external electrical loads. High-level coordination happens in `reactor.py` and `simulation.py`. The convenience runner `run_simulation.py` executes the default scenario; add notebooks or scenario scripts under `experiments/` (create as needed). Keep assets such as plots or exported telemetry inside `artifacts/`.
## Build, Test, and Development Commands
- `python -m reactor_sim.simulation` — run the default 10-minute transient and print JSON snapshots.
- `python run_simulation.py` — same as above, handy for IDE launch configs.
- `python -m pip install -e .[dev]` — install editable package with optional tooling when dev dependencies are defined.
- `.venv/bin/python -m reactor_sim.simulation` — run the default 10-minute transient and print JSON snapshots using the checked-in virtualenv.
- `.venv/bin/python run_simulation.py` — same as above, handy for IDE launch configs.
- `.venv/bin/python -m pip install -e .[dev]` — install editable package with optional tooling when dev dependencies are defined.
- When running pytest or ad-hoc scripts, prefer `.venv/bin/python -m pytest` so everyone uses the shared interpreter and dependency set that ships with the repo.
- State snapshots now auto-save/load from `artifacts/last_state.json`; override the path via `FISSION_STATE_PATH` (or continue to use `FISSION_SAVE_STATE`/`FISSION_LOAD_STATE` for explicit overrides).
- A git remote for `origin` is configured; push changes to `origin/main` once work is complete so dashboards stay in sync.
## Operations & Control Hooks
Manual commands live in `reactor_sim.commands.ReactorCommand`. Pass a `command_provider` callable to `ReactorSimulation` to adjust rods, pumps, turbine, coolant demand, or the attached `ElectricalConsumer`. Use `ReactorCommand.scram_all()` for full shutdown, `ReactorCommand(consumer_online=True, consumer_demand=600)` to hook the grid, or toggle pumps (`primary_pump_on=False`) to simulate faults. Control helpers in `control.py` expose `set_rods`, `increment_rods`, and `scram`, and you can switch `set_manual_mode(True)` to pause the automatic rod controller. For hands-on runs, launch the curses dashboard (`FISSION_DASHBOARD=1 FISSION_REALTIME=1 python run_simulation.py`) and use the on-screen shortcuts (q quit/save, space SCRAM, p/o pumps, t turbine, +/- rods in 0.05 steps, [/ ] consumer demand, s/d setpoint, `a` toggles auto/manual rods). Recommended startup: enable manual rods (`a`), withdraw to ~0.3 before ramping the turbine/consumer, then re-enable auto control when you want closed-loop operation.
Manual commands live in `reactor_sim.commands.ReactorCommand`. Pass a `command_provider` callable to `ReactorSimulation` to adjust rods, pumps, turbine, coolant demand, or the attached `ElectricalConsumer`. Use `ReactorCommand.scram_all()` for full shutdown, `ReactorCommand(consumer_online=True, consumer_demand=600)` to hook the grid, or toggle pumps (`primary_pump_on=False`) to simulate faults. Control helpers in `control.py` expose `set_rods`, `increment_rods`, and `scram`, and you can switch `set_manual_mode(True)` to pause the automatic rod controller. For hands-on runs, launch the curses dashboard (`FISSION_DASHBOARD=1 FISSION_REALTIME=1 python run_simulation.py`) and use the on-screen shortcuts (q quit/save, space SCRAM, p/o pumps, t turbine, 1/2/3 toggle individual turbines, r reset/clear saved state, +/- rods in 0.05 steps, [/ ] consumer demand, s/d setpoint, `a` toggles auto/manual rods). Recommended startup: enable manual rods (`a`), withdraw to ~0.3 before ramping the turbine/consumer, then re-enable auto control when you want closed-loop operation.
The plant now boots cold (ambient core temperature, idle pumps); scripts must sequence startup: enable pumps, gradually withdraw rods, connect the consumer after turbine spin-up, and use `ControlSystem.set_power_setpoint` to chase desired output. Set `FISSION_REALTIME=1` to run continuously with real-time pacing; optionally set `FISSION_SIM_DURATION=infinite` for indefinite runs and send SIGINT/Ctrl+C to stop. Use `FISSION_SIM_DURATION=600` (default) for bounded offline batches.
## Coding Style & Naming Conventions
@@ -25,4 +28,4 @@ Write commits in imperative mood (`Add turbine moisture separator model`). Squas
Sim parameters live in constructors; never hard-code environment-specific paths. When adding new physics, guard unstable calculations with clamps and highlight operating limits in comments. Sensitive experiments (e.g., beyond design basis) should default to disabled flags so scripted studies remain safe by default.
## Reliability & Persistence
Component wear is tracked via `failures.py`; stress from overheating, pump starvation, or turbine imbalance will degrade integrity and eventually disable the affected subsystem with automatic SCRAM for core damage. Persist plant snapshots with `ControlSystem.save_state()`/`load_state()` (used by `Reactor.save_state`/`load_state`) so long-running studies can resume; `FISSION_LOAD_STATE`/`FISSION_SAVE_STATE` env vars wire this into the CLI.
Component wear is tracked via `failures.py`; stress from overheating, pump starvation, or turbine imbalance will degrade integrity and eventually disable the affected subsystem with automatic SCRAM for core damage. Plant state now persists between runs to `artifacts/last_state.json` by default (override via `FISSION_STATE_PATH` or the explicit save/load env vars). In the dashboard, press `r` to clear the saved snapshot and reboot the reactor to a cold, green-field state whenever you need a clean slate.

View File

@@ -20,8 +20,15 @@ class ReactorCommand:
consumer_online: bool | None = None
consumer_demand: float | None = None
rod_manual: bool | None = None
turbine_units: dict[int, bool] | None = None
maintenance_components: tuple[str, ...] = tuple()
@classmethod
def scram_all(cls) -> "ReactorCommand":
"""Convenience constructor for an emergency shutdown."""
return cls(scram=True, turbine_on=False, primary_pump_on=True, secondary_pump_on=True)
@classmethod
def maintain(cls, *components: str) -> "ReactorCommand":
"""Request maintenance for the provided component names."""
return cls(maintenance_components=tuple(components))

View File

@@ -6,6 +6,7 @@ import curses
import logging
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from .commands import ReactorCommand
@@ -37,6 +38,7 @@ class ReactorDashboard:
self.pending_command: Optional[ReactorCommand] = None
self.sim: Optional[ReactorSimulation] = None
self.quit_requested = False
self.reset_requested = False
self.log_buffer: deque[str] = deque(maxlen=4)
self._log_handler: Optional[logging.Handler] = None
self._previous_handlers: list[logging.Handler] = []
@@ -47,7 +49,9 @@ class ReactorDashboard:
DashboardKey("p", "Toggle primary pump"),
DashboardKey("o", "Toggle secondary pump"),
DashboardKey("t", "Toggle turbine"),
DashboardKey("1/2/3", "Toggle turbine units 1-3"),
DashboardKey("c", "Toggle consumer"),
DashboardKey("r", "Reset & clear state"),
DashboardKey("+/-", "Withdraw/insert rods"),
DashboardKey("[/]", "Adjust consumer demand /+50 MW"),
DashboardKey("s", "Setpoint 250 MW"),
@@ -68,24 +72,34 @@ class ReactorDashboard:
curses.init_pair(4, curses.COLOR_RED, -1)
stdscr.nodelay(True)
self._install_log_capture()
self.sim = ReactorSimulation(
self.reactor,
timestep=self.timestep,
duration=None,
realtime=True,
command_provider=self._next_command,
)
self.sim.start_state = self.start_state
try:
for state in self.sim.run():
self._draw(stdscr, state)
self._handle_input(stdscr)
while True:
self.sim = ReactorSimulation(
self.reactor,
timestep=self.timestep,
duration=None,
realtime=True,
command_provider=self._next_command,
)
self.sim.start_state = self.start_state
try:
for state in self.sim.run():
self._draw(stdscr, state)
self._handle_input(stdscr)
if self.quit_requested or self.reset_requested:
self.sim.stop()
break
finally:
if self.save_path and self.sim and self.sim.last_state and not self.reset_requested:
self.reactor.save_state(self.save_path, self.sim.last_state)
if self.quit_requested:
self.sim.stop()
break
if self.reset_requested:
self._clear_saved_state()
self._reset_to_greenfield()
continue
break
finally:
if self.save_path and self.sim and self.sim.last_state:
self.reactor.save_state(self.save_path, self.sim.last_state)
self._restore_logging()
def _handle_input(self, stdscr: "curses._CursesWindow") -> None:
@@ -104,6 +118,9 @@ class ReactorDashboard:
self._queue_command(ReactorCommand(secondary_pump_on=not self.reactor.secondary_pump_active))
elif ch in (ord("t"), ord("T")):
self._queue_command(ReactorCommand(turbine_on=not self.reactor.turbine_active))
elif ord("1") <= ch <= ord("9"):
idx = ch - ord("1")
self._toggle_turbine_unit(idx)
elif ch in (ord("+"), ord("=")):
self._queue_command(ReactorCommand(rod_position=self._clamped_rod(-0.05)))
elif ch == ord("-"):
@@ -117,6 +134,8 @@ class ReactorDashboard:
elif ch in (ord("c"), ord("C")):
online = not (self.reactor.consumer.online if self.reactor.consumer else False)
self._queue_command(ReactorCommand(consumer_online=online))
elif ch in (ord("r"), ord("R")):
self._request_reset()
elif ch in (ord("s"), ord("S")):
self._queue_command(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw - 250.0))
elif ch in (ord("d"), ord("D")):
@@ -143,6 +162,35 @@ class ReactorDashboard:
if self.pending_command.rod_position is not None and self.pending_command.rod_manual is None:
self.pending_command.rod_manual = True
def _toggle_turbine_unit(self, index: int) -> None:
if index < 0 or index >= len(self.reactor.turbine_unit_active):
return
current = self.reactor.turbine_unit_active[index]
self._queue_command(ReactorCommand(turbine_units={index + 1: not current}))
def _request_reset(self) -> None:
self.reset_requested = True
if self.sim:
self.sim.stop()
def _clear_saved_state(self) -> None:
if not self.save_path:
return
path = Path(self.save_path)
try:
path.unlink()
LOGGER.info("Cleared saved state at %s", path)
except FileNotFoundError:
LOGGER.info("No saved state to clear at %s", path)
def _reset_to_greenfield(self) -> None:
LOGGER.info("Resetting reactor to initial state")
self.reactor = Reactor.default()
self.start_state = None
self.pending_command = None
self.reset_requested = False
self.log_buffer.clear()
def _next_command(self, _: float, __: PlantState) -> Optional[ReactorCommand]:
cmd = self.pending_command
@@ -230,9 +278,9 @@ class ReactorDashboard:
y,
"Turbine / Grid",
[
("Turbine", "ON" if self.reactor.turbine_active else "OFF"),
("Electrical", f"{state.turbine.electrical_output_mw:7.1f} MW"),
("Load", f"{state.turbine.load_supplied_mw:7.1f}/{state.turbine.load_demand_mw:7.1f} MW"),
("Turbines", " ".join(self._turbine_status_lines())),
("Electrical", f"{state.total_electrical_output():7.1f} MW"),
("Load", f"{self._total_load_supplied(state):7.1f}/{self._total_load_demand(state):7.1f} MW"),
("Consumer", f"{consumer_status}"),
("Demand", f"{consumer_demand:7.1f} MW"),
],
@@ -251,6 +299,8 @@ class ReactorDashboard:
tips = [
"Start pumps before withdrawing rods.",
"Bring turbine and consumer online after thermal stabilization.",
"Toggle turbine units (1/2/3) for staggered maintenance.",
"Press 'r' to reset/clear state if you want a cold start.",
"Watch component health to avoid automatic trips.",
]
for idx, tip in enumerate(tips, start=y + 2):
@@ -259,11 +309,12 @@ class ReactorDashboard:
def _draw_status_panel(self, win: "curses._CursesWindow", state: PlantState) -> None:
win.erase()
win.hline(0, 0, curses.ACS_HLINE, win.getmaxyx()[1])
turbine_text = " ".join(self._turbine_status_lines())
msg = (
f"Time {state.time_elapsed:7.1f}s | Rods {self.reactor.control.rod_fraction:.3f} | "
f"Primary {'ON' if self.reactor.primary_pump_active else 'OFF'} | "
f"Secondary {'ON' if self.reactor.secondary_pump_active else 'OFF'} | "
f"Turbine {'ON' if self.reactor.turbine_active else 'OFF'}"
f"Turbines {turbine_text}"
)
win.addstr(1, 1, msg, curses.color_pair(3))
if self.reactor.health_monitor.failure_log:
@@ -306,6 +357,19 @@ class ReactorDashboard:
row += 1
return row + 1
def _turbine_status_lines(self) -> list[str]:
if not self.reactor.turbine_unit_active:
return ["n/a"]
return [
f"{idx + 1}:{'ON' if active else 'OFF'}" for idx, active in enumerate(self.reactor.turbine_unit_active)
]
def _total_load_supplied(self, state: PlantState) -> float:
return sum(t.load_supplied_mw for t in state.turbines)
def _total_load_demand(self, state: PlantState) -> float:
return sum(t.load_demand_mw for t in state.turbines)
def _draw_health_bar(self, win: "curses._CursesWindow", start_y: int) -> None:
height, width = win.getmaxyx()
if start_y >= height - 2:

View File

@@ -30,6 +30,15 @@ class ComponentHealth:
self.failed = True
LOGGER.error("Component %s has failed", self.name)
def restore(self, amount: float) -> None:
if amount <= 0.0:
return
previous = self.integrity
self.integrity = min(1.0, self.integrity + amount)
if self.integrity > 0.05:
self.failed = False
LOGGER.info("Maintenance on %s: %.3f -> %.3f", self.name, previous, self.integrity)
def snapshot(self) -> dict:
return asdict(self)
@@ -46,8 +55,10 @@ class HealthMonitor:
"core": ComponentHealth("core"),
"primary_pump": ComponentHealth("primary_pump"),
"secondary_pump": ComponentHealth("secondary_pump"),
"turbine": ComponentHealth("turbine"),
}
for idx in range(3):
name = f"turbine_{idx + 1}"
self.components[name] = ComponentHealth(name)
self.failure_log: list[str] = []
def component(self, name: str) -> ComponentHealth:
@@ -58,10 +69,11 @@ class HealthMonitor:
state: PlantState,
primary_active: bool,
secondary_active: bool,
turbine_active: bool,
turbine_active: Iterable[bool],
dt: float,
) -> List[str]:
events: list[str] = []
turbine_flags = list(turbine_active)
core = self.component("core")
core_temp = state.core.fuel_temperature
temp_stress = max(0.0, (core_temp - 900.0) / (constants.MAX_CORE_TEMPERATURE - 900.0))
@@ -82,17 +94,23 @@ class HealthMonitor:
else:
self.component("secondary_pump").degrade(0.0005 * dt)
if turbine_active:
electrical = state.turbine.electrical_output_mw
load_ratio = (
0.0
if state.turbine.load_demand_mw <= 0
else min(1.0, electrical / max(1e-6, state.turbine.load_demand_mw))
)
stress = 0.0002 + abs(1 - load_ratio) * 0.003
self.component("turbine").degrade(stress * dt)
else:
self.component("turbine").degrade(0.0001 * dt)
turbines = state.turbines if hasattr(state, "turbines") else []
for idx, active in enumerate(turbine_flags):
name = f"turbine_{idx + 1}"
component = self.component(name)
if active and idx < len(turbines):
turbine_state = turbines[idx]
demand = turbine_state.load_demand_mw
supplied = turbine_state.load_supplied_mw
if demand <= 0:
load_ratio = 0.0
else:
load_ratio = min(1.0, supplied / max(1e-6, demand))
mismatch = abs(1 - load_ratio)
stress = 0.00005 + mismatch * 0.0006
else:
stress = 0.00002
component.degrade(stress * dt)
for name, component in self.components.items():
if component.failed and name not in self.failure_log:
@@ -105,6 +123,14 @@ class HealthMonitor:
def load_snapshot(self, data: dict) -> None:
for name, comp_data in data.items():
if name in self.components:
self.components[name] = ComponentHealth.from_snapshot(comp_data)
mapped = "turbine_1" if name == "turbine" else name
if mapped in self.components:
self.components[mapped] = ComponentHealth.from_snapshot(comp_data)
def maintain(self, component: str, amount: float = 0.05) -> bool:
comp = self.components.get(component)
if not comp:
LOGGER.warning("Maintenance requested for unknown component %s", component)
return False
comp.restore(amount)
return True

View File

@@ -15,12 +15,12 @@ LOGGER = logging.getLogger(__name__)
def temperature_feedback(temp: float) -> float:
"""Negative coefficient: higher temperature lowers reactivity."""
reference = 900.0
coefficient = -5e-5
coefficient = -2.5e-5
return coefficient * (temp - reference)
def xenon_poisoning(flux: float) -> float:
return min(0.05, 1e-8 * flux)
return min(0.015, 2e-9 * flux)
@dataclass

View File

@@ -30,15 +30,23 @@ class Reactor:
secondary_pump: Pump
thermal: ThermalSolver
steam_generator: SteamGenerator
turbine: Turbine
turbines: list[Turbine]
atomic_model: AtomicPhysics
consumer: ElectricalConsumer | None = None
health_monitor: HealthMonitor = field(default_factory=HealthMonitor)
primary_pump_active: bool = True
secondary_pump_active: bool = True
turbine_active: bool = True
turbine_unit_active: list[bool] = field(default_factory=lambda: [True, True, True])
shutdown: bool = False
def __post_init__(self) -> None:
if not self.turbines:
self.turbines = [Turbine()]
if not self.turbine_unit_active or len(self.turbine_unit_active) != len(self.turbines):
self.turbine_unit_active = [True] * len(self.turbines)
self.turbine_active = any(self.turbine_unit_active)
@classmethod
def default(cls) -> "Reactor":
atomic_model = AtomicPhysics()
@@ -50,7 +58,7 @@ class Reactor:
secondary_pump=Pump(nominal_flow=16_000.0, efficiency=0.85),
thermal=ThermalSolver(),
steam_generator=SteamGenerator(),
turbine=Turbine(),
turbines=[Turbine() for _ in range(3)],
atomic_model=atomic_model,
consumer=ElectricalConsumer(name="Grid", demand_mw=800.0, online=False),
health_monitor=HealthMonitor(),
@@ -79,15 +87,18 @@ class Reactor:
mass_flow_rate=0.0,
steam_quality=0.0,
)
turbine = TurbineState(
steam_enthalpy=2_000.0,
shaft_power_mw=0.0,
electrical_output_mw=0.0,
condenser_temperature=ambient,
load_demand_mw=0.0,
load_supplied_mw=0.0,
)
return PlantState(core=core, primary_loop=primary, secondary_loop=secondary, turbine=turbine)
turbine_states = [
TurbineState(
steam_enthalpy=2_000.0,
shaft_power_mw=0.0,
electrical_output_mw=0.0,
condenser_temperature=ambient,
load_demand_mw=0.0,
load_supplied_mw=0.0,
)
for _ in self.turbines
]
return PlantState(core=core, primary_loop=primary, secondary_loop=secondary, turbines=turbine_states)
def step(self, state: PlantState, dt: float, command: ReactorCommand | None = None) -> None:
if self.shutdown:
@@ -126,19 +137,13 @@ class Reactor:
transferred = heat_transfer(state.primary_loop, state.secondary_loop, total_power)
self.thermal.step_secondary(state.secondary_loop, transferred)
if self.turbine_active:
self.turbine.step(state.secondary_loop, state.turbine, self.consumer, steam_power_mw=transferred)
else:
state.turbine.shaft_power_mw = 0.0
state.turbine.electrical_output_mw = 0.0
if self.consumer:
self.consumer.update_power_received(0.0)
self._step_turbine_bank(state, transferred)
failures = self.health_monitor.evaluate(
state,
self.primary_pump_active,
self.secondary_pump_active,
self.turbine_active,
self.turbine_unit_active,
dt,
)
for failure in failures:
@@ -157,11 +162,54 @@ class Reactor:
prompt_power,
fission_rate,
state.primary_loop.temperature_out,
state.turbine.electrical_output_mw,
state.turbine.load_supplied_mw,
state.turbine.load_demand_mw,
state.total_electrical_output(),
sum(t.load_supplied_mw for t in state.turbines),
sum(t.load_demand_mw for t in state.turbines),
)
def _step_turbine_bank(self, state: PlantState, steam_power_mw: float) -> None:
if not state.turbines:
return
active_indices = [
idx for idx, active in enumerate(self.turbine_unit_active) if active and idx < len(state.turbines)
]
power_per_unit = steam_power_mw / len(active_indices) if active_indices else 0.0
for idx, turbine in enumerate(self.turbines):
if idx >= len(state.turbines):
break
turbine_state = state.turbines[idx]
if idx in active_indices:
turbine.step(state.secondary_loop, turbine_state, steam_power_mw=power_per_unit)
else:
self._reset_turbine_state(turbine_state)
self._dispatch_consumer_load(state, active_indices)
def _reset_turbine_state(self, turbine_state: TurbineState) -> None:
turbine_state.shaft_power_mw = 0.0
turbine_state.electrical_output_mw = 0.0
turbine_state.load_demand_mw = 0.0
turbine_state.load_supplied_mw = 0.0
def _dispatch_consumer_load(self, state: PlantState, active_indices: list[int]) -> None:
total_electrical = sum(state.turbines[idx].electrical_output_mw for idx in active_indices)
if self.consumer:
demand = self.consumer.request_power()
supplied = min(total_electrical, demand)
self.consumer.update_power_received(supplied)
else:
demand = 0.0
supplied = 0.0
demand_per_unit = demand / len(active_indices) if active_indices else 0.0
total_for_share = total_electrical if total_electrical > 0 else 1.0
for idx, turbine_state in enumerate(state.turbines):
if idx not in active_indices:
turbine_state.load_demand_mw = 0.0
turbine_state.load_supplied_mw = 0.0
continue
share = 0.0 if total_electrical <= 0 else supplied * (turbine_state.electrical_output_mw / total_for_share)
turbine_state.load_demand_mw = demand_per_unit
turbine_state.load_supplied_mw = share if demand_per_unit <= 0 else min(share, demand_per_unit)
def _handle_failure(self, component: str) -> None:
if component == "core":
LOGGER.critical("Core failure detected. Initiating SCRAM.")
@@ -171,8 +219,9 @@ class Reactor:
self._set_primary_pump(False)
elif component == "secondary_pump":
self._set_secondary_pump(False)
elif component == "turbine":
self._set_turbine_state(False)
elif component.startswith("turbine"):
idx = self._component_index(component)
self._set_turbine_state(False, index=idx)
def _apply_command(self, command: ReactorCommand, state: PlantState) -> dict[str, float]:
overrides: dict[str, float] = {}
@@ -196,12 +245,18 @@ class Reactor:
self._set_secondary_pump(command.secondary_pump_on)
if command.turbine_on is not None:
self._set_turbine_state(command.turbine_on)
if command.turbine_units:
for key, state_flag in command.turbine_units.items():
idx = key - 1
self._set_turbine_state(state_flag, index=idx)
if command.consumer_online is not None and self.consumer:
self.consumer.set_online(command.consumer_online)
if command.consumer_demand is not None and self.consumer:
self.consumer.set_demand(command.consumer_demand)
if command.coolant_demand is not None:
overrides["coolant_demand"] = max(0.0, min(1.0, command.coolant_demand))
for component in command.maintenance_components:
self._perform_maintenance(component)
return overrides
def _set_primary_pump(self, active: bool) -> None:
@@ -214,10 +269,46 @@ class Reactor:
self.secondary_pump_active = active
LOGGER.info("Secondary pump %s", "enabled" if active else "stopped")
def _set_turbine_state(self, active: bool) -> None:
if self.turbine_active != active:
self.turbine_active = active
LOGGER.info("Turbine %s", "started" if active else "stopped")
def _set_turbine_state(self, active: bool, index: int | None = None) -> None:
if index is None:
for idx in range(len(self.turbine_unit_active)):
self._set_turbine_state(active, index=idx)
return
if index < 0 or index >= len(self.turbine_unit_active):
LOGGER.warning("Ignoring turbine index %s", index)
return
if self.turbine_unit_active[index] != active:
self.turbine_unit_active[index] = active
LOGGER.info("Turbine %d %s", index + 1, "started" if active else "stopped")
self.turbine_active = any(self.turbine_unit_active)
def _component_index(self, name: str) -> int:
if name == "turbine":
return 0
try:
return int(name.split("_")[1]) - 1
except (IndexError, ValueError):
return -1
def _perform_maintenance(self, component: str) -> None:
if component == "core" and not self.shutdown:
LOGGER.warning("Cannot maintain core while reactor is running")
return
if component == "primary_pump" and self.primary_pump_active:
LOGGER.warning("Stop primary pump before maintenance")
return
if component == "secondary_pump" and self.secondary_pump_active:
LOGGER.warning("Stop secondary pump before maintenance")
return
if component.startswith("turbine"):
idx = self._component_index(component)
if idx < 0 or idx >= len(self.turbine_unit_active):
LOGGER.warning("Unknown turbine maintenance target %s", component)
return
if self.turbine_unit_active[idx]:
LOGGER.warning("Stop turbine %d before maintenance", idx + 1)
return
self.health_monitor.maintain(component)
def attach_consumer(self, consumer: ElectricalConsumer) -> None:
self.consumer = consumer
@@ -233,6 +324,7 @@ class Reactor:
"primary_pump_active": self.primary_pump_active,
"secondary_pump_active": self.secondary_pump_active,
"turbine_active": self.turbine_active,
"turbine_units": self.turbine_unit_active,
"shutdown": self.shutdown,
"consumer": {
"online": self.consumer.online if self.consumer else False,
@@ -246,7 +338,10 @@ class Reactor:
plant, metadata, health = self.control.load_state(filepath)
self.primary_pump_active = metadata.get("primary_pump_active", self.primary_pump_active)
self.secondary_pump_active = metadata.get("secondary_pump_active", self.secondary_pump_active)
self.turbine_active = metadata.get("turbine_active", self.turbine_active)
unit_states = metadata.get("turbine_units")
if unit_states:
self.turbine_unit_active = list(unit_states)
self.turbine_active = metadata.get("turbine_active", any(self.turbine_unit_active))
self.shutdown = metadata.get("shutdown", self.shutdown)
consumer_cfg = metadata.get("consumer")
if consumer_cfg:
@@ -262,4 +357,17 @@ class Reactor:
if health:
self.health_monitor.load_snapshot(health)
LOGGER.info("Reactor state restored from %s", filepath)
if len(plant.turbines) < len(self.turbines):
ambient = constants.ENVIRONMENT_TEMPERATURE
while len(plant.turbines) < len(self.turbines):
plant.turbines.append(
TurbineState(
steam_enthalpy=2_000.0,
shaft_power_mw=0.0,
electrical_output_mw=0.0,
condenser_temperature=ambient,
load_demand_mw=0.0,
load_supplied_mw=0.0,
)
)
return plant

View File

@@ -7,6 +7,7 @@ import json
import logging
import os
from dataclasses import dataclass, field
from pathlib import Path
from threading import Event
import time
from typing import Callable, Iterable, Optional
@@ -21,6 +22,11 @@ LOGGER = logging.getLogger(__name__)
CommandProvider = Callable[[float, PlantState], Optional[ReactorCommand]]
def _default_state_path() -> Path:
custom = os.getenv("FISSION_STATE_PATH")
return Path(custom) if custom else Path("artifacts/last_state.json")
@dataclass
class ReactorSimulation:
reactor: Reactor
@@ -80,8 +86,11 @@ def main() -> None:
timestep = 1.0 if dashboard_mode or realtime else 5.0
sim = ReactorSimulation(reactor, timestep=timestep, duration=duration, realtime=realtime)
state_path = _default_state_path()
load_path = os.getenv("FISSION_LOAD_STATE")
save_path = os.getenv("FISSION_SAVE_STATE")
if not load_path and state_path.exists():
load_path = str(state_path)
save_path = os.getenv("FISSION_SAVE_STATE") or str(state_path)
if load_path:
sim.start_state = reactor.load_state(load_path)
if dashboard_mode:

View File

@@ -49,7 +49,7 @@ class PlantState:
core: CoreState
primary_loop: CoolantLoopState
secondary_loop: CoolantLoopState
turbine: TurbineState
turbines: list[TurbineState]
time_elapsed: float = field(default=0.0)
def snapshot(self) -> dict[str, float]:
@@ -60,18 +60,27 @@ class PlantState:
"neutron_flux": self.core.neutron_flux,
"primary_outlet_temp": self.primary_loop.temperature_out,
"secondary_pressure": self.secondary_loop.pressure,
"turbine_electric": self.turbine.electrical_output_mw,
"turbine_electric": self.total_electrical_output(),
}
def total_electrical_output(self) -> float:
return sum(t.electrical_output_mw for t in self.turbines)
def to_dict(self) -> dict:
return asdict(self)
@classmethod
def from_dict(cls, data: dict) -> "PlantState":
turbines_blob = data.get("turbines")
if turbines_blob is None:
# Compatibility with previous single-turbine snapshots.
old_turbine = data.get("turbine")
turbines_blob = [old_turbine] if old_turbine else []
turbines = [TurbineState(**t) for t in turbines_blob]
return cls(
core=CoreState(**data["core"]),
primary_loop=CoolantLoopState(**data["primary_loop"]),
secondary_loop=CoolantLoopState(**data["secondary_loop"]),
turbine=TurbineState(**data["turbine"]),
turbines=turbines,
time_elapsed=data.get("time_elapsed", 0.0),
)

View File

@@ -6,10 +6,7 @@ from dataclasses import dataclass
import logging
from . import constants
from typing import Optional
from .state import CoolantLoopState, TurbineState
from .consumer import ElectricalConsumer
LOGGER = logging.getLogger(__name__)
@@ -33,7 +30,6 @@ class Turbine:
self,
loop: CoolantLoopState,
state: TurbineState,
consumer: Optional[ElectricalConsumer] = None,
steam_power_mw: float = 0.0,
) -> None:
enthalpy = 2_700.0 + loop.steam_quality * 600.0
@@ -41,31 +37,14 @@ class Turbine:
available_power = max(steam_power_mw, (enthalpy * mass_flow / 1_000.0) / 1_000.0)
shaft_power_mw = available_power * self.mechanical_efficiency
electrical = shaft_power_mw * self.generator_efficiency
if consumer:
load_demand = consumer.request_power()
supplied = min(electrical, load_demand)
consumer.update_power_received(supplied)
LOGGER.debug(
"Consumer %s demand %.1f -> supplied %.1f MW",
consumer.name,
load_demand,
supplied,
)
else:
load_demand = 0.0
supplied = 0.0
condenser_temp = max(305.0, loop.temperature_in - 20.0)
state.steam_enthalpy = enthalpy
state.shaft_power_mw = shaft_power_mw
state.electrical_output_mw = electrical
state.condenser_temperature = condenser_temp
state.load_demand_mw = load_demand
state.load_supplied_mw = supplied
LOGGER.debug(
"Turbine output: shaft=%.1fMW electrical=%.1fMW condenser=%.1fK load %.1f/%.1f",
"Turbine output: shaft=%.1fMW electrical=%.1fMW condenser=%.1fK",
shaft_power_mw,
electrical,
condenser_temp,
supplied,
load_demand,
)

26
tests/test_neutronics.py Normal file
View File

@@ -0,0 +1,26 @@
from reactor_sim.neutronics import NeutronDynamics
from reactor_sim.state import CoreState
def _core_state(
temperature: float = 950.0,
flux: float = 2e7,
burnup: float = 0.01,
power: float = 500.0,
) -> CoreState:
return CoreState(
fuel_temperature=temperature,
neutron_flux=flux,
reactivity_margin=0.0,
power_output_mw=power,
burnup=burnup,
)
def test_reactivity_increases_with_rod_withdrawal():
dynamics = NeutronDynamics()
state = _core_state()
rho_full_out = dynamics.reactivity(state, control_fraction=0.0)
rho_half = dynamics.reactivity(state, control_fraction=0.5)
assert rho_full_out > 0.0
assert rho_full_out > rho_half

View File

@@ -4,6 +4,7 @@ from pathlib import Path
import pytest
from reactor_sim import constants
from reactor_sim.failures import HealthMonitor
from reactor_sim.reactor import Reactor
from reactor_sim.simulation import ReactorSimulation
@@ -13,7 +14,7 @@ def test_reactor_initial_state_is_cold():
state = reactor.initial_state()
assert state.core.fuel_temperature == constants.ENVIRONMENT_TEMPERATURE
assert state.primary_loop.mass_flow_rate == 0.0
assert state.turbine.electrical_output_mw == 0.0
assert state.total_electrical_output() == 0.0
def test_state_save_and_load_roundtrip(tmp_path: Path):
@@ -36,7 +37,18 @@ def test_health_monitor_flags_core_failure():
reactor = Reactor.default()
state = reactor.initial_state()
state.core.fuel_temperature = constants.MAX_CORE_TEMPERATURE
failures = reactor.health_monitor.evaluate(state, True, True, True, dt=200.0)
failures = reactor.health_monitor.evaluate(state, True, True, [True, True, True], dt=200.0)
assert "core" in failures
reactor._handle_failure("core")
assert reactor.shutdown is True
def test_maintenance_recovers_component_health():
monitor = HealthMonitor()
pump = monitor.component("secondary_pump")
pump.integrity = 0.3
pump.fail()
restored = monitor.maintain("secondary_pump", amount=0.5)
assert restored is True
assert pump.integrity == pytest.approx(0.8)
assert pump.failed is False