Compare commits

...

22 Commits

Author SHA1 Message Date
Codex Agent
b03e80da9f Add generator power model and meltdown handling 2025-11-22 20:04:04 +01:00
Codex Agent
e7d8a34818 Retune reactivity for withdrawn and partially inserted rods 2025-11-22 19:41:34 +01:00
Codex Agent
287a8e186a Restore full power when rods are withdrawn 2025-11-22 19:27:18 +01:00
Codex Agent
a8da5371db Restore component health progress bars 2025-11-22 19:17:22 +01:00
Codex Agent
e81a9fdbe3 Add spool dynamics for pumps and turbines 2025-11-22 19:13:57 +01:00
Codex Agent
e18f100e15 Handle individual pump unit toggles 2025-11-22 19:07:45 +01:00
Codex Agent
c2d44bbdd3 Guard dashboard window sizing to avoid curses NULL 2025-11-22 19:02:19 +01:00
Codex Agent
7ef61c537f Add per-pump toggles and persist pump unit states 2025-11-22 18:57:04 +01:00
Codex Agent
e664af63ef Move component health into main dashboard panel 2025-11-22 18:53:31 +01:00
Codex Agent
d1751647cd Fix dashboard window sizing to avoid curses error 2025-11-22 18:50:04 +01:00
Codex Agent
038d3d0fea Relax dashboard size requirements 2025-11-22 18:48:51 +01:00
Codex Agent
511544c2eb Persist manual control and pump states 2025-11-22 18:47:19 +01:00
Codex Agent
f626c99f44 Show per-pump status and per-turbine output 2025-11-22 18:43:53 +01:00
Codex Agent
5bd9992ed0 Stop degrading pumps when offline 2025-11-22 18:39:20 +01:00
Codex Agent
290b7a8565 Add toggleable maintenance state and status panel 2025-11-22 18:37:11 +01:00
Codex Agent
ecc193f61b Invert rod hotkeys and update instructions 2025-11-22 18:30:18 +01:00
Codex Agent
c9cd669ca4 Bias shutdown reactivity and keep cold start subcritical 2025-11-22 18:28:40 +01:00
Codex Agent
7a0b5c4b96 Start in cold manual state with rods inserted 2025-11-22 18:25:23 +01:00
Codex Agent
fef1d32fbe Restore three turbines and update controls 2025-11-22 18:22:24 +01:00
Codex Agent
2434034505 Handle heat-sink loss and update turbine controls 2025-11-22 18:20:21 +01:00
Codex Agent
8d9b57f86d Ensure dashboard quit saves state and ignore artifacts 2025-11-22 18:12:17 +01:00
Codex Agent
ba65f84cd4 Fix dashboard reset logging 2025-11-22 18:06:27 +01:00
18 changed files with 926 additions and 204 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
.venv/
**/__pycache__/
*.egg-info/
artifacts/

View File

@@ -12,7 +12,7 @@ All source code lives under `src/reactor_sim`. Submodules map to plant systems:
- A git remote for `origin` is configured; push changes to `origin/main` once work is complete so dashboards stay in sync.
## Operations & Control Hooks
Manual commands live in `reactor_sim.commands.ReactorCommand`. Pass a `command_provider` callable to `ReactorSimulation` to adjust rods, pumps, turbine, coolant demand, or the attached `ElectricalConsumer`. Use `ReactorCommand.scram_all()` for full shutdown, `ReactorCommand(consumer_online=True, consumer_demand=600)` to hook the grid, or toggle pumps (`primary_pump_on=False`) to simulate faults. Control helpers in `control.py` expose `set_rods`, `increment_rods`, and `scram`, and you can switch `set_manual_mode(True)` to pause the automatic rod controller. For hands-on runs, launch the curses dashboard (`FISSION_DASHBOARD=1 FISSION_REALTIME=1 python run_simulation.py`) and use the on-screen shortcuts (q quit/save, space SCRAM, p/o pumps, t turbine, 1/2/3 toggle individual turbines, r reset/clear saved state, +/- rods in 0.05 steps, [/ ] consumer demand, s/d setpoint, `a` toggles auto/manual rods). Recommended startup: enable manual rods (`a`), withdraw to ~0.3 before ramping the turbine/consumer, then re-enable auto control when you want closed-loop operation.
Manual commands live in `reactor_sim.commands.ReactorCommand`. Pass a `command_provider` callable to `ReactorSimulation` to adjust rods, pumps, turbine, coolant demand, or the attached `ElectricalConsumer`. Use `ReactorCommand.scram_all()` for full shutdown, `ReactorCommand(consumer_online=True, consumer_demand=600)` to hook the grid, or toggle pumps (`primary_pump_on=False`) to simulate faults. Control helpers in `control.py` expose `set_rods`, `increment_rods`, and `scram`, and you can switch `set_manual_mode(True)` to pause the automatic rod controller. For hands-on runs, launch the curses dashboard (`FISSION_DASHBOARD=1 FISSION_REALTIME=1 python run_simulation.py`) and use the on-screen shortcuts (q quit/save, space SCRAM, p/o pumps, t turbine, 1/2/3 toggle individual turbines, y/u/i turbine maintenance, m/n pump maintenance, k core maintenance, c consumer, r reset/clear saved state, + insert rods / - withdraw rods in 0.05 steps, [/ ] consumer demand, s/d setpoint, `a` toggles auto/manual rods). Recommended startup: enable manual rods (`a`), withdraw to ~0.3 before ramping the turbine/consumer, then re-enable auto control when you want closed-loop operation.
The plant now boots cold (ambient core temperature, idle pumps); scripts must sequence startup: enable pumps, gradually withdraw rods, connect the consumer after turbine spin-up, and use `ControlSystem.set_power_setpoint` to chase desired output. Set `FISSION_REALTIME=1` to run continuously with real-time pacing; optionally set `FISSION_SIM_DURATION=infinite` for indefinite runs and send SIGINT/Ctrl+C to stop. Use `FISSION_SIM_DURATION=600` (default) for bounded offline batches.
## Coding Style & Naming Conventions

View File

@@ -1,104 +0,0 @@
{
"control": {
"setpoint_mw": 3000.0,
"rod_fraction": 0.3872181667930225
},
"plant": {
"core": {
"fuel_temperature": 633.2342060602825,
"neutron_flux": 31335131.43776027,
"reactivity_margin": 0.006241667151538859,
"power_output_mw": 2976.900110634399,
"burnup": 0.004945521266135161
},
"primary_loop": {
"temperature_in": 295.0,
"temperature_out": 338.7522062115579,
"pressure": 14.0,
"mass_flow_rate": 16200.0,
"steam_quality": 0.0
},
"secondary_loop": {
"temperature_in": 295.0,
"temperature_out": 360.05377003828596,
"pressure": 6.650537700382859,
"mass_flow_rate": 10880.0,
"steam_quality": 0.6505377003828593
},
"turbines": [
{
"steam_enthalpy": 3090.3226202297155,
"shaft_power_mw": 336.9056685758782,
"electrical_output_mw": 323.4294418328431,
"condenser_temperature": 305.0,
"load_demand_mw": 0.0,
"load_supplied_mw": 0.0
},
{
"steam_enthalpy": 3090.3226202297155,
"shaft_power_mw": 336.9056685758782,
"electrical_output_mw": 323.4294418328431,
"condenser_temperature": 305.0,
"load_demand_mw": 0.0,
"load_supplied_mw": 0.0
},
{
"steam_enthalpy": 3090.3226202297155,
"shaft_power_mw": 336.9056685758782,
"electrical_output_mw": 323.4294418328431,
"condenser_temperature": 305.0,
"load_demand_mw": 0.0,
"load_supplied_mw": 0.0
}
],
"time_elapsed": 600.0
},
"metadata": {
"primary_pump_active": true,
"secondary_pump_active": true,
"turbine_active": true,
"turbine_units": [
true,
true,
true
],
"shutdown": false,
"consumer": {
"online": false,
"demand_mw": 800.0,
"name": "Grid"
}
},
"health": {
"core": {
"name": "core",
"integrity": 0.9400000000000066,
"failed": false
},
"primary_pump": {
"name": "primary_pump",
"integrity": 0.5800000000000063,
"failed": false
},
"secondary_pump": {
"name": "secondary_pump",
"integrity": 0.1120000000000021,
"failed": false
},
"turbine_1": {
"name": "turbine_1",
"integrity": 0.610000000000003,
"failed": false
},
"turbine_2": {
"name": "turbine_2",
"integrity": 0.610000000000003,
"failed": false
},
"turbine_3": {
"name": "turbine_3",
"integrity": 0.610000000000003,
"failed": false
}
}
}

View File

@@ -21,6 +21,9 @@ class ReactorCommand:
consumer_demand: float | None = None
rod_manual: bool | None = None
turbine_units: dict[int, bool] | None = None
primary_pumps: dict[int, bool] | None = None
secondary_pumps: dict[int, bool] | None = None
generator_units: dict[int, bool] | None = None
maintenance_components: tuple[str, ...] = tuple()
@classmethod

View File

