feat: add reactor control persistence and tests

This commit is contained in:
Andrii Prokhorov
2025-11-21 17:11:00 +02:00
commit cc7fba4e7a
43 changed files with 1435 additions and 0 deletions

View File

@@ -0,0 +1,8 @@
Metadata-Version: 2.4
Name: fission-sim
Version: 0.1.0
Summary: Simplified nuclear reactor simulation
Author: Fission Sim Team
Requires-Python: >=3.10
Provides-Extra: dev
Requires-Dist: pytest>=7.0; extra == "dev"

View File

@@ -0,0 +1,23 @@
pyproject.toml
src/fission_sim.egg-info/PKG-INFO
src/fission_sim.egg-info/SOURCES.txt
src/fission_sim.egg-info/dependency_links.txt
src/fission_sim.egg-info/requires.txt
src/fission_sim.egg-info/top_level.txt
src/reactor_sim/__init__.py
src/reactor_sim/atomic.py
src/reactor_sim/commands.py
src/reactor_sim/constants.py
src/reactor_sim/consumer.py
src/reactor_sim/control.py
src/reactor_sim/coolant.py
src/reactor_sim/failures.py
src/reactor_sim/fuel.py
src/reactor_sim/logging_utils.py
src/reactor_sim/neutronics.py
src/reactor_sim/reactor.py
src/reactor_sim/simulation.py
src/reactor_sim/state.py
src/reactor_sim/thermal.py
src/reactor_sim/turbine.py
tests/test_simulation.py

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,3 @@
[dev]
pytest>=7.0

View File

@@ -0,0 +1 @@
reactor_sim

View File

@@ -0,0 +1,28 @@
"""Top-level package for the nuclear fission reactor simulation."""
from __future__ import annotations
from .atomic import Atom, AtomicPhysics, FissionEvent
from .commands import ReactorCommand
from .consumer import ElectricalConsumer
from .failures import HealthMonitor
from .logging_utils import configure_logging
from .state import CoreState, CoolantLoopState, TurbineState, PlantState
from .reactor import Reactor
from .simulation import ReactorSimulation
__all__ = [
"Atom",
"AtomicPhysics",
"FissionEvent",
"CoreState",
"CoolantLoopState",
"TurbineState",
"PlantState",
"Reactor",
"ReactorSimulation",
"ReactorCommand",
"ElectricalConsumer",
"HealthMonitor",
"configure_logging",
]

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

254
src/reactor_sim/atomic.py Normal file
View File

@@ -0,0 +1,254 @@
"""Simplified atomic physics utilities for fission events."""
from __future__ import annotations
from dataclasses import dataclass
import logging
import math
import random
from typing import Sequence
from . import constants
LOGGER = logging.getLogger(__name__)
ELEMENT_SYMBOLS: Sequence[str] = (
"", # padding for 1-based indexing
"H",
"He",
"Li",
"Be",
"B",
"C",
"N",
"O",
"F",
"Ne",
"Na",
"Mg",
"Al",
"Si",
"P",
"S",
"Cl",
"Ar",
"K",
"Ca",
"Sc",
"Ti",
"V",
"Cr",
"Mn",
"Fe",
"Co",
"Ni",
"Cu",
"Zn",
"Ga",
"Ge",
"As",
"Se",
"Br",
"Kr",
"Rb",
"Sr",
"Y",
"Zr",
"Nb",
"Mo",
"Tc",
"Ru",
"Rh",
"Pd",
"Ag",
"Cd",
"In",
"Sn",
"Sb",
"Te",
"I",
"Xe",
"Cs",
"Ba",
"La",
"Ce",
"Pr",
"Nd",
"Pm",
"Sm",
"Eu",
"Gd",
"Tb",
"Dy",
"Ho",
"Er",
"Tm",
"Yb",
"Lu",
"Hf",
"Ta",
"W",
"Re",
"Os",
"Ir",
"Pt",
"Au",
"Hg",
"Tl",
"Pb",
"Bi",
"Po",
"At",
"Rn",
"Fr",
"Ra",
"Ac",
"Th",
"Pa",
"U",
"Np",
"Pu",
"Am",
"Cm",
"Bk",
"Cf",
"Es",
"Fm",
"Md",
"No",
"Lr",
"Rf",
"Db",
"Sg",
"Bh",
"Hs",
"Mt",
"Ds",
"Rg",
"Cn",
"Nh",
"Fl",
"Mc",
"Lv",
"Ts",
"Og",
)
def element_symbol(atomic_number: int) -> str:
if 0 < atomic_number < len(ELEMENT_SYMBOLS):
return ELEMENT_SYMBOLS[atomic_number]
return f"E{atomic_number}"
@dataclass(frozen=True)
class Atom:
symbol: str
protons: int
neutrons: int
@property
def mass_number(self) -> int:
return self.protons + self.neutrons
@property
def atomic_mass_kg(self) -> float:
return self.mass_number * constants.AMU_TO_KG
@dataclass(frozen=True)
class FissionEvent:
parent: Atom
products: tuple[Atom, Atom]
emitted_neutrons: int
energy_mev: float
def make_atom(protons: int, neutrons: int) -> Atom:
return Atom(symbol=element_symbol(protons), protons=protons, neutrons=neutrons)
FISSION_LIBRARY: dict[tuple[str, int], Sequence[FissionEvent]] = {
("U", 235): (
FissionEvent(
parent=make_atom(92, 143),
products=(make_atom(36, 56), make_atom(56, 85)), # Kr-92 + Ba-141
emitted_neutrons=3,
energy_mev=202.0,
),
FissionEvent(
parent=make_atom(92, 143),
products=(make_atom(37, 55), make_atom(55, 88)), # Rb-92 + Cs-143
emitted_neutrons=2,
energy_mev=195.0,
),
),
("U", 238): (
FissionEvent(
parent=make_atom(92, 146),
products=(make_atom(38, 54), make_atom(54, 90)),
emitted_neutrons=2,
energy_mev=185.0,
),
),
("Pu", 239): (
FissionEvent(
parent=make_atom(94, 145),
products=(make_atom(36, 53), make_atom(58, 89)),
emitted_neutrons=3,
energy_mev=210.0,
),
),
("Th", 232): (
FissionEvent(
parent=make_atom(90, 142),
products=(make_atom(36, 54), make_atom(54, 88)),
emitted_neutrons=2,
energy_mev=178.0,
),
),
}
class AtomicPhysics:
"""Utility that deterministically chooses fission fragments for electron impacts."""
def __init__(self, seed: int | None = None) -> None:
self.random = random.Random(seed)
def electron_induced_fission(self, atom: Atom) -> FissionEvent:
key = (atom.symbol, atom.mass_number)
reactions = FISSION_LIBRARY.get(key)
if reactions:
event = self.random.choice(reactions)
else:
event = self._generic_split(atom)
LOGGER.debug(
"Electron impact fission: %s-%d -> %s-%d + %s-%d + %d n",
atom.symbol,
atom.mass_number,
event.products[0].symbol,
event.products[0].mass_number,
event.products[1].symbol,
event.products[1].mass_number,
event.emitted_neutrons,
)
return event
def _generic_split(self, atom: Atom) -> FissionEvent:
heavy_mass = max(1, math.floor(atom.mass_number * 0.55))
light_mass = atom.mass_number - heavy_mass
heavy_protons = max(1, min(atom.protons - 1, math.floor(atom.protons * 0.55)))
light_protons = atom.protons - heavy_protons
heavy_neutrons = heavy_mass - heavy_protons
light_neutrons = light_mass - light_protons
heavy_atom = make_atom(heavy_protons, heavy_neutrons)
light_atom = make_atom(light_protons, light_neutrons)
emitted_neutrons = max(0, atom.neutrons - heavy_neutrons - light_neutrons)
energy_mev = 0.8 * atom.mass_number
return FissionEvent(
parent=atom,
products=(heavy_atom, light_atom),
emitted_neutrons=emitted_neutrons,
energy_mev=energy_mev,
)

