from __future__ import annotations
import datetime
import time
from pathlib import Path
from typing import List
import numpy as np
from PIL import Image, PngImagePlugin
from PyQt5.QtCore import QObject, pyqtSignal, QThread, Qt
from PyQt5.QtWidgets import (
QWidget,
QVBoxLayout,
QHBoxLayout,
QLabel,
QPushButton,
QLineEdit,
QGroupBox,
QMessageBox,
QDoubleSpinBox,
QComboBox,
QProgressBar,
QCheckBox,
QSizePolicy,
QApplication,
)
from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
from dlab.core.device_registry import REGISTRY
from dlab.utils.log_panel import LogPanel
from dlab.utils.paths_utils import data_dir
# -----------------------------------------------------------------------------
# Helper functions
# -----------------------------------------------------------------------------
def _save_png_with_meta(folder: Path, filename: str, frame_u16: np.ndarray, meta: dict) -> Path:
"""Save a 16-bit PNG image with metadata."""
folder.mkdir(parents=True, exist_ok=True)
path = folder / filename
img = Image.fromarray(frame_u16, mode="I;16")
pnginfo = PngImagePlugin.PngInfo()
for k, v in meta.items():
pnginfo.add_text(str(k), str(v))
img.save(path.as_posix(), format="PNG", pnginfo=pnginfo)
return path
# -----------------------------------------------------------------------------
# Worker thread
# -----------------------------------------------------------------------------
[docs]
class GCWorker(QObject):
"""Worker for grating compressor scan."""
progress = pyqtSignal(int, int)
log = pyqtSignal(str)
finished = pyqtSignal(str)
live_point = pyqtSignal(float, float)
best_ready = pyqtSignal(float, float)
def __init__(
self,
stage_key: str,
andor_key: str,
positions: list[float],
exposure_us: int,
averages: int,
settle_s: float,
comment: str,
mcp_voltage: str,
do_background: bool = False,
existing_scan_log: str | None = None,
parent: QObject | None = None,
) -> None:
super().__init__(parent)
self.stage_key = stage_key
self.andor_key = andor_key
self.positions = list(float(p) for p in positions)
self.exposure_us = int(exposure_us)
self.averages = int(max(1, averages))
self.settle_s = float(settle_s)
self.scan_name = "grating_compressor"
self.comment = comment
self.mcp_voltage = str(mcp_voltage)
self.do_background = bool(do_background)
self.existing_scan_log = existing_scan_log
self.abort = False
self._best_sum = float("-inf")
self._best_pos = None
def _emit(self, msg: str) -> None:
self.log.emit(msg)
def _open_or_create_scan_log(self, root: Path, now: datetime.datetime) -> Path:
if self.existing_scan_log:
scan_log = Path(self.existing_scan_log)
if not scan_log.exists():
scan_log.parent.mkdir(parents=True, exist_ok=True)
with open(scan_log, "w", encoding="utf-8") as f:
f.write("ImageFile\tStageKey\tPosition_mm\tExposure_us\tAverages\tMCP_Voltage\n")
f.write(f"# {self.comment}\n")
return scan_log
scan_dir = root / f"{now:%Y-%m-%d}" / "Scans" / self.scan_name
scan_dir.mkdir(parents=True, exist_ok=True)
date_str = now.strftime("%Y-%m-%d")
idx = 1
while True:
candidate = scan_dir / f"{self.scan_name}_log_{date_str}_{idx}.log"
if not candidate.exists():
break
idx += 1
scan_log = candidate
with open(scan_log, "w", encoding="utf-8") as f:
f.write("ImageFile\tStageKey\tPosition_mm\tExposure_us\tAverages\tMCP_Voltage\n")
f.write(f"# {self.comment}\n")
return scan_log
[docs]
def run(self) -> None:
stage = REGISTRY.get(self.stage_key)
camwin = REGISTRY.get(self.andor_key)
if stage is None:
self._emit(f"Stage '{self.stage_key}' not found.")
self.finished.emit("")
return
if camwin is None:
self._emit(f"Camera '{self.andor_key}' not found.")
self.finished.emit("")
return
if not hasattr(camwin, "grab_frame_for_scan"):
self._emit("Selected camera does not expose grab_frame_for_scan().")
self.finished.emit("")
return
try:
if hasattr(camwin, "set_exposure_us"):
camwin.set_exposure_us(self.exposure_us)
elif hasattr(camwin, "setExposureUS"):
camwin.setExposureUS(self.exposure_us)
elif hasattr(camwin, "set_exposure"):
camwin.set_exposure(self.exposure_us)
except Exception as e:
self._emit(f"Warning: failed to preset exposure on '{self.andor_key}': {e}")
now = datetime.datetime.now()
root = data_dir()
scan_log = self._open_or_create_scan_log(root, now)
total = len(self.positions)
done = 0
for pos in self.positions:
if self.abort:
self._emit("Scan aborted.")
self.finished.emit(scan_log.as_posix())
return
# Move stage
try:
stage.move_to(float(pos), blocking=True)
self._emit(f"Moved {self.stage_key} to {pos:.3f} mm.")
except Exception as e:
self._emit(f"Move to {pos:.3f} failed: {e}")
done += 1
self.progress.emit(done, total)
continue
time.sleep(self.settle_s)
# Capture frame
try:
frame_u16, meta = camwin.grab_frame_for_scan(
averages=self.averages,
background=False,
dead_pixel_cleanup=True,
exposure_us=self.exposure_us,
)
except TypeError:
frame_u16, meta = camwin.grab_frame_for_scan(
averages=self.averages,
background=False,
dead_pixel_cleanup=True,
)
except Exception as e:
self._emit(f"Capture failed at {pos:.3f}: {e}")
done += 1
self.progress.emit(done, total)
continue
cam_name = str((meta or {}).get("CameraName", "AndorCam")).strip() or "AndorCam"
exp_meta = int((meta or {}).get("Exposure_us", self.exposure_us))
ts_ms = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
tag = "Image"
cam_day = root / f"{now:%Y-%m-%d}" / cam_name
cam_fn = f"{cam_name}_{tag}_{ts_ms}.png"
# Live metric: sum of pixels
try:
sum_counts = float(np.sum(frame_u16, dtype=np.uint64))
except Exception:
sum_counts = float(np.sum(frame_u16))
# Update best
if sum_counts > self._best_sum:
self._best_sum = sum_counts
self._best_pos = float(pos)
try:
_save_png_with_meta(
cam_day,
cam_fn,
frame_u16,
{"Exposure_us": exp_meta, "Gain": "", "Comment": self.comment},
)
except Exception as e:
self._emit(f"Save failed at {pos:.3f}: {e}")
done += 1
self.progress.emit(done, total)
continue
# Append scan log row
try:
with open(scan_log, "a", encoding="utf-8") as f:
f.write(
f"{cam_fn}\t{self.stage_key}\t{pos:.6f}\t{exp_meta}\t{self.averages}\t{self.mcp_voltage}\n"
)
except Exception as e:
self._emit(f"Scan log write failed: {e}")
# Emit live point
try:
self.live_point.emit(float(pos), float(sum_counts))
except Exception:
pass
self._emit(f"Saved {cam_fn} @ {pos:.3f} mm (exp {exp_meta} µs, avg {self.averages}).")
done += 1
self.progress.emit(done, total)
# Background capture
if self.do_background and not self.abort:
try:
frame_u16, meta = camwin.grab_frame_for_scan(
averages=self.averages,
background=True,
dead_pixel_cleanup=True,
exposure_us=self.exposure_us,
)
except TypeError:
frame_u16, meta = camwin.grab_frame_for_scan(
averages=self.averages,
background=True,
dead_pixel_cleanup=True,
)
except Exception as e:
self._emit(f"Background capture failed: {e}")
else:
cam_name = str((meta or {}).get("CameraName", "AndorCam")).strip() or "AndorCam"
exp_meta = int((meta or {}).get("Exposure_us", self.exposure_us))
ts_ms = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
tag = "Background"
cam_day = root / f"{now:%Y-%m-%d}" / cam_name
cam_fn = f"{cam_name}_{tag}_{ts_ms}.png"
try:
_save_png_with_meta(
cam_day,
cam_fn,
frame_u16,
{"Exposure_us": exp_meta, "Gain": "", "Comment": self.comment},
)
with open(scan_log, "a", encoding="utf-8") as f:
f.write(
f"{cam_fn}\t{self.stage_key}\tBG\t{exp_meta}\t{self.averages}\t{self.mcp_voltage}\n"
)
self._emit(f"Saved background: {cam_fn}")
except Exception as e:
self._emit(f"Background save/log failed: {e}")
try:
if self._best_pos is not None and np.isfinite(self._best_sum):
self.best_ready.emit(self._best_pos, float(self._best_sum))
except Exception:
pass
self.finished.emit(scan_log.as_posix())
# -----------------------------------------------------------------------------
# Live view window
# -----------------------------------------------------------------------------
[docs]
class GCScanLiveView(QWidget):
"""Live view window showing sum vs position during scan."""
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(None)
self.setWindowTitle("GC Scan — Live View (Sum vs Position)")
self.setWindowFlags(self.windowFlags() | Qt.Window)
self.setWindowModality(Qt.NonModal)
self.setMinimumSize(700, 420)
self.resize(880, 520)
self._xs: List[float] = []
self._ys: List[float] = []
self._init_ui()
self._center_on_screen()
def _init_ui(self) -> None:
layout = QVBoxLayout(self)
self._figure = Figure(figsize=(8, 5))
self._figure.subplots_adjust(left=0.12, right=0.98, top=0.92, bottom=0.12)
self._canvas = FigureCanvas(self._figure)
self._canvas.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
layout.addWidget(self._canvas)
self._ax = self._figure.add_subplot(111)
self._ax.set_xlabel("Position (mm)")
self._ax.set_ylabel("Sum of pixels (a.u.)")
self._ax.grid(True, which="both", linestyle="--", alpha=0.3)
(self._line,) = self._ax.plot([], [], linestyle="-")
self._canvas.draw_idle()
def _center_on_screen(self) -> None:
try:
screen_geo = QApplication.primaryScreen().availableGeometry()
geo = self.frameGeometry()
geo.moveCenter(screen_geo.center())
self.move(geo.topLeft())
except Exception:
pass
[docs]
def reset(self) -> None:
"""Clear all data points."""
self._xs.clear()
self._ys.clear()
self._line.set_data([], [])
self._ax.relim()
self._ax.autoscale_view()
self._canvas.draw_idle()
[docs]
def add_point(self, x: float, y: float) -> None:
"""Add a data point to the plot."""
self._xs.append(x)
self._ys.append(y)
self._line.set_data(self._xs, self._ys)
self._ax.relim()
self._ax.autoscale_view()
self._canvas.draw_idle()
# -----------------------------------------------------------------------------
# GCScanTab
# -----------------------------------------------------------------------------
[docs]
class GCScanTab(QWidget):
"""Tab for grating compressor scan."""
def __init__(
self, log_panel: LogPanel | None = None, parent: QWidget | None = None
) -> None:
super().__init__(parent)
self._log = log_panel
self._thread: QThread | None = None
self._worker: GCWorker | None = None
self._last_scan_log_path: str | None = None
self._live_view: GCScanLiveView | None = None
self._init_ui()
self._refresh_devices()
def _init_ui(self) -> None:
main = QVBoxLayout(self)
# Devices group
main.addWidget(self._create_devices_group())
# Parameters group
main.addWidget(self._create_parameters_group())
# Options group
main.addWidget(self._create_options_group())
# Controls row
main.addLayout(self._create_controls_row())
def _create_devices_group(self) -> QGroupBox:
group = QGroupBox("Devices")
layout = QHBoxLayout(group)
layout.addWidget(QLabel("Stage:"))
self._stage_label = QLabel("stage:zaber:grating_compressor")
layout.addWidget(self._stage_label, 1)
layout.addWidget(QLabel("Andor camera:"))
self._cam_combo = QComboBox()
layout.addWidget(self._cam_combo, 1)
btn_refresh = QPushButton("Refresh")
btn_refresh.clicked.connect(self._refresh_devices)
layout.addWidget(btn_refresh)
return group
def _create_parameters_group(self) -> QGroupBox:
group = QGroupBox("Scan Parameters")
layout = QHBoxLayout(group)
layout.addWidget(QLabel("Start (mm)"))
self._start_sb = QDoubleSpinBox()
self._start_sb.setDecimals(3)
self._start_sb.setRange(-1e6, 1e6)
self._start_sb.setValue(24.0)
layout.addWidget(self._start_sb)
layout.addWidget(QLabel("End (mm)"))
self._end_sb = QDoubleSpinBox()
self._end_sb.setDecimals(3)
self._end_sb.setRange(-1e6, 1e6)
self._end_sb.setValue(25.0)
layout.addWidget(self._end_sb)
layout.addWidget(QLabel("Step (mm)"))
self._step_sb = QDoubleSpinBox()
self._step_sb.setDecimals(3)
self._step_sb.setRange(1e-6, 1e6)
self._step_sb.setValue(0.005)
layout.addWidget(self._step_sb)
layout.addWidget(QLabel("Settle (s)"))
self._settle_sb = QDoubleSpinBox()
self._settle_sb.setDecimals(2)
self._settle_sb.setRange(0.0, 60.0)
self._settle_sb.setValue(0.30)
layout.addWidget(self._settle_sb)
layout.addWidget(QLabel("Exposure (µs)"))
self._exp_sb = QDoubleSpinBox()
self._exp_sb.setDecimals(0)
self._exp_sb.setRange(1, 5_000_000)
self._exp_sb.setValue(3000000)
layout.addWidget(self._exp_sb)
layout.addWidget(QLabel("Averages"))
self._avg_sb = QDoubleSpinBox()
self._avg_sb.setDecimals(0)
self._avg_sb.setRange(1, 1000)
self._avg_sb.setValue(1)
layout.addWidget(self._avg_sb)
layout.addWidget(QLabel("MCP Voltage"))
self._mcp_edit = QLineEdit("")
layout.addWidget(self._mcp_edit)
layout.addWidget(QLabel("Comment"))
self._comment_edit = QLineEdit("")
layout.addWidget(self._comment_edit, 1)
return group
def _create_options_group(self) -> QGroupBox:
group = QGroupBox("Options")
layout = QHBoxLayout(group)
self._bg_checkbox = QCheckBox("Record background after scan")
self._bg_checkbox.setChecked(False)
layout.addWidget(self._bg_checkbox)
layout.addStretch(1)
return group
def _create_controls_row(self) -> QHBoxLayout:
layout = QHBoxLayout()
self._start_btn = QPushButton("Start")
self._start_btn.clicked.connect(self._on_start)
layout.addWidget(self._start_btn)
self._abort_btn = QPushButton("Abort")
self._abort_btn.setEnabled(False)
self._abort_btn.clicked.connect(self._on_abort)
layout.addWidget(self._abort_btn)
self._live_btn = QPushButton("Live View")
self._live_btn.setCheckable(True)
self._live_btn.toggled.connect(self._on_toggle_live_view)
layout.addWidget(self._live_btn)
self._progress = QProgressBar()
self._progress.setMinimum(0)
self._progress.setValue(0)
layout.addWidget(self._progress, 1)
return layout
# -------------------------------------------------------------------------
# Logging
# -------------------------------------------------------------------------
def _log_message(self, msg: str) -> None:
if self._log:
self._log.log(msg, source="GCScan")
# -------------------------------------------------------------------------
# Device management
# -------------------------------------------------------------------------
def _refresh_devices(self) -> None:
self._cam_combo.clear()
for k in REGISTRY.keys("camera:andor:"):
if ":index:" in k:
continue
self._cam_combo.addItem(k)
# -------------------------------------------------------------------------
# Scan control
# -------------------------------------------------------------------------
def _positions(self, a0: float, a1: float, step: float) -> List[float]:
"""Generate list of positions for the scan."""
if step <= 0:
raise ValueError("Step must be > 0.")
if a1 >= a0:
n = int(np.floor((a1 - a0) / step))
pos = [a0 + i * step for i in range(n + 1)]
if pos[-1] < a1 - 1e-12:
pos.append(a1)
else:
n = int(np.floor((a0 - a1) / step))
pos = [a0 - i * step for i in range(n + 1)]
if pos[-1] > a1 + 1e-12:
pos.append(a1)
return pos
def _on_start(self) -> None:
try:
stage_key = "stage:zaber:grating_compressor"
if REGISTRY.get(stage_key) is None:
raise ValueError(
"Grating compressor stage not found in registry. "
"Open the Grating Compressor window and Activate it first."
)
cam_key = self._cam_combo.currentText().strip()
if not cam_key:
raise ValueError("Select an Andor camera.")
start = float(self._start_sb.value())
end = float(self._end_sb.value())
step = float(self._step_sb.value())
pos = self._positions(start, end, step)
settle = float(self._settle_sb.value())
expo = int(self._exp_sb.value())
avg = int(self._avg_sb.value())
comment = self._comment_edit.text()
mcp_voltage = self._mcp_edit.text()
except Exception as e:
QMessageBox.critical(self, "Invalid parameters", str(e))
return
# Create thread/worker
self._thread = QThread(self)
self._worker = GCWorker(
stage_key=stage_key,
andor_key=cam_key,
positions=pos,
exposure_us=expo,
averages=avg,
settle_s=settle,
comment=comment,
mcp_voltage=mcp_voltage,
do_background=False,
existing_scan_log=None,
)
# Connect live stream if viewer is open
if self._live_view:
self._live_view.reset()
self._worker.live_point.connect(self._live_view.add_point)
self._worker.moveToThread(self._thread)
self._thread.started.connect(self._worker.run)
self._worker.log.connect(self._log_message)
self._worker.progress.connect(self._on_progress)
self._worker.finished.connect(self._on_finished)
self._thread.finished.connect(self._thread.deleteLater)
self._progress.setMaximum(len(pos))
self._progress.setValue(0)
self._start_btn.setEnabled(False)
self._abort_btn.setEnabled(True)
self._thread.start()
self._log_message("Grating compressor scan started…")
def _on_abort(self) -> None:
if self._worker:
self._worker.abort = True
self._log_message("Abort requested.")
self._abort_btn.setEnabled(False)
def _on_progress(self, i: int, n: int) -> None:
self._progress.setMaximum(n)
self._progress.setValue(i)
def _on_finished(self, log_path: str) -> None:
if log_path:
self._last_scan_log_path = log_path
self._log_message(f"Scan finished. Log: {log_path}")
else:
self._log_message("Scan finished with errors or aborted.")
self._last_scan_log_path = None
# Optional background capture after scan
if self._bg_checkbox.isChecked() and self._last_scan_log_path:
reply = QMessageBox.information(
self,
"Background",
"Block the beam, then click OK to record one background image.",
QMessageBox.Ok | QMessageBox.Cancel,
QMessageBox.Ok,
)
if reply == QMessageBox.Ok and (not self._worker or not self._worker.abort):
self._capture_background()
self._abort_btn.setEnabled(False)
self._start_btn.setEnabled(True)
if self._thread and self._thread.isRunning():
self._thread.quit()
self._thread.wait()
self._thread = None
self._worker = None
def _capture_background(self) -> None:
"""Capture background image after scan."""
try:
cam_key = self._cam_combo.currentText().strip()
stage_key = "stage:zaber:grating_compressor"
self._log_message("Recording background…")
worker = GCWorker(
stage_key=stage_key,
andor_key=cam_key,
positions=[],
exposure_us=int(self._exp_sb.value()),
averages=int(self._avg_sb.value()),
settle_s=0.0,
comment=self._comment_edit.text(),
mcp_voltage=self._mcp_edit.text(),
do_background=True,
existing_scan_log=self._last_scan_log_path,
)
thread = QThread(self)
worker.moveToThread(thread)
thread.started.connect(worker.run)
worker.log.connect(self._log_message)
def _bg_done(_path: str) -> None:
self._log_message("Background captured.")
thread.quit()
thread.wait()
worker.finished.connect(_bg_done)
thread.start()
except Exception as e:
self._log_message(f"Background failed: {e}")
# -------------------------------------------------------------------------
# Live view
# -------------------------------------------------------------------------
def _on_toggle_live_view(self, checked: bool) -> None:
if checked:
if self._live_view is None:
self._live_view = GCScanLiveView(None)
self._live_view.reset()
self._live_view._center_on_screen()
self._live_view.show()
self._live_view.raise_()
self._live_view.activateWindow()
else:
if self._live_view:
self._live_view.hide()