Source code for atomiq.components.optoelectronics.camera

from __future__ import annotations

from atomiq.components.primitives import Triggerable, Switchable, Component

from artiq.experiment import kernel, rpc
from artiq.language.types import TStr, TFloat
from artiq.language.units import us


class Camera(Component):
    """Abstract base class for cameras

    A camera is operated through named configuration sets (exposure time, readout mode, binning, subarray,
    etc). Because cameras usually need quite a few options, the sets are collected in a dict that maps a
    configuration name to the options belonging to it, for example:

    .. code-block:: python

        config_dict = {
            "camera_usecase1": {
                "option_xy": 2.3,
                "option_zy": "external",
                ...
            },
            "camera_usecase2": {
                "option_xy": 2e-3,
                "option_zy": "internal",
            },
            ...
        }

    Inheriting classes have to provide :func:`_set_config`, :func:`arm`, :func:`start` and :func:`stop`;
    the versions defined here only raise :class:`NotImplementedError`.

    Args:
        config_dict: Dictionary holding the named configuration sets for the camera
        default_config: Key of ``config_dict`` that :func:`configure` falls back to when it is called
            without a configuration name
    """
    kernel_invariants = {"config_dict", "default_config"}

    def __init__(self, config_dict: dict, default_config: TStr, *args, **kwargs):
        # Everything that is not camera specific is handed to the Component base class untouched.
        Component.__init__(self, *args, **kwargs)
        self.default_config = default_config
        self.config_dict = config_dict

    def _set_config(self, config: TStr):
        """
        Apply the configuration set stored under the key ``config``

        Hook called by :func:`configure`; has to be implemented by the inheriting class.
        """
        raise NotImplementedError("Implement _set_config() method in inherited class")

    @rpc(flags={"async"})
    def configure(self, config: TStr = ""):
        """
        Configure the camera, i.e. set exposure time, readout mode, binning, etc.

        The configuration set is looked up by name and handed to :func:`_set_config`.

        Args:
            config: Key of the configuration set to load. An empty string (the default) selects the
                ``default_config`` given upon object creation.
        """
        selected = self.default_config if config == "" else config
        self._set_config(selected)

    @rpc(flags={"async"})
    def arm(self):
        """
        Arm the camera so that a following :func:`start` can begin the exposure immediately

        Arming typically cannot meet realtime requirements. Once armed, however, the camera can start
        the exposure on a trigger in realtime.
        """
        raise NotImplementedError("Implement arm() method in inherited class")

    @rpc(flags={"async"})
    def start(self):
        """
        Start exposure of the camera
        """
        raise NotImplementedError("Implement start() method in inherited class")

    @rpc(flags={"async"})
    def stop(self):
        """
        Stop exposure of the camera
        """
        raise NotImplementedError("Implement stop() method in inherited class")
class TriggeredRPCCamera(Camera, Triggerable):
    """Camera that is configured via RPC but triggered via a TTL

    Args:
        rpc_camera: The ARTIQ RPC object that represents the camera. This class calls
            ``configure(identifier=..., configuration=..., metadata=...)`` on it to configure the camera,
            ``start(identifier)`` to arm it (i.e. wait for trigger), and ``abort()`` to stop it.
        fire_ttl: The logic signal that triggers the exposure.
        fire_pulsetime: The time the logic signal remains high to trigger the camera (default 100us)
    """
    kernel_invariants = {"camera", "fire_ttl", "fire_pulsetime"}

    def __init__(self, rpc_camera: Component, fire_ttl: Switchable, fire_pulsetime: TFloat = 100*us,
                 *args, **kwargs):
        # Both bases are initialised explicitly; the single trigger channel is named "trigger".
        Camera.__init__(self, *args, **kwargs)
        Triggerable.__init__(self, ["trigger"])

        self.fire_pulsetime = fire_pulsetime
        self.fire_ttl = fire_ttl
        self.camera = rpc_camera

    @rpc(flags={"async"})
    def _set_config(self, config: TStr):
        """
        Send the configuration set stored under ``config`` to the RPC camera

        Together with the configuration, the integer experiment identifier and a metadata dict built
        from the experiment (``seqTimestamp``, ``run_id``, ``run_timestamp``, ``step_counter``) are
        passed along.

        Args:
            config: Key of the configuration set in ``config_dict``
        """
        if config not in self.config_dict:
            # NOTE(review): unknown configuration names are skipped without any message -- confirm
            # that this is intended.
            return

        experiment = self.experiment
        metadata = {
            "seqTimestamp": float(experiment.identifier)*1e-6,
            "run_id": experiment.run_id,
            "run_timestamp": experiment.run_timestamp,
            "step_counter": experiment.step_counter,
        }
        self.camera.configure(
            identifier=int(experiment.identifier),
            configuration=self.config_dict[config],
            metadata=metadata,
        )

    @rpc(flags={"async"})
    def arm(self):
        """
        Arm the camera by calling ``start()`` on the RPC camera with the integer experiment identifier
        """
        run_identifier = int(self.experiment.identifier)
        self.camera.start(run_identifier)

    @rpc(flags={"async"})
    def stop(self):
        """
        Stop the camera by calling ``abort()`` on the RPC camera
        """
        self.camera.abort()

    @kernel
    def fire(self, channel: TStr = ""):
        """
        Pulse the trigger signal ``fire_ttl`` for ``fire_pulsetime``

        Args:
            channel: Not used by this implementation
        """
        self.fire_ttl.pulse(self.fire_pulsetime)

    @kernel
    def start(self):
        """
        Start the exposure by firing the trigger, see :func:`fire`
        """
        self.fire()