616 lines
28 KiB
Python
616 lines
28 KiB
Python
"""High-level reactor orchestration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
import logging
|
|
|
|
from . import constants
|
|
from .atomic import AtomicPhysics
|
|
from .commands import ReactorCommand
|
|
from .coolant import Pump
|
|
from .consumer import ElectricalConsumer
|
|
from .control import ControlSystem
|
|
from .failures import HealthMonitor
|
|
from .fuel import FuelAssembly, decay_heat_fraction
|
|
from .neutronics import NeutronDynamics
|
|
from .state import CoolantLoopState, CoreState, PlantState, PumpState, TurbineState
|
|
from .thermal import ThermalSolver, heat_transfer
|
|
from .turbine import SteamGenerator, Turbine
|
|
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class Reactor:
|
|
fuel: FuelAssembly
|
|
neutronics: NeutronDynamics
|
|
control: ControlSystem
|
|
primary_pump: Pump
|
|
secondary_pump: Pump
|
|
thermal: ThermalSolver
|
|
steam_generator: SteamGenerator
|
|
turbines: list[Turbine]
|
|
atomic_model: AtomicPhysics
|
|
consumer: ElectricalConsumer | None = None
|
|
health_monitor: HealthMonitor = field(default_factory=HealthMonitor)
|
|
primary_pump_active: bool = True
|
|
secondary_pump_active: bool = True
|
|
primary_pump_units: list[bool] = field(default_factory=lambda: [True, True])
|
|
secondary_pump_units: list[bool] = field(default_factory=lambda: [True, True])
|
|
turbine_active: bool = True
|
|
turbine_unit_active: list[bool] = field(default_factory=lambda: [True, True, True])
|
|
shutdown: bool = False
|
|
poison_alerts: set[str] = field(default_factory=set)
|
|
maintenance_active: set[str] = field(default_factory=set)
|
|
|
|
def __post_init__(self) -> None:
|
|
if not self.turbines:
|
|
self.turbines = [Turbine()]
|
|
if not self.turbine_unit_active or len(self.turbine_unit_active) != len(self.turbines):
|
|
self.turbine_unit_active = [True] * len(self.turbines)
|
|
self.turbine_active = any(self.turbine_unit_active)
|
|
if not self.primary_pump_units or len(self.primary_pump_units) != 2:
|
|
self.primary_pump_units = [True, True]
|
|
if not self.secondary_pump_units or len(self.secondary_pump_units) != 2:
|
|
self.secondary_pump_units = [True, True]
|
|
|
|
@classmethod
|
|
def default(cls) -> "Reactor":
|
|
atomic_model = AtomicPhysics()
|
|
return cls(
|
|
fuel=FuelAssembly(enrichment=0.045, mass_kg=80_000.0, atomic_physics=atomic_model),
|
|
neutronics=NeutronDynamics(),
|
|
control=ControlSystem(),
|
|
primary_pump=Pump(nominal_flow=18_000.0),
|
|
secondary_pump=Pump(nominal_flow=16_000.0, efficiency=0.85),
|
|
thermal=ThermalSolver(),
|
|
steam_generator=SteamGenerator(),
|
|
turbines=[Turbine() for _ in range(3)],
|
|
atomic_model=atomic_model,
|
|
consumer=ElectricalConsumer(name="Grid", demand_mw=800.0, online=False),
|
|
health_monitor=HealthMonitor(),
|
|
)
|
|
|
|
def initial_state(self) -> PlantState:
|
|
ambient = constants.ENVIRONMENT_TEMPERATURE
|
|
core = CoreState(
|
|
fuel_temperature=ambient,
|
|
neutron_flux=1e5,
|
|
reactivity_margin=-0.02,
|
|
power_output_mw=0.1,
|
|
burnup=0.0,
|
|
fission_product_inventory={},
|
|
emitted_particles={},
|
|
)
|
|
# Default to a cold, safe configuration: rods fully inserted, manual control, pumps/turbines off.
|
|
self.control.manual_control = True
|
|
self.control.rod_fraction = 0.95
|
|
self.shutdown = True
|
|
self.primary_pump_active = False
|
|
self.secondary_pump_active = False
|
|
self.turbine_unit_active = [False] * len(self.turbines)
|
|
self.turbine_active = any(self.turbine_unit_active)
|
|
if self.consumer:
|
|
self.consumer.set_online(False)
|
|
|
|
primary = CoolantLoopState(
|
|
temperature_in=ambient,
|
|
temperature_out=ambient,
|
|
pressure=0.5,
|
|
mass_flow_rate=0.0,
|
|
steam_quality=0.0,
|
|
)
|
|
secondary = CoolantLoopState(
|
|
temperature_in=ambient,
|
|
temperature_out=ambient,
|
|
pressure=0.5,
|
|
mass_flow_rate=0.0,
|
|
steam_quality=0.0,
|
|
)
|
|
primary_pumps = [PumpState(active=self.primary_pump_active, flow_rate=0.0, pressure=0.5) for _ in range(2)]
|
|
secondary_pumps = [PumpState(active=self.secondary_pump_active, flow_rate=0.0, pressure=0.5) for _ in range(2)]
|
|
turbine_states = [
|
|
TurbineState(
|
|
steam_enthalpy=2_000.0,
|
|
shaft_power_mw=0.0,
|
|
electrical_output_mw=0.0,
|
|
condenser_temperature=ambient,
|
|
load_demand_mw=0.0,
|
|
load_supplied_mw=0.0,
|
|
)
|
|
for _ in self.turbines
|
|
]
|
|
return PlantState(
|
|
core=core,
|
|
primary_loop=primary,
|
|
secondary_loop=secondary,
|
|
turbines=turbine_states,
|
|
primary_pumps=primary_pumps,
|
|
secondary_pumps=secondary_pumps,
|
|
)
|
|
|
|
def step(self, state: PlantState, dt: float, command: ReactorCommand | None = None) -> None:
|
|
if self.shutdown:
|
|
rod_fraction = self.control.rod_fraction
|
|
else:
|
|
rod_fraction = self.control.update_rods(state.core, dt)
|
|
|
|
overrides = {}
|
|
if command:
|
|
overrides = self._apply_command(command, state)
|
|
rod_fraction = overrides.get("rod_fraction", rod_fraction)
|
|
|
|
decay_power, decay_neutron_source, decay_products, decay_particles = self.fuel.decay_reaction_effects(
|
|
state.core
|
|
)
|
|
self.neutronics.step(state.core, rod_fraction, dt, external_source_rate=decay_neutron_source)
|
|
|
|
prompt_power, fission_rate, fission_event = self.fuel.prompt_energy_rate(
|
|
state.core.neutron_flux, rod_fraction
|
|
)
|
|
decay_heat = decay_heat_fraction(state.core.burnup) * state.core.power_output_mw
|
|
total_power = prompt_power + decay_heat + decay_power
|
|
state.core.power_output_mw = total_power
|
|
state.core.update_burnup(dt)
|
|
# Track fission products and emitted particles for diagnostics.
|
|
products: dict[str, float] = {}
|
|
for atom in fission_event.products:
|
|
products[atom.symbol] = products.get(atom.symbol, 0.0) + fission_rate * dt
|
|
for k, v in decay_products.items():
|
|
products[k] = products.get(k, 0.0) + v * dt
|
|
particles: dict[str, float] = {k: v * dt for k, v in decay_particles.items()}
|
|
state.core.add_products(products)
|
|
state.core.add_emitted_particles(particles)
|
|
self._check_poison_alerts(state)
|
|
|
|
pump_demand = overrides.get("coolant_demand", self.control.coolant_demand(state.primary_loop))
|
|
if self.primary_pump_active:
|
|
total_flow = 0.0
|
|
target_pressure = 12.0 * pump_demand + 2.0
|
|
loop_pressure = 0.5
|
|
target_flow = self.primary_pump.flow_rate(pump_demand)
|
|
for idx, pump_state in enumerate(state.primary_pumps):
|
|
unit_enabled = idx < len(self.primary_pump_units) and self.primary_pump_units[idx]
|
|
desired_flow = target_flow if unit_enabled else 0.0
|
|
desired_pressure = target_pressure if unit_enabled else 0.5
|
|
pump_state.flow_rate = self._ramp_value(
|
|
pump_state.flow_rate, desired_flow, dt, self.primary_pump.spool_time
|
|
)
|
|
pump_state.pressure = self._ramp_value(
|
|
pump_state.pressure, desired_pressure, dt, self.primary_pump.spool_time
|
|
)
|
|
pump_state.active = unit_enabled or pump_state.flow_rate > 1.0
|
|
total_flow += pump_state.flow_rate
|
|
loop_pressure = max(loop_pressure, pump_state.pressure)
|
|
state.primary_loop.mass_flow_rate = total_flow
|
|
state.primary_loop.pressure = loop_pressure if total_flow > 0 else self._ramp_value(
|
|
state.primary_loop.pressure, 0.5, dt, self.primary_pump.spool_time
|
|
)
|
|
else:
|
|
state.primary_loop.mass_flow_rate = self._ramp_value(
|
|
state.primary_loop.mass_flow_rate, 0.0, dt, self.primary_pump.spool_time
|
|
)
|
|
state.primary_loop.pressure = self._ramp_value(
|
|
state.primary_loop.pressure, 0.5, dt, self.primary_pump.spool_time
|
|
)
|
|
for pump_state in state.primary_pumps:
|
|
pump_state.active = False
|
|
pump_state.flow_rate = self._ramp_value(
|
|
pump_state.flow_rate, 0.0, dt, self.primary_pump.spool_time
|
|
)
|
|
pump_state.pressure = self._ramp_value(
|
|
pump_state.pressure, state.primary_loop.pressure, dt, self.primary_pump.spool_time
|
|
)
|
|
if self.secondary_pump_active:
|
|
total_flow = 0.0
|
|
target_pressure = 12.0 * 0.75 + 2.0
|
|
loop_pressure = 0.5
|
|
target_flow = self.secondary_pump.flow_rate(0.75)
|
|
for idx, pump_state in enumerate(state.secondary_pumps):
|
|
unit_enabled = idx < len(self.secondary_pump_units) and self.secondary_pump_units[idx]
|
|
desired_flow = target_flow if unit_enabled else 0.0
|
|
desired_pressure = target_pressure if unit_enabled else 0.5
|
|
pump_state.flow_rate = self._ramp_value(
|
|
pump_state.flow_rate, desired_flow, dt, self.secondary_pump.spool_time
|
|
)
|
|
pump_state.pressure = self._ramp_value(
|
|
pump_state.pressure, desired_pressure, dt, self.secondary_pump.spool_time
|
|
)
|
|
pump_state.active = unit_enabled or pump_state.flow_rate > 1.0
|
|
total_flow += pump_state.flow_rate
|
|
loop_pressure = max(loop_pressure, pump_state.pressure)
|
|
state.secondary_loop.mass_flow_rate = total_flow
|
|
state.secondary_loop.pressure = loop_pressure if total_flow > 0 else self._ramp_value(
|
|
state.secondary_loop.pressure, 0.5, dt, self.secondary_pump.spool_time
|
|
)
|
|
else:
|
|
state.secondary_loop.mass_flow_rate = self._ramp_value(
|
|
state.secondary_loop.mass_flow_rate, 0.0, dt, self.secondary_pump.spool_time
|
|
)
|
|
state.secondary_loop.pressure = self._ramp_value(
|
|
state.secondary_loop.pressure, 0.5, dt, self.secondary_pump.spool_time
|
|
)
|
|
for pump_state in state.secondary_pumps:
|
|
pump_state.active = False
|
|
pump_state.flow_rate = self._ramp_value(
|
|
pump_state.flow_rate, 0.0, dt, self.secondary_pump.spool_time
|
|
)
|
|
pump_state.pressure = self._ramp_value(
|
|
pump_state.pressure, state.secondary_loop.pressure, dt, self.secondary_pump.spool_time
|
|
)
|
|
|
|
self.thermal.step_core(state.core, state.primary_loop, total_power, dt)
|
|
if not self.secondary_pump_active or state.secondary_loop.mass_flow_rate <= 1.0:
|
|
transferred = 0.0
|
|
else:
|
|
transferred = heat_transfer(state.primary_loop, state.secondary_loop, total_power)
|
|
self.thermal.step_secondary(state.secondary_loop, transferred)
|
|
|
|
self._step_turbine_bank(state, transferred, dt)
|
|
self._maintenance_tick(state, dt)
|
|
|
|
if (not self.secondary_pump_active or state.secondary_loop.mass_flow_rate <= 1.0) and total_power > 50.0:
|
|
self._handle_heat_sink_loss(state)
|
|
|
|
failures = self.health_monitor.evaluate(
|
|
state,
|
|
self.primary_pump_active,
|
|
self.secondary_pump_active,
|
|
self.turbine_unit_active,
|
|
dt,
|
|
)
|
|
for failure in failures:
|
|
self._handle_failure(failure)
|
|
|
|
state.time_elapsed += dt
|
|
|
|
LOGGER.info(
|
|
(
|
|
"t=%5.1fs rods=%.2f core_power=%.1fMW prompt=%.1fMW :: "
|
|
"fissions %.2e/s, outlet %.1fK, electrical %.1fMW load %.1f/%.1fMW"
|
|
),
|
|
state.time_elapsed,
|
|
rod_fraction,
|
|
total_power,
|
|
prompt_power,
|
|
fission_rate,
|
|
state.primary_loop.temperature_out,
|
|
state.total_electrical_output(),
|
|
sum(t.load_supplied_mw for t in state.turbines),
|
|
sum(t.load_demand_mw for t in state.turbines),
|
|
)
|
|
|
|
def _step_turbine_bank(self, state: PlantState, steam_power_mw: float, dt: float) -> None:
|
|
if not state.turbines:
|
|
return
|
|
active_indices = [
|
|
idx for idx, active in enumerate(self.turbine_unit_active) if active and idx < len(state.turbines)
|
|
]
|
|
power_per_unit = steam_power_mw / len(active_indices) if active_indices else 0.0
|
|
for idx, turbine in enumerate(self.turbines):
|
|
if idx >= len(state.turbines):
|
|
break
|
|
turbine_state = state.turbines[idx]
|
|
if idx in active_indices:
|
|
turbine.step(state.secondary_loop, turbine_state, steam_power_mw=power_per_unit, dt=dt)
|
|
else:
|
|
self._spin_down_turbine(turbine_state, dt, turbine.spool_time)
|
|
self._dispatch_consumer_load(state, active_indices)
|
|
|
|
def _reset_turbine_state(self, turbine_state: TurbineState) -> None:
|
|
turbine_state.shaft_power_mw = 0.0
|
|
turbine_state.electrical_output_mw = 0.0
|
|
turbine_state.load_demand_mw = 0.0
|
|
turbine_state.load_supplied_mw = 0.0
|
|
|
|
@staticmethod
|
|
def _ramp_value(current: float, target: float, dt: float, time_constant: float) -> float:
|
|
if time_constant <= 0.0:
|
|
return target
|
|
alpha = min(1.0, max(0.0, dt / time_constant))
|
|
return current + (target - current) * alpha
|
|
|
|
def _spin_down_turbine(self, turbine_state: TurbineState, dt: float, time_constant: float) -> None:
|
|
turbine_state.shaft_power_mw = self._ramp_value(turbine_state.shaft_power_mw, 0.0, dt, time_constant)
|
|
turbine_state.electrical_output_mw = self._ramp_value(
|
|
turbine_state.electrical_output_mw, 0.0, dt, time_constant
|
|
)
|
|
turbine_state.load_demand_mw = 0.0
|
|
turbine_state.load_supplied_mw = self._ramp_value(
|
|
turbine_state.load_supplied_mw, 0.0, dt, time_constant
|
|
)
|
|
|
|
def _dispatch_consumer_load(self, state: PlantState, active_indices: list[int]) -> None:
|
|
total_electrical = sum(state.turbines[idx].electrical_output_mw for idx in active_indices)
|
|
if self.consumer:
|
|
demand = self.consumer.request_power()
|
|
supplied = min(total_electrical, demand)
|
|
self.consumer.update_power_received(supplied)
|
|
else:
|
|
demand = 0.0
|
|
supplied = 0.0
|
|
demand_per_unit = demand / len(active_indices) if active_indices else 0.0
|
|
total_for_share = total_electrical if total_electrical > 0 else 1.0
|
|
for idx, turbine_state in enumerate(state.turbines):
|
|
if idx not in active_indices:
|
|
turbine_state.load_demand_mw = 0.0
|
|
turbine_state.load_supplied_mw = 0.0
|
|
continue
|
|
share = 0.0 if total_electrical <= 0 else supplied * (turbine_state.electrical_output_mw / total_for_share)
|
|
turbine_state.load_demand_mw = demand_per_unit
|
|
turbine_state.load_supplied_mw = share if demand_per_unit <= 0 else min(share, demand_per_unit)
|
|
|
|
def _handle_failure(self, component: str) -> None:
|
|
if component == "core":
|
|
LOGGER.critical("Core failure detected. Initiating SCRAM.")
|
|
self.shutdown = True
|
|
self.control.scram()
|
|
elif component == "primary_pump":
|
|
self._set_primary_pump(False)
|
|
elif component == "secondary_pump":
|
|
self._set_secondary_pump(False)
|
|
elif component.startswith("turbine"):
|
|
idx = self._component_index(component)
|
|
self._set_turbine_state(False, index=idx)
|
|
|
|
def _apply_command(self, command: ReactorCommand, state: PlantState) -> dict[str, float]:
|
|
overrides: dict[str, float] = {}
|
|
if command.scram:
|
|
self.shutdown = True
|
|
overrides["rod_fraction"] = self.control.scram()
|
|
self._set_turbine_state(False)
|
|
if command.power_setpoint is not None:
|
|
self.control.set_power_setpoint(command.power_setpoint)
|
|
if command.rod_manual is not None:
|
|
self.control.set_manual_mode(command.rod_manual)
|
|
if command.rod_position is not None:
|
|
self.control.set_manual_mode(True)
|
|
overrides["rod_fraction"] = self.control.set_rods(command.rod_position)
|
|
self.shutdown = self.shutdown or command.rod_position >= 0.95
|
|
elif command.rod_step is not None:
|
|
overrides["rod_fraction"] = self.control.increment_rods(command.rod_step)
|
|
if command.primary_pump_on is not None:
|
|
self._set_primary_pump(command.primary_pump_on)
|
|
if command.secondary_pump_on is not None:
|
|
self._set_secondary_pump(command.secondary_pump_on)
|
|
if command.primary_pumps:
|
|
for idx, flag in command.primary_pumps.items():
|
|
self._toggle_primary_pump_unit(idx - 1, flag)
|
|
if command.secondary_pumps:
|
|
for idx, flag in command.secondary_pumps.items():
|
|
self._toggle_secondary_pump_unit(idx - 1, flag)
|
|
if command.turbine_on is not None:
|
|
self._set_turbine_state(command.turbine_on)
|
|
if command.turbine_units:
|
|
for key, state_flag in command.turbine_units.items():
|
|
idx = key - 1
|
|
self._set_turbine_state(state_flag, index=idx)
|
|
if command.consumer_online is not None and self.consumer:
|
|
self.consumer.set_online(command.consumer_online)
|
|
if command.consumer_demand is not None and self.consumer:
|
|
self.consumer.set_demand(command.consumer_demand)
|
|
if command.coolant_demand is not None:
|
|
overrides["coolant_demand"] = max(0.0, min(1.0, command.coolant_demand))
|
|
for component in command.maintenance_components:
|
|
self._toggle_maintenance(component)
|
|
return overrides
|
|
|
|
def _set_primary_pump(self, active: bool) -> None:
|
|
if self.primary_pump_active != active:
|
|
self.primary_pump_active = active
|
|
LOGGER.info("Primary pump %s", "enabled" if active else "stopped")
|
|
if not active:
|
|
self.primary_pump_units = [False] * len(self.primary_pump_units)
|
|
elif active and not any(self.primary_pump_units):
|
|
self.primary_pump_units = [True] * len(self.primary_pump_units)
|
|
|
|
def _set_secondary_pump(self, active: bool) -> None:
|
|
if self.secondary_pump_active != active:
|
|
self.secondary_pump_active = active
|
|
LOGGER.info("Secondary pump %s", "enabled" if active else "stopped")
|
|
if not active:
|
|
self.secondary_pump_units = [False] * len(self.secondary_pump_units)
|
|
elif active and not any(self.secondary_pump_units):
|
|
self.secondary_pump_units = [True] * len(self.secondary_pump_units)
|
|
|
|
def _toggle_primary_pump_unit(self, index: int, active: bool) -> None:
|
|
if index < 0 or index >= len(self.primary_pump_units):
|
|
LOGGER.warning("Ignoring primary pump index %s", index)
|
|
return
|
|
if self.primary_pump_units[index] == active:
|
|
return
|
|
self.primary_pump_units[index] = active
|
|
LOGGER.info("Primary pump %d %s", index + 1, "enabled" if active else "stopped")
|
|
if active:
|
|
self._set_primary_pump(True)
|
|
elif not any(self.primary_pump_units):
|
|
self._set_primary_pump(False)
|
|
|
|
def _toggle_secondary_pump_unit(self, index: int, active: bool) -> None:
|
|
if index < 0 or index >= len(self.secondary_pump_units):
|
|
LOGGER.warning("Ignoring secondary pump index %s", index)
|
|
return
|
|
if self.secondary_pump_units[index] == active:
|
|
return
|
|
self.secondary_pump_units[index] = active
|
|
LOGGER.info("Secondary pump %d %s", index + 1, "enabled" if active else "stopped")
|
|
if active:
|
|
self._set_secondary_pump(True)
|
|
elif not any(self.secondary_pump_units):
|
|
self._set_secondary_pump(False)
|
|
|
|
def _set_turbine_state(self, active: bool, index: int | None = None) -> None:
|
|
if index is None:
|
|
for idx in range(len(self.turbine_unit_active)):
|
|
self._set_turbine_state(active, index=idx)
|
|
return
|
|
if index < 0 or index >= len(self.turbine_unit_active):
|
|
LOGGER.warning("Ignoring turbine index %s", index)
|
|
return
|
|
if self.turbine_unit_active[index] != active:
|
|
self.turbine_unit_active[index] = active
|
|
LOGGER.info("Turbine %d %s", index + 1, "started" if active else "stopped")
|
|
self.turbine_active = any(self.turbine_unit_active)
|
|
|
|
def _component_index(self, name: str) -> int:
|
|
if name == "turbine":
|
|
return 0
|
|
try:
|
|
return int(name.split("_")[1]) - 1
|
|
except (IndexError, ValueError):
|
|
return -1
|
|
|
|
def _perform_maintenance(self, component: str) -> None:
|
|
if not self._can_maintain(component):
|
|
return
|
|
self.health_monitor.maintain(component)
|
|
|
|
def _maintenance_tick(self, state: PlantState, dt: float) -> None:
|
|
if not self.maintenance_active:
|
|
return
|
|
completed: list[str] = []
|
|
for component in list(self.maintenance_active):
|
|
if not self._can_maintain(component):
|
|
continue
|
|
restored = self.health_monitor.maintain(component, amount=0.02 * dt)
|
|
comp_state = self.health_monitor.component(component)
|
|
if comp_state.integrity >= 0.999:
|
|
completed.append(component)
|
|
for comp in completed:
|
|
self.maintenance_active.discard(comp)
|
|
LOGGER.info("Maintenance completed for %s", comp)
|
|
|
|
def _toggle_maintenance(self, component: str) -> None:
|
|
if component in self.maintenance_active:
|
|
self.maintenance_active.remove(component)
|
|
LOGGER.info("Maintenance stopped for %s", component)
|
|
return
|
|
if not self._can_maintain(component):
|
|
return
|
|
self.maintenance_active.add(component)
|
|
LOGGER.info("Maintenance started for %s", component)
|
|
|
|
def _can_maintain(self, component: str) -> bool:
|
|
if component == "core" and not self.shutdown:
|
|
LOGGER.warning("Cannot maintain core while reactor is running")
|
|
return False
|
|
if component == "primary_pump" and self.primary_pump_active:
|
|
LOGGER.warning("Stop primary pump before maintenance")
|
|
return False
|
|
if component == "secondary_pump" and self.secondary_pump_active:
|
|
LOGGER.warning("Stop secondary pump before maintenance")
|
|
return False
|
|
if component.startswith("turbine"):
|
|
idx = self._component_index(component)
|
|
if idx < 0 or idx >= len(self.turbine_unit_active):
|
|
LOGGER.warning("Unknown turbine maintenance target %s", component)
|
|
return False
|
|
if self.turbine_unit_active[idx]:
|
|
LOGGER.warning("Stop turbine %d before maintenance", idx + 1)
|
|
return False
|
|
return True
|
|
|
|
def attach_consumer(self, consumer: ElectricalConsumer) -> None:
|
|
self.consumer = consumer
|
|
LOGGER.info("Attached consumer %s (%.1f MW)", consumer.name, consumer.demand_mw)
|
|
|
|
def detach_consumer(self) -> None:
|
|
if self.consumer:
|
|
LOGGER.info("Detached consumer %s", self.consumer.name)
|
|
self.consumer = None
|
|
|
|
def save_state(self, filepath: str, state: PlantState) -> None:
|
|
metadata = {
|
|
"primary_pump_active": self.primary_pump_active,
|
|
"secondary_pump_active": self.secondary_pump_active,
|
|
"primary_pump_units": self.primary_pump_units,
|
|
"secondary_pump_units": self.secondary_pump_units,
|
|
"turbine_active": self.turbine_active,
|
|
"turbine_units": self.turbine_unit_active,
|
|
"shutdown": self.shutdown,
|
|
"maintenance_active": list(self.maintenance_active),
|
|
"consumer": {
|
|
"online": self.consumer.online if self.consumer else False,
|
|
"demand_mw": self.consumer.demand_mw if self.consumer else 0.0,
|
|
"name": self.consumer.name if self.consumer else None,
|
|
},
|
|
}
|
|
self.control.save_state(filepath, state, metadata, self.health_monitor.snapshot())
|
|
|
|
def load_state(self, filepath: str) -> PlantState:
|
|
plant, metadata, health = self.control.load_state(filepath)
|
|
self.primary_pump_active = metadata.get("primary_pump_active", self.primary_pump_active)
|
|
self.secondary_pump_active = metadata.get("secondary_pump_active", self.secondary_pump_active)
|
|
self.primary_pump_units = list(metadata.get("primary_pump_units", self.primary_pump_units))
|
|
self.secondary_pump_units = list(metadata.get("secondary_pump_units", self.secondary_pump_units))
|
|
unit_states = metadata.get("turbine_units")
|
|
if unit_states:
|
|
self.turbine_unit_active = list(unit_states)
|
|
self.turbine_active = metadata.get("turbine_active", any(self.turbine_unit_active))
|
|
self.shutdown = metadata.get("shutdown", self.shutdown)
|
|
maint = metadata.get("maintenance_active")
|
|
if maint is not None:
|
|
self.maintenance_active = set(maint)
|
|
consumer_cfg = metadata.get("consumer")
|
|
if consumer_cfg:
|
|
if not self.consumer:
|
|
self.consumer = ElectricalConsumer(
|
|
name=consumer_cfg.get("name") or "External",
|
|
demand_mw=consumer_cfg.get("demand_mw", 0.0),
|
|
online=consumer_cfg.get("online", False),
|
|
)
|
|
else:
|
|
self.consumer.set_demand(consumer_cfg.get("demand_mw", self.consumer.demand_mw))
|
|
self.consumer.set_online(consumer_cfg.get("online", self.consumer.online))
|
|
if health:
|
|
self.health_monitor.load_snapshot(health)
|
|
LOGGER.info("Reactor state restored from %s", filepath)
|
|
# Back-fill pump state lists for compatibility.
|
|
if not plant.primary_pumps or len(plant.primary_pumps) < 2:
|
|
plant.primary_pumps = [
|
|
PumpState(active=self.primary_pump_active, flow_rate=plant.primary_loop.mass_flow_rate / 2, pressure=plant.primary_loop.pressure)
|
|
for _ in range(2)
|
|
]
|
|
if not plant.secondary_pumps or len(plant.secondary_pumps) < 2:
|
|
plant.secondary_pumps = [
|
|
PumpState(
|
|
active=self.secondary_pump_active,
|
|
flow_rate=plant.secondary_loop.mass_flow_rate / 2,
|
|
pressure=plant.secondary_loop.pressure,
|
|
)
|
|
for _ in range(2)
|
|
]
|
|
if len(plant.turbines) < len(self.turbines):
|
|
ambient = constants.ENVIRONMENT_TEMPERATURE
|
|
while len(plant.turbines) < len(self.turbines):
|
|
plant.turbines.append(
|
|
TurbineState(
|
|
steam_enthalpy=2_000.0,
|
|
shaft_power_mw=0.0,
|
|
electrical_output_mw=0.0,
|
|
condenser_temperature=ambient,
|
|
load_demand_mw=0.0,
|
|
load_supplied_mw=0.0,
|
|
)
|
|
)
|
|
return plant
|
|
|
|
def _handle_heat_sink_loss(self, state: PlantState) -> None:
|
|
if not self.shutdown:
|
|
LOGGER.critical("Loss of secondary heat sink detected. Initiating SCRAM.")
|
|
self.shutdown = True
|
|
self.control.scram()
|
|
self._set_turbine_state(False)
|
|
# Clear turbine output and demands to reflect lost steam.
|
|
for turbine_state in state.turbines:
|
|
self._reset_turbine_state(turbine_state)
|
|
|
|
def _check_poison_alerts(self, state: PlantState) -> None:
|
|
inventory = state.core.fission_product_inventory or {}
|
|
for symbol, threshold in constants.KEY_POISON_THRESHOLDS.items():
|
|
amount = inventory.get(symbol, 0.0)
|
|
if amount >= threshold and symbol not in self.poison_alerts:
|
|
self.poison_alerts.add(symbol)
|
|
LOGGER.warning("Poison level high: %s inventory %.2e exceeds %.2e", symbol, amount, threshold)
|