Compare commits

...

22 Commits

Author SHA1 Message Date
Codex Agent
b03e80da9f Add generator power model and meltdown handling 2025-11-22 20:04:04 +01:00
Codex Agent
e7d8a34818 Retune reactivity for withdrawn and partially inserted rods 2025-11-22 19:41:34 +01:00
Codex Agent
287a8e186a Restore full power when rods are withdrawn 2025-11-22 19:27:18 +01:00
Codex Agent
a8da5371db Restore component health progress bars 2025-11-22 19:17:22 +01:00
Codex Agent
e81a9fdbe3 Add spool dynamics for pumps and turbines 2025-11-22 19:13:57 +01:00
Codex Agent
e18f100e15 Handle individual pump unit toggles 2025-11-22 19:07:45 +01:00
Codex Agent
c2d44bbdd3 Guard dashboard window sizing to avoid curses NULL 2025-11-22 19:02:19 +01:00
Codex Agent
7ef61c537f Add per-pump toggles and persist pump unit states 2025-11-22 18:57:04 +01:00
Codex Agent
e664af63ef Move component health into main dashboard panel 2025-11-22 18:53:31 +01:00
Codex Agent
d1751647cd Fix dashboard window sizing to avoid curses error 2025-11-22 18:50:04 +01:00
Codex Agent
038d3d0fea Relax dashboard size requirements 2025-11-22 18:48:51 +01:00
Codex Agent
511544c2eb Persist manual control and pump states 2025-11-22 18:47:19 +01:00
Codex Agent
f626c99f44 Show per-pump status and per-turbine output 2025-11-22 18:43:53 +01:00
Codex Agent
5bd9992ed0 Stop degrading pumps when offline 2025-11-22 18:39:20 +01:00
Codex Agent
290b7a8565 Add toggleable maintenance state and status panel 2025-11-22 18:37:11 +01:00
Codex Agent
ecc193f61b Invert rod hotkeys and update instructions 2025-11-22 18:30:18 +01:00
Codex Agent
c9cd669ca4 Bias shutdown reactivity and keep cold start subcritical 2025-11-22 18:28:40 +01:00
Codex Agent
7a0b5c4b96 Start in cold manual state with rods inserted 2025-11-22 18:25:23 +01:00
Codex Agent
fef1d32fbe Restore three turbines and update controls 2025-11-22 18:22:24 +01:00
Codex Agent
2434034505 Handle heat-sink loss and update turbine controls 2025-11-22 18:20:21 +01:00
Codex Agent
8d9b57f86d Ensure dashboard quit saves state and ignore artifacts 2025-11-22 18:12:17 +01:00
Codex Agent
ba65f84cd4 Fix dashboard reset logging 2025-11-22 18:06:27 +01:00
18 changed files with 926 additions and 204 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
.venv/ .venv/
**/__pycache__/ **/__pycache__/
*.egg-info/ *.egg-info/
artifacts/

View File

@@ -12,7 +12,7 @@ All source code lives under `src/reactor_sim`. Submodules map to plant systems:
- A git remote for `origin` is configured; push changes to `origin/main` once work is complete so dashboards stay in sync. - A git remote for `origin` is configured; push changes to `origin/main` once work is complete so dashboards stay in sync.
## Operations & Control Hooks ## Operations & Control Hooks
Manual commands live in `reactor_sim.commands.ReactorCommand`. Pass a `command_provider` callable to `ReactorSimulation` to adjust rods, pumps, turbine, coolant demand, or the attached `ElectricalConsumer`. Use `ReactorCommand.scram_all()` for full shutdown, `ReactorCommand(consumer_online=True, consumer_demand=600)` to hook the grid, or toggle pumps (`primary_pump_on=False`) to simulate faults. Control helpers in `control.py` expose `set_rods`, `increment_rods`, and `scram`, and you can switch `set_manual_mode(True)` to pause the automatic rod controller. For hands-on runs, launch the curses dashboard (`FISSION_DASHBOARD=1 FISSION_REALTIME=1 python run_simulation.py`) and use the on-screen shortcuts (q quit/save, space SCRAM, p/o pumps, t turbine, 1/2/3 toggle individual turbines, r reset/clear saved state, +/- rods in 0.05 steps, [/ ] consumer demand, s/d setpoint, `a` toggles auto/manual rods). Recommended startup: enable manual rods (`a`), withdraw to ~0.3 before ramping the turbine/consumer, then re-enable auto control when you want closed-loop operation. Manual commands live in `reactor_sim.commands.ReactorCommand`. Pass a `command_provider` callable to `ReactorSimulation` to adjust rods, pumps, turbine, coolant demand, or the attached `ElectricalConsumer`. Use `ReactorCommand.scram_all()` for full shutdown, `ReactorCommand(consumer_online=True, consumer_demand=600)` to hook the grid, or toggle pumps (`primary_pump_on=False`) to simulate faults. Control helpers in `control.py` expose `set_rods`, `increment_rods`, and `scram`, and you can switch `set_manual_mode(True)` to pause the automatic rod controller. For hands-on runs, launch the curses dashboard (`FISSION_DASHBOARD=1 FISSION_REALTIME=1 python run_simulation.py`) and use the on-screen shortcuts (q quit/save, space SCRAM, p/o pumps, t turbine, 1/2/3 toggle individual turbines, y/u/i turbine maintenance, m/n pump maintenance, k core maintenance, c consumer, r reset/clear saved state, + insert rods / - withdraw rods in 0.05 steps, [/ ] consumer demand, s/d setpoint, `a` toggles auto/manual rods). Recommended startup: enable manual rods (`a`), withdraw to ~0.3 before ramping the turbine/consumer, then re-enable auto control when you want closed-loop operation.
The plant now boots cold (ambient core temperature, idle pumps); scripts must sequence startup: enable pumps, gradually withdraw rods, connect the consumer after turbine spin-up, and use `ControlSystem.set_power_setpoint` to chase desired output. Set `FISSION_REALTIME=1` to run continuously with real-time pacing; optionally set `FISSION_SIM_DURATION=infinite` for indefinite runs and send SIGINT/Ctrl+C to stop. Use `FISSION_SIM_DURATION=600` (default) for bounded offline batches. The plant now boots cold (ambient core temperature, idle pumps); scripts must sequence startup: enable pumps, gradually withdraw rods, connect the consumer after turbine spin-up, and use `ControlSystem.set_power_setpoint` to chase desired output. Set `FISSION_REALTIME=1` to run continuously with real-time pacing; optionally set `FISSION_SIM_DURATION=infinite` for indefinite runs and send SIGINT/Ctrl+C to stop. Use `FISSION_SIM_DURATION=600` (default) for bounded offline batches.
## Coding Style & Naming Conventions ## Coding Style & Naming Conventions

View File

@@ -1,104 +0,0 @@
{
"control": {
"setpoint_mw": 3000.0,
"rod_fraction": 0.3872181667930225
},
"plant": {
"core": {
"fuel_temperature": 633.2342060602825,
"neutron_flux": 31335131.43776027,
"reactivity_margin": 0.006241667151538859,
"power_output_mw": 2976.900110634399,
"burnup": 0.004945521266135161
},
"primary_loop": {
"temperature_in": 295.0,
"temperature_out": 338.7522062115579,
"pressure": 14.0,
"mass_flow_rate": 16200.0,
"steam_quality": 0.0
},
"secondary_loop": {
"temperature_in": 295.0,
"temperature_out": 360.05377003828596,
"pressure": 6.650537700382859,
"mass_flow_rate": 10880.0,
"steam_quality": 0.6505377003828593
},
"turbines": [
{
"steam_enthalpy": 3090.3226202297155,
"shaft_power_mw": 336.9056685758782,
"electrical_output_mw": 323.4294418328431,
"condenser_temperature": 305.0,
"load_demand_mw": 0.0,
"load_supplied_mw": 0.0
},
{
"steam_enthalpy": 3090.3226202297155,
"shaft_power_mw": 336.9056685758782,
"electrical_output_mw": 323.4294418328431,
"condenser_temperature": 305.0,
"load_demand_mw": 0.0,
"load_supplied_mw": 0.0
},
{
"steam_enthalpy": 3090.3226202297155,
"shaft_power_mw": 336.9056685758782,
"electrical_output_mw": 323.4294418328431,
"condenser_temperature": 305.0,
"load_demand_mw": 0.0,
"load_supplied_mw": 0.0
}
],
"time_elapsed": 600.0
},
"metadata": {
"primary_pump_active": true,
"secondary_pump_active": true,
"turbine_active": true,
"turbine_units": [
true,
true,
true
],
"shutdown": false,
"consumer": {
"online": false,
"demand_mw": 800.0,
"name": "Grid"
}
},
"health": {
"core": {
"name": "core",
"integrity": 0.9400000000000066,
"failed": false
},
"primary_pump": {
"name": "primary_pump",
"integrity": 0.5800000000000063,
"failed": false
},
"secondary_pump": {
"name": "secondary_pump",
"integrity": 0.1120000000000021,
"failed": false
},
"turbine_1": {
"name": "turbine_1",
"integrity": 0.610000000000003,
"failed": false
},
"turbine_2": {
"name": "turbine_2",
"integrity": 0.610000000000003,
"failed": false
},
"turbine_3": {
"name": "turbine_3",
"integrity": 0.610000000000003,
"failed": false
}
}
}

View File

@@ -21,6 +21,9 @@ class ReactorCommand:
consumer_demand: float | None = None consumer_demand: float | None = None
rod_manual: bool | None = None rod_manual: bool | None = None
turbine_units: dict[int, bool] | None = None turbine_units: dict[int, bool] | None = None
primary_pumps: dict[int, bool] | None = None
secondary_pumps: dict[int, bool] | None = None
generator_units: dict[int, bool] | None = None
maintenance_components: tuple[str, ...] = tuple() maintenance_components: tuple[str, ...] = tuple()
@classmethod @classmethod

View File