@@ -8,15 +8,23 @@ NEUTRON_LIFETIME = 0.1 # seconds, prompt neutron lifetime surrogate
FUEL_ENERGY_DENSITY = 200.0 * MEGAWATT # J/kg released as heat
COOLANT_HEAT_CAPACITY = 4_200.0 # J/(kg*K) for water/steam
COOLANT_DENSITY = 700.0 # kg/m^3 averaged between phases
MAX_CORE_TEMPERATURE = 1_800.0 # K
CORE_MELTDOWN_TEMPERATURE = 2_873.0 # K (approx 2600C) threshold for irreversible meltdown
MAX_CORE_TEMPERATURE = CORE_MELTDOWN_TEMPERATURE # Allow simulation to approach meltdown temperature
MAX_PRESSURE = 15.0 # MPa typical PWR primary loop limit
CONTROL_ROD_SPEED = 0.03 # fraction insertion per second
CONTROL_ROD_WORTH = 0.042 # delta rho contribution when fully withdrawn
STEAM_TURBINE_EFFICIENCY = 0.34
GENERATOR_EFFICIENCY = 0.96
ENVIRONMENT_TEMPERATURE = 295.0 # K
AMU_TO_KG = 1.660_539_066_60e-27
MEV_TO_J = 1.602_176_634e-13
ELECTRON_FISSION_CROSS_SECTION = 5e-16 # cm^2, tuned for simulation scale
PUMP_SPOOL_TIME = 5.0 # seconds to reach commanded flow
TURBINE_SPOOL_TIME = 12.0 # seconds to reach steady output
GENERATOR_SPOOL_TIME = 10.0 # seconds to reach full output
# Auxiliary power assumptions
PUMP_POWER_MW = 12.0 # MW draw per pump unit
BASE_AUX_LOAD_MW = 5.0 # control, instrumentation, misc.
# Threshold inventories (event counts) for flagging common poisons in diagnostics.
KEY_POISON_THRESHOLDS = {
"Xe": 1e20, # xenon

View File

@@ -77,6 +77,7 @@ class ControlSystem:
"control": {
"setpoint_mw": self.setpoint_mw,
"rod_fraction": self.rod_fraction,
"manual_control": self.manual_control,
},
"plant": plant_state.to_dict(),
"metadata": metadata or {},
@@ -94,6 +95,7 @@ class ControlSystem:
control = data.get("control", {})
self.setpoint_mw = control.get("setpoint_mw", self.setpoint_mw)
self.rod_fraction = control.get("rod_fraction", self.rod_fraction)
self.manual_control = control.get("manual_control", self.manual_control)
plant = PlantState.from_dict(data["plant"])
LOGGER.info("Loaded plant state from %s", path)
return plant, data.get("metadata", {}), data.get("health")

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
from dataclasses import dataclass
import logging
from . import constants
from .state import CoolantLoopState
LOGGER = logging.getLogger(__name__)
@@ -14,6 +15,7 @@ LOGGER = logging.getLogger(__name__)
class Pump:
nominal_flow: float
efficiency: float = 0.9
spool_time: float = constants.PUMP_SPOOL_TIME
def flow_rate(self, demand: float) -> float:
demand = max(0.0, min(1.0, demand))

View File

@@ -15,6 +15,8 @@ from .reactor import Reactor
from .simulation import ReactorSimulation
from .state import PlantState
LOGGER = logging.getLogger(__name__)
@dataclass
class DashboardKey:
@@ -40,6 +42,7 @@ class ReactorDashboard:
self.sim: Optional[ReactorSimulation] = None
self.quit_requested = False
self.reset_requested = False
self._last_state: Optional[PlantState] = None
self.log_buffer: deque[str] = deque(maxlen=4)
self._log_handler: Optional[logging.Handler] = None
self._previous_handlers: list[logging.Handler] = []
@@ -47,15 +50,20 @@ class ReactorDashboard:
self.keys = [
DashboardKey("q", "Quit & save"),
DashboardKey("space", "SCRAM"),
DashboardKey("p", "Toggle primary pump"),
DashboardKey("o", "Toggle secondary pump"),
DashboardKey("g", "Toggle primary pump 1"),
DashboardKey("h", "Toggle primary pump 2"),
DashboardKey("j", "Toggle secondary pump 1"),
DashboardKey("k", "Toggle secondary pump 2"),
DashboardKey("b", "Toggle generator 1"),
DashboardKey("v", "Toggle generator 2"),
DashboardKey("t", "Toggle turbine"),
DashboardKey("1/2/3", "Toggle turbine units 1-3"),
DashboardKey("y/u/i", "Maintain turbine 1/2/3"),
DashboardKey("c", "Toggle consumer"),
DashboardKey("r", "Reset & clear state"),
DashboardKey("m", "Maintain primary pump"),
DashboardKey("n", "Maintain secondary pump"),
DashboardKey("m/n", "Maintain primary pumps 1/2"),
DashboardKey(",/.", "Maintain secondary pumps 1/2"),
DashboardKey("B/V", "Maintain generator 1/2"),
DashboardKey("k", "Maintain core (requires shutdown)"),
DashboardKey("+/-", "Withdraw/insert rods"),
DashboardKey("[/]", "Adjust consumer demand /+50 MW"),
@@ -89,9 +97,13 @@ class ReactorDashboard:
self.sim.start_state = self.start_state
try:
for state in self.sim.run():
self._last_state = state
self._draw(stdscr, state)
self._handle_input(stdscr)
if self.quit_requested or self.reset_requested:
# Persist the latest state if we are exiting early.
if self.sim:
self.sim.last_state = state
self.sim.stop()
break
finally:
@@ -118,18 +130,33 @@ class ReactorDashboard:
if ch == ord(" "):
self._queue_command(ReactorCommand.scram_all())
elif ch in (ord("p"), ord("P")):
self._queue_command(ReactorCommand(primary_pump_on=not self.reactor.primary_pump_active))
# Deprecated master toggles ignored.
continue
elif ch in (ord("o"), ord("O")):
self._queue_command(ReactorCommand(secondary_pump_on=not self.reactor.secondary_pump_active))
continue
elif ch in (ord("g"), ord("G")):
self._toggle_primary_pump_unit(0)
elif ch in (ord("h"), ord("H")):
self._toggle_primary_pump_unit(1)
elif ch in (ord("j"), ord("J")):
self._toggle_secondary_pump_unit(0)
elif ch in (ord("k"), ord("K")):
self._toggle_secondary_pump_unit(1)
elif ch in (ord("b"), ord("B")):
self._toggle_generator_unit(0)
elif ch in (ord("v"), ord("V")):
self._toggle_generator_unit(1)
elif ch in (ord("t"), ord("T")):
self._queue_command(ReactorCommand(turbine_on=not self.reactor.turbine_active))
elif ord("1") <= ch <= ord("9"):
idx = ch - ord("1")
self._toggle_turbine_unit(idx)
elif ch in (ord("+"), ord("=")):
self._queue_command(ReactorCommand(rod_position=self._clamped_rod(-0.05)))
elif ch == ord("-"):
# Insert rods (increase fraction)
self._queue_command(ReactorCommand(rod_position=self._clamped_rod(0.05)))
elif ch == ord("-"):
# Withdraw rods (decrease fraction)
self._queue_command(ReactorCommand(rod_position=self._clamped_rod(-0.05)))
elif ch == ord("["):
demand = self._current_demand() - 50.0
self._queue_command(ReactorCommand(consumer_demand=max(0.0, demand)))
@@ -148,11 +175,19 @@ class ReactorDashboard:
elif ch in (ord("a"), ord("A")):
self._queue_command(ReactorCommand(rod_manual=not self.reactor.control.manual_control))
elif ch in (ord("m"), ord("M")):
self._queue_command(ReactorCommand.maintain("primary_pump"))
self._queue_command(ReactorCommand.maintain("primary_pump_1"))
elif ch in (ord("n"), ord("N")):
self._queue_command(ReactorCommand.maintain("secondary_pump"))
self._queue_command(ReactorCommand.maintain("primary_pump_2"))
elif ch in (ord("k"), ord("K")):
self._queue_command(ReactorCommand.maintain("core"))
elif ch == ord(","):
self._queue_command(ReactorCommand.maintain("secondary_pump_1"))
elif ch == ord("."):
self._queue_command(ReactorCommand.maintain("secondary_pump_2"))
elif ch in (ord("B"),):
self._queue_command(ReactorCommand.maintain("generator_1"))
elif ch in (ord("V"),):
self._queue_command(ReactorCommand.maintain("generator_2"))
elif ch in (ord("y"), ord("Y")):
self._queue_command(ReactorCommand.maintain("turbine_1"))
elif ch in (ord("u"), ord("U")):
@@ -185,6 +220,25 @@ class ReactorDashboard:
current = self.reactor.turbine_unit_active[index]
self._queue_command(ReactorCommand(turbine_units={index + 1: not current}))
def _toggle_primary_pump_unit(self, index: int) -> None:
if index < 0 or index >= len(self.reactor.primary_pump_units):
return
current = self.reactor.primary_pump_units[index]
self._queue_command(ReactorCommand(primary_pumps={index + 1: not current}))
def _toggle_secondary_pump_unit(self, index: int) -> None:
if index < 0 or index >= len(self.reactor.secondary_pump_units):
return
current = self.reactor.secondary_pump_units[index]
self._queue_command(ReactorCommand(secondary_pumps={index + 1: not current}))
def _toggle_generator_unit(self, index: int) -> None:
current = False
if self._last_state and index < len(self._last_state.generators):
gen = self._last_state.generators[index]
current = gen.running or gen.starting
self._queue_command(ReactorCommand(generator_units={index + 1: not current}))
def _request_reset(self) -> None:
self.reset_requested = True
if self.sim:
@@ -217,26 +271,32 @@ class ReactorDashboard:
def _draw(self, stdscr: "curses._CursesWindow", state: PlantState) -> None:
stdscr.erase()
height, width = stdscr.getmaxyx()
if height < 24 or width < 90:
min_status = 4
if height < min_status + 12 or width < 70:
stdscr.addstr(
0,
0,
"Terminal window too small. Resize to at least 90x24.".ljust(width),
"Terminal too small; try >=70x16 or reduce font size.".ljust(width),
curses.color_pair(4),
)
stdscr.refresh()
return
data_height = height - 6
right_width = max(32, width // 3)
status_height = min_status
data_height = height - status_height
right_width = max(28, width // 3)
left_width = width - right_width
if left_width < 60:
left_width = min(60, width - 20)
if left_width < 50:
left_width = min(50, width - 18)
right_width = width - left_width
data_height = max(1, data_height)
left_width = max(1, left_width)
right_width = max(1, right_width)
data_win = stdscr.derwin(data_height, left_width, 0, 0)
help_win = stdscr.derwin(data_height, right_width, 0, left_width)
status_win = stdscr.derwin(6, width, data_height, 0)
status_win = stdscr.derwin(status_height, width, data_height, 0)
self._draw_data_panel(data_win, state)
self._draw_help_panel(help_win)
@@ -268,7 +328,8 @@ class ReactorDashboard:
y,
"Primary Loop",
[
("Pump", "ON" if self.reactor.primary_pump_active else "OFF"),
("Pump1", self._pump_status(state.primary_pumps, 0)),
("Pump2", self._pump_status(state.primary_pumps, 1)),
("Flow", f"{state.primary_loop.mass_flow_rate:7.0f} kg/s"),
("Inlet Temp", f"{state.primary_loop.temperature_in:7.1f} K"),
("Outlet Temp", f"{state.primary_loop.temperature_out:7.1f} K"),
@@ -280,13 +341,20 @@ class ReactorDashboard:
y,
"Secondary Loop",
[
("Pump", "ON" if self.reactor.secondary_pump_active else "OFF"),
("Pump1", self._pump_status(state.secondary_pumps, 0)),
("Pump2", self._pump_status(state.secondary_pumps, 1)),
("Flow", f"{state.secondary_loop.mass_flow_rate:7.0f} kg/s"),
("Inlet Temp", f"{state.secondary_loop.temperature_in:7.1f} K"),
("Pressure", f"{state.secondary_loop.pressure:5.2f} MPa"),
("Steam Quality", f"{state.secondary_loop.steam_quality:5.2f}"),
],
)
y = self._draw_section(
win,
y,
"Generators",
self._generator_lines(state),
)
consumer_status = "n/a"
consumer_demand = 0.0
if self.reactor.consumer:
@@ -298,13 +366,23 @@ class ReactorDashboard:
"Turbine / Grid",
[
("Turbines", " ".join(self._turbine_status_lines())),
("Unit1 Elec", f"{state.turbines[0].electrical_output_mw:7.1f} MW" if state.turbines else "n/a"),
(
"Unit2 Elec",
f"{state.turbines[1].electrical_output_mw:7.1f} MW" if len(state.turbines) > 1 else "n/a",
),
(
"Unit3 Elec",
f"{state.turbines[2].electrical_output_mw:7.1f} MW" if len(state.turbines) > 2 else "n/a",
),
("Electrical", f"{state.total_electrical_output():7.1f} MW"),
("Load", f"{self._total_load_supplied(state):7.1f}/{self._total_load_demand(state):7.1f} MW"),
("Consumer", f"{consumer_status}"),
("Demand", f"{consumer_demand:7.1f} MW"),
],
)
self._draw_health_bar(win, y + 1)
y = self._draw_section(win, y, "Maintenance", self._maintenance_lines())
y = self._draw_health_bars(win, y)
def _draw_help_panel(self, win: "curses._CursesWindow") -> None:
win.erase()
@@ -319,7 +397,7 @@ class ReactorDashboard:
"Start pumps before withdrawing rods.",
"Bring turbine and consumer online after thermal stabilization.",
"Toggle turbine units (1/2/3) for staggered maintenance.",
"Use m/n/k/y/u/i to request maintenance (stop equipment first).",
"Use m/n/,/. for pump maintenance; B/V for generators.",
"Press 'r' to reset/clear state if you want a cold start.",
"Watch component health to avoid automatic trips.",
]
@@ -408,25 +486,61 @@ class ReactorDashboard:
lines.append(("Alphas", f"{particles.get('alpha', 0.0):9.2e}"))
return lines
def _draw_health_bar(self, win: "curses._CursesWindow", start_y: int) -> None:
def _health_lines(self) -> list[tuple[str, str]]:
comps = self.reactor.health_monitor.components
lines: list[tuple[str, str]] = []
for name, comp in comps.items():
pct = f"{comp.integrity*100:5.1f}%"
state = "FAILED" if comp.failed else pct
lines.append((name, state))
return lines
def _maintenance_lines(self) -> list[tuple[str, str]]:
if not self.reactor.maintenance_active:
return [("Active", "None")]
return [(comp, "IN PROGRESS") for comp in sorted(self.reactor.maintenance_active)]
def _generator_lines(self, state: PlantState) -> list[tuple[str, str]]:
if not state.generators:
return [("Status", "n/a")]
lines: list[tuple[str, str]] = []
for idx, gen in enumerate(state.generators):
status = "RUN" if gen.running else "START" if gen.starting else "OFF"
spool = f" spool {gen.spool_remaining:4.1f}s" if gen.starting else ""
lines.append((f"Gen{idx + 1}", f"{status} {gen.power_output_mw:6.1f} MW{spool}"))
lines.append((f" Battery", f"{gen.battery_charge*100:5.1f}%"))
return lines
def _draw_health_bars(self, win: "curses._CursesWindow", start_y: int) -> int:
height, width = win.getmaxyx()
inner_width = width - 4
if start_y >= height - 2:
return
return height - 2
win.addstr(start_y, 2, "Component Health", curses.A_BOLD | curses.color_pair(1))
bar_width = max(10, min(width - 28, 40))
for idx, (name, comp) in enumerate(self.reactor.health_monitor.components.items(), start=1):
filled = int(bar_width * comp.integrity)
bar = "" * filled + "-" * (bar_width - filled)
color = 3 if comp.integrity > 0.5 else 2 if comp.integrity > 0.2 else 4
row = start_y + idx
bar_width = max(8, min(inner_width - 18, 40))
row = start_y + 1
for name, comp in self.reactor.health_monitor.components.items():
if row >= height - 1:
break
label = f"{name:<12}:"
win.addstr(row, 4, label[:14], curses.A_BOLD)
bar_start = 4 + max(len(label), 14) + 1
label = f"{name:<12}"
target = 0.0 if comp.failed else comp.integrity
filled = int(bar_width * max(0.0, min(1.0, target)))
bar = "#" * filled + "-" * (bar_width - filled)
color = 3 if comp.integrity > 0.5 else 2 if comp.integrity > 0.2 else 4
win.addstr(row, 4, f"{label}:")
bar_start = 4 + len(label) + 1
win.addstr(row, bar_start, bar[:bar_width], curses.color_pair(color))
percent_col = min(width - 8, bar_start + bar_width + 2)
win.addstr(row, percent_col, f"{comp.integrity*100:5.1f}%", curses.color_pair(color))
percent_text = "FAILED" if comp.failed else f"{comp.integrity*100:5.1f}%"
percent_x = min(width - len(percent_text) - 2, bar_start + bar_width + 2)
win.addstr(row, percent_x, percent_text, curses.color_pair(color))
row += 1
return row + 1
def _pump_status(self, pumps: list, index: int) -> str:
if index >= len(pumps):
return "n/a"
state = pumps[index]
return f"{'ON ' if state.active else 'OFF'} {state.flow_rate:6.0f} kg/s"
def _current_demand(self) -> float:
if self.reactor.consumer:

View File

@@ -53,8 +53,12 @@ class HealthMonitor:
def __init__(self) -> None:
self.components: Dict[str, ComponentHealth] = {
"core": ComponentHealth("core"),
"primary_pump": ComponentHealth("primary_pump"),
"secondary_pump": ComponentHealth("secondary_pump"),
"primary_pump_1": ComponentHealth("primary_pump_1"),
"primary_pump_2": ComponentHealth("primary_pump_2"),
"secondary_pump_1": ComponentHealth("secondary_pump_1"),
"secondary_pump_2": ComponentHealth("secondary_pump_2"),
"generator_1": ComponentHealth("generator_1"),
"generator_2": ComponentHealth("generator_2"),
}
for idx in range(3):
name = f"turbine_{idx + 1}"
@@ -67,32 +71,48 @@ class HealthMonitor:
def evaluate(
self,
state: PlantState,
primary_active: bool,
secondary_active: bool,
primary_units: Iterable[bool],
secondary_units: Iterable[bool],
turbine_active: Iterable[bool],
generator_states: Iterable,
dt: float,
) -> List[str]:
events: list[str] = []
turbine_flags = list(turbine_active)
core = self.component("core")
core_temp = state.core.fuel_temperature
temp_stress = max(0.0, (core_temp - 900.0) / (constants.MAX_CORE_TEMPERATURE - 900.0))
temp_stress = max(0.0, (core_temp - 900.0) / max(1e-6, (constants.MAX_CORE_TEMPERATURE - 900.0)))
base_degrade = 0.0001 * dt
core.degrade(base_degrade + temp_stress * 0.01 * dt)
if primary_active:
primary_flow = state.primary_loop.mass_flow_rate
flow_ratio = 0.0 if primary_flow <= 0 else min(1.0, primary_flow / 18_000.0)
self.component("primary_pump").degrade((0.0002 + (1 - flow_ratio) * 0.005) * dt)
prim_units = list(primary_units)
sec_units = list(secondary_units)
prim_states = state.primary_pumps or []
sec_states = state.secondary_pumps or []
for idx, active in enumerate(prim_units):
comp = self.component(f"primary_pump_{idx + 1}")
if idx < len(prim_states) and active:
flow = prim_states[idx].flow_rate
flow_ratio = 0.0 if flow <= 0 else min(1.0, flow / 9_000.0)
comp.degrade((0.0002 + (1 - flow_ratio) * 0.005) * dt)
else:
self.component("primary_pump").degrade(0.0005 * dt)
comp.degrade(0.0)
for idx, active in enumerate(sec_units):
comp = self.component(f"secondary_pump_{idx + 1}")
if idx < len(sec_states) and active:
flow = sec_states[idx].flow_rate
flow_ratio = 0.0 if flow <= 0 else min(1.0, flow / 8_000.0)
comp.degrade((0.0002 + (1 - flow_ratio) * 0.004) * dt)
else:
comp.degrade(0.0)
if secondary_active:
secondary_flow = state.secondary_loop.mass_flow_rate
flow_ratio = 0.0 if secondary_flow <= 0 else min(1.0, secondary_flow / 16_000.0)
self.component("secondary_pump").degrade((0.0002 + (1 - flow_ratio) * 0.004) * dt)
for idx, gen_state in enumerate(generator_states):
comp = self.component(f"generator_{idx + 1}")
running = getattr(gen_state, "running", False) or getattr(gen_state, "starting", False)
if running:
comp.degrade(0.00015 * dt)
else:
self.component("secondary_pump").degrade(0.0005 * dt)
comp.degrade(0.0)
turbines = state.turbines if hasattr(state, "turbines") else []
for idx, active in enumerate(turbine_flags):
@@ -126,6 +146,11 @@ class HealthMonitor:
mapped = "turbine_1" if name == "turbine" else name
if mapped in self.components:
self.components[mapped] = ComponentHealth.from_snapshot(comp_data)
elif mapped == "primary_pump":
self.components["primary_pump_1"] = ComponentHealth.from_snapshot(comp_data)
self.components["primary_pump_2"] = ComponentHealth.from_snapshot(comp_data)
elif mapped == "secondary_pump":
self.components["secondary_pump_1"] = ComponentHealth.from_snapshot(comp_data)
def maintain(self, component: str, amount: float = 0.05) -> bool:
comp = self.components.get(component)

View File

@@ -0,0 +1,68 @@
"""Auxiliary diesel generator model with spool dynamics."""
from __future__ import annotations
from dataclasses import dataclass
import logging
from . import constants
LOGGER = logging.getLogger(__name__)
@dataclass
class GeneratorState:
running: bool
starting: bool
spool_remaining: float
power_output_mw: float
battery_charge: float
@dataclass
class DieselGenerator:
rated_output_mw: float = 50.0
spool_time: float = constants.GENERATOR_SPOOL_TIME
def start(self, state: GeneratorState) -> None:
if state.running or state.starting:
return
if state.battery_charge <= 0.05:
LOGGER.warning("Generator start failed: insufficient battery")
return
state.starting = True
state.spool_remaining = self.spool_time
LOGGER.info("Generator starting (spool %.0fs)", self.spool_time)
def stop(self, state: GeneratorState) -> None:
if not (state.running or state.starting):
return
state.running = False
state.starting = False
state.spool_remaining = 0.0
state.power_output_mw = 0.0
LOGGER.info("Generator stopped")
def step(self, state: GeneratorState, load_demand_mw: float, dt: float) -> float:
"""Advance generator dynamics and return delivered power."""
if state.starting:
state.spool_remaining = max(0.0, state.spool_remaining - dt)
state.power_output_mw = self.rated_output_mw * (1.0 - state.spool_remaining / max(self.spool_time, 1e-6))
if state.spool_remaining <= 0.0:
state.starting = False
state.running = True
LOGGER.info("Generator online at %.1f MW", self.rated_output_mw)
elif state.running:
available = self.rated_output_mw
state.power_output_mw = min(available, load_demand_mw)
else:
state.power_output_mw = 0.0
if state.running:
state.battery_charge = min(1.0, state.battery_charge + 0.02 * dt)
elif state.starting:
state.battery_charge = max(0.0, state.battery_charge - 0.01 * dt)
else:
state.battery_charge = max(0.0, state.battery_charge - 0.001 * dt)
return state.power_output_mw

View File

@@ -15,7 +15,7 @@ LOGGER = logging.getLogger(__name__)
def temperature_feedback(temp: float) -> float:
"""Negative coefficient: higher temperature lowers reactivity."""
reference = 900.0
coefficient = -1.5e-5
coefficient = -1.7e-5
return coefficient * (temp - reference)
@@ -28,25 +28,35 @@ class NeutronDynamics:
beta_effective: float = 0.0065
delayed_neutron_fraction: float = 0.0008
external_source_coupling: float = 1e-6
shutdown_bias: float = -0.014
def reactivity(self, state: CoreState, control_fraction: float) -> float:
rho = (
0.02 * (1.0 - control_fraction)
self.shutdown_bias +
constants.CONTROL_ROD_WORTH * (1.0 - control_fraction)
+ temperature_feedback(state.fuel_temperature)
- fuel_reactivity_penalty(state.burnup)
- xenon_poisoning(state.neutron_flux)
)
return rho
def flux_derivative(self, state: CoreState, rho: float, external_source_rate: float = 0.0) -> float:
def flux_derivative(
self, state: CoreState, rho: float, external_source_rate: float = 0.0, baseline_source: float = 1e5
) -> float:
generation_time = constants.NEUTRON_LIFETIME
beta = self.beta_effective
source_term = self.external_source_coupling * external_source_rate
return ((rho - beta) / generation_time) * state.neutron_flux + 1e5 + source_term
return ((rho - beta) / generation_time) * state.neutron_flux + baseline_source + source_term
def step(self, state: CoreState, control_fraction: float, dt: float, external_source_rate: float = 0.0) -> None:
rho = self.reactivity(state, control_fraction)
d_flux = self.flux_derivative(state, rho, external_source_rate)
rho = min(rho, 0.02)
shutdown = control_fraction >= 0.95
if shutdown:
rho = min(rho, -0.04)
baseline = 0.0 if shutdown else 1e5
source = 0.0 if shutdown else external_source_rate
d_flux = self.flux_derivative(state, rho, source, baseline_source=baseline)
state.neutron_flux = max(0.0, state.neutron_flux + d_flux * dt)
state.reactivity_margin = rho
LOGGER.debug(

View File

@@ -13,8 +13,9 @@ from .consumer import ElectricalConsumer
from .control import ControlSystem
from .failures import HealthMonitor
from .fuel import FuelAssembly, decay_heat_fraction
from .generator import DieselGenerator, GeneratorState
from .neutronics import NeutronDynamics
from .state import CoolantLoopState, CoreState, PlantState, TurbineState
from .state import CoolantLoopState, CoreState, PlantState, PumpState, TurbineState
from .thermal import ThermalSolver, heat_transfer
from .turbine import SteamGenerator, Turbine
@@ -31,15 +32,20 @@ class Reactor:
thermal: ThermalSolver
steam_generator: SteamGenerator
turbines: list[Turbine]
generators: list[DieselGenerator]
atomic_model: AtomicPhysics
consumer: ElectricalConsumer | None = None
health_monitor: HealthMonitor = field(default_factory=HealthMonitor)
primary_pump_active: bool = True
secondary_pump_active: bool = True
primary_pump_units: list[bool] = field(default_factory=lambda: [True, True])
secondary_pump_units: list[bool] = field(default_factory=lambda: [True, True])
turbine_active: bool = True
turbine_unit_active: list[bool] = field(default_factory=lambda: [True, True, True])
shutdown: bool = False
meltdown: bool = False
poison_alerts: set[str] = field(default_factory=set)
maintenance_active: set[str] = field(default_factory=set)
def __post_init__(self) -> None:
if not self.turbines:
@@ -47,6 +53,12 @@ class Reactor:
if not self.turbine_unit_active or len(self.turbine_unit_active) != len(self.turbines):
self.turbine_unit_active = [True] * len(self.turbines)
self.turbine_active = any(self.turbine_unit_active)
if not self.generators:
self.generators = [DieselGenerator() for _ in range(2)]
if not self.primary_pump_units or len(self.primary_pump_units) != 2:
self.primary_pump_units = [True, True]
if not self.secondary_pump_units or len(self.secondary_pump_units) != 2:
self.secondary_pump_units = [True, True]
@classmethod
def default(cls) -> "Reactor":
@@ -60,6 +72,7 @@ class Reactor:
thermal=ThermalSolver(),
steam_generator=SteamGenerator(),
turbines=[Turbine() for _ in range(3)],
generators=[DieselGenerator() for _ in range(2)],
atomic_model=atomic_model,
consumer=ElectricalConsumer(name="Grid", demand_mw=800.0, online=False),
health_monitor=HealthMonitor(),
@@ -76,6 +89,18 @@ class Reactor:
fission_product_inventory={},
emitted_particles={},
)
# Default to a cold, safe configuration: rods fully inserted, manual control, pumps/turbines off.
self.control.manual_control = True
self.control.rod_fraction = 0.95
self.shutdown = True
self.meltdown = False
self.primary_pump_active = False
self.secondary_pump_active = False
self.turbine_unit_active = [False] * len(self.turbines)
self.turbine_active = any(self.turbine_unit_active)
if self.consumer:
self.consumer.set_online(False)
primary = CoolantLoopState(
temperature_in=ambient,
temperature_out=ambient,
@@ -90,6 +115,12 @@ class Reactor:
mass_flow_rate=0.0,
steam_quality=0.0,
)
primary_pumps = [PumpState(active=self.primary_pump_active, flow_rate=0.0, pressure=0.5) for _ in range(2)]
secondary_pumps = [PumpState(active=self.secondary_pump_active, flow_rate=0.0, pressure=0.5) for _ in range(2)]
generator_states = [
GeneratorState(running=False, starting=False, spool_remaining=0.0, power_output_mw=0.0, battery_charge=1.0)
for _ in self.generators
]
turbine_states = [
TurbineState(
steam_enthalpy=2_000.0,
@@ -101,7 +132,15 @@ class Reactor:
)
for _ in self.turbines
]
return PlantState(core=core, primary_loop=primary, secondary_loop=secondary, turbines=turbine_states)
return PlantState(
core=core,
primary_loop=primary,
secondary_loop=secondary,
turbines=turbine_states,
primary_pumps=primary_pumps,
secondary_pumps=secondary_pumps,
generators=generator_states,
)
def step(self, state: PlantState, dt: float, command: ReactorCommand | None = None) -> None:
if self.shutdown:
@@ -109,6 +148,9 @@ class Reactor:
else:
rod_fraction = self.control.update_rods(state.core, dt)
if state.core.fuel_temperature >= constants.CORE_MELTDOWN_TEMPERATURE and not self.meltdown:
self._trigger_meltdown(state)
overrides = {}
if command:
overrides = self._apply_command(command, state)
@@ -138,28 +180,124 @@ class Reactor:
self._check_poison_alerts(state)
pump_demand = overrides.get("coolant_demand", self.control.coolant_demand(state.primary_loop))
self.primary_pump_active = self.primary_pump_active and any(self.primary_pump_units)
self.secondary_pump_active = self.secondary_pump_active and any(self.secondary_pump_units)
primary_units_active = [
self.primary_pump_active and idx < len(self.primary_pump_units) and self.primary_pump_units[idx]
for idx in range(2)
]
secondary_units_active = [
self.secondary_pump_active and idx < len(self.secondary_pump_units) and self.secondary_pump_units[idx]
for idx in range(2)
]
aux_demand = constants.BASE_AUX_LOAD_MW + constants.PUMP_POWER_MW * (
sum(primary_units_active) + sum(secondary_units_active)
)
turbine_electrical = state.total_electrical_output()
generator_power = self._step_generators(state, aux_demand, turbine_electrical, dt)
aux_available = turbine_electrical + generator_power
power_ratio = 1.0 if aux_demand <= 0 else min(1.0, aux_available / aux_demand)
if aux_demand > 0 and aux_available < 0.5 * aux_demand:
LOGGER.warning("Aux power deficit: available %.1f/%.1f MW", aux_available, aux_demand)
if self.primary_pump_active:
self.primary_pump.step(state.primary_loop, pump_demand)
total_flow = 0.0
target_pressure = (12.0 * pump_demand + 2.0) * power_ratio
loop_pressure = 0.5
target_flow = self.primary_pump.flow_rate(pump_demand) * power_ratio
for idx, pump_state in enumerate(state.primary_pumps):
unit_enabled = (
self.primary_pump_active and idx < len(self.primary_pump_units) and self.primary_pump_units[idx]
)
desired_flow = target_flow if unit_enabled else 0.0
desired_pressure = target_pressure if unit_enabled else 0.5
pump_state.flow_rate = self._ramp_value(
pump_state.flow_rate, desired_flow, dt, self.primary_pump.spool_time
)
pump_state.pressure = self._ramp_value(
pump_state.pressure, desired_pressure, dt, self.primary_pump.spool_time
)
pump_state.active = (unit_enabled and power_ratio > 0.05) or pump_state.flow_rate > 1.0
total_flow += pump_state.flow_rate
loop_pressure = max(loop_pressure, pump_state.pressure)
state.primary_loop.mass_flow_rate = total_flow
state.primary_loop.pressure = loop_pressure if total_flow > 0 else self._ramp_value(
state.primary_loop.pressure, 0.5, dt, self.primary_pump.spool_time
)
else:
state.primary_loop.mass_flow_rate = 0.0
state.primary_loop.pressure = 0.5
state.primary_loop.mass_flow_rate = self._ramp_value(
state.primary_loop.mass_flow_rate, 0.0, dt, self.primary_pump.spool_time
)
state.primary_loop.pressure = self._ramp_value(
state.primary_loop.pressure, 0.5, dt, self.primary_pump.spool_time
)
for pump_state in state.primary_pumps:
pump_state.active = False
pump_state.flow_rate = self._ramp_value(
pump_state.flow_rate, 0.0, dt, self.primary_pump.spool_time
)
pump_state.pressure = self._ramp_value(
pump_state.pressure, state.primary_loop.pressure, dt, self.primary_pump.spool_time
)
if self.secondary_pump_active:
self.secondary_pump.step(state.secondary_loop, 0.75)
total_flow = 0.0
target_pressure = 12.0 * 0.75 + 2.0
loop_pressure = 0.5
target_flow = self.secondary_pump.flow_rate(0.75)
for idx, pump_state in enumerate(state.secondary_pumps):
unit_enabled = (
self.secondary_pump_active and idx < len(self.secondary_pump_units) and self.secondary_pump_units[idx]
)
desired_flow = target_flow if unit_enabled else 0.0
desired_pressure = target_pressure if unit_enabled else 0.5
pump_state.flow_rate = self._ramp_value(
pump_state.flow_rate, desired_flow, dt, self.secondary_pump.spool_time
)
pump_state.pressure = self._ramp_value(
pump_state.pressure, desired_pressure, dt, self.secondary_pump.spool_time
)
pump_state.active = unit_enabled or pump_state.flow_rate > 1.0
total_flow += pump_state.flow_rate
loop_pressure = max(loop_pressure, pump_state.pressure)
state.secondary_loop.mass_flow_rate = total_flow
state.secondary_loop.pressure = loop_pressure if total_flow > 0 else self._ramp_value(
state.secondary_loop.pressure, 0.5, dt, self.secondary_pump.spool_time
)
else:
state.secondary_loop.mass_flow_rate = 0.0
state.secondary_loop.pressure = 0.5
state.secondary_loop.mass_flow_rate = self._ramp_value(
state.secondary_loop.mass_flow_rate, 0.0, dt, self.secondary_pump.spool_time
)
state.secondary_loop.pressure = self._ramp_value(
state.secondary_loop.pressure, 0.5, dt, self.secondary_pump.spool_time
)
for pump_state in state.secondary_pumps:
pump_state.active = False
pump_state.flow_rate = self._ramp_value(
pump_state.flow_rate, 0.0, dt, self.secondary_pump.spool_time
)
pump_state.pressure = self._ramp_value(
pump_state.pressure, state.secondary_loop.pressure, dt, self.secondary_pump.spool_time
)
self.thermal.step_core(state.core, state.primary_loop, total_power, dt)
if not self.secondary_pump_active or state.secondary_loop.mass_flow_rate <= 1.0:
transferred = 0.0
else:
transferred = heat_transfer(state.primary_loop, state.secondary_loop, total_power)
self.thermal.step_secondary(state.secondary_loop, transferred)
self._step_turbine_bank(state, transferred)
self._step_turbine_bank(state, transferred, dt)
self._maintenance_tick(state, dt)
if (not self.secondary_pump_active or state.secondary_loop.mass_flow_rate <= 1.0) and total_power > 50.0:
self._handle_heat_sink_loss(state)
failures = self.health_monitor.evaluate(
state,
self.primary_pump_active,
self.secondary_pump_active,
primary_units_active,
secondary_units_active,
self.turbine_unit_active,
state.generators,
dt,
)
for failure in failures:
@@ -183,7 +321,7 @@ class Reactor:
sum(t.load_demand_mw for t in state.turbines),
)
def _step_turbine_bank(self, state: PlantState, steam_power_mw: float) -> None:
def _step_turbine_bank(self, state: PlantState, steam_power_mw: float, dt: float) -> None:
if not state.turbines:
return
active_indices = [
@@ -195,9 +333,9 @@ class Reactor:
break
turbine_state = state.turbines[idx]
if idx in active_indices:
turbine.step(state.secondary_loop, turbine_state, steam_power_mw=power_per_unit)
turbine.step(state.secondary_loop, turbine_state, steam_power_mw=power_per_unit, dt=dt)
else:
self._reset_turbine_state(turbine_state)
self._spin_down_turbine(turbine_state, dt, turbine.spool_time)
self._dispatch_consumer_load(state, active_indices)
def _reset_turbine_state(self, turbine_state: TurbineState) -> None:
@@ -206,6 +344,23 @@ class Reactor:
turbine_state.load_demand_mw = 0.0
turbine_state.load_supplied_mw = 0.0
@staticmethod
def _ramp_value(current: float, target: float, dt: float, time_constant: float) -> float:
if time_constant <= 0.0:
return target
alpha = min(1.0, max(0.0, dt / time_constant))
return current + (target - current) * alpha
def _spin_down_turbine(self, turbine_state: TurbineState, dt: float, time_constant: float) -> None:
turbine_state.shaft_power_mw = self._ramp_value(turbine_state.shaft_power_mw, 0.0, dt, time_constant)
turbine_state.electrical_output_mw = self._ramp_value(
turbine_state.electrical_output_mw, 0.0, dt, time_constant
)
turbine_state.load_demand_mw = 0.0
turbine_state.load_supplied_mw = self._ramp_value(
turbine_state.load_supplied_mw, 0.0, dt, time_constant
)
def _dispatch_consumer_load(self, state: PlantState, active_indices: list[int]) -> None:
total_electrical = sum(state.turbines[idx].electrical_output_mw for idx in active_indices)
if self.consumer:
@@ -231,10 +386,15 @@ class Reactor:
LOGGER.critical("Core failure detected. Initiating SCRAM.")
self.shutdown = True
self.control.scram()
elif component == "primary_pump":
self._set_primary_pump(False)
elif component == "secondary_pump":
self._set_secondary_pump(False)
elif component.startswith("primary_pump"):
idx = self._component_index(component)
self._toggle_primary_pump_unit(idx, False)
elif component.startswith("secondary_pump"):
idx = self._component_index(component)
self._toggle_secondary_pump_unit(idx, False)
elif component.startswith("generator"):
idx = self._component_index(component)
LOGGER.warning("Generator %d failed", idx + 1)
elif component.startswith("turbine"):
idx = self._component_index(component)
self._set_turbine_state(False, index=idx)
@@ -255,10 +415,15 @@ class Reactor:
self.shutdown = self.shutdown or command.rod_position >= 0.95
elif command.rod_step is not None:
overrides["rod_fraction"] = self.control.increment_rods(command.rod_step)
if command.primary_pump_on is not None:
self._set_primary_pump(command.primary_pump_on)
if command.secondary_pump_on is not None:
self._set_secondary_pump(command.secondary_pump_on)
if command.primary_pumps:
for idx, flag in command.primary_pumps.items():
self._toggle_primary_pump_unit(idx - 1, flag)
if command.secondary_pumps:
for idx, flag in command.secondary_pumps.items():
self._toggle_secondary_pump_unit(idx - 1, flag)
if command.generator_units:
for idx, flag in command.generator_units.items():
self._toggle_generator(idx - 1, flag, state)
if command.turbine_on is not None:
self._set_turbine_state(command.turbine_on)
if command.turbine_units:
@@ -272,18 +437,101 @@ class Reactor:
if command.coolant_demand is not None:
overrides["coolant_demand"] = max(0.0, min(1.0, command.coolant_demand))
for component in command.maintenance_components:
self._perform_maintenance(component)
self._toggle_maintenance(component)
return overrides
def _set_primary_pump(self, active: bool) -> None:
if self.primary_pump_active != active:
self.primary_pump_active = active
LOGGER.info("Primary pump %s", "enabled" if active else "stopped")
if not active:
self.primary_pump_units = [False] * len(self.primary_pump_units)
elif active and not any(self.primary_pump_units):
self.primary_pump_units = [True] * len(self.primary_pump_units)
def _set_secondary_pump(self, active: bool) -> None:
if self.secondary_pump_active != active:
self.secondary_pump_active = active
LOGGER.info("Secondary pump %s", "enabled" if active else "stopped")
if not active:
self.secondary_pump_units = [False] * len(self.secondary_pump_units)
elif active and not any(self.secondary_pump_units):
self.secondary_pump_units = [True] * len(self.secondary_pump_units)
def _toggle_primary_pump_unit(self, index: int, active: bool) -> None:
if index < 0 or index >= len(self.primary_pump_units):
LOGGER.warning("Ignoring primary pump index %s", index)
return
if self.primary_pump_units[index] != active:
self.primary_pump_units[index] = active
LOGGER.info("Primary pump %d %s", index + 1, "enabled" if active else "stopped")
if active:
self._set_primary_pump(True)
elif not any(self.primary_pump_units):
self._set_primary_pump(False)
def _toggle_secondary_pump_unit(self, index: int, active: bool) -> None:
if index < 0 or index >= len(self.secondary_pump_units):
LOGGER.warning("Ignoring secondary pump index %s", index)
return
if self.secondary_pump_units[index] != active:
self.secondary_pump_units[index] = active
LOGGER.info("Secondary pump %d %s", index + 1, "enabled" if active else "stopped")
if active:
self._set_secondary_pump(True)
elif not any(self.secondary_pump_units):
self._set_secondary_pump(False)
def _toggle_generator(self, index: int, active: bool, state: PlantState) -> None:
if index < 0 or index >= len(self.generators) or index >= len(state.generators):
LOGGER.warning("Ignoring generator index %s", index)
return
gen_state = state.generators[index]
if active:
self.generators[index].start(gen_state)
else:
self.generators[index].stop(gen_state)
def _trigger_meltdown(self, state: PlantState) -> None:
LOGGER.critical("Core meltdown in progress (%.1f K)", state.core.fuel_temperature)
self.meltdown = True
self.shutdown = True
self.control.scram()
try:
self.health_monitor.component("core").fail()
except KeyError:
pass
self._set_turbine_state(False)
def _step_generators(self, state: PlantState, aux_demand: float, turbine_electric: float, dt: float) -> float:
# Ensure we have generator state objects aligned with hardware.
if not state.generators or len(state.generators) < len(self.generators):
missing = len(self.generators) - len(state.generators)
for _ in range(missing):
state.generators.append(
GeneratorState(running=False, starting=False, spool_remaining=0.0, power_output_mw=0.0, battery_charge=1.0)
)
deficit = max(0.0, aux_demand - turbine_electric)
if deficit > 0.0:
for idx, gen_state in enumerate(state.generators):
if not (gen_state.running or gen_state.starting):
self.generators[idx].start(gen_state)
deficit -= self.generators[idx].rated_output_mw
if deficit <= 0:
break
elif turbine_electric > aux_demand:
for idx, gen_state in enumerate(state.generators):
if gen_state.running and not gen_state.starting:
self.generators[idx].stop(gen_state)
total_power = 0.0
remaining = max(0.0, aux_demand - turbine_electric)
for idx, gen_state in enumerate(state.generators):
load = remaining if remaining > 0 else 0.0
delivered = self.generators[idx].step(gen_state, load, dt)
total_power += delivered
remaining = max(0.0, remaining - delivered)
return total_power
def _set_turbine_state(self, active: bool, index: int | None = None) -> None:
if index is None:
@@ -301,30 +549,77 @@ class Reactor:
def _component_index(self, name: str) -> int:
if name == "turbine":
return 0
parts = name.split("_")
try:
return int(name.split("_")[1]) - 1
except (IndexError, ValueError):
for token in reversed(parts):
return int(token) - 1
except (ValueError, TypeError):
return -1
def _perform_maintenance(self, component: str) -> None:
if not self._can_maintain(component):
return
self.health_monitor.maintain(component)
def _maintenance_tick(self, state: PlantState, dt: float) -> None:
if not self.maintenance_active:
return
completed: list[str] = []
for component in list(self.maintenance_active):
if not self._can_maintain(component):
continue
restored = self.health_monitor.maintain(component, amount=0.02 * dt)
comp_state = self.health_monitor.component(component)
if comp_state.integrity >= 0.999:
completed.append(component)
for comp in completed:
self.maintenance_active.discard(comp)
LOGGER.info("Maintenance completed for %s", comp)
def _toggle_maintenance(self, component: str) -> None:
if component in self.maintenance_active:
self.maintenance_active.remove(component)
LOGGER.info("Maintenance stopped for %s", component)
return
if not self._can_maintain(component):
return
self.maintenance_active.add(component)
LOGGER.info("Maintenance started for %s", component)
def _can_maintain(self, component: str) -> bool:
if component == "core" and not self.shutdown:
LOGGER.warning("Cannot maintain core while reactor is running")
return
if component == "primary_pump" and self.primary_pump_active:
LOGGER.warning("Stop primary pump before maintenance")
return
if component == "secondary_pump" and self.secondary_pump_active:
LOGGER.warning("Stop secondary pump before maintenance")
return
return False
if component.startswith("primary_pump_"):
idx = self._component_index(component)
if idx < 0 or idx >= len(self.primary_pump_units):
LOGGER.warning("Unknown primary pump maintenance target %s", component)
return False
if self.primary_pump_units[idx]:
LOGGER.warning("Stop primary pump %d before maintenance", idx + 1)
return False
if component.startswith("secondary_pump_"):
idx = self._component_index(component)
if idx < 0 or idx >= len(self.secondary_pump_units):
LOGGER.warning("Unknown secondary pump maintenance target %s", component)
return False
if self.secondary_pump_units[idx]:
LOGGER.warning("Stop secondary pump %d before maintenance", idx + 1)
return False
if component.startswith("generator_"):
idx = self._component_index(component)
if idx < 0 or idx >= len(self.generators):
LOGGER.warning("Unknown generator maintenance target %s", component)
return False
if component.startswith("turbine"):
idx = self._component_index(component)
if idx < 0 or idx >= len(self.turbine_unit_active):
LOGGER.warning("Unknown turbine maintenance target %s", component)
return
return False
if self.turbine_unit_active[idx]:
LOGGER.warning("Stop turbine %d before maintenance", idx + 1)
return
self.health_monitor.maintain(component)
return False
return True
def attach_consumer(self, consumer: ElectricalConsumer) -> None:
self.consumer = consumer
@@ -339,9 +634,23 @@ class Reactor:
metadata = {
"primary_pump_active": self.primary_pump_active,
"secondary_pump_active": self.secondary_pump_active,
"primary_pump_units": self.primary_pump_units,
"secondary_pump_units": self.secondary_pump_units,
"turbine_active": self.turbine_active,
"turbine_units": self.turbine_unit_active,
"shutdown": self.shutdown,
"meltdown": self.meltdown,
"maintenance_active": list(self.maintenance_active),
"generators": [
{
"running": g.running,
"starting": g.starting,
"spool_remaining": g.spool_remaining,
"power_output_mw": g.power_output_mw,
"battery_charge": g.battery_charge,
}
for g in state.generators
],
"consumer": {
"online": self.consumer.online if self.consumer else False,
"demand_mw": self.consumer.demand_mw if self.consumer else 0.0,
@@ -354,11 +663,17 @@ class Reactor:
plant, metadata, health = self.control.load_state(filepath)
self.primary_pump_active = metadata.get("primary_pump_active", self.primary_pump_active)
self.secondary_pump_active = metadata.get("secondary_pump_active", self.secondary_pump_active)
self.primary_pump_units = list(metadata.get("primary_pump_units", self.primary_pump_units))
self.secondary_pump_units = list(metadata.get("secondary_pump_units", self.secondary_pump_units))
unit_states = metadata.get("turbine_units")
if unit_states:
self.turbine_unit_active = list(unit_states)
self.turbine_active = metadata.get("turbine_active", any(self.turbine_unit_active))
self.shutdown = metadata.get("shutdown", self.shutdown)
self.meltdown = metadata.get("meltdown", self.meltdown)
maint = metadata.get("maintenance_active")
if maint is not None:
self.maintenance_active = set(maint)
consumer_cfg = metadata.get("consumer")
if consumer_cfg:
if not self.consumer:
@@ -373,6 +688,21 @@ class Reactor:
if health:
self.health_monitor.load_snapshot(health)
LOGGER.info("Reactor state restored from %s", filepath)
# Back-fill pump state lists for compatibility.
if not plant.primary_pumps or len(plant.primary_pumps) < 2:
plant.primary_pumps = [
PumpState(active=self.primary_pump_active, flow_rate=plant.primary_loop.mass_flow_rate / 2, pressure=plant.primary_loop.pressure)
for _ in range(2)
]
if not plant.secondary_pumps or len(plant.secondary_pumps) < 2:
plant.secondary_pumps = [
PumpState(
active=self.secondary_pump_active,
flow_rate=plant.secondary_loop.mass_flow_rate / 2,
pressure=plant.secondary_loop.pressure,
)
for _ in range(2)
]
if len(plant.turbines) < len(self.turbines):
ambient = constants.ENVIRONMENT_TEMPERATURE
while len(plant.turbines) < len(self.turbines):
@@ -386,8 +716,38 @@ class Reactor:
load_supplied_mw=0.0,
)
)
gen_meta = metadata.get("generators", [])
if not plant.generators or len(plant.generators) < len(self.generators):
while len(plant.generators) < len(self.generators):
plant.generators.append(
GeneratorState(
running=False,
starting=False,
spool_remaining=0.0,
power_output_mw=0.0,
battery_charge=1.0,
)
)
for idx, gen_state in enumerate(plant.generators):
if idx < len(gen_meta):
cfg = gen_meta[idx]
gen_state.running = cfg.get("running", gen_state.running)
gen_state.starting = cfg.get("starting", gen_state.starting)
gen_state.spool_remaining = cfg.get("spool_remaining", gen_state.spool_remaining)
gen_state.power_output_mw = cfg.get("power_output_mw", gen_state.power_output_mw)
gen_state.battery_charge = cfg.get("battery_charge", gen_state.battery_charge)
return plant
def _handle_heat_sink_loss(self, state: PlantState) -> None:
if not self.shutdown:
LOGGER.critical("Loss of secondary heat sink detected. Initiating SCRAM.")
self.shutdown = True
self.control.scram()
self._set_turbine_state(False)
# Clear turbine output and demands to reflect lost steam.
for turbine_state in state.turbines:
self._reset_turbine_state(turbine_state)
def _check_poison_alerts(self, state: PlantState) -> None:
inventory = state.core.fission_product_inventory or {}
for symbol, threshold in constants.KEY_POISON_THRESHOLDS.items():

View File

@@ -4,6 +4,8 @@ from __future__ import annotations
from dataclasses import dataclass, field, asdict
from .generator import GeneratorState
def clamp(value: float, min_value: float, max_value: float) -> float:
return max(min_value, min(max_value, value))
@@ -54,12 +56,22 @@ class TurbineState:
load_supplied_mw: float = 0.0
@dataclass
class PumpState:
active: bool
flow_rate: float
pressure: float
@dataclass
class PlantState:
core: CoreState
primary_loop: CoolantLoopState
secondary_loop: CoolantLoopState
turbines: list[TurbineState]
primary_pumps: list[PumpState] = field(default_factory=list)
secondary_pumps: list[PumpState] = field(default_factory=list)
generators: list[GeneratorState] = field(default_factory=list)
time_elapsed: float = field(default=0.0)
def snapshot(self) -> dict[str, float]:
@@ -73,6 +85,9 @@ class PlantState:
"turbine_electric": self.total_electrical_output(),
"products": self.core.fission_product_inventory,
"particles": self.core.emitted_particles,
"primary_pumps": [pump.active for pump in self.primary_pumps],
"secondary_pumps": [pump.active for pump in self.secondary_pumps],
"generators": [gen.running or gen.starting for gen in self.generators],
}
def total_electrical_output(self) -> float:
@@ -92,10 +107,17 @@ class PlantState:
old_turbine = data.get("turbine")
turbines_blob = [old_turbine] if old_turbine else []
turbines = [TurbineState(**t) for t in turbines_blob]
prim_pumps_blob = data.get("primary_pumps", [])
sec_pumps_blob = data.get("secondary_pumps", [])
generators_blob = data.get("generators", [])
generators = [GeneratorState(**g) for g in generators_blob]
return cls(
core=CoreState(**core_blob, fission_product_inventory=inventory, emitted_particles=particles),
primary_loop=CoolantLoopState(**data["primary_loop"]),
secondary_loop=CoolantLoopState(**data["secondary_loop"]),
turbines=turbines,
primary_pumps=[PumpState(**p) for p in prim_pumps_blob],
secondary_pumps=[PumpState(**p) for p in sec_pumps_blob],
generators=generators,
time_elapsed=data.get("time_elapsed", 0.0),
)

View File

@@ -15,6 +15,8 @@ LOGGER = logging.getLogger(__name__)
def heat_transfer(primary: CoolantLoopState, secondary: CoolantLoopState, core_power_mw: float) -> float:
"""Return MW transferred to the secondary loop."""
if secondary.mass_flow_rate <= 0.0:
return 0.0
delta_t = max(0.0, primary.temperature_out - secondary.temperature_in)
conductance = 0.15 # steam generator effectiveness
efficiency = 1.0 - math.exp(-conductance * delta_t)

View File

@@ -26,12 +26,14 @@ class Turbine:
generator_efficiency: float = constants.GENERATOR_EFFICIENCY
mechanical_efficiency: float = constants.STEAM_TURBINE_EFFICIENCY
rated_output_mw: float = 400.0 # cap per unit electrical output
spool_time: float = constants.TURBINE_SPOOL_TIME
def step(
self,
loop: CoolantLoopState,
state: TurbineState,
steam_power_mw: float = 0.0,
dt: float = 1.0,
) -> None:
enthalpy = 2_700.0 + loop.steam_quality * 600.0
mass_flow = loop.mass_flow_rate * 0.6
@@ -43,8 +45,8 @@ class Turbine:
shaft_power_mw = electrical / max(1e-6, self.generator_efficiency)
condenser_temp = max(305.0, loop.temperature_in - 20.0)
state.steam_enthalpy = enthalpy
state.shaft_power_mw = shaft_power_mw
state.electrical_output_mw = electrical
state.shaft_power_mw = _ramp(state.shaft_power_mw, shaft_power_mw, dt, self.spool_time)
state.electrical_output_mw = _ramp(state.electrical_output_mw, electrical, dt, self.spool_time)
state.condenser_temperature = condenser_temp
LOGGER.debug(
"Turbine output: shaft=%.1fMW electrical=%.1fMW condenser=%.1fK",
@@ -52,3 +54,10 @@ class Turbine:
electrical,
condenser_temp,
)
def _ramp(current: float, target: float, dt: float, time_constant: float) -> float:
if time_constant <= 0.0:
return target
alpha = min(1.0, max(0.0, dt / time_constant))
return current + (target - current) * alpha

View File

@@ -22,5 +22,4 @@ def test_reactivity_increases_with_rod_withdrawal():
state = _core_state()
rho_full_out = dynamics.reactivity(state, control_fraction=0.0)
rho_half = dynamics.reactivity(state, control_fraction=0.5)
assert rho_full_out > 0.0
assert rho_full_out > rho_half

View File

@@ -4,6 +4,7 @@ from pathlib import Path
import pytest
from reactor_sim import constants
from reactor_sim.commands import ReactorCommand
from reactor_sim.failures import HealthMonitor
from reactor_sim.reactor import Reactor
from reactor_sim.simulation import ReactorSimulation
@@ -19,6 +20,7 @@ def test_reactor_initial_state_is_cold():
def test_state_save_and_load_roundtrip(tmp_path: Path):
reactor = Reactor.default()
reactor.control.manual_control = True
sim = ReactorSimulation(reactor, timestep=5.0, duration=15.0)
sim.log()
save_path = tmp_path / "plant_state.json"
@@ -31,13 +33,18 @@ def test_state_save_and_load_roundtrip(tmp_path: Path):
sim.last_state.core.fuel_temperature
)
assert restored_reactor.control.rod_fraction == reactor.control.rod_fraction
assert restored_reactor.control.manual_control == reactor.control.manual_control
assert len(restored_state.primary_pumps) == 2
assert len(restored_state.secondary_pumps) == 2
def test_health_monitor_flags_core_failure():
reactor = Reactor.default()
state = reactor.initial_state()
state.core.fuel_temperature = constants.MAX_CORE_TEMPERATURE
failures = reactor.health_monitor.evaluate(state, True, True, [True, True, True], dt=200.0)
failures = reactor.health_monitor.evaluate(
state, [True, True], [True, True], [True, True, True], state.generators, dt=200.0
)
assert "core" in failures
reactor._handle_failure("core")
assert reactor.shutdown is True
@@ -45,10 +52,171 @@ def test_health_monitor_flags_core_failure():
def test_maintenance_recovers_component_health():
monitor = HealthMonitor()
pump = monitor.component("secondary_pump")
pump = monitor.component("secondary_pump_1")
pump.integrity = 0.3
pump.fail()
restored = monitor.maintain("secondary_pump", amount=0.5)
restored = monitor.maintain("secondary_pump_1", amount=0.5)
assert restored is True
assert pump.integrity == pytest.approx(0.8)
assert pump.failed is False
def test_secondary_pump_loss_triggers_scram_and_no_steam():
reactor = Reactor.default()
state = reactor.initial_state()
# Make sure some power is present to trigger heat-sink logic.
state.core.power_output_mw = 500.0
reactor.secondary_pump_active = False
reactor.control.manual_control = True
reactor.control.rod_fraction = 0.1
reactor.step(state, dt=1.0)
assert reactor.shutdown is True
assert all(t.electrical_output_mw == 0.0 for t in state.turbines)
def test_cold_shutdown_stays_subcritical():
reactor = Reactor.default()
state = reactor.initial_state()
reactor.control.manual_control = True
reactor.control.rod_fraction = 0.95
reactor.primary_pump_active = False
reactor.secondary_pump_active = False
initial_power = state.core.power_output_mw
for _ in range(10):
reactor.step(state, dt=1.0)
assert state.core.power_output_mw <= initial_power + 0.5
assert reactor.shutdown is True
def test_toggle_maintenance_progresses_until_restored():
reactor = Reactor.default()
reactor.primary_pump_units = [False, False]
reactor.primary_pump_active = False
pump = reactor.health_monitor.component("primary_pump_1")
pump.integrity = 0.2
def provider(t: float, _state):
if t == 0:
return ReactorCommand.maintain("primary_pump_1")
return None
sim = ReactorSimulation(reactor, timestep=1.0, duration=50.0, command_provider=provider)
sim.log()
assert pump.integrity >= 0.99
assert "primary_pump_1" not in reactor.maintenance_active
def test_primary_pump_unit_toggle_updates_active_flag():
reactor = Reactor.default()
state = reactor.initial_state()
reactor.primary_pump_active = True
reactor.primary_pump_units = [True, True]
reactor.step(state, dt=1.0, command=ReactorCommand(primary_pumps={1: False}))
assert reactor.primary_pump_units == [False, True]
assert reactor.primary_pump_active is True
reactor.step(state, dt=1.0, command=ReactorCommand(primary_pumps={2: False}))
assert reactor.primary_pump_units == [False, False]
assert reactor.primary_pump_active is False
def test_secondary_pump_unit_toggle_can_restart_pump():
reactor = Reactor.default()
state = reactor.initial_state()
reactor.secondary_pump_active = False
reactor.secondary_pump_units = [False, False]
reactor.step(state, dt=1.0, command=ReactorCommand(secondary_pumps={1: True}))
assert reactor.secondary_pump_units == [True, False]
assert reactor.secondary_pump_active is True
assert state.secondary_loop.mass_flow_rate > 0.0
def test_primary_pumps_spool_up_over_seconds():
reactor = Reactor.default()
state = reactor.initial_state()
reactor.secondary_pump_units = [False, False]
# Enable both pumps and command full flow; spool should take multiple steps.
target_flow = reactor.primary_pump.flow_rate(1.0) * len(reactor.primary_pump_units)
reactor.step(
state,
dt=1.0,
command=ReactorCommand(primary_pumps={1: True, 2: True}, generator_units={1: True}, coolant_demand=1.0),
)
first_flow = state.primary_loop.mass_flow_rate
assert 0.0 < first_flow < target_flow
for _ in range(10):
reactor.step(state, dt=1.0, command=ReactorCommand(coolant_demand=1.0))
assert state.primary_loop.mass_flow_rate == pytest.approx(target_flow, rel=0.15)
def test_full_rod_withdrawal_reaches_gigawatt_power():
reactor = Reactor.default()
state = reactor.initial_state()
reactor.shutdown = False
reactor.control.manual_control = True
reactor.control.rod_fraction = 0.0
reactor.primary_pump_active = True
reactor.secondary_pump_active = True
early_power = 0.0
for step in range(60):
reactor.step(state, dt=1.0)
if step == 10:
early_power = state.core.power_output_mw
assert state.core.power_output_mw > max(2_000.0, early_power * 2)
assert state.core.fuel_temperature > 600.0
def test_partially_inserted_rods_hold_near_three_gw():
reactor = Reactor.default()
state = reactor.initial_state()
reactor.shutdown = False
reactor.control.manual_control = True
reactor.control.rod_fraction = 0.4
reactor.primary_pump_active = True
reactor.secondary_pump_active = True
for _ in range(120):
reactor.step(state, dt=1.0)
assert 2_000.0 < state.core.power_output_mw < 4_000.0
assert 500.0 < state.core.fuel_temperature < 800.0
def test_generator_spools_and_powers_pumps():
reactor = Reactor.default()
state = reactor.initial_state()
reactor.shutdown = False
reactor.control.manual_control = True
reactor.control.rod_fraction = 0.95 # keep power low; focus on aux power
reactor.turbine_unit_active = [False, False, False]
reactor.secondary_pump_units = [False, False]
for step in range(12):
cmd = ReactorCommand(generator_units={1: True}, primary_pumps={1: True}) if step == 0 else None
reactor.step(state, dt=1.0, command=cmd)
assert state.generators and state.generators[0].running is True
assert state.generators[0].power_output_mw > 0.0
assert state.primary_loop.mass_flow_rate > 0.0
def test_meltdown_triggers_shutdown():
reactor = Reactor.default()
state = reactor.initial_state()
reactor.shutdown = False
reactor.control.manual_control = True
reactor.control.rod_fraction = 0.0
reactor.primary_pump_active = True
reactor.secondary_pump_active = True
state.core.fuel_temperature = constants.CORE_MELTDOWN_TEMPERATURE + 50.0
reactor.step(state, dt=1.0)
assert reactor.shutdown is True
assert reactor.meltdown is True

33
tests/test_turbine.py Normal file
View File

@@ -0,0 +1,33 @@
import pytest
from reactor_sim.state import CoolantLoopState, TurbineState
from reactor_sim.turbine import Turbine
def test_turbine_spools_toward_target_output():
turbine = Turbine()
loop = CoolantLoopState(
temperature_in=600.0,
temperature_out=650.0,
pressure=6.0,
mass_flow_rate=20_000.0,
steam_quality=0.9,
)
state = TurbineState(
steam_enthalpy=0.0,
shaft_power_mw=0.0,
electrical_output_mw=0.0,
condenser_temperature=300.0,
)
target_electric = min(
turbine.rated_output_mw, 300.0 * turbine.mechanical_efficiency * turbine.generator_efficiency
)
dt = 5.0
turbine.step(loop, state, steam_power_mw=300.0, dt=dt)
assert 0.0 < state.electrical_output_mw < target_electric
for _ in range(5):
turbine.step(loop, state, steam_power_mw=300.0, dt=dt)
assert state.electrical_output_mw == pytest.approx(target_electric, rel=0.05)