"""High-level reactor orchestration.""" from __future__ import annotations from dataclasses import dataclass, field import logging from . import constants from .atomic import AtomicPhysics from .commands import ReactorCommand from .coolant import Pump from .consumer import ElectricalConsumer from .control import ControlSystem from .failures import HealthMonitor from .fuel import FuelAssembly, decay_heat_fraction from .generator import DieselGenerator, GeneratorState from .neutronics import NeutronDynamics from .state import CoolantLoopState, CoreState, PlantState, PumpState, TurbineState from .thermal import ThermalSolver, heat_transfer from .turbine import SteamGenerator, Turbine LOGGER = logging.getLogger(__name__) @dataclass class Reactor: fuel: FuelAssembly neutronics: NeutronDynamics control: ControlSystem primary_pump: Pump secondary_pump: Pump thermal: ThermalSolver steam_generator: SteamGenerator turbines: list[Turbine] generators: list[DieselGenerator] atomic_model: AtomicPhysics consumer: ElectricalConsumer | None = None health_monitor: HealthMonitor = field(default_factory=HealthMonitor) primary_pump_active: bool = True secondary_pump_active: bool = True primary_pump_units: list[bool] = field(default_factory=lambda: [True, True]) secondary_pump_units: list[bool] = field(default_factory=lambda: [True, True]) turbine_active: bool = True turbine_unit_active: list[bool] = field(default_factory=lambda: [True, True, True]) shutdown: bool = False meltdown: bool = False poison_alerts: set[str] = field(default_factory=set) maintenance_active: set[str] = field(default_factory=set) def __post_init__(self) -> None: if not self.turbines: self.turbines = [Turbine()] if not self.turbine_unit_active or len(self.turbine_unit_active) != len(self.turbines): self.turbine_unit_active = [True] * len(self.turbines) self.turbine_active = any(self.turbine_unit_active) if not self.generators: self.generators = [DieselGenerator() for _ in range(2)] if not self.primary_pump_units or len(self.primary_pump_units) != 2: self.primary_pump_units = [True, True] if not self.secondary_pump_units or len(self.secondary_pump_units) != 2: self.secondary_pump_units = [True, True] @classmethod def default(cls) -> "Reactor": atomic_model = AtomicPhysics() return cls( fuel=FuelAssembly(enrichment=0.045, mass_kg=80_000.0, atomic_physics=atomic_model), neutronics=NeutronDynamics(), control=ControlSystem(), primary_pump=Pump(nominal_flow=18_000.0), secondary_pump=Pump(nominal_flow=16_000.0, efficiency=0.85), thermal=ThermalSolver(), steam_generator=SteamGenerator(), turbines=[Turbine() for _ in range(3)], generators=[DieselGenerator() for _ in range(2)], atomic_model=atomic_model, consumer=ElectricalConsumer(name="Grid", demand_mw=800.0, online=False), health_monitor=HealthMonitor(), ) def initial_state(self) -> PlantState: ambient = constants.ENVIRONMENT_TEMPERATURE core = CoreState( fuel_temperature=ambient, neutron_flux=1e5, reactivity_margin=-0.02, power_output_mw=0.1, burnup=0.0, fission_product_inventory={}, emitted_particles={}, ) # Default to a cold, safe configuration: rods fully inserted, manual control, pumps/turbines off. self.control.manual_control = True self.control.rod_fraction = 0.95 self.shutdown = True self.meltdown = False self.primary_pump_active = False self.secondary_pump_active = False self.turbine_unit_active = [False] * len(self.turbines) self.turbine_active = any(self.turbine_unit_active) if self.consumer: self.consumer.set_online(False) primary = CoolantLoopState( temperature_in=ambient, temperature_out=ambient, pressure=0.5, mass_flow_rate=0.0, steam_quality=0.0, ) secondary = CoolantLoopState( temperature_in=ambient, temperature_out=ambient, pressure=0.5, mass_flow_rate=0.0, steam_quality=0.0, ) primary_pumps = [PumpState(active=self.primary_pump_active, flow_rate=0.0, pressure=0.5) for _ in range(2)] secondary_pumps = [PumpState(active=self.secondary_pump_active, flow_rate=0.0, pressure=0.5) for _ in range(2)] generator_states = [ GeneratorState(running=False, starting=False, spool_remaining=0.0, power_output_mw=0.0, battery_charge=1.0) for _ in self.generators ] turbine_states = [ TurbineState( steam_enthalpy=2_000.0, shaft_power_mw=0.0, electrical_output_mw=0.0, condenser_temperature=ambient, load_demand_mw=0.0, load_supplied_mw=0.0, ) for _ in self.turbines ] return PlantState( core=core, primary_loop=primary, secondary_loop=secondary, turbines=turbine_states, primary_pumps=primary_pumps, secondary_pumps=secondary_pumps, generators=generator_states, ) def step(self, state: PlantState, dt: float, command: ReactorCommand | None = None) -> None: if self.shutdown: rod_fraction = self.control.rod_fraction else: rod_fraction = self.control.update_rods(state.core, dt) if state.core.fuel_temperature >= constants.CORE_MELTDOWN_TEMPERATURE and not self.meltdown: self._trigger_meltdown(state) overrides = {} if command: overrides = self._apply_command(command, state) rod_fraction = overrides.get("rod_fraction", rod_fraction) decay_power, decay_neutron_source, decay_products, decay_particles = self.fuel.decay_reaction_effects( state.core ) self.neutronics.step(state.core, rod_fraction, dt, external_source_rate=decay_neutron_source) prompt_power, fission_rate, fission_event = self.fuel.prompt_energy_rate( state.core.neutron_flux, rod_fraction ) decay_heat = decay_heat_fraction(state.core.burnup) * state.core.power_output_mw total_power = prompt_power + decay_heat + decay_power state.core.power_output_mw = total_power state.core.update_burnup(dt) # Track fission products and emitted particles for diagnostics. products: dict[str, float] = {} for atom in fission_event.products: products[atom.symbol] = products.get(atom.symbol, 0.0) + fission_rate * dt for k, v in decay_products.items(): products[k] = products.get(k, 0.0) + v * dt particles: dict[str, float] = {k: v * dt for k, v in decay_particles.items()} state.core.add_products(products) state.core.add_emitted_particles(particles) self._check_poison_alerts(state) pump_demand = overrides.get("coolant_demand", self.control.coolant_demand(state.primary_loop)) self.primary_pump_active = self.primary_pump_active and any(self.primary_pump_units) self.secondary_pump_active = self.secondary_pump_active and any(self.secondary_pump_units) primary_units_active = [ self.primary_pump_active and idx < len(self.primary_pump_units) and self.primary_pump_units[idx] for idx in range(2) ] secondary_units_active = [ self.secondary_pump_active and idx < len(self.secondary_pump_units) and self.secondary_pump_units[idx] for idx in range(2) ] aux_demand = constants.BASE_AUX_LOAD_MW + constants.PUMP_POWER_MW * ( sum(primary_units_active) + sum(secondary_units_active) ) turbine_electrical = state.total_electrical_output() generator_power = self._step_generators(state, aux_demand, turbine_electrical, dt) aux_available = turbine_electrical + generator_power power_ratio = 1.0 if aux_demand <= 0 else min(1.0, aux_available / aux_demand) if aux_demand > 0 and aux_available < 0.5 * aux_demand: LOGGER.warning("Aux power deficit: available %.1f/%.1f MW", aux_available, aux_demand) if self.primary_pump_active: total_flow = 0.0 target_pressure = (12.0 * pump_demand + 2.0) * power_ratio loop_pressure = 0.5 target_flow = self.primary_pump.flow_rate(pump_demand) * power_ratio for idx, pump_state in enumerate(state.primary_pumps): unit_enabled = ( self.primary_pump_active and idx < len(self.primary_pump_units) and self.primary_pump_units[idx] ) desired_flow = target_flow if unit_enabled else 0.0 desired_pressure = target_pressure if unit_enabled else 0.5 pump_state.flow_rate = self._ramp_value( pump_state.flow_rate, desired_flow, dt, self.primary_pump.spool_time ) pump_state.pressure = self._ramp_value( pump_state.pressure, desired_pressure, dt, self.primary_pump.spool_time ) pump_state.active = (unit_enabled and power_ratio > 0.05) or pump_state.flow_rate > 1.0 total_flow += pump_state.flow_rate loop_pressure = max(loop_pressure, pump_state.pressure) state.primary_loop.mass_flow_rate = total_flow state.primary_loop.pressure = loop_pressure if total_flow > 0 else self._ramp_value( state.primary_loop.pressure, 0.5, dt, self.primary_pump.spool_time ) else: state.primary_loop.mass_flow_rate = self._ramp_value( state.primary_loop.mass_flow_rate, 0.0, dt, self.primary_pump.spool_time ) state.primary_loop.pressure = self._ramp_value( state.primary_loop.pressure, 0.5, dt, self.primary_pump.spool_time ) for pump_state in state.primary_pumps: pump_state.active = False pump_state.flow_rate = self._ramp_value( pump_state.flow_rate, 0.0, dt, self.primary_pump.spool_time ) pump_state.pressure = self._ramp_value( pump_state.pressure, state.primary_loop.pressure, dt, self.primary_pump.spool_time ) if self.secondary_pump_active: total_flow = 0.0 target_pressure = 12.0 * 0.75 + 2.0 loop_pressure = 0.5 target_flow = self.secondary_pump.flow_rate(0.75) for idx, pump_state in enumerate(state.secondary_pumps): unit_enabled = ( self.secondary_pump_active and idx < len(self.secondary_pump_units) and self.secondary_pump_units[idx] ) desired_flow = target_flow if unit_enabled else 0.0 desired_pressure = target_pressure if unit_enabled else 0.5 pump_state.flow_rate = self._ramp_value( pump_state.flow_rate, desired_flow, dt, self.secondary_pump.spool_time ) pump_state.pressure = self._ramp_value( pump_state.pressure, desired_pressure, dt, self.secondary_pump.spool_time ) pump_state.active = unit_enabled or pump_state.flow_rate > 1.0 total_flow += pump_state.flow_rate loop_pressure = max(loop_pressure, pump_state.pressure) state.secondary_loop.mass_flow_rate = total_flow state.secondary_loop.pressure = loop_pressure if total_flow > 0 else self._ramp_value( state.secondary_loop.pressure, 0.5, dt, self.secondary_pump.spool_time ) else: state.secondary_loop.mass_flow_rate = self._ramp_value( state.secondary_loop.mass_flow_rate, 0.0, dt, self.secondary_pump.spool_time ) state.secondary_loop.pressure = self._ramp_value( state.secondary_loop.pressure, 0.5, dt, self.secondary_pump.spool_time ) for pump_state in state.secondary_pumps: pump_state.active = False pump_state.flow_rate = self._ramp_value( pump_state.flow_rate, 0.0, dt, self.secondary_pump.spool_time ) pump_state.pressure = self._ramp_value( pump_state.pressure, state.secondary_loop.pressure, dt, self.secondary_pump.spool_time ) self.thermal.step_core(state.core, state.primary_loop, total_power, dt) if not self.secondary_pump_active or state.secondary_loop.mass_flow_rate <= 1.0: transferred = 0.0 else: transferred = heat_transfer(state.primary_loop, state.secondary_loop, total_power) self.thermal.step_secondary(state.secondary_loop, transferred) self._step_turbine_bank(state, transferred, dt) self._maintenance_tick(state, dt) if (not self.secondary_pump_active or state.secondary_loop.mass_flow_rate <= 1.0) and total_power > 50.0: self._handle_heat_sink_loss(state) failures = self.health_monitor.evaluate( state, primary_units_active, secondary_units_active, self.turbine_unit_active, state.generators, dt, ) for failure in failures: self._handle_failure(failure) state.time_elapsed += dt LOGGER.info( ( "t=%5.1fs rods=%.2f core_power=%.1fMW prompt=%.1fMW :: " "fissions %.2e/s, outlet %.1fK, electrical %.1fMW load %.1f/%.1fMW" ), state.time_elapsed, rod_fraction, total_power, prompt_power, fission_rate, state.primary_loop.temperature_out, state.total_electrical_output(), sum(t.load_supplied_mw for t in state.turbines), sum(t.load_demand_mw for t in state.turbines), ) def _step_turbine_bank(self, state: PlantState, steam_power_mw: float, dt: float) -> None: if not state.turbines: return active_indices = [ idx for idx, active in enumerate(self.turbine_unit_active) if active and idx < len(state.turbines) ] power_per_unit = steam_power_mw / len(active_indices) if active_indices else 0.0 for idx, turbine in enumerate(self.turbines): if idx >= len(state.turbines): break turbine_state = state.turbines[idx] if idx in active_indices: turbine.step(state.secondary_loop, turbine_state, steam_power_mw=power_per_unit, dt=dt) else: self._spin_down_turbine(turbine_state, dt, turbine.spool_time) self._dispatch_consumer_load(state, active_indices) def _reset_turbine_state(self, turbine_state: TurbineState) -> None: turbine_state.shaft_power_mw = 0.0 turbine_state.electrical_output_mw = 0.0 turbine_state.load_demand_mw = 0.0 turbine_state.load_supplied_mw = 0.0 @staticmethod def _ramp_value(current: float, target: float, dt: float, time_constant: float) -> float: if time_constant <= 0.0: return target alpha = min(1.0, max(0.0, dt / time_constant)) return current + (target - current) * alpha def _spin_down_turbine(self, turbine_state: TurbineState, dt: float, time_constant: float) -> None: turbine_state.shaft_power_mw = self._ramp_value(turbine_state.shaft_power_mw, 0.0, dt, time_constant) turbine_state.electrical_output_mw = self._ramp_value( turbine_state.electrical_output_mw, 0.0, dt, time_constant ) turbine_state.load_demand_mw = 0.0 turbine_state.load_supplied_mw = self._ramp_value( turbine_state.load_supplied_mw, 0.0, dt, time_constant ) def _dispatch_consumer_load(self, state: PlantState, active_indices: list[int]) -> None: total_electrical = sum(state.turbines[idx].electrical_output_mw for idx in active_indices) if self.consumer: demand = self.consumer.request_power() supplied = min(total_electrical, demand) self.consumer.update_power_received(supplied) else: demand = 0.0 supplied = 0.0 demand_per_unit = demand / len(active_indices) if active_indices else 0.0 total_for_share = total_electrical if total_electrical > 0 else 1.0 for idx, turbine_state in enumerate(state.turbines): if idx not in active_indices: turbine_state.load_demand_mw = 0.0 turbine_state.load_supplied_mw = 0.0 continue share = 0.0 if total_electrical <= 0 else supplied * (turbine_state.electrical_output_mw / total_for_share) turbine_state.load_demand_mw = demand_per_unit turbine_state.load_supplied_mw = share if demand_per_unit <= 0 else min(share, demand_per_unit) def _handle_failure(self, component: str) -> None: if component == "core": LOGGER.critical("Core failure detected. Initiating SCRAM.") self.shutdown = True self.control.scram() elif component.startswith("primary_pump"): idx = self._component_index(component) self._toggle_primary_pump_unit(idx, False) elif component.startswith("secondary_pump"): idx = self._component_index(component) self._toggle_secondary_pump_unit(idx, False) elif component.startswith("generator"): idx = self._component_index(component) LOGGER.warning("Generator %d failed", idx + 1) elif component.startswith("turbine"): idx = self._component_index(component) self._set_turbine_state(False, index=idx) def _apply_command(self, command: ReactorCommand, state: PlantState) -> dict[str, float]: overrides: dict[str, float] = {} if command.scram: self.shutdown = True overrides["rod_fraction"] = self.control.scram() self._set_turbine_state(False) if command.power_setpoint is not None: self.control.set_power_setpoint(command.power_setpoint) if command.rod_manual is not None: self.control.set_manual_mode(command.rod_manual) if command.rod_position is not None: self.control.set_manual_mode(True) overrides["rod_fraction"] = self.control.set_rods(command.rod_position) self.shutdown = self.shutdown or command.rod_position >= 0.95 elif command.rod_step is not None: overrides["rod_fraction"] = self.control.increment_rods(command.rod_step) if command.primary_pumps: for idx, flag in command.primary_pumps.items(): self._toggle_primary_pump_unit(idx - 1, flag) if command.secondary_pumps: for idx, flag in command.secondary_pumps.items(): self._toggle_secondary_pump_unit(idx - 1, flag) if command.generator_units: for idx, flag in command.generator_units.items(): self._toggle_generator(idx - 1, flag, state) if command.turbine_on is not None: self._set_turbine_state(command.turbine_on) if command.turbine_units: for key, state_flag in command.turbine_units.items(): idx = key - 1 self._set_turbine_state(state_flag, index=idx) if command.consumer_online is not None and self.consumer: self.consumer.set_online(command.consumer_online) if command.consumer_demand is not None and self.consumer: self.consumer.set_demand(command.consumer_demand) if command.coolant_demand is not None: overrides["coolant_demand"] = max(0.0, min(1.0, command.coolant_demand)) for component in command.maintenance_components: self._toggle_maintenance(component) return overrides def _set_primary_pump(self, active: bool) -> None: if self.primary_pump_active != active: self.primary_pump_active = active LOGGER.info("Primary pump %s", "enabled" if active else "stopped") if not active: self.primary_pump_units = [False] * len(self.primary_pump_units) elif active and not any(self.primary_pump_units): self.primary_pump_units = [True] * len(self.primary_pump_units) def _set_secondary_pump(self, active: bool) -> None: if self.secondary_pump_active != active: self.secondary_pump_active = active LOGGER.info("Secondary pump %s", "enabled" if active else "stopped") if not active: self.secondary_pump_units = [False] * len(self.secondary_pump_units) elif active and not any(self.secondary_pump_units): self.secondary_pump_units = [True] * len(self.secondary_pump_units) def _toggle_primary_pump_unit(self, index: int, active: bool) -> None: if index < 0 or index >= len(self.primary_pump_units): LOGGER.warning("Ignoring primary pump index %s", index) return if self.primary_pump_units[index] != active: self.primary_pump_units[index] = active LOGGER.info("Primary pump %d %s", index + 1, "enabled" if active else "stopped") if active: self._set_primary_pump(True) elif not any(self.primary_pump_units): self._set_primary_pump(False) def _toggle_secondary_pump_unit(self, index: int, active: bool) -> None: if index < 0 or index >= len(self.secondary_pump_units): LOGGER.warning("Ignoring secondary pump index %s", index) return if self.secondary_pump_units[index] != active: self.secondary_pump_units[index] = active LOGGER.info("Secondary pump %d %s", index + 1, "enabled" if active else "stopped") if active: self._set_secondary_pump(True) elif not any(self.secondary_pump_units): self._set_secondary_pump(False) def _toggle_generator(self, index: int, active: bool, state: PlantState) -> None: if index < 0 or index >= len(self.generators) or index >= len(state.generators): LOGGER.warning("Ignoring generator index %s", index) return gen_state = state.generators[index] if active: self.generators[index].start(gen_state) else: self.generators[index].stop(gen_state) def _trigger_meltdown(self, state: PlantState) -> None: LOGGER.critical("Core meltdown in progress (%.1f K)", state.core.fuel_temperature) self.meltdown = True self.shutdown = True self.control.scram() try: self.health_monitor.component("core").fail() except KeyError: pass self._set_turbine_state(False) def _step_generators(self, state: PlantState, aux_demand: float, turbine_electric: float, dt: float) -> float: # Ensure we have generator state objects aligned with hardware. if not state.generators or len(state.generators) < len(self.generators): missing = len(self.generators) - len(state.generators) for _ in range(missing): state.generators.append( GeneratorState(running=False, starting=False, spool_remaining=0.0, power_output_mw=0.0, battery_charge=1.0) ) deficit = max(0.0, aux_demand - turbine_electric) if deficit > 0.0: for idx, gen_state in enumerate(state.generators): if not (gen_state.running or gen_state.starting): self.generators[idx].start(gen_state) deficit -= self.generators[idx].rated_output_mw if deficit <= 0: break elif turbine_electric > aux_demand: for idx, gen_state in enumerate(state.generators): if gen_state.running and not gen_state.starting: self.generators[idx].stop(gen_state) total_power = 0.0 remaining = max(0.0, aux_demand - turbine_electric) for idx, gen_state in enumerate(state.generators): load = remaining if remaining > 0 else 0.0 delivered = self.generators[idx].step(gen_state, load, dt) total_power += delivered remaining = max(0.0, remaining - delivered) return total_power def _set_turbine_state(self, active: bool, index: int | None = None) -> None: if index is None: for idx in range(len(self.turbine_unit_active)): self._set_turbine_state(active, index=idx) return if index < 0 or index >= len(self.turbine_unit_active): LOGGER.warning("Ignoring turbine index %s", index) return if self.turbine_unit_active[index] != active: self.turbine_unit_active[index] = active LOGGER.info("Turbine %d %s", index + 1, "started" if active else "stopped") self.turbine_active = any(self.turbine_unit_active) def _component_index(self, name: str) -> int: if name == "turbine": return 0 parts = name.split("_") try: for token in reversed(parts): return int(token) - 1 except (ValueError, TypeError): return -1 def _perform_maintenance(self, component: str) -> None: if not self._can_maintain(component): return self.health_monitor.maintain(component) def _maintenance_tick(self, state: PlantState, dt: float) -> None: if not self.maintenance_active: return completed: list[str] = [] for component in list(self.maintenance_active): if not self._can_maintain(component): continue restored = self.health_monitor.maintain(component, amount=0.02 * dt) comp_state = self.health_monitor.component(component) if comp_state.integrity >= 0.999: completed.append(component) for comp in completed: self.maintenance_active.discard(comp) LOGGER.info("Maintenance completed for %s", comp) def _toggle_maintenance(self, component: str) -> None: if component in self.maintenance_active: self.maintenance_active.remove(component) LOGGER.info("Maintenance stopped for %s", component) return if not self._can_maintain(component): return self.maintenance_active.add(component) LOGGER.info("Maintenance started for %s", component) def _can_maintain(self, component: str) -> bool: if component == "core" and not self.shutdown: LOGGER.warning("Cannot maintain core while reactor is running") return False if component.startswith("primary_pump_"): idx = self._component_index(component) if idx < 0 or idx >= len(self.primary_pump_units): LOGGER.warning("Unknown primary pump maintenance target %s", component) return False if self.primary_pump_units[idx]: LOGGER.warning("Stop primary pump %d before maintenance", idx + 1) return False if component.startswith("secondary_pump_"): idx = self._component_index(component) if idx < 0 or idx >= len(self.secondary_pump_units): LOGGER.warning("Unknown secondary pump maintenance target %s", component) return False if self.secondary_pump_units[idx]: LOGGER.warning("Stop secondary pump %d before maintenance", idx + 1) return False if component.startswith("generator_"): idx = self._component_index(component) if idx < 0 or idx >= len(self.generators): LOGGER.warning("Unknown generator maintenance target %s", component) return False if component.startswith("turbine"): idx = self._component_index(component) if idx < 0 or idx >= len(self.turbine_unit_active): LOGGER.warning("Unknown turbine maintenance target %s", component) return False if self.turbine_unit_active[idx]: LOGGER.warning("Stop turbine %d before maintenance", idx + 1) return False return True def attach_consumer(self, consumer: ElectricalConsumer) -> None: self.consumer = consumer LOGGER.info("Attached consumer %s (%.1f MW)", consumer.name, consumer.demand_mw) def detach_consumer(self) -> None: if self.consumer: LOGGER.info("Detached consumer %s", self.consumer.name) self.consumer = None def save_state(self, filepath: str, state: PlantState) -> None: metadata = { "primary_pump_active": self.primary_pump_active, "secondary_pump_active": self.secondary_pump_active, "primary_pump_units": self.primary_pump_units, "secondary_pump_units": self.secondary_pump_units, "turbine_active": self.turbine_active, "turbine_units": self.turbine_unit_active, "shutdown": self.shutdown, "meltdown": self.meltdown, "maintenance_active": list(self.maintenance_active), "generators": [ { "running": g.running, "starting": g.starting, "spool_remaining": g.spool_remaining, "power_output_mw": g.power_output_mw, "battery_charge": g.battery_charge, } for g in state.generators ], "consumer": { "online": self.consumer.online if self.consumer else False, "demand_mw": self.consumer.demand_mw if self.consumer else 0.0, "name": self.consumer.name if self.consumer else None, }, } self.control.save_state(filepath, state, metadata, self.health_monitor.snapshot()) def load_state(self, filepath: str) -> PlantState: plant, metadata, health = self.control.load_state(filepath) self.primary_pump_active = metadata.get("primary_pump_active", self.primary_pump_active) self.secondary_pump_active = metadata.get("secondary_pump_active", self.secondary_pump_active) self.primary_pump_units = list(metadata.get("primary_pump_units", self.primary_pump_units)) self.secondary_pump_units = list(metadata.get("secondary_pump_units", self.secondary_pump_units)) unit_states = metadata.get("turbine_units") if unit_states: self.turbine_unit_active = list(unit_states) self.turbine_active = metadata.get("turbine_active", any(self.turbine_unit_active)) self.shutdown = metadata.get("shutdown", self.shutdown) self.meltdown = metadata.get("meltdown", self.meltdown) maint = metadata.get("maintenance_active") if maint is not None: self.maintenance_active = set(maint) consumer_cfg = metadata.get("consumer") if consumer_cfg: if not self.consumer: self.consumer = ElectricalConsumer( name=consumer_cfg.get("name") or "External", demand_mw=consumer_cfg.get("demand_mw", 0.0), online=consumer_cfg.get("online", False), ) else: self.consumer.set_demand(consumer_cfg.get("demand_mw", self.consumer.demand_mw)) self.consumer.set_online(consumer_cfg.get("online", self.consumer.online)) if health: self.health_monitor.load_snapshot(health) LOGGER.info("Reactor state restored from %s", filepath) # Back-fill pump state lists for compatibility. if not plant.primary_pumps or len(plant.primary_pumps) < 2: plant.primary_pumps = [ PumpState(active=self.primary_pump_active, flow_rate=plant.primary_loop.mass_flow_rate / 2, pressure=plant.primary_loop.pressure) for _ in range(2) ] if not plant.secondary_pumps or len(plant.secondary_pumps) < 2: plant.secondary_pumps = [ PumpState( active=self.secondary_pump_active, flow_rate=plant.secondary_loop.mass_flow_rate / 2, pressure=plant.secondary_loop.pressure, ) for _ in range(2) ] if len(plant.turbines) < len(self.turbines): ambient = constants.ENVIRONMENT_TEMPERATURE while len(plant.turbines) < len(self.turbines): plant.turbines.append( TurbineState( steam_enthalpy=2_000.0, shaft_power_mw=0.0, electrical_output_mw=0.0, condenser_temperature=ambient, load_demand_mw=0.0, load_supplied_mw=0.0, ) ) gen_meta = metadata.get("generators", []) if not plant.generators or len(plant.generators) < len(self.generators): while len(plant.generators) < len(self.generators): plant.generators.append( GeneratorState( running=False, starting=False, spool_remaining=0.0, power_output_mw=0.0, battery_charge=1.0, ) ) for idx, gen_state in enumerate(plant.generators): if idx < len(gen_meta): cfg = gen_meta[idx] gen_state.running = cfg.get("running", gen_state.running) gen_state.starting = cfg.get("starting", gen_state.starting) gen_state.spool_remaining = cfg.get("spool_remaining", gen_state.spool_remaining) gen_state.power_output_mw = cfg.get("power_output_mw", gen_state.power_output_mw) gen_state.battery_charge = cfg.get("battery_charge", gen_state.battery_charge) return plant def _handle_heat_sink_loss(self, state: PlantState) -> None: if not self.shutdown: LOGGER.critical("Loss of secondary heat sink detected. Initiating SCRAM.") self.shutdown = True self.control.scram() self._set_turbine_state(False) # Clear turbine output and demands to reflect lost steam. for turbine_state in state.turbines: self._reset_turbine_state(turbine_state) def _check_poison_alerts(self, state: PlantState) -> None: inventory = state.core.fission_product_inventory or {} for symbol, threshold in constants.KEY_POISON_THRESHOLDS.items(): amount = inventory.get(symbol, 0.0) if amount >= threshold and symbol not in self.poison_alerts: self.poison_alerts.add(symbol) LOGGER.warning("Poison level high: %s inventory %.2e exceeds %.2e", symbol, amount, threshold)