View File

@@ -0,0 +1,26 @@
"""Operator commands for manual reactor control."""
from __future__ import annotations
from dataclasses import dataclass
@dataclass
class ReactorCommand:
"""Batch of operator actions applied during a simulation step."""
rod_position: float | None = None # Absolute rod insertion 0-0.95
rod_step: float | None = None # Incremental change (-1..1 mapped internally)
scram: bool = False
primary_pump_on: bool | None = None
secondary_pump_on: bool | None = None
turbine_on: bool | None = None
coolant_demand: float | None = None
power_setpoint: float | None = None
consumer_online: bool | None = None
consumer_demand: float | None = None
@classmethod
def scram_all(cls) -> "ReactorCommand":
"""Convenience constructor for an emergency shutdown."""
return cls(scram=True, turbine_on=False, primary_pump_on=True, secondary_pump_on=True)

View File

@@ -0,0 +1,19 @@
"""Physical constants and engineering limits for the simulation."""
from __future__ import annotations
SECONDS_PER_MINUTE = 60.0
MEGAWATT = 1_000_000.0
NEUTRON_LIFETIME = 0.1 # seconds, prompt neutron lifetime surrogate
FUEL_ENERGY_DENSITY = 200.0 * MEGAWATT # J/kg released as heat
COOLANT_HEAT_CAPACITY = 4_200.0 # J/(kg*K) for water/steam
COOLANT_DENSITY = 700.0 # kg/m^3 averaged between phases
MAX_CORE_TEMPERATURE = 1_800.0 # K
MAX_PRESSURE = 15.0 # MPa typical PWR primary loop limit
CONTROL_ROD_SPEED = 0.05 # fraction insertion per second
STEAM_TURBINE_EFFICIENCY = 0.34
GENERATOR_EFFICIENCY = 0.96
ENVIRONMENT_TEMPERATURE = 295.0 # K
AMU_TO_KG = 1.660_539_066_60e-27
MEV_TO_J = 1.602_176_634e-13
ELECTRON_FISSION_CROSS_SECTION = 5e-23 # cm^2, tuned for simulation scale

View File

@@ -0,0 +1,40 @@
"""External electrical consumer/load models."""
from __future__ import annotations
from dataclasses import dataclass
import logging
LOGGER = logging.getLogger(__name__)
@dataclass
class ElectricalConsumer:
name: str
demand_mw: float
online: bool = False
power_received_mw: float = 0.0
def request_power(self) -> float:
return self.demand_mw if self.online else 0.0
def set_demand(self, demand_mw: float) -> None:
previous = self.demand_mw
self.demand_mw = max(0.0, demand_mw)
LOGGER.info("%s demand %.1f -> %.1f MW", self.name, previous, self.demand_mw)
def set_online(self, online: bool) -> None:
if self.online != online:
self.online = online
LOGGER.info("%s %s", self.name, "online" if online else "offline")
def update_power_received(self, supplied_mw: float) -> None:
self.power_received_mw = supplied_mw
if supplied_mw < self.request_power():
LOGGER.warning(
"%s under-supplied: %.1f/%.1f MW",
self.name,
supplied_mw,
self.request_power(),
)

