"""Reactor simulation harness and CLI.""" from __future__ import annotations import copy import json import logging import os from dataclasses import dataclass, field from pathlib import Path from threading import Event import time from typing import Callable, Iterable, Optional from .commands import ReactorCommand from .logging_utils import configure_logging from .reactor import Reactor from .state import PlantState LOGGER = logging.getLogger(__name__) CommandProvider = Callable[[float, PlantState], Optional[ReactorCommand]] def _default_state_path() -> Path: custom = os.getenv("FISSION_STATE_PATH") return Path(custom) if custom else Path("artifacts/last_state.json") def _poison_log_path() -> Path: custom = os.getenv("FISSION_POISON_LOG") return Path(custom) if custom else Path("artifacts/poisons.json") def _write_poison_log(path: Path, state: PlantState | None, snapshots: list[dict[str, float]] | None = None) -> None: if state is None and snapshots: latest = snapshots[-1] inventory = latest.get("products", {}) particles = latest.get("particles", {}) time_elapsed = latest.get("time_elapsed", 0.0) elif state is not None: inventory = state.core.fission_product_inventory particles = state.core.emitted_particles time_elapsed = state.time_elapsed else: return path.parent.mkdir(parents=True, exist_ok=True) payload = {"time_elapsed": time_elapsed, "products": inventory, "particles": particles} path.write_text(json.dumps(payload, indent=2)) LOGGER.info("Wrote poison snapshot to %s", path) @dataclass class ReactorSimulation: reactor: Reactor timestep: float = 1.0 duration: float | None = 3600.0 command_provider: CommandProvider | None = None realtime: bool = False stop_event: Event = field(default_factory=Event) start_state: PlantState | None = None last_state: PlantState | None = field(default=None, init=False) def run(self) -> Iterable[PlantState]: state = copy.deepcopy(self.start_state) if self.start_state else self.reactor.initial_state() elapsed = 0.0 last_step_wall = time.time() while self.duration is None or elapsed < self.duration: if self.stop_event.is_set(): LOGGER.info("Stop signal received, terminating simulation loop") break snapshot = copy.deepcopy(state) yield snapshot command = self.command_provider(elapsed, snapshot) if self.command_provider else None self.reactor.step(state, self.timestep, command) elapsed += self.timestep if self.realtime: wall_elapsed = time.time() - last_step_wall sleep_time = self.timestep - wall_elapsed if sleep_time > 0: time.sleep(sleep_time) last_step_wall = time.time() self.last_state = state LOGGER.info("Simulation complete, %.0fs simulated", elapsed) def log(self) -> list[dict[str, float]]: return [snapshot for snapshot in (s.snapshot() for s in self.run())] def stop(self) -> None: self.stop_event.set() def main() -> None: log_level = os.getenv("FISSION_LOG_LEVEL", "INFO") log_file = os.getenv("FISSION_LOG_FILE") configure_logging(log_level, log_file) realtime = os.getenv("FISSION_REALTIME", "0") == "1" alternate_dashboard = os.getenv("ALTERNATE_DASHBOARD", "0") == "1" duration_env = os.getenv("FISSION_SIM_DURATION") if duration_env: duration = None if duration_env.lower() in {"none", "infinite"} else float(duration_env) else: duration = None if realtime else 600.0 reactor = Reactor.default() dashboard_mode = os.getenv("FISSION_DASHBOARD", "0") == "1" timestep_env = os.getenv("FISSION_TIMESTEP") if timestep_env: timestep = float(timestep_env) else: timestep = 1.0 if dashboard_mode or realtime else 5.0 sim = ReactorSimulation(reactor, timestep=timestep, duration=duration, realtime=realtime) state_path = _default_state_path() load_path = os.getenv("FISSION_LOAD_STATE") if not load_path and state_path.exists(): load_path = str(state_path) save_path = os.getenv("FISSION_SAVE_STATE") or str(state_path) if load_path: sim.start_state = reactor.load_state(load_path) if dashboard_mode: if alternate_dashboard: try: from .rich_dashboard import RichDashboard except ImportError as exc: # pragma: no cover - optional dependency LOGGER.error("Rich dashboard requested but 'rich' is not installed: %s", exc) return dashboard = RichDashboard( reactor, start_state=sim.start_state, timestep=sim.timestep, save_path=save_path, ) dashboard.run() return else: from .dashboard import ReactorDashboard dashboard = ReactorDashboard( reactor, start_state=sim.start_state, timestep=sim.timestep, save_path=save_path, ) dashboard.run() return try: if realtime: LOGGER.info("Running in real-time mode (Ctrl+C to stop)...") for _ in sim.run(): pass else: snapshots = sim.log() LOGGER.info("Captured %d snapshots", len(snapshots)) print(json.dumps(snapshots[-5:], indent=2)) _write_poison_log(_poison_log_path(), sim.last_state, snapshots) except KeyboardInterrupt: sim.stop() LOGGER.warning("Simulation interrupted by user") finally: if save_path and sim.last_state: reactor.save_state(save_path, sim.last_state) if __name__ == "__main__": main()