@@ -8,15 +8,23 @@ NEUTRON_LIFETIME = 0.1 # seconds, prompt neutron lifetime surrogate
FUEL_ENERGY_DENSITY = 200.0 * MEGAWATT # J/kg released as heat FUEL_ENERGY_DENSITY = 200.0 * MEGAWATT # J/kg released as heat
COOLANT_HEAT_CAPACITY = 4_200.0 # J/(kg*K) for water/steam COOLANT_HEAT_CAPACITY = 4_200.0 # J/(kg*K) for water/steam
COOLANT_DENSITY = 700.0 # kg/m^3 averaged between phases COOLANT_DENSITY = 700.0 # kg/m^3 averaged between phases
MAX_CORE_TEMPERATURE = 1_800.0 # K CORE_MELTDOWN_TEMPERATURE = 2_873.0 # K (approx 2600C) threshold for irreversible meltdown
MAX_CORE_TEMPERATURE = CORE_MELTDOWN_TEMPERATURE # Allow simulation to approach meltdown temperature
MAX_PRESSURE = 15.0 # MPa typical PWR primary loop limit MAX_PRESSURE = 15.0 # MPa typical PWR primary loop limit
CONTROL_ROD_SPEED = 0.03 # fraction insertion per second CONTROL_ROD_SPEED = 0.03 # fraction insertion per second
CONTROL_ROD_WORTH = 0.042 # delta rho contribution when fully withdrawn
STEAM_TURBINE_EFFICIENCY = 0.34 STEAM_TURBINE_EFFICIENCY = 0.34
GENERATOR_EFFICIENCY = 0.96 GENERATOR_EFFICIENCY = 0.96
ENVIRONMENT_TEMPERATURE = 295.0 # K ENVIRONMENT_TEMPERATURE = 295.0 # K
AMU_TO_KG = 1.660_539_066_60e-27 AMU_TO_KG = 1.660_539_066_60e-27
MEV_TO_J = 1.602_176_634e-13 MEV_TO_J = 1.602_176_634e-13
ELECTRON_FISSION_CROSS_SECTION = 5e-16 # cm^2, tuned for simulation scale ELECTRON_FISSION_CROSS_SECTION = 5e-16 # cm^2, tuned for simulation scale
PUMP_SPOOL_TIME = 5.0 # seconds to reach commanded flow
TURBINE_SPOOL_TIME = 12.0 # seconds to reach steady output
GENERATOR_SPOOL_TIME = 10.0 # seconds to reach full output
# Auxiliary power assumptions
PUMP_POWER_MW = 12.0 # MW draw per pump unit
BASE_AUX_LOAD_MW = 5.0 # control, instrumentation, misc.
# Threshold inventories (event counts) for flagging common poisons in diagnostics. # Threshold inventories (event counts) for flagging common poisons in diagnostics.
KEY_POISON_THRESHOLDS = { KEY_POISON_THRESHOLDS = {
"Xe": 1e20, # xenon "Xe": 1e20, # xenon

View File

@@ -77,6 +77,7 @@ class ControlSystem:
"control": { "control": {
"setpoint_mw": self.setpoint_mw, "setpoint_mw": self.setpoint_mw,
"rod_fraction": self.rod_fraction, "rod_fraction": self.rod_fraction,
"manual_control": self.manual_control,
}, },
"plant": plant_state.to_dict(), "plant": plant_state.to_dict(),
"metadata": metadata or {}, "metadata": metadata or {},
@@ -94,6 +95,7 @@ class ControlSystem:
control = data.get("control", {}) control = data.get("control", {})
self.setpoint_mw = control.get("setpoint_mw", self.setpoint_mw) self.setpoint_mw = control.get("setpoint_mw", self.setpoint_mw)
self.rod_fraction = control.get("rod_fraction", self.rod_fraction) self.rod_fraction = control.get("rod_fraction", self.rod_fraction)
self.manual_control = control.get("manual_control", self.manual_control)
plant = PlantState.from_dict(data["plant"]) plant = PlantState.from_dict(data["plant"])
LOGGER.info("Loaded plant state from %s", path) LOGGER.info("Loaded plant state from %s", path)
return plant, data.get("metadata", {}), data.get("health") return plant, data.get("metadata", {}), data.get("health")

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
import logging import logging
from . import constants
from .state import CoolantLoopState from .state import CoolantLoopState
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
@@ -14,6 +15,7 @@ LOGGER = logging.getLogger(__name__)
class Pump: class Pump:
nominal_flow: float nominal_flow: float
efficiency: float = 0.9 efficiency: float = 0.9
spool_time: float = constants.PUMP_SPOOL_TIME
def flow_rate(self, demand: float) -> float: def flow_rate(self, demand: float) -> float:
demand = max(0.0, min(1.0, demand)) demand = max(0.0, min(1.0, demand))

View File

@@ -15,6 +15,8 @@ from .reactor import Reactor
from .simulation import ReactorSimulation from .simulation import ReactorSimulation
from .state import PlantState from .state import PlantState
LOGGER = logging.getLogger(__name__)
@dataclass @dataclass
class DashboardKey: class DashboardKey:
@@ -40,6 +42,7 @@ class ReactorDashboard:
self.sim: Optional[ReactorSimulation] = None self.sim: Optional[ReactorSimulation] = None
self.quit_requested = False self.quit_requested = False
self.reset_requested = False self.reset_requested = False
self._last_state: Optional[PlantState] = None
self.log_buffer: deque[str] = deque(maxlen=4) self.log_buffer: deque[str] = deque(maxlen=4)
self._log_handler: Optional[logging.Handler] = None self._log_handler: Optional[logging.Handler] = None
self._previous_handlers: list[logging.Handler] = [] self._previous_handlers: list[logging.Handler] = []
@@ -47,15 +50,20 @@ class ReactorDashboard:
self.keys = [ self.keys = [
DashboardKey("q", "Quit & save"), DashboardKey("q", "Quit & save"),
DashboardKey("space", "SCRAM"), DashboardKey("space", "SCRAM"),
DashboardKey("p", "Toggle primary pump"), DashboardKey("g", "Toggle primary pump 1"),
DashboardKey("o", "Toggle secondary pump"), DashboardKey("h", "Toggle primary pump 2"),
DashboardKey("j", "Toggle secondary pump 1"),
DashboardKey("k", "Toggle secondary pump 2"),
DashboardKey("b", "Toggle generator 1"),
DashboardKey("v", "Toggle generator 2"),
DashboardKey("t", "Toggle turbine"), DashboardKey("t", "Toggle turbine"),
DashboardKey("1/2/3", "Toggle turbine units 1-3"), DashboardKey("1/2/3", "Toggle turbine units 1-3"),
DashboardKey("y/u/i", "Maintain turbine 1/2/3"), DashboardKey("y/u/i", "Maintain turbine 1/2/3"),
DashboardKey("c", "Toggle consumer"), DashboardKey("c", "Toggle consumer"),
DashboardKey("r", "Reset & clear state"), DashboardKey("r", "Reset & clear state"),
DashboardKey("m", "Maintain primary pump"), DashboardKey("m/n", "Maintain primary pumps 1/2"),
DashboardKey("n", "Maintain secondary pump"), DashboardKey(",/.", "Maintain secondary pumps 1/2"),
DashboardKey("B/V", "Maintain generator 1/2"),
DashboardKey("k", "Maintain core (requires shutdown)"), DashboardKey("k", "Maintain core (requires shutdown)"),
DashboardKey("+/-", "Withdraw/insert rods"), DashboardKey("+/-", "Withdraw/insert rods"),
DashboardKey("[/]", "Adjust consumer demand /+50 MW"), DashboardKey("[/]", "Adjust consumer demand /+50 MW"),
@@ -89,9 +97,13 @@ class ReactorDashboard:
self.sim.start_state = self.start_state self.sim.start_state = self.start_state
try: try:
for state in self.sim.run(): for state in self.sim.run():
self._last_state = state
self._draw(stdscr, state) self._draw(stdscr, state)
self._handle_input(stdscr) self._handle_input(stdscr)
if self.quit_requested or self.reset_requested: if self.quit_requested or self.reset_requested:
# Persist the latest state if we are exiting early.
if self.sim:
self.sim.last_state = state
self.sim.stop() self.sim.stop()
break break
finally: finally:
@@ -118,18 +130,33 @@ class ReactorDashboard:
if ch == ord(" "): if ch == ord(" "):
self._queue_command(ReactorCommand.scram_all()) self._queue_command(ReactorCommand.scram_all())
elif ch in (ord("p"), ord("P")): elif ch in (ord("p"), ord("P")):
self._queue_command(ReactorCommand(primary_pump_on=not self.reactor.primary_pump_active)) # Deprecated master toggles ignored.
continue
elif ch in (ord("o"), ord("O")): elif ch in (ord("o"), ord("O")):
self._queue_command(ReactorCommand(secondary_pump_on=not self.reactor.secondary_pump_active)) continue
elif ch in (ord("g"), ord("G")):
self._toggle_primary_pump_unit(0)
elif ch in (ord("h"), ord("H")):
self._toggle_primary_pump_unit(1)
elif ch in (ord("j"), ord("J")):
self._toggle_secondary_pump_unit(0)
elif ch in (ord("k"), ord("K")):
self._toggle_secondary_pump_unit(1)
elif ch in (ord("b"), ord("B")):
self._toggle_generator_unit(0)
elif ch in (ord("v"), ord("V")):
self._toggle_generator_unit(1)
elif ch in (ord("t"), ord("T")): elif ch in (ord("t"), ord("T")):
self._queue_command(ReactorCommand(turbine_on=not self.reactor.turbine_active)) self._queue_command(ReactorCommand(turbine_on=not self.reactor.turbine_active))
elif ord("1") <= ch <= ord("9"): elif ord("1") <= ch <= ord("9"):
idx = ch - ord("1") idx = ch - ord("1")
self._toggle_turbine_unit(idx) self._toggle_turbine_unit(idx)
elif ch in (ord("+"), ord("=")): elif ch in (ord("+"), ord("=")):
self._queue_command(ReactorCommand(rod_position=self._clamped_rod(-0.05))) # Insert rods (increase fraction)
elif ch == ord("-"):
self._queue_command(ReactorCommand(rod_position=self._clamped_rod(0.05))) self._queue_command(ReactorCommand(rod_position=self._clamped_rod(0.05)))
elif ch == ord("-"):
# Withdraw rods (decrease fraction)
self._queue_command(ReactorCommand(rod_position=self._clamped_rod(-0.05)))
elif ch == ord("["): elif ch == ord("["):
demand = self._current_demand() - 50.0 demand = self._current_demand() - 50.0
self._queue_command(ReactorCommand(consumer_demand=max(0.0, demand))) self._queue_command(ReactorCommand(consumer_demand=max(0.0, demand)))
@@ -148,11 +175,19 @@ class ReactorDashboard:
elif ch in (ord("a"), ord("A")): elif ch in (ord("a"), ord("A")):
self._queue_command(ReactorCommand(rod_manual=not self.reactor.control.manual_control)) self._queue_command(ReactorCommand(rod_manual=not self.reactor.control.manual_control))
elif ch in (ord("m"), ord("M")): elif ch in (ord("m"), ord("M")):
self._queue_command(ReactorCommand.maintain("primary_pump")) self._queue_command(ReactorCommand.maintain("primary_pump_1"))
elif ch in (ord("n"), ord("N")): elif ch in (ord("n"), ord("N")):
self._queue_command(ReactorCommand.maintain("secondary_pump")) self._queue_command(ReactorCommand.maintain("primary_pump_2"))
elif ch in (ord("k"), ord("K")): elif ch in (ord("k"), ord("K")):
self._queue_command(ReactorCommand.maintain("core")) self._queue_command(ReactorCommand.maintain("core"))
elif ch == ord(","):
self._queue_command(ReactorCommand.maintain("secondary_pump_1"))
elif ch == ord("."):
self._queue_command(ReactorCommand.maintain("secondary_pump_2"))
elif ch in (ord("B"),):
self._queue_command(ReactorCommand.maintain("generator_1"))
elif ch in (ord("V"),):
self._queue_command(ReactorCommand.maintain("generator_2"))
elif ch in (ord("y"), ord("Y")): elif ch in (ord("y"), ord("Y")):
self._queue_command(ReactorCommand.maintain("turbine_1")) self._queue_command(ReactorCommand.maintain("turbine_1"))
elif ch in (ord("u"), ord("U")): elif ch in (ord("u"), ord("U")):
@@ -185,6 +220,25 @@ class ReactorDashboard:
current = self.reactor.turbine_unit_active[index] current = self.reactor.turbine_unit_active[index]
self._queue_command(ReactorCommand(turbine_units={index + 1: not current})) self._queue_command(ReactorCommand(turbine_units={index + 1: not current}))
def _toggle_primary_pump_unit(self, index: int) -> None:
if index < 0 or index >= len(self.reactor.primary_pump_units):
return
current = self.reactor.primary_pump_units[index]
self._queue_command(ReactorCommand(primary_pumps={index + 1: not current}))
def _toggle_secondary_pump_unit(self, index: int) -> None:
if index < 0 or index >= len(self.reactor.secondary_pump_units):
return
current = self.reactor.secondary_pump_units[index]
self._queue_command(ReactorCommand(secondary_pumps={index + 1: not current}))
def _toggle_generator_unit(self, index: int) -> None:
current = False
if self._last_state and index < len(self._last_state.generators):
gen = self._last_state.generators[index]
current = gen.running or gen.starting
self._queue_command(ReactorCommand(generator_units={index + 1: not current}))
def _request_reset(self) -> None: def _request_reset(self) -> None:
self.reset_requested = True self.reset_requested = True
if self.sim: if self.sim:
@@ -217,26 +271,32 @@ class ReactorDashboard:
def _draw(self, stdscr: "curses._CursesWindow", state: PlantState) -> None: def _draw(self, stdscr: "curses._CursesWindow", state: PlantState) -> None:
stdscr.erase() stdscr.erase()
height, width = stdscr.getmaxyx() height, width = stdscr.getmaxyx()
if height < 24 or width < 90: min_status = 4
if height < min_status + 12 or width < 70:
stdscr.addstr( stdscr.addstr(
0, 0,
0, 0,
"Terminal window too small. Resize to at least 90x24.".ljust(width), "Terminal too small; try >=70x16 or reduce font size.".ljust(width),
curses.color_pair(4), curses.color_pair(4),
) )
stdscr.refresh() stdscr.refresh()
return return
data_height = height - 6 status_height = min_status
right_width = max(32, width // 3) data_height = height - status_height
right_width = max(28, width // 3)
left_width = width - right_width left_width = width - right_width
if left_width < 60: if left_width < 50:
left_width = min(60, width - 20) left_width = min(50, width - 18)
right_width = width - left_width right_width = width - left_width
data_height = max(1, data_height)
left_width = max(1, left_width)
right_width = max(1, right_width)
data_win = stdscr.derwin(data_height, left_width, 0, 0) data_win = stdscr.derwin(data_height, left_width, 0, 0)
help_win = stdscr.derwin(data_height, right_width, 0, left_width) help_win = stdscr.derwin(data_height, right_width, 0, left_width)
status_win = stdscr.derwin(6, width, data_height, 0) status_win = stdscr.derwin(status_height, width, data_height, 0)
self._draw_data_panel(data_win, state) self._draw_data_panel(data_win, state)
self._draw_help_panel(help_win) self._draw_help_panel(help_win)
@@ -268,7 +328,8 @@ class ReactorDashboard:
y, y,
"Primary Loop", "Primary Loop",
[ [
("Pump", "ON" if self.reactor.primary_pump_active else "OFF"), ("Pump1", self._pump_status(state.primary_pumps, 0)),
("Pump2", self._pump_status(state.primary_pumps, 1)),
("Flow", f"{state.primary_loop.mass_flow_rate:7.0f} kg/s"), ("Flow", f"{state.primary_loop.mass_flow_rate:7.0f} kg/s"),
("Inlet Temp", f"{state.primary_loop.temperature_in:7.1f} K"), ("Inlet Temp", f"{state.primary_loop.temperature_in:7.1f} K"),
("Outlet Temp", f"{state.primary_loop.temperature_out:7.1f} K"), ("Outlet Temp", f"{state.primary_loop.temperature_out:7.1f} K"),
@@ -280,13 +341,20 @@ class ReactorDashboard:
y, y,
"Secondary Loop", "Secondary Loop",
[ [
("Pump", "ON" if self.reactor.secondary_pump_active else "OFF"), ("Pump1", self._pump_status(state.secondary_pumps, 0)),
("Pump2", self._pump_status(state.secondary_pumps, 1)),
("Flow", f"{state.secondary_loop.mass_flow_rate:7.0f} kg/s"), ("Flow", f"{state.secondary_loop.mass_flow_rate:7.0f} kg/s"),
("Inlet Temp", f"{state.secondary_loop.temperature_in:7.1f} K"), ("Inlet Temp", f"{state.secondary_loop.temperature_in:7.1f} K"),
("Pressure", f"{state.secondary_loop.pressure:5.2f} MPa"), ("Pressure", f"{state.secondary_loop.pressure:5.2f} MPa"),
("Steam Quality", f"{state.secondary_loop.steam_quality:5.2f}"), ("Steam Quality", f"{state.secondary_loop.steam_quality:5.2f}"),
], ],
) )
y = self._draw_section(
win,
y,
"Generators",
self._generator_lines(state),
)
consumer_status = "n/a" consumer_status = "n/a"
consumer_demand = 0.0 consumer_demand = 0.0
if self.reactor.consumer: if self.reactor.consumer:
@@ -298,13 +366,23 @@ class ReactorDashboard:
"Turbine / Grid", "Turbine / Grid",
[ [
("Turbines", " ".join(self._turbine_status_lines())), ("Turbines", " ".join(self._turbine_status_lines())),
("Unit1 Elec", f"{state.turbines[0].electrical_output_mw:7.1f} MW" if state.turbines else "n/a"),
(
"Unit2 Elec",
f"{state.turbines[1].electrical_output_mw:7.1f} MW" if len(state.turbines) > 1 else "n/a",
),
(
"Unit3 Elec",
f"{state.turbines[2].electrical_output_mw:7.1f} MW" if len(state.turbines) > 2 else "n/a",
),
("Electrical", f"{state.total_electrical_output():7.1f} MW"), ("Electrical", f"{state.total_electrical_output():7.1f} MW"),
("Load", f"{self._total_load_supplied(state):7.1f}/{self._total_load_demand(state):7.1f} MW"), ("Load", f"{self._total_load_supplied(state):7.1f}/{self._total_load_demand(state):7.1f} MW"),
("Consumer", f"{consumer_status}"), ("Consumer", f"{consumer_status}"),
("Demand", f"{consumer_demand:7.1f} MW"), ("Demand", f"{consumer_demand:7.1f} MW"),
], ],
) )
self._draw_health_bar(win, y + 1) y = self._draw_section(win, y, "Maintenance", self._maintenance_lines())
y = self._draw_health_bars(win, y)
def _draw_help_panel(self, win: "curses._CursesWindow") -> None: def _draw_help_panel(self, win: "curses._CursesWindow") -> None:
win.erase() win.erase()
@@ -319,7 +397,7 @@ class ReactorDashboard:
"Start pumps before withdrawing rods.", "Start pumps before withdrawing rods.",
"Bring turbine and consumer online after thermal stabilization.", "Bring turbine and consumer online after thermal stabilization.",
"Toggle turbine units (1/2/3) for staggered maintenance.", "Toggle turbine units (1/2/3) for staggered maintenance.",
"Use m/n/k/y/u/i to request maintenance (stop equipment first).", "Use m/n/,/. for pump maintenance; B/V for generators.",
"Press 'r' to reset/clear state if you want a cold start.", "Press 'r' to reset/clear state if you want a cold start.",
"Watch component health to avoid automatic trips.", "Watch component health to avoid automatic trips.",
] ]
@@ -408,25 +486,61 @@ class ReactorDashboard:
lines.append(("Alphas", f"{particles.get('alpha', 0.0):9.2e}")) lines.append(("Alphas", f"{particles.get('alpha', 0.0):9.2e}"))
return lines return lines
def _draw_health_bar(self, win: "curses._CursesWindow", start_y: int) -> None: def _health_lines(self) -> list[tuple[str, str]]:
comps = self.reactor.health_monitor.components
lines: list[tuple[str, str]] = []
for name, comp in comps.items():
pct = f"{comp.integrity*100:5.1f}%"
state = "FAILED" if comp.failed else pct
lines.append((name, state))
return lines
def _maintenance_lines(self) -> list[tuple[str, str]]:
if not self.reactor.maintenance_active:
return [("Active", "None")]
return [(comp, "IN PROGRESS") for comp in sorted(self.reactor.maintenance_active)]
def _generator_lines(self, state: PlantState) -> list[tuple[str, str]]:
if not state.generators:
return [("Status", "n/a")]
lines: list[tuple[str, str]] = []
for idx, gen in enumerate(state.generators):
status = "RUN" if gen.running else "START" if gen.starting else "OFF"
spool = f" spool {gen.spool_remaining:4.1f}s" if gen.starting else ""
lines.append((f"Gen{idx + 1}", f"{status} {gen.power_output_mw:6.1f} MW{spool}"))
lines.append((f" Battery", f"{gen.battery_charge*100:5.1f}%"))
return lines
def _draw_health_bars(self, win: "curses._CursesWindow", start_y: int) -> int:
height, width = win.getmaxyx() height, width = win.getmaxyx()
inner_width = width - 4
if start_y >= height - 2: if start_y >= height - 2:
return return height - 2
win.addstr(start_y, 2, "Component Health", curses.A_BOLD | curses.color_pair(1)) win.addstr(start_y, 2, "Component Health", curses.A_BOLD | curses.color_pair(1))
bar_width = max(10, min(width - 28, 40)) bar_width = max(8, min(inner_width - 18, 40))
for idx, (name, comp) in enumerate(self.reactor.health_monitor.components.items(), start=1): row = start_y + 1
filled = int(bar_width * comp.integrity) for name, comp in self.reactor.health_monitor.components.items():
bar = "" * filled + "-" * (bar_width - filled)
color = 3 if comp.integrity > 0.5 else 2 if comp.integrity > 0.2 else 4
row = start_y + idx
if row >= height - 1: if row >= height - 1:
break break
label = f"{name:<12}:" label = f"{name:<12}"
win.addstr(row, 4, label[:14], curses.A_BOLD) target = 0.0 if comp.failed else comp.integrity
bar_start = 4 + max(len(label), 14) + 1 filled = int(bar_width * max(0.0, min(1.0, target)))
bar = "#" * filled + "-" * (bar_width - filled)
color = 3 if comp.integrity > 0.5 else 2 if comp.integrity > 0.2 else 4
win.addstr(row, 4, f"{label}:")
bar_start = 4 + len(label) + 1
win.addstr(row, bar_start, bar[:bar_width], curses.color_pair(color)) win.addstr(row, bar_start, bar[:bar_width], curses.color_pair(color))
percent_col = min(width - 8, bar_start + bar_width + 2) percent_text = "FAILED" if comp.failed else f"{comp.integrity*100:5.1f}%"
win.addstr(row, percent_col, f"{comp.integrity*100:5.1f}%", curses.color_pair(color)) percent_x = min(width - len(percent_text) - 2, bar_start + bar_width + 2)
win.addstr(row, percent_x, percent_text, curses.color_pair(color))
row += 1
return row + 1
def _pump_status(self, pumps: list, index: int) -> str:
if index >= len(pumps):
return "n/a"
state = pumps[index]
return f"{'ON ' if state.active else 'OFF'} {state.flow_rate:6.0f} kg/s"
def _current_demand(self) -> float: def _current_demand(self) -> float:
if self.reactor.consumer: if self.reactor.consumer:

View File

@@ -53,8 +53,12 @@ class HealthMonitor:
def __init__(self) -> None: def __init__(self) -> None:
self.components: Dict[str, ComponentHealth] = { self.components: Dict[str, ComponentHealth] = {
"core": ComponentHealth("core"), "core": ComponentHealth("core"),
"primary_pump": ComponentHealth("primary_pump"), "primary_pump_1": ComponentHealth("primary_pump_1"),
"secondary_pump": ComponentHealth("secondary_pump"), "primary_pump_2": ComponentHealth("primary_pump_2"),
"secondary_pump_1": ComponentHealth("secondary_pump_1"),
"secondary_pump_2": ComponentHealth("secondary_pump_2"),
"generator_1": ComponentHealth("generator_1"),
"generator_2": ComponentHealth("generator_2"),
} }
for idx in range(3): for idx in range(3):
name = f"turbine_{idx + 1}" name = f"turbine_{idx + 1}"
@@ -67,32 +71,48 @@ class HealthMonitor:
def evaluate( def evaluate(
self, self,
state: PlantState, state: PlantState,
primary_active: bool, primary_units: Iterable[bool],
secondary_active: bool, secondary_units: Iterable[bool],
turbine_active: Iterable[bool], turbine_active: Iterable[bool],
generator_states: Iterable,
dt: float, dt: float,
) -> List[str]: ) -> List[str]:
events: list[str] = [] events: list[str] = []
turbine_flags = list(turbine_active) turbine_flags = list(turbine_active)
core = self.component("core") core = self.component("core")
core_temp = state.core.fuel_temperature core_temp = state.core.fuel_temperature
temp_stress = max(0.0, (core_temp - 900.0) / (constants.MAX_CORE_TEMPERATURE - 900.0)) temp_stress = max(0.0, (core_temp - 900.0) / max(1e-6, (constants.MAX_CORE_TEMPERATURE - 900.0)))
base_degrade = 0.0001 * dt base_degrade = 0.0001 * dt
core.degrade(base_degrade + temp_stress * 0.01 * dt) core.degrade(base_degrade + temp_stress * 0.01 * dt)
if primary_active: prim_units = list(primary_units)
primary_flow = state.primary_loop.mass_flow_rate sec_units = list(secondary_units)
flow_ratio = 0.0 if primary_flow <= 0 else min(1.0, primary_flow / 18_000.0) prim_states = state.primary_pumps or []
self.component("primary_pump").degrade((0.0002 + (1 - flow_ratio) * 0.005) * dt) sec_states = state.secondary_pumps or []
else: for idx, active in enumerate(prim_units):
self.component("primary_pump").degrade(0.0005 * dt) comp = self.component(f"primary_pump_{idx + 1}")
if idx < len(prim_states) and active:
flow = prim_states[idx].flow_rate
flow_ratio = 0.0 if flow <= 0 else min(1.0, flow / 9_000.0)
comp.degrade((0.0002 + (1 - flow_ratio) * 0.005) * dt)
else:
comp.degrade(0.0)
for idx, active in enumerate(sec_units):
comp = self.component(f"secondary_pump_{idx + 1}")
if idx < len(sec_states) and active:
flow = sec_states[idx].flow_rate
flow_ratio = 0.0 if flow <= 0 else min(1.0, flow / 8_000.0)
comp.degrade((0.0002 + (1 - flow_ratio) * 0.004) * dt)
else:
comp.degrade(0.0)
if secondary_active: for idx, gen_state in enumerate(generator_states):
secondary_flow = state.secondary_loop.mass_flow_rate comp = self.component(f"generator_{idx + 1}")
flow_ratio = 0.0 if secondary_flow <= 0 else min(1.0, secondary_flow / 16_000.0) running = getattr(gen_state, "running", False) or getattr(gen_state, "starting", False)
self.component("secondary_pump").degrade((0.0002 + (1 - flow_ratio) * 0.004) * dt) if running:
else: comp.degrade(0.00015 * dt)
self.component("secondary_pump").degrade(0.0005 * dt) else:
comp.degrade(0.0)
turbines = state.turbines if hasattr(state, "turbines") else [] turbines = state.turbines if hasattr(state, "turbines") else []
for idx, active in enumerate(turbine_flags): for idx, active in enumerate(turbine_flags):
@@ -126,6 +146,11 @@ class HealthMonitor:
mapped = "turbine_1" if name == "turbine" else name mapped = "turbine_1" if name == "turbine" else name
if mapped in self.components: if mapped in self.components:
self.components[mapped] = ComponentHealth.from_snapshot(comp_data) self.components[mapped] = ComponentHealth.from_snapshot(comp_data)
elif mapped == "primary_pump":
self.components["primary_pump_1"] = ComponentHealth.from_snapshot(comp_data)
self.components["primary_pump_2"] = ComponentHealth.from_snapshot(comp_data)
elif mapped == "secondary_pump":
self.components["secondary_pump_1"] = ComponentHealth.from_snapshot(comp_data)
def maintain(self, component: str, amount: float = 0.05) -> bool: def maintain(self, component: str, amount: float = 0.05) -> bool:
comp = self.components.get(component) comp = self.components.get(component)

View File

@@ -0,0 +1,68 @@
"""Auxiliary diesel generator model with spool dynamics."""
from __future__ import annotations
from dataclasses import dataclass
import logging
from . import constants
LOGGER = logging.getLogger(__name__)
@dataclass
class GeneratorState:
running: bool
starting: bool
spool_remaining: float
power_output_mw: float
battery_charge: float
@dataclass
class DieselGenerator:
rated_output_mw: float = 50.0
spool_time: float = constants.GENERATOR_SPOOL_TIME
def start(self, state: GeneratorState) -> None:
if state.running or state.starting:
return
if state.battery_charge <= 0.05:
LOGGER.warning("Generator start failed: insufficient battery")
return
state.starting = True
state.spool_remaining = self.spool_time
LOGGER.info("Generator starting (spool %.0fs)", self.spool_time)
def stop(self, state: GeneratorState) -> None:
if not (state.running or state.starting):
return
state.running = False
state.starting = False
state.spool_remaining = 0.0
state.power_output_mw = 0.0
LOGGER.info("Generator stopped")
def step(self, state: GeneratorState, load_demand_mw: float, dt: float) -> float:
"""Advance generator dynamics and return delivered power."""
if state.starting:
state.spool_remaining = max(0.0, state.spool_remaining - dt)
state.power_output_mw = self.rated_output_mw * (1.0 - state.spool_remaining / max(self.spool_time, 1e-6))
if state.spool_remaining <= 0.0:
state.starting = False
state.running = True
LOGGER.info("Generator online at %.1f MW", self.rated_output_mw)
elif state.running:
available = self.rated_output_mw
state.power_output_mw = min(available, load_demand_mw)
else:
state.power_output_mw = 0.0
if state.running:
state.battery_charge = min(1.0, state.battery_charge + 0.02 * dt)
elif state.starting:
state.battery_charge = max(0.0, state.battery_charge - 0.01 * dt)
else:
state.battery_charge = max(0.0, state.battery_charge - 0.001 * dt)
return state.power_output_mw

View File

@@ -15,7 +15,7 @@ LOGGER = logging.getLogger(__name__)
def temperature_feedback(temp: float) -> float: def temperature_feedback(temp: float) -> float:
"""Negative coefficient: higher temperature lowers reactivity.""" """Negative coefficient: higher temperature lowers reactivity."""
reference = 900.0 reference = 900.0
coefficient = -1.5e-5 coefficient = -1.7e-5
return coefficient * (temp - reference) return coefficient * (temp - reference)
@@ -28,25 +28,35 @@ class NeutronDynamics:
beta_effective: float = 0.0065 beta_effective: float = 0.0065
delayed_neutron_fraction: float = 0.0008 delayed_neutron_fraction: float = 0.0008
external_source_coupling: float = 1e-6 external_source_coupling: float = 1e-6
shutdown_bias: float = -0.014
def reactivity(self, state: CoreState, control_fraction: float) -> float: def reactivity(self, state: CoreState, control_fraction: float) -> float:
rho = ( rho = (
0.02 * (1.0 - control_fraction) self.shutdown_bias +
constants.CONTROL_ROD_WORTH * (1.0 - control_fraction)
+ temperature_feedback(state.fuel_temperature) + temperature_feedback(state.fuel_temperature)
- fuel_reactivity_penalty(state.burnup) - fuel_reactivity_penalty(state.burnup)
- xenon_poisoning(state.neutron_flux) - xenon_poisoning(state.neutron_flux)
) )
return rho return rho
def flux_derivative(self, state: CoreState, rho: float, external_source_rate: float = 0.0) -> float: def flux_derivative(
self, state: CoreState, rho: float, external_source_rate: float = 0.0, baseline_source: float = 1e5
) -> float:
generation_time = constants.NEUTRON_LIFETIME generation_time = constants.NEUTRON_LIFETIME
beta = self.beta_effective beta = self.beta_effective
source_term = self.external_source_coupling * external_source_rate source_term = self.external_source_coupling * external_source_rate
return ((rho - beta) / generation_time) * state.neutron_flux + 1e5 + source_term return ((rho - beta) / generation_time) * state.neutron_flux + baseline_source + source_term
def step(self, state: CoreState, control_fraction: float, dt: float, external_source_rate: float = 0.0) -> None: def step(self, state: CoreState, control_fraction: float, dt: float, external_source_rate: float = 0.0) -> None:
rho = self.reactivity(state, control_fraction) rho = self.reactivity(state, control_fraction)
d_flux = self.flux_derivative(state, rho, external_source_rate) rho = min(rho, 0.02)
shutdown = control_fraction >= 0.95
if shutdown:
rho = min(rho, -0.04)
baseline = 0.0 if shutdown else 1e5
source = 0.0 if shutdown else external_source_rate
d_flux = self.flux_derivative(state, rho, source, baseline_source=baseline)
state.neutron_flux = max(0.0, state.neutron_flux + d_flux * dt) state.neutron_flux = max(0.0, state.neutron_flux + d_flux * dt)
state.reactivity_margin = rho state.reactivity_margin = rho
LOGGER.debug( LOGGER.debug(

View File

@@ -13,8 +13,9 @@ from .consumer import ElectricalConsumer
from .control import ControlSystem from .control import ControlSystem
from .failures import HealthMonitor from .failures import HealthMonitor
from .fuel import FuelAssembly, decay_heat_fraction from .fuel import FuelAssembly, decay_heat_fraction
from .generator import DieselGenerator, GeneratorState
from .neutronics import NeutronDynamics from .neutronics import NeutronDynamics
from .state import CoolantLoopState, CoreState, PlantState, TurbineState from .state import CoolantLoopState, CoreState, PlantState, PumpState, TurbineState
from .thermal import ThermalSolver, heat_transfer from .thermal import ThermalSolver, heat_transfer
from .turbine import SteamGenerator, Turbine from .turbine import SteamGenerator, Turbine
@@ -31,15 +32,20 @@ class Reactor:
thermal: ThermalSolver thermal: ThermalSolver
steam_generator: SteamGenerator steam_generator: SteamGenerator
turbines: list[Turbine] turbines: list[Turbine]
generators: list[DieselGenerator]
atomic_model: AtomicPhysics atomic_model: AtomicPhysics
consumer: ElectricalConsumer | None = None consumer: ElectricalConsumer | None = None
health_monitor: HealthMonitor = field(default_factory=HealthMonitor) health_monitor: HealthMonitor = field(default_factory=HealthMonitor)
primary_pump_active: bool = True primary_pump_active: bool = True
secondary_pump_active: bool = True secondary_pump_active: bool = True
primary_pump_units: list[bool] = field(default_factory=lambda: [True, True])
secondary_pump_units: list[bool] = field(default_factory=lambda: [True, True])
turbine_active: bool = True turbine_active: bool = True
turbine_unit_active: list[bool] = field(default_factory=lambda: [True, True, True]) turbine_unit_active: list[bool] = field(default_factory=lambda: [True, True, True])
shutdown: bool = False shutdown: bool = False
meltdown: bool = False
poison_alerts: set[str] = field(default_factory=set) poison_alerts: set[str] = field(default_factory=set)
maintenance_active: set[str] = field(default_factory=set)
def __post_init__(self) -> None: def __post_init__(self) -> None:
if not self.turbines: if not self.turbines:
@@ -47,6 +53,12 @@ class Reactor:
if not self.turbine_unit_active or len(self.turbine_unit_active) != len(self.turbines): if not self.turbine_unit_active or len(self.turbine_unit_active) != len(self.turbines):
self.turbine_unit_active = [True] * len(self.turbines) self.turbine_unit_active = [True] * len(self.turbines)
self.turbine_active = any(self.turbine_unit_active) self.turbine_active = any(self.turbine_unit_active)
if not self.generators:
self.generators = [DieselGenerator() for _ in range(2)]
if not self.primary_pump_units or len(self.primary_pump_units) != 2:
self.primary_pump_units = [True, True]
if not self.secondary_pump_units or len(self.secondary_pump_units) != 2:
self.secondary_pump_units = [True, True]
@classmethod @classmethod
def default(cls) -> "Reactor": def default(cls) -> "Reactor":
@@ -60,6 +72,7 @@ class Reactor:
thermal=ThermalSolver(), thermal=ThermalSolver(),
steam_generator=SteamGenerator(), steam_generator=SteamGenerator(),
turbines=[Turbine() for _ in range(3)], turbines=[Turbine() for _ in range(3)],
generators=[DieselGenerator() for _ in range(2)],
atomic_model=atomic_model, atomic_model=atomic_model,
consumer=ElectricalConsumer(name="Grid", demand_mw=800.0, online=False), consumer=ElectricalConsumer(name="Grid", demand_mw=800.0, online=False),
health_monitor=HealthMonitor(), health_monitor=HealthMonitor(),
@@ -76,6 +89,18 @@ class Reactor:
fission_product_inventory={}, fission_product_inventory={},
emitted_particles={}, emitted_particles={},
) )
# Default to a cold, safe configuration: rods fully inserted, manual control, pumps/turbines off.
self.control.manual_control = True
self.control.rod_fraction = 0.95
self.shutdown = True
self.meltdown = False
self.primary_pump_active = False
self.secondary_pump_active = False
self.turbine_unit_active = [False] * len(self.turbines)
self.turbine_active = any(self.turbine_unit_active)
if self.consumer:
self.consumer.set_online(False)
primary = CoolantLoopState( primary = CoolantLoopState(
temperature_in=ambient, temperature_in=ambient,
temperature_out=ambient, temperature_out=ambient,
@@ -90,6 +115,12 @@ class Reactor:
mass_flow_rate=0.0, mass_flow_rate=0.0,
steam_quality=0.0, steam_quality=0.0,
) )
primary_pumps = [PumpState(active=self.primary_pump_active, flow_rate=0.0, pressure=0.5) for _ in range(2)]
secondary_pumps = [PumpState(active=self.secondary_pump_active, flow_rate=0.0, pressure=0.5) for _ in range(2)]
generator_states = [
GeneratorState(running=False, starting=False, spool_remaining=0.0, power_output_mw=0.0, battery_charge=1.0)
for _ in self.generators
]
turbine_states = [ turbine_states = [
TurbineState( TurbineState(
steam_enthalpy=2_000.0, steam_enthalpy=2_000.0,
@@ -101,7 +132,15 @@ class Reactor:
) )
for _ in self.turbines for _ in self.turbines
] ]
return PlantState(core=core, primary_loop=primary, secondary_loop=secondary, turbines=turbine_states) return PlantState(
core=core,
primary_loop=primary,
secondary_loop=secondary,
turbines=turbine_states,
primary_pumps=primary_pumps,
secondary_pumps=secondary_pumps,
generators=generator_states,
)
def step(self, state: PlantState, dt: float, command: ReactorCommand | None = None) -> None: def step(self, state: PlantState, dt: float, command: ReactorCommand | None = None) -> None:
if self.shutdown: if self.shutdown:
@@ -109,6 +148,9 @@ class Reactor:
else: else:
rod_fraction = self.control.update_rods(state.core, dt) rod_fraction = self.control.update_rods(state.core, dt)
if state.core.fuel_temperature >= constants.CORE_MELTDOWN_TEMPERATURE and not self.meltdown:
self._trigger_meltdown(state)
overrides = {} overrides = {}
if command: if command:
overrides = self._apply_command(command, state) overrides = self._apply_command(command, state)
@@ -138,28 +180,124 @@ class Reactor:
self._check_poison_alerts(state) self._check_poison_alerts(state)
pump_demand = overrides.get("coolant_demand", self.control.coolant_demand(state.primary_loop)) pump_demand = overrides.get("coolant_demand", self.control.coolant_demand(state.primary_loop))
self.primary_pump_active = self.primary_pump_active and any(self.primary_pump_units)
self.secondary_pump_active = self.secondary_pump_active and any(self.secondary_pump_units)
primary_units_active = [
self.primary_pump_active and idx < len(self.primary_pump_units) and self.primary_pump_units[idx]
for idx in range(2)
]
secondary_units_active = [
self.secondary_pump_active and idx < len(self.secondary_pump_units) and self.secondary_pump_units[idx]
for idx in range(2)
]
aux_demand = constants.BASE_AUX_LOAD_MW + constants.PUMP_POWER_MW * (
sum(primary_units_active) + sum(secondary_units_active)
)
turbine_electrical = state.total_electrical_output()
generator_power = self._step_generators(state, aux_demand, turbine_electrical, dt)
aux_available = turbine_electrical + generator_power
power_ratio = 1.0 if aux_demand <= 0 else min(1.0, aux_available / aux_demand)
if aux_demand > 0 and aux_available < 0.5 * aux_demand:
LOGGER.warning("Aux power deficit: available %.1f/%.1f MW", aux_available, aux_demand)
if self.primary_pump_active: if self.primary_pump_active:
self.primary_pump.step(state.primary_loop, pump_demand) total_flow = 0.0
target_pressure = (12.0 * pump_demand + 2.0) * power_ratio
loop_pressure = 0.5
target_flow = self.primary_pump.flow_rate(pump_demand) * power_ratio
for idx, pump_state in enumerate(state.primary_pumps):
unit_enabled = (
self.primary_pump_active and idx < len(self.primary_pump_units) and self.primary_pump_units[idx]
)
desired_flow = target_flow if unit_enabled else 0.0
desired_pressure = target_pressure if unit_enabled else 0.5
pump_state.flow_rate = self._ramp_value(
pump_state.flow_rate, desired_flow, dt, self.primary_pump.spool_time
)
pump_state.pressure = self._ramp_value(
pump_state.pressure, desired_pressure, dt, self.primary_pump.spool_time
)
pump_state.active = (unit_enabled and power_ratio > 0.05) or pump_state.flow_rate > 1.0
total_flow += pump_state.flow_rate
loop_pressure = max(loop_pressure, pump_state.pressure)
state.primary_loop.mass_flow_rate = total_flow
state.primary_loop.pressure = loop_pressure if total_flow > 0 else self._ramp_value(
state.primary_loop.pressure, 0.5, dt, self.primary_pump.spool_time
)
else: else:
state.primary_loop.mass_flow_rate = 0.0 state.primary_loop.mass_flow_rate = self._ramp_value(
state.primary_loop.pressure = 0.5 state.primary_loop.mass_flow_rate, 0.0, dt, self.primary_pump.spool_time
)
state.primary_loop.pressure = self._ramp_value(
state.primary_loop.pressure, 0.5, dt, self.primary_pump.spool_time
)
for pump_state in state.primary_pumps:
pump_state.active = False
pump_state.flow_rate = self._ramp_value(
pump_state.flow_rate, 0.0, dt, self.primary_pump.spool_time
)
pump_state.pressure = self._ramp_value(
pump_state.pressure, state.primary_loop.pressure, dt, self.primary_pump.spool_time
)
if self.secondary_pump_active: if self.secondary_pump_active:
self.secondary_pump.step(state.secondary_loop, 0.75) total_flow = 0.0
target_pressure = 12.0 * 0.75 + 2.0
loop_pressure = 0.5
target_flow = self.secondary_pump.flow_rate(0.75)
for idx, pump_state in enumerate(state.secondary_pumps):
unit_enabled = (
self.secondary_pump_active and idx < len(self.secondary_pump_units) and self.secondary_pump_units[idx]
)
desired_flow = target_flow if unit_enabled else 0.0
desired_pressure = target_pressure if unit_enabled else 0.5
pump_state.flow_rate = self._ramp_value(
pump_state.flow_rate, desired_flow, dt, self.secondary_pump.spool_time
)
pump_state.pressure = self._ramp_value(
pump_state.pressure, desired_pressure, dt, self.secondary_pump.spool_time
)
pump_state.active = unit_enabled or pump_state.flow_rate > 1.0
total_flow += pump_state.flow_rate
loop_pressure = max(loop_pressure, pump_state.pressure)
state.secondary_loop.mass_flow_rate = total_flow
state.secondary_loop.pressure = loop_pressure if total_flow > 0 else self._ramp_value(
state.secondary_loop.pressure, 0.5, dt, self.secondary_pump.spool_time
)
else: else:
state.secondary_loop.mass_flow_rate = 0.0 state.secondary_loop.mass_flow_rate = self._ramp_value(
state.secondary_loop.pressure = 0.5 state.secondary_loop.mass_flow_rate, 0.0, dt, self.secondary_pump.spool_time
)
state.secondary_loop.pressure = self._ramp_value(
state.secondary_loop.pressure, 0.5, dt, self.secondary_pump.spool_time
)
for pump_state in state.secondary_pumps:
pump_state.active = False
pump_state.flow_rate = self._ramp_value(
pump_state.flow_rate, 0.0, dt, self.secondary_pump.spool_time
)
pump_state.pressure = self._ramp_value(
pump_state.pressure, state.secondary_loop.pressure, dt, self.secondary_pump.spool_time
)
self.thermal.step_core(state.core, state.primary_loop, total_power, dt) self.thermal.step_core(state.core, state.primary_loop, total_power, dt)
transferred = heat_transfer(state.primary_loop, state.secondary_loop, total_power) if not self.secondary_pump_active or state.secondary_loop.mass_flow_rate <= 1.0:
transferred = 0.0
else:
transferred = heat_transfer(state.primary_loop, state.secondary_loop, total_power)
self.thermal.step_secondary(state.secondary_loop, transferred) self.thermal.step_secondary(state.secondary_loop, transferred)
self._step_turbine_bank(state, transferred) self._step_turbine_bank(state, transferred, dt)
self._maintenance_tick(state, dt)
if (not self.secondary_pump_active or state.secondary_loop.mass_flow_rate <= 1.0) and total_power > 50.0:
self._handle_heat_sink_loss(state)
failures = self.health_monitor.evaluate( failures = self.health_monitor.evaluate(
state, state,
self.primary_pump_active, primary_units_active,
self.secondary_pump_active, secondary_units_active,
self.turbine_unit_active, self.turbine_unit_active,
state.generators,
dt, dt,
) )
for failure in failures: for failure in failures:
@@ -183,7 +321,7 @@ class Reactor:
sum(t.load_demand_mw for t in state.turbines), sum(t.load_demand_mw for t in state.turbines),
) )
def _step_turbine_bank(self, state: PlantState, steam_power_mw: float) -> None: def _step_turbine_bank(self, state: PlantState, steam_power_mw: float, dt: float) -> None:
if not state.turbines: if not state.turbines:
return return
active_indices = [ active_indices = [
@@ -195,9 +333,9 @@ class Reactor:
break break
turbine_state = state.turbines[idx] turbine_state = state.turbines[idx]
if idx in active_indices: if idx in active_indices:
turbine.step(state.secondary_loop, turbine_state, steam_power_mw=power_per_unit) turbine.step(state.secondary_loop, turbine_state, steam_power_mw=power_per_unit, dt=dt)
else: else:
self._reset_turbine_state(turbine_state) self._spin_down_turbine(turbine_state, dt, turbine.spool_time)
self._dispatch_consumer_load(state, active_indices) self._dispatch_consumer_load(state, active_indices)
def _reset_turbine_state(self, turbine_state: TurbineState) -> None: def _reset_turbine_state(self, turbine_state: TurbineState) -> None:
@@ -206,6 +344,23 @@ class Reactor:
turbine_state.load_demand_mw = 0.0 turbine_state.load_demand_mw = 0.0
turbine_state.load_supplied_mw = 0.0 turbine_state.load_supplied_mw = 0.0
@staticmethod
def _ramp_value(current: float, target: float, dt: float, time_constant: float) -> float:
if time_constant <= 0.0:
return target
alpha = min(1.0, max(0.0, dt / time_constant))
return current + (target - current) * alpha
def _spin_down_turbine(self, turbine_state: TurbineState, dt: float, time_constant: float) -> None:
turbine_state.shaft_power_mw = self._ramp_value(turbine_state.shaft_power_mw, 0.0, dt, time_constant)
turbine_state.electrical_output_mw = self._ramp_value(
turbine_state.electrical_output_mw, 0.0, dt, time_constant
)
turbine_state.load_demand_mw = 0.0
turbine_state.load_supplied_mw = self._ramp_value(
turbine_state.load_supplied_mw, 0.0, dt, time_constant
)
def _dispatch_consumer_load(self, state: PlantState, active_indices: list[int]) -> None: def _dispatch_consumer_load(self, state: PlantState, active_indices: list[int]) -> None:
total_electrical = sum(state.turbines[idx].electrical_output_mw for idx in active_indices) total_electrical = sum(state.turbines[idx].electrical_output_mw for idx in active_indices)
if self.consumer: if self.consumer:
@@ -231,10 +386,15 @@ class Reactor:
LOGGER.critical("Core failure detected. Initiating SCRAM.") LOGGER.critical("Core failure detected. Initiating SCRAM.")
self.shutdown = True self.shutdown = True
self.control.scram() self.control.scram()
elif component == "primary_pump": elif component.startswith("primary_pump"):
self._set_primary_pump(False) idx = self._component_index(component)
elif component == "secondary_pump": self._toggle_primary_pump_unit(idx, False)
self._set_secondary_pump(False) elif component.startswith("secondary_pump"):
idx = self._component_index(component)
self._toggle_secondary_pump_unit(idx, False)
elif component.startswith("generator"):
idx = self._component_index(component)
LOGGER.warning("Generator %d failed", idx + 1)
elif component.startswith("turbine"): elif component.startswith("turbine"):
idx = self._component_index(component) idx = self._component_index(component)
self._set_turbine_state(False, index=idx) self._set_turbine_state(False, index=idx)
@@ -255,10 +415,15 @@ class Reactor:
self.shutdown = self.shutdown or command.rod_position >= 0.95 self.shutdown = self.shutdown or command.rod_position >= 0.95
elif command.rod_step is not None: elif command.rod_step is not None:
overrides["rod_fraction"] = self.control.increment_rods(command.rod_step) overrides["rod_fraction"] = self.control.increment_rods(command.rod_step)
if command.primary_pump_on is not None: if command.primary_pumps:
self._set_primary_pump(command.primary_pump_on) for idx, flag in command.primary_pumps.items():
if command.secondary_pump_on is not None: self._toggle_primary_pump_unit(idx - 1, flag)
self._set_secondary_pump(command.secondary_pump_on) if command.secondary_pumps:
for idx, flag in command.secondary_pumps.items():
self._toggle_secondary_pump_unit(idx - 1, flag)
if command.generator_units:
for idx, flag in command.generator_units.items():
self._toggle_generator(idx - 1, flag, state)
if command.turbine_on is not None: if command.turbine_on is not None:
self._set_turbine_state(command.turbine_on) self._set_turbine_state(command.turbine_on)
if command.turbine_units: if command.turbine_units:
@@ -272,18 +437,101 @@ class Reactor:
if command.coolant_demand is not None: if command.coolant_demand is not None:
overrides["coolant_demand"] = max(0.0, min(1.0, command.coolant_demand)) overrides["coolant_demand"] = max(0.0, min(1.0, command.coolant_demand))
for component in command.maintenance_components: for component in command.maintenance_components:
self._perform_maintenance(component) self._toggle_maintenance(component)
return overrides return overrides
def _set_primary_pump(self, active: bool) -> None: def _set_primary_pump(self, active: bool) -> None:
if self.primary_pump_active != active: if self.primary_pump_active != active:
self.primary_pump_active = active self.primary_pump_active = active
LOGGER.info("Primary pump %s", "enabled" if active else "stopped") LOGGER.info("Primary pump %s", "enabled" if active else "stopped")
if not active:
self.primary_pump_units = [False] * len(self.primary_pump_units)
elif active and not any(self.primary_pump_units):
self.primary_pump_units = [True] * len(self.primary_pump_units)
def _set_secondary_pump(self, active: bool) -> None: def _set_secondary_pump(self, active: bool) -> None:
if self.secondary_pump_active != active: if self.secondary_pump_active != active:
self.secondary_pump_active = active self.secondary_pump_active = active
LOGGER.info("Secondary pump %s", "enabled" if active else "stopped") LOGGER.info("Secondary pump %s", "enabled" if active else "stopped")
if not active:
self.secondary_pump_units = [False] * len(self.secondary_pump_units)
elif active and not any(self.secondary_pump_units):
self.secondary_pump_units = [True] * len(self.secondary_pump_units)
def _toggle_primary_pump_unit(self, index: int, active: bool) -> None:
if index < 0 or index >= len(self.primary_pump_units):
LOGGER.warning("Ignoring primary pump index %s", index)
return
if self.primary_pump_units[index] != active:
self.primary_pump_units[index] = active
LOGGER.info("Primary pump %d %s", index + 1, "enabled" if active else "stopped")
if active:
self._set_primary_pump(True)
elif not any(self.primary_pump_units):
self._set_primary_pump(False)
def _toggle_secondary_pump_unit(self, index: int, active: bool) -> None:
if index < 0 or index >= len(self.secondary_pump_units):
LOGGER.warning("Ignoring secondary pump index %s", index)
return
if self.secondary_pump_units[index] != active:
self.secondary_pump_units[index] = active
LOGGER.info("Secondary pump %d %s", index + 1, "enabled" if active else "stopped")
if active:
self._set_secondary_pump(True)
elif not any(self.secondary_pump_units):
self._set_secondary_pump(False)
def _toggle_generator(self, index: int, active: bool, state: PlantState) -> None:
if index < 0 or index >= len(self.generators) or index >= len(state.generators):
LOGGER.warning("Ignoring generator index %s", index)
return
gen_state = state.generators[index]
if active:
self.generators[index].start(gen_state)
else:
self.generators[index].stop(gen_state)
def _trigger_meltdown(self, state: PlantState) -> None:
LOGGER.critical("Core meltdown in progress (%.1f K)", state.core.fuel_temperature)
self.meltdown = True
self.shutdown = True
self.control.scram()
try:
self.health_monitor.component("core").fail()
except KeyError:
pass
self._set_turbine_state(False)
def _step_generators(self, state: PlantState, aux_demand: float, turbine_electric: float, dt: float) -> float:
# Ensure we have generator state objects aligned with hardware.
if not state.generators or len(state.generators) < len(self.generators):
missing = len(self.generators) - len(state.generators)
for _ in range(missing):
state.generators.append(
GeneratorState(running=False, starting=False, spool_remaining=0.0, power_output_mw=0.0, battery_charge=1.0)
)
deficit = max(0.0, aux_demand - turbine_electric)
if deficit > 0.0:
for idx, gen_state in enumerate(state.generators):
if not (gen_state.running or gen_state.starting):
self.generators[idx].start(gen_state)
deficit -= self.generators[idx].rated_output_mw
if deficit <= 0:
break
elif turbine_electric > aux_demand:
for idx, gen_state in enumerate(state.generators):
if gen_state.running and not gen_state.starting:
self.generators[idx].stop(gen_state)
total_power = 0.0
remaining = max(0.0, aux_demand - turbine_electric)
for idx, gen_state in enumerate(state.generators):
load = remaining if remaining > 0 else 0.0
delivered = self.generators[idx].step(gen_state, load, dt)
total_power += delivered
remaining = max(0.0, remaining - delivered)
return total_power
def _set_turbine_state(self, active: bool, index: int | None = None) -> None: def _set_turbine_state(self, active: bool, index: int | None = None) -> None:
if index is None: if index is None:
@@ -301,30 +549,77 @@ class Reactor:
def _component_index(self, name: str) -> int: def _component_index(self, name: str) -> int:
if name == "turbine": if name == "turbine":
return 0 return 0
parts = name.split("_")
try: try:
return int(name.split("_")[1]) - 1 for token in reversed(parts):
except (IndexError, ValueError): return int(token) - 1
except (ValueError, TypeError):
return -1 return -1
def _perform_maintenance(self, component: str) -> None: def _perform_maintenance(self, component: str) -> None:
if not self._can_maintain(component):
return
self.health_monitor.maintain(component)
def _maintenance_tick(self, state: PlantState, dt: float) -> None:
if not self.maintenance_active:
return
completed: list[str] = []
for component in list(self.maintenance_active):
if not self._can_maintain(component):
continue
restored = self.health_monitor.maintain(component, amount=0.02 * dt)
comp_state = self.health_monitor.component(component)
if comp_state.integrity >= 0.999:
completed.append(component)
for comp in completed:
self.maintenance_active.discard(comp)
LOGGER.info("Maintenance completed for %s", comp)
def _toggle_maintenance(self, component: str) -> None:
if component in self.maintenance_active:
self.maintenance_active.remove(component)
LOGGER.info("Maintenance stopped for %s", component)
return
if not self._can_maintain(component):
return
self.maintenance_active.add(component)
LOGGER.info("Maintenance started for %s", component)
def _can_maintain(self, component: str) -> bool:
if component == "core" and not self.shutdown: if component == "core" and not self.shutdown:
LOGGER.warning("Cannot maintain core while reactor is running") LOGGER.warning("Cannot maintain core while reactor is running")
return return False
if component == "primary_pump" and self.primary_pump_active: if component.startswith("primary_pump_"):
LOGGER.warning("Stop primary pump before maintenance") idx = self._component_index(component)
return if idx < 0 or idx >= len(self.primary_pump_units):
if component == "secondary_pump" and self.secondary_pump_active: LOGGER.warning("Unknown primary pump maintenance target %s", component)
LOGGER.warning("Stop secondary pump before maintenance") return False
return if self.primary_pump_units[idx]:
LOGGER.warning("Stop primary pump %d before maintenance", idx + 1)
return False
if component.startswith("secondary_pump_"):
idx = self._component_index(component)
if idx < 0 or idx >= len(self.secondary_pump_units):
LOGGER.warning("Unknown secondary pump maintenance target %s", component)
return False
if self.secondary_pump_units[idx]:
LOGGER.warning("Stop secondary pump %d before maintenance", idx + 1)
return False
if component.startswith("generator_"):
idx = self._component_index(component)
if idx < 0 or idx >= len(self.generators):
LOGGER.warning("Unknown generator maintenance target %s", component)
return False
if component.startswith("turbine"): if component.startswith("turbine"):
idx = self._component_index(component) idx = self._component_index(component)
if idx < 0 or idx >= len(self.turbine_unit_active): if idx < 0 or idx >= len(self.turbine_unit_active):
LOGGER.warning("Unknown turbine maintenance target %s", component) LOGGER.warning("Unknown turbine maintenance target %s", component)
return return False
if self.turbine_unit_active[idx]: if self.turbine_unit_active[idx]:
LOGGER.warning("Stop turbine %d before maintenance", idx + 1) LOGGER.warning("Stop turbine %d before maintenance", idx + 1)
return return False
self.health_monitor.maintain(component) return True
def attach_consumer(self, consumer: ElectricalConsumer) -> None: def attach_consumer(self, consumer: ElectricalConsumer) -> None:
self.consumer = consumer self.consumer = consumer
@@ -339,9 +634,23 @@ class Reactor:
metadata = { metadata = {
"primary_pump_active": self.primary_pump_active, "primary_pump_active": self.primary_pump_active,
"secondary_pump_active": self.secondary_pump_active, "secondary_pump_active": self.secondary_pump_active,
"primary_pump_units": self.primary_pump_units,
"secondary_pump_units": self.secondary_pump_units,
"turbine_active": self.turbine_active, "turbine_active": self.turbine_active,
"turbine_units": self.turbine_unit_active, "turbine_units": self.turbine_unit_active,
"shutdown": self.shutdown, "shutdown": self.shutdown,
"meltdown": self.meltdown,
"maintenance_active": list(self.maintenance_active),
"generators": [
{
"running": g.running,
"starting": g.starting,
"spool_remaining": g.spool_remaining,
"power_output_mw": g.power_output_mw,
"battery_charge": g.battery_charge,
}
for g in state.generators
],
"consumer": { "consumer": {
"online": self.consumer.online if self.consumer else False, "online": self.consumer.online if self.consumer else False,
"demand_mw": self.consumer.demand_mw if self.consumer else 0.0, "demand_mw": self.consumer.demand_mw if self.consumer else 0.0,
@@ -354,11 +663,17 @@ class Reactor:
plant, metadata, health = self.control.load_state(filepath) plant, metadata, health = self.control.load_state(filepath)
self.primary_pump_active = metadata.get("primary_pump_active", self.primary_pump_active) self.primary_pump_active = metadata.get("primary_pump_active", self.primary_pump_active)
self.secondary_pump_active = metadata.get("secondary_pump_active", self.secondary_pump_active) self.secondary_pump_active = metadata.get("secondary_pump_active", self.secondary_pump_active)
self.primary_pump_units = list(metadata.get("primary_pump_units", self.primary_pump_units))
self.secondary_pump_units = list(metadata.get("secondary_pump_units", self.secondary_pump_units))
unit_states = metadata.get("turbine_units") unit_states = metadata.get("turbine_units")
if unit_states: if unit_states:
self.turbine_unit_active = list(unit_states) self.turbine_unit_active = list(unit_states)
self.turbine_active = metadata.get("turbine_active", any(self.turbine_unit_active)) self.turbine_active = metadata.get("turbine_active", any(self.turbine_unit_active))
self.shutdown = metadata.get("shutdown", self.shutdown) self.shutdown = metadata.get("shutdown", self.shutdown)
self.meltdown = metadata.get("meltdown", self.meltdown)
maint = metadata.get("maintenance_active")
if maint is not None:
self.maintenance_active = set(maint)
consumer_cfg = metadata.get("consumer") consumer_cfg = metadata.get("consumer")
if consumer_cfg: if consumer_cfg:
if not self.consumer: if not self.consumer:
@@ -373,6 +688,21 @@ class Reactor:
if health: if health:
self.health_monitor.load_snapshot(health) self.health_monitor.load_snapshot(health)
LOGGER.info("Reactor state restored from %s", filepath) LOGGER.info("Reactor state restored from %s", filepath)
# Back-fill pump state lists for compatibility.
if not plant.primary_pumps or len(plant.primary_pumps) < 2:
plant.primary_pumps = [
PumpState(active=self.primary_pump_active, flow_rate=plant.primary_loop.mass_flow_rate / 2, pressure=plant.primary_loop.pressure)
for _ in range(2)
]
if not plant.secondary_pumps or len(plant.secondary_pumps) < 2:
plant.secondary_pumps = [
PumpState(
active=self.secondary_pump_active,
flow_rate=plant.secondary_loop.mass_flow_rate / 2,
pressure=plant.secondary_loop.pressure,
)
for _ in range(2)
]
if len(plant.turbines) < len(self.turbines): if len(plant.turbines) < len(self.turbines):
ambient = constants.ENVIRONMENT_TEMPERATURE ambient = constants.ENVIRONMENT_TEMPERATURE
while len(plant.turbines) < len(self.turbines): while len(plant.turbines) < len(self.turbines):
@@ -386,8 +716,38 @@ class Reactor:
load_supplied_mw=0.0, load_supplied_mw=0.0,
) )
) )
gen_meta = metadata.get("generators", [])
if not plant.generators or len(plant.generators) < len(self.generators):
while len(plant.generators) < len(self.generators):
plant.generators.append(
GeneratorState(
running=False,
starting=False,
spool_remaining=0.0,
power_output_mw=0.0,
battery_charge=1.0,
)
)
for idx, gen_state in enumerate(plant.generators):
if idx < len(gen_meta):
cfg = gen_meta[idx]
gen_state.running = cfg.get("running", gen_state.running)
gen_state.starting = cfg.get("starting", gen_state.starting)
gen_state.spool_remaining = cfg.get("spool_remaining", gen_state.spool_remaining)
gen_state.power_output_mw = cfg.get("power_output_mw", gen_state.power_output_mw)
gen_state.battery_charge = cfg.get("battery_charge", gen_state.battery_charge)
return plant return plant
def _handle_heat_sink_loss(self, state: PlantState) -> None:
if not self.shutdown:
LOGGER.critical("Loss of secondary heat sink detected. Initiating SCRAM.")
self.shutdown = True
self.control.scram()
self._set_turbine_state(False)
# Clear turbine output and demands to reflect lost steam.
for turbine_state in state.turbines:
self._reset_turbine_state(turbine_state)
def _check_poison_alerts(self, state: PlantState) -> None: def _check_poison_alerts(self, state: PlantState) -> None:
inventory = state.core.fission_product_inventory or {} inventory = state.core.fission_product_inventory or {}
for symbol, threshold in constants.KEY_POISON_THRESHOLDS.items(): for symbol, threshold in constants.KEY_POISON_THRESHOLDS.items():

View File

@@ -4,6 +4,8 @@ from __future__ import annotations
from dataclasses import dataclass, field, asdict from dataclasses import dataclass, field, asdict
from .generator import GeneratorState
def clamp(value: float, min_value: float, max_value: float) -> float: def clamp(value: float, min_value: float, max_value: float) -> float:
return max(min_value, min(max_value, value)) return max(min_value, min(max_value, value))
@@ -54,12 +56,22 @@ class TurbineState:
load_supplied_mw: float = 0.0 load_supplied_mw: float = 0.0
@dataclass
class PumpState:
active: bool
flow_rate: float
pressure: float
@dataclass @dataclass
class PlantState: class PlantState:
core: CoreState core: CoreState
primary_loop: CoolantLoopState primary_loop: CoolantLoopState
secondary_loop: CoolantLoopState secondary_loop: CoolantLoopState
turbines: list[TurbineState] turbines: list[TurbineState]
primary_pumps: list[PumpState] = field(default_factory=list)
secondary_pumps: list[PumpState] = field(default_factory=list)
generators: list[GeneratorState] = field(default_factory=list)
time_elapsed: float = field(default=0.0) time_elapsed: float = field(default=0.0)
def snapshot(self) -> dict[str, float]: def snapshot(self) -> dict[str, float]:
@@ -73,6 +85,9 @@ class PlantState:
"turbine_electric": self.total_electrical_output(), "turbine_electric": self.total_electrical_output(),
"products": self.core.fission_product_inventory, "products": self.core.fission_product_inventory,
"particles": self.core.emitted_particles, "particles": self.core.emitted_particles,
"primary_pumps": [pump.active for pump in self.primary_pumps],
"secondary_pumps": [pump.active for pump in self.secondary_pumps],
"generators": [gen.running or gen.starting for gen in self.generators],
} }
def total_electrical_output(self) -> float: def total_electrical_output(self) -> float:
@@ -92,10 +107,17 @@ class PlantState:
old_turbine = data.get("turbine") old_turbine = data.get("turbine")
turbines_blob = [old_turbine] if old_turbine else [] turbines_blob = [old_turbine] if old_turbine else []
turbines = [TurbineState(**t) for t in turbines_blob] turbines = [TurbineState(**t) for t in turbines_blob]
prim_pumps_blob = data.get("primary_pumps", [])
sec_pumps_blob = data.get("secondary_pumps", [])
generators_blob = data.get("generators", [])
generators = [GeneratorState(**g) for g in generators_blob]
return cls( return cls(
core=CoreState(**core_blob, fission_product_inventory=inventory, emitted_particles=particles), core=CoreState(**core_blob, fission_product_inventory=inventory, emitted_particles=particles),
primary_loop=CoolantLoopState(**data["primary_loop"]), primary_loop=CoolantLoopState(**data["primary_loop"]),
secondary_loop=CoolantLoopState(**data["secondary_loop"]), secondary_loop=CoolantLoopState(**data["secondary_loop"]),
turbines=turbines, turbines=turbines,
primary_pumps=[PumpState(**p) for p in prim_pumps_blob],
secondary_pumps=[PumpState(**p) for p in sec_pumps_blob],
generators=generators,
time_elapsed=data.get("time_elapsed", 0.0), time_elapsed=data.get("time_elapsed", 0.0),
) )