View File

@@ -0,0 +1,90 @@
"""Control system for rods and plant automation."""
from __future__ import annotations
from dataclasses import dataclass
import json
import logging
from pathlib import Path
from . import constants
from .state import CoolantLoopState, CoreState, PlantState
LOGGER = logging.getLogger(__name__)
def clamp(value: float, lo: float, hi: float) -> float:
return max(lo, min(hi, value))
@dataclass
class ControlSystem:
setpoint_mw: float = 3_000.0
rod_fraction: float = 0.5
def update_rods(self, state: CoreState, dt: float) -> float:
error = (state.power_output_mw - self.setpoint_mw) / self.setpoint_mw
adjustment = -error * 0.3
adjustment = clamp(adjustment, -constants.CONTROL_ROD_SPEED * dt, constants.CONTROL_ROD_SPEED * dt)
previous = self.rod_fraction
self.rod_fraction = clamp(self.rod_fraction + adjustment, 0.0, 0.95)
LOGGER.debug("Control rods %.3f -> %.3f (error=%.3f)", previous, self.rod_fraction, error)
return self.rod_fraction
def set_rods(self, fraction: float) -> float:
previous = self.rod_fraction
self.rod_fraction = clamp(fraction, 0.0, 0.95)
LOGGER.info("Manual rod set %.3f -> %.3f", previous, self.rod_fraction)
return self.rod_fraction
def increment_rods(self, delta: float) -> float:
return self.set_rods(self.rod_fraction + delta)
def scram(self) -> float:
self.rod_fraction = 0.95
LOGGER.warning("SCRAM: rods fully inserted")
return self.rod_fraction
def set_power_setpoint(self, megawatts: float) -> None:
previous = self.setpoint_mw
self.setpoint_mw = clamp(megawatts, 100.0, 4_000.0)
LOGGER.info("Power setpoint %.0f -> %.0f MW", previous, self.setpoint_mw)
def coolant_demand(self, primary: CoolantLoopState) -> float:
desired_temp = 580.0
error = (primary.temperature_out - desired_temp) / 100.0
demand = clamp(0.8 - error, 0.0, 1.0)
LOGGER.debug("Coolant demand %.2f for outlet %.1fK", demand, primary.temperature_out)
return demand
def save_state(
self,
filepath: str,
plant_state: PlantState,
metadata: dict | None = None,
health_snapshot: dict | None = None,
) -> None:
payload = {
"control": {
"setpoint_mw": self.setpoint_mw,
"rod_fraction": self.rod_fraction,
},
"plant": plant_state.to_dict(),
"metadata": metadata or {},
}
if health_snapshot:
payload["health"] = health_snapshot
path = Path(filepath)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2))
LOGGER.info("Saved control & plant state to %s", path)
def load_state(self, filepath: str) -> tuple[PlantState, dict, dict | None]:
path = Path(filepath)
data = json.loads(path.read_text())
control = data.get("control", {})
self.setpoint_mw = control.get("setpoint_mw", self.setpoint_mw)
self.rod_fraction = control.get("rod_fraction", self.rod_fraction)
plant = PlantState.from_dict(data["plant"])
LOGGER.info("Loaded plant state from %s", path)
return plant, data.get("metadata", {}), data.get("health")

View File

@@ -0,0 +1,27 @@
"""Coolant loop control models."""
from __future__ import annotations
from dataclasses import dataclass
import logging
from .state import CoolantLoopState
LOGGER = logging.getLogger(__name__)
@dataclass
class Pump:
nominal_flow: float
efficiency: float = 0.9
def flow_rate(self, demand: float) -> float:
demand = max(0.0, min(1.0, demand))
return self.nominal_flow * (0.2 + 0.8 * demand) * self.efficiency
def step(self, loop: CoolantLoopState, demand: float) -> None:
loop.mass_flow_rate = self.flow_rate(demand)
loop.pressure = 12.0 * demand + 2.0
LOGGER.debug(
"Pump demand=%.2f -> %.0f kg/s, pressure=%.1f MPa", demand, loop.mass_flow_rate, loop.pressure
)

110
src/reactor_sim/failures.py Normal file
View File

