Source code for dlab.diagnostics.ui.grating_compressor_window

from __future__ import annotations

import math
from typing import Optional

from PyQt5.QtCore import QTimer, Qt
from PyQt5.QtWidgets import (
    QWidget,
    QVBoxLayout,
    QHBoxLayout,
    QLabel,
    QPushButton,
    QLineEdit,
    QGroupBox,
    QMessageBox,
)
from PyQt5.QtGui import QDoubleValidator

from dlab.boot import get_config
from dlab.hardware.wrappers.zaber_controller import ZaberBinaryController
from dlab.core.device_registry import REGISTRY
from dlab.utils.log_panel import LogPanel

REGISTRY_KEY = "stage:zaber:grating_compressor"


class GratingCompressorWindow(QWidget):
    """Control window for Zaber grating compressor stage.

    The window lets the user open/close the stage connection, issue absolute
    and relative moves (in mm), home/identify/stop the stage, and shows the
    current position, refreshed by a 200 ms polling timer. While active, the
    controller is published in the device registry under ``REGISTRY_KEY``.

    Connection defaults and travel limits are read from the ``"zaber"``
    section of the configuration returned by ``get_config()``.
    """

    def __init__(
        self, log_panel: LogPanel | None = None, parent: QWidget | None = None
    ) -> None:
        """Build the window; the stage is not connected until "Activate".

        Args:
            log_panel: Optional log sink; when ``None`` messages are dropped.
            parent: Optional Qt parent widget.
        """
        super().__init__(parent)
        self.setWindowTitle("Grating Compressor")
        self.setAttribute(Qt.WA_DeleteOnClose)

        self._log = log_panel
        self._stage: Optional[ZaberBinaryController] = None

        # Load configuration (every level may be missing or None).
        cfg = get_config() or {}
        zcfg = cfg.get("zaber", {}) or {}
        self._port = str(zcfg.get("port", "COM4"))
        self._baud = int(zcfg.get("baud", 9600))
        rng = zcfg.get("range", {}) or {}
        self._range_min = float(rng.get("min", 0.0))
        self._range_max = float(rng.get("max", 50.0))

        self._init_ui()

        # Position polling timer (started on activation / motion commands).
        self._poll_timer = QTimer(self)
        self._poll_timer.setInterval(200)  # ms
        self._poll_timer.timeout.connect(self._update_position)

    # -------------------------------------------------------------------------
    # UI construction
    # -------------------------------------------------------------------------

    def _init_ui(self) -> None:
        """Create all widgets and lay them out in the window."""
        main = QVBoxLayout(self)
        main.addWidget(self._build_connection_group())
        main.addWidget(self._build_motion_group())
        main.addStretch(1)

    def _build_connection_group(self) -> QGroupBox:
        """Build the "Connection" group: port/baud fields and on/off buttons."""
        conn_box = QGroupBox("Connection")
        cl = QHBoxLayout(conn_box)

        self._port_edit = QLineEdit(self._port)
        self._port_edit.setFixedWidth(120)
        self._baud_edit = QLineEdit(str(self._baud))
        self._baud_edit.setFixedWidth(80)
        # Zero decimals: only whole-number baud rates can be typed.
        self._baud_edit.setValidator(QDoubleValidator(0, 1e9, 0, self))

        self._activate_btn = QPushButton("Activate")
        self._deactivate_btn = QPushButton("Deactivate")
        self._deactivate_btn.setEnabled(False)
        self._activate_btn.clicked.connect(self._on_activate)
        self._deactivate_btn.clicked.connect(self._on_deactivate)

        cl.addWidget(QLabel("Port:"))
        cl.addWidget(self._port_edit)
        cl.addWidget(QLabel("Baud:"))
        cl.addWidget(self._baud_edit)
        cl.addStretch(1)
        cl.addWidget(self._activate_btn)
        cl.addWidget(self._deactivate_btn)
        return conn_box

    def _build_motion_group(self) -> QGroupBox:
        """Build the "Motion (mm)" group: move rows, controls and position."""
        mot_box = QGroupBox("Motion (mm)")
        ml = QVBoxLayout(mot_box)

        # Absolute move row
        row1 = QHBoxLayout()
        self._target_abs_edit = QLineEdit()
        self._target_abs_edit.setPlaceholderText("Absolute position (mm)")
        self._move_abs_btn = QPushButton("Move To")
        self._move_abs_btn.setEnabled(False)
        self._move_abs_btn.clicked.connect(self._on_move_abs)
        row1.addWidget(self._target_abs_edit, 1)
        row1.addWidget(self._move_abs_btn)
        ml.addLayout(row1)

        # Relative move row
        row2 = QHBoxLayout()
        self._step_rel_edit = QLineEdit("0.001")
        self._step_rel_edit.setFixedWidth(100)
        self._rel_minus_btn = QPushButton("− Step")
        self._rel_plus_btn = QPushButton("+ Step")
        self._rel_minus_btn.setEnabled(False)
        self._rel_plus_btn.setEnabled(False)
        self._rel_minus_btn.clicked.connect(lambda: self._on_move_rel(sign=-1))
        self._rel_plus_btn.clicked.connect(lambda: self._on_move_rel(sign=+1))
        row2.addWidget(QLabel("Step (mm):"))
        row2.addWidget(self._step_rel_edit)
        row2.addStretch(1)
        row2.addWidget(self._rel_minus_btn)
        row2.addWidget(self._rel_plus_btn)
        ml.addLayout(row2)

        # Control buttons row
        row3 = QHBoxLayout()
        self._home_btn = QPushButton("Home")
        self._home_btn.setEnabled(False)
        self._ident_btn = QPushButton("Identify")
        self._ident_btn.setEnabled(False)
        self._stop_btn = QPushButton("Stop")
        self._stop_btn.setEnabled(False)
        self._home_btn.clicked.connect(self._on_home)
        self._ident_btn.clicked.connect(self._on_ident)
        self._stop_btn.clicked.connect(self._on_stop)
        row3.addWidget(self._home_btn)
        row3.addWidget(self._ident_btn)
        row3.addWidget(self._stop_btn)
        ml.addLayout(row3)

        # Position display row
        row4 = QHBoxLayout()
        self._cur_pos_edit = QLineEdit()
        self._cur_pos_edit.setReadOnly(True)
        self._cur_pos_edit.setPlaceholderText("—")
        self._cur_pos_edit.setFixedWidth(140)
        row4.addWidget(QLabel("Current:"))
        row4.addWidget(self._cur_pos_edit)
        row4.addStretch(1)
        # Separator between the two bounds; without it "0" and "50" would be
        # rendered as the single number "050".
        row4.addWidget(
            QLabel(f"Limits: {self._range_min:g} – {self._range_max:g} mm")
        )
        ml.addLayout(row4)
        return mot_box

    # -------------------------------------------------------------------------
    # Logging
    # -------------------------------------------------------------------------

    def _log_message(self, msg: str) -> None:
        """Forward *msg* to the log panel, if one was supplied."""
        if self._log is not None:
            self._log.log(msg, source="Compressor")

    # -------------------------------------------------------------------------
    # Input parsing
    # -------------------------------------------------------------------------

    @staticmethod
    def _parse_finite(text: str) -> Optional[float]:
        """Return *text* as a finite float, or ``None`` if it is not one.

        NaN and infinities are rejected: a NaN target would slip through the
        ``max``/``min`` range clamp and resolve to the upper travel limit.
        """
        try:
            value = float(text.strip())
        except ValueError:
            return None
        return value if math.isfinite(value) else None

    # -------------------------------------------------------------------------
    # Position polling
    # -------------------------------------------------------------------------

    def _update_position(self) -> None:
        """Timer slot: refresh the position field from the stage.

        Polling stops itself when no stage is connected or a read fails; the
        motion handlers restart it.
        """
        if self._stage is None:
            self._poll_timer.stop()
            return
        try:
            p = self._stage.get_position()
            if p is not None:
                self._cur_pos_edit.setText(f"{p:.3f}")
        except Exception as e:
            self._log_message(f"Position read failed: {e}")
            self._poll_timer.stop()

    # -------------------------------------------------------------------------
    # UI state management
    # -------------------------------------------------------------------------

    def _set_controls_enabled(self, enabled: bool) -> None:
        """Enable/disable every control that requires an active stage."""
        for button in (
            self._deactivate_btn,
            self._move_abs_btn,
            self._rel_minus_btn,
            self._rel_plus_btn,
            self._home_btn,
            self._ident_btn,
            self._stop_btn,
        ):
            button.setEnabled(enabled)

    # -------------------------------------------------------------------------
    # Stage control
    # -------------------------------------------------------------------------

    def _on_activate(self) -> None:
        """Connect to the stage, register it and enable the motion controls.

        On any failure the partially initialised controller is disabled
        (best effort) so it is not left dangling, and an error is shown.
        """
        if self._stage is not None:
            return

        port = self._port_edit.text().strip() or self._port
        try:
            baud = int(float(self._baud_edit.text().strip()))
        except (ValueError, OverflowError):
            # Empty/invalid field: fall back to the configured baud rate.
            baud = self._baud

        stg: Optional[ZaberBinaryController] = None
        try:
            stg = ZaberBinaryController(
                port=port,
                baud_rate=baud,
                range_min=self._range_min,
                range_max=self._range_max,
            )
            stg.activate(homing=False)

            # Drop a stale registry entry left by a previous controller.
            # Deliberately best-effort: a failure here must not block
            # activation.
            try:
                prev = REGISTRY.get(REGISTRY_KEY)
                if prev is not None and prev is not stg:
                    REGISTRY.unregister(REGISTRY_KEY)
            except Exception:
                pass
            REGISTRY.register(REGISTRY_KEY, stg)
        except Exception as e:
            if stg is not None:
                # Release whatever the controller acquired before the error.
                # NOTE(review): assumes disable() is the right teardown for a
                # partially activated controller — confirm against the wrapper.
                try:
                    stg.disable()
                except Exception as cleanup_err:
                    self._log_message(
                        f"Cleanup after failed activation failed: {cleanup_err}"
                    )
            QMessageBox.critical(self, "Error", f"Activation failed: {e}")
            self._log_message(f"Activation failed: {e}")
            self._stage = None
            return

        self._stage = stg
        self._log_message(f"Activated on {port} @ {baud}.")
        self._set_controls_enabled(True)
        self._activate_btn.setEnabled(False)
        self._port_edit.setEnabled(False)
        self._baud_edit.setEnabled(False)
        self._poll_timer.start()

    def _on_deactivate(self) -> None:
        """Disable the stage, unregister it and reset the UI.

        Never raises: this runs as a Qt slot and from ``closeEvent``, where
        an escaping exception would abort the application. A failing
        ``disable()`` is logged instead, and the UI is reset regardless.
        """
        self._poll_timer.stop()
        stage = self._stage
        try:
            if stage is not None:
                try:
                    stage.disable()
                except Exception as e:
                    self._log_message(f"Deactivation failed: {e}")
                else:
                    self._log_message("Deactivated.")
                finally:
                    # Best-effort: the key may already be gone.
                    try:
                        REGISTRY.unregister(REGISTRY_KEY)
                    except Exception:
                        pass
        finally:
            self._stage = None
            self._set_controls_enabled(False)
            self._activate_btn.setEnabled(True)
            self._port_edit.setEnabled(True)
            self._baud_edit.setEnabled(True)

    def _on_home(self) -> None:
        """Start a non-blocking homing move."""
        if self._stage is None:
            QMessageBox.warning(self, "Error", "Stage not activated.")
            return
        try:
            self._stage.home(blocking=False)
            self._log_message("Homing…")
            self._poll_timer.start()
        except Exception as e:
            QMessageBox.critical(self, "Error", f"Home failed: {e}")
            self._log_message(f"Home failed: {e}")

    def _on_ident(self) -> None:
        """Ask the controller to identify itself."""
        if self._stage is None:
            QMessageBox.warning(self, "Error", "Stage not activated.")
            return
        try:
            self._stage.identify()
            self._log_message("Identify.")
        except Exception as e:
            QMessageBox.critical(self, "Error", f"Identify failed: {e}")
            self._log_message(f"Identify failed: {e}")

    def _on_stop(self) -> None:
        """Request an immediate stop; failures are only logged."""
        if self._stage is None:
            return
        try:
            # NOTE(review): reaches into the wrapper's private _ensure();
            # switch to a public stop() if the controller offers one.
            self._stage._ensure().stop()
            self._log_message("Stop requested.")
        except Exception as e:
            self._log_message(f"Stop failed: {e}")

    def _on_move_abs(self) -> None:
        """Start a non-blocking move to the absolute position typed (mm)."""
        if self._stage is None:
            QMessageBox.warning(self, "Error", "Stage not activated.")
            return
        t = self._target_abs_edit.text().strip()
        if not t:
            QMessageBox.warning(self, "Error", "Enter a target position (mm).")
            return
        val = self._parse_finite(t)
        if val is None:
            QMessageBox.warning(self, "Error", "Invalid target position.")
            return
        try:
            self._stage.move_to(val, blocking=False)
            self._log_message(f"Move to {val:.3f} mm …")
            self._poll_timer.start()
        except Exception as e:
            QMessageBox.critical(self, "Error", f"Move failed: {e}")
            self._log_message(f"Move failed: {e}")

    def _on_move_rel(self, sign: int) -> None:
        """Start a non-blocking move by one step in the given direction.

        Args:
            sign: ``+1`` or ``-1``; multiplied with the step size typed (mm).

        The target is clamped to the configured travel limits. The move is
        refused when the current position cannot be read, because guessing a
        starting point would send the stage to an arbitrary location.
        """
        if self._stage is None:
            QMessageBox.warning(self, "Error", "Stage not activated.")
            return
        step = self._parse_finite(self._step_rel_edit.text())
        if step is None:
            QMessageBox.warning(self, "Error", "Invalid step.")
            return
        try:
            cur = self._stage.get_position()
            if cur is None:
                QMessageBox.warning(
                    self,
                    "Error",
                    "Relative move failed: current position unknown.",
                )
                self._log_message(
                    "Relative move failed: current position unknown."
                )
                return
            tgt = cur + sign * step
            tgt = max(self._range_min, min(self._range_max, tgt))
            self._stage.move_to(tgt, blocking=False)
            self._log_message(f"Move to {tgt:.3f} mm …")
            self._poll_timer.start()
        except Exception as e:
            QMessageBox.critical(self, "Error", f"Relative move failed: {e}")
            self._log_message(f"Relative move failed: {e}")

    # -------------------------------------------------------------------------
    # Cleanup
    # -------------------------------------------------------------------------

    def closeEvent(self, event) -> None:
        """Deactivate a still-connected stage before the window closes."""
        if self._stage is not None:
            self._on_deactivate()
        super().closeEvent(event)
if __name__ == "__main__":
    # Stand-alone launch: show the window without a log panel and hand the
    # Qt event loop's return code back to the shell.
    import sys
    from PyQt5.QtWidgets import QApplication

    app = QApplication(sys.argv)

    window = GratingCompressorWindow()
    window.show()

    exit_code = app.exec_()
    sys.exit(exit_code)