View File

@@ -15,6 +15,8 @@ LOGGER = logging.getLogger(__name__)
def heat_transfer(primary: CoolantLoopState, secondary: CoolantLoopState, core_power_mw: float) -> float: def heat_transfer(primary: CoolantLoopState, secondary: CoolantLoopState, core_power_mw: float) -> float:
"""Return MW transferred to the secondary loop.""" """Return MW transferred to the secondary loop."""
if secondary.mass_flow_rate <= 0.0:
return 0.0
delta_t = max(0.0, primary.temperature_out - secondary.temperature_in) delta_t = max(0.0, primary.temperature_out - secondary.temperature_in)
conductance = 0.15 # steam generator effectiveness conductance = 0.15 # steam generator effectiveness
efficiency = 1.0 - math.exp(-conductance * delta_t) efficiency = 1.0 - math.exp(-conductance * delta_t)

View File

@@ -26,12 +26,14 @@ class Turbine:
generator_efficiency: float = constants.GENERATOR_EFFICIENCY generator_efficiency: float = constants.GENERATOR_EFFICIENCY
mechanical_efficiency: float = constants.STEAM_TURBINE_EFFICIENCY mechanical_efficiency: float = constants.STEAM_TURBINE_EFFICIENCY
rated_output_mw: float = 400.0 # cap per unit electrical output rated_output_mw: float = 400.0 # cap per unit electrical output
spool_time: float = constants.TURBINE_SPOOL_TIME
def step( def step(
self, self,
loop: CoolantLoopState, loop: CoolantLoopState,
state: TurbineState, state: TurbineState,
steam_power_mw: float = 0.0, steam_power_mw: float = 0.0,
dt: float = 1.0,
) -> None: ) -> None:
enthalpy = 2_700.0 + loop.steam_quality * 600.0 enthalpy = 2_700.0 + loop.steam_quality * 600.0
mass_flow = loop.mass_flow_rate * 0.6 mass_flow = loop.mass_flow_rate * 0.6
@@ -43,8 +45,8 @@ class Turbine:
shaft_power_mw = electrical / max(1e-6, self.generator_efficiency) shaft_power_mw = electrical / max(1e-6, self.generator_efficiency)
condenser_temp = max(305.0, loop.temperature_in - 20.0) condenser_temp = max(305.0, loop.temperature_in - 20.0)
state.steam_enthalpy = enthalpy state.steam_enthalpy = enthalpy
state.shaft_power_mw = shaft_power_mw state.shaft_power_mw = _ramp(state.shaft_power_mw, shaft_power_mw, dt, self.spool_time)
state.electrical_output_mw = electrical state.electrical_output_mw = _ramp(state.electrical_output_mw, electrical, dt, self.spool_time)
state.condenser_temperature = condenser_temp state.condenser_temperature = condenser_temp
LOGGER.debug( LOGGER.debug(
"Turbine output: shaft=%.1fMW electrical=%.1fMW condenser=%.1fK", "Turbine output: shaft=%.1fMW electrical=%.1fMW condenser=%.1fK",
@@ -52,3 +54,10 @@ class Turbine:
electrical, electrical,
condenser_temp, condenser_temp,
) )
def _ramp(current: float, target: float, dt: float, time_constant: float) -> float:
if time_constant <= 0.0:
return target
alpha = min(1.0, max(0.0, dt / time_constant))
return current + (target - current) * alpha