@@ -0,0 +1,110 @@
"""Wear and failure monitoring for reactor components."""
from __future__ import annotations
from dataclasses import dataclass, asdict
import logging
from typing import Dict, Iterable, List
from . import constants
from .state import PlantState
LOGGER = logging.getLogger(__name__)
@dataclass
class ComponentHealth:
name: str
integrity: float = 1.0
failed: bool = False
def degrade(self, amount: float) -> None:
if self.failed:
return
self.integrity = max(0.0, self.integrity - amount)
if self.integrity <= 0.0:
self.fail()
def fail(self) -> None:
if not self.failed:
self.failed = True
LOGGER.error("Component %s has failed", self.name)
def snapshot(self) -> dict:
return asdict(self)
@classmethod
def from_snapshot(cls, data: dict) -> "ComponentHealth":
return cls(**data)
class HealthMonitor:
"""Tracks component wear and signals failures."""
def __init__(self) -> None:
self.components: Dict[str, ComponentHealth] = {
"core": ComponentHealth("core"),
"primary_pump": ComponentHealth("primary_pump"),
"secondary_pump": ComponentHealth("secondary_pump"),
"turbine": ComponentHealth("turbine"),
}
self.failure_log: list[str] = []
def component(self, name: str) -> ComponentHealth:
return self.components[name]
def evaluate(
self,
state: PlantState,
primary_active: bool,
secondary_active: bool,
turbine_active: bool,
dt: float,
) -> List[str]:
events: list[str] = []
core = self.component("core")
core_temp = state.core.fuel_temperature
temp_stress = max(0.0, (core_temp - 900.0) / (constants.MAX_CORE_TEMPERATURE - 900.0))
base_degrade = 0.0001 * dt
core.degrade(base_degrade + temp_stress * 0.01 * dt)
if primary_active:
primary_flow = state.primary_loop.mass_flow_rate
flow_ratio = 0.0 if primary_flow <= 0 else min(1.0, primary_flow / 18_000.0)
self.component("primary_pump").degrade((0.0002 + (1 - flow_ratio) * 0.005) * dt)
else:
self.component("primary_pump").degrade(0.0005 * dt)
if secondary_active:
secondary_flow = state.secondary_loop.mass_flow_rate
flow_ratio = 0.0 if secondary_flow <= 0 else min(1.0, secondary_flow / 16_000.0)
self.component("secondary_pump").degrade((0.0002 + (1 - flow_ratio) * 0.004) * dt)
else:
self.component("secondary_pump").degrade(0.0005 * dt)
if turbine_active:
electrical = state.turbine.electrical_output_mw
load_ratio = (
0.0
if state.turbine.load_demand_mw <= 0
else min(1.0, electrical / max(1e-6, state.turbine.load_demand_mw))
)
stress = 0.0002 + abs(1 - load_ratio) * 0.003
self.component("turbine").degrade(stress * dt)
else:
self.component("turbine").degrade(0.0001 * dt)
for name, component in self.components.items():
if component.failed and name not in self.failure_log:
events.append(name)
self.failure_log.append(name)
return events
def snapshot(self) -> dict:
return {name: comp.snapshot() for name, comp in self.components.items()}
def load_snapshot(self, data: dict) -> None:
for name, comp_data in data.items():
if name in self.components:
self.components[name] = ComponentHealth.from_snapshot(comp_data)

56
src/reactor_sim/fuel.py Normal file
View File

@@ -0,0 +1,56 @@
"""Fuel behavior, burnup, and decay heat modeling."""
from __future__ import annotations
from dataclasses import dataclass, field
import logging
from . import constants
from .atomic import Atom, AtomicPhysics, FissionEvent, make_atom
from .state import CoreState
LOGGER = logging.getLogger(__name__)
def fuel_reactivity_penalty(burnup: float) -> float:
"""Simplistic model that penalizes reactivity as burnup increases."""
# Burnup is 0-1, penalty grows quadratically to mimic depletion.
return 0.4 * burnup**2
def decay_heat_fraction(burnup: float) -> float:
"""Return remaining decay heat fraction relative to nominal power."""
return min(0.07 + 0.2 * burnup, 0.15)
@dataclass
class FuelAssembly:
enrichment: float # fraction U-235
mass_kg: float
fissile_atom: Atom = field(default_factory=lambda: make_atom(92, 143))
atomic_physics: AtomicPhysics = field(default_factory=AtomicPhysics)
def available_energy_j(self, state: CoreState) -> float:
fraction_remaining = max(0.0, 1.0 - state.burnup)
return self.mass_kg * constants.FUEL_ENERGY_DENSITY * fraction_remaining
def simulate_electron_hit(self) -> FissionEvent:
return self.atomic_physics.electron_induced_fission(self.fissile_atom)
def prompt_energy_rate(self, flux: float, control_fraction: float) -> tuple[float, FissionEvent]:
"""Compute MW thermal from prompt fission by sampling atomic physics."""
event = self.simulate_electron_hit()
effective_flux = max(0.0, flux * max(0.0, 1.0 - control_fraction))
atoms = self.mass_kg / self.fissile_atom.atomic_mass_kg
event_rate = effective_flux * constants.ELECTRON_FISSION_CROSS_SECTION * atoms * self.enrichment
power_watts = event_rate * event.energy_mev * constants.MEV_TO_J
power_mw = power_watts / constants.MEGAWATT
LOGGER.debug(
"Prompt fission products %s-%d + %s-%d yielding %.2f MW",
event.products[0].symbol,
event.products[0].mass_number,
event.products[1].symbol,
event.products[1].mass_number,
power_mw,
)
return max(0.0, power_mw), event

View File

@@ -0,0 +1,38 @@
"""Logging helpers for the reactor simulation package."""
from __future__ import annotations
import logging
from typing import Optional
def configure_logging(level: int | str = "INFO", logfile: Optional[str] = None) -> logging.Logger:
"""Configure a package-scoped logger emitting to stdout and optional file."""
resolved_level = logging.getLevelName(level) if isinstance(level, str) else level
logger = logging.getLogger("reactor_sim")
logger.setLevel(resolved_level)
if not logger.handlers:
stream_handler = logging.StreamHandler()
formatter = logging.Formatter(
fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s", datefmt="%H:%M:%S"
)
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
if logfile:
file_handler = logging.FileHandler(logfile)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
else:
for handler in logger.handlers:
handler.setLevel(resolved_level)
if logfile and not any(isinstance(handler, logging.FileHandler) for handler in logger.handlers):
file_handler = logging.FileHandler(logfile)
formatter = logging.Formatter(
fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s", datefmt="%H:%M:%S"
)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# Keep package loggers self-contained so host apps can opt-in to propagation.
logger.propagate = False
logging.getLogger().setLevel(resolved_level)
return logger

