Source code for lvmcam.camera

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: Richard J. Mathar (mathar@mpia.de)
# @Filename: camera.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

# Since the aravis wrapper for GenICam cameras (such as the Blackfly)
# uses glib2 GObjects to represent cameras and streams, the
# PyGObject module makes it possible to call the C functions of aravis
# from Python.
# https://pygobject.readthedocs.io/en/latest/

from __future__ import annotations

import abc
import asyncio
import math
from collections import namedtuple
from logging import DEBUG

from typing import Any

import gi
import numpy

from basecam import BaseCamera, CameraEvent, CameraSystem, ExposureError
from basecam.mixins import CoolerMixIn, ExposureTypeMixIn, ImageAreaMixIn
from sdsstools.logger import StreamFormatter

from lvmcam import __version__ as lvmcam_version


gi.require_version("Aravis", "0.8")
from gi.repository import Aravis  # type: ignore  # noqa: E402


__all__ = ["BlackflyCameraSystem", "BlackflyCamera"]


class GainMixIn(metaclass=abc.ABCMeta):
    """Mixin adding manual gain control to a camera class.

    Concrete cameras implement the two ``_*_gain_internal`` coroutines;
    users call the public `set_gain` / `get_gain` wrappers, which simply
    delegate to them.
    """

    @abc.abstractmethod
    async def _set_gain_internal(self, gain):
        """Apply ``gain`` to the hardware; must be overridden."""

        raise NotImplementedError

    @abc.abstractmethod
    async def _get_gain_internal(self):
        """Report the camera gain; must be overridden."""

        raise NotImplementedError

    async def set_gain(self, gain):
        """Sets the gain of the camera.

        Returns whatever the subclass' ``_set_gain_internal`` returns.
        """

        outcome = await self._set_gain_internal(gain)
        return outcome

    async def get_gain(self):
        """Gets the gain of the camera.

        Returns whatever the subclass' ``_get_gain_internal`` returns.
        """

        current = await self._get_gain_internal()
        return current


class BlackflyCameraSystem(CameraSystem):
    """A collection of GenICam cameras, possibly online.

    All constructor arguments are forwarded unchanged to `.CameraSystem`.

    Parameters
    ----------
    camera_class : `.BaseCamera` subclass
        The subclass of `.BaseCamera` to use with this camera system.
    camera_config : dict or path
        A dictionary with the configuration parameters for the multiple
        cameras that can be present in the system, or the path to a YAML
        file. Refer to the documentation for details on the accepted format.
    include : list
        List of camera UIDs that can be connected.
    exclude : list
        List of camera UIDs that will be ignored.
    logger : ~logging.Logger
        The logger instance to use. If `None`, a new logger will be created.
    log_header : str
        A string to be prefixed to each message logged.
    log_file : str
        The path to which to log.
    verbose : bool
        Whether to log to stdout.
    """

    ## A list of ip addresses in the usual "xxx.yyy.zzz.ttt" or "name.subnet.net"
    ## format that have been added manually/explicitly and may not be found by the
    ## usual broadcast auto-detection (i.e., possibly on some other global network).
    # ips_nonlocal = []

    def __init__(
        self,
        camera_class=None,
        camera_config=None,
        include=None,
        exclude=None,
        logger=None,
        log_header=None,
        log_file=None,
        verbose=False,
    ):
        # Collect the options once and hand them to the parent as keywords.
        system_options = dict(
            camera_class=camera_class,
            camera_config=camera_config,
            include=include,
            exclude=exclude,
            logger=logger,
            log_header=log_header,
            log_file=log_file,
            verbose=verbose,
        )
        super().__init__(**system_options)

    def list_available_cameras(self):
        """Return ``self.cameras`` as maintained by the parent class."""

        return self.cameras

    @property
    def __version__(self):
        """Returns the lvmcam package version."""

        return lvmcam_version
