from __future__ import annotations
from atomiq.components.primitives import Triggerable, Switchable, Component
from artiq.experiment import kernel, rpc
from artiq.language.types import TStr, TFloat
from artiq.language.units import us
# [docs]  (Sphinx "view source" link residue; not part of the module)
class Camera(Component):
    """Abstract representation of a camera.

    Cameras usually need many settings at once (exposure time, readout mode, binning, subarray, etc.). Instead of
    handling these one by one, this class keeps named configuration sets in a dict and refers to them by their
    key. Such a dict can look like this:

    .. code-block:: python

        config_dict = {
            "camera_usecase1" : {
                "option_xy": 2.3,
                "option_zy": "external",
                ...
            },
            "camera_usecase2": {
                "option_xy": 2e-3,
                "option_zy": "internal",
            },
            ...
        }

    Subclasses have to implement :meth:`_set_config`, :meth:`arm`, :meth:`start` and :meth:`stop`; the versions
    defined here only raise ``NotImplementedError``.

    Args:
        config_dict: Dictionary holding the configuration sets for the camera
        default_config: key from the config dict to use if none is specified in the :func:`configure` function
    """

    kernel_invariants = {"config_dict", "default_config"}

    def __init__(self, config_dict: dict, default_config: TStr, *args, **kwargs):
        # Remaining positional/keyword arguments are handed to Component unchanged.
        Component.__init__(self, *args, **kwargs)
        self.config_dict = config_dict
        self.default_config = default_config

    def _set_config(self, config: TStr):
        """Apply the configuration set named ``config``.

        Raises:
            NotImplementedError: always; subclasses must override this method.
        """
        message = "Implement _set_config() method in inherited class"
        raise NotImplementedError(message)

    @rpc(flags={"async"})
    def arm(self):
        """
        Prepare the camera so that a following call of start() can begin the exposure
        immediately. Arming itself typically cannot meet realtime requirements; once
        armed, however, the camera can start exposure on a trigger in realtime.

        Raises:
            NotImplementedError: always; subclasses must override this method.
        """
        message = "Implement arm() method in inherited class"
        raise NotImplementedError(message)

    @rpc(flags={"async"})
    def start(self):
        """
        Start exposure of the camera.

        Raises:
            NotImplementedError: always; subclasses must override this method.
        """
        message = "Implement start() method in inherited class"
        raise NotImplementedError(message)

    @rpc(flags={"async"})
    def stop(self):
        """
        Stop exposure of the camera.

        Raises:
            NotImplementedError: always; subclasses must override this method.
        """
        message = "Implement stop() method in inherited class"
        raise NotImplementedError(message)
# [docs]  (Sphinx "view source" link residue; not part of the module)
class TriggeredRPCCamera(Camera, Triggerable):
    """Camera that is configured via RPC but triggered via a TTL

    Configuration, arming and aborting are delegated to an RPC object, while the exposure itself is started by
    pulsing a logic signal.

    Args:
        rpc_camera: The ARTIQ RPC object that represents the camera. This class calls
            ``configure(identifier=..., configuration=..., metadata=...)`` on it to configure the camera,
            ``start(identifier)`` to arm the camera (i.e. wait for trigger), and ``abort()`` to stop waiting
            for trigger or stop exposure.
        fire_ttl: The logic signal that triggers the exposure.
        fire_pulsetime: The time the logic signal remains high to trigger the camera (default 100us)
    """

    kernel_invariants = {"camera", "fire_ttl", "fire_pulsetime"}

    def __init__(
        self,
        rpc_camera: Component,
        fire_ttl: Switchable,
        fire_pulsetime: TFloat = 100*us,
        *args,
        **kwargs
    ):
        # Remaining arguments (config_dict, default_config, ...) are consumed by Camera.
        Camera.__init__(self, *args, **kwargs)
        # Register the single trigger channel name with the Triggerable base.
        Triggerable.__init__(self, ["trigger"])
        self.camera = rpc_camera
        self.fire_ttl = fire_ttl
        self.fire_pulsetime = fire_pulsetime

    @rpc(flags={"async"})
    def _set_config(self, config: TStr):
        """Send the configuration set named ``config`` to the RPC camera.

        The call also carries the experiment identifier and a metadata dict built from ``self.experiment``
        (identifier scaled by 1e-6, run id, run timestamp and step counter). If ``config`` is not a key of
        ``config_dict``, nothing is sent and no error is raised.
        """
        if config not in self.config_dict:
            return

        metadata = {
            "seqTimestamp": float(self.experiment.identifier)*1e-6,
            "run_id": self.experiment.run_id,
            "run_timestamp": self.experiment.run_timestamp,
            "step_counter": self.experiment.step_counter,
        }
        self.camera.configure(
            identifier=int(self.experiment.identifier),
            configuration=self.config_dict[config],
            metadata=metadata,
        )

    @rpc(flags={"async"})
    def arm(self):
        """Arm the camera by calling ``start`` on the RPC camera with the integer experiment identifier."""
        self.camera.start(int(self.experiment.identifier))

    @rpc(flags={"async"})
    def stop(self):
        """Stop the camera by calling ``abort`` on the RPC camera."""
        self.camera.abort()

    @kernel
    def fire(self, channel: TStr = ""):
        """Pulse ``fire_ttl`` for ``fire_pulsetime``.

        Args:
            channel: Not used by this implementation.
        """
        self.fire_ttl.pulse(self.fire_pulsetime)

    @kernel
    def start(self):
        """Start the exposure by firing the trigger (see :meth:`fire`)."""
        self.fire()