Source code for dlab.diagnostics.ui.scans.m2_measurement_tab

from __future__ import annotations

import datetime
import time
from pathlib import Path

import numpy as np
from PIL import Image, PngImagePlugin

from PyQt5.QtCore import QObject, pyqtSignal, QThread
from PyQt5.QtWidgets import (
    QWidget,
    QHBoxLayout,
    QVBoxLayout,
    QLabel,
    QPushButton,
    QLineEdit,
    QComboBox,
    QDoubleSpinBox,
    QGroupBox,
    QMessageBox,
    QCheckBox,
    QProgressBar,
)

from dlab.core.device_registry import REGISTRY
from dlab.utils.log_panel import LogPanel
from dlab.utils.paths_utils import data_dir


# -----------------------------------------------------------------------------
# Worker thread
# -----------------------------------------------------------------------------


[docs] class M2Worker(QObject): """Worker for M² measurement scan.""" progress = pyqtSignal(int, int) log = pyqtSignal(str) finished = pyqtSignal(str) def __init__( self, stage_key: str, camera_key: str, positions: list[float], settle_s: float, comment: str, scan_name: str, averages: int = 1, background: bool = False, existing_scan_log: str | None = None, parent: QObject | None = None, ) -> None: super().__init__(parent) self.stage_key = stage_key self.camera_key = camera_key self.positions = list(float(p) for p in positions) self.settle_s = float(settle_s) self.comment = comment self.scan_name = scan_name self.averages = int(max(1, averages)) self.background = bool(background) self.abort = False self.existing_scan_log = existing_scan_log def _emit(self, msg: str) -> None: self.log.emit(msg) def _save_png_with_meta( self, folder: Path, filename: str, frame_u8: np.ndarray, meta: dict ) -> Path: """Save an 8-bit grayscale PNG with metadata.""" folder.mkdir(parents=True, exist_ok=True) path = folder / filename f8 = np.asarray(frame_u8, dtype=np.uint8, copy=False) img = Image.fromarray(f8, mode="L") pnginfo = PngImagePlugin.PngInfo() for k, v in meta.items(): pnginfo.add_text(str(k), str(v)) img.save(path.as_posix(), format="PNG", pnginfo=pnginfo) return path
[docs] def run(self) -> None: stage = REGISTRY.get(self.stage_key) camwin = REGISTRY.get(self.camera_key) if stage is None: self._emit(f"Stage '{self.stage_key}' not found in registry.") self.finished.emit("") return if camwin is None: self._emit(f"Camera '{self.camera_key}' not found in registry.") self.finished.emit("") return if not hasattr(camwin, "grab_frame_for_scan"): self._emit("Camera window does not expose grab_frame_for_scan().") self.finished.emit("") return now = datetime.datetime.now() root = data_dir() scan_dir = root / f"{now:%Y-%m-%d}" / "Scans" / self.scan_name scan_dir.mkdir(parents=True, exist_ok=True) # Create or use existing scan log if self.existing_scan_log: scan_log = Path(self.existing_scan_log) if not scan_log.exists(): with open(scan_log, "w", encoding="utf-8") as lf: lf.write("ImageFile\tStageKey\tPosition\tExposure_us\n") lf.write(f"# {self.comment}\n") step = (self.positions[1] - self.positions[0]) if len(self.positions) > 1 else 0.0 lf.write(f"# Start={self.positions[0]:.6f}; End={self.positions[-1]:.6f}; Step={step:.6f}\n") else: date_str = f"{now:%Y-%m-%d}" idx = 1 while True: candidate = scan_dir / f"{self.scan_name}_log_{date_str}_{idx}.log" if not candidate.exists(): break idx += 1 scan_log = candidate with open(scan_log, "w", encoding="utf-8") as lf: lf.write("ImageFile\tStageKey\tPosition\tExposure_us\n") lf.write(f"# {self.comment}\n") step = (self.positions[1] - self.positions[0]) if len(self.positions) > 1 else 0.0 lf.write(f"# Start={self.positions[0]:.6f}; End={self.positions[-1]:.6f}; Step={step:.6f}\n") # Background capture if self.background: try: try: pos = float(stage.get_position()) except Exception: pos = 0.0 frame_u8, meta = camwin.grab_frame_for_scan( averages=1, dead_pixel_cleanup=False, background=True, force_roi=True, ) except Exception as e: self._emit(f"Background capture failed: {e}") self.finished.emit(scan_log.as_posix()) return cam_name = str(meta.get("CameraName", "DahengCam")).strip() or "DahengCam" exposure = int(meta.get("Exposure_us", 0)) cam_day = root / f"{now:%Y-%m-%d}" / cam_name cam_day.mkdir(parents=True, exist_ok=True) ts_ms = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f") cam_fn = f"{cam_name}_Background_{ts_ms}.png" try: self._save_png_with_meta( cam_day, cam_fn, frame_u8, {"Exposure_us": exposure, "Gain": "", "Comment": self.comment}, ) except Exception as e: self._emit(f"Save background failed: {e}") self.finished.emit(scan_log.as_posix()) return try: with open(scan_log, "a", encoding="utf-8") as lf: lf.write(f"{cam_fn}\t{self.stage_key}\t{pos:.6f}\t{exposure}\n") except Exception as e: self._emit(f"Background log write failed: {e}") self._emit(f"Saved background {cam_fn} (exp {exposure} µs).") self.progress.emit(1, 1) self.finished.emit(scan_log.as_posix()) return # Main scan n = len(self.positions) for i, pos in enumerate(self.positions, 1): if self.abort: self._emit("Scan aborted.") break try: stage.move_to(float(pos), blocking=True) self._emit(f"Moved {self.stage_key} to {pos:.3f}.") except Exception as e: self._emit(f"Move to {pos:.3f} failed: {e}") self.progress.emit(i, n) continue time.sleep(self.settle_s) try: frame_u8, meta = camwin.grab_frame_for_scan( averages=self.averages, dead_pixel_cleanup=False, background=False, force_roi=True, ) except Exception as e: self._emit(f"Capture failed at {pos:.3f}: {e}") self.progress.emit(i, n) continue cam_name = str(meta.get("CameraName", "DahengCam")).strip() or "DahengCam" exposure = int(meta.get("Exposure_us", 0)) cam_day = root / f"{now:%Y-%m-%d}" / cam_name cam_day.mkdir(parents=True, exist_ok=True) ts_ms = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f") cam_fn = f"{cam_name}_Image_{ts_ms}.png" try: self._save_png_with_meta( cam_day, cam_fn, frame_u8, {"Exposure_us": exposure, "Gain": "", "Comment": self.comment}, ) except Exception as e: self._emit(f"Save to camera folder failed at {pos:.3f}: {e}") self.progress.emit(i, n) continue try: with open(scan_log, "a", encoding="utf-8") as lf: lf.write(f"{cam_fn}\t{self.stage_key}\t{pos:.6f}\t{exposure}\n") except Exception as e: self._emit(f"Scan log write failed: {e}") self._emit(f"Saved {cam_fn} @ {pos:.3f} (exp {exposure} µs, avg {self.averages}).") self.progress.emit(i, n) self.finished.emit(scan_log.as_posix())
# ----------------------------------------------------------------------------- # M2Tab # -----------------------------------------------------------------------------
[docs] class M2Tab(QWidget): """Tab for M² beam measurement scan.""" def __init__( self, log_panel: LogPanel | None = None, parent: QWidget | None = None ) -> None: super().__init__(parent) self._log = log_panel self._thread: QThread | None = None self._worker: M2Worker | None = None self._last_params: dict | None = None self._doing_background = False self._last_scan_log_path: str | None = None self._init_ui() self._refresh_devices() def _init_ui(self) -> None: main = QVBoxLayout(self) # Devices group main.addWidget(self._create_devices_group()) # Parameters group main.addWidget(self._create_parameters_group()) # Options group main.addWidget(self._create_options_group()) # Comment row comment_row = QHBoxLayout() comment_row.addWidget(QLabel("Comment:")) self._comment_edit = QLineEdit("") comment_row.addWidget(self._comment_edit, 1) main.addLayout(comment_row) # Controls row main.addLayout(self._create_controls_row()) def _create_devices_group(self) -> QGroupBox: group = QGroupBox("Devices") layout = QHBoxLayout(group) layout.addWidget(QLabel("Stage:")) self._stage_combo = QComboBox() layout.addWidget(self._stage_combo, 1) layout.addWidget(QLabel("Camera:")) self._cam_combo = QComboBox() layout.addWidget(self._cam_combo, 1) btn_refresh = QPushButton("Refresh") btn_refresh.clicked.connect(self._refresh_devices) layout.addWidget(btn_refresh) return group def _create_parameters_group(self) -> QGroupBox: group = QGroupBox("Scan Parameters") layout = QHBoxLayout(group) layout.addWidget(QLabel("Start")) self._start_sb = QDoubleSpinBox() self._start_sb.setDecimals(3) self._start_sb.setRange(-1e6, 1e6) self._start_sb.setValue(10.0) layout.addWidget(self._start_sb) layout.addWidget(QLabel("End")) self._end_sb = QDoubleSpinBox() self._end_sb.setDecimals(3) self._end_sb.setRange(-1e6, 1e6) self._end_sb.setValue(16.0) layout.addWidget(self._end_sb) layout.addWidget(QLabel("Step")) self._step_sb = QDoubleSpinBox() self._step_sb.setDecimals(3) self._step_sb.setRange(1e-6, 1e6) self._step_sb.setValue(0.2) layout.addWidget(self._step_sb) layout.addWidget(QLabel("Settle (s)")) self._settle_sb = QDoubleSpinBox() self._settle_sb.setDecimals(2) self._settle_sb.setRange(0.0, 60.0) self._settle_sb.setValue(0.50) layout.addWidget(self._settle_sb) layout.addWidget(QLabel("Avg")) self._avg_sb = QDoubleSpinBox() self._avg_sb.setDecimals(0) self._avg_sb.setRange(1, 1000) self._avg_sb.setValue(1) layout.addWidget(self._avg_sb) return group def _create_options_group(self) -> QGroupBox: group = QGroupBox("Options") layout = QHBoxLayout(group) layout.addStretch(1) self._bg_checkbox = QCheckBox("Do background after scan") layout.addWidget(self._bg_checkbox) return group def _create_controls_row(self) -> QHBoxLayout: layout = QHBoxLayout() self._start_btn = QPushButton("Start") self._start_btn.clicked.connect(self._on_start) layout.addWidget(self._start_btn) self._abort_btn = QPushButton("Abort") self._abort_btn.setEnabled(False) self._abort_btn.clicked.connect(self._on_abort) layout.addWidget(self._abort_btn) self._progress = QProgressBar() self._progress.setMinimum(0) self._progress.setValue(0) layout.addWidget(self._progress, 1) return layout # ------------------------------------------------------------------------- # Logging # ------------------------------------------------------------------------- def _log_message(self, msg: str) -> None: if self._log: self._log.log(msg, source="M2Scan") # ------------------------------------------------------------------------- # Device management # ------------------------------------------------------------------------- def _refresh_devices(self) -> None: self._stage_combo.clear() for k in REGISTRY.keys("stage:"): if k.startswith("stage:serial:"): continue self._stage_combo.addItem(k) self._cam_combo.clear() for k in REGISTRY.keys("camera:daheng:"): if ":index:" in k: continue self._cam_combo.addItem(k) # ------------------------------------------------------------------------- # Scan control # ------------------------------------------------------------------------- def _on_start(self) -> None: try: stage_key = self._stage_combo.currentText().strip() cam_key = self._cam_combo.currentText().strip() if not stage_key or not cam_key: raise ValueError("Please select a stage and a camera.") a0 = float(self._start_sb.value()) a1 = float(self._end_sb.value()) step = float(self._step_sb.value()) if step <= 0: raise ValueError("Step must be > 0.") if a1 >= a0: nsteps = int(np.floor((a1 - a0) / step)) positions = [a0 + i * step for i in range(nsteps + 1)] if positions[-1] < a1 - 1e-12: positions.append(a1) else: nsteps = int(np.floor((a0 - a1) / step)) positions = [a0 - i * step for i in range(nsteps + 1)] if positions[-1] > a1 + 1e-12: positions.append(a1) settle = float(self._settle_sb.value()) avg = int(self._avg_sb.value()) comment = self._comment_edit.text() except Exception as e: QMessageBox.critical(self, "Invalid parameters", str(e)) return self._last_params = dict( stage_key=stage_key, cam_key=cam_key, positions=positions, settle=settle, avg=avg, scan_name="m_squared", comment=comment, ) self._doing_background = False self._last_scan_log_path = None self._launch_worker(background=False, existing_scan_log=None) self._log_message("Scan started…") def _launch_worker(self, background: bool, existing_scan_log: str | None) -> None: p = self._last_params if not p: return self._thread = QThread(self) self._worker = M2Worker( stage_key=p["stage_key"], camera_key=p["cam_key"], positions=p["positions"], settle_s=p["settle"], comment=p["comment"], scan_name=p["scan_name"], averages=p["avg"], background=background, existing_scan_log=existing_scan_log, ) self._worker.moveToThread(self._thread) self._thread.started.connect(self._worker.run) self._worker.log.connect(self._log_message) self._worker.progress.connect(self._on_progress) self._worker.finished.connect(self._on_finished) self._thread.finished.connect(self._thread.deleteLater) self._progress.setMaximum(len(p["positions"]) if not background else 1) self._progress.setValue(0) self._start_btn.setEnabled(False) self._abort_btn.setEnabled(True) self._thread.start() def _on_abort(self) -> None: if self._worker: self._worker.abort = True self._log_message("Abort requested.") self._abort_btn.setEnabled(False) def _on_progress(self, i: int, n: int) -> None: self._progress.setMaximum(n) self._progress.setValue(i) def _on_finished(self, log_path: str) -> None: if log_path: self._last_scan_log_path = log_path self._log_message(f"Scan finished. Log: {log_path}") else: self._log_message("Scan finished with errors.") # Background pass if self._last_params and self._bg_checkbox.isChecked() and not self._doing_background: self._doing_background = True reply = QMessageBox.information( self, "Background scan", "Please block the laser now, then click OK to record the background.", QMessageBox.Ok | QMessageBox.Cancel, QMessageBox.Ok, ) if reply == QMessageBox.Ok: self._log_message("Starting background pass…") if self._thread and self._thread.isRunning(): self._thread.quit() self._thread.wait() self._thread = None self._worker = None self._launch_worker(background=True, existing_scan_log=self._last_scan_log_path) return self._abort_btn.setEnabled(False) self._start_btn.setEnabled(True) if self._thread and self._thread.isRunning(): self._thread.quit() self._thread.wait() self._thread = None self._worker = None