Source code for dlab.diagnostics.ui.phase_lock_window

from __future__ import annotations

import time
from collections import deque

import numpy as np

from PyQt5.QtCore import Qt, QThread, pyqtSignal, QTimer
from PyQt5.QtWidgets import (
    QWidget,
    QVBoxLayout,
    QHBoxLayout,
    QGridLayout,
    QLabel,
    QPushButton,
    QLineEdit,
    QMessageBox,
    QCheckBox,
    QGroupBox,
)
from PyQt5.QtGui import QDoubleValidator

import matplotlib.pyplot as plt
from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg as FigureCanvas

from dlab.core.device_registry import REGISTRY
from dlab.utils.log_panel import LogPanel


from dlab.hardware.wrappers.piezojena_controller import NV40

REGISTRY_KEY_DEFAULT = "phaselock:avaspec"


# -----------------------------------------------------------------------------
# Worker Threads
# -----------------------------------------------------------------------------


[docs] class ControlThread(QThread): """PID control thread for phase locking via piezo stage.""" update_status = pyqtSignal(float, float) def __init__(self, stage: NV40) -> None: super().__init__() self.stage = stage self.kp = 0.05 self.ki = 0.0 self.kd = 0.0 self.gain = -1.0 self.max_step = 0.05 self.target = 0.0 self.unwrap = True self.integral = 0.0 self.last_t = None self.last_err = 0.0 self.q = deque() self.vmin = 0.0 self.vmax = 140.0 try: self.current_v = float(self.stage.get_position()) except Exception: self.current_v = 80 self.enabled = False
[docs] def run(self) -> None: while True: if not self.q: time.sleep(0.001) continue phi = self.q.popleft() if not self.enabled: continue now = time.monotonic() dt = 0 if self.last_t is None else max(0.0005, now - self.last_t) self.last_t = now if self.unwrap: if not hasattr(self, "_phi_prev"): self._phi_prev = phi phi_use = np.unwrap([self._phi_prev, phi])[-1] self._phi_prev = phi_use err = self.target - phi_use else: phi_use = phi err = (self.target - phi + np.pi) % (2 * np.pi) - np.pi d_err = (err - self.last_err) / dt self.last_err = err self.integral += err * dt max_int = 1.0 / (self.ki + 1e-9) self.integral = np.clip(self.integral, -max_int, max_int) gain = getattr(self, "gain", -1.0) u = gain * (self.kp * err + self.ki * self.integral + self.kd * d_err) if abs(u) > self.max_step: u = np.sign(u) * self.max_step new_v = self.current_v + u new_v = max(self.vmin, min(self.vmax, new_v)) new_v = round(new_v / 0.01) * 0.01 try: self.stage.set_position(new_v) self.current_v = new_v except Exception: pass self.update_status.emit(phi_use, self.current_v)
[docs] class AvaspecThread(QThread): """Spectrometer acquisition thread.""" data_ready = pyqtSignal(object, object) error = pyqtSignal(str) def __init__(self, ctrl) -> None: super().__init__() self.ctrl = ctrl self.running = True
[docs] def run(self) -> None: while self.running and not self.isInterruptionRequested(): try: ts, wl, counts = self.ctrl.measure_once() self.data_ready.emit(wl, counts) except Exception as e: self.error.emit(str(e)) break
[docs] def stop(self) -> None: self.running = False self.requestInterruption()
# ----------------------------------------------------------------------------- # AvaspecPhaseLockWindow # -----------------------------------------------------------------------------
[docs] class AvaspecPhaseLockWindow(QWidget): """Phase locking control window using Avaspec spectrometer and PiezoJena stage.""" closed = pyqtSignal() def __init__( self, log_panel: LogPanel | None = None, registry_key: str = REGISTRY_KEY_DEFAULT, parent: QWidget | None = None, ) -> None: super().__init__(parent) self.setWindowTitle("Phase Locking Control") self.setAttribute(Qt.WA_DeleteOnClose) self._log = log_panel self._registry_key = registry_key self._spec_ctrl = None self._stage = None self._acq_thread = None self._ctrl_thread = None # Plot state self._max_points = 100 self._hist_phi_raw = deque(maxlen=self._max_points) self._hist_phi_unwrapped = deque(maxlen=self._max_points) self._last_draw = 0.0 self._min_draw_dt = 0.05 self._fft_scatter = None self._fft_line = None self._phase_plot = None self._phase_sp_line = None self._bg_fft = None self._bg_phase = None self._blitting_initialized = False # Stability test state self._stability_test_active = False self._stability_timer = None self._stability_step = 0 self._stability_setpoints = [] self._init_ui() REGISTRY.register(self._registry_key, self) def _init_ui(self) -> None: root = QVBoxLayout(self) # Matplotlib figure self._fig, (self._ax_fft, self._ax_phase) = plt.subplots(1, 2, figsize=(14, 4.5)) self._fig.tight_layout(pad=2.0) self._canvas = FigureCanvas(self._fig) root.addWidget(self._canvas, 3) middle_row = QHBoxLayout() left_panel = QVBoxLayout() # Device connection group left_panel.addWidget(self._create_connection_group()) # FFT analysis group left_panel.addWidget(self._create_fft_group()) # PID control group left_panel.addWidget(self._create_pid_group()) # Voltage limits group left_panel.addWidget(self._create_voltage_group()) # Stability test group left_panel.addWidget(self._create_stability_group()) # Control row left_panel.addLayout(self._create_control_row()) middle_row.addLayout(left_panel, 1) root.addLayout(middle_row, 2) # Stability timer self._stability_timer = QTimer() self._stability_timer.timeout.connect(self._on_stability_step) # ------------------------------------------------------------------------- # UI builders # ------------------------------------------------------------------------- def _create_connection_group(self) -> QGroupBox: group = QGroupBox("Devices") layout = QGridLayout(group) layout.addWidget(QLabel("Spectrometer:"), 0, 0) self._spec_key_edit = QLineEdit("spectrometer:avaspec:spec_1") self._spec_key_edit.setMaximumWidth(200) layout.addWidget(self._spec_key_edit, 0, 1) btn_spec = QPushButton("Connect") btn_spec.setMaximumWidth(80) btn_spec.clicked.connect(self._on_connect_spec) layout.addWidget(btn_spec, 0, 2) layout.addWidget(QLabel("NV40 Stage:"), 1, 0) self._stage_key_edit = QLineEdit("stage:piezojena:nv40") self._stage_key_edit.setMaximumWidth(200) layout.addWidget(self._stage_key_edit, 1, 1) btn_stage = QPushButton("Connect") btn_stage.setMaximumWidth(80) btn_stage.clicked.connect(self._on_connect_stage) layout.addWidget(btn_stage, 1, 2) return group def _create_fft_group(self) -> QGroupBox: group = QGroupBox("FFT Analysis") layout = QGridLayout(group) layout.setSpacing(5) layout.addWidget(QLabel("Center:"), 0, 0) self._center_idx_edit = QLineEdit("600") self._center_idx_edit.setMaximumWidth(60) layout.addWidget(self._center_idx_edit, 0, 1) layout.addWidget(QLabel("Window:"), 0, 2) self._window_edit = QLineEdit("150") self._window_edit.setMaximumWidth(60) layout.addWidget(self._window_edit, 0, 3) layout.addWidget(QLabel("Plot Every:"), 0, 4) self._plot_skip_edit = QLineEdit("5") self._plot_skip_edit.setMaximumWidth(40) layout.addWidget(self._plot_skip_edit, 0, 5) self._weighted_checkbox = QCheckBox("Weighted Avg") layout.addWidget(self._weighted_checkbox, 0, 6) self._remove_ramp_checkbox = QCheckBox("Remove Ramp") self._remove_ramp_checkbox.setChecked(False) layout.addWidget(self._remove_ramp_checkbox, 0, 7) layout.addWidget(QLabel("FFT Y:"), 1, 0) self._fft_ymin_edit = QLineEdit("0") self._fft_ymin_edit.setMaximumWidth(60) layout.addWidget(self._fft_ymin_edit, 1, 1) self._fft_ymax_edit = QLineEdit("200000") self._fft_ymax_edit.setMaximumWidth(60) layout.addWidget(self._fft_ymax_edit, 1, 2) layout.addWidget(QLabel("FFT X:"), 2, 0) self._fft_xmin_edit = QLineEdit("450") self._fft_xmin_edit.setMaximumWidth(60) layout.addWidget(self._fft_xmin_edit, 2, 1) self._fft_xmax_edit = QLineEdit("750") self._fft_xmax_edit.setMaximumWidth(60) layout.addWidget(self._fft_xmax_edit, 2, 2) layout.addWidget(QLabel("Phase Y:"), 1, 3) self._phase_ymin_edit = QLineEdit("-4") self._phase_ymin_edit.setMaximumWidth(50) layout.addWidget(self._phase_ymin_edit, 1, 4) self._phase_ymax_edit = QLineEdit("4") self._phase_ymax_edit.setMaximumWidth(50) layout.addWidget(self._phase_ymax_edit, 1, 5) layout.addWidget(QLabel("Max Pts:"), 2, 3) self._max_points_edit = QLineEdit("100") self._max_points_edit.setMaximumWidth(50) layout.addWidget(self._max_points_edit, 2, 4) btn_update = QPushButton("Update Limits") btn_update.setMaximumWidth(100) btn_update.clicked.connect(self._on_update_limits) layout.addWidget(btn_update, 2, 5, 1, 2) return group def _create_pid_group(self) -> QGroupBox: group = QGroupBox("PID Control") layout = QGridLayout(group) layout.setSpacing(5) layout.addWidget(QLabel("φ₀:"), 0, 0) self._setpoint_edit = QLineEdit("0.0") self._setpoint_edit.setMaximumWidth(70) self._setpoint_edit.setValidator(QDoubleValidator()) layout.addWidget(self._setpoint_edit, 0, 1) layout.addWidget(QLabel("Kp:"), 0, 2) self._kp_edit = QLineEdit("0.05") self._kp_edit.setMaximumWidth(60) layout.addWidget(self._kp_edit, 0, 3) layout.addWidget(QLabel("Ki:"), 0, 4) self._ki_edit = QLineEdit("0.0") self._ki_edit.setMaximumWidth(60) layout.addWidget(self._ki_edit, 0, 5) layout.addWidget(QLabel("Kd:"), 0, 6) self._kd_edit = QLineEdit("0.0") self._kd_edit.setMaximumWidth(60) layout.addWidget(self._kd_edit, 0, 7) layout.addWidget(QLabel("Gain:"), 0, 8) self._gain_edit = QLineEdit("-1.0") self._gain_edit.setMaximumWidth(60) layout.addWidget(self._gain_edit, 0, 9) layout.addWidget(QLabel("Max Step:"), 0, 10) self._max_step_edit = QLineEdit("0.05") self._max_step_edit.setMaximumWidth(60) layout.addWidget(self._max_step_edit, 0, 11) self._unwrap_checkbox = QCheckBox("Unwrap") self._unwrap_checkbox.setChecked(False) layout.addWidget(self._unwrap_checkbox, 1, 0) self._auto_reset_checkbox = QCheckBox("Auto Reset") self._auto_reset_checkbox.setChecked(False) layout.addWidget(self._auto_reset_checkbox, 1, 1, 1, 2) self._lock_checkbox = QCheckBox("LOCK") self._lock_checkbox.setStyleSheet("QCheckBox { font-weight: bold; color: blue; }") self._lock_checkbox.stateChanged.connect(self._on_toggle_lock) layout.addWidget(self._lock_checkbox, 1, 3) btn_update_sp = QPushButton("Update SP") btn_update_sp.setMaximumWidth(80) btn_update_sp.clicked.connect(self._on_update_setpoint) layout.addWidget(btn_update_sp, 1, 4) btn_update_pid = QPushButton("Update PID") btn_update_pid.setMaximumWidth(80) btn_update_pid.clicked.connect(self._on_update_pid) layout.addWidget(btn_update_pid, 1, 5) return group def _create_voltage_group(self) -> QGroupBox: group = QGroupBox("Voltage Limits") layout = QHBoxLayout(group) layout.addWidget(QLabel("Min:")) self._vmin_edit = QLineEdit("75.0") self._vmin_edit.setMaximumWidth(60) layout.addWidget(self._vmin_edit) layout.addWidget(QLabel("Max:")) self._vmax_edit = QLineEdit("82.0") self._vmax_edit.setMaximumWidth(60) layout.addWidget(self._vmax_edit) layout.addWidget(QLabel("Start:")) self._vstart_edit = QLineEdit("80.0") self._vstart_edit.setMaximumWidth(60) layout.addWidget(self._vstart_edit) btn_update = QPushButton("Update Limits") btn_update.setMaximumWidth(100) btn_update.clicked.connect(self._on_update_voltage_limits) layout.addWidget(btn_update) layout.addStretch() return group def _create_stability_group(self) -> QGroupBox: group = QGroupBox("Stability Test") layout = QHBoxLayout(group) layout.addWidget(QLabel("Start:")) self._stab_start_edit = QLineEdit("-3.14159") self._stab_start_edit.setMaximumWidth(70) layout.addWidget(self._stab_start_edit) layout.addWidget(QLabel("End:")) self._stab_end_edit = QLineEdit("3.14159") self._stab_end_edit.setMaximumWidth(70) layout.addWidget(self._stab_end_edit) layout.addWidget(QLabel("Steps:")) self._stab_steps_edit = QLineEdit("10") self._stab_steps_edit.setMaximumWidth(40) layout.addWidget(self._stab_steps_edit) layout.addWidget(QLabel("Wait [s]:")) self._stab_wait_edit = QLineEdit("5.0") self._stab_wait_edit.setMaximumWidth(50) layout.addWidget(self._stab_wait_edit) self._stab_test_btn = QPushButton("Start Test") self._stab_test_btn.setMaximumWidth(80) self._stab_test_btn.clicked.connect(self._on_toggle_stability_test) layout.addWidget(self._stab_test_btn) layout.addStretch() return group def _create_control_row(self) -> QHBoxLayout: layout = QHBoxLayout() btn_start = QPushButton("Start") btn_start.setMaximumWidth(80) btn_start.clicked.connect(self._on_start) layout.addWidget(btn_start) btn_stop = QPushButton("Stop") btn_stop.setMaximumWidth(80) btn_stop.clicked.connect(self._on_stop) layout.addWidget(btn_stop) layout.addStretch(1) self._phase_label = QLabel("φ = — rad") self._phase_label.setStyleSheet("QLabel { font-size: 12pt; font-weight: bold; }") layout.addWidget(self._phase_label) layout.addWidget(QLabel(" | ")) self._error_label = QLabel("Error = — rad") layout.addWidget(self._error_label) layout.addWidget(QLabel(" | ")) self._slope_label = QLabel("Slope = — rad/index") layout.addWidget(self._slope_label) return layout # ------------------------------------------------------------------------- # Logging # ------------------------------------------------------------------------- def _log_message(self, msg: str) -> None: if self._log: self._log.log(msg, source="PhaseLock") # ------------------------------------------------------------------------- # Public API # -------------------------------------------------------------------------
[docs] def get_current_phase(self) -> float: """Get the most recent unwrapped phase value.""" if len(self._hist_phi_unwrapped) > 0: return float(self._hist_phi_unwrapped[-1]) return float("nan")
[docs] def get_phase_average(self, duration_s: float) -> tuple[float, float]: """Get average and std of phase over specified duration.""" if len(self._hist_phi_unwrapped) == 0: return float("nan"), float("nan") n_samples = max(1, int(duration_s / self._min_draw_dt)) samples = list(self._hist_phi_unwrapped)[-n_samples:] if len(samples) == 0: return float("nan"), float("nan") avg = float(np.mean(samples)) std = float(np.std(samples)) if len(samples) > 1 else 0.0 return avg, std
[docs] def get_current_voltage(self) -> float: """Get current piezo voltage.""" if self._ctrl_thread and hasattr(self._ctrl_thread, "current_v"): return float(self._ctrl_thread.current_v) return float("nan")
[docs] def set_target(self, target_rad: float) -> None: """Set the phase lock target.""" if self._ctrl_thread: if self._auto_reset_checkbox.isChecked() and self._ctrl_thread.enabled: self._ctrl_thread.enabled = False self._ctrl_thread.target = float(target_rad) self._setpoint_edit.setText(f"{float(target_rad):.6f}") self._ctrl_thread.integral = 0.0 self._ctrl_thread.last_t = None self._ctrl_thread.last_err = 0.0 self._ctrl_thread.enabled = True else: self._ctrl_thread.target = float(target_rad) self._setpoint_edit.setText(f"{float(target_rad):.6f}")
[docs] def is_locked(self) -> bool: """Check if phase lock is active.""" return self._ctrl_thread is not None and self._ctrl_thread.enabled
# ------------------------------------------------------------------------- # Device connection # ------------------------------------------------------------------------- def _on_connect_spec(self) -> None: key = self._spec_key_edit.text().strip() ctrl = REGISTRY.get(key) if ctrl is None: QMessageBox.critical(self, "Spectrometer", f"Not found: {key}") return self._spec_ctrl = ctrl self._log_message("Connected spectrometer") def _on_connect_stage(self) -> None: key = self._stage_key_edit.text().strip() st = REGISTRY.get(key) if st is None: QMessageBox.critical(self, "NV40", "Not found") return self._ctrl_thread = ControlThread(st) try: self._ctrl_thread.vmin = float(self._vmin_edit.text()) self._ctrl_thread.vmax = float(self._vmax_edit.text()) v_start = float(self._vstart_edit.text()) self._ctrl_thread.current_v = v_start try: st.set_position(v_start) self._log_message(f"Stage set to {v_start:.2f} V") except Exception: pass except Exception: pass self._ctrl_thread.update_status.connect(self._on_control_update) self._ctrl_thread.start() self._log_message( f"Connected NV40 ({self._ctrl_thread.vmin:.1f}-{self._ctrl_thread.vmax:.1f}V, " f"start: {self._ctrl_thread.current_v:.1f}V)" ) # ------------------------------------------------------------------------- # Parameter updates # ------------------------------------------------------------------------- def _on_update_setpoint(self) -> None: if not self._ctrl_thread: return try: new_sp = float(self._setpoint_edit.text()) if self._auto_reset_checkbox.isChecked() and self._ctrl_thread.enabled: self._ctrl_thread.enabled = False self._log_message("Auto-reset: Lock disabled") self._ctrl_thread.target = new_sp self._log_message(f"Setpoint updated to {new_sp:.3f} rad") self._ctrl_thread.integral = 0.0 self._ctrl_thread.last_t = None self._ctrl_thread.last_err = 0.0 self._ctrl_thread.enabled = True self._log_message("Auto-reset: Lock re-enabled") else: self._ctrl_thread.target = new_sp self._log_message(f"Setpoint updated to {new_sp:.3f} rad") except Exception: pass def _on_update_pid(self) -> None: if not self._ctrl_thread: return self._ctrl_thread.kp = float(self._kp_edit.text()) self._ctrl_thread.ki = float(self._ki_edit.text()) self._ctrl_thread.kd = float(self._kd_edit.text()) self._ctrl_thread.target = float(self._setpoint_edit.text()) self._ctrl_thread.max_step = float(self._max_step_edit.text()) self._ctrl_thread.unwrap = self._unwrap_checkbox.isChecked() self._ctrl_thread.gain = float(self._gain_edit.text()) try: self._ctrl_thread.vmin = float(self._vmin_edit.text()) self._ctrl_thread.vmax = float(self._vmax_edit.text()) except Exception: pass self._log_message(f"PID updated (V: {self._ctrl_thread.vmin:.1f}-{self._ctrl_thread.vmax:.1f}V)") def _on_update_limits(self) -> None: xmin_fft = self._fft_xmin_edit.text().strip() xmax_fft = self._fft_xmax_edit.text().strip() if xmin_fft and xmax_fft: try: self._ax_fft.set_xlim(float(xmin_fft), float(xmax_fft)) except Exception: pass ymin_fft = self._fft_ymin_edit.text().strip() ymax_fft = self._fft_ymax_edit.text().strip() if ymin_fft and ymax_fft: try: self._ax_fft.set_ylim(float(ymin_fft), float(ymax_fft)) except Exception: pass ymin_phase = self._phase_ymin_edit.text().strip() ymax_phase = self._phase_ymax_edit.text().strip() if ymin_phase and ymax_phase: try: self._ax_phase.set_ylim(float(ymin_phase), float(ymax_phase)) except Exception: pass self._blitting_initialized = False self._canvas.draw() self._log_message("Plot limits updated") def _on_update_voltage_limits(self) -> None: if not self._ctrl_thread: self._log_message("No stage connected") return try: self._ctrl_thread.vmin = float(self._vmin_edit.text()) self._ctrl_thread.vmax = float(self._vmax_edit.text()) self._log_message(f"Voltage limits updated: {self._ctrl_thread.vmin:.1f}-{self._ctrl_thread.vmax:.1f}V") except Exception: self._log_message("Invalid voltage limits") # ------------------------------------------------------------------------- # Acquisition control # ------------------------------------------------------------------------- def _on_start(self) -> None: if self._spec_ctrl is None: QMessageBox.critical(self, "Run", "No spectrometer") return try: self._max_points = int(self._max_points_edit.text()) except Exception: self._max_points = 100 self._hist_phi_raw = deque(maxlen=self._max_points) self._hist_phi_unwrapped = deque(maxlen=self._max_points) self._ax_fft.clear() self._ax_phase.clear() self._fft_scatter = None self._fft_line = None self._phase_plot = None self._phase_sp_line = None self._blitting_initialized = False self._acq_thread = AvaspecThread(self._spec_ctrl) self._acq_thread.data_ready.connect(self._on_data) self._acq_thread.error.connect(self._on_acq_error) self._acq_thread.start() self._log_message("Acquisition started") def _on_stop(self) -> None: if self._acq_thread: self._acq_thread.stop() self._acq_thread.wait() self._acq_thread = None self._log_message("Acquisition stopped") def _on_toggle_lock(self) -> None: if self._ctrl_thread is None: return if self._lock_checkbox.isChecked(): self._ctrl_thread.integral = 0.0 self._ctrl_thread.last_t = None self._ctrl_thread.last_err = 0.0 self._ctrl_thread.enabled = True self._log_message("Lock ON") else: self._ctrl_thread.enabled = False self._log_message("Lock OFF") if self._phase_sp_line: self._phase_sp_line.set_visible(False) self._canvas.draw_idle() # ------------------------------------------------------------------------- # Stability test # ------------------------------------------------------------------------- def _on_toggle_stability_test(self) -> None: if self._stability_test_active: self._stability_test_active = False self._stability_timer.stop() self._stab_test_btn.setText("Start Test") self._log_message("Stability test stopped") else: if not self._ctrl_thread: QMessageBox.warning(self, "Stability Test", "NV40 not connected") return try: start_val = float(self._stab_start_edit.text()) end_val = float(self._stab_end_edit.text()) num_steps = int(self._stab_steps_edit.text()) wait_time = float(self._stab_wait_edit.text()) except ValueError: QMessageBox.critical(self, "Stability Test", "Invalid parameters") return if num_steps < 1: QMessageBox.critical(self, "Stability Test", "Steps must be >= 1") return self._stability_setpoints = np.linspace(start_val, end_val, num_steps).tolist() self._stability_step = 0 self._ctrl_thread.target = self._stability_setpoints[0] self._setpoint_edit.setText(f"{self._stability_setpoints[0]:.3f}") self._stability_test_active = True self._stability_timer.start(int(wait_time * 1000)) self._stab_test_btn.setText("Stop Test") self._log_message( f"Stability test: {num_steps} steps, {start_val:.3f} to {end_val:.3f} rad, wait={wait_time}s" ) def _on_stability_step(self) -> None: if not self._stability_test_active: return self._stability_step += 1 if self._stability_step >= len(self._stability_setpoints): self._on_toggle_stability_test() return new_sp = self._stability_setpoints[self._stability_step] self._ctrl_thread.target = new_sp self._setpoint_edit.setText(f"{new_sp:.3f}") self._log_message( f"Step {self._stability_step + 1}/{len(self._stability_setpoints)}: φ0 = {new_sp:.3f} rad" ) # ------------------------------------------------------------------------- # Data handling # ------------------------------------------------------------------------- def _on_acq_error(self, err: str) -> None: self._log_message(f"Thread error: {err}") self._on_stop() def _on_control_update(self, phi: float, v: float) -> None: self._phase_label.setText(f"φ = {phi:+.3f} rad V={v:+.3f}") def _on_data(self, wl, y) -> None: if self._acq_thread is None: return now = time.monotonic() if now - self._last_draw < self._min_draw_dt: return self._last_draw = now y = np.asarray(y, float) if y.size < 32: return F = np.fft.fft(y) mag = np.abs(F) phase = np.angle(F) n = len(F) try: center = int(self._center_idx_edit.text()) except Exception: center = 100 try: W = int(self._window_edit.text()) except Exception: W = 100 i0 = max(0, center - W) i1 = min(n, center + W) phase_unwrapped = np.unwrap(phase) use_weighted = self._weighted_checkbox.isChecked() remove_ramp = self._remove_ramp_checkbox.isChecked() window_mag = mag[i0:i1] window_phase = phase[i0:i1] if remove_ramp and len(window_phase) > 2: indices = np.arange(len(window_phase)) coeffs = np.polyfit(indices, window_phase, 1) phase_trend = np.polyval(coeffs, indices) phase_detrended = window_phase - phase_trend else: phase_detrended = window_phase phase_slope = 0.0 if len(phase_detrended) > 2: indices = np.arange(len(phase_detrended)) slope_coeffs = np.polyfit(indices, phase_detrended, 1) phase_slope = slope_coeffs[0] if use_weighted: if np.sum(window_mag) > 0: phi = np.sum(window_mag * phase_detrended) / np.sum(window_mag) else: phi = phase_detrended[len(phase_detrended) // 2] if len(phase_detrended) > 0 else 0.0 else: center_in_window = center - i0 if 0 <= center_in_window < len(phase_detrended): phi = phase_detrended[center_in_window] else: return try: plot_skip = max(1, int(self._plot_skip_edit.text())) except Exception: plot_skip = 5 x = np.arange(i0, i1, plot_skip) mag_decimated = mag[i0:i1:plot_skip] phase_unwrapped_decimated = phase_unwrapped[i0:i1:plot_skip] if self._fft_scatter is None: self._fft_scatter = self._ax_fft.scatter( x, mag_decimated, c=phase_unwrapped_decimated, cmap="hsv", s=20, vmin=-np.pi, vmax=np.pi, animated=True, ) xmin_fft = self._fft_xmin_edit.text().strip() xmax_fft = self._fft_xmax_edit.text().strip() if xmin_fft and xmax_fft: try: self._ax_fft.set_xlim(float(xmin_fft), float(xmax_fft)) except Exception: self._ax_fft.set_xlim(i0, i1) else: self._ax_fft.set_xlim(i0, i1) self._ax_fft.set_xlabel("FFT Index") self._ax_fft.set_ylabel("Magnitude") self._ax_fft.set_ylim(0, 200000) else: self._fft_scatter.set_offsets(np.c_[x, mag_decimated]) self._fft_scatter.set_array(phase_unwrapped_decimated) if self._fft_line is None: if use_weighted: self._fft_line = self._ax_fft.axvspan(i0, i1, alpha=0.2, color="orange", animated=True) else: self._fft_line = self._ax_fft.axvline(center, color="red", linestyle="--", animated=True) else: self._fft_line.remove() if use_weighted: self._fft_line = self._ax_fft.axvspan(i0, i1, alpha=0.2, color="orange", animated=True) else: self._fft_line = self._ax_fft.axvline(center, color="red", linestyle="--", animated=True) if self._unwrap_checkbox.isChecked(): phi_u = phi if not self._hist_phi_unwrapped else np.unwrap([self._hist_phi_unwrapped[-1], phi])[-1] else: phi_u = phi self._hist_phi_raw.append(phi) self._hist_phi_unwrapped.append(phi_u) x_indices = np.arange(len(self._hist_phi_unwrapped)) pp = np.array(self._hist_phi_unwrapped if self._unwrap_checkbox.isChecked() else self._hist_phi_raw) plot_color = "blue" if (self._ctrl_thread and self._ctrl_thread.enabled) else "black" if self._phase_plot is None: (self._phase_plot,) = self._ax_phase.plot(x_indices, pp, "o", ms=3, color=plot_color, animated=True) self._ax_phase.set_xlim(-1, self._max_points) self._ax_phase.set_xlabel("Sample") self._ax_phase.set_ylabel("Phase [rad]") ymin_phase = self._phase_ymin_edit.text().strip() ymax_phase = self._phase_ymax_edit.text().strip() if ymin_phase and ymax_phase: try: self._ax_phase.set_ylim(float(ymin_phase), float(ymax_phase)) except Exception: pass else: self._phase_plot.set_xdata(x_indices) self._phase_plot.set_ydata(pp) self._phase_plot.set_color(plot_color) if self._ctrl_thread and self._ctrl_thread.enabled: try: sp = float(self._setpoint_edit.text()) if self._phase_sp_line is None: self._phase_sp_line = self._ax_phase.axhline(sp, color="orange", linestyle="--", animated=True) else: self._phase_sp_line.set_ydata([sp, sp]) self._phase_sp_line.set_visible(True) except Exception: pass else: if self._phase_sp_line: self._phase_sp_line.set_visible(False) if not self._blitting_initialized: self._canvas.draw() self._bg_fft = self._canvas.copy_from_bbox(self._ax_fft.bbox) self._bg_phase = self._canvas.copy_from_bbox(self._ax_phase.bbox) self._blitting_initialized = True self._canvas.restore_region(self._bg_fft) self._ax_fft.draw_artist(self._fft_scatter) self._ax_fft.draw_artist(self._fft_line) self._canvas.blit(self._ax_fft.bbox) self._canvas.restore_region(self._bg_phase) self._ax_phase.draw_artist(self._phase_plot) if self._phase_sp_line and self._phase_sp_line.get_visible(): self._ax_phase.draw_artist(self._phase_sp_line) self._canvas.blit(self._ax_phase.bbox) if self._ctrl_thread and self._ctrl_thread.enabled: try: sp = float(self._setpoint_edit.text()) current_error = abs(phi_u - sp) self._error_label.setText(f"Error = {current_error:.4f} rad") except Exception: self._error_label.setText("Error = — rad") else: self._error_label.setText("Error = — rad") self._slope_label.setText(f"Slope = {phase_slope:.4f} rad/index") if self._ctrl_thread and self._ctrl_thread.enabled: self._ctrl_thread.q.append(phi) self._phase_label.setText(f"φ = {phi_u:+.3f} rad") # ------------------------------------------------------------------------- # Cleanup # -------------------------------------------------------------------------
[docs] def closeEvent(self, event) -> None: self._on_stop() if self._ctrl_thread: self._ctrl_thread.enabled = False try: REGISTRY.unregister(self._registry_key) except Exception: pass self.closed.emit() super().closeEvent(event)
if __name__ == "__main__": import sys from PyQt5.QtWidgets import QApplication app = QApplication(sys.argv) window = AvaspecPhaseLockWindow() window.resize(800, 800) window.show() sys.exit(app.exec_())