View File

@@ -0,0 +1,55 @@
"""Neutron balance and reactivity calculations."""
from __future__ import annotations
from dataclasses import dataclass
import logging
from . import constants
from .fuel import fuel_reactivity_penalty
from .state import CoreState
LOGGER = logging.getLogger(__name__)
def temperature_feedback(temp: float) -> float:
"""Negative coefficient: higher temperature lowers reactivity."""
reference = 900.0
coefficient = -5e-5
return coefficient * (temp - reference)
def xenon_poisoning(flux: float) -> float:
return min(0.05, 1e-8 * flux)
@dataclass
class NeutronDynamics:
beta_effective: float = 0.0065
delayed_neutron_fraction: float = 0.0008
def reactivity(self, state: CoreState, control_fraction: float) -> float:
rho = (
0.02 * (1.0 - control_fraction)
+ temperature_feedback(state.fuel_temperature)
- fuel_reactivity_penalty(state.burnup)
- xenon_poisoning(state.neutron_flux)
)
return rho
def flux_derivative(self, state: CoreState, rho: float) -> float:
generation_time = constants.NEUTRON_LIFETIME
beta = self.beta_effective
return ((rho - beta) / generation_time) * state.neutron_flux + 1e5
def step(self, state: CoreState, control_fraction: float, dt: float) -> None:
rho = self.reactivity(state, control_fraction)
d_flux = self.flux_derivative(state, rho)
state.neutron_flux = max(0.0, state.neutron_flux + d_flux * dt)
state.reactivity_margin = rho
LOGGER.debug(
"Neutronics: rho=%.5f, flux=%.2e n/cm2/s, d_flux=%.2e",
rho,
state.neutron_flux,
d_flux,
)

263
src/reactor_sim/reactor.py Normal file
View File