View File

@@ -22,5 +22,4 @@ def test_reactivity_increases_with_rod_withdrawal():
state = _core_state() state = _core_state()
rho_full_out = dynamics.reactivity(state, control_fraction=0.0) rho_full_out = dynamics.reactivity(state, control_fraction=0.0)
rho_half = dynamics.reactivity(state, control_fraction=0.5) rho_half = dynamics.reactivity(state, control_fraction=0.5)
assert rho_full_out > 0.0
assert rho_full_out > rho_half assert rho_full_out > rho_half

View File

@@ -4,6 +4,7 @@ from pathlib import Path
import pytest import pytest
from reactor_sim import constants from reactor_sim import constants
from reactor_sim.commands import ReactorCommand
from reactor_sim.failures import HealthMonitor from reactor_sim.failures import HealthMonitor
from reactor_sim.reactor import Reactor from reactor_sim.reactor import Reactor
from reactor_sim.simulation import ReactorSimulation from reactor_sim.simulation import ReactorSimulation
@@ -19,6 +20,7 @@ def test_reactor_initial_state_is_cold():
def test_state_save_and_load_roundtrip(tmp_path: Path): def test_state_save_and_load_roundtrip(tmp_path: Path):
reactor = Reactor.default() reactor = Reactor.default()
reactor.control.manual_control = True
sim = ReactorSimulation(reactor, timestep=5.0, duration=15.0) sim = ReactorSimulation(reactor, timestep=5.0, duration=15.0)
sim.log() sim.log()
save_path = tmp_path / "plant_state.json" save_path = tmp_path / "plant_state.json"
@@ -31,13 +33,18 @@ def test_state_save_and_load_roundtrip(tmp_path: Path):
sim.last_state.core.fuel_temperature sim.last_state.core.fuel_temperature
) )
assert restored_reactor.control.rod_fraction == reactor.control.rod_fraction assert restored_reactor.control.rod_fraction == reactor.control.rod_fraction
assert restored_reactor.control.manual_control == reactor.control.manual_control
assert len(restored_state.primary_pumps) == 2
assert len(restored_state.secondary_pumps) == 2
def test_health_monitor_flags_core_failure(): def test_health_monitor_flags_core_failure():
reactor = Reactor.default() reactor = Reactor.default()
state = reactor.initial_state() state = reactor.initial_state()
state.core.fuel_temperature = constants.MAX_CORE_TEMPERATURE state.core.fuel_temperature = constants.MAX_CORE_TEMPERATURE
failures = reactor.health_monitor.evaluate(state, True, True, [True, True, True], dt=200.0) failures = reactor.health_monitor.evaluate(
state, [True, True], [True, True], [True, True, True], state.generators, dt=200.0
)
assert "core" in failures assert "core" in failures
reactor._handle_failure("core") reactor._handle_failure("core")
assert reactor.shutdown is True assert reactor.shutdown is True
@@ -45,10 +52,171 @@ def test_health_monitor_flags_core_failure():
def test_maintenance_recovers_component_health(): def test_maintenance_recovers_component_health():
monitor = HealthMonitor() monitor = HealthMonitor()
pump = monitor.component("secondary_pump") pump = monitor.component("secondary_pump_1")
pump.integrity = 0.3 pump.integrity = 0.3
pump.fail() pump.fail()
restored = monitor.maintain("secondary_pump", amount=0.5) restored = monitor.maintain("secondary_pump_1", amount=0.5)
assert restored is True assert restored is True
assert pump.integrity == pytest.approx(0.8) assert pump.integrity == pytest.approx(0.8)
assert pump.failed is False assert pump.failed is False
def test_secondary_pump_loss_triggers_scram_and_no_steam():
reactor = Reactor.default()
state = reactor.initial_state()
# Make sure some power is present to trigger heat-sink logic.
state.core.power_output_mw = 500.0
reactor.secondary_pump_active = False
reactor.control.manual_control = True
reactor.control.rod_fraction = 0.1
reactor.step(state, dt=1.0)
assert reactor.shutdown is True
assert all(t.electrical_output_mw == 0.0 for t in state.turbines)
def test_cold_shutdown_stays_subcritical():
reactor = Reactor.default()
state = reactor.initial_state()
reactor.control.manual_control = True
reactor.control.rod_fraction = 0.95
reactor.primary_pump_active = False
reactor.secondary_pump_active = False
initial_power = state.core.power_output_mw
for _ in range(10):
reactor.step(state, dt=1.0)
assert state.core.power_output_mw <= initial_power + 0.5
assert reactor.shutdown is True
def test_toggle_maintenance_progresses_until_restored():
reactor = Reactor.default()
reactor.primary_pump_units = [False, False]
reactor.primary_pump_active = False
pump = reactor.health_monitor.component("primary_pump_1")
pump.integrity = 0.2
def provider(t: float, _state):
if t == 0:
return ReactorCommand.maintain("primary_pump_1")
return None
sim = ReactorSimulation(reactor, timestep=1.0, duration=50.0, command_provider=provider)
sim.log()
assert pump.integrity >= 0.99
assert "primary_pump_1" not in reactor.maintenance_active
def test_primary_pump_unit_toggle_updates_active_flag():
reactor = Reactor.default()
state = reactor.initial_state()
reactor.primary_pump_active = True
reactor.primary_pump_units = [True, True]
reactor.step(state, dt=1.0, command=ReactorCommand(primary_pumps={1: False}))
assert reactor.primary_pump_units == [False, True]
assert reactor.primary_pump_active is True
reactor.step(state, dt=1.0, command=ReactorCommand(primary_pumps={2: False}))
assert reactor.primary_pump_units == [False, False]
assert reactor.primary_pump_active is False
def test_secondary_pump_unit_toggle_can_restart_pump():
reactor = Reactor.default()
state = reactor.initial_state()
reactor.secondary_pump_active = False
reactor.secondary_pump_units = [False, False]
reactor.step(state, dt=1.0, command=ReactorCommand(secondary_pumps={1: True}))
assert reactor.secondary_pump_units == [True, False]
assert reactor.secondary_pump_active is True
assert state.secondary_loop.mass_flow_rate > 0.0
def test_primary_pumps_spool_up_over_seconds():
reactor = Reactor.default()
state = reactor.initial_state()
reactor.secondary_pump_units = [False, False]
# Enable both pumps and command full flow; spool should take multiple steps.
target_flow = reactor.primary_pump.flow_rate(1.0) * len(reactor.primary_pump_units)
reactor.step(
state,
dt=1.0,
command=ReactorCommand(primary_pumps={1: True, 2: True}, generator_units={1: True}, coolant_demand=1.0),
)
first_flow = state.primary_loop.mass_flow_rate
assert 0.0 < first_flow < target_flow
for _ in range(10):
reactor.step(state, dt=1.0, command=ReactorCommand(coolant_demand=1.0))
assert state.primary_loop.mass_flow_rate == pytest.approx(target_flow, rel=0.15)
def test_full_rod_withdrawal_reaches_gigawatt_power():
reactor = Reactor.default()
state = reactor.initial_state()
reactor.shutdown = False
reactor.control.manual_control = True
reactor.control.rod_fraction = 0.0
reactor.primary_pump_active = True
reactor.secondary_pump_active = True
early_power = 0.0
for step in range(60):
reactor.step(state, dt=1.0)
if step == 10:
early_power = state.core.power_output_mw
assert state.core.power_output_mw > max(2_000.0, early_power * 2)
assert state.core.fuel_temperature > 600.0
def test_partially_inserted_rods_hold_near_three_gw():
reactor = Reactor.default()
state = reactor.initial_state()
reactor.shutdown = False
reactor.control.manual_control = True
reactor.control.rod_fraction = 0.4
reactor.primary_pump_active = True
reactor.secondary_pump_active = True
for _ in range(120):
reactor.step(state, dt=1.0)
assert 2_000.0 < state.core.power_output_mw < 4_000.0
assert 500.0 < state.core.fuel_temperature < 800.0
def test_generator_spools_and_powers_pumps():
reactor = Reactor.default()
state = reactor.initial_state()
reactor.shutdown = False
reactor.control.manual_control = True
reactor.control.rod_fraction = 0.95 # keep power low; focus on aux power
reactor.turbine_unit_active = [False, False, False]
reactor.secondary_pump_units = [False, False]
for step in range(12):
cmd = ReactorCommand(generator_units={1: True}, primary_pumps={1: True}) if step == 0 else None
reactor.step(state, dt=1.0, command=cmd)
assert state.generators and state.generators[0].running is True
assert state.generators[0].power_output_mw > 0.0
assert state.primary_loop.mass_flow_rate > 0.0
def test_meltdown_triggers_shutdown():
reactor = Reactor.default()
state = reactor.initial_state()
reactor.shutdown = False
reactor.control.manual_control = True
reactor.control.rod_fraction = 0.0
reactor.primary_pump_active = True
reactor.secondary_pump_active = True
state.core.fuel_temperature = constants.CORE_MELTDOWN_TEMPERATURE + 50.0
reactor.step(state, dt=1.0)
assert reactor.shutdown is True
assert reactor.meltdown is True

33
tests/test_turbine.py Normal file
View File

@@ -0,0 +1,33 @@
import pytest
from reactor_sim.state import CoolantLoopState, TurbineState
from reactor_sim.turbine import Turbine
def test_turbine_spools_toward_target_output():
turbine = Turbine()
loop = CoolantLoopState(
temperature_in=600.0,
temperature_out=650.0,
pressure=6.0,
mass_flow_rate=20_000.0,
steam_quality=0.9,
)
state = TurbineState(
steam_enthalpy=0.0,
shaft_power_mw=0.0,
electrical_output_mw=0.0,
condenser_temperature=300.0,
)
target_electric = min(
turbine.rated_output_mw, 300.0 * turbine.mechanical_efficiency * turbine.generator_efficiency
)
dt = 5.0
turbine.step(loop, state, steam_power_mw=300.0, dt=dt)
assert 0.0 < state.electrical_output_mw < target_electric
for _ in range(5):
turbine.step(loop, state, steam_power_mw=300.0, dt=dt)
assert state.electrical_output_mw == pytest.approx(target_electric, rel=0.05)