Source code for lvmcam.actor.actor

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2023-08-09
# @Filename: actor.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

import pathlib
from copy import deepcopy

from typing import Any

from astropy.coordinates import EarthLocation
from astropy.utils.iers import conf

from basecam import ImageNamer
from basecam.actor import BaseCameraActor
from basecam.exposure import Exposure
from basecam.models import Extension, FITSModel, HeaderModel, basic_header_model
from clu import AMQPActor, Command
from clu.client import AMQPReply
from sdsstools import read_yaml_file

from lvmcam import __version__
from lvmcam.camera import BlackflyCamera, BlackflyCameraSystem
from lvmcam.models import CameraCards, GenicamCards, ScraperParamCards, WCSCards
from lvmcam.models.proc import ProcCards


conf.auto_download = False
conf.iers_degraded_accuracy = "ignore"


CWD = pathlib.Path(__file__).parents[1]
BASE_CAMERA_PARAMS = read_yaml_file(CWD / "etc/lvm.camera_param.agcam.yml")


[docs] def mergedicts(a, b): for key in b: if isinstance(a.get(key), dict) or isinstance(b.get(key), dict): mergedicts(a.get(key, {}), b.get(key, {})) else: a[key] = b[key] return a
[docs] def get_camera_class(config: dict): """Returns a camera class with the correct image namer and fits model.""" dirname = config.get("dirname", ".") basename = config.get("basename", "{camera.name}-{num:04d}.fits") header_model = basic_header_model header_model.append(CameraCards) header_model.append(GenicamCards()) header_model.append(ScraperParamCards()) header_model.append(WCSCards()) raw = Extension( data="raw", header_model=header_model, compressed="RICE_1", name="RAW", ) proc = Extension(data="none", header_model=HeaderModel([ProcCards]), name="PROC") class LVMCamera(BlackflyCamera): fits_model = FITSModel([raw, proc]) image_namer = ImageNamer(basename=basename, dirname=dirname) # LCO location = EarthLocation.from_geodetic( lon=-70.70166667, lat=-29.00333333, height=2282.0, ) async def _expose_internal(self, exposure, **kwargs): exposure.obstime.location = self.location return await super()._expose_internal(exposure, **kwargs) async def expose(self, *args, **kwargs) -> Exposure: return await super().expose(*args, **kwargs) return LVMCamera
[docs] class LVMCamActor(BaseCameraActor, AMQPActor): """The LVMCam actor.""" def __init__(self, *args, **kwargs): from lvmcam.actor.commands import camera_parser # Update camera config with default parameters. config = kwargs.get("config", {}) for cam in config["cameras"]: camera_config = config["cameras"][cam] config["cameras"][cam] = mergedicts(camera_config, BASE_CAMERA_PARAMS) camera_class = get_camera_class(config) self.camera_system = BlackflyCameraSystem(camera_class, config["cameras"]) super().__init__( self.camera_system, *args, version=__version__, command_parser=camera_parser, **kwargs, ) if self.model: self.model.schema["additionalProperties"] = True self.model.validator = self.model.VALIDATOR(self.model.schema) # Scraped data from actors. self.scraper_data: dict[str, Any] = {} self.add_reply_callback(self.parse_actor_reply)
[docs] async def start(self, **kwargs): """Starts the actor and adds cameras to the camera system.""" assert isinstance(self.camera_system._config, dict) for camera in self.camera_system._config: try: cam = await self.camera_system.add_camera(name=camera, actor=self) self.log.debug(f"Camera {camera} connected") cam.fits_model.context.update({"__actor__": self}) except Exception as ex: self.log.error(f"Camera {camera} failed connecting: {ex}") await super().start(**kwargs) for actor in self.config["scraper"]: await self.send_command( actor, "status", internal=True, await_command=False, ) return self
[docs] async def parse_actor_reply(self, reply: AMQPReply): """Parses replies from the actor system and updates scraped data.""" scraper_defs: dict[str, dict[str, str]] = self.config["scraper"] if reply.sender not in scraper_defs or len(reply.body) == 0: return # Flatten body of the reply by dot-joining deeper dictionaries. # Do this until there are no more dictionaries. body = deepcopy(reply.body) while True: if not any([isinstance(value, dict) for value in body.values()]): break new_keys = {} remove_keys: list[str] = [] for key, value in body.items(): if isinstance(value, dict): remove_keys.append(key) for skey, svalue in body[key].items(): new_keys[key + "." + skey] = svalue for key in remove_keys: body.pop(key) body.update(new_keys) for key in scraper_defs[reply.sender]: if key in body: scraper_key = scraper_defs[reply.sender][key] self.scraper_data[scraper_key] = body[key]
LVMCamCommand = Command[LVMCamActor]