@@ -0,0 +1,263 @@
"""High-level reactor orchestration."""
from __future__ import annotations
from dataclasses import dataclass, field
import logging
from . import constants
from .atomic import AtomicPhysics
from .commands import ReactorCommand
from .coolant import Pump
from .consumer import ElectricalConsumer
from .control import ControlSystem
from .failures import HealthMonitor
from .fuel import FuelAssembly, decay_heat_fraction
from .neutronics import NeutronDynamics
from .state import CoolantLoopState, CoreState, PlantState, TurbineState
from .thermal import ThermalSolver, heat_transfer
from .turbine import SteamGenerator, Turbine
LOGGER = logging.getLogger(__name__)
@dataclass
class Reactor:
fuel: FuelAssembly
neutronics: NeutronDynamics
control: ControlSystem
primary_pump: Pump
secondary_pump: Pump
thermal: ThermalSolver
steam_generator: SteamGenerator
turbine: Turbine
atomic_model: AtomicPhysics
consumer: ElectricalConsumer | None = None
health_monitor: HealthMonitor = field(default_factory=HealthMonitor)
primary_pump_active: bool = True
secondary_pump_active: bool = True
turbine_active: bool = True
shutdown: bool = False
@classmethod
def default(cls) -> "Reactor":
atomic_model = AtomicPhysics()
return cls(
fuel=FuelAssembly(enrichment=0.045, mass_kg=80_000.0, atomic_physics=atomic_model),
neutronics=NeutronDynamics(),
control=ControlSystem(),
primary_pump=Pump(nominal_flow=18_000.0),
secondary_pump=Pump(nominal_flow=16_000.0, efficiency=0.85),
thermal=ThermalSolver(),
steam_generator=SteamGenerator(),
turbine=Turbine(),
atomic_model=atomic_model,
consumer=ElectricalConsumer(name="Grid", demand_mw=800.0, online=False),
health_monitor=HealthMonitor(),
)
def initial_state(self) -> PlantState:
ambient = constants.ENVIRONMENT_TEMPERATURE
core = CoreState(
fuel_temperature=ambient,
neutron_flux=1e5,
reactivity_margin=-0.02,
power_output_mw=0.1,
burnup=0.0,
)
primary = CoolantLoopState(
temperature_in=ambient,
temperature_out=ambient,
pressure=0.5,
mass_flow_rate=0.0,
steam_quality=0.0,
)
secondary = CoolantLoopState(
temperature_in=ambient,
temperature_out=ambient,
pressure=0.5,
mass_flow_rate=0.0,
steam_quality=0.0,
)
turbine = TurbineState(
steam_enthalpy=2_000.0,
shaft_power_mw=0.0,
electrical_output_mw=0.0,
condenser_temperature=ambient,
load_demand_mw=0.0,
load_supplied_mw=0.0,
)
return PlantState(core=core, primary_loop=primary, secondary_loop=secondary, turbine=turbine)
def step(self, state: PlantState, dt: float, command: ReactorCommand | None = None) -> None:
if self.shutdown:
rod_fraction = self.control.rod_fraction
else:
rod_fraction = self.control.update_rods(state.core, dt)
overrides = {}
if command:
overrides = self._apply_command(command, state)
rod_fraction = overrides.get("rod_fraction", rod_fraction)
self.neutronics.step(state.core, rod_fraction, dt)
prompt_power, fission_event = self.fuel.prompt_energy_rate(state.core.neutron_flux, rod_fraction)
decay_power = decay_heat_fraction(state.core.burnup) * state.core.power_output_mw
total_power = prompt_power + decay_power
state.core.power_output_mw = total_power
state.core.update_burnup(dt)
pump_demand = overrides.get("coolant_demand", self.control.coolant_demand(state.primary_loop))
if self.primary_pump_active:
self.primary_pump.step(state.primary_loop, pump_demand)
else:
state.primary_loop.mass_flow_rate = 0.0
state.primary_loop.pressure = 0.5
if self.secondary_pump_active:
self.secondary_pump.step(state.secondary_loop, 0.75)
else:
state.secondary_loop.mass_flow_rate = 0.0
state.secondary_loop.pressure = 0.5
self.thermal.step_core(state.core, state.primary_loop, total_power, dt)
transferred = heat_transfer(state.primary_loop, state.secondary_loop, total_power)
self.thermal.step_secondary(state.secondary_loop, transferred)
if self.turbine_active:
self.turbine.step(state.secondary_loop, state.turbine, self.consumer)
else:
state.turbine.shaft_power_mw = 0.0
state.turbine.electrical_output_mw = 0.0
if self.consumer:
self.consumer.update_power_received(0.0)
failures = self.health_monitor.evaluate(
state,
self.primary_pump_active,
self.secondary_pump_active,
self.turbine_active,
dt,
)
for failure in failures:
self._handle_failure(failure)
state.time_elapsed += dt
LOGGER.info(
(
"t=%5.1fs rods=%.2f core_power=%.1fMW prompt=%.1fMW :: "
"%s-%d + %s-%d, outlet %.1fK, electrical %.1fMW load %.1f/%.1fMW"
),
state.time_elapsed,
rod_fraction,
total_power,
prompt_power,
fission_event.products[0].symbol,
fission_event.products[0].mass_number,
fission_event.products[1].symbol,
fission_event.products[1].mass_number,
state.primary_loop.temperature_out,
state.turbine.electrical_output_mw,
state.turbine.load_supplied_mw,
state.turbine.load_demand_mw,
)
def _handle_failure(self, component: str) -> None:
if component == "core":
LOGGER.critical("Core failure detected. Initiating SCRAM.")
self.shutdown = True
self.control.scram()
elif component == "primary_pump":
self._set_primary_pump(False)
elif component == "secondary_pump":
self._set_secondary_pump(False)
elif component == "turbine":
self._set_turbine_state(False)
def _apply_command(self, command: ReactorCommand, state: PlantState) -> dict[str, float]:
overrides: dict[str, float] = {}
if command.scram:
self.shutdown = True
overrides["rod_fraction"] = self.control.scram()
self._set_turbine_state(False)
if command.power_setpoint is not None:
self.control.set_power_setpoint(command.power_setpoint)
if command.rod_position is not None:
overrides["rod_fraction"] = self.control.set_rods(command.rod_position)
self.shutdown = self.shutdown or command.rod_position >= 0.95
elif command.rod_step is not None:
overrides["rod_fraction"] = self.control.increment_rods(command.rod_step)
if command.primary_pump_on is not None:
self._set_primary_pump(command.primary_pump_on)
if command.secondary_pump_on is not None:
self._set_secondary_pump(command.secondary_pump_on)
if command.turbine_on is not None:
self._set_turbine_state(command.turbine_on)
if command.consumer_online is not None and self.consumer:
self.consumer.set_online(command.consumer_online)
if command.consumer_demand is not None and self.consumer:
self.consumer.set_demand(command.consumer_demand)
if command.coolant_demand is not None:
overrides["coolant_demand"] = max(0.0, min(1.0, command.coolant_demand))
return overrides
def _set_primary_pump(self, active: bool) -> None:
if self.primary_pump_active != active:
self.primary_pump_active = active
LOGGER.info("Primary pump %s", "enabled" if active else "stopped")
def _set_secondary_pump(self, active: bool) -> None:
if self.secondary_pump_active != active:
self.secondary_pump_active = active
LOGGER.info("Secondary pump %s", "enabled" if active else "stopped")
def _set_turbine_state(self, active: bool) -> None:
if self.turbine_active != active:
self.turbine_active = active
LOGGER.info("Turbine %s", "started" if active else "stopped")
def attach_consumer(self, consumer: ElectricalConsumer) -> None:
self.consumer = consumer
LOGGER.info("Attached consumer %s (%.1f MW)", consumer.name, consumer.demand_mw)
def detach_consumer(self) -> None:
if self.consumer:
LOGGER.info("Detached consumer %s", self.consumer.name)
self.consumer = None
def save_state(self, filepath: str, state: PlantState) -> None:
metadata = {
"primary_pump_active": self.primary_pump_active,
"secondary_pump_active": self.secondary_pump_active,
"turbine_active": self.turbine_active,
"shutdown": self.shutdown,
"consumer": {
"online": self.consumer.online if self.consumer else False,
"demand_mw": self.consumer.demand_mw if self.consumer else 0.0,
"name": self.consumer.name if self.consumer else None,
},
}
self.control.save_state(filepath, state, metadata, self.health_monitor.snapshot())
def load_state(self, filepath: str) -> PlantState:
plant, metadata, health = self.control.load_state(filepath)
self.primary_pump_active = metadata.get("primary_pump_active", self.primary_pump_active)
self.secondary_pump_active = metadata.get("secondary_pump_active", self.secondary_pump_active)
self.turbine_active = metadata.get("turbine_active", self.turbine_active)
self.shutdown = metadata.get("shutdown", self.shutdown)
consumer_cfg = metadata.get("consumer")
if consumer_cfg:
if not self.consumer:
self.consumer = ElectricalConsumer(
name=consumer_cfg.get("name") or "External",
demand_mw=consumer_cfg.get("demand_mw", 0.0),
online=consumer_cfg.get("online", False),
)
else:
self.consumer.set_demand(consumer_cfg.get("demand_mw", self.consumer.demand_mw))
self.consumer.set_online(consumer_cfg.get("online", self.consumer.online))
if health:
self.health_monitor.load_snapshot(health)
LOGGER.info("Reactor state restored from %s", filepath)
return plant

