Files
Reactor-Sim/src/reactor_sim/reactor.py

1219 lines
61 KiB
Python

"""High-level reactor orchestration."""
from __future__ import annotations
from dataclasses import dataclass, field
import logging
from . import constants
from .atomic import AtomicPhysics
from .commands import ReactorCommand
from .coolant import Pump
from .consumer import ElectricalConsumer
from .control import ControlSystem
from .failures import HealthMonitor
from .fuel import FuelAssembly, decay_heat_fraction
from .generator import DieselGenerator, GeneratorState
from .neutronics import NeutronDynamics
from .state import CoolantLoopState, CoreState, PlantState, PumpState, TurbineState
from .thermal import ThermalSolver, heat_transfer, saturation_pressure, saturation_temperature, temperature_rise
from .turbine import SteamGenerator, Turbine
LOGGER = logging.getLogger(__name__)
@dataclass
class Reactor:
fuel: FuelAssembly
neutronics: NeutronDynamics
control: ControlSystem
primary_pump: Pump
secondary_pump: Pump
thermal: ThermalSolver
steam_generator: SteamGenerator
turbines: list[Turbine]
generators: list[DieselGenerator]
atomic_model: AtomicPhysics
consumer: ElectricalConsumer | None = None
health_monitor: HealthMonitor = field(default_factory=HealthMonitor)
pressurizer_level: float = 0.6
allow_external_aux: bool = False
relaxed_npsh: bool = False
primary_pump_active: bool = True
secondary_pump_active: bool = True
primary_pump_units: list[bool] = field(default_factory=lambda: [True, True])
secondary_pump_units: list[bool] = field(default_factory=lambda: [True, True])
turbine_active: bool = True
turbine_unit_active: list[bool] = field(default_factory=lambda: [True, True, True])
shutdown: bool = False
meltdown: bool = False
generator_auto: bool = False
primary_relief_open: bool = False
secondary_relief_open: bool = False
poison_alerts: set[str] = field(default_factory=set)
maintenance_active: set[str] = field(default_factory=set)
def __post_init__(self) -> None:
if not self.turbines:
self.turbines = [Turbine()]
if not self.turbine_unit_active or len(self.turbine_unit_active) != len(self.turbines):
self.turbine_unit_active = [True] * len(self.turbines)
self.turbine_active = any(self.turbine_unit_active)
if not self.generators:
self.generators = [DieselGenerator() for _ in range(2)]
# Balance-of-plant controls
self.feedwater_valve = 0.5
self._last_steam_out_kg_s = 0.0
# Slow chemistry/boron trim control
self._boron_trim_active = True
if not self.primary_pump_units or len(self.primary_pump_units) != 2:
self.primary_pump_units = [True, True]
if not self.secondary_pump_units or len(self.secondary_pump_units) != 2:
self.secondary_pump_units = [True, True]
@classmethod
def default(cls) -> "Reactor":
atomic_model = AtomicPhysics()
return cls(
fuel=FuelAssembly(enrichment=0.045, mass_kg=80_000.0, atomic_physics=atomic_model),
neutronics=NeutronDynamics(),
control=ControlSystem(),
primary_pump=Pump(nominal_flow=18_000.0, shutoff_head_mpa=constants.PRIMARY_PUMP_SHUTOFF_HEAD_MPA),
secondary_pump=Pump(
nominal_flow=16_000.0, efficiency=0.85, shutoff_head_mpa=constants.SECONDARY_PUMP_SHUTOFF_HEAD_MPA
),
thermal=ThermalSolver(),
steam_generator=SteamGenerator(),
turbines=[Turbine() for _ in range(3)],
generators=[DieselGenerator() for _ in range(2)],
atomic_model=atomic_model,
consumer=ElectricalConsumer(name="Grid", demand_mw=800.0, online=False),
health_monitor=HealthMonitor(),
)
def initial_state(self) -> PlantState:
ambient = constants.ENVIRONMENT_TEMPERATURE
primary_nominal_mass = constants.PRIMARY_LOOP_VOLUME_M3 * constants.COOLANT_DENSITY
secondary_nominal_mass = constants.SECONDARY_LOOP_VOLUME_M3 * constants.COOLANT_DENSITY
self.pressurizer_level = 0.6
core = CoreState(
fuel_temperature=ambient,
neutron_flux=1e5,
reactivity_margin=-0.02,
power_output_mw=0.1,
burnup=0.0,
delayed_precursors=[0.0 for _ in constants.CONTROL_ROD_BANK_WEIGHTS],
fission_product_inventory={},
emitted_particles={},
)
# Default to a cold, safe configuration: rods fully inserted, manual control, pumps/turbines off.
self.control.manual_control = True
self.control.rod_fraction = 0.95
self.control.rod_banks = [0.95 for _ in self.control.rod_banks]
self.control.rod_target = 0.95
self.shutdown = True
self.meltdown = False
self.generator_auto = False
self.primary_relief_open = False
self.secondary_relief_open = False
self.primary_pump_active = False
self.secondary_pump_active = False
self.primary_pump_units = [False] * len(self.primary_pump_units)
self.secondary_pump_units = [False] * len(self.secondary_pump_units)
self.turbine_unit_active = [False] * len(self.turbines)
self.turbine_active = any(self.turbine_unit_active)
if self.consumer:
self.consumer.set_online(False)
primary = CoolantLoopState(
temperature_in=ambient,
temperature_out=ambient,
pressure=0.5,
mass_flow_rate=0.0,
steam_quality=0.0,
inventory_kg=primary_nominal_mass * constants.PRIMARY_INVENTORY_TARGET,
level=constants.PRIMARY_INVENTORY_TARGET,
energy_j=primary_nominal_mass * constants.PRIMARY_INVENTORY_TARGET * constants.COOLANT_HEAT_CAPACITY * ambient,
)
secondary = CoolantLoopState(
temperature_in=ambient,
temperature_out=ambient,
pressure=0.5,
mass_flow_rate=0.0,
steam_quality=0.0,
inventory_kg=secondary_nominal_mass * constants.SECONDARY_INVENTORY_TARGET,
level=constants.SECONDARY_INVENTORY_TARGET,
energy_j=secondary_nominal_mass * constants.SECONDARY_INVENTORY_TARGET * constants.COOLANT_HEAT_CAPACITY * ambient,
)
primary_pumps = [
PumpState(active=self.primary_pump_active and self.primary_pump_units[idx], flow_rate=0.0, pressure=0.5)
for idx in range(2)
]
secondary_pumps = [
PumpState(active=self.secondary_pump_active and self.secondary_pump_units[idx], flow_rate=0.0, pressure=0.5)
for idx in range(2)
]
generator_states = [
GeneratorState(
running=False, starting=False, spool_remaining=0.0, power_output_mw=0.0, battery_charge=1.0, status="OFF"
)
for _ in self.generators
]
turbine_states = [
TurbineState(
steam_enthalpy=2_000.0,
shaft_power_mw=0.0,
electrical_output_mw=0.0,
condenser_temperature=ambient,
load_demand_mw=0.0,
load_supplied_mw=0.0,
)
for _ in self.turbines
]
return PlantState(
core=core,
primary_loop=primary,
secondary_loop=secondary,
turbines=turbine_states,
primary_pumps=primary_pumps,
secondary_pumps=secondary_pumps,
generators=generator_states,
)
def step(self, state: PlantState, dt: float, command: ReactorCommand | None = None) -> None:
if self.shutdown:
rod_fraction = self.control.update_rods(state.core, dt)
else:
rod_fraction = self.control.update_rods(state.core, dt)
if state.core.fuel_temperature >= constants.CORE_MELTDOWN_TEMPERATURE and not self.meltdown:
self._trigger_meltdown(state)
overrides = {}
if command:
overrides = self._apply_command(command, state)
if not self.shutdown and not self.control.manual_control:
rod_fraction = self.control.update_rods(state.core, dt)
decay_power, decay_neutron_source, decay_products, decay_particles = self.fuel.decay_reaction_effects(
state.core
)
self.neutronics.update_poisons(state.core, dt)
# Apply soluble boron reactivity bias (slow trim).
boron_delta = state.boron_ppm - constants.CHEM_BORON_DEFAULT_PPM
self.neutronics.shutdown_bias = self.neutronics.base_shutdown_bias - boron_delta * constants.BORON_WORTH_PER_PPM
self.neutronics.step(state.core, rod_fraction, dt, external_source_rate=decay_neutron_source, rod_banks=self.control.rod_banks)
prompt_power, fission_rate, fission_event = self.fuel.prompt_energy_rate(
state.core.neutron_flux, rod_fraction
)
decay_heat = decay_heat_fraction(state.core.burnup) * state.core.power_output_mw
total_power = prompt_power + decay_heat + decay_power
total_power = min(total_power, constants.TEST_MAX_POWER_MW * 0.98, self.control.setpoint_mw * 1.1)
state.core.power_output_mw = total_power
state.core.update_burnup(dt)
# Track fission products and emitted particles for diagnostics.
products: dict[str, float] = {}
for atom in fission_event.products:
products[atom.symbol] = products.get(atom.symbol, 0.0) + fission_rate * dt
for k, v in decay_products.items():
products[k] = products.get(k, 0.0) + v * dt
particles: dict[str, float] = {k: v * dt for k, v in decay_particles.items()}
state.core.add_products(products)
state.core.add_emitted_particles(particles)
self._check_poison_alerts(state)
self._update_loop_inventory(
state.primary_loop, constants.PRIMARY_LOOP_VOLUME_M3, constants.PRIMARY_INVENTORY_TARGET, dt
)
pump_demand = overrides.get(
"coolant_demand",
self.control.coolant_demand(
state.primary_loop,
state.core.power_output_mw,
state.total_electrical_output(),
),
)
self.primary_pump_active = self.primary_pump_active and any(self.primary_pump_units)
self.secondary_pump_active = self.secondary_pump_active and any(self.secondary_pump_units)
primary_units_active = [
self.primary_pump_active and idx < len(self.primary_pump_units) and self.primary_pump_units[idx]
for idx in range(2)
]
secondary_units_active = [
self.secondary_pump_active and idx < len(self.secondary_pump_units) and self.secondary_pump_units[idx]
for idx in range(2)
]
any_units = any(primary_units_active) or any(secondary_units_active)
load_present = any_units or (self.consumer and self.consumer.online) or state.total_electrical_output() > 0.1
idle_core = state.core.power_output_mw < 1.0 and self.control.rod_fraction >= 0.9
baseline_off = (
(self.shutdown or idle_core)
and not load_present
and state.total_electrical_output() <= 0.01
and sum(primary_units_active) == 0
and sum(secondary_units_active) == 0
)
aux_base = 0.0 if baseline_off else constants.BASE_AUX_LOAD_MW
aux_pump_primary = constants.PUMP_POWER_MW * sum(primary_units_active)
aux_pump_secondary = constants.PUMP_POWER_MW * sum(secondary_units_active)
aux_demand = aux_base + aux_pump_primary + aux_pump_secondary
turbine_electrical = state.total_electrical_output()
generator_power = self._step_generators(state, aux_demand, turbine_electrical, dt)
aux_available = turbine_electrical + generator_power
if self.allow_external_aux:
aux_available = max(aux_available, aux_demand)
supplied = aux_available if aux_demand <= 0 else min(aux_available, aux_demand)
power_ratio = 1.0 if aux_demand <= 0 else min(1.0, supplied / max(1e-6, aux_demand))
if aux_demand > 0 and aux_available < 0.99 * aux_demand:
LOGGER.warning("Aux power deficit: available %.1f/%.1f MW", aux_available, aux_demand)
state.aux_draws = {
"base": aux_base * power_ratio,
"primary_pumps": aux_pump_primary * power_ratio,
"secondary_pumps": aux_pump_secondary * power_ratio,
"total_demand": aux_demand,
"supplied": supplied,
"generator_output": generator_power,
"turbine_output": turbine_electrical,
}
if self.primary_pump_active:
total_flow = 0.0
base_flow, base_head = self.primary_pump.performance(pump_demand)
target_flow = base_flow * power_ratio
loop_pressure = max(
state.primary_loop.pressure, saturation_pressure(state.primary_loop.temperature_out), 0.1
)
target_pressure = max(0.5, base_head * power_ratio)
if self.primary_relief_open:
target_pressure = min(target_pressure, 1.0)
primary_flow_scale = min(
self._inventory_flow_scale(state.primary_loop), self._npsh_factor(state.primary_loop)
)
for idx, pump_state in enumerate(state.primary_pumps):
unit_enabled = (
self.primary_pump_active and idx < len(self.primary_pump_units) and self.primary_pump_units[idx]
)
powered = power_ratio > 0.1
desired_flow = target_flow * primary_flow_scale if unit_enabled else 0.0
desired_pressure = target_pressure if unit_enabled else 0.5
if not powered:
desired_flow = 0.0
desired_pressure = 0.5
pump_state.flow_rate = self._ramp_value(
pump_state.flow_rate, desired_flow, dt, self.primary_pump.spool_time
)
pump_state.pressure = self._ramp_value(
pump_state.pressure, desired_pressure, dt, self.primary_pump.spool_time
)
pump_state.active = unit_enabled and powered and pump_state.flow_rate > 1.0
if not powered or not unit_enabled:
pump_state.status = "STOPPING" if pump_state.flow_rate > 1.0 else "OFF"
elif primary_flow_scale < 0.99:
pump_state.status = "CAV"
elif pump_state.flow_rate < max(1.0, desired_flow * 0.8):
pump_state.status = "STARTING"
else:
pump_state.status = "RUN"
total_flow += pump_state.flow_rate
loop_pressure = max(loop_pressure, pump_state.pressure)
state.primary_loop.mass_flow_rate = total_flow
state.primary_loop.pressure = loop_pressure if total_flow > 0 else self._ramp_value(
state.primary_loop.pressure, 0.5, dt, self.primary_pump.spool_time
)
else:
state.primary_loop.mass_flow_rate = self._ramp_value(
state.primary_loop.mass_flow_rate, 0.0, dt, self.primary_pump.spool_time
)
target_pressure = max(0.5, saturation_pressure(state.primary_loop.temperature_out))
state.primary_loop.pressure = self._ramp_value(
state.primary_loop.pressure, target_pressure, dt, self.primary_pump.spool_time
)
for pump_state in state.primary_pumps:
pump_state.active = False
pump_state.flow_rate = self._ramp_value(
pump_state.flow_rate, 0.0, dt, self.primary_pump.spool_time
)
pump_state.pressure = state.primary_loop.pressure
pump_state.status = "STOPPING" if pump_state.flow_rate > 0.1 else "OFF"
if self.secondary_pump_active:
total_flow = 0.0
demand = 0.75
base_flow, base_head = self.secondary_pump.performance(demand)
target_pressure = max(0.5, base_head * power_ratio)
if self.secondary_relief_open:
target_pressure = min(target_pressure, 1.0)
loop_pressure = max(
state.secondary_loop.pressure, saturation_pressure(state.secondary_loop.temperature_out), 0.1
)
target_flow = base_flow * power_ratio
secondary_flow_scale = min(
self._inventory_flow_scale(state.secondary_loop), self._npsh_factor(state.secondary_loop)
)
for idx, pump_state in enumerate(state.secondary_pumps):
unit_enabled = (
self.secondary_pump_active and idx < len(self.secondary_pump_units) and self.secondary_pump_units[idx]
)
powered = power_ratio > 0.1
desired_flow = target_flow * secondary_flow_scale if unit_enabled else 0.0
desired_pressure = target_pressure if unit_enabled else 0.5
if not powered:
desired_flow = 0.0
desired_pressure = 0.5
pump_state.flow_rate = self._ramp_value(
pump_state.flow_rate, desired_flow, dt, self.secondary_pump.spool_time
)
pump_state.pressure = self._ramp_value(
pump_state.pressure, desired_pressure, dt, self.secondary_pump.spool_time
)
pump_state.active = unit_enabled and powered and pump_state.flow_rate > 1.0
if not powered or not unit_enabled:
pump_state.status = "STOPPING" if pump_state.flow_rate > 1.0 else "OFF"
elif secondary_flow_scale < 0.99:
pump_state.status = "CAV"
elif pump_state.flow_rate < max(1.0, desired_flow * 0.8):
pump_state.status = "STARTING"
else:
pump_state.status = "RUN"
total_flow += pump_state.flow_rate
loop_pressure = max(loop_pressure, pump_state.pressure)
state.secondary_loop.mass_flow_rate = total_flow
state.secondary_loop.pressure = loop_pressure if total_flow > 0 else self._ramp_value(
state.secondary_loop.pressure, 0.5, dt, self.secondary_pump.spool_time
)
else:
state.secondary_loop.mass_flow_rate = self._ramp_value(
state.secondary_loop.mass_flow_rate, 0.0, dt, self.secondary_pump.spool_time
)
state.secondary_loop.pressure = self._ramp_value(
state.secondary_loop.pressure, max(0.1, saturation_pressure(state.secondary_loop.temperature_out)), dt, self.secondary_pump.spool_time
)
for pump_state in state.secondary_pumps:
pump_state.active = False
pump_state.flow_rate = self._ramp_value(
pump_state.flow_rate, 0.0, dt, self.secondary_pump.spool_time
)
pump_state.pressure = state.secondary_loop.pressure
pump_state.status = "STOPPING" if pump_state.flow_rate > 0.1 else "OFF"
self._apply_pressurizer(state.primary_loop, dt)
if not self.secondary_pump_active or state.secondary_loop.mass_flow_rate <= 1.0:
transferred = 0.0
else:
transferred = heat_transfer(
state.primary_loop,
state.secondary_loop,
total_power,
fouling_factor=getattr(state, "hx_fouling", 0.0),
)
residual = max(0.0, total_power - transferred)
self.thermal.step_core(state.core, state.primary_loop, total_power, dt, residual_power_mw=residual)
self.thermal.step_secondary(state.secondary_loop, transferred, dt)
if self.primary_relief_open:
self._vent_relief(
state.primary_loop,
target_pressure=1.0,
vent_rate_max=0.02,
ramp_time=12.0,
dt=dt,
)
for pump_state in state.primary_pumps:
pump_state.pressure = state.primary_loop.pressure
if self.secondary_relief_open:
self._vent_relief(
state.secondary_loop,
target_pressure=1.0,
vent_rate_max=0.05,
ramp_time=10.0,
dt=dt,
)
for pump_state in state.secondary_pumps:
pump_state.pressure = state.secondary_loop.pressure
if not self.control.manual_control and not self.shutdown:
self.control.safety_backoff(state.core.subcooling_margin, state.core.dnb_margin, dt)
self._apply_secondary_boiloff(state, dt)
self._update_secondary_level(state, dt)
self._update_chemistry(state, dt)
self._apply_boron_trim(state, dt)
steam_draw = self._step_turbine_bank(state, transferred, dt)
if steam_draw > 0.0:
self.thermal.remove_steam_energy(state.secondary_loop, steam_draw, dt)
self._maintenance_tick(state, dt)
if (not self.secondary_pump_active or state.secondary_loop.mass_flow_rate <= 1.0) and total_power > 50.0:
self._handle_heat_sink_loss(state)
# SCRAM matrix: DNB, subcooling, steam generator level/pressure
if state.core.dnb_margin is not None and state.core.dnb_margin < 0.45:
LOGGER.critical("DNB margin low: %.2f, initiating SCRAM", state.core.dnb_margin)
self.shutdown = True
self.control.scram()
elif state.core.dnb_margin is not None and state.core.dnb_margin < 0.6:
LOGGER.warning("DNB margin low: %.2f", state.core.dnb_margin)
if state.core.subcooling_margin is not None and state.core.subcooling_margin < 2.0:
LOGGER.critical("Subcooling margin lost: %.1fK, initiating SCRAM", state.core.subcooling_margin)
self.shutdown = True
self.control.scram()
elif state.core.subcooling_margin is not None and state.core.subcooling_margin < 5.0:
LOGGER.warning("Subcooling margin low: %.1fK", state.core.subcooling_margin)
if state.secondary_loop.level < 0.05 or state.secondary_loop.level > 0.98:
LOGGER.critical("Secondary level out of bounds (%.1f%%), initiating SCRAM", state.secondary_loop.level * 100)
self.shutdown = True
self.control.scram()
if state.secondary_loop.pressure > 0.95 * constants.MAX_PRESSURE:
LOGGER.critical("Secondary pressure high (%.2f MPa), initiating SCRAM", state.secondary_loop.pressure)
self.shutdown = True
self.control.scram()
failures = self.health_monitor.evaluate(
state,
primary_units_active,
secondary_units_active,
self.turbine_unit_active,
state.generators,
dt,
)
for failure in failures:
self._handle_failure(failure)
state.time_elapsed += dt
# Update inlet temperatures based on heat removed/added for next iteration.
env = constants.ENVIRONMENT_TEMPERATURE
primary_cooling = temperature_rise(transferred, state.primary_loop.mass_flow_rate)
if transferred <= 0.0 or state.secondary_loop.mass_flow_rate <= 1.0:
passive = constants.PASSIVE_COOL_RATE_PRIMARY * max(0.0, state.primary_loop.temperature_out - env) * dt
primary_cooling = max(primary_cooling, passive)
state.primary_loop.temperature_in = max(env, state.primary_loop.temperature_out - primary_cooling)
if state.core.power_output_mw <= constants.RHR_CUTOFF_POWER_MW and not self.turbine_active:
bleed = constants.RHR_COOL_RATE * dt
state.primary_loop.temperature_out = max(env, state.primary_loop.temperature_out - bleed)
state.primary_loop.temperature_in = max(env, state.primary_loop.temperature_out - bleed)
if state.secondary_loop.mass_flow_rate <= 1.0:
# Passive cooldown toward ambient when pumps off/low steam.
rhr = 0.0
if constants.RHR_ACTIVE and state.core.power_output_mw <= constants.RHR_CUTOFF_POWER_MW:
rhr = constants.RHR_COOL_RATE * dt
target_temp = max(env, state.secondary_loop.temperature_out - rhr)
state.secondary_loop.temperature_out = self._ramp_value(
state.secondary_loop.temperature_out, target_temp, dt, self.secondary_pump.spool_time
)
state.secondary_loop.temperature_in = state.secondary_loop.temperature_out
else:
# Allow the secondary to retain more heat so it can approach saturation and form steam.
excess = max(0.0, state.secondary_loop.temperature_out - env)
cooling_drop = min(40.0, max(10.0, 0.2 * excess))
state.secondary_loop.temperature_in = max(env, state.secondary_loop.temperature_out - cooling_drop)
if state.core.power_output_mw <= constants.RHR_CUTOFF_POWER_MW and not self.turbine_active:
bleed = constants.RHR_COOL_RATE * dt
state.secondary_loop.temperature_out = max(env, state.secondary_loop.temperature_out - bleed)
state.secondary_loop.temperature_in = max(env, state.secondary_loop.temperature_out - bleed)
# Keep stored energies consistent with updated temperatures/quality.
cp = constants.COOLANT_HEAT_CAPACITY
primary_avg = 0.5 * (state.primary_loop.temperature_in + state.primary_loop.temperature_out)
state.primary_loop.energy_j = max(0.0, state.primary_loop.inventory_kg * cp * primary_avg)
sat_temp_sec = saturation_temperature(max(0.05, state.secondary_loop.pressure))
sec_liquid_energy = state.secondary_loop.inventory_kg * cp * min(state.secondary_loop.temperature_out, sat_temp_sec)
sec_latent = state.secondary_loop.inventory_kg * state.secondary_loop.steam_quality * constants.STEAM_LATENT_HEAT
superheat = max(0.0, state.secondary_loop.temperature_out - sat_temp_sec)
sec_superheat = state.secondary_loop.inventory_kg * cp * superheat if state.secondary_loop.steam_quality >= 1.0 else 0.0
state.secondary_loop.energy_j = max(0.0, sec_liquid_energy + sec_latent + sec_superheat)
state.primary_to_secondary_delta_t = max(0.0, state.primary_loop.temperature_out - state.secondary_loop.temperature_in)
state.heat_exchanger_efficiency = 0.0 if total_power <= 0 else min(1.0, max(0.0, transferred / max(1e-6, total_power)))
LOGGER.info(
(
"t=%5.1fs rods=%.2f core_power=%.1fMW prompt=%.1fMW :: "
"fissions %.2e/s, outlet %.1fK, electrical %.1fMW load %.1f/%.1fMW"
),
state.time_elapsed,
rod_fraction,
total_power,
prompt_power,
fission_rate,
state.primary_loop.temperature_out,
state.total_electrical_output(),
sum(t.load_supplied_mw for t in state.turbines),
sum(t.load_demand_mw for t in state.turbines),
)
def _step_turbine_bank(self, state: PlantState, steam_power_mw: float, dt: float) -> float:
if not state.turbines:
return 0.0
steam_draw_mw = 0.0
active_indices = [
idx for idx, active in enumerate(self.turbine_unit_active) if active and idx < len(state.turbines)
]
power_per_unit = steam_power_mw / len(active_indices) if active_indices else 0.0
for idx, turbine in enumerate(self.turbines):
if idx >= len(state.turbines):
break
turbine_state = state.turbines[idx]
if idx in active_indices:
# Simple throttle map: reduce throttle when electrical demand is low, open as demand rises.
demand = turbine_state.load_demand_mw
desired = 0.4 if demand <= 0 else min(1.0, 0.4 + demand / max(1e-6, turbine.rated_output_mw))
# Governor: nudge throttle toward desired based on electrical error.
error = (demand - turbine_state.electrical_output_mw) / max(1.0, turbine.rated_output_mw)
turbine.throttle = max(0.3, min(1.0, turbine.throttle + (desired - turbine.throttle) * 0.5 + 0.2 * error * dt))
turbine.step(state.secondary_loop, turbine_state, steam_power_mw=power_per_unit, dt=dt)
if turbine_state.electrical_output_mw > 1.05 * turbine.rated_output_mw:
LOGGER.critical("Turbine %d overspeed/overload, tripping unit", idx + 1)
self._spin_down_turbine(turbine_state, dt, turbine.spool_time)
turbine_state.status = "TRIPPED"
self.turbine_unit_active[idx] = False
continue
if power_per_unit <= 0.0 and turbine_state.electrical_output_mw < 0.1:
turbine_state.status = "OFF"
elif turbine_state.electrical_output_mw < max(0.1 * turbine.rated_output_mw, 1.0):
turbine_state.status = "STARTING"
else:
turbine_state.status = "RUN"
total_eff = max(1e-6, turbine.generator_efficiency * turbine.mechanical_efficiency)
steam_draw_mw += turbine_state.electrical_output_mw / total_eff
else:
self._spin_down_turbine(turbine_state, dt, turbine.spool_time)
turbine_state.status = "STOPPING" if turbine_state.electrical_output_mw > 0.1 else "OFF"
self._dispatch_consumer_load(state, active_indices)
return steam_draw_mw
def _reset_turbine_state(self, turbine_state: TurbineState) -> None:
turbine_state.shaft_power_mw = 0.0
turbine_state.electrical_output_mw = 0.0
turbine_state.load_demand_mw = 0.0
turbine_state.load_supplied_mw = 0.0
turbine_state.status = "OFF"
@staticmethod
def _ramp_value(current: float, target: float, dt: float, time_constant: float) -> float:
if time_constant <= 0.0:
return target
alpha = min(1.0, max(0.0, dt / time_constant))
return current + (target - current) * alpha
def _spin_down_turbine(self, turbine_state: TurbineState, dt: float, time_constant: float) -> None:
turbine_state.shaft_power_mw = self._ramp_value(turbine_state.shaft_power_mw, 0.0, dt, time_constant)
turbine_state.electrical_output_mw = self._ramp_value(
turbine_state.electrical_output_mw, 0.0, dt, time_constant
)
turbine_state.load_demand_mw = 0.0
turbine_state.load_supplied_mw = self._ramp_value(
turbine_state.load_supplied_mw, 0.0, dt, time_constant
)
if turbine_state.electrical_output_mw < 0.1:
turbine_state.electrical_output_mw = 0.0
def _dispatch_consumer_load(self, state: PlantState, active_indices: list[int]) -> None:
total_electrical = sum(state.turbines[idx].electrical_output_mw for idx in active_indices)
if self.consumer:
demand = self.consumer.request_power()
supplied = min(total_electrical, demand)
self.consumer.update_power_received(supplied)
else:
demand = 0.0
supplied = 0.0
demand_per_unit = demand / len(active_indices) if active_indices else 0.0
total_for_share = total_electrical if total_electrical > 0 else 1.0
for idx, turbine_state in enumerate(state.turbines):
if idx not in active_indices:
turbine_state.load_demand_mw = 0.0
turbine_state.load_supplied_mw = 0.0
continue
share = 0.0 if total_electrical <= 0 else supplied * (turbine_state.electrical_output_mw / total_for_share)
turbine_state.load_demand_mw = demand_per_unit
turbine_state.load_supplied_mw = share if demand_per_unit <= 0 else min(share, demand_per_unit)
def _nominal_inventory(self, volume_m3: float) -> float:
return volume_m3 * constants.COOLANT_DENSITY
def _update_loop_inventory(
self, loop: CoolantLoopState, volume_m3: float, target_level: float, dt: float
) -> None:
nominal_mass = self._nominal_inventory(volume_m3)
if nominal_mass <= 0.0:
loop.level = 0.0
return
if loop.inventory_kg <= 0.0:
loop.inventory_kg = nominal_mass * target_level
current_level = loop.inventory_kg / nominal_mass
correction = (target_level - current_level) * constants.LOOP_INVENTORY_CORRECTION_RATE
loop.inventory_kg = max(0.0, loop.inventory_kg + correction * nominal_mass * dt)
loop.level = min(1.2, max(0.0, loop.inventory_kg / nominal_mass))
def _update_secondary_level(self, state: PlantState, dt: float) -> None:
"""Steam drum level controller with shrink/swell and feedwater valve."""
loop = state.secondary_loop
nominal_mass = self._nominal_inventory(constants.SECONDARY_LOOP_VOLUME_M3)
if nominal_mass <= 0.0:
loop.level = 0.0
return
if loop.inventory_kg <= 0.0:
loop.inventory_kg = nominal_mass * constants.SECONDARY_INVENTORY_TARGET
current_level = loop.inventory_kg / nominal_mass
steam_out = loop.mass_flow_rate * max(0.0, loop.steam_quality)
# Shrink/swell: apparent level drops when steam draw surges.
swell = -0.02 * (steam_out - self._last_steam_out_kg_s) / max(1.0, nominal_mass)
sensed_level = current_level + swell
# PI-ish valve adjustment toward target level.
error = constants.SECONDARY_INVENTORY_TARGET - sensed_level
valve_delta = 0.3 * error * dt
self.feedwater_valve = max(0.0, min(1.0, self.feedwater_valve + valve_delta))
# Feedwater adds mass and energy at inlet temperature.
steam_factor = min(1.0, max(0.1, steam_out / max(1.0, nominal_mass * 0.1)))
feed_rate = (
self.feedwater_valve
* nominal_mass
* 0.002
* steam_factor
) # up to ~0.2% of nominal mass per second, scaled by steam draw
added_mass = feed_rate * dt
loop.inventory_kg = max(0.0, loop.inventory_kg + added_mass)
cp = constants.COOLANT_HEAT_CAPACITY
loop.energy_j += added_mass * cp * loop.temperature_in
loop.level = min(1.2, max(0.0, loop.inventory_kg / nominal_mass))
self._last_steam_out_kg_s = steam_out
def _apply_boron_trim(self, state: PlantState, dt: float) -> None:
"""Slow soluble boron trim to hold power near setpoint; acts only near target."""
if not self._boron_trim_active or self.control.manual_control or self.shutdown:
return
if state.time_elapsed < 300.0:
return
if self.control.setpoint_mw <= 0.0:
return
error = (state.core.power_output_mw - self.control.setpoint_mw) / self.control.setpoint_mw
if abs(error) < 0.005:
return
delta = constants.BORON_TRIM_RATE_PPM_PER_S * error * dt
state.boron_ppm = min(constants.CHEM_MAX_PPM, max(0.0, state.boron_ppm + delta))
def _update_chemistry(self, state: PlantState, dt: float) -> None:
"""Track dissolved species and fouling impacts on HX and condenser."""
env = constants.ENVIRONMENT_TEMPERATURE
steam_out = state.secondary_loop.mass_flow_rate * max(0.0, state.secondary_loop.steam_quality)
temp = state.secondary_loop.temperature_out
temp_factor = max(0.0, (temp - env) / 300.0)
impurity_load = max(0.0, state.dissolved_oxygen_ppm + 0.5 * state.sodium_ppm)
fouling_rate = constants.HX_FOULING_RATE * temp_factor * impurity_load
heal = constants.HX_FOULING_HEAL_RATE * (1.0 if steam_out < 200.0 or temp_factor < 0.2 else 0.0)
state.hx_fouling = max(
0.0,
min(constants.HX_FOULING_MAX_PENALTY, state.hx_fouling + (fouling_rate - heal) * dt),
)
# Degas oxygen with steam production; small impurity ingress over time (worse when venting).
degas = 0.0005 * steam_out * dt / max(1.0, constants.SECONDARY_LOOP_VOLUME_M3)
state.dissolved_oxygen_ppm = max(0.0, state.dissolved_oxygen_ppm - degas)
ingress = (0.01 if self.secondary_relief_open else 0.002) * dt
state.sodium_ppm = min(constants.CHEM_MAX_PPM, state.sodium_ppm + ingress)
state.boron_ppm = max(0.0, state.boron_ppm - 0.001 * dt)
chem_penalty = constants.CONDENSER_CHEM_FOULING_RATE * impurity_load / 1_000.0
for turb_state in state.turbines:
turb_state.fouling_penalty = min(
constants.CONDENSER_FOULING_MAX_PENALTY,
max(0.0, turb_state.fouling_penalty + chem_penalty * dt),
)
backpressure = constants.CONDENSER_CHEM_BACKPRESSURE_FACTOR * impurity_load * dt
turb_state.condenser_pressure = min(
constants.CONDENSER_MAX_PRESSURE_MPA, turb_state.condenser_pressure + backpressure
)
def _inventory_flow_scale(self, loop: CoolantLoopState) -> float:
if loop.level <= constants.LOW_LEVEL_FLOW_FLOOR:
return 0.0
if loop.level <= 0.25:
return max(0.0, (loop.level - constants.LOW_LEVEL_FLOW_FLOOR) / (0.25 - constants.LOW_LEVEL_FLOW_FLOOR))
return 1.0
def _npsh_factor(self, loop: CoolantLoopState) -> float:
if self.relaxed_npsh:
return 1.0
vapor_pressure = saturation_pressure(loop.temperature_in)
available = max(0.0, loop.pressure - vapor_pressure)
if available <= 0.0:
return 0.001
return max(0.001, min(1.0, available / constants.NPSH_REQUIRED_MPA))
def _apply_pressurizer(self, primary: CoolantLoopState, dt: float) -> None:
if self.shutdown and primary.mass_flow_rate <= 100.0:
return
target = constants.PRIMARY_PRESSURIZER_SETPOINT_MPA
band = constants.PRIMARY_PRESSURIZER_DEADBAND_MPA
heat_rate = constants.PRIMARY_PRESSURIZER_HEAT_RATE_MPA_PER_S
spray_rate = constants.PRIMARY_PRESSURIZER_SPRAY_RATE_MPA_PER_S
if primary.pressure < target - band and self.pressurizer_level > 0.05:
primary.pressure = min(target, primary.pressure + heat_rate * dt)
self.pressurizer_level = max(0.0, self.pressurizer_level - constants.PRIMARY_PRESSURIZER_LEVEL_DRAW_PER_S * dt)
elif primary.pressure > target + band:
primary.pressure = max(target - band, primary.pressure - spray_rate * dt)
self.pressurizer_level = min(1.0, self.pressurizer_level + constants.PRIMARY_PRESSURIZER_LEVEL_FILL_PER_S * dt)
primary.pressure = min(constants.MAX_PRESSURE, max(saturation_pressure(primary.temperature_out), primary.pressure))
def _apply_secondary_boiloff(self, state: PlantState, dt: float) -> None:
loop = state.secondary_loop
if loop.mass_flow_rate <= 0.0 or loop.steam_quality <= 0.0:
return
steam_mass = loop.mass_flow_rate * loop.steam_quality * constants.SECONDARY_STEAM_LOSS_FRACTION * dt
if steam_mass <= 0.0:
return
prev_mass = max(1e-6, loop.inventory_kg)
loop.inventory_kg = max(0.0, loop.inventory_kg - steam_mass)
# Scale stored energy with the remaining mass to keep specific enthalpy consistent.
ratio = max(0.0, loop.inventory_kg) / prev_mass
loop.energy_j *= ratio
nominal = self._nominal_inventory(constants.SECONDARY_LOOP_VOLUME_M3)
loop.level = min(1.2, max(0.0, loop.inventory_kg / nominal)) if nominal > 0 else 0.0
def _handle_failure(self, component: str) -> None:
if component == "core":
LOGGER.critical("Core failure detected. Initiating SCRAM.")
self.shutdown = True
self.control.scram()
elif component.startswith("primary_pump"):
idx = self._component_index(component)
self._toggle_primary_pump_unit(idx, False)
elif component.startswith("secondary_pump"):
idx = self._component_index(component)
self._toggle_secondary_pump_unit(idx, False)
elif component.startswith("generator"):
idx = self._component_index(component)
LOGGER.warning("Generator %d failed", idx + 1)
elif component.startswith("turbine"):
idx = self._component_index(component)
self._set_turbine_state(False, index=idx)
def _apply_command(self, command: ReactorCommand, state: PlantState) -> dict[str, float]:
overrides: dict[str, float] = {}
if command.scram:
self.shutdown = True
overrides["rod_fraction"] = self.control.scram()
self._set_turbine_state(False)
if command.generator_auto is not None:
self.generator_auto = command.generator_auto
if command.primary_relief is not None:
self.primary_relief_open = command.primary_relief
if command.secondary_relief is not None:
self.secondary_relief_open = command.secondary_relief
if command.power_setpoint is not None:
self.control.set_power_setpoint(command.power_setpoint)
if command.rod_manual is not None:
self.control.set_manual_mode(command.rod_manual)
if command.rod_manual is False and not self.meltdown:
self.shutdown = False
if command.rod_position is not None:
self.control.set_manual_mode(True)
self.control.set_rods(command.rod_position)
self.shutdown = self.shutdown or command.rod_position >= 0.95
elif command.rod_step is not None:
self.control.increment_rods(command.rod_step)
if command.primary_pumps:
for idx, flag in command.primary_pumps.items():
self._toggle_primary_pump_unit(idx - 1, flag)
if command.secondary_pumps:
for idx, flag in command.secondary_pumps.items():
self._toggle_secondary_pump_unit(idx - 1, flag)
if command.generator_units:
for idx, flag in command.generator_units.items():
self._toggle_generator(idx - 1, flag, state)
if command.turbine_on is not None:
self._set_turbine_state(command.turbine_on)
if command.turbine_units:
for key, state_flag in command.turbine_units.items():
idx = key - 1
self._set_turbine_state(state_flag, index=idx)
if command.consumer_online is not None and self.consumer:
self.consumer.set_online(command.consumer_online)
if command.consumer_demand is not None and self.consumer:
self.consumer.set_demand(command.consumer_demand)
if command.coolant_demand is not None:
overrides["coolant_demand"] = max(0.0, min(1.0, command.coolant_demand))
for component in command.maintenance_components:
self._toggle_maintenance(component)
if not self.meltdown and not command.scram:
rod_target = overrides.get("rod_fraction", self.control.rod_fraction)
if rod_target < 0.95:
self.shutdown = False
return overrides
def _set_primary_pump(self, active: bool) -> None:
if self.primary_pump_active != active:
self.primary_pump_active = active
LOGGER.info("Primary pump %s", "enabled" if active else "stopped")
if not active:
self.primary_pump_units = [False] * len(self.primary_pump_units)
def _set_secondary_pump(self, active: bool) -> None:
if self.secondary_pump_active != active:
self.secondary_pump_active = active
LOGGER.info("Secondary pump %s", "enabled" if active else "stopped")
if not active:
self.secondary_pump_units = [False] * len(self.secondary_pump_units)
def _toggle_primary_pump_unit(self, index: int, active: bool) -> None:
if index < 0 or index >= len(self.primary_pump_units):
LOGGER.warning("Ignoring primary pump index %s", index)
return
if self.primary_pump_units[index] != active:
self.primary_pump_units[index] = active
LOGGER.info("Primary pump %d %s", index + 1, "enabled" if active else "stopped")
if active:
self._set_primary_pump(True)
elif not any(self.primary_pump_units):
self._set_primary_pump(False)
def _toggle_secondary_pump_unit(self, index: int, active: bool) -> None:
if index < 0 or index >= len(self.secondary_pump_units):
LOGGER.warning("Ignoring secondary pump index %s", index)
return
if self.secondary_pump_units[index] != active:
self.secondary_pump_units[index] = active
LOGGER.info("Secondary pump %d %s", index + 1, "enabled" if active else "stopped")
if active:
self._set_secondary_pump(True)
elif not any(self.secondary_pump_units):
self._set_secondary_pump(False)
def _toggle_generator(self, index: int, active: bool, state: PlantState) -> None:
if index < 0 or index >= len(self.generators) or index >= len(state.generators):
LOGGER.warning("Ignoring generator index %s", index)
return
gen_state = state.generators[index]
if active:
self.generators[index].start(gen_state)
else:
self.generators[index].stop(gen_state)
def _trigger_meltdown(self, state: PlantState) -> None:
LOGGER.critical("Core meltdown in progress (%.1f K)", state.core.fuel_temperature)
self.meltdown = True
self.shutdown = True
self.control.scram()
try:
self.health_monitor.component("core").fail()
except KeyError:
pass
self._set_turbine_state(False)
def _step_generators(self, state: PlantState, aux_demand: float, turbine_electric: float, dt: float) -> float:
# Ensure we have generator state objects aligned with hardware.
if not state.generators or len(state.generators) < len(self.generators):
missing = len(self.generators) - len(state.generators)
for _ in range(missing):
state.generators.append(
GeneratorState(running=False, starting=False, spool_remaining=0.0, power_output_mw=0.0, battery_charge=1.0)
)
deficit = max(0.0, aux_demand - turbine_electric)
if self.generator_auto and aux_demand <= 0.0:
for idx, gen_state in enumerate(state.generators):
if gen_state.running or gen_state.starting:
self.generators[idx].stop(gen_state)
return 0.0
if self.generator_auto:
if deficit > 0.0:
for idx, gen_state in enumerate(state.generators):
if not (gen_state.running or gen_state.starting):
self.generators[idx].start(gen_state)
deficit -= self.generators[idx].rated_output_mw
if deficit <= 0:
break
elif turbine_electric > aux_demand:
for idx, gen_state in enumerate(state.generators):
if gen_state.running and not gen_state.starting:
self.generators[idx].stop(gen_state)
total_power = 0.0
remaining = max(0.0, aux_demand - turbine_electric)
active_indices = [idx for idx, g in enumerate(state.generators) if g.running or g.starting]
share = remaining / len(active_indices) if active_indices and remaining > 0 else 0.0
for idx, gen_state in enumerate(state.generators):
load = share if idx in active_indices else 0.0
delivered = self.generators[idx].step(gen_state, load, dt)
total_power += delivered
remaining = max(0.0, remaining - delivered)
return total_power
def _vent_relief(
self,
loop: CoolantLoopState,
target_pressure: float,
vent_rate_max: float,
ramp_time: float,
dt: float,
) -> None:
"""Model relief valve venting: gradual depressurization with mass/enthalpy loss."""
if loop.inventory_kg <= 0.0:
loop.pressure = max(target_pressure, loop.pressure)
return
# Vent rate scales with overpressure; capped to keep a multi-second depressurization.
overfrac = max(0.0, (loop.pressure - target_pressure) / max(1e-6, loop.pressure))
vent_rate = min(vent_rate_max, 0.01 + vent_rate_max * overfrac) # fraction of mass per second
vent_mass = min(loop.inventory_kg, loop.inventory_kg * vent_rate * dt)
if vent_mass > 0.0:
specific_enthalpy = (
loop.steam_quality * constants.STEAM_LATENT_HEAT
+ constants.COOLANT_HEAT_CAPACITY * max(loop.temperature_out, constants.ENVIRONMENT_TEMPERATURE)
)
loop.inventory_kg = max(0.0, loop.inventory_kg - vent_mass)
loop.energy_j = max(0.0, loop.energy_j - vent_mass * specific_enthalpy)
# Pressure ramps toward target with the requested time constant.
ramp = min(1.0, dt / max(1e-6, ramp_time))
loop.pressure = max(target_pressure, loop.pressure - (loop.pressure - target_pressure) * ramp)
# Cool toward saturation at the new pressure to avoid re-pressurizing from superheat.
sat_target = saturation_temperature(target_pressure)
if loop.temperature_out > sat_target:
temp_drop = (loop.temperature_out - sat_target) * ramp
loop.temperature_out -= temp_drop
loop.temperature_in = min(loop.temperature_in, loop.temperature_out)
loop.steam_quality = 0.0
cp = constants.COOLANT_HEAT_CAPACITY
loop.energy_j = max(0.0, loop.inventory_kg * cp * loop.average_temperature())
# Re-resolve temperature/quality/pressure to reflect the vented state.
try:
self.thermal._resolve_secondary_state(loop) # type: ignore[attr-defined]
except AttributeError:
pass
def _set_turbine_state(self, active: bool, index: int | None = None) -> None:
if index is None:
for idx in range(len(self.turbine_unit_active)):
self._set_turbine_state(active, index=idx)
return
if index < 0 or index >= len(self.turbine_unit_active):
LOGGER.warning("Ignoring turbine index %s", index)
return
if self.turbine_unit_active[index] != active:
self.turbine_unit_active[index] = active
LOGGER.info("Turbine %d %s", index + 1, "started" if active else "stopped")
self.turbine_active = any(self.turbine_unit_active)
def _component_index(self, name: str) -> int:
if name == "turbine":
return 0
parts = name.split("_")
try:
for token in reversed(parts):
return int(token) - 1
except (ValueError, TypeError):
return -1
def _perform_maintenance(self, component: str) -> None:
if not self._can_maintain(component):
return
self.health_monitor.maintain(component)
def _maintenance_tick(self, state: PlantState, dt: float) -> None:
if not self.maintenance_active:
return
completed: list[str] = []
for component in list(self.maintenance_active):
if not self._can_maintain(component):
continue
restored = self.health_monitor.maintain(component, amount=0.02 * dt)
comp_state = self.health_monitor.component(component)
if comp_state.integrity >= 0.999:
completed.append(component)
for comp in completed:
self.maintenance_active.discard(comp)
LOGGER.info("Maintenance completed for %s", comp)
def _toggle_maintenance(self, component: str) -> None:
if component in self.maintenance_active:
self.maintenance_active.remove(component)
LOGGER.info("Maintenance stopped for %s", component)
return
if not self._can_maintain(component):
return
self.maintenance_active.add(component)
LOGGER.info("Maintenance started for %s", component)
def _can_maintain(self, component: str) -> bool:
if component == "core" and not self.shutdown:
LOGGER.warning("Cannot maintain core while reactor is running")
return False
if component.startswith("primary_pump_"):
idx = self._component_index(component)
if idx < 0 or idx >= len(self.primary_pump_units):
LOGGER.warning("Unknown primary pump maintenance target %s", component)
return False
if self.primary_pump_units[idx]:
LOGGER.warning("Stop primary pump %d before maintenance", idx + 1)
return False
if component.startswith("secondary_pump_"):
idx = self._component_index(component)
if idx < 0 or idx >= len(self.secondary_pump_units):
LOGGER.warning("Unknown secondary pump maintenance target %s", component)
return False
if self.secondary_pump_units[idx]:
LOGGER.warning("Stop secondary pump %d before maintenance", idx + 1)
return False
if component.startswith("generator_"):
idx = self._component_index(component)
if idx < 0 or idx >= len(self.generators):
LOGGER.warning("Unknown generator maintenance target %s", component)
return False
if component.startswith("turbine"):
idx = self._component_index(component)
if idx < 0 or idx >= len(self.turbine_unit_active):
LOGGER.warning("Unknown turbine maintenance target %s", component)
return False
if self.turbine_unit_active[idx]:
LOGGER.warning("Stop turbine %d before maintenance", idx + 1)
return False
return True
def attach_consumer(self, consumer: ElectricalConsumer) -> None:
self.consumer = consumer
LOGGER.info("Attached consumer %s (%.1f MW)", consumer.name, consumer.demand_mw)
def detach_consumer(self) -> None:
if self.consumer:
LOGGER.info("Detached consumer %s", self.consumer.name)
self.consumer = None
def save_state(self, filepath: str, state: PlantState) -> None:
metadata = {
"primary_pump_active": self.primary_pump_active,
"secondary_pump_active": self.secondary_pump_active,
"primary_pump_units": self.primary_pump_units,
"secondary_pump_units": self.secondary_pump_units,
"turbine_active": self.turbine_active,
"turbine_units": self.turbine_unit_active,
"shutdown": self.shutdown,
"meltdown": self.meltdown,
"generator_auto": self.generator_auto,
"primary_relief_open": self.primary_relief_open,
"secondary_relief_open": self.secondary_relief_open,
"pressurizer_level": self.pressurizer_level,
"maintenance_active": list(self.maintenance_active),
"generators": [
{
"running": g.running,
"starting": g.starting,
"spool_remaining": g.spool_remaining,
"power_output_mw": g.power_output_mw,
"battery_charge": g.battery_charge,
"status": g.status,
}
for g in state.generators
],
"consumer": {
"online": self.consumer.online if self.consumer else False,
"demand_mw": self.consumer.demand_mw if self.consumer else 0.0,
"name": self.consumer.name if self.consumer else None,
},
}
self.control.save_state(filepath, state, metadata, self.health_monitor.snapshot())
def load_state(self, filepath: str) -> PlantState:
plant, metadata, health = self.control.load_state(filepath)
self.primary_pump_active = metadata.get("primary_pump_active", self.primary_pump_active)
self.secondary_pump_active = metadata.get("secondary_pump_active", self.secondary_pump_active)
self.primary_pump_units = list(metadata.get("primary_pump_units", self.primary_pump_units))
self.secondary_pump_units = list(metadata.get("secondary_pump_units", self.secondary_pump_units))
unit_states = metadata.get("turbine_units")
if unit_states:
self.turbine_unit_active = list(unit_states)
self.turbine_active = metadata.get("turbine_active", any(self.turbine_unit_active))
self.shutdown = metadata.get("shutdown", self.shutdown)
self.meltdown = metadata.get("meltdown", self.meltdown)
self.generator_auto = metadata.get("generator_auto", self.generator_auto)
self.primary_relief_open = metadata.get("primary_relief_open", self.primary_relief_open)
self.secondary_relief_open = metadata.get("secondary_relief_open", self.secondary_relief_open)
self.pressurizer_level = metadata.get("pressurizer_level", self.pressurizer_level)
maint = metadata.get("maintenance_active")
if maint is not None:
self.maintenance_active = set(maint)
consumer_cfg = metadata.get("consumer")
if consumer_cfg:
if not self.consumer:
self.consumer = ElectricalConsumer(
name=consumer_cfg.get("name") or "External",
demand_mw=consumer_cfg.get("demand_mw", 0.0),
online=consumer_cfg.get("online", False),
)
else:
self.consumer.set_demand(consumer_cfg.get("demand_mw", self.consumer.demand_mw))
self.consumer.set_online(consumer_cfg.get("online", self.consumer.online))
if health:
self.health_monitor.load_snapshot(health)
LOGGER.info("Reactor state restored from %s", filepath)
# Back-fill pump state lists for compatibility.
if not plant.primary_pumps or len(plant.primary_pumps) < 2:
plant.primary_pumps = [
PumpState(
active=self.primary_pump_active,
flow_rate=plant.primary_loop.mass_flow_rate / 2,
pressure=plant.primary_loop.pressure,
status="OFF",
)
for _ in range(2)
]
if not plant.secondary_pumps or len(plant.secondary_pumps) < 2:
plant.secondary_pumps = [
PumpState(
active=self.secondary_pump_active,
flow_rate=plant.secondary_loop.mass_flow_rate / 2,
pressure=plant.secondary_loop.pressure,
status="OFF",
)
for _ in range(2)
]
if len(plant.turbines) < len(self.turbines):
ambient = constants.ENVIRONMENT_TEMPERATURE
while len(plant.turbines) < len(self.turbines):
plant.turbines.append(
TurbineState(
steam_enthalpy=2_000.0,
shaft_power_mw=0.0,
electrical_output_mw=0.0,
condenser_temperature=ambient,
load_demand_mw=0.0,
load_supplied_mw=0.0,
)
)
gen_meta = metadata.get("generators", [])
if not plant.generators or len(plant.generators) < len(self.generators):
while len(plant.generators) < len(self.generators):
plant.generators.append(
GeneratorState(
running=False,
starting=False,
spool_remaining=0.0,
power_output_mw=0.0,
battery_charge=1.0,
status="OFF",
)
)
for idx, gen_state in enumerate(plant.generators):
if idx < len(gen_meta):
cfg = gen_meta[idx]
gen_state.running = cfg.get("running", gen_state.running)
gen_state.starting = cfg.get("starting", gen_state.starting)
gen_state.spool_remaining = cfg.get("spool_remaining", gen_state.spool_remaining)
gen_state.power_output_mw = cfg.get("power_output_mw", gen_state.power_output_mw)
gen_state.battery_charge = cfg.get("battery_charge", gen_state.battery_charge)
gen_state.status = cfg.get("status", gen_state.status)
return plant
def _handle_heat_sink_loss(self, state: PlantState) -> None:
if not self.shutdown:
LOGGER.critical("Loss of secondary heat sink detected. Initiating SCRAM.")
self.shutdown = True
self.control.scram()
self._set_turbine_state(False)
# Clear turbine output and demands to reflect lost steam.
for turbine_state in state.turbines:
self._reset_turbine_state(turbine_state)
def _check_poison_alerts(self, state: PlantState) -> None:
inventory = state.core.fission_product_inventory or {}
for symbol, threshold in constants.KEY_POISON_THRESHOLDS.items():
amount = inventory.get(symbol, 0.0)
if amount >= threshold and symbol not in self.poison_alerts:
self.poison_alerts.add(symbol)
LOGGER.warning("Poison level high: %s inventory %.2e exceeds %.2e", symbol, amount, threshold)