# Lightweight geometry records shared by the camera classes.
# Point: pixel position (x0, y0).
Point = namedtuple("Point", "x0 y0")
# Size: extent as (wd, ht), i.e. width and height.
Size = namedtuple("Size", "wd ht")
# Rect: origin plus extent as (x0, y0, wd, ht).
Rect = namedtuple("Rect", "x0 y0 wd ht")
class BlackflyCamera(
    BaseCamera,
    ExposureTypeMixIn,
    ImageAreaMixIn,
    CoolerMixIn,
    GainMixIn,
):
    """A FLIR (formerly Point Grey Research) Blackfly camera.

    Given the pixel scale on the benches of LVMi and the assumption
    of 9 um pixel sizes of the LVMi cameras, we assume that the
    cameras have roughly 1 arcsec per pixel, so they are used without binning.

    In addition we let the camera flip the standard image orientation of the
    data values assuming that values are stored into a FITS interface (where
    the first values in the sequential data are the bottom row).
    So this is not done in this python code but by the camera.
    """

    # See arvenums.h for the list of pixel formats. This is MONO_16.
    _PIXEL_FORMAT_MONO_16 = 0x01100007

    def __init__(self, *args, **kwargs):
        """Initialise bookkeeping attributes from ``camera_params``.

        No connection to the hardware is made here; ``self.cam`` stays
        `None` until `_connect_internal` runs.
        """

        super().__init__(*args, **kwargs)

        self.logger.sh.setLevel(DEBUG)
        self.logger.sh.formatter = StreamFormatter(
            fmt="%(asctime)s %(name)s %(levelname)s "
            "%(filename)s:%(lineno)d: \033[1m%(message)s\033[21m"
        )

        self.actor = self.camera_params.get("actor", None)

        self.gain = -1
        self.binning = [-1, -1]
        self.cam: Any = None
        self.cam_type = "unknown"
        self.temperature = -1
        self.camera_state: str = "idle"

        self.detector_size = Size(-1, -1)
        self.region_bounds = Size(-1, -1)
        self.image_area = Rect(-1, -1, -1, -1)

        self.pixsize = self.camera_params.get("pixsize", 0.0)
        self.flen = self.camera_params.get("flen", 0.0)
        self.telescope = self.camera_params.get("telescope", "NA")

        self._update_pixel_scale()
        self.log(f"arcsec_per_pix {self.arcsec_per_pix}")

    def _update_pixel_scale(self):
        """Derive ``pixscale``, ``arcsec_per_pix`` and ``degperpix``.

        A missing focal length (``flen`` defaults to 0.0) would make the
        pixel scale zero; in that case the per-pixel values are set to NaN
        instead of raising `ZeroDivisionError`.
        """

        # pixel scale per arcseconds is focal length *pi/180 /3600
        # = flen * mm *pi/180 /3600
        # = flen * um *pi/180 /3.6, so in microns per arcsec...
        self.pixscale = math.radians(self.flen) / 3.6

        if self.pixscale:
            self.arcsec_per_pix = self.pixsize / self.pixscale
        else:
            self.arcsec_per_pix = math.nan

        # degrees per pixel is arcseconds per pixel/3600
        # = (mu/pix)/(mu/arcsec)/3600
        self.degperpix = self.arcsec_per_pix / 3600.0

    async def _connect_internal(self, **kwargs):
        """Connect to a camera and upload basic binning and ROI parameters.

        Reads ``ip``, ``gain``, ``binning`` and ``genicam_params`` from
        ``self.camera_params``.
        """

        self.logger.debug(f"connect {self.name} {self.uid}")

        ip = self.camera_params.get("ip")
        self.logger.debug(f"{ip}")

        self.cam = Aravis.Camera.new(ip)
        self.cam_type = self.cam.get_model_name()
        self.logger.debug(f"{self.cam_type}")
        self.logger.debug(f"Pixsize: {self.pixsize}")

        self._update_pixel_scale()

        self.logger.debug(f"{self.cam.get_binning()}")

        # Force unbinned mode first so that the bounds read below describe
        # the full detector; retry once after a pause if that fails.
        try:
            await self.set_binning(1, 1)
        except Exception as ex:
            self.logger.warning(f"{ex}")
            await asyncio.sleep(5.0)
            self.logger.warning(f"{ex}")
            await self.set_binning(1, 1)

        self.detector_size = Size(
            self.cam.get_width_bounds().max,
            self.cam.get_height_bounds().max,
        )
        self.logger.debug(f"{self.detector_size}")

        # search for an optional gain key in the arguments
        # todo: one could interpret gain=0 here as to call
        # set_gain_auto(ARV_AUTO_ON)
        await self.set_gain(self.camera_params.get("gain", 0))

        # search for an optional x and y binning factor,
        # fullframe image area will be set automatically with the binning.
        await self.set_binning(*self.camera_params.get("binning", [1, 1]))
        self.logger.debug(f"{self.image_area}")

        self.cam.set_pixel_format(self._PIXEL_FORMAT_MONO_16)

        # ``or {}`` tolerates a ``genicam_params:`` key with no content.
        self._apply_genicam_params(self.camera_params.get("genicam_params") or {})

    def _set_genicam_boolean(self, key, value):
        """Write a boolean GenICam feature only if it differs from ``value``."""

        if self.cam.get_boolean(key) != value:
            self.cam.set_boolean(key, int(value))

    def _apply_genicam_params(self, genicam_params):
        """Upload the generic GenICam feature values of the four native types.

        Parameters
        ----------
        genicam_params
            Mapping of a type name (``bool``, ``int``, ``float`` or
            ``string``) to a mapping of feature name to value. A `None`
            inner mapping is skipped. A failure on one feature is logged
            and does not stop the remaining ones.
        """

        setters = {
            "bool": self._set_genicam_boolean,
            "int": self.cam.set_integer,
            "float": self.cam.set_float,
            "string": self.cam.set_string,
        }

        for type_name, features in genicam_params.items():
            if features is None:
                continue

            setter = setters.get(type_name)
            if setter is None:
                self.logger.warning(f"unknown genicam param type: {type_name}")
                continue

            for genkey, genval in features.items():
                try:
                    setter(genkey, genval)
                    self.logger.debug(f"genicam param : {genkey}={genval}")
                except Exception as ex:
                    self.logger.error(f"failed setting: {genkey}={genval} {ex}")

    async def _disconnect_internal(self):
        """Close connection to camera."""

        if self.cam is not None:
            self.cam.stop_acquisition()
            # Dropping the only reference releases the Aravis camera object.
            self.cam = None

    async def _expose_grabFrame(self, exposure, nretries=3):
        """Read a single unbinned full frame.

        The class splits the parent class' exposure into this function and
        the part which generates the FITS file, because applications in
        guiders are usually only interested in the frame's data, and would
        not take the detour of generating a FITS file and reading it back
        from disk.

        Parameters
        ----------
        exposure
            ``exposure.exptime`` is the intended exposure time in [sec].
            This method does not modify ``exposure``.
        nretries
            Number of retries left. Sometimes the camera receives an empty
            buffer immediately after the acquisition begins; in this case we
            retry taking the image.

        Returns
        -------
        data, roi
            The raw buffer contents as returned by ``buf.get_data()`` and the
            image region reported by ``buf.get_image_region()``.

        Raises
        ------
        ExposureError
            If the acquisition returns no buffer, or if the returned region
            still does not match ``self.image_area`` after all retries.
        """

        # To avoid being left over by other programs with no chance
        # to set the exposure time, we switch the auto=0=off first
        self.cam.set_exposure_time_auto(0)

        # Aravis assumes exptime in micro second integers
        exptime_us = int(0.5 + exposure.exptime * 1e6)
        self.cam.set_exposure_time(exptime_us)

        # timeout (factor 2: assuming there may be two frames in auto mode
        # taken internally)
        # And 5 seconds margin for any sort of transmission overhead over PoE
        timeout_us = int(1.0e6 * (2.0 * exposure.exptime + 5))

        self.notify(CameraEvent.EXPOSURE_INTEGRATING)

        # Wait a tiny bit for setting to take effect. This probably does not
        # help with the cases when the camera returns an empty buffer but ...
        await asyncio.sleep(0.1)

        # The buffer is allocated/created within acquisition(); the blocking
        # call runs in the default executor so the event loop stays free.
        buf = await self.loop.run_in_executor(
            None,
            self.cam.acquisition,
            timeout_us,
        )
        if buf is None:
            raise ExposureError(
                f"Exposing for {exposure.exptime} s failed. "
                f"Timed out {timeout_us / 1.0e6}."
            )

        roi = buf.get_image_region()
        size_matches = (
            roi.width == self.image_area.wd and roi.height == self.image_area.ht
        )
        if not size_matches:
            if nretries <= 0:
                raise ExposureError(f"Camera {self.name} returned an empty buffer.")

            self.logger.warning(
                f"Camera {self.name} returned an empty buffer. Retrying."
            )
            await asyncio.sleep(1)
            return await self._expose_grabFrame(exposure, nretries=nretries - 1)

        return buf.get_data(), roi

    async def expose(self, *args, **kwargs):
        """Run the parent ``expose`` and mark the camera idle afterwards."""

        exposure = await super().expose(*args, **kwargs)
        self.camera_state = "idle"
        return exposure

    async def _grab_into_exposure(self, exposure):
        """Grab one frame, store it in ``exposure.data``, refresh temperature."""

        data, roi = await self._expose_grabFrame(exposure)
        self.logger.debug(f"{roi} {self.image_area}")

        # Interpret the raw buffer as 16-bit pixels, rows first.
        exposure.data = numpy.ndarray(
            buffer=data,
            dtype=numpy.uint16,
            shape=(self.image_area.ht, self.image_area.wd),
        )
        self.temperature = await self.get_temperature()

    async def _expose_internal(self, exposure, **kwargs):
        """Read a single full frame and store it in ``exposure.data``.

        Parameters
        ----------
        exposure
            On entry ``exposure.exptime`` is the intended exposure time in
            [sec]. On exit, ``exposure.data`` contains the 16bit data of a
            single frame.

        Raises
        ------
        ExposureError
            If the frame cannot be read even after reconnecting the camera.
        """

        try:
            self.camera_state = "exposing"
            try:
                await self._grab_into_exposure(exposure)
            except Exception as err:
                # Sometimes the camera gets into read-only mode. In those
                # cases reconnecting seems to work. If that fails again,
                # raise the error but wrap it as an ExposureError so that
                # the other camera does not stop exposing.
                self.logger.warning(f"Camera replied with error: {err}")
                self.logger.warning("Will reconnect the camera and try again.")
                try:
                    await self.reconnect()
                    # The retry must also fill exposure.data, otherwise a
                    # successful second attempt leaves the exposure empty.
                    await self._grab_into_exposure(exposure)
                except Exception as retry_err:
                    raise ExposureError(
                        f"Camera failed to expose with error: {retry_err}"
                    ) from retry_err
        finally:
            self.camera_state = "idle"

    async def reconnect(self):
        """Reconnects the camera.

        Each step is limited to 3 seconds.
        NOTE(review): `_connect_internal` may sleep 5 s when the first
        binning call fails, which exceeds this timeout. Confirm intended.
        """

        await asyncio.wait_for(self.disconnect(), timeout=3)
        await asyncio.wait_for(self.connect(force=True), timeout=3)

    def _status_internal(self):
        """Return the camera state, device temperature and a NaN cooler value.

        The temperature is NaN while no camera is connected.
        """

        if self.cam is not None:
            temperature = self.cam.get_float("DeviceTemperature")
        else:
            temperature = math.nan

        return {
            "camera_state": self.camera_state,
            "temperature": temperature,
            "cooler": math.nan,
        }

    async def _get_binning_internal(self):
        """Return the binning reported by the camera as a list."""

        return list(self.cam.get_binning())

    async def _set_binning_internal(self, hbin, vbin):
        """Set the binning and reset the image area to full frame.

        A failure to set the binning is logged, not raised; the image area
        is refreshed in either case.
        """

        try:
            self.logger.debug(f"set binning: {hbin} {vbin}")
            self.cam.set_binning(hbin, vbin)
            self.binning = [hbin, vbin]
        except Exception as ex:
            self.logger.error(f"failed to set binning: {[hbin, vbin]} {ex}")

        await self._set_image_area_internal()

    async def _get_image_area_internal(self):
        """Refresh ``self.region_bounds`` from the camera and return it."""

        self.region_bounds = Size(
            self.cam.get_width_bounds().max,
            self.cam.get_height_bounds().max,
        )
        return self.region_bounds

    async def _set_image_area_internal(self, area=None):
        """Set the region to full frame; an explicit ``area`` is refused."""

        if area:
            self.logger.warning("image area only with fullframe")
            return

        await self._get_image_area_internal()
        self.cam.set_region(0, 0, *self.region_bounds)
        self.image_area = Rect(0, 0, *self.region_bounds)  # x0, y0, width, height

    async def _get_temperature_internal(self):
        """Read ``DeviceTemperature`` from the camera and cache it."""

        self.temperature = self.cam.get_float("DeviceTemperature")
        return self.temperature

    async def _set_temperature_internal(self, temperature):
        """Temperature cannot be set on this camera; only logs a warning."""

        self.logger.warning("temperature setting not possible")

    async def _set_gain_internal(self, gain):
        """Internal method to set the gain.

        A gain of 0 switches the camera to automatic gain; any other value
        disables automatic gain and is clamped to the camera's gain bounds.
        Errors are logged, not raised.
        """

        try:
            self.logger.debug(f"set gain: {gain}")
            if gain == 0.0:
                # NOTE(review): 1 is presumably Aravis.Auto.ONCE -- confirm.
                self.cam.set_gain_auto(1)
            else:
                self.cam.set_gain_auto(0)
                lower, upper = self.cam.get_gain_bounds()
                self.cam.set_gain(max(min(upper, gain), lower))
            self.gain = gain
        except Exception as ex:
            self.logger.error(f"failed to set gain: {gain} {ex}")

    async def _get_gain_internal(self):
        """Internal method to get the gain.

        NOTE(review): this returns the *configured* ``gain`` parameter, not
        the value last set or the one currently active in the camera.
        """

        return self.camera_params.get("gain", None)