View File

@@ -0,0 +1,98 @@
"""Reactor simulation harness and CLI."""
from __future__ import annotations
import copy
import json
import logging
import os
from dataclasses import dataclass, field
from threading import Event
import time
from typing import Callable, Iterable, Optional
from .commands import ReactorCommand
from .logging_utils import configure_logging
from .reactor import Reactor
from .state import PlantState
LOGGER = logging.getLogger(__name__)
CommandProvider = Callable[[float, PlantState], Optional[ReactorCommand]]
@dataclass
class ReactorSimulation:
reactor: Reactor
timestep: float = 1.0
duration: float | None = 3600.0
command_provider: CommandProvider | None = None
realtime: bool = False
stop_event: Event = field(default_factory=Event)
start_state: PlantState | None = None
last_state: PlantState | None = field(default=None, init=False)
def run(self) -> Iterable[PlantState]:
state = copy.deepcopy(self.start_state) if self.start_state else self.reactor.initial_state()
elapsed = 0.0
last_step_wall = time.time()
while self.duration is None or elapsed < self.duration:
if self.stop_event.is_set():
LOGGER.info("Stop signal received, terminating simulation loop")
break
snapshot = copy.deepcopy(state)
yield snapshot
command = self.command_provider(elapsed, snapshot) if self.command_provider else None
self.reactor.step(state, self.timestep, command)
elapsed += self.timestep
if self.realtime:
wall_elapsed = time.time() - last_step_wall
sleep_time = self.timestep - wall_elapsed
if sleep_time > 0:
time.sleep(sleep_time)
last_step_wall = time.time()
self.last_state = state
LOGGER.info("Simulation complete, %.0fs simulated", elapsed)
def log(self) -> list[dict[str, float]]:
return [snapshot for snapshot in (s.snapshot() for s in self.run())]
def stop(self) -> None:
self.stop_event.set()
def main() -> None:
log_level = os.getenv("FISSION_LOG_LEVEL", "INFO")
log_file = os.getenv("FISSION_LOG_FILE")
configure_logging(log_level, log_file)
realtime = os.getenv("FISSION_REALTIME", "0") == "1"
duration_env = os.getenv("FISSION_SIM_DURATION")
if duration_env:
duration = None if duration_env.lower() in {"none", "infinite"} else float(duration_env)
else:
duration = None if realtime else 600.0
reactor = Reactor.default()
sim = ReactorSimulation(reactor, timestep=5.0, duration=duration, realtime=realtime)
load_path = os.getenv("FISSION_LOAD_STATE")
save_path = os.getenv("FISSION_SAVE_STATE")
if load_path:
sim.start_state = reactor.load_state(load_path)
try:
if realtime:
LOGGER.info("Running in real-time mode (Ctrl+C to stop)...")
for _ in sim.run():
pass
else:
snapshots = sim.log()
LOGGER.info("Captured %d snapshots", len(snapshots))
print(json.dumps(snapshots[-5:], indent=2))
except KeyboardInterrupt:
sim.stop()
LOGGER.warning("Simulation interrupted by user")
finally:
if save_path and sim.last_state:
reactor.save_state(save_path, sim.last_state)
if __name__ == "__main__":
main()

77
src/reactor_sim/state.py Normal file
View File

@@ -0,0 +1,77 @@
"""Dataclasses that capture the thermal-hydraulic state of the plant."""
from __future__ import annotations
from dataclasses import dataclass, field, asdict
def clamp(value: float, min_value: float, max_value: float) -> float:
return max(min_value, min(max_value, value))
@dataclass
class CoreState:
fuel_temperature: float # Kelvin
neutron_flux: float # neutrons/cm^2-s equivalent
reactivity_margin: float # delta rho
power_output_mw: float # MW thermal
burnup: float # fraction of fuel consumed
def update_burnup(self, dt: float) -> None:
produced_energy_mwh = self.power_output_mw * (dt / 3600.0)
self.burnup = clamp(self.burnup + produced_energy_mwh * 1e-5, 0.0, 0.99)
@dataclass
class CoolantLoopState:
temperature_in: float # K
temperature_out: float # K
pressure: float # MPa
mass_flow_rate: float # kg/s
steam_quality: float # fraction of vapor
def average_temperature(self) -> float:
return 0.5 * (self.temperature_in + self.temperature_out)
@dataclass
class TurbineState:
steam_enthalpy: float # kJ/kg
shaft_power_mw: float
electrical_output_mw: float
condenser_temperature: float
load_demand_mw: float = 0.0
load_supplied_mw: float = 0.0
@dataclass
class PlantState:
core: CoreState
primary_loop: CoolantLoopState
secondary_loop: CoolantLoopState
turbine: TurbineState
time_elapsed: float = field(default=0.0)
def snapshot(self) -> dict[str, float]:
return {
"time_elapsed": self.time_elapsed,
"core_temp": self.core.fuel_temperature,
"core_power": self.core.power_output_mw,
"neutron_flux": self.core.neutron_flux,
"primary_outlet_temp": self.primary_loop.temperature_out,
"secondary_pressure": self.secondary_loop.pressure,
"turbine_electric": self.turbine.electrical_output_mw,
}
def to_dict(self) -> dict:
return asdict(self)
@classmethod
def from_dict(cls, data: dict) -> "PlantState":
return cls(
core=CoreState(**data["core"]),
primary_loop=CoolantLoopState(**data["primary_loop"]),
secondary_loop=CoolantLoopState(**data["secondary_loop"]),
turbine=TurbineState(**data["turbine"]),
time_elapsed=data.get("time_elapsed", 0.0),
)

