Source code for atomiq.atomiq

import inspect
import traceback
import time
import numpy as np

from artiq.experiment import kernel, rpc, delay, now_mu
from artiq.language.environment import EnvExperiment, EnumerationValue, BooleanValue, NumberValue, StringValue
from artiq.language.types import TInt32, TList
from artiq.language.scan import Scannable, NoScan, ScanObject, MultiScanManager
from artiq.language.units import ms
from artiq.language.core import TerminationRequested
from artiq.coredevice.exceptions import RTIOUnderflow

from atomiq.components import ComponentFactory
from atomiq.components.primitives import Component
from atomiq.arguments import NativeArgumentProvider
from atomiq.hooks import hooks

from atomiq.helper import component_dict, component_data, block_dict, rec_getattr, random_ascii_string

import logging

default_argument_provider = NativeArgumentProvider()

logging.basicConfig()
logger = logging.getLogger(__name__)


class AtomiqExperiment(EnvExperiment):

    CHUNKSIZE = 10
    components = ["log"]
    arg_provider = default_argument_provider

    def __init__(self, managers_or_parent, name=None, arg_provider=None, component_map=None, *args, **kwargs):
        # combine components from parent class with our own and make entries unique
        self._components = []
        for cls in self._getmro():
            if hasattr(cls, "components"):
                self._components += cls.components
        self._components = list(set(self._components))

        self._blocks = []
        for cls in self._getmro():
            if hasattr(cls, "blocks"):
                self._blocks += cls.blocks

        self.name = name if name is not None else type(self).__name__
        self.component_map = component_map
        if arg_provider is not None:
            self.arg_provider = arg_provider

        self.clock_at_start = np.int64(-1)
        self.identifier = np.int64(-1)
        self.step_counter = 0
        self.run_id = 0

        # if we are an atomiq block, leave the experiment attribute untouched
        if not hasattr(self, "experiment"):
            self.experiment = self

        self.__terminate_asap__ = False

        super().__init__(managers_or_parent)

    def _getmro(self):
        return inspect.getmro(type(self))

    def _import_from_block(self, blkdata: dict):
        """
        This uses monkey patching of the class to make members available to the
        current experiment.

        Args:
            blkdata: Block data with the keys "class", "map", "alias"
        """
        # get proper name of the step function after importing
        alias = blkdata["alias"]
        cls_name = blkdata["class"].__name__ if alias is None else alias

        # propagate our component mappings if necessary
        if self.component_map is not None and len(self.component_map) > 0:
            prop_map = {key: (self.component_map[val] if val in self.component_map else val)
                        for key, val in blkdata["map"].items()}
        else:
            prop_map = blkdata["map"]

        # instantiate the block object and attach it to self
        obj = blkdata["class"](self,
                               arg_provider=self.arg_provider,
                               component_map=prop_map,
                               name=cls_name
                               )
        setattr(type(self), cls_name, obj)

        # check if we need any components from the attached block object
        if hasattr(obj, "_components"):
            cmp = [component_data(dict(comp, **{"id": prop_map[comp["id"]]})) if comp["id"] in prop_map
                   else component_data(comp)
                   for comp in map(component_dict, obj._components)]
            logger.info(f"Transferring components {cmp} to parent")
            self._components += cmp
            self._components = list(set(self._components))

    def _add_to_hook_handler(self, obj_list):
        """
        Add the hooks of the objects in obj_list to the experiment hooks such that
        they are called when the experiment hooks are processed. A child object's
        hook is only called if the object has a method with the corresponding
        hook name.

        Args:
            obj_list: list of child objects relative to self
                (e.g. 'modulator.rfsource.switch') for which the hook should be called
        """
        for hook, no_args in hooks:
            args = [f"arg{i}" for i in range(no_args)]
            code = f"def _{hook}({', '.join(['self']+args)}):" + "\n"
            code += f"    self.log.debug('Doing {hook} for {{0}}', [self])\n"
            for obj_path in obj_list:
                obj = rec_getattr(self, obj_path)
                if hasattr(obj, f"_{hook}"):
                    wrapped_hook = f"do_{hook}" if hasattr(obj, f"_do_{hook}") else hook
                    if isinstance(obj, Component):
                        code += f"    if not self.{obj_path}._{hook}_done:" + "\n"
                        code += f"        self.{obj_path}._{wrapped_hook}({', '.join(args)})" + "\n"
                        code += f"        self.{obj_path}._{hook}_done = True" + "\n"
                    else:
                        code += f"    self.{obj_path}._{wrapped_hook}({', '.join(args)})" + "\n"

            if hasattr(self, f"_{hook}"):
                # we need to set an ephemeral name for the existing hook, otherwise we
                # would recurse into the method we are about to build
                rnd = random_ascii_string(8)
                setattr(self, f"_{hook}_{rnd}", getattr(self, f"_{hook}"))
                code += f"    self._{hook}_{rnd}({', '.join(args)})" + "\n"
            else:
                code += f"    self.{hook}({', '.join(args)})" + "\n"

            loc = locals()
            exec(code, globals(), loc)
            f = loc[f'_{hook}']
            if not hook.endswith("_host"):
                f = kernel(f)
                # Save source code for the compiler to pick up later.
                f.artiq_embedded = f.artiq_embedded._replace(function=code)
            setattr(self.__class__, f"_{hook}", f)
            setattr(self, f"_{hook}", f.__get__(self))

    def _build_blocks(self):
        for blkdata in map(block_dict, self._blocks):
            self._import_from_block(blkdata)

        blks = [blkdata["alias"] if "alias" in blkdata and blkdata["alias"] is not None
                else blkdata["class"].__name__
                for blkdata in map(block_dict, self._blocks)]
        self._add_to_hook_handler(blks)

    def _build_components(self):
        recursive_components = []
        for comp in map(component_dict, self._components):
            if self.component_map is not None and comp["id"] in self.component_map:
                target_id = self.component_map[comp["id"]]
            else:
                target_id = comp["id"]
            comp_obj = ComponentFactory.produce(target_id, self)
            if isinstance(comp_obj, comp["type"]):
                setattr(self, comp["id"], comp_obj)
            else:
                raise TypeError(f"Component {comp['id']} does not have specified type {comp['type']}, "
                                f"instead it has {type(comp_obj)}")

            recursive_components += comp_obj.required_components(ancestors=[comp["id"]])

        # In the following we add all hooks of the components to the experiment hooks

        # remove duplicates
        dedup_components = []
        for obj, path in recursive_components:
            if obj not in [obj for obj, _ in dedup_components]:
                dedup_components.append((obj, path))

        prerun_components = [(obj, path) for obj, path in dedup_components
                             if obj.__class__._prerun != Component._prerun]
        self._add_to_hook_handler([".".join(path) for obj, path in prerun_components])

    def _build_arguments(self):
        for argname, argdict in self.arg_provider.get_arguments(self.__class__).items():
            group = argdict['group'] if "group" in argdict else self.name
            if "options" in argdict:
                self.setattr_argument(argname, EnumerationValue(argdict["options"]), group=group)
            else:
                if "default" in argdict:
                    if isinstance(argdict["default"], bool):
                        self.setattr_argument(argname, BooleanValue(argdict["default"]), group=group)
                    elif isinstance(argdict["default"], str):
                        self.setattr_argument(argname, StringValue(argdict["default"]), group=group)
                    elif "scannable" in argdict and argdict["scannable"] is False:
                        self.setattr_argument(argname,
                                              NumberValue(argdict["default"],
                                                          **{k: argdict[k] for k in ('unit', 'scale', 'ndecimals')
                                                             if k in argdict}),
                                              group=group
                                              )
                    else:
                        self.setattr_argument(argname,
                                              Scannable(default=NoScan(argdict["default"], 1),
                                                        **{k: argdict[k] for k in ('unit', 'scale', 'ndecimals')
                                                           if k in argdict}),
                                              group=group
                                              )
                else:
                    logger.error(f"no default value set for argument {argname}")
    def prepare(self):
        """
        Prepares components and structure.

        Called by ARTIQ in the prepare phase, see the ARTIQ documentation for more
        information on experiment phases.

        Note:
            If you overwrite this method in your experiment, make sure to call
            `super().prepare()`
        """
        for component in self._components:
            getattr(self, component_dict(component)["id"])._recursive_prepare()

        # maybe this works for autodetection
        self.scannables = [
            (varname, getattr(self, varname))
            for varname, _ in inspect.getmembers(self, lambda x: isinstance(x, ScanObject))
        ]

        # check if we need any attributes from the attached block object
        for blkdata in map(block_dict, self._blocks):
            alias = blkdata["alias"]
            cls_name = blkdata["class"].__name__ if alias is None else alias
            obj = getattr(type(self), cls_name)
            obj.prepare()
            if hasattr(obj, "scannables"):
                self.scannables += obj.scannables

        self.msm = MultiScanManager(*self.scannables)
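    # A minimal sketch (hypothetical subclass) of overloading prepare() as the
    # docstring above asks for; the super() call keeps the component preparation
    # and the MultiScanManager setup intact:
    #
    #   class MyExperiment(AtomiqExperiment):
    #       def prepare(self):
    #           super().prepare()          # prepares components, builds self.msm
    #           self.ramp = [i * 0.1 for i in range(10)]  # own host-side setup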
    def _build_core(self):
        self.setattr_device("scheduler")
        self.setattr_device("core")
        self.setattr_device("core_dma")

    def _build(self):
        pass
    def build(self):
        """
        Initializes arguments.

        Called by ARTIQ in the build phase, see the ARTIQ documentation for more
        information on experiment phases.

        Note:
            If you overwrite this method in your experiment, make sure to call
            `super().build()`
        """
        self._build_core()
        # It's important to do this before the components are built since
        # the blocks modify the component list
        self._build_blocks()
        self._build_components()

        # maybe the components themselves need something done in the build phase?
        for component in self._components:
            getattr(self, component_dict(component)["id"])._recursive_build()

        self._build_arguments()
        self._build()
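    # A minimal sketch (hypothetical names) of what a subclass feeds into this
    # build sequence: "blocks" entries are imported first by _build_blocks(),
    # because imported blocks can extend the component list that
    # _build_components() then resolves via the ComponentFactory. The dict form
    # with the keys "class", "map" and "alias" follows _import_from_block() above:
    #
    #   class MyExperiment(AtomiqExperiment):
    #       components = ["laser1", "shutter1"]
    #       blocks = [{"class": LoadMOT, "map": {}, "alias": "mot"}]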
    @kernel
    def _prerun_core(self, reset=False):
        self.clock_at_start = self.core.get_rtio_counter_mu()
        self.log.info("setting up coredevice")
        if reset:
            delay(500 * ms)
            self.core.reset()
        self.core.break_realtime()
        self.prerun()

    @rpc(flags={"async"})
    def _set_identifiers_in_master(self, identifier, run_id, step_counter):
        """ makes an rpc call to set the identifier in the master """
        self.identifier = identifier
        self.run_id = run_id
        self.step_counter = step_counter
        self.set_dataset("step_counter", step_counter, broadcast=True)
    def chunker(self, mult_scan, size: TInt32 = 100) -> TList:
        """
        Generator to call a kernel with chunks of scan points.
        """
        noscan_args = {name: getattr(self, name)
                       for name, value in self._HasEnvironment__argument_mgr.unprocessed_arguments.items()
                       if hasattr(self, name) and not isinstance(value, dict)}

        # first count the number of points for time estimation
        n_point = 0
        for _p in mult_scan:
            n_point += 1
        # broadcast this information
        self.set_dataset("max_step_counter", n_point - 1, broadcast=True)

        chunk = []
        i = 1
        counter = 0
        for _p in mult_scan:
            step_counter = counter
            _p.__dict__.update(noscan_args)
            setattr(_p, "step_counter", step_counter)
            setattr(_p, "run_id", self.scheduler.rid)
            setattr(_p, "identifier", np.int64(-1))
            chunk.append(_p)
            counter += 1
            if len(chunk) >= size:
                logger.info(f"starting kernel with chunk {i}")
                yield chunk
                chunk = []
                i += 1

        logger.info(f"starting kernel with chunk {i}")
        if len(chunk) > 0:
            yield chunk
        else:
            return
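    # An illustrative example of the chunking arithmetic: for a scan with 25
    # points and size=10, this generator yields chunks of 10, 10 and 5 points;
    # "max_step_counter" is broadcast as 24, and every point leaves the generator
    # with step_counter (0..24), run_id and a placeholder identifier of -1 attached:
    #
    #   for chunk in self.chunker(self.msm, size=10):
    #       self._runchunk(chunk)   # len(chunk) == 10, 10, 5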
    def _loop_chunks(self, chunk_list: TList):
        """
        Loop over a list of chunks on the host, i.e. iterate over starting kernels
        holding CHUNKSIZE experiments

        :param chunk_list: (list) list of chunks. Each entry is again an iterable
                           with CHUNKSIZE Scanpoint objects inside
        """
        current_chunk = 0
        for points in chunk_list:
            try:
                self._prechunk_host(points)
                self._runchunk(points)
                self._postchunk_host(points)
            except RTIOUnderflow as ex:
                self.log.warning(f"RTIOUnderflow during chunk {current_chunk}")
                self.log.warning(ex)
                self.log.warning(traceback.format_exc())
                self._prerun_core(reset=True)
            finally:
                current_chunk += 1

            if self.scheduler.check_pause():
                self.core.comm.close()
                self.scheduler.pause()

            # emergency stop if required
            if self.__terminate_asap__:
                logger.warning("Terminating NOW")
                break

        self.log.info(f"Come with me if you want to live - RID {self.scheduler.rid}")

    def _run_init(self):
        # generate and log run id
        exp_identifier = f"{self.__class__.__name__}_{self.scheduler.rid}"
        self.log.info(f"starting run {exp_identifier}")

        # get timestamp of the run
        self.run_timestamp = int(time.time() * 1e6)

        self._prerun_core()
        self._prerun_host()
        self._prerun()

        logger.info(f"running with block list {self._blocks}")
        logger.info(f"running with component list {self._components}")
    def run(self):
        """
        Run entry point for ARTIQ, see the ARTIQ documentation for more information
        on experiment phases.

        Warning:
            Do not implement this entry point in your experiment. Use the provided
            sub-phases (`prerun`, `step`, etc.). More information can be found in
            the :ref:`phases_chunking` documentation.
        """
        self._run_init()
        try:
            self._loop_chunks(self.chunker(self.msm, self.CHUNKSIZE))
        except TerminationRequested:
            logger.info(f"I'll be back - RID {self.scheduler.rid}")

        self._postrun()
        self._postrun_host()
    @kernel
    def _runchunk(self, points):
        # make everything settle
        self.core.wait_until_mu(now_mu())
        self.core.reset()

        self._prechunk(points)

        # execute the step function for every point in the chunk.
        for point in points:
            # set the identifier in the master and the running core device
            self.identifier = self.run_timestamp + \
                np.int64((self.core.get_rtio_counter_mu() - self.clock_at_start) * self.core.ref_period * 1e6)
            point.identifier = self.identifier
            self.step_counter = point.step_counter
            self.run_id = point.run_id
            self._set_identifiers_in_master(self.identifier, self.run_id, self.step_counter)

            self.core.break_realtime()
            # delay to have some slack when the step starts
            delay(0.5 * ms)

            # do what the user wants to be done...
            self._prestep(point)
            self.step(point)
            self._poststep(point)
            self.core.wait_until_mu(now_mu())

        self._postchunk(points)
        self.core.wait_until_mu(now_mu())
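    # A worked example of the identifier arithmetic above (illustrative numbers):
    # with run_timestamp = 1_700_000_000_000_000 us taken on the host in
    # _run_init() and an RTIO counter that advanced by 2_000_000_000 mu since
    # clock_at_start at ref_period = 1e-9 s, the elapsed time is
    # 2_000_000_000 * 1e-9 * 1e6 = 2_000_000 us, so
    # identifier = 1_700_000_002_000_000, i.e. a per-step microsecond timestamp.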
    @kernel
    def prerun(self):
        """
        Kernel entry point, run once at the beginning of the run phase of an experiment.

        This method can be overloaded by the user. Details can be found in the
        :ref:`phases_chunking` documentation.
        """

    def prerun_host(self):
        """
        Host entry point, run once at the beginning of the run phase of an experiment.

        This method can be overloaded by the user. Details can be found in the
        :ref:`phases_chunking` documentation.
        """

    @kernel
    def postrun(self):
        """
        Kernel entry point, run once at the end of the run phase of an experiment.

        This method can be overloaded by the user. Details can be found in the
        :ref:`phases_chunking` documentation.
        """

    def postrun_host(self):
        """
        Host entry point, run once at the end of the run phase of an experiment.

        This method can be overloaded by the user. Details can be found in the
        :ref:`phases_chunking` documentation.
        """

    @kernel
    def prestep(self, point):
        """
        Kernel entry point, run before every step.

        This method can be overloaded by the user. Details can be found in the
        :ref:`phases_chunking` documentation.
        """

    @kernel
    def poststep(self, point):
        """
        Kernel entry point, run after every step.

        This method can be overloaded by the user. Details can be found in the
        :ref:`phases_chunking` documentation.
        """

    @kernel
    def prechunk(self, points):
        """
        Kernel entry point, run once at the beginning of a chunk.

        This method can be overloaded by the user. Details can be found in the
        :ref:`phases_chunking` documentation.
        """

    @kernel
    def postchunk(self, points):
        """
        Kernel entry point, run once at the end of a chunk.

        This method can be overloaded by the user. Details can be found in the
        :ref:`phases_chunking` documentation.
        """

    def prechunk_host(self, points):
        """
        Host entry point, run once at the beginning of a chunk.

        This method can be overloaded by the user. Details can be found in the
        :ref:`phases_chunking` documentation.
        """

    def postchunk_host(self, points):
        """
        Host entry point, run once at the end of a chunk.

        This method can be overloaded by the user. Details can be found in the
        :ref:`phases_chunking` documentation.
        """

    @kernel
    def step(self, point):
        """
        Kernel entry point, for the main experiment sequence code.

        This method **must** be overloaded by the user. Details can be found in the
        :ref:`phases_chunking` documentation.
        """
        raise NotImplementedError
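    # A minimal sketch (hypothetical component id "laser1" and attribute
    # "laser_power") of the override this docstring requires; the point object
    # carries one attribute per scanned or static experiment argument, plus the
    # step_counter/run_id/identifier fields set by the chunker:
    #
    #   class MyExperiment(AtomiqExperiment):
    #       components = ["laser1"]
    #
    #       @kernel
    #       def step(self, point):
    #           self.laser1.set_power(point.laser_power)  # hypothetical API
    #           delay(10 * ms)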
class AtomiqBlock(AtomiqExperiment):

    def __new__(cls, *args, **kwargs):
        # if we get a name in the constructor, make a class with that name and instantiate it
        if "name" in kwargs and kwargs["name"] is not None:
            return super().__new__(type(kwargs["name"], (cls,), {}))
        else:
            return super().__new__(cls)

    def __init__(self, *args, **kwargs):
        self.experiment = args[0].experiment
        super().__init__(*args, **kwargs)

    def _getmro(self):
        # since we dynamically add the dummy class with the alias name, we need to
        # exclude that from the mro here again.
        return inspect.getmro(type(self))[1:]
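# A minimal usage sketch (hypothetical names) for AtomiqBlock: a block bundles a
# reusable step sequence with its components, and a parent experiment imports it
# through its "blocks" list, using the dict keys consumed by _import_from_block().
#
#   class LoadMOT(AtomiqBlock):
#       components = ["coil"]
#
#       @kernel
#       def step(self, point):
#           self.coil.set_current(point.mot_current)  # hypothetical component API
#
#   class MyExperiment(AtomiqExperiment):
#       blocks = [{"class": LoadMOT, "map": {"coil": "main_coil"}, "alias": "mot"}]
#
# Here "map" re-routes the block's "coil" component id to the experiment's
# "main_coil" component, and the alias makes the block instance available as
# self.mot on the experiment.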