191 lines
7.8 KiB
Python
191 lines
7.8 KiB
Python
"""Control system for rods and plant automation."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
from . import constants
|
|
from .state import CoolantLoopState, CoreState, PlantState
|
|
|
|
# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
def clamp(value: float, lo: float, hi: float) -> float:
    """Return *value* limited to the closed interval ``[lo, hi]``.

    Args:
        value: Number to limit.
        lo: Lower bound.
        hi: Upper bound.

    Returns:
        ``lo`` when ``value < lo``, ``hi`` when ``value > hi``, otherwise
        ``value``.  If the bounds are passed reversed (``lo > hi``) the
        result is ``lo``, because the lower bound is applied last.
    """
    return max(lo, min(hi, value))
|
|
|
|
|
|
@dataclass
class ControlSystem:
    """Control-rod positioning and plant automation logic.

    Rod positions are insertion fractions: raising a value inserts the rods
    (see ``update_rods``) and ``0.95`` is treated as fully inserted (see
    ``scram``).  Per-bank positions live in ``rod_banks``; ``rod_fraction``
    is their weighted average and is refreshed whenever the banks move.
    """

    # Power level the automatic controller steers towards (logged as MW).
    setpoint_mw: float = 3_000.0
    # Effective (weighted) insertion of all banks; kept in sync with rod_banks.
    rod_fraction: float = 0.5
    # When True the automatic power loop and the safety backoff are bypassed.
    manual_control: bool = False
    # Per-bank insertion, paired with constants.CONTROL_ROD_BANK_WEIGHTS.
    rod_banks: list[float] = field(default_factory=lambda: [0.5, 0.5, 0.5])
    # Insertion the banks are currently being driven towards.
    rod_target: float = 0.5

    # Upper insertion limit used by every clamp below.  Deliberately left
    # unannotated so the dataclass machinery does not turn it into a field.
    _MAX_INSERTION = 0.95

    def update_rods(self, state: CoreState, dt: float) -> float:
        """Advance the rod banks by one time step and return the new insertion.

        In manual mode the banks simply travel towards ``rod_target``.  In
        automatic mode the target is first nudged in proportion to the
        relative power error, rate-limited by ``constants.CONTROL_ROD_SPEED``.

        Args:
            state: Core state; only ``power_output_mw`` is read.
            dt: Length of the time step.

        Returns:
            The effective insertion (``rod_fraction``) after the step.
        """
        weights = constants.CONTROL_ROD_BANK_WEIGHTS
        if not self.rod_banks or len(self.rod_banks) != len(weights):
            # Bank list is missing or does not match the configured weights:
            # rebuild it with every bank at the current effective insertion.
            self.rod_banks = [self.rod_fraction] * len(weights)

        self.rod_target = clamp(self.rod_target, 0.0, self._MAX_INSERTION)

        if self.manual_control:
            # Keep manual tweaks in sync with the target: if rod_fraction was
            # changed from outside it no longer matches the banks, so adopt
            # it as the new target.
            if abs(self.rod_fraction - self.effective_insertion()) > 1e-6:
                self.rod_target = clamp(self.rod_fraction, 0.0, self._MAX_INSERTION)
            self._advance_banks(self.rod_target, dt)
            return self.rod_fraction

        if self.setpoint_mw > 0.0:
            error = (state.power_output_mw - self.setpoint_mw) / self.setpoint_mw
        else:
            # set_power_setpoint never yields a non-positive setpoint, but
            # direct assignment or a loaded file can.  Dividing by it would
            # raise (zero) or flip the control direction (negative), so treat
            # any output as fully over target and drive the rods in.
            error = 1.0 if state.power_output_mw > 0.0 else 0.0

        # When power is low (negative error) withdraw rods; when high, insert them.
        max_step = constants.CONTROL_ROD_SPEED * dt
        adjustment = clamp(error * 0.35, -max_step, max_step)
        self.rod_target = clamp(self.rod_target + adjustment, 0.0, self._MAX_INSERTION)
        self._advance_banks(self.rod_target, dt)
        LOGGER.debug("Control rod target=%.3f (error=%.3f)", self.rod_target, error)
        return self.rod_fraction

    def set_rods(self, fraction: float) -> float:
        """Set a new manual rod target and return it.

        The request is snapped to the manual step size and clamped.  The
        banks are not moved here (zero time step); only ``rod_fraction`` is
        re-derived from the current bank positions.
        """
        self.rod_target = self._quantize_manual(fraction)
        self._advance_banks(self.rod_target, 0.0)
        LOGGER.info("Manual rod target set to %.3f", self.rod_target)
        return self.rod_target

    def increment_rods(self, delta: float) -> float:
        """Shift the manual target by *delta* and return the new target.

        NOTE(review): the increment is applied to the current effective
        insertion (``rod_fraction``), not to the pending ``rod_target`` --
        confirm this is the intended behaviour while rods are still moving.
        """
        return self.set_rods(self.rod_fraction + delta)

    def scram(self) -> float:
        """Drive every bank to the insertion limit at once and return it."""
        self.rod_target = self._MAX_INSERTION
        if self.rod_banks:
            self.rod_banks = [self._MAX_INSERTION for _ in self.rod_banks]
            self._sync_fraction()
        else:
            # With no banks to average, _sync_fraction would leave
            # rod_fraction untouched and the SCRAM would silently do nothing.
            self.rod_fraction = self._MAX_INSERTION
        LOGGER.warning("SCRAM: rods fully inserted")
        return self.rod_fraction

    def set_power_setpoint(self, megawatts: float) -> None:
        """Set the automatic power setpoint, clamped to 100..4000 MW."""
        previous = self.setpoint_mw
        self.setpoint_mw = clamp(megawatts, 100.0, 4_000.0)
        LOGGER.info("Power setpoint %.0f -> %.0f MW", previous, self.setpoint_mw)

    def set_manual_mode(self, manual: bool) -> None:
        """Switch between manual and automatic rod control.

        Nothing is changed or logged when the mode is already *manual*.
        """
        if self.manual_control == manual:
            return
        self.manual_control = manual
        LOGGER.info("Rod control %s", "manual" if manual else "automatic")

    def safety_backoff(self, subcooling_margin: float | None, dnb_margin: float | None, dt: float) -> None:
        """Insert rods proactively when thermal margins are thin.

        Does nothing in manual mode or when both margins are healthy.
        Severity rises linearly from 0 as a margin drops below its threshold
        (3.0 for subcooling, 0.7 for DNB); the larger of the two is used.
        It is not capped, so a negative margin yields a severity above 1.

        Args:
            subcooling_margin: Subcooling margin, or ``None`` to ignore it.
            dnb_margin: DNB margin, or ``None`` to ignore it.
            dt: Length of the time step.
        """
        if self.manual_control:
            return

        severity = 0.0
        if subcooling_margin is not None:
            severity = max(severity, max(0.0, 3.0 - subcooling_margin) / 3.0)
        if dnb_margin is not None:
            severity = max(severity, max(0.0, 0.7 - dnb_margin) / 0.7)
        if severity <= 0.0:
            return

        # Small fixed insertion rate plus a severity-proportional part.
        backoff = (0.003 + 0.02 * severity) * dt
        self.rod_target = clamp(self.rod_target + backoff, 0.0, self._MAX_INSERTION)
        self._advance_banks(self.rod_target, dt)
        LOGGER.debug("Safety backoff applied: target=%.3f severity=%.2f", self.rod_target, severity)

    def coolant_demand(
        self,
        primary: CoolantLoopState,
        core_power_mw: float | None = None,
        electrical_output_mw: float | None = None,
    ) -> float:
        """Return the primary coolant demand as a fraction in ``[0, 1]``.

        Args:
            primary: Primary loop state; only ``temperature_out`` is read.
            core_power_mw: Core power used for the demand floors, or ``None``
                to skip them.
            electrical_output_mw: Only included in the debug log line.

        Returns:
            Demand after the temperature term, the power floors and the
            final clamp have been applied.
        """
        desired_temp = constants.PRIMARY_OUTLET_TARGET_K
        # Increase demand when outlet is hotter than desired, reduce when cooler.
        temp_error = (primary.temperature_out - desired_temp) / 100.0
        demand = 0.8 + temp_error

        # Keep a light power-proportional floor so both pumps stay spinning
        # without flooding the loop.
        power_floor = 0.0
        if core_power_mw is not None:
            power_fraction = clamp(core_power_mw / constants.NORMAL_CORE_POWER_MW, 0.0, 1.5)
            power_floor = 0.2 + 0.25 * power_fraction
            demand = max(demand, power_floor)

            # At power, keep primary pumps near full speed to preserve
            # pressure/subcooling.
            if core_power_mw > 500.0:
                demand = max(demand, 0.8)
            elif core_power_mw > 100.0:
                demand = max(demand, 0.6)

        demand = clamp(demand, 0.0, 1.0)
        LOGGER.debug(
            "Coolant demand %.2f (temp_error=%.2f, power_floor=%.2f) for outlet %.1fK power %.1f MW elec %.1f MW",
            demand,
            temp_error,
            power_floor,
            primary.temperature_out,
            core_power_mw or 0.0,
            electrical_output_mw or 0.0,
        )
        return demand

    def effective_insertion(self) -> float:
        """Return the weighted average insertion of the rod banks.

        With no banks the stored ``rod_fraction`` is returned as is.
        Otherwise banks are paired with ``constants.CONTROL_ROD_BANK_WEIGHTS``
        and the result is clamped to the insertion range.
        """
        if not self.rod_banks:
            return self.rod_fraction

        pairs = list(zip(constants.CONTROL_ROD_BANK_WEIGHTS, self.rod_banks))
        # Normalise by the weights actually paired with a bank, so a bank
        # list shorter than the weight table does not dilute the average.
        total = sum(weight for weight, _ in pairs)
        if total > 0.0:
            effective = sum(weight * pos for weight, pos in pairs) / total
        else:
            # No usable weights: fall back to a plain mean rather than
            # dividing by zero.
            effective = sum(self.rod_banks) / len(self.rod_banks)
        return clamp(effective, 0.0, self._MAX_INSERTION)

    def _advance_banks(self, target: float, dt: float) -> None:
        """Move every bank towards *target* by at most one rate-limited step."""
        speed = constants.CONTROL_ROD_SPEED * dt
        moved: list[float] = []
        for pos in self.rod_banks:
            if target > pos:
                # Inserting: step up, but never past the target.
                updated = min(clamp(pos + speed, 0.0, self._MAX_INSERTION), target)
            else:
                # Withdrawing (or already there): step down, never below target.
                updated = max(clamp(pos - speed, 0.0, self._MAX_INSERTION), target)
            moved.append(updated)
        self.rod_banks = moved
        self._sync_fraction()

    def _sync_fraction(self) -> None:
        """Refresh ``rod_fraction`` from the current bank positions."""
        self.rod_fraction = self.effective_insertion()

    def _quantize_manual(self, fraction: float) -> float:
        """Snap *fraction* to ``constants.ROD_MANUAL_STEP`` and clamp it."""
        step = constants.ROD_MANUAL_STEP
        if step <= 0:
            # A non-positive step cannot be used as a grid; just clamp.
            return clamp(fraction, 0.0, self._MAX_INSERTION)
        quantized = round(fraction / step) * step
        return clamp(quantized, 0.0, self._MAX_INSERTION)

    def save_state(
        self,
        filepath: str,
        plant_state: PlantState,
        metadata: dict | None = None,
        health_snapshot: dict | None = None,
    ) -> None:
        """Write the controller and plant state to *filepath* as JSON.

        Args:
            filepath: Destination file; missing parent directories are created.
            plant_state: Plant state, serialised through its ``to_dict()``.
            metadata: Optional extra data stored under ``"metadata"``.
            health_snapshot: Optional data stored under ``"health"``; omitted
                from the file when empty or ``None``.
        """
        payload = {
            "control": {
                "setpoint_mw": self.setpoint_mw,
                "rod_fraction": self.rod_fraction,
                "manual_control": self.manual_control,
                "rod_banks": self.rod_banks,
                "rod_target": self.rod_target,
            },
            "plant": plant_state.to_dict(),
            "metadata": metadata or {},
        }
        if health_snapshot:
            payload["health"] = health_snapshot

        path = Path(filepath)
        path.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding so the file does not depend on the platform locale.
        path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
        LOGGER.info("Saved control & plant state to %s", path)

    def load_state(self, filepath: str) -> tuple[PlantState, dict, dict | None]:
        """Restore the controller from *filepath* and return the saved plant.

        Args:
            filepath: JSON file previously written by ``save_state``.

        Returns:
            ``(plant_state, metadata, health)`` where ``health`` is ``None``
            when the file carries no health snapshot.

        Raises:
            KeyError: If the file has no ``"plant"`` entry.
        """
        path = Path(filepath)
        data = json.loads(path.read_text(encoding="utf-8"))

        # Parse the plant first: if the file is incomplete this raises before
        # the controller has been partially overwritten.
        plant = PlantState.from_dict(data["plant"])

        control = data.get("control") or {}
        self.setpoint_mw = control.get("setpoint_mw", self.setpoint_mw)
        self.rod_fraction = control.get("rod_fraction", self.rod_fraction)
        self.manual_control = control.get("manual_control", self.manual_control)

        saved_banks = control.get("rod_banks")
        if saved_banks:
            self.rod_banks = saved_banks
        elif "rod_fraction" in control:
            # File has an overall insertion but no per-bank positions.  Put
            # every bank at that insertion, otherwise the sync below would
            # overwrite the loaded value with the stale banks' average.
            self.rod_banks = [self.rod_fraction] * len(self.rod_banks)

        self.rod_target = control.get("rod_target", self.rod_fraction)
        self._sync_fraction()
        LOGGER.info("Loaded plant state from %s", path)
        return plant, data.get("metadata", {}), data.get("health")
|