162 lines
6.0 KiB
Python
162 lines
6.0 KiB
Python
"""Wear and failure monitoring for reactor components."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, asdict
|
|
import logging
|
|
from typing import Dict, Iterable, List
|
|
|
|
from . import constants
|
|
from .state import PlantState
|
|
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class ComponentHealth:
|
|
name: str
|
|
integrity: float = 1.0
|
|
failed: bool = False
|
|
|
|
def degrade(self, amount: float) -> None:
|
|
if self.failed:
|
|
return
|
|
self.integrity = max(0.0, self.integrity - amount)
|
|
if self.integrity <= 0.0:
|
|
self.fail()
|
|
|
|
def fail(self) -> None:
|
|
if not self.failed:
|
|
self.failed = True
|
|
LOGGER.error("Component %s has failed", self.name)
|
|
|
|
def restore(self, amount: float) -> None:
|
|
if amount <= 0.0:
|
|
return
|
|
previous = self.integrity
|
|
self.integrity = min(1.0, self.integrity + amount)
|
|
if self.integrity > 0.05:
|
|
self.failed = False
|
|
LOGGER.info("Maintenance on %s: %.3f -> %.3f", self.name, previous, self.integrity)
|
|
|
|
def snapshot(self) -> dict:
|
|
return asdict(self)
|
|
|
|
@classmethod
|
|
def from_snapshot(cls, data: dict) -> "ComponentHealth":
|
|
return cls(**data)
|
|
|
|
|
|
class HealthMonitor:
|
|
"""Tracks component wear and signals failures."""
|
|
|
|
def __init__(self) -> None:
|
|
self.components: Dict[str, ComponentHealth] = {
|
|
"core": ComponentHealth("core"),
|
|
"primary_pump_1": ComponentHealth("primary_pump_1"),
|
|
"primary_pump_2": ComponentHealth("primary_pump_2"),
|
|
"secondary_pump_1": ComponentHealth("secondary_pump_1"),
|
|
"secondary_pump_2": ComponentHealth("secondary_pump_2"),
|
|
"generator_1": ComponentHealth("generator_1"),
|
|
"generator_2": ComponentHealth("generator_2"),
|
|
}
|
|
for idx in range(3):
|
|
name = f"turbine_{idx + 1}"
|
|
self.components[name] = ComponentHealth(name)
|
|
self.failure_log: list[str] = []
|
|
|
|
def component(self, name: str) -> ComponentHealth:
|
|
return self.components[name]
|
|
|
|
def evaluate(
|
|
self,
|
|
state: PlantState,
|
|
primary_units: Iterable[bool],
|
|
secondary_units: Iterable[bool],
|
|
turbine_active: Iterable[bool],
|
|
generator_states: Iterable,
|
|
dt: float,
|
|
) -> List[str]:
|
|
events: list[str] = []
|
|
turbine_flags = list(turbine_active)
|
|
core = self.component("core")
|
|
core_temp = state.core.fuel_temperature
|
|
temp_stress = max(0.0, (core_temp - 900.0) / max(1e-6, (constants.MAX_CORE_TEMPERATURE - 900.0)))
|
|
base_degrade = 0.0001 * dt
|
|
core.degrade(base_degrade + temp_stress * 0.01 * dt)
|
|
|
|
prim_units = list(primary_units)
|
|
sec_units = list(secondary_units)
|
|
prim_states = state.primary_pumps or []
|
|
sec_states = state.secondary_pumps or []
|
|
for idx, active in enumerate(prim_units):
|
|
comp = self.component(f"primary_pump_{idx + 1}")
|
|
if idx < len(prim_states) and active:
|
|
flow = prim_states[idx].flow_rate
|
|
flow_ratio = 0.0 if flow <= 0 else min(1.0, flow / 9_000.0)
|
|
comp.degrade((0.0002 + (1 - flow_ratio) * 0.005) * dt)
|
|
else:
|
|
comp.degrade(0.0)
|
|
for idx, active in enumerate(sec_units):
|
|
comp = self.component(f"secondary_pump_{idx + 1}")
|
|
if idx < len(sec_states) and active:
|
|
flow = sec_states[idx].flow_rate
|
|
flow_ratio = 0.0 if flow <= 0 else min(1.0, flow / 8_000.0)
|
|
comp.degrade((0.0002 + (1 - flow_ratio) * 0.004) * dt)
|
|
else:
|
|
comp.degrade(0.0)
|
|
|
|
for idx, gen_state in enumerate(generator_states):
|
|
comp = self.component(f"generator_{idx + 1}")
|
|
running = getattr(gen_state, "running", False) or getattr(gen_state, "starting", False)
|
|
if running:
|
|
comp.degrade(0.00015 * dt)
|
|
else:
|
|
comp.degrade(0.0)
|
|
|
|
turbines = state.turbines if hasattr(state, "turbines") else []
|
|
for idx, active in enumerate(turbine_flags):
|
|
name = f"turbine_{idx + 1}"
|
|
component = self.component(name)
|
|
if active and idx < len(turbines):
|
|
turbine_state = turbines[idx]
|
|
demand = turbine_state.load_demand_mw
|
|
supplied = turbine_state.load_supplied_mw
|
|
if demand <= 0:
|
|
load_ratio = 0.0
|
|
else:
|
|
load_ratio = min(1.0, supplied / max(1e-6, demand))
|
|
mismatch = abs(1 - load_ratio)
|
|
stress = 0.00005 + mismatch * 0.0006
|
|
else:
|
|
stress = 0.00002
|
|
component.degrade(stress * dt)
|
|
|
|
for name, component in self.components.items():
|
|
if component.failed and name not in self.failure_log:
|
|
events.append(name)
|
|
self.failure_log.append(name)
|
|
return events
|
|
|
|
def snapshot(self) -> dict:
|
|
return {name: comp.snapshot() for name, comp in self.components.items()}
|
|
|
|
def load_snapshot(self, data: dict) -> None:
|
|
for name, comp_data in data.items():
|
|
mapped = "turbine_1" if name == "turbine" else name
|
|
if mapped in self.components:
|
|
self.components[mapped] = ComponentHealth.from_snapshot(comp_data)
|
|
elif mapped == "primary_pump":
|
|
self.components["primary_pump_1"] = ComponentHealth.from_snapshot(comp_data)
|
|
self.components["primary_pump_2"] = ComponentHealth.from_snapshot(comp_data)
|
|
elif mapped == "secondary_pump":
|
|
self.components["secondary_pump_1"] = ComponentHealth.from_snapshot(comp_data)
|
|
|
|
def maintain(self, component: str, amount: float = 0.05) -> bool:
|
|
comp = self.components.get(component)
|
|
if not comp:
|
|
LOGGER.warning("Maintenance requested for unknown component %s", component)
|
|
return False
|
|
comp.restore(amount)
|
|
return True
|