989 lines
48 KiB
Python
989 lines
48 KiB
Python
"""High-level reactor orchestration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
import logging
|
|
|
|
from . import constants
|
|
from .atomic import AtomicPhysics
|
|
from .commands import ReactorCommand
|
|
from .coolant import Pump
|
|
from .consumer import ElectricalConsumer
|
|
from .control import ControlSystem
|
|
from .failures import HealthMonitor
|
|
from .fuel import FuelAssembly, decay_heat_fraction
|
|
from .generator import DieselGenerator, GeneratorState
|
|
from .neutronics import NeutronDynamics
|
|
from .state import CoolantLoopState, CoreState, PlantState, PumpState, TurbineState
|
|
from .thermal import ThermalSolver, heat_transfer, saturation_pressure, temperature_rise
|
|
from .turbine import SteamGenerator, Turbine
|
|
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class Reactor:
    """Plant-level model that owns the subsystem models and operator-facing flags.

    The dataclass fields fall into two groups: the subsystem models that do the
    physics (fuel, neutronics, pumps, turbines, ...) and the mutable switches
    that the command handling and failure handling toggle between steps.
    """

    # --- Subsystem models (required) ---
    fuel: FuelAssembly
    neutronics: NeutronDynamics
    control: ControlSystem
    primary_pump: Pump
    secondary_pump: Pump
    thermal: ThermalSolver
    steam_generator: SteamGenerator
    # An empty list is replaced by a single Turbine in __post_init__.
    turbines: list[Turbine]
    # An empty list is replaced by two DieselGenerator units in __post_init__.
    generators: list[DieselGenerator]
    atomic_model: AtomicPhysics
    # Optional electrical load; None means no consumer is attached.
    consumer: ElectricalConsumer | None = None
    health_monitor: HealthMonitor = field(default_factory=HealthMonitor)
    # Pressurizer fill fraction; _apply_pressurizer keeps it within [0.0, 1.0].
    pressurizer_level: float = 0.6
    # Loop-level pump switches; a loop only runs when this AND a unit flag are set.
    primary_pump_active: bool = True
    secondary_pump_active: bool = True
    # Per-unit pump enable flags; __post_init__ forces exactly two entries each.
    primary_pump_units: list[bool] = field(default_factory=lambda: [True, True])
    secondary_pump_units: list[bool] = field(default_factory=lambda: [True, True])
    # Derived in __post_init__ as any(turbine_unit_active).
    turbine_active: bool = True
    # Per-turbine enable flags; resized in __post_init__ to match len(turbines).
    turbine_unit_active: list[bool] = field(default_factory=lambda: [True, True, True])
    shutdown: bool = False
    meltdown: bool = False
    # When True, _step_generators starts/stops diesel units automatically.
    generator_auto: bool = False
    primary_relief_open: bool = False
    secondary_relief_open: bool = False
    # NOTE(review): presumably names of poisons already alerted on — confirm in _check_poison_alerts.
    poison_alerts: set[str] = field(default_factory=set)
    # NOTE(review): presumably component names currently under maintenance — confirm in _toggle_maintenance.
    maintenance_active: set[str] = field(default_factory=set)
|
|
|
|
def __post_init__(self) -> None:
    """Normalise the collection fields so the rest of the class can rely on their sizes.

    Guarantees at least one turbine, one enable flag per turbine, a non-empty
    generator list (two units by default) and exactly two enable flags for each
    pump loop.
    """
    if not self.turbines:
        self.turbines = [Turbine()]

    turbine_count = len(self.turbines)
    flags = self.turbine_unit_active
    if not flags or len(flags) != turbine_count:
        # A missing or mis-sized flag list is replaced by "everything enabled".
        self.turbine_unit_active = [True] * turbine_count
    self.turbine_active = any(self.turbine_unit_active)

    if not self.generators:
        self.generators = [DieselGenerator() for _ in range(2)]

    primary_flags = self.primary_pump_units
    if not primary_flags or len(primary_flags) != 2:
        self.primary_pump_units = [True, True]

    secondary_flags = self.secondary_pump_units
    if not secondary_flags or len(secondary_flags) != 2:
        self.secondary_pump_units = [True, True]
|
|
|
|
@classmethod
def default(cls) -> "Reactor":
    """Return a reactor assembled from the stock component configuration.

    One ``AtomicPhysics`` instance is shared between the fuel assembly and the
    reactor itself.
    """
    physics = AtomicPhysics()
    # Built as an ordered mapping so components are constructed in field order.
    parts = {
        "fuel": FuelAssembly(enrichment=0.045, mass_kg=80_000.0, atomic_physics=physics),
        "neutronics": NeutronDynamics(),
        "control": ControlSystem(),
        "primary_pump": Pump(
            nominal_flow=18_000.0,
            shutoff_head_mpa=constants.PRIMARY_PUMP_SHUTOFF_HEAD_MPA,
        ),
        "secondary_pump": Pump(
            nominal_flow=16_000.0,
            efficiency=0.85,
            shutoff_head_mpa=constants.SECONDARY_PUMP_SHUTOFF_HEAD_MPA,
        ),
        "thermal": ThermalSolver(),
        "steam_generator": SteamGenerator(),
        "turbines": [Turbine() for _ in range(3)],
        "generators": [DieselGenerator() for _ in range(2)],
        "atomic_model": physics,
        "consumer": ElectricalConsumer(name="Grid", demand_mw=800.0, online=False),
        "health_monitor": HealthMonitor(),
    }
    return cls(**parts)
|
|
|
|
def initial_state(self) -> PlantState:
    """Reset the reactor to cold standby and return the matching ``PlantState``.

    Side effects on ``self``: the pressurizer level is reset to 0.6, the control
    system is put in manual mode with rods at 0.95, ``shutdown`` is set, relief
    valves are closed, all pumps and turbines are disabled, automatic generator
    control is off and any attached consumer is taken offline.
    """
    ambient = constants.ENVIRONMENT_TEMPERATURE
    primary_nominal_mass = constants.PRIMARY_LOOP_VOLUME_M3 * constants.COOLANT_DENSITY
    secondary_nominal_mass = constants.SECONDARY_LOOP_VOLUME_M3 * constants.COOLANT_DENSITY

    def cold_loop(nominal_mass: float, fill_target: float) -> CoolantLoopState:
        # Stagnant loop at ambient temperature, filled to its target level.
        return CoolantLoopState(
            temperature_in=ambient,
            temperature_out=ambient,
            pressure=0.5,
            mass_flow_rate=0.0,
            steam_quality=0.0,
            inventory_kg=nominal_mass * fill_target,
            level=fill_target,
        )

    def idle_pumps(loop_enabled: bool, unit_flags: list[bool]) -> list[PumpState]:
        # Two pump slots per loop, both at rest.
        return [
            PumpState(active=loop_enabled and unit_flags[slot], flow_rate=0.0, pressure=0.5)
            for slot in range(2)
        ]

    self.pressurizer_level = 0.6
    core = CoreState(
        fuel_temperature=ambient,
        neutron_flux=1e5,
        reactivity_margin=-0.02,
        power_output_mw=0.1,
        burnup=0.0,
        delayed_precursors=[0.0 for _ in constants.CONTROL_ROD_BANK_WEIGHTS],
        fission_product_inventory={},
        emitted_particles={},
    )

    # Cold, safe configuration: manual rod control with every bank at 0.95,
    # reactor flagged as shut down, pumps and turbines off.
    controller = self.control
    controller.manual_control = True
    controller.rod_fraction = 0.95
    controller.rod_banks = [0.95 for _ in controller.rod_banks]
    controller.rod_target = 0.95

    self.shutdown = True
    self.meltdown = False
    self.generator_auto = False
    self.primary_relief_open = False
    self.secondary_relief_open = False
    self.primary_pump_active = False
    self.secondary_pump_active = False
    self.primary_pump_units = [False] * len(self.primary_pump_units)
    self.secondary_pump_units = [False] * len(self.secondary_pump_units)
    self.turbine_unit_active = [False] * len(self.turbines)
    self.turbine_active = any(self.turbine_unit_active)
    if self.consumer:
        self.consumer.set_online(False)

    primary = cold_loop(primary_nominal_mass, constants.PRIMARY_INVENTORY_TARGET)
    secondary = cold_loop(secondary_nominal_mass, constants.SECONDARY_INVENTORY_TARGET)
    primary_pumps = idle_pumps(self.primary_pump_active, self.primary_pump_units)
    secondary_pumps = idle_pumps(self.secondary_pump_active, self.secondary_pump_units)

    generator_states = []
    for _ in self.generators:
        generator_states.append(
            GeneratorState(
                running=False,
                starting=False,
                spool_remaining=0.0,
                power_output_mw=0.0,
                battery_charge=1.0,
                status="OFF",
            )
        )

    turbine_states = []
    for _ in self.turbines:
        turbine_states.append(
            TurbineState(
                steam_enthalpy=2_000.0,
                shaft_power_mw=0.0,
                electrical_output_mw=0.0,
                condenser_temperature=ambient,
                load_demand_mw=0.0,
                load_supplied_mw=0.0,
            )
        )

    return PlantState(
        core=core,
        primary_loop=primary,
        secondary_loop=secondary,
        turbines=turbine_states,
        primary_pumps=primary_pumps,
        secondary_pumps=secondary_pumps,
        generators=generator_states,
    )
|
|
|
|
def step(self, state: PlantState, dt: float, command: ReactorCommand | None = None) -> None:
    """Advance the plant by one time step, mutating ``state`` in place.

    The stages run in a fixed order: rod motion and command handling, decay
    and neutronics, core power, loop inventory, auxiliary power balance, pump
    ramping for both loops, pressurizer and relief valves, heat transfer and
    thermal updates, the turbine bank, maintenance, failure evaluation, and
    finally the inlet temperatures used by the next step.

    Args:
        state: Plant state to update.
        dt: Step length; ``state.time_elapsed`` grows by this amount (logged
            with an ``s`` suffix, so presumably seconds).
        command: Optional operator command applied before the physics update.
    """
    # Rods move every step whether or not the reactor is shut down (the
    # original had an if/else on self.shutdown with two identical branches).
    rod_fraction = self.control.update_rods(state.core, dt)

    if state.core.fuel_temperature >= constants.CORE_MELTDOWN_TEMPERATURE and not self.meltdown:
        self._trigger_meltdown(state)

    overrides = {}
    if command:
        overrides = self._apply_command(command, state)
    # NOTE(review): in automatic mode this is a second update_rods call within
    # the same step — confirm the double advance is intended.
    if not self.shutdown and not self.control.manual_control:
        rod_fraction = self.control.update_rods(state.core, dt)

    decay_power, decay_neutron_source, decay_products, decay_particles = self.fuel.decay_reaction_effects(
        state.core
    )
    self.neutronics.update_poisons(state.core, dt)
    self.neutronics.step(state.core, rod_fraction, dt, external_source_rate=decay_neutron_source, rod_banks=self.control.rod_banks)

    prompt_power, fission_rate, fission_event = self.fuel.prompt_energy_rate(
        state.core.neutron_flux, rod_fraction
    )
    # Decay heat is scaled by the power output of the previous step.
    decay_heat = decay_heat_fraction(state.core.burnup) * state.core.power_output_mw
    total_power = prompt_power + decay_heat + decay_power
    # Hard cap just below the configured maximum.
    total_power = min(total_power, constants.TEST_MAX_POWER_MW * 0.98)
    state.core.power_output_mw = total_power
    state.core.update_burnup(dt)
    # Track fission products and emitted particles for diagnostics.
    products: dict[str, float] = {}
    for atom in fission_event.products:
        products[atom.symbol] = products.get(atom.symbol, 0.0) + fission_rate * dt
    for k, v in decay_products.items():
        products[k] = products.get(k, 0.0) + v * dt
    particles: dict[str, float] = {k: v * dt for k, v in decay_particles.items()}
    state.core.add_products(products)
    state.core.add_emitted_particles(particles)
    self._check_poison_alerts(state)

    self._update_loop_inventory(
        state.primary_loop, constants.PRIMARY_LOOP_VOLUME_M3, constants.PRIMARY_INVENTORY_TARGET, dt
    )
    self._update_loop_inventory(
        state.secondary_loop, constants.SECONDARY_LOOP_VOLUME_M3, constants.SECONDARY_INVENTORY_TARGET, dt
    )

    # An explicit coolant_demand from the command wins over the controller's value.
    pump_demand = overrides.get(
        "coolant_demand",
        self.control.coolant_demand(
            state.primary_loop,
            state.core.power_output_mw,
            state.total_electrical_output(),
        ),
    )
    # A loop cannot stay active once every one of its units is disabled.
    self.primary_pump_active = self.primary_pump_active and any(self.primary_pump_units)
    self.secondary_pump_active = self.secondary_pump_active and any(self.secondary_pump_units)
    primary_units_active = [
        self.primary_pump_active and idx < len(self.primary_pump_units) and self.primary_pump_units[idx]
        for idx in range(2)
    ]
    secondary_units_active = [
        self.secondary_pump_active and idx < len(self.secondary_pump_units) and self.secondary_pump_units[idx]
        for idx in range(2)
    ]
    any_units = any(primary_units_active) or any(secondary_units_active)
    load_present = any_units or (self.consumer and self.consumer.online) or state.total_electrical_output() > 0.1
    idle_core = state.core.power_output_mw < 1.0 and self.control.rod_fraction >= 0.9
    # The base auxiliary load is dropped only when the plant is fully idle.
    baseline_off = (
        (self.shutdown or idle_core)
        and not load_present
        and state.total_electrical_output() <= 0.01
        and sum(primary_units_active) == 0
        and sum(secondary_units_active) == 0
    )
    aux_base = 0.0 if baseline_off else constants.BASE_AUX_LOAD_MW
    aux_pump_primary = constants.PUMP_POWER_MW * sum(primary_units_active)
    aux_pump_secondary = constants.PUMP_POWER_MW * sum(secondary_units_active)
    aux_demand = aux_base + aux_pump_primary + aux_pump_secondary
    turbine_electrical = state.total_electrical_output()
    generator_power = self._step_generators(state, aux_demand, turbine_electrical, dt)
    aux_available = turbine_electrical + generator_power
    supplied = aux_available if aux_demand <= 0 else min(aux_available, aux_demand)
    # Fraction of the auxiliary demand that is actually covered; scales pump targets below.
    power_ratio = 1.0 if aux_demand <= 0 else min(1.0, supplied / max(1e-6, aux_demand))
    if aux_demand > 0 and aux_available < 0.99 * aux_demand:
        LOGGER.warning("Aux power deficit: available %.1f/%.1f MW", aux_available, aux_demand)
    state.aux_draws = {
        "base": aux_base * power_ratio,
        "primary_pumps": aux_pump_primary * power_ratio,
        "secondary_pumps": aux_pump_secondary * power_ratio,
        "total_demand": aux_demand,
        "supplied": supplied,
        "generator_output": generator_power,
        "turbine_output": turbine_electrical,
    }

    # --- Primary loop pumps ---
    if self.primary_pump_active:
        total_flow = 0.0
        base_flow, base_head = self.primary_pump.performance(pump_demand)
        target_flow = base_flow * power_ratio
        loop_pressure = max(0.1, saturation_pressure(state.primary_loop.temperature_out))
        target_pressure = max(0.5, base_head * power_ratio)
        # Flow is derated by low inventory and by insufficient suction margin.
        primary_flow_scale = min(
            self._inventory_flow_scale(state.primary_loop), self._npsh_factor(state.primary_loop)
        )
        for idx, pump_state in enumerate(state.primary_pumps):
            unit_enabled = (
                self.primary_pump_active and idx < len(self.primary_pump_units) and self.primary_pump_units[idx]
            )
            powered = power_ratio > 0.1
            desired_flow = target_flow * primary_flow_scale if unit_enabled else 0.0
            desired_pressure = target_pressure if unit_enabled else 0.5
            if not powered:
                desired_flow = 0.0
                desired_pressure = 0.5
            pump_state.flow_rate = self._ramp_value(
                pump_state.flow_rate, desired_flow, dt, self.primary_pump.spool_time
            )
            pump_state.pressure = self._ramp_value(
                pump_state.pressure, desired_pressure, dt, self.primary_pump.spool_time
            )
            pump_state.active = unit_enabled and powered and pump_state.flow_rate > 1.0
            if not powered or not unit_enabled:
                pump_state.status = "STOPPING" if pump_state.flow_rate > 1.0 else "OFF"
            elif primary_flow_scale < 0.99:
                pump_state.status = "CAV"
            elif pump_state.flow_rate < max(1.0, desired_flow * 0.8):
                pump_state.status = "STARTING"
            else:
                pump_state.status = "RUN"
            total_flow += pump_state.flow_rate
            loop_pressure = max(loop_pressure, pump_state.pressure)
        state.primary_loop.mass_flow_rate = total_flow
        state.primary_loop.pressure = loop_pressure if total_flow > 0 else self._ramp_value(
            state.primary_loop.pressure, 0.5, dt, self.primary_pump.spool_time
        )
    else:
        # Loop switched off: coast flow down and relax pressure toward saturation.
        state.primary_loop.mass_flow_rate = self._ramp_value(
            state.primary_loop.mass_flow_rate, 0.0, dt, self.primary_pump.spool_time
        )
        target_pressure = max(0.5, saturation_pressure(state.primary_loop.temperature_out))
        state.primary_loop.pressure = self._ramp_value(
            state.primary_loop.pressure, target_pressure, dt, self.primary_pump.spool_time
        )
        for pump_state in state.primary_pumps:
            pump_state.active = False
            pump_state.flow_rate = self._ramp_value(
                pump_state.flow_rate, 0.0, dt, self.primary_pump.spool_time
            )
            pump_state.pressure = state.primary_loop.pressure
            pump_state.status = "STOPPING" if pump_state.flow_rate > 0.1 else "OFF"
    # --- Secondary loop pumps (fixed 0.75 demand) ---
    if self.secondary_pump_active:
        total_flow = 0.0
        base_flow, base_head = self.secondary_pump.performance(0.75)
        target_pressure = max(0.5, base_head * power_ratio)
        loop_pressure = max(0.1, saturation_pressure(state.secondary_loop.temperature_out))
        target_flow = base_flow * power_ratio
        secondary_flow_scale = min(
            self._inventory_flow_scale(state.secondary_loop), self._npsh_factor(state.secondary_loop)
        )
        for idx, pump_state in enumerate(state.secondary_pumps):
            unit_enabled = (
                self.secondary_pump_active and idx < len(self.secondary_pump_units) and self.secondary_pump_units[idx]
            )
            powered = power_ratio > 0.1
            desired_flow = target_flow * secondary_flow_scale if unit_enabled else 0.0
            desired_pressure = target_pressure if unit_enabled else 0.5
            if not powered:
                desired_flow = 0.0
                desired_pressure = 0.5
            pump_state.flow_rate = self._ramp_value(
                pump_state.flow_rate, desired_flow, dt, self.secondary_pump.spool_time
            )
            pump_state.pressure = self._ramp_value(
                pump_state.pressure, desired_pressure, dt, self.secondary_pump.spool_time
            )
            pump_state.active = unit_enabled and powered and pump_state.flow_rate > 1.0
            if not powered or not unit_enabled:
                pump_state.status = "STOPPING" if pump_state.flow_rate > 1.0 else "OFF"
            elif secondary_flow_scale < 0.99:
                pump_state.status = "CAV"
            elif pump_state.flow_rate < max(1.0, desired_flow * 0.8):
                pump_state.status = "STARTING"
            else:
                pump_state.status = "RUN"
            total_flow += pump_state.flow_rate
            loop_pressure = max(loop_pressure, pump_state.pressure)
        state.secondary_loop.mass_flow_rate = total_flow
        state.secondary_loop.pressure = loop_pressure if total_flow > 0 else self._ramp_value(
            state.secondary_loop.pressure, 0.5, dt, self.secondary_pump.spool_time
        )
    else:
        state.secondary_loop.mass_flow_rate = self._ramp_value(
            state.secondary_loop.mass_flow_rate, 0.0, dt, self.secondary_pump.spool_time
        )
        state.secondary_loop.pressure = self._ramp_value(
            state.secondary_loop.pressure, max(0.1, saturation_pressure(state.secondary_loop.temperature_out)), dt, self.secondary_pump.spool_time
        )
        for pump_state in state.secondary_pumps:
            pump_state.active = False
            pump_state.flow_rate = self._ramp_value(
                pump_state.flow_rate, 0.0, dt, self.secondary_pump.spool_time
            )
            pump_state.pressure = state.secondary_loop.pressure
            pump_state.status = "STOPPING" if pump_state.flow_rate > 0.1 else "OFF"

    self._apply_pressurizer(state.primary_loop, dt)
    # An open relief valve pins the loop to saturation pressure (floor 0.1).
    if self.primary_relief_open:
        state.primary_loop.pressure = max(0.1, saturation_pressure(state.primary_loop.temperature_out))
    if self.secondary_relief_open:
        state.secondary_loop.pressure = max(0.1, saturation_pressure(state.secondary_loop.temperature_out))

    # No secondary flow means no heat leaves the primary side.
    if not self.secondary_pump_active or state.secondary_loop.mass_flow_rate <= 1.0:
        transferred = 0.0
    else:
        transferred = heat_transfer(state.primary_loop, state.secondary_loop, total_power)
    self.thermal.step_core(state.core, state.primary_loop, total_power, dt)
    self.thermal.step_secondary(state.secondary_loop, transferred, dt)
    self._apply_secondary_boiloff(state, dt)
    # Second secondary-inventory pass, run after the boil-off loss above.
    self._update_loop_inventory(
        state.secondary_loop, constants.SECONDARY_LOOP_VOLUME_M3, constants.SECONDARY_INVENTORY_TARGET, dt
    )

    self._step_turbine_bank(state, transferred, dt)
    self._maintenance_tick(state, dt)

    if (not self.secondary_pump_active or state.secondary_loop.mass_flow_rate <= 1.0) and total_power > 50.0:
        self._handle_heat_sink_loss(state)

    failures = self.health_monitor.evaluate(
        state,
        primary_units_active,
        secondary_units_active,
        self.turbine_unit_active,
        state.generators,
        dt,
    )
    for failure in failures:
        self._handle_failure(failure)

    state.time_elapsed += dt

    # Update inlet temperatures based on heat removed/added for next iteration.
    env = constants.ENVIRONMENT_TEMPERATURE
    primary_cooling = temperature_rise(transferred, state.primary_loop.mass_flow_rate)
    if transferred <= 0.0 or state.secondary_loop.mass_flow_rate <= 1.0:
        # Without an active heat sink, fall back to a small passive loss to ambient.
        passive = 0.02 * max(0.0, state.primary_loop.temperature_out - env) * dt
        primary_cooling = max(primary_cooling, passive)
    state.primary_loop.temperature_in = max(env, state.primary_loop.temperature_out - primary_cooling)

    if state.secondary_loop.mass_flow_rate <= 1.0:
        target_temp = env
        state.secondary_loop.temperature_out = self._ramp_value(
            state.secondary_loop.temperature_out, target_temp, dt, self.secondary_pump.spool_time
        )
        state.secondary_loop.temperature_in = state.secondary_loop.temperature_out
    else:
        secondary_cooling = max(0.0, state.secondary_loop.temperature_out - env - 40.0)
        state.secondary_loop.temperature_in = max(env, state.secondary_loop.temperature_out - max(20.0, secondary_cooling))

    state.primary_to_secondary_delta_t = max(0.0, state.primary_loop.temperature_out - state.secondary_loop.temperature_in)
    state.heat_exchanger_efficiency = 0.0 if total_power <= 0 else min(1.0, max(0.0, transferred / max(1e-6, total_power)))

    LOGGER.info(
        (
            "t=%5.1fs rods=%.2f core_power=%.1fMW prompt=%.1fMW :: "
            "fissions %.2e/s, outlet %.1fK, electrical %.1fMW load %.1f/%.1fMW"
        ),
        state.time_elapsed,
        rod_fraction,
        total_power,
        prompt_power,
        fission_rate,
        state.primary_loop.temperature_out,
        state.total_electrical_output(),
        sum(t.load_supplied_mw for t in state.turbines),
        sum(t.load_demand_mw for t in state.turbines),
    )
|
|
|
|
def _step_turbine_bank(self, state: PlantState, steam_power_mw: float, dt: float) -> None:
    """Step every turbine, splitting the steam power evenly across enabled units.

    Enabled units get a throttle derived from their current load demand and are
    stepped; disabled units are spun down. Consumer load is dispatched afterwards.
    Does nothing when ``state`` holds no turbine states.
    """
    unit_states = state.turbines
    if not unit_states:
        return

    online = [
        slot
        for slot, enabled in enumerate(self.turbine_unit_active)
        if enabled and slot < len(unit_states)
    ]
    steam_share = steam_power_mw / len(online) if online else 0.0

    # zip() stops at the shorter sequence, so hardware without a state entry is skipped.
    for slot, (machine, machine_state) in enumerate(zip(self.turbines, unit_states)):
        if slot not in online:
            self._spin_down_turbine(machine_state, dt, machine.spool_time)
            machine_state.status = "STOPPING" if machine_state.electrical_output_mw > 0.1 else "OFF"
            continue

        # Throttle map: idle at 0.4 with no demand, opening up to 1.0 as demand rises.
        requested = machine_state.load_demand_mw
        if requested <= 0:
            machine.throttle = 0.4
        else:
            machine.throttle = min(1.0, 0.4 + requested / max(1e-6, machine.rated_output_mw))
        machine.step(state.secondary_loop, machine_state, steam_power_mw=steam_share, dt=dt)

        if steam_share <= 0.0 and machine_state.electrical_output_mw < 0.1:
            machine_state.status = "OFF"
        elif machine_state.electrical_output_mw < max(0.1 * machine.rated_output_mw, 1.0):
            machine_state.status = "STARTING"
        else:
            machine_state.status = "RUN"

    self._dispatch_consumer_load(state, online)
|
|
|
|
def _reset_turbine_state(self, turbine_state: TurbineState) -> None:
    """Zero the power and load figures of ``turbine_state`` and mark it ``"OFF"``."""
    for attribute in (
        "shaft_power_mw",
        "electrical_output_mw",
        "load_demand_mw",
        "load_supplied_mw",
    ):
        setattr(turbine_state, attribute, 0.0)
    turbine_state.status = "OFF"
|
|
|
|
@staticmethod
|
|
def _ramp_value(current: float, target: float, dt: float, time_constant: float) -> float:
|
|
if time_constant <= 0.0:
|
|
return target
|
|
alpha = min(1.0, max(0.0, dt / time_constant))
|
|
return current + (target - current) * alpha
|
|
|
|
def _spin_down_turbine(self, turbine_state: TurbineState, dt: float, time_constant: float) -> None:
    """Ramp a disabled turbine's power figures toward zero over ``time_constant``.

    Load demand is cleared immediately; electrical output below 0.1 is snapped
    to exactly zero.
    """

    def toward_zero(value: float) -> float:
        return self._ramp_value(value, 0.0, dt, time_constant)

    turbine_state.shaft_power_mw = toward_zero(turbine_state.shaft_power_mw)
    turbine_state.electrical_output_mw = toward_zero(turbine_state.electrical_output_mw)
    turbine_state.load_demand_mw = 0.0
    turbine_state.load_supplied_mw = toward_zero(turbine_state.load_supplied_mw)
    if turbine_state.electrical_output_mw < 0.1:
        turbine_state.electrical_output_mw = 0.0
|
|
|
|
def _dispatch_consumer_load(self, state: PlantState, active_indices: list[int]) -> None:
    """Hand turbine output to the consumer and record per-turbine load figures.

    The consumer receives the smaller of its request and the combined output of
    the active turbines. Demand is split evenly across active units, while the
    supplied power is attributed in proportion to each unit's output. Inactive
    units get zero demand and zero supplied load.
    """
    generated = sum(state.turbines[idx].electrical_output_mw for idx in active_indices)

    requested = 0.0
    delivered = 0.0
    if self.consumer:
        requested = self.consumer.request_power()
        delivered = min(generated, requested)
        self.consumer.update_power_received(delivered)

    per_unit_request = requested / len(active_indices) if active_indices else 0.0
    # Divisor for proportional attribution; 1.0 avoids dividing by a non-positive total.
    divisor = generated if generated > 0 else 1.0

    for position, unit in enumerate(state.turbines):
        if position in active_indices:
            if generated <= 0:
                portion = 0.0
            else:
                portion = delivered * (unit.electrical_output_mw / divisor)
            unit.load_demand_mw = per_unit_request
            if per_unit_request <= 0:
                unit.load_supplied_mw = portion
            else:
                unit.load_supplied_mw = min(portion, per_unit_request)
        else:
            unit.load_demand_mw = 0.0
            unit.load_supplied_mw = 0.0
|
|
|
|
def _nominal_inventory(self, volume_m3: float) -> float:
    """Return the coolant mass that fills ``volume_m3`` at ``constants.COOLANT_DENSITY``."""
    density = constants.COOLANT_DENSITY
    return volume_m3 * density
|
|
|
|
def _update_loop_inventory(
    self, loop: CoolantLoopState, volume_m3: float, target_level: float, dt: float
) -> None:
    """Nudge the loop inventory toward ``target_level`` and refresh ``loop.level``.

    The correction is proportional to the level error, scaled by
    ``constants.LOOP_INVENTORY_CORRECTION_RATE`` and ``dt``. An empty (or
    negative) inventory is first reset to the target fill. ``loop.level`` is
    clamped to [0.0, 1.2]; a loop with no nominal mass reports level 0.0.
    """
    full_mass = self._nominal_inventory(volume_m3)
    if full_mass <= 0.0:
        loop.level = 0.0
        return

    if loop.inventory_kg <= 0.0:
        loop.inventory_kg = full_mass * target_level

    level_error = target_level - loop.inventory_kg / full_mass
    correction = level_error * constants.LOOP_INVENTORY_CORRECTION_RATE
    adjusted = loop.inventory_kg + correction * full_mass * dt
    loop.inventory_kg = max(0.0, adjusted)

    fill = max(0.0, loop.inventory_kg / full_mass)
    loop.level = min(1.2, fill)
|
|
|
|
def _inventory_flow_scale(self, loop: CoolantLoopState) -> float:
    """Return a 0..1 flow derating factor based on the loop's fill level.

    At or below ``constants.LOW_LEVEL_FLOW_FLOOR`` the factor is 0.0, above a
    level of 0.25 it is 1.0, and in between it rises linearly.
    """
    floor = constants.LOW_LEVEL_FLOW_FLOOR
    if loop.level <= floor:
        return 0.0
    if loop.level > 0.25:
        return 1.0
    span = 0.25 - floor
    return max(0.0, (loop.level - floor) / span)
|
|
|
|
def _npsh_factor(self, loop: CoolantLoopState) -> float:
    """Return a 0..1 flow derating factor from the loop's suction pressure margin.

    The margin is the loop pressure minus the saturation pressure at the inlet
    temperature, expressed as a fraction of ``constants.NPSH_REQUIRED_MPA``.
    """
    saturation = saturation_pressure(loop.temperature_in)
    margin = max(0.0, loop.pressure - saturation)
    if margin <= 0.0:
        return 0.0
    fraction = margin / constants.NPSH_REQUIRED_MPA
    return max(0.0, min(1.0, fraction))
|
|
|
|
def _apply_pressurizer(self, primary: CoolantLoopState, dt: float) -> None:
    """Drive primary pressure toward the pressurizer setpoint.

    Below the deadband the heaters raise pressure (drawing down the pressurizer
    level, and only while the level is above 0.05); above it the spray lowers
    pressure (refilling the level). The result is kept between the saturation
    pressure at the outlet temperature and ``constants.MAX_PRESSURE``. Skipped
    entirely when the reactor is shut down with primary flow at or below 100.
    """
    if self.shutdown and primary.mass_flow_rate <= 100.0:
        return

    setpoint = constants.PRIMARY_PRESSURIZER_SETPOINT_MPA
    deadband = constants.PRIMARY_PRESSURIZER_DEADBAND_MPA
    heater_rate = constants.PRIMARY_PRESSURIZER_HEAT_RATE_MPA_PER_S
    spray_rate = constants.PRIMARY_PRESSURIZER_SPRAY_RATE_MPA_PER_S

    if primary.pressure < setpoint - deadband and self.pressurizer_level > 0.05:
        primary.pressure = min(setpoint, primary.pressure + heater_rate * dt)
        drawn = constants.PRIMARY_PRESSURIZER_LEVEL_DRAW_PER_S * dt
        self.pressurizer_level = max(0.0, self.pressurizer_level - drawn)
    elif primary.pressure > setpoint + deadband:
        primary.pressure = max(setpoint - deadband, primary.pressure - spray_rate * dt)
        filled = constants.PRIMARY_PRESSURIZER_LEVEL_FILL_PER_S * dt
        self.pressurizer_level = min(1.0, self.pressurizer_level + filled)

    lower_bound = max(saturation_pressure(primary.temperature_out), primary.pressure)
    primary.pressure = min(constants.MAX_PRESSURE, lower_bound)
|
|
|
|
def _apply_secondary_boiloff(self, state: PlantState, dt: float) -> None:
    """Remove the steam lost from the secondary loop and refresh its level.

    The loss is flow * steam quality * ``constants.SECONDARY_STEAM_LOSS_FRACTION``
    * ``dt``. Nothing happens without flow or without steam.
    """
    secondary = state.secondary_loop
    if secondary.mass_flow_rate <= 0.0 or secondary.steam_quality <= 0.0:
        return

    lost_mass = (
        secondary.mass_flow_rate * secondary.steam_quality * constants.SECONDARY_STEAM_LOSS_FRACTION * dt
    )
    secondary.inventory_kg = max(0.0, secondary.inventory_kg - lost_mass)

    full_mass = self._nominal_inventory(constants.SECONDARY_LOOP_VOLUME_M3)
    if full_mass > 0:
        secondary.level = min(1.2, max(0.0, secondary.inventory_kg / full_mass))
    else:
        secondary.level = 0.0
|
|
|
|
def _handle_failure(self, component: str) -> None:
    """React to a failed component reported by the health monitor.

    A core failure SCRAMs the reactor; a failed pump or turbine unit is
    switched off; a failed generator is only logged. Unknown component names
    are ignored.
    """
    if component == "core":
        LOGGER.critical("Core failure detected. Initiating SCRAM.")
        self.shutdown = True
        self.control.scram()
        return

    if component.startswith("primary_pump"):
        self._toggle_primary_pump_unit(self._component_index(component), False)
        return

    if component.startswith("secondary_pump"):
        self._toggle_secondary_pump_unit(self._component_index(component), False)
        return

    if component.startswith("generator"):
        unit = self._component_index(component)
        LOGGER.warning("Generator %d failed", unit + 1)
        return

    if component.startswith("turbine"):
        unit = self._component_index(component)
        self._set_turbine_state(False, index=unit)
|
|
|
|
def _apply_command(self, command: ReactorCommand, state: PlantState) -> dict[str, float]:
    """Apply an operator command and return the per-step overrides it implies.

    Fields left at ``None`` (or empty) on the command are ignored. The returned
    mapping may contain ``"rod_fraction"`` (set by a SCRAM) and
    ``"coolant_demand"`` (clamped to [0, 1]). Unit numbers in the per-unit
    mappings are 1-based and are converted to 0-based indices here.
    """
    overrides: dict[str, float] = {}
    control = self.control

    if command.scram:
        self.shutdown = True
        overrides["rod_fraction"] = control.scram()
        self._set_turbine_state(False)

    # Simple switches and setpoints.
    if command.generator_auto is not None:
        self.generator_auto = command.generator_auto
    if command.primary_relief is not None:
        self.primary_relief_open = command.primary_relief
    if command.secondary_relief is not None:
        self.secondary_relief_open = command.secondary_relief
    if command.power_setpoint is not None:
        control.set_power_setpoint(command.power_setpoint)

    # Rod control: switching back to automatic clears the shutdown flag unless melted down.
    if command.rod_manual is not None:
        control.set_manual_mode(command.rod_manual)
        if command.rod_manual is False and not self.meltdown:
            self.shutdown = False
    if command.rod_position is not None:
        # An explicit position implies manual mode and takes precedence over a step.
        control.set_manual_mode(True)
        control.set_rods(command.rod_position)
        self.shutdown = self.shutdown or command.rod_position >= 0.95
    elif command.rod_step is not None:
        control.increment_rods(command.rod_step)

    # Per-unit equipment toggles.
    for unit_no, enabled in (command.primary_pumps or {}).items():
        self._toggle_primary_pump_unit(unit_no - 1, enabled)
    for unit_no, enabled in (command.secondary_pumps or {}).items():
        self._toggle_secondary_pump_unit(unit_no - 1, enabled)
    for unit_no, enabled in (command.generator_units or {}).items():
        self._toggle_generator(unit_no - 1, enabled, state)
    if command.turbine_on is not None:
        self._set_turbine_state(command.turbine_on)
    for unit_no, enabled in (command.turbine_units or {}).items():
        self._set_turbine_state(enabled, index=unit_no - 1)

    # Consumer settings only apply when a consumer is attached.
    if command.consumer_online is not None and self.consumer:
        self.consumer.set_online(command.consumer_online)
    if command.consumer_demand is not None and self.consumer:
        self.consumer.set_demand(command.consumer_demand)

    if command.coolant_demand is not None:
        overrides["coolant_demand"] = max(0.0, min(1.0, command.coolant_demand))

    for component in command.maintenance_components:
        self._toggle_maintenance(component)

    # Withdrawn rods (below 0.95) bring the reactor out of shutdown, unless
    # this command was a SCRAM or the core has melted down.
    if not (self.meltdown or command.scram):
        if overrides.get("rod_fraction", control.rod_fraction) < 0.95:
            self.shutdown = False
    return overrides
|
|
|
|
def _set_primary_pump(self, active: bool) -> None:
    """Switch the primary loop on or off; switching off also disables every unit."""
    state_changed = self.primary_pump_active != active
    if state_changed:
        self.primary_pump_active = active
        LOGGER.info("Primary pump %s", "enabled" if active else "stopped")
    if active:
        return
    self.primary_pump_units = [False] * len(self.primary_pump_units)
|
|
|
|
def _set_secondary_pump(self, active: bool) -> None:
    """Switch the secondary loop on or off; switching off also disables every unit."""
    state_changed = self.secondary_pump_active != active
    if state_changed:
        self.secondary_pump_active = active
        LOGGER.info("Secondary pump %s", "enabled" if active else "stopped")
    if active:
        return
    self.secondary_pump_units = [False] * len(self.secondary_pump_units)
|
|
|
|
def _toggle_primary_pump_unit(self, index: int, active: bool) -> None:
    """Enable or disable one primary pump unit (0-based ``index``).

    Enabling a unit also enables the loop; disabling the last enabled unit
    stops the loop. Out-of-range indices are logged and ignored.
    """
    units = self.primary_pump_units
    if not 0 <= index < len(units):
        LOGGER.warning("Ignoring primary pump index %s", index)
        return

    if units[index] != active:
        units[index] = active
        LOGGER.info("Primary pump %d %s", index + 1, "enabled" if active else "stopped")

    if active:
        self._set_primary_pump(True)
    elif not any(self.primary_pump_units):
        self._set_primary_pump(False)
|
|
|
|
def _toggle_secondary_pump_unit(self, index: int, active: bool) -> None:
    """Enable or disable one secondary pump unit (0-based ``index``).

    Enabling a unit also enables the loop; disabling the last enabled unit
    stops the loop. Out-of-range indices are logged and ignored.
    """
    units = self.secondary_pump_units
    if not 0 <= index < len(units):
        LOGGER.warning("Ignoring secondary pump index %s", index)
        return

    if units[index] != active:
        units[index] = active
        LOGGER.info("Secondary pump %d %s", index + 1, "enabled" if active else "stopped")

    if active:
        self._set_secondary_pump(True)
    elif not any(self.secondary_pump_units):
        self._set_secondary_pump(False)
|
|
|
|
def _toggle_generator(self, index: int, active: bool, state: PlantState) -> None:
    """Start or stop one diesel generator (0-based ``index``).

    The index must be valid for both the hardware list and the generator
    states in ``state``; otherwise the request is logged and ignored.
    """
    usable = min(len(self.generators), len(state.generators))
    if index < 0 or index >= usable:
        LOGGER.warning("Ignoring generator index %s", index)
        return

    unit_state = state.generators[index]
    unit = self.generators[index]
    if active:
        unit.start(unit_state)
    else:
        unit.stop(unit_state)
|
|
|
|
def _trigger_meltdown(self, state: PlantState) -> None:
    """Latch the meltdown state: SCRAM, fail the core component, stop all turbines.

    Marking the core as failed in the health monitor is best-effort: a
    ``KeyError`` from that call is logged instead of propagating, so the rest
    of the meltdown handling always runs.
    """
    LOGGER.critical("Core meltdown in progress (%.1f K)", state.core.fuel_temperature)
    self.meltdown = True
    self.shutdown = True
    self.control.scram()
    try:
        self.health_monitor.component("core").fail()
    except KeyError:
        # Previously swallowed silently; keep going but leave a trace for diagnosis.
        LOGGER.warning("Health monitor could not mark the core as failed (KeyError)")
    self._set_turbine_state(False)
|
|
|
|
def _step_generators(self, state: PlantState, aux_demand: float, turbine_electric: float, dt: float) -> float:
|
|
# Ensure we have generator state objects aligned with hardware.
|
|
if not state.generators or len(state.generators) < len(self.generators):
|
|
missing = len(self.generators) - len(state.generators)
|
|
for _ in range(missing):
|
|
state.generators.append(
|
|
GeneratorState(running=False, starting=False, spool_remaining=0.0, power_output_mw=0.0, battery_charge=1.0)
|
|
)
|
|
deficit = max(0.0, aux_demand - turbine_electric)
|
|
if self.generator_auto and aux_demand <= 0.0:
|
|
for idx, gen_state in enumerate(state.generators):
|
|
if gen_state.running or gen_state.starting:
|
|
self.generators[idx].stop(gen_state)
|
|
return 0.0
|
|
if self.generator_auto:
|
|
if deficit > 0.0:
|
|
for idx, gen_state in enumerate(state.generators):
|
|
if not (gen_state.running or gen_state.starting):
|
|
self.generators[idx].start(gen_state)
|
|
deficit -= self.generators[idx].rated_output_mw
|
|
if deficit <= 0:
|
|
break
|
|
elif turbine_electric > aux_demand:
|
|
for idx, gen_state in enumerate(state.generators):
|
|
if gen_state.running and not gen_state.starting:
|
|
self.generators[idx].stop(gen_state)
|
|
|
|
total_power = 0.0
|
|
remaining = max(0.0, aux_demand - turbine_electric)
|
|
active_indices = [idx for idx, g in enumerate(state.generators) if g.running or g.starting]
|
|
share = remaining / len(active_indices) if active_indices and remaining > 0 else 0.0
|
|
for idx, gen_state in enumerate(state.generators):
|
|
load = share if idx in active_indices else 0.0
|
|
delivered = self.generators[idx].step(gen_state, load, dt)
|
|
total_power += delivered
|
|
remaining = max(0.0, remaining - delivered)
|
|
return total_power
|
|
|
|
def _set_turbine_state(self, active: bool, index: int | None = None) -> None:
|
|
if index is None:
|
|
for idx in range(len(self.turbine_unit_active)):
|
|
self._set_turbine_state(active, index=idx)
|
|
return
|
|
if index < 0 or index >= len(self.turbine_unit_active):
|
|
LOGGER.warning("Ignoring turbine index %s", index)
|
|
return
|
|
if self.turbine_unit_active[index] != active:
|
|
self.turbine_unit_active[index] = active
|
|
LOGGER.info("Turbine %d %s", index + 1, "started" if active else "stopped")
|
|
self.turbine_active = any(self.turbine_unit_active)
|
|
|
|
def _component_index(self, name: str) -> int:
|
|
if name == "turbine":
|
|
return 0
|
|
parts = name.split("_")
|
|
try:
|
|
for token in reversed(parts):
|
|
return int(token) - 1
|
|
except (ValueError, TypeError):
|
|
return -1
|
|
|
|
def _perform_maintenance(self, component: str) -> None:
|
|
if not self._can_maintain(component):
|
|
return
|
|
self.health_monitor.maintain(component)
|
|
|
|
def _maintenance_tick(self, state: PlantState, dt: float) -> None:
|
|
if not self.maintenance_active:
|
|
return
|
|
completed: list[str] = []
|
|
for component in list(self.maintenance_active):
|
|
if not self._can_maintain(component):
|
|
continue
|
|
restored = self.health_monitor.maintain(component, amount=0.02 * dt)
|
|
comp_state = self.health_monitor.component(component)
|
|
if comp_state.integrity >= 0.999:
|
|
completed.append(component)
|
|
for comp in completed:
|
|
self.maintenance_active.discard(comp)
|
|
LOGGER.info("Maintenance completed for %s", comp)
|
|
|
|
def _toggle_maintenance(self, component: str) -> None:
|
|
if component in self.maintenance_active:
|
|
self.maintenance_active.remove(component)
|
|
LOGGER.info("Maintenance stopped for %s", component)
|
|
return
|
|
if not self._can_maintain(component):
|
|
return
|
|
self.maintenance_active.add(component)
|
|
LOGGER.info("Maintenance started for %s", component)
|
|
|
|
def _can_maintain(self, component: str) -> bool:
|
|
if component == "core" and not self.shutdown:
|
|
LOGGER.warning("Cannot maintain core while reactor is running")
|
|
return False
|
|
if component.startswith("primary_pump_"):
|
|
idx = self._component_index(component)
|
|
if idx < 0 or idx >= len(self.primary_pump_units):
|
|
LOGGER.warning("Unknown primary pump maintenance target %s", component)
|
|
return False
|
|
if self.primary_pump_units[idx]:
|
|
LOGGER.warning("Stop primary pump %d before maintenance", idx + 1)
|
|
return False
|
|
if component.startswith("secondary_pump_"):
|
|
idx = self._component_index(component)
|
|
if idx < 0 or idx >= len(self.secondary_pump_units):
|
|
LOGGER.warning("Unknown secondary pump maintenance target %s", component)
|
|
return False
|
|
if self.secondary_pump_units[idx]:
|
|
LOGGER.warning("Stop secondary pump %d before maintenance", idx + 1)
|
|
return False
|
|
if component.startswith("generator_"):
|
|
idx = self._component_index(component)
|
|
if idx < 0 or idx >= len(self.generators):
|
|
LOGGER.warning("Unknown generator maintenance target %s", component)
|
|
return False
|
|
if component.startswith("turbine"):
|
|
idx = self._component_index(component)
|
|
if idx < 0 or idx >= len(self.turbine_unit_active):
|
|
LOGGER.warning("Unknown turbine maintenance target %s", component)
|
|
return False
|
|
if self.turbine_unit_active[idx]:
|
|
LOGGER.warning("Stop turbine %d before maintenance", idx + 1)
|
|
return False
|
|
return True
|
|
|
|
def attach_consumer(self, consumer: ElectricalConsumer) -> None:
    """Attach ``consumer`` to the reactor, replacing any previous one.

    Args:
        consumer: Consumer to store on ``self.consumer``; its ``name`` and
            ``demand_mw`` are written to the log.
    """
    self.consumer = consumer
    name, demand = consumer.name, consumer.demand_mw
    LOGGER.info("Attached consumer %s (%.1f MW)", name, demand)
|
|
|
|
def detach_consumer(self) -> None:
    """Detach the current consumer, if there is one, and log its name."""
    current = self.consumer
    if not current:
        return
    LOGGER.info("Detached consumer %s", current.name)
    self.consumer = None
|
|
|
|
def save_state(self, filepath: str, state: PlantState) -> None:
    """Persist the plant state together with the reactor's operating flags.

    The reactor-level flags, a summary of each generator state and the
    attached consumer's settings are gathered into a metadata dictionary.
    That dictionary is handed, along with the plant state and a
    health-monitor snapshot, to ``self.control.save_state``.

    Args:
        filepath: Destination passed through to the control system.
        state: Plant state to store; its ``generators`` entries are also
            summarised in the metadata.
    """
    generator_records = [
        {
            "running": unit.running,
            "starting": unit.starting,
            "spool_remaining": unit.spool_remaining,
            "power_output_mw": unit.power_output_mw,
            "battery_charge": unit.battery_charge,
            "status": unit.status,
        }
        for unit in state.generators
    ]

    attached = self.consumer
    if attached:
        consumer_record = {
            "online": attached.online,
            "demand_mw": attached.demand_mw,
            "name": attached.name,
        }
    else:
        # Placeholder values written when no consumer is attached.
        consumer_record = {"online": False, "demand_mw": 0.0, "name": None}

    metadata = {
        "primary_pump_active": self.primary_pump_active,
        "secondary_pump_active": self.secondary_pump_active,
        "primary_pump_units": self.primary_pump_units,
        "secondary_pump_units": self.secondary_pump_units,
        "turbine_active": self.turbine_active,
        "turbine_units": self.turbine_unit_active,
        "shutdown": self.shutdown,
        "meltdown": self.meltdown,
        "generator_auto": self.generator_auto,
        "primary_relief_open": self.primary_relief_open,
        "secondary_relief_open": self.secondary_relief_open,
        "pressurizer_level": self.pressurizer_level,
        "maintenance_active": list(self.maintenance_active),
        "generators": generator_records,
        "consumer": consumer_record,
    }
    health_snapshot = self.health_monitor.snapshot()
    self.control.save_state(filepath, state, metadata, health_snapshot)
|
|
|
|
def load_state(self, filepath: str) -> PlantState:
    """Restore reactor flags from ``filepath`` and return the loaded plant state.

    The control system supplies the plant state, a metadata dictionary and
    a health snapshot. Metadata keys that are missing leave the matching
    reactor attribute unchanged. Pump, turbine and generator state lists on
    the loaded plant are padded when they are shorter than expected.

    Args:
        filepath: Source passed through to ``self.control.load_state``.

    Returns:
        The plant state returned by the control system, after back-filling.
    """
    plant, metadata, health = self.control.load_state(filepath)

    def restore(*attribute_names: str) -> None:
        # Overwrite each attribute only when the metadata carries its key.
        for attribute in attribute_names:
            setattr(self, attribute, metadata.get(attribute, getattr(self, attribute)))

    restore("primary_pump_active", "secondary_pump_active")
    self.primary_pump_units = list(metadata.get("primary_pump_units", self.primary_pump_units))
    self.secondary_pump_units = list(metadata.get("secondary_pump_units", self.secondary_pump_units))

    saved_units = metadata.get("turbine_units")
    if saved_units:
        self.turbine_unit_active = list(saved_units)
    # Without an explicit flag, derive the aggregate from the unit states.
    self.turbine_active = metadata.get("turbine_active", any(self.turbine_unit_active))

    restore(
        "shutdown",
        "meltdown",
        "generator_auto",
        "primary_relief_open",
        "secondary_relief_open",
        "pressurizer_level",
    )

    saved_maintenance = metadata.get("maintenance_active")
    if saved_maintenance is not None:
        self.maintenance_active = set(saved_maintenance)

    consumer_cfg = metadata.get("consumer")
    if consumer_cfg:
        if self.consumer:
            # Keep the attached consumer and only update its settings.
            self.consumer.set_demand(consumer_cfg.get("demand_mw", self.consumer.demand_mw))
            self.consumer.set_online(consumer_cfg.get("online", self.consumer.online))
        else:
            self.consumer = ElectricalConsumer(
                name=consumer_cfg.get("name") or "External",
                demand_mw=consumer_cfg.get("demand_mw", 0.0),
                online=consumer_cfg.get("online", False),
            )

    if health:
        self.health_monitor.load_snapshot(health)
    LOGGER.info("Reactor state restored from %s", filepath)

    def default_pumps(active: bool, loop) -> list:
        # Two placeholder pump states, each carrying half of the loop flow.
        return [
            PumpState(
                active=active,
                flow_rate=loop.mass_flow_rate / 2,
                pressure=loop.pressure,
                status="OFF",
            )
            for _ in range(2)
        ]

    # Back-fill pump state lists for compatibility.
    if not plant.primary_pumps or len(plant.primary_pumps) < 2:
        plant.primary_pumps = default_pumps(self.primary_pump_active, plant.primary_loop)
    if not plant.secondary_pumps or len(plant.secondary_pumps) < 2:
        plant.secondary_pumps = default_pumps(self.secondary_pump_active, plant.secondary_loop)

    # Pad turbine states up to the number of hardware turbines.
    for _ in range(len(self.turbines) - len(plant.turbines)):
        plant.turbines.append(
            TurbineState(
                steam_enthalpy=2_000.0,
                shaft_power_mw=0.0,
                electrical_output_mw=0.0,
                condenser_temperature=constants.ENVIRONMENT_TEMPERATURE,
                load_demand_mw=0.0,
                load_supplied_mw=0.0,
            )
        )

    saved_generators = metadata.get("generators", [])
    # Pad generator states up to the number of hardware generators.
    for _ in range(len(self.generators) - len(plant.generators)):
        plant.generators.append(
            GeneratorState(
                running=False,
                starting=False,
                spool_remaining=0.0,
                power_output_mw=0.0,
                battery_charge=1.0,
                status="OFF",
            )
        )

    generator_fields = (
        "running",
        "starting",
        "spool_remaining",
        "power_output_mw",
        "battery_charge",
        "status",
    )
    # Overlay saved values; generators without a saved record stay as they are.
    for gen_state, saved in zip(plant.generators, saved_generators):
        for field_name in generator_fields:
            setattr(gen_state, field_name, saved.get(field_name, getattr(gen_state, field_name)))
    return plant
|
|
|
|
def _handle_heat_sink_loss(self, state: PlantState) -> None:
|
|
if not self.shutdown:
|
|
LOGGER.critical("Loss of secondary heat sink detected. Initiating SCRAM.")
|
|
self.shutdown = True
|
|
self.control.scram()
|
|
self._set_turbine_state(False)
|
|
# Clear turbine output and demands to reflect lost steam.
|
|
for turbine_state in state.turbines:
|
|
self._reset_turbine_state(turbine_state)
|
|
|
|
def _check_poison_alerts(self, state: PlantState) -> None:
|
|
inventory = state.core.fission_product_inventory or {}
|
|
for symbol, threshold in constants.KEY_POISON_THRESHOLDS.items():
|
|
amount = inventory.get(symbol, 0.0)
|
|
if amount >= threshold and symbol not in self.poison_alerts:
|
|
self.poison_alerts.add(symbol)
|
|
LOGGER.warning("Poison level high: %s inventory %.2e exceeds %.2e", symbol, amount, threshold)
|