from __future__ import annotations
import time
import datetime
import threading
from pathlib import Path
from typing import List
import numpy as np
import pyvisa
import matplotlib.pyplot as plt
from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.backends.backend_qt import NavigationToolbar2QT as NavigationToolbar
from PyQt5.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout, QLabel, QPushButton, QComboBox,
QLineEdit, QMessageBox, QSplitter, QCheckBox, QGroupBox
)
from PyQt5.QtCore import QThread, pyqtSignal, pyqtSlot, Qt
from dlab.core.device_registry import REGISTRY
from dlab.hardware.wrappers.powermeter_controller import PowermeterController
from dlab.utils.paths_utils import data_dir
from dlab.utils.log_panel import LogPanel
REGISTRY_KEY_PREFIX = "powermeter:thorlabs"
class _LivePowerThread(QThread):
"""Background thread for continuous power reading."""
power_signal = pyqtSignal(float, float)
error_signal = pyqtSignal(str)
def __init__(self, ctrl: PowermeterController, period_s: float):
super().__init__()
self._ctrl = ctrl
self._period = float(max(0.02, period_s))
self._running = True
self._lock = threading.Lock()
def update_period(self, period_s: float) -> None:
with self._lock:
self._period = float(max(0.02, period_s))
def stop(self) -> None:
self._running = False
def run(self) -> None:
while self._running:
try:
with self._lock:
period = self._period
val = float(self._ctrl.read_power())
ts = time.time()
self.power_signal.emit(ts, val)
time.sleep(period)
except Exception as e:
self.error_signal.emit(str(e))
break
[docs]
class PowermeterLiveWindow(QWidget):
"""Live view window for Thorlabs powermeter."""
closed = pyqtSignal()
def __init__(self, log_panel: LogPanel | None = None):
super().__init__()
self.setWindowTitle("PowermeterLiveWindow")
self.setAttribute(Qt.WA_DeleteOnClose)
self._log = log_panel
self._ctrl: PowermeterController | None = None
self._capture_thread: _LivePowerThread | None = None
self._registry_key: str | None = None
# Data buffers
self._t: List[float] = []
self._y: List[float] = []
self._init_ui()
try:
REGISTRY.register("ui:powermeter_live", self)
except Exception:
pass
def _init_ui(self):
main = QHBoxLayout(self)
main.setContentsMargins(6, 6, 6, 6)
main.setSpacing(6)
splitter = QSplitter()
# Left panel - controls
left = QWidget()
left_l = QVBoxLayout(left)
left_l.setContentsMargins(6, 6, 6, 6)
left_l.setSpacing(6)
left.setMaximumWidth(230)
# Powermeter selection
self._pm_combo = QComboBox()
btn_search = QPushButton("Search Powermeters")
btn_search.clicked.connect(self._search_powermeters)
left_l.addWidget(QLabel("Select Powermeter:"))
left_l.addWidget(self._pm_combo)
left_l.addWidget(btn_search)
# Activate/Deactivate
self._activate_btn = QPushButton("Activate")
self._deactivate_btn = QPushButton("Deactivate")
self._deactivate_btn.setEnabled(False)
self._activate_btn.clicked.connect(self._activate_hardware)
self._deactivate_btn.clicked.connect(self._deactivate_hardware)
left_l.addWidget(self._activate_btn)
left_l.addWidget(self._deactivate_btn)
# Settings
self._wl_combo = QComboBox()
self._wl_combo.addItems(["1030", "515", "343"])
self._avg_edit = QLineEdit("1")
self._period_edit = QLineEdit("0.1")
self._win_edit = QLineEdit("10")
for w in (self._wl_combo, self._avg_edit, self._period_edit, self._win_edit, self._pm_combo):
w.setMaximumWidth(110)
left_l.addWidget(QLabel("Wavelength (nm):"))
left_l.addWidget(self._wl_combo)
left_l.addWidget(QLabel("Averaging count:"))
left_l.addWidget(self._avg_edit)
left_l.addWidget(QLabel("Sampling period (s):"))
left_l.addWidget(self._period_edit)
left_l.addWidget(QLabel("Time window (s):"))
left_l.addWidget(self._win_edit)
# Y range group
range_grp = QGroupBox("Y range")
rg_l = QVBoxLayout(range_grp)
rg_l.setContentsMargins(6, 6, 6, 6)
rg_l.setSpacing(4)
self._ylim_cb = QCheckBox("Choose range")
self._ylim_cb.toggled.connect(self._toggle_ylim_inputs)
row_min = QHBoxLayout()
row_max = QHBoxLayout()
self._ymin_edit = QLineEdit("")
self._ymin_edit.setMaximumWidth(90)
self._ymax_edit = QLineEdit("")
self._ymax_edit.setMaximumWidth(90)
self._ymin_edit.setEnabled(False)
self._ymax_edit.setEnabled(False)
row_min.addWidget(QLabel("Power min:"))
row_min.addWidget(self._ymin_edit, 1)
row_max.addWidget(QLabel("Power max:"))
row_max.addWidget(self._ymax_edit, 1)
rg_l.addWidget(self._ylim_cb)
rg_l.addLayout(row_min)
rg_l.addLayout(row_max)
left_l.addWidget(range_grp)
# Comment
left_l.addWidget(QLabel("Comment:"))
self._comment_edit = QLineEdit("")
self._comment_edit.setMaximumWidth(200)
left_l.addWidget(self._comment_edit)
# Apply button
btn_apply = QPushButton("Apply Settings")
btn_apply.clicked.connect(self._apply_settings)
left_l.addWidget(btn_apply)
# Start/Stop buttons
self._start_btn = QPushButton("Start Live")
self._start_btn.clicked.connect(self._start_live)
self._stop_btn = QPushButton("Stop Live")
self._stop_btn.setEnabled(False)
self._stop_btn.clicked.connect(self._stop_live)
left_l.addWidget(self._start_btn)
left_l.addWidget(self._stop_btn)
left_l.addStretch()
splitter.addWidget(left)
# Right panel - plot
right = QWidget()
right_l = QVBoxLayout(right)
right_l.setContentsMargins(6, 6, 6, 6)
right_l.setSpacing(6)
# Header with power display
header = QWidget()
header_l = QHBoxLayout(header)
header_l.setContentsMargins(0, 0, 0, 0)
header_l.setSpacing(12)
self._lbl_big_now = QLabel("—")
self._lbl_big_max = QLabel("—")
self._lbl_big_mean = QLabel("—")
self._lbl_big_now.setAlignment(Qt.AlignLeft | Qt.AlignVCenter)
self._lbl_big_max.setAlignment(Qt.AlignLeft | Qt.AlignVCenter)
self._lbl_big_mean.setAlignment(Qt.AlignLeft | Qt.AlignVCenter)
self._lbl_big_now.setStyleSheet("font-size: 36pt; font-weight: 800;")
self._lbl_big_max.setStyleSheet("font-size: 24pt; font-weight: 700;")
self._lbl_big_mean.setStyleSheet("font-size: 24pt; font-weight: 700;")
lab_now = QLabel("Power:")
lab_now.setStyleSheet("font-size: 12pt;")
lab_max = QLabel("Max:")
lab_max.setStyleSheet("font-size: 12pt;")
lab_mean = QLabel("Mean:")
lab_mean.setStyleSheet("font-size: 12pt;")
self._save_window_btn = QPushButton("Save Window")
self._save_window_btn.clicked.connect(self._save_current_window)
header_l.addWidget(lab_now)
header_l.addWidget(self._lbl_big_now, 1)
header_l.addSpacing(8)
header_l.addWidget(lab_max)
header_l.addWidget(self._lbl_big_max)
header_l.addSpacing(8)
header_l.addWidget(lab_mean)
header_l.addWidget(self._lbl_big_mean)
header_l.addStretch(1)
header_l.addWidget(self._save_window_btn)
right_l.addWidget(header)
# Plot
self._figure, self._ax = plt.subplots()
self._ax.set_xlabel("Time (s)")
self._ax.set_ylabel("Power (W)")
self._ax.grid(True)
self._canvas = FigureCanvas(self._figure)
self._toolbar = NavigationToolbar(self._canvas, self)
right_l.addWidget(self._toolbar)
right_l.addWidget(self._canvas)
splitter.addWidget(right)
splitter.setStretchFactor(0, 0)
splitter.setStretchFactor(1, 1)
splitter.setSizes([220, 780])
main.addWidget(splitter)
self.resize(1100, 680)
# -------------------------------------------------------------------------
# Logging
# -------------------------------------------------------------------------
def _log_message(self, msg: str):
if self._log:
self._log.log(msg, source="Powermeter")
# -------------------------------------------------------------------------
# UI helpers
# -------------------------------------------------------------------------
def _toggle_ylim_inputs(self, checked: bool):
self._ymin_edit.setEnabled(checked)
self._ymax_edit.setEnabled(checked)
def _plot_color(self) -> str:
w = self._current_wavelength_nm()
if w == 343:
return "#9400D3"
if w == 515:
return "#00A000"
return "#CC0000"
def _fmt_power(self, v: float) -> str:
a = abs(v)
if a < 1e-6:
return f"{v * 1e9:.3g} nW"
if a < 1e-3:
return f"{v * 1e6:.3g} µW"
if a < 1:
return f"{v * 1e3:.3g} mW"
return f"{v:.3g} W"
def _current_wavelength_nm(self) -> int:
try:
return int(self._wl_combo.currentText())
except Exception:
return 1030
# -------------------------------------------------------------------------
# Hardware control
# -------------------------------------------------------------------------
def _search_powermeters(self):
self._pm_combo.clear()
try:
rm = pyvisa.ResourceManager()
res = list(rm.list_resources())
except Exception:
res = []
if not res:
QMessageBox.critical(self, "Error", "No VISA resources found.")
self._log_message("No VISA resources found.")
return
self._pm_combo.addItems(res)
self._log_message(f"Found {len(res)} resource(s).")
def _activate_hardware(self):
idx = self._pm_combo.currentIndex()
if idx < 0:
QMessageBox.critical(self, "Error", "No powermeter selected.")
return
res = self._pm_combo.currentText().strip()
if not res:
QMessageBox.critical(self, "Error", "Invalid resource.")
return
try:
self._ctrl = PowermeterController(res)
self._ctrl.activate()
self._apply_settings()
key = f"{REGISTRY_KEY_PREFIX}:pm_{idx + 1}"
try:
for k, v in REGISTRY.items(prefix=f"{REGISTRY_KEY_PREFIX}:"):
if k == key or v is self._ctrl:
REGISTRY.unregister(k)
except Exception:
pass
REGISTRY.register(key, self._ctrl)
self._registry_key = key
self._log_message(f"Activated and registered '{key}'.")
self._activate_btn.setEnabled(False)
self._deactivate_btn.setEnabled(True)
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to activate: {e}")
self._log_message(f"Activate failed: {e}")
def _deactivate_hardware(self):
if self._capture_thread is not None:
QMessageBox.information(
self, "Live running", "Stop Live before deactivating the device."
)
return
try:
if self._ctrl:
self._ctrl.deactivate()
self._log_message("Powermeter deactivated.")
finally:
try:
if self._registry_key:
REGISTRY.unregister(self._registry_key)
self._log_message(f"Unregistered '{self._registry_key}'.")
except Exception:
pass
self._registry_key = None
self._ctrl = None
self._activate_btn.setEnabled(True)
self._deactivate_btn.setEnabled(False)
self._start_btn.setEnabled(True)
self._stop_btn.setEnabled(False)
def _apply_settings(self):
if not self._ctrl:
QMessageBox.warning(self, "Warning", "Powermeter not activated.")
return
try:
wl = float(self._current_wavelength_nm())
av = int(float(self._avg_edit.text()))
self._ctrl.set_wavelength(wl)
self._ctrl.set_avg(av)
try:
self._ctrl.set_auto_range(True)
except Exception:
pass
try:
self._ctrl.set_bandwidth("high")
except Exception:
pass
self._log_message("Settings applied.")
except Exception as e:
QMessageBox.critical(self, "Error", f"Failed to apply settings: {e}")
self._log_message(f"Apply failed: {e}")
# -------------------------------------------------------------------------
# Live capture
# -------------------------------------------------------------------------
def _start_live(self):
if not self._ctrl:
QMessageBox.critical(self, "Error", "Powermeter not activated.")
return
try:
period = float(self._period_edit.text())
except Exception:
period = 0.1
self._t.clear()
self._y.clear()
self._capture_thread = _LivePowerThread(self._ctrl, period)
self._capture_thread.power_signal.connect(self._update_power)
self._capture_thread.error_signal.connect(lambda e: self._log_message(f"Error: {e}"))
self._capture_thread.start()
self._log_message("Live started.")
self._start_btn.setEnabled(False)
self._stop_btn.setEnabled(True)
self._deactivate_btn.setEnabled(False)
def _stop_live(self):
if self._capture_thread:
self._capture_thread.stop()
self._capture_thread.wait()
self._capture_thread = None
self._log_message("Live stopped.")
self._start_btn.setEnabled(True)
self._stop_btn.setEnabled(False)
self._deactivate_btn.setEnabled(True)
# -------------------------------------------------------------------------
# Data handling
# -------------------------------------------------------------------------
def _window_arrays(self):
if not self._t:
return np.array([]), np.array([])
try:
win = float(self._win_edit.text())
except Exception:
win = 10.0
if win <= 0:
win = 10.0
t_now = self._t[-1]
t0_keep = t_now - win
i0 = 0
for i in range(len(self._t)):
if self._t[i] >= t0_keep:
i0 = i
break
x = np.asarray(self._t[i0:]) - float(self._t[i0] if len(self._t[i0:]) else t_now)
y = np.asarray(self._y[i0:])
return x, y
def _update_power(self, ts: float, val: float):
self._t.append(ts)
self._y.append(val)
x, y = self._window_arrays()
self._ax.cla()
color = self._plot_color()
self._ax.plot(x, y, color=color, linewidth=2.0)
self._ax.set_xlabel("Time (s)")
self._ax.set_ylabel("Power (W)")
self._ax.grid(True)
self._ax.set_title(f"Powermeter Live — {self._current_wavelength_nm()} nm")
if self._ylim_cb.isChecked():
try:
ymin = float(self._ymin_edit.text())
ymax = float(self._ymax_edit.text())
if np.isfinite(ymin) and np.isfinite(ymax) and ymax > ymin:
self._ax.set_ylim(ymin, ymax)
except Exception:
pass
self._canvas.draw_idle()
# Update labels
p_now = self._fmt_power(val)
p_max = self._fmt_power(np.max(y) if y.size else val)
p_mean = self._fmt_power(float(np.mean(y)) if y.size else val)
self._lbl_big_now.setText(p_now)
self._lbl_big_max.setText(p_max)
self._lbl_big_mean.setText(p_mean)
self._lbl_big_now.setStyleSheet(f"font-size: 36pt; font-weight: 800; color: {color};")
self._lbl_big_max.setStyleSheet(f"font-size: 24pt; font-weight: 700; color: {color};")
self._lbl_big_mean.setStyleSheet(f"font-size: 24pt; font-weight: 700; color: {color};")
@pyqtSlot(float, float)
def set_power_from_scan(self, ts: float, val: float) -> None:
"""External slot for receiving power values from scans."""
self._update_power(ts, val)
@pyqtSlot(object)
def refresh_from_device(self, dev) -> None:
"""External slot for refreshing from device."""
try:
val = float(dev.fetch_power())
self._update_power(time.time(), val)
except Exception as e:
self._log_message(f"Live refresh error: {e}")
# -------------------------------------------------------------------------
# Save
# -------------------------------------------------------------------------
def _get_save_directory(self) -> tuple[Path, datetime.datetime]:
now = datetime.datetime.now()
dir_path = data_dir() / now.strftime("%Y-%m-%d") / "powermeter"
dir_path.mkdir(parents=True, exist_ok=True)
return dir_path, now
def _save_current_window(self):
x, y = self._window_arrays()
if y.size == 0:
QMessageBox.warning(self, "Save Window", "No data to save.")
return
dir_path, now = self._get_save_directory()
filename = f"powermeter_log_{now.strftime('%Y-%m-%d_%H_%M_%S')}.txt"
filepath = dir_path / filename
try:
with open(filepath, "w", encoding="utf-8") as f:
comment = self._comment_edit.text().strip()
if comment:
f.write(f"# Comment: {comment}\n")
f.write(f"# Wavelength: {self._current_wavelength_nm()} nm\n")
f.write("# t_s\tpower_W\n")
for xi, yi in zip(x, y):
f.write(f"{xi:.6f}\t{yi:.9g}\n")
self._log_message(f"Saved window to {filepath}")
except Exception as e:
QMessageBox.critical(self, "Save Window", f"Failed to save: {e}")
self._log_message(f"Save failed: {e}")
# -------------------------------------------------------------------------
# Cleanup
# -------------------------------------------------------------------------
[docs]
def closeEvent(self, event):
try:
self._stop_live()
finally:
try:
if self._ctrl:
self._ctrl.deactivate()
finally:
try:
if self._registry_key:
REGISTRY.unregister(self._registry_key)
except Exception:
pass
try:
REGISTRY.unregister("ui:powermeter_live")
except Exception:
pass
self._registry_key = None
self._ctrl = None
self.closed.emit()
super().closeEvent(event)
if __name__ == "__main__":
from PyQt5.QtWidgets import QApplication
import sys
app = QApplication(sys.argv)
w = PowermeterLiveWindow()
w.show()
sys.exit(app.exec_())