Source code for dlab.hardware.wrappers.andor_controller

from __future__ import annotations

import logging

import numpy as np
from pylablib.devices import Andor
import pylablib

from dlab.utils.config_utils import cfg_get


_log = logging.getLogger(__name__)

DEFAULT_EXPOSURE_US = 500_000
MIN_EXPOSURE_US = 1_000
MAX_EXPOSURE_US = 10_000_000


[docs] class AndorControllerError(Exception): """Raised for Andor camera operation errors."""
[docs] class AndorController: """Controller for an Andor SDK2 camera via pylablib.""" def __init__(self, device_index: int = 0) -> None: self.device_index = device_index self._cam: Andor.AndorSDK2Camera | None = None self._image_shape: tuple[int, ...] | None = None self._current_exposure: int | None = None
[docs] def is_active(self) -> bool: """Check if camera is activated.""" return self._cam is not None
[docs] def activate(self) -> None: """Initialize and configure the camera.""" if self._cam is not None: return try: driver_path = cfg_get("paths.drivers_andor", "src/dlab/hardware/drivers/andor_driver") pylablib.par["devices/dlls/andor_sdk2"] = driver_path cam = Andor.AndorSDK2Camera() exp_us = self._clamp_exposure(DEFAULT_EXPOSURE_US) cam.set_exposure(exp_us / 1e6) try: cam.setup_shutter("open") except Exception: pass cam.start_acquisition() try: cam.wait_for_frame(timeout=20) frame = cam.read_oldest_image() finally: cam.stop_acquisition() self._cam = cam self._image_shape = np.shape(frame) self._current_exposure = exp_us _log.info( "Andor[%s] activated; shape=%s; exposure=%dus", self.device_index, self._image_shape, self._current_exposure ) except Exception as e: try: cam.close() except Exception: pass self._cam = None self._image_shape = None raise AndorControllerError(f"activate failed: {e}") from e
[docs] def deactivate(self) -> None: """Close the camera connection.""" if self._cam: try: self._cam.close() _log.info("Andor[%s] deactivated", self.device_index) except Exception as e: _log.error("Andor[%s] deactivate error: %s", self.device_index, e) finally: self._cam = None self._image_shape = None self._current_exposure = None
def __enter__(self) -> AndorController: self.activate() return self def __exit__(self, exc_type, exc_val, exc_tb) -> None: self.deactivate() def _clamp_exposure(self, exposure_us: int) -> int: if exposure_us < MIN_EXPOSURE_US or exposure_us > MAX_EXPOSURE_US: clamped = max(MIN_EXPOSURE_US, min(MAX_EXPOSURE_US, exposure_us)) _log.warning( "Andor[%s] exposure %dus out of range [%d..%d]; clamped to %dus", self.device_index, exposure_us, MIN_EXPOSURE_US, MAX_EXPOSURE_US, clamped ) return clamped return exposure_us
[docs] def set_exposure(self, exposure_us: int) -> None: """Set exposure time in microseconds.""" if not isinstance(exposure_us, int) or exposure_us <= 0: raise ValueError("exposure_us must be a positive integer") if self._cam is None: raise AndorControllerError("Camera not active; call activate() first") exposure_us = self._clamp_exposure(exposure_us) if self._current_exposure == exposure_us: return try: self._cam.set_exposure(exposure_us / 1e6) self._current_exposure = exposure_us _log.debug("Andor[%s] exposure set to %dus", self.device_index, exposure_us) except Exception as e: raise AndorControllerError(f"set_exposure failed: {e}") from e
[docs] def get_image_shape(self) -> tuple[int, ...]: """Return the image dimensions.""" if self._image_shape is None: raise AndorControllerError("Image shape unknown; call activate() first") return self._image_shape
@property def current_exposure(self) -> int | None: """Current exposure time in microseconds.""" return self._current_exposure
[docs] def capture_single(self, exposure_us: int | None = None, timeout_s: float = 20.0) -> np.ndarray: """Capture a single frame with optional exposure override.""" if self._cam is None or self._image_shape is None: raise AndorControllerError("Camera not active; call activate() first") if exposure_us is not None: self.set_exposure(int(exposure_us)) self._cam.start_acquisition() try: self._cam.wait_for_frame(timeout=timeout_s) frame = self._cam.read_oldest_image() finally: self._cam.stop_acquisition() return frame.astype(np.float64, copy=False)