View File

@@ -0,0 +1,55 @@
"""Thermal hydraulics approximations."""
from __future__ import annotations
from dataclasses import dataclass
import logging
from . import constants
from .state import CoolantLoopState, CoreState
LOGGER = logging.getLogger(__name__)
def heat_transfer(primary: CoolantLoopState, secondary: CoolantLoopState, core_power_mw: float) -> float:
"""Return MW transferred to the secondary loop."""
delta_t = max(0.0, primary.temperature_out - secondary.temperature_in)
conductance = 0.05 # steam generator effectiveness
transferred = min(core_power_mw, conductance * delta_t)
LOGGER.debug("Heat transfer %.2f MW with ΔT=%.1fK", transferred, delta_t)
return transferred
def temperature_rise(power_mw: float, mass_flow: float) -> float:
if mass_flow <= 0:
return 0.0
return (power_mw * constants.MEGAWATT) / (mass_flow * constants.COOLANT_HEAT_CAPACITY)
@dataclass
class ThermalSolver:
primary_volume_m3: float = 300.0
def step_core(self, core: CoreState, primary: CoolantLoopState, power_mw: float, dt: float) -> None:
temp_rise = temperature_rise(power_mw, primary.mass_flow_rate)
primary.temperature_out = primary.temperature_in + temp_rise
core.fuel_temperature += 0.1 * (power_mw - temp_rise) * dt
core.fuel_temperature = min(core.fuel_temperature, constants.MAX_CORE_TEMPERATURE)
LOGGER.debug(
"Primary loop: flow=%.0f kg/s temp_out=%.1fK core_temp=%.1fK",
primary.mass_flow_rate,
primary.temperature_out,
core.fuel_temperature,
)
def step_secondary(self, secondary: CoolantLoopState, transferred_mw: float) -> None:
delta_t = temperature_rise(transferred_mw, secondary.mass_flow_rate)
secondary.temperature_out = secondary.temperature_in + delta_t
secondary.steam_quality = min(1.0, max(0.0, delta_t / 100.0))
secondary.pressure = min(constants.MAX_PRESSURE, 6.0 + delta_t * 0.01)
LOGGER.debug(
"Secondary loop: transferred=%.1fMW temp_out=%.1fK quality=%.2f",
transferred_mw,
secondary.temperature_out,
secondary.steam_quality,
)

View File

@@ -0,0 +1,69 @@
"""Steam generator and turbine performance models."""
from __future__ import annotations
from dataclasses import dataclass
import logging
from . import constants
from typing import Optional
from .state import CoolantLoopState, TurbineState
from .consumer import ElectricalConsumer
LOGGER = logging.getLogger(__name__)
@dataclass
class SteamGenerator:
drum_volume_m3: float = 200.0
def steam_enthalpy(self, loop: CoolantLoopState) -> float:
base = 2_700.0 # kJ/kg saturated steam
quality_adjustment = 500.0 * loop.steam_quality
return base + quality_adjustment
@dataclass
class Turbine:
generator_efficiency: float = constants.GENERATOR_EFFICIENCY
mechanical_efficiency: float = constants.STEAM_TURBINE_EFFICIENCY
def step(
self,
loop: CoolantLoopState,
state: TurbineState,
consumer: Optional[ElectricalConsumer] = None,
) -> None:
enthalpy = 2_700.0 + loop.steam_quality * 600.0
mass_flow = loop.mass_flow_rate * 0.6
shaft_power_mw = (enthalpy * mass_flow / 1_000.0) * self.mechanical_efficiency / 1_000.0
electrical = shaft_power_mw * self.generator_efficiency
if consumer:
load_demand = consumer.request_power()
supplied = min(electrical, load_demand)
consumer.update_power_received(supplied)
LOGGER.debug(
"Consumer %s demand %.1f -> supplied %.1f MW",
consumer.name,
load_demand,
supplied,
)
else:
load_demand = 0.0
supplied = 0.0
condenser_temp = max(305.0, loop.temperature_in - 20.0)
state.steam_enthalpy = enthalpy
state.shaft_power_mw = shaft_power_mw
state.electrical_output_mw = electrical
state.condenser_temperature = condenser_temp
state.load_demand_mw = load_demand
state.load_supplied_mw = supplied
LOGGER.debug(
"Turbine output: shaft=%.1fMW electrical=%.1fMW condenser=%.1fK load %.1f/%.1f",
shaft_power_mw,
electrical,
condenser_temp,
supplied,
load_demand,
)