Source code for dlab.hardware.wrappers.waveplate_calib
from __future__ import annotations
from pathlib import Path
from typing import Dict, Tuple
import numpy as np
from scipy.optimize import curve_fit
from PyQt5.QtWidgets import (
QWidget, QHBoxLayout, QVBoxLayout, QLabel, QPushButton,
QComboBox, QGroupBox, QFileDialog, QLineEdit
)
from matplotlib.figure import Figure
from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg as FigureCanvas
import matplotlib.cm as cm
from dlab.utils.config_utils import cfg_get
from dlab.utils.paths_utils import ressources_dir
from dlab.utils.log_panel import LogPanel
def _wp_calibration_path(wp_index: int) -> Path | None:
"""Get calibration file path for waveplate from config."""
rel = cfg_get(f"waveplates.calibration_files.{wp_index}")
if not rel:
return None
return (ressources_dir() / str(rel)).resolve()
def _load_wp_calibration_file(path: Path) -> tuple[np.ndarray, np.ndarray]:
"""Load x, y data from text file with flexible separators."""
xs: list[float] = []
ys: list[float] = []
def _try_float(tok: str) -> float | None:
try:
return float(tok)
except Exception:
return None
with open(path, "r", encoding="utf-8") as f:
for raw in f:
s = raw.strip()
if not s or s.startswith("#"):
continue
for sep in (";", ",", "\t"):
s = s.replace(sep, " ")
parts = [p for p in s.split(" ") if p]
if len(parts) < 2:
continue
x, y = _try_float(parts[0]), _try_float(parts[1])
if x is None or y is None:
continue
xs.append(x)
ys.append(y)
if not xs:
return np.array([]), np.array([])
return np.asarray(xs, dtype=float), np.asarray(ys, dtype=float)
def _generate_colors(n: int) -> list:
"""Generate n distinct colors from colormap."""
cmap = cm.get_cmap("tab10") if n <= 10 else cm.get_cmap("tab20")
return [cmap(i % cmap.N) for i in range(n)]
[docs]
class WaveplateCalibWidget(QWidget):
"""Widget for waveplate calibration management."""
def __init__(
self,
log_panel: LogPanel | None = None,
calibration_changed_callback=None,
parent=None,
):
super().__init__(parent)
self.setWindowTitle("Waveplate Calibration")
self._log = log_panel
self._calibration_changed_callback = calibration_changed_callback
self._num_waveplates = int(cfg_get("waveplates.num_waveplates", 7))
self._colors = _generate_colors(self._num_waveplates)
self._calibration_params: Dict[int, Tuple[float, float]] = {}
self._max_abs_power: Dict[int, float] = {}
self._wp_entries: Dict[int, Dict[str, QLineEdit]] = {}
self._init_ui()
self._load_all_calibrations()
def _init_ui(self):
main_layout = QHBoxLayout(self)
options_group = QGroupBox("Calibration Options")
options_layout = QVBoxLayout(options_group)
for i in range(1, self._num_waveplates + 1):
row = QHBoxLayout()
row.addWidget(QLabel(f"WP{i}:"), stretch=0)
row.addWidget(QLabel("Max (norm):"), stretch=0)
max_edit = QLineEdit("1.00")
max_edit.setFixedWidth(70)
max_edit.setReadOnly(True)
row.addWidget(max_edit, stretch=0)
row.addWidget(QLabel("Phase (deg):"), stretch=0)
off_edit = QLineEdit("0")
off_edit.setFixedWidth(70)
off_edit.setReadOnly(True)
row.addWidget(off_edit, stretch=0)
self._wp_entries[i] = {"max": max_edit, "offset": off_edit}
options_layout.addLayout(row)
choose_row = QHBoxLayout()
choose_row.addWidget(QLabel("Select WP:"), stretch=0)
self._wp_dropdown = QComboBox()
self._wp_dropdown.addItems([str(i) for i in range(1, self._num_waveplates + 1)])
choose_row.addWidget(self._wp_dropdown, stretch=1)
options_layout.addLayout(choose_row)
update_btn = QPushButton("Update Calibration File")
update_btn.clicked.connect(self._update_selected_calibration_file)
options_layout.addWidget(update_btn)
main_layout.addWidget(options_group, 1)
self._fig = Figure(figsize=(5, 6), dpi=100)
rows = int(np.ceil(np.sqrt(self._num_waveplates)))
cols = int(np.ceil(self._num_waveplates / rows))
self._axes = [self._fig.add_subplot(rows, cols, i + 1) for i in range(self._num_waveplates)]
self._canvas = FigureCanvas(self._fig)
main_layout.addWidget(self._canvas, 2)
def _load_all_calibrations(self):
"""Load calibrations for all waveplates from config."""
for i in range(1, self._num_waveplates + 1):
self.load_waveplate_calibration(i)
@staticmethod
def _cos01(x_deg: np.ndarray | float, phase_deg: float) -> np.ndarray | float:
return 0.5 * (1.0 + np.cos(2.0 * np.pi / 90.0 * (np.asarray(x_deg) - phase_deg)))
def _fit_phase_only(self, x: np.ndarray, y01: np.ndarray) -> float:
def f(xx, phase):
return self._cos01(xx, phase)
popt, _ = curve_fit(f, x, y01, p0=(0.0,))
return float(popt[0])
[docs]
def load_waveplate_calibration(self, wp_index: int) -> bool:
"""Load calibration for a waveplate from config path."""
p = _wp_calibration_path(wp_index)
if not p or not p.exists():
self._log_message(f"No calibration file for WP{wp_index}.")
return False
self._open_calibration_file(wp_index, p)
return True
def _open_calibration_file(self, wp_index: int, filepath: Path):
try:
angles, powers = _load_wp_calibration_file(filepath)
if angles.size == 0:
raise ValueError("Empty calibration file")
pmax = float(np.nanmax(powers))
if not np.isfinite(pmax) or pmax <= 0:
raise ValueError("Invalid max power in calibration file")
y01 = np.clip(powers / pmax, 0.0, 1.0)
phase = self._fit_phase_only(angles, y01)
self._calibration_params[wp_index] = (1.0, phase)
self._max_abs_power[wp_index] = pmax
self._wp_entries[wp_index]["max"].setText("1.00")
self._wp_entries[wp_index]["offset"].setText(f"{phase:.2f}")
ax = self._axes[wp_index - 1]
ax.clear()
color = self._colors[wp_index - 1]
ax.plot(angles, y01, marker="o", linestyle="None", color=color, label="data (norm)")
xs = np.linspace(0, 360, 721)
ax.plot(xs, self._cos01(xs, phase), color=color, label="fit (norm)")
ax.set_xlabel("Angle (deg)")
ax.set_ylabel("Relative power (0..1)")
ax.legend(loc="best")
self._log_message(f"WP{wp_index} loaded (max={pmax:.3g} W)")
if self._calibration_changed_callback:
self._calibration_changed_callback(wp_index, self._calibration_params[wp_index])
try:
from dlab.core.device_registry import REGISTRY
REGISTRY.register(f"waveplate:calib:{wp_index}", (1.0, phase))
REGISTRY.register(f"waveplate:calib_path:{wp_index}", filepath.as_posix())
REGISTRY.register(f"waveplate:max:{wp_index}", pmax)
except Exception:
pass
self._canvas.draw_idle()
except Exception as e:
self._log_message(f"Failed to load calibration for WP{wp_index}: {e}")
def _update_selected_calibration_file(self):
wp_index = int(self._wp_dropdown.currentText())
start_dir = _wp_calibration_path(wp_index)
if start_dir:
start_dir = start_dir.parent
else:
start_dir = ressources_dir()
fname, _ = QFileDialog.getOpenFileName(
self,
f"Select calibration file for WP{wp_index}",
str(start_dir),
"Data Files (*.twt *.txt *.csv);;All Files (*)",
)
if not fname:
return
self._open_calibration_file(wp_index, Path(fname))
def _log_message(self, message: str):
if self._log:
self._log.log(message, source="WaveplateCalib")