Files
Reactor-Sim/src/reactor_sim/dashboard.py
2025-11-22 23:09:39 +01:00

654 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Interactive curses dashboard for real-time reactor operation."""
from __future__ import annotations
import curses
import logging
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from . import constants
from .commands import ReactorCommand
from .reactor import Reactor
from .simulation import ReactorSimulation
from .state import PlantState
LOGGER = logging.getLogger(__name__)
@dataclass
class DashboardKey:
key: str
description: str
class ReactorDashboard:
"""Minimal TUI dashboard allowing user control in real-time."""
def __init__(
self,
reactor: Reactor,
start_state: Optional[PlantState],
timestep: float = 1.0,
save_path: Optional[str] = None,
) -> None:
self.reactor = reactor
self.start_state = start_state
self.timestep = timestep
self.save_path = save_path
self.pending_command: Optional[ReactorCommand] = None
self.sim: Optional[ReactorSimulation] = None
self.quit_requested = False
self.reset_requested = False
self._last_state: Optional[PlantState] = None
self.log_buffer: deque[str] = deque(maxlen=4)
self._log_handler: Optional[logging.Handler] = None
self._previous_handlers: list[logging.Handler] = []
self._logger = logging.getLogger("reactor_sim")
self.help_sections: list[tuple[str, list[DashboardKey]]] = [
(
"Reactor / Safety",
[
DashboardKey("q", "Quit & save"),
DashboardKey("space", "SCRAM"),
DashboardKey("r", "Reset & clear state"),
DashboardKey("a", "Toggle auto rod control"),
DashboardKey("+/-", "Withdraw/insert rods"),
DashboardKey("[/]", "Adjust consumer demand /+50 MW"),
DashboardKey("s/d", "Setpoint /+250 MW"),
],
),
(
"Pumps",
[
DashboardKey("g/h", "Toggle primary pump 1/2"),
DashboardKey("j/k", "Toggle secondary pump 1/2"),
DashboardKey("m/n", "Maintain primary pumps 1/2"),
DashboardKey(",/.", "Maintain secondary pumps 1/2"),
],
),
(
"Generators",
[
DashboardKey("b/v", "Toggle generator 1/2"),
DashboardKey("x", "Toggle generator auto"),
DashboardKey("B/V", "Maintain generator 1/2"),
],
),
(
"Turbines / Grid",
[
DashboardKey("t", "Toggle turbine bank"),
DashboardKey("1/2/3", "Toggle turbine units 1-3"),
DashboardKey("y/u/i", "Maintain turbine 1/2/3"),
DashboardKey("c", "Toggle consumer"),
],
),
]
def run(self) -> None:
curses.wrapper(self._main)
def _main(self, stdscr: "curses._CursesWindow") -> None: # type: ignore[name-defined]
curses.curs_set(0)
curses.start_color()
curses.use_default_colors()
curses.init_pair(1, curses.COLOR_CYAN, -1)
curses.init_pair(2, curses.COLOR_YELLOW, -1)
curses.init_pair(3, curses.COLOR_GREEN, -1)
curses.init_pair(4, curses.COLOR_RED, -1)
stdscr.nodelay(True)
self._install_log_capture()
try:
while True:
self.sim = ReactorSimulation(
self.reactor,
timestep=self.timestep,
duration=None,
realtime=True,
command_provider=self._next_command,
)
self.sim.start_state = self.start_state
try:
for state in self.sim.run():
self._last_state = state
self._draw(stdscr, state)
self._handle_input(stdscr)
if self.quit_requested or self.reset_requested:
# Persist the latest state if we are exiting early.
if self.sim:
self.sim.last_state = state
self.sim.stop()
break
finally:
if self.save_path and self.sim and self.sim.last_state and not self.reset_requested:
self.reactor.save_state(self.save_path, self.sim.last_state)
if self.quit_requested:
break
if self.reset_requested:
self._clear_saved_state()
self._reset_to_greenfield()
continue
break
finally:
self._restore_logging()
def _handle_input(self, stdscr: "curses._CursesWindow") -> None:
while True:
ch = stdscr.getch()
if ch == -1:
break
if ch in (ord("q"), ord("Q")):
self.quit_requested = True
return
if ch == ord(" "):
self._queue_command(ReactorCommand.scram_all())
elif ch in (ord("p"), ord("P")):
# Deprecated master toggles ignored.
continue
elif ch in (ord("o"), ord("O")):
continue
elif ch in (ord("g"), ord("G")):
self._toggle_primary_pump_unit(0)
elif ch in (ord("h"), ord("H")):
self._toggle_primary_pump_unit(1)
elif ch in (ord("j"), ord("J")):
self._toggle_secondary_pump_unit(0)
elif ch in (ord("k"), ord("K")):
self._toggle_secondary_pump_unit(1)
elif ch in (ord("b"), ord("B")):
self._toggle_generator_unit(0)
elif ch in (ord("v"), ord("V")):
self._toggle_generator_unit(1)
elif ch in (ord("x"), ord("X")):
self._queue_command(ReactorCommand(generator_auto=not self.reactor.generator_auto))
elif ch in (ord("t"), ord("T")):
self._queue_command(ReactorCommand(turbine_on=not self.reactor.turbine_active))
elif ord("1") <= ch <= ord("9"):
idx = ch - ord("1")
self._toggle_turbine_unit(idx)
elif ch in (ord("+"), ord("=")):
# Insert rods (increase fraction)
self._queue_command(ReactorCommand(rod_position=self._clamped_rod(0.05)))
elif ch == ord("-"):
# Withdraw rods (decrease fraction)
self._queue_command(ReactorCommand(rod_position=self._clamped_rod(-0.05)))
elif ch == ord("["):
demand = self._current_demand() - 50.0
self._queue_command(ReactorCommand(consumer_demand=max(0.0, demand)))
elif ch == ord("]"):
demand = self._current_demand() + 50.0
self._queue_command(ReactorCommand(consumer_demand=demand))
elif ch in (ord("c"), ord("C")):
online = not (self.reactor.consumer.online if self.reactor.consumer else False)
self._queue_command(ReactorCommand(consumer_online=online))
elif ch in (ord("r"), ord("R")):
self._request_reset()
elif ch in (ord("s"), ord("S")):
self._queue_command(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw - 250.0))
elif ch in (ord("d"), ord("D")):
self._queue_command(ReactorCommand(power_setpoint=self.reactor.control.setpoint_mw + 250.0))
elif ch in (ord("a"), ord("A")):
self._queue_command(ReactorCommand(rod_manual=not self.reactor.control.manual_control))
elif ch in (ord("m"), ord("M")):
self._queue_command(ReactorCommand.maintain("primary_pump_1"))
elif ch in (ord("n"), ord("N")):
self._queue_command(ReactorCommand.maintain("primary_pump_2"))
elif ch in (ord("k"), ord("K")):
self._queue_command(ReactorCommand.maintain("core"))
elif ch == ord(","):
self._queue_command(ReactorCommand.maintain("secondary_pump_1"))
elif ch == ord("."):
self._queue_command(ReactorCommand.maintain("secondary_pump_2"))
elif ch in (ord("B"),):
self._queue_command(ReactorCommand.maintain("generator_1"))
elif ch in (ord("V"),):
self._queue_command(ReactorCommand.maintain("generator_2"))
elif ch in (ord("y"), ord("Y")):
self._queue_command(ReactorCommand.maintain("turbine_1"))
elif ch in (ord("u"), ord("U")):
self._queue_command(ReactorCommand.maintain("turbine_2"))
elif ch in (ord("i"), ord("I")):
self._queue_command(ReactorCommand.maintain("turbine_3"))
def _queue_command(self, command: ReactorCommand) -> None:
if self.pending_command is None:
self.pending_command = command
else:
for field in command.__dataclass_fields__: # type: ignore[attr-defined]
value = getattr(command, field)
if value is None or value is False:
continue
if field == "rod_step" and getattr(self.pending_command, field) is not None:
setattr(
self.pending_command,
field,
getattr(self.pending_command, field) + value,
)
else:
setattr(self.pending_command, field, value)
if self.pending_command.rod_position is not None and self.pending_command.rod_manual is None:
self.pending_command.rod_manual = True
def _toggle_turbine_unit(self, index: int) -> None:
if index < 0 or index >= len(self.reactor.turbine_unit_active):
return
current = self.reactor.turbine_unit_active[index]
self._queue_command(ReactorCommand(turbine_units={index + 1: not current}))
def _toggle_primary_pump_unit(self, index: int) -> None:
if index < 0 or index >= len(self.reactor.primary_pump_units):
return
current = self.reactor.primary_pump_units[index]
self._queue_command(ReactorCommand(primary_pumps={index + 1: not current}))
def _toggle_secondary_pump_unit(self, index: int) -> None:
if index < 0 or index >= len(self.reactor.secondary_pump_units):
return
current = self.reactor.secondary_pump_units[index]
self._queue_command(ReactorCommand(secondary_pumps={index + 1: not current}))
def _toggle_generator_unit(self, index: int) -> None:
current = False
if self._last_state and index < len(self._last_state.generators):
gen = self._last_state.generators[index]
current = gen.running or gen.starting
self._queue_command(ReactorCommand(generator_units={index + 1: not current}))
def _request_reset(self) -> None:
self.reset_requested = True
if self.sim:
self.sim.stop()
def _clear_saved_state(self) -> None:
if not self.save_path:
return
path = Path(self.save_path)
try:
path.unlink()
LOGGER.info("Cleared saved state at %s", path)
except FileNotFoundError:
LOGGER.info("No saved state to clear at %s", path)
def _reset_to_greenfield(self) -> None:
LOGGER.info("Resetting reactor to initial state")
self.reactor = Reactor.default()
self.start_state = None
self.pending_command = None
self._last_state = None
self.reset_requested = False
self.log_buffer.clear()
def _next_command(self, _: float, __: PlantState) -> Optional[ReactorCommand]:
cmd = self.pending_command
self.pending_command = None
return cmd
def _draw(self, stdscr: "curses._CursesWindow", state: PlantState) -> None:
stdscr.erase()
height, width = stdscr.getmaxyx()
min_status = 4
if height < min_status + 12 or width < 70:
stdscr.addstr(
0,
0,
"Terminal too small; try >=70x16 or reduce font size.".ljust(width),
curses.color_pair(4),
)
stdscr.refresh()
return
status_height = min_status
data_height = height - status_height
right_width = max(28, width // 3)
left_width = width - right_width
if left_width < 50:
left_width = min(50, width - 18)
right_width = width - left_width
data_height = max(1, data_height)
left_width = max(1, left_width)
right_width = max(1, right_width)
data_win = stdscr.derwin(data_height, left_width, 0, 0)
help_win = stdscr.derwin(data_height, right_width, 0, left_width)
status_win = stdscr.derwin(status_height, width, data_height, 0)
self._draw_data_panel(data_win, state)
self._draw_help_panel(help_win)
self._draw_status_panel(status_win, state)
stdscr.refresh()
def _draw_data_panel(self, win: "curses._CursesWindow", state: PlantState) -> None:
win.erase()
win.box()
win.addstr(0, 2, " Plant Overview ", curses.color_pair(1) | curses.A_BOLD)
height, width = win.getmaxyx()
inner_height = height - 2
inner_width = width - 2
left_width = max(28, inner_width // 2)
right_width = inner_width - left_width
left_win = win.derwin(inner_height, left_width, 1, 1)
right_win = win.derwin(inner_height, right_width, 1, 1 + left_width)
for row in range(1, height - 1):
win.addch(row, 1 + left_width, curses.ACS_VLINE)
left_win.erase()
right_win.erase()
left_y = 0
left_y = self._draw_section(
left_win,
left_y,
"Core",
[
(
"Fuel Temp",
f"{state.core.fuel_temperature:6.1f} K (Max {constants.CORE_MELTDOWN_TEMPERATURE:4.0f})",
),
(
"Core Power",
f"{state.core.power_output_mw:6.1f} MW (Nom {constants.NORMAL_CORE_POWER_MW:4.0f}/Max {constants.TEST_MAX_POWER_MW:4.0f})",
),
("Neutron Flux", f"{state.core.neutron_flux:10.2e}"),
("Rods", f"{self.reactor.control.rod_fraction:.3f}"),
("Rod Mode", "AUTO" if not self.reactor.control.manual_control else "MANUAL"),
("Setpoint", f"{self.reactor.control.setpoint_mw:7.0f} MW"),
("Reactivity", f"{state.core.reactivity_margin:+.4f}"),
],
)
left_y = self._draw_section(left_win, left_y, "Key Poisons / Emitters", self._poison_lines(state))
left_y = self._draw_section(
left_win,
left_y,
"Primary Loop",
[
("Pump1", self._pump_status(state.primary_pumps, 0)),
("Pump2", self._pump_status(state.primary_pumps, 1)),
(
"Flow",
f"{state.primary_loop.mass_flow_rate:7.0f}/{self.reactor.primary_pump.nominal_flow * len(self.reactor.primary_pump_units):.0f} kg/s",
),
("Inlet Temp", f"{state.primary_loop.temperature_in:7.1f} K"),
("Outlet Temp", f"{state.primary_loop.temperature_out:7.1f} K"),
("Pressure", f"{state.primary_loop.pressure:5.2f} MPa"),
],
)
self._draw_section(
left_win,
left_y,
"Secondary Loop",
[
("Pump1", self._pump_status(state.secondary_pumps, 0)),
("Pump2", self._pump_status(state.secondary_pumps, 1)),
("Flow", f"{state.secondary_loop.mass_flow_rate:7.0f} kg/s"),
("Inlet Temp", f"{state.secondary_loop.temperature_in:7.1f} K"),
("Pressure", f"{state.secondary_loop.pressure:5.2f} MPa"),
("Steam Quality", f"{state.secondary_loop.steam_quality:5.2f}"),
],
)
right_y = 0
consumer_status = "n/a"
consumer_demand = 0.0
if self.reactor.consumer:
consumer_status = "ONLINE" if self.reactor.consumer.online else "OFF"
consumer_demand = self.reactor.consumer.demand_mw
right_y = self._draw_section(
right_win,
right_y,
"Turbine / Grid",
[
("Turbines", " ".join(self._turbine_status_lines())),
("Rated Elec", f"{len(self.reactor.turbines)*self.reactor.turbines[0].rated_output_mw:7.1f} MW"),
("Unit1 Elec", f"{state.turbines[0].electrical_output_mw:7.1f} MW" if state.turbines else "n/a"),
(
"Unit2 Elec",
f"{state.turbines[1].electrical_output_mw:7.1f} MW" if len(state.turbines) > 1 else "n/a",
),
(
"Unit3 Elec",
f"{state.turbines[2].electrical_output_mw:7.1f} MW" if len(state.turbines) > 2 else "n/a",
),
("Electrical", f"{state.total_electrical_output():7.1f} MW"),
("Load", f"{self._total_load_supplied(state):7.1f}/{self._total_load_demand(state):7.1f} MW"),
("Consumer", f"{consumer_status}"),
("Demand", f"{consumer_demand:7.1f} MW"),
],
)
right_y = self._draw_section(right_win, right_y, "Generators", self._generator_lines(state))
right_y = self._draw_section(right_win, right_y, "Power Stats", self._power_lines(state))
right_y = self._draw_section(right_win, right_y, "Generators", self._generator_lines(state))
right_y = self._draw_section(right_win, right_y, "Maintenance", self._maintenance_lines())
self._draw_health_bars(right_win, right_y)
def _draw_help_panel(self, win: "curses._CursesWindow") -> None:
win.erase()
win.box()
win.addstr(0, 2, " Controls ", curses.color_pair(1) | curses.A_BOLD)
y = 2
for title, entries in self.help_sections:
win.addstr(y, 2, title, curses.color_pair(1) | curses.A_BOLD)
y += 1
for entry in entries:
win.addstr(y, 4, f"{entry.key:<8} {entry.description}")
y += 1
y += 1
win.addstr(y, 2, "Tips:", curses.color_pair(2) | curses.A_BOLD)
tips = [
"Start pumps before withdrawing rods.",
"Bring turbine and consumer online after thermal stabilization.",
"Toggle turbine units (1/2/3) for staggered maintenance.",
"Use m/n/,/. for pump maintenance; B/V for generators.",
"Press 'r' to reset/clear state if you want a cold start.",
"Watch component health to avoid automatic trips.",
]
for idx, tip in enumerate(tips, start=y + 2):
win.addstr(idx, 4, f"- {tip}")
def _draw_status_panel(self, win: "curses._CursesWindow", state: PlantState) -> None:
win.erase()
win.hline(0, 0, curses.ACS_HLINE, win.getmaxyx()[1])
turbine_text = " ".join(self._turbine_status_lines())
msg = (
f"Time {state.time_elapsed:7.1f}s | Rods {self.reactor.control.rod_fraction:.3f} | "
f"Primary {'ON' if self.reactor.primary_pump_active else 'OFF'} | "
f"Secondary {'ON' if self.reactor.secondary_pump_active else 'OFF'} | "
f"Turbines {turbine_text}"
)
win.addstr(1, 1, msg, curses.color_pair(3))
if self.reactor.health_monitor.failure_log:
win.addstr(
3,
1,
f"Failures: {', '.join(self.reactor.health_monitor.failure_log)}",
curses.color_pair(4) | curses.A_BOLD,
)
log_y = 3
for record in list(self.log_buffer):
if log_y >= win.getmaxyx()[0] - 1:
break
win.addstr(log_y, 1, record[: win.getmaxyx()[1] - 2], curses.color_pair(2))
log_y += 1
win.addstr(log_y, 1, "Press 'q' to exit and persist the current snapshot.", curses.color_pair(2))
def _draw_section(
self,
win: "curses._CursesWindow",
start_y: int,
title: str,
lines: list[tuple[str, str] | str],
) -> int:
height, width = win.getmaxyx()
inner_width = width - 4
if start_y >= height - 2:
return height - 2
win.addstr(start_y, 2, title[:inner_width], curses.A_BOLD | curses.color_pair(1))
row = start_y + 1
for line in lines:
if row >= height - 1:
break
if isinstance(line, tuple):
label, value = line
text = f"{label:<18}: {value}"
else:
text = line
win.addstr(row, 4, text[:inner_width])
row += 1
return row + 1
def _turbine_status_lines(self) -> list[str]:
if not self.reactor.turbine_unit_active:
return ["n/a"]
lines: list[str] = []
for idx, active in enumerate(self.reactor.turbine_unit_active):
label = f"{idx + 1}:"
status = "ON" if active else "OFF"
if idx < len(getattr(self._last_state, "turbines", [])):
t_state = self._last_state.turbines[idx]
status = getattr(t_state, "status", status)
lines.append(f"{label}{status}")
return lines
def _total_load_supplied(self, state: PlantState) -> float:
return sum(t.load_supplied_mw for t in state.turbines)
def _total_load_demand(self, state: PlantState) -> float:
return sum(t.load_demand_mw for t in state.turbines)
def _poison_lines(self, state: PlantState) -> list[tuple[str, str]]:
inventory = state.core.fission_product_inventory or {}
particles = state.core.emitted_particles or {}
lines: list[tuple[str, str]] = []
def fmt(symbol: str, label: str) -> tuple[str, str]:
qty = inventory.get(symbol, 0.0)
threshold = constants.KEY_POISON_THRESHOLDS.get(symbol)
flag = " !" if threshold is not None and qty >= threshold else ""
return (f"{label}{flag}", f"{qty:9.2e}")
lines.append(fmt("Xe", "Xe (xenon)"))
lines.append(fmt("Sm", "Sm (samarium)"))
lines.append(fmt("I", "I (iodine)"))
lines.append(("Neutrons (src)", f"{particles.get('n', 0.0):9.2e}"))
lines.append(("Gammas", f"{particles.get('gamma', 0.0):9.2e}"))
lines.append(("Alphas", f"{particles.get('alpha', 0.0):9.2e}"))
return lines
def _health_lines(self) -> list[tuple[str, str]]:
comps = self.reactor.health_monitor.components
lines: list[tuple[str, str]] = []
for name, comp in comps.items():
pct = f"{comp.integrity*100:5.1f}%"
state = "FAILED" if comp.failed else pct
lines.append((name, state))
return lines
def _maintenance_lines(self) -> list[tuple[str, str]]:
if not self.reactor.maintenance_active:
return [("Active", "None")]
return [(comp, "IN PROGRESS") for comp in sorted(self.reactor.maintenance_active)]
def _generator_lines(self, state: PlantState) -> list[tuple[str, str]]:
if not state.generators:
return [("Status", "n/a")]
lines: list[tuple[str, str]] = []
control = "AUTO" if self.reactor.generator_auto else "MANUAL"
lines.append(("Control", control))
for idx, gen in enumerate(state.generators):
status = "RUN" if gen.running else "START" if gen.starting else "OFF"
spool = f" spool {gen.spool_remaining:4.1f}s" if gen.starting else ""
lines.append((f"Gen{idx + 1}", f"{status} {gen.power_output_mw:6.1f}/{self.reactor.generators[idx].rated_output_mw:4.0f} MW{spool}"))
lines.append((f" Battery", f"{gen.battery_charge*100:5.1f}%"))
return lines
def _power_lines(self, state: PlantState) -> list[tuple[str, str]]:
draws = getattr(state, "aux_draws", {}) or {}
primary_nom = constants.PUMP_POWER_MW * len(self.reactor.primary_pump_units)
secondary_nom = constants.PUMP_POWER_MW * len(self.reactor.secondary_pump_units)
lines = [
("Base Aux", f"{draws.get('base', 0.0):6.1f}/{constants.BASE_AUX_LOAD_MW:4.1f} MW"),
("Primary Aux", f"{draws.get('primary_pumps', 0.0):6.1f}/{primary_nom:4.1f} MW"),
("Secondary Aux", f"{draws.get('secondary_pumps', 0.0):6.1f}/{secondary_nom:4.1f} MW"),
("Aux Demand", f"{draws.get('total_demand', 0.0):6.1f} MW"),
("Aux Supplied", f"{draws.get('supplied', 0.0):6.1f} MW"),
("Gen Output", f"{draws.get('generator_output', 0.0):6.1f} MW"),
("Turbine Elec", f"{draws.get('turbine_output', 0.0):6.1f} MW"),
]
return lines
def _draw_health_bars(self, win: "curses._CursesWindow", start_y: int) -> int:
height, width = win.getmaxyx()
inner_width = width - 4
if start_y >= height - 2:
return height - 2
win.addstr(start_y, 2, "Component Health", curses.A_BOLD | curses.color_pair(1))
bar_width = max(8, min(inner_width - 18, 40))
label_width = 16
row = start_y + 1
for name, comp in self.reactor.health_monitor.components.items():
if row >= height - 1:
break
label = f"{name:<{label_width}}"
target = 0.0 if comp.failed else comp.integrity
filled = int(bar_width * max(0.0, min(1.0, target)))
bar = "#" * filled + "-" * (bar_width - filled)
color = 3 if comp.integrity > 0.5 else 2 if comp.integrity > 0.2 else 4
win.addstr(row, 4, f"{label}:")
bar_start = 4 + label_width + 1
win.addstr(row, bar_start, bar[:bar_width], curses.color_pair(color))
percent_text = "FAILED" if comp.failed else f"{comp.integrity*100:5.1f}%"
percent_x = min(width - len(percent_text) - 2, bar_start + bar_width + 2)
win.addstr(row, percent_x, percent_text, curses.color_pair(color))
row += 1
return row + 1
def _pump_status(self, pumps: list, index: int) -> str:
if index >= len(pumps):
return "n/a"
state = pumps[index]
status = getattr(state, "status", "ON" if state.active else "OFF")
return f"{status:<8} {state.flow_rate:6.0f} kg/s"
def _current_demand(self) -> float:
if self.reactor.consumer:
return self.reactor.consumer.demand_mw
return 0.0
def _clamped_rod(self, delta: float) -> float:
new_fraction = self.reactor.control.rod_fraction + delta
return max(0.0, min(0.95, new_fraction))
def _install_log_capture(self) -> None:
if self._log_handler:
return
self._previous_handlers = list(self._logger.handlers)
for handler in self._previous_handlers:
self._logger.removeHandler(handler)
handler = _DashboardLogHandler(self.log_buffer)
handler.setFormatter(logging.Formatter("%(levelname)s | %(message)s"))
self._logger.addHandler(handler)
self._logger.propagate = False
self._log_handler = handler
def _restore_logging(self) -> None:
if not self._log_handler:
return
self._logger.removeHandler(self._log_handler)
for handler in self._previous_handlers:
self._logger.addHandler(handler)
self._log_handler = None
self._previous_handlers = []
class _DashboardLogHandler(logging.Handler):
def __init__(self, buffer: deque[str]) -> None:
super().__init__()
self.buffer = buffer
def emit(self, record: logging.LogRecord) -> None:
msg = self.format(record)
self.buffer.append(msg)