# Source code for atomiq.atomiq

import inspect
import traceback
import time
import numpy as np

from artiq.experiment import kernel, rpc, delay, now_mu
from artiq.language.environment import EnvExperiment, EnumerationValue, BooleanValue, NumberValue, StringValue
from artiq.language.types import TInt32, TList
from artiq.language.scan import Scannable, NoScan, ScanObject, MultiScanManager
from artiq.language.units import ms
from artiq.language.core import TerminationRequested
from artiq.coredevice.exceptions import RTIOUnderflow

from atomiq.components import ComponentFactory
from atomiq.arguments import NativeArgumentProvider

from atomiq.helper import component_dict, component_data, block_dict

import logging

# Argument provider shared by all experiments that are not given their own one;
# AtomiqExperiment uses it as the class-level default for ``arg_provider``.
default_argument_provider = NativeArgumentProvider()

# NOTE: basicConfig() runs at import time and installs a default handler on the root
# logger if none is configured yet.
logging.basicConfig()
# Module-level logger used for host-side messages of this module.
logger = logging.getLogger(__name__)


class AtomiqExperiment(EnvExperiment):
    """Base class for experiments that are assembled from components and blocks.

    Subclasses declare the ids of the components they need in ``components`` and,
    optionally, the blocks they want to embed in ``blocks``. During :meth:`build` the blocks
    are instantiated and attached, the components are produced by the ``ComponentFactory``
    and the arguments reported by the argument provider are registered. :meth:`run` then
    iterates over all scan points in chunks of ``CHUNKSIZE`` points and executes
    :meth:`step` for each point inside a kernel.

    Subclasses must implement :meth:`step` and may override the ``pre*``/``post*`` hooks.
    """

    # number of scan points handed to one kernel invocation by run()
    CHUNKSIZE = 10
    # component ids required by this class; merged with the declarations of all classes in the MRO
    components = ["log"]
    # provider that is asked for the arguments of the experiment in _build_arguments()
    arg_provider = default_argument_provider

    def __init__(self, managers_or_parent, name=None, arg_provider=None, component_map=None, *args, **kwargs):
        """Collect the declared components and blocks and initialise the run state.

        Args:
            managers_or_parent: Passed on unchanged to the ``EnvExperiment`` constructor.
            name: Name of this experiment or block. Defaults to the name of the class.
            arg_provider: If given, replaces the class-level ``arg_provider`` for this instance.
            component_map: Optional mapping from the component ids used by this class to the
                ids of the components that should be produced in their place.
        """
        # Combine the declarations of all classes in the MRO. Only attributes a class declares
        # itself are considered: an inherited ``blocks``/``components`` list is picked up when
        # the declaring class is visited, so looking at inherited values as well would add the
        # same blocks several times.
        self._components = []
        self._blocks = []
        for cls in self._getmro():
            declared = vars(cls)
            self._components += declared.get("components", [])
            self._blocks += declared.get("blocks", [])
        # make the component entries unique while keeping a reproducible order
        self._components = list(dict.fromkeys(self._components))

        self.name = name if name is not None else type(self).__name__
        self.component_map = component_map
        if arg_provider is not None:
            self.arg_provider = arg_provider

        # run state; the real values are set once the experiment runs
        self.clock_at_start = np.int64(-1)
        self.identifier = np.int64(-1)
        self.step_counter = 0
        self.run_id = 0

        # if we are an atomiq block, leave the experiment attribute untouched
        if not hasattr(self, "experiment"):
            self.experiment = self

        # must come last: everything set above has to exist before the parent constructor runs
        super().__init__(managers_or_parent)

    def _getmro(self):
        """Return the classes whose ``components``/``blocks`` declarations are collected."""
        return inspect.getmro(type(self))

    def _import_from_block(self, blkdata: dict):
        """Instantiate a block and attach it to the class of this experiment.

        This uses monkey patching of the class to make members available to the current
        experiment.

        Args:
            blkdata: Block data with the keys "class", "map", "alias"
        """
        # the block is attached under its alias, or under its class name if it has none
        alias = blkdata["alias"]
        cls_name = blkdata["class"].__name__ if alias is None else alias

        # propagate our component mappings if necessary
        if self.component_map:
            prop_map = {key: self.component_map.get(val, val) for key, val in blkdata["map"].items()}
        else:
            prop_map = blkdata["map"]

        # instantiate the block object and attach it to the class
        obj = blkdata["class"](self, arg_provider=self.arg_provider, component_map=prop_map, name=cls_name)
        setattr(type(self), cls_name, obj)

        # take over the components the block needs, renamed according to the mapping
        if hasattr(obj, "_components"):
            transferred = [
                component_data({**comp, "id": prop_map[comp["id"]]}) if comp["id"] in prop_map
                else component_data(comp)
                for comp in map(component_dict, obj._components)
            ]
            logger.info(f"Transferring components {transferred} to parent")
            self._components += transferred
            self._components = list(dict.fromkeys(self._components))

    def _build_hook_handler(self):
        """Generate the ``_<hook>`` dispatcher methods of this object.

        For every hook a function ``_<hook>`` is created from generated source text. It calls
        ``_<hook>`` on every attached block and afterwards ``<hook>`` on this object. Hooks
        whose name does not end in ``_host`` are turned into kernels.
        """
        blks = [blkdata["alias"] if "alias" in blkdata and blkdata["alias"] is not None
                else blkdata["class"].__name__
                for blkdata in map(block_dict, self._blocks)]
        logger.debug("building hook handlers for blocks %s", blks)

        # (hook name, number of arguments besides self)
        hooks = [("prerun", 0), ("postrun", 0), ("prechunk", 1), ("postchunk", 1), ("prestep", 1),
                 ("poststep", 1), ("prerun_host", 0), ("postrun_host", 0), ("prechunk_host", 1),
                 ("postchunk_host", 1)]
        for hook, no_args in hooks:
            args = [f"arg{i}" for i in range(no_args)]
            code = f"def _{hook}({', '.join(['self']+args)}):"+"\n"
            code += " "+'\n '.join([f"self.{blk}._{hook}({', '.join(args)})" for blk in blks])+"\n"
            code += f" self.{hook}({', '.join(args)})"+"\n"

            # define the function in a namespace of its own and fetch it from there
            namespace = {}
            exec(code, globals(), namespace)
            f = namespace[f'_{hook}']
            if not hook.endswith("_host"):
                f = kernel(f)
                # Save source code for the compiler to pick up later.
                f.artiq_embedded = f.artiq_embedded._replace(function=code)
            setattr(self.__class__, f"_{hook}", f)
            setattr(self, f"_{hook}", f.__get__(self))

    def _build_blocks(self):
        """Attach all declared blocks and generate the hook dispatchers afterwards."""
        for blkdata in map(block_dict, self._blocks):
            self._import_from_block(blkdata)
        self._build_hook_handler()

    def _build_components(self):
        """Produce every required component and store it under its id on this object.

        Raises:
            TypeError: If a produced component is not an instance of the type requested for it.
        """
        for comp in map(component_dict, self._components):
            # a component map entry redirects the id to the component that is really built
            if self.component_map is not None and comp["id"] in self.component_map:
                target_id = self.component_map[comp["id"]]
            else:
                target_id = comp["id"]
            comp_obj = ComponentFactory.produce(target_id, self)
            if not isinstance(comp_obj, comp["type"]):
                raise TypeError(f"Component {comp['id']} does not have specified type {comp['type']}, "
                                f"instead it has {type(comp_obj)}")
            setattr(self, comp["id"], comp_obj)

    def _build_arguments(self):
        """Register the arguments reported by the argument provider.

        Arguments with "options" become an ``EnumerationValue``. Otherwise the type of the
        "default" decides: bool -> ``BooleanValue``, str -> ``StringValue``, anything else ->
        ``NumberValue`` if "scannable" is ``False`` and a ``Scannable`` otherwise. Arguments
        without options and without a default are skipped with an error message.
        """
        for argname, argdict in self.arg_provider.get_arguments(self.__class__).items():
            group = argdict["group"] if "group" in argdict else self.name

            if "options" in argdict:
                self.setattr_argument(argname, EnumerationValue(argdict["options"]), group=group)
                continue
            if "default" not in argdict:
                logger.error(f"no default value set for argument {argname}")
                continue

            default = argdict["default"]
            # display options that are passed on to numeric arguments if they are given
            number_opts = {k: argdict[k] for k in ('unit', 'scale', 'ndecimals') if k in argdict}
            # bool has to be tested first; the numeric cases are the fallback
            if isinstance(default, bool):
                processor = BooleanValue(default)
            elif isinstance(default, str):
                processor = StringValue(default)
            elif "scannable" in argdict and argdict["scannable"] is False:
                processor = NumberValue(default, **number_opts)
            else:
                processor = Scannable(default=NoScan(default, 1), **number_opts)
            self.setattr_argument(argname, processor, group=group)

    def prepare(self):
        """Prepare components and blocks and set up the scan over all scannable arguments.

        After this call ``self.scannables`` holds the ``(name, ScanObject)`` pairs of this
        object followed by those of all attached blocks, and ``self.msm`` is the
        ``MultiScanManager`` built from them.
        """
        for component in self._components:
            getattr(self, component_dict(component)["id"])._recursive_prepare()

        # autodetect the scans: every attribute that is a ScanObject takes part
        self.scannables = list(inspect.getmembers(self, lambda x: isinstance(x, ScanObject)))

        # prepare the attached blocks and take over their scannables
        for blkdata in map(block_dict, self._blocks):
            alias = blkdata["alias"]
            cls_name = blkdata["class"].__name__ if alias is None else alias
            obj = getattr(type(self), cls_name)
            obj.prepare()
            if hasattr(obj, "scannables"):
                self.scannables += obj.scannables

        self.msm = MultiScanManager(*self.scannables)

    def _build_core(self):
        """Request the devices every experiment needs: scheduler, core and core_dma."""
        self.setattr_device("scheduler")
        self.setattr_device("core")
        self.setattr_device("core_dma")

    def _build(self):
        """Hook called at the very end of :meth:`build`. Does nothing by default."""
        pass

    def build(self):
        """Build devices, blocks, components and arguments, then call :meth:`_build`."""
        self._build_core()

        # It's important to do this before the components are built since
        # the blocks modify the component list
        self._build_blocks()
        self._build_components()

        # give the components the chance to do their own work in the build phase
        for component in self._components:
            getattr(self, component_dict(component)["id"])._recursive_build()

        self._build_arguments()
        self._build()

    @kernel
    def _prerun_core(self, reset=False):
        """Set up the core device and call :meth:`prerun`.

        Stores the current RTIO counter in ``clock_at_start``; the step identifiers are
        computed relative to it.

        Args:
            reset: Whether the core device should be reset. ``_loop_chunks`` passes ``True``
                after an ``RTIOUnderflow``.
        """
        self.clock_at_start = self.core.get_rtio_counter_mu()
        self.log.info("setting up coredevice")
        # NOTE(review): the nesting of core.reset() was reconstructed from the parameter name;
        # confirm that the reset is meant to happen only when ``reset`` is set.
        if reset:
            delay(500 * ms)
            self.core.reset()
        self.core.break_realtime()
        self.prerun()

    @rpc(flags={"async"})
    def _set_identifiers_in_master(self, identifier, run_id, step_counter):
        """Asynchronous RPC that stores the identifiers of the current step on the host.

        Besides updating the attributes it broadcasts ``step_counter`` as a dataset.
        """
        self.identifier = identifier
        self.run_id = run_id
        self.step_counter = step_counter
        self.set_dataset("step_counter", step_counter, broadcast=True)

    def chunker(self, mult_scan, size: TInt32 = 100) -> TList:
        """Generator that splits the scan points into chunks for the kernel.

        Every scan point gets the values of all non-scanned arguments as well as the
        attributes ``step_counter``, ``run_id`` and ``identifier`` (initialised to -1).
        The index of the last step is broadcast as dataset ``max_step_counter``.

        Args:
            mult_scan: Iterable of scan points. It is iterated twice, once for counting and
                once for building the chunks.
            size: Maximum number of scan points per chunk.

        Yields:
            Lists with at most ``size`` scan points.
        """
        noscan_args = {name: getattr(self, name)
                       for name, value in self._HasEnvironment__argument_mgr.unprocessed_arguments.items()
                       if hasattr(self, name) and not isinstance(value, dict)}

        # first count the number of points for time estimation and broadcast this information
        n_point = sum(1 for _ in mult_scan)
        self.set_dataset("max_step_counter", n_point - 1, broadcast=True)

        chunk = []
        chunk_no = 1
        for step_counter, point in enumerate(mult_scan):
            point.__dict__.update(noscan_args)
            point.step_counter = step_counter
            point.run_id = self.scheduler.rid
            point.identifier = np.int64(-1)
            chunk.append(point)
            if len(chunk) >= size:
                logger.info(f"starting kernel with chunk {chunk_no}")
                yield chunk
                chunk = []
                chunk_no += 1

        # hand out the remaining points; nothing is announced if there are none
        if chunk:
            logger.info(f"starting kernel with chunk {chunk_no}")
            yield chunk

    def _loop_chunks(self, chunk_list: TList):
        """Loop over the chunks on the host, i.e. start one kernel per chunk.

        An ``RTIOUnderflow`` raised while a chunk is processed is logged and answered with a
        reset of the core device; the loop then goes on with the next chunk. After every
        chunk a pause requested by the scheduler is honoured.

        Args:
            chunk_list: Iterable of chunks. Each entry is again an iterable holding up to
                CHUNKSIZE scan point objects.
        """
        for current_chunk, points in enumerate(chunk_list):
            try:
                self._prechunk_host(points)
                self._runchunk(points)
                self._postchunk_host(points)
            except RTIOUnderflow as ex:
                self.log.warning(
                    f"RTIOUnderflow during chunk {current_chunk}")
                self.log.warning(ex)
                self.log.warning(traceback.format_exc())
                self._prerun_core(reset=True)

            # Only reached when the chunk was handled. While another exception propagates the
            # pause handling is skipped so that it cannot mask the original error.
            if self.scheduler.check_pause():
                self.core.comm.close()
                self.scheduler.pause()
                self.log.info(
                    f"Come with me if you want to live - RID {self.scheduler.rid}"
                )

    def _run_init(self):
        """Initialise a run: set up the core device and execute all prerun hooks."""
        # generate and log run id
        exp_identifier = f"{self.__class__.__name__}_{self.scheduler.rid}"
        self.log.info(f"starting run {exp_identifier}")

        # timestamp of the run in microseconds, the base of the step identifiers
        self.run_timestamp = int(time.time()*1e6)

        self._prerun_core()

        # run _prerun methods of the components
        for comp in map(component_dict, self._components):
            getattr(self, comp["id"])._recursive_prerun()

        self._prerun_host()
        self._prerun()

        logger.info(f"running with block list {self._blocks}")
        logger.info(f"running with component list {self._components}")

    def run(self):
        """Run all chunks of the scan and execute the postrun hooks afterwards.

        A ``TerminationRequested`` raised while looping over the chunks ends the loop; the
        postrun hooks are still executed.
        """
        self._run_init()
        try:
            self._loop_chunks(self.chunker(self.msm, self.CHUNKSIZE))
        except TerminationRequested:
            logger.info(f"I'll be back - RID {self.scheduler.rid}")
        self._postrun()
        self._postrun_host()

    @kernel
    def _runchunk(self, points):
        """Kernel that executes the step function for every point of one chunk.

        Args:
            points: The scan points of the chunk.
        """
        # make everything settle
        self.core.wait_until_mu(now_mu())
        self.core.reset()
        self._prechunk(points)

        # execute the step function for every point in the chunk.
        for point in points:
            # set the identifier in the master and the running core device: the run timestamp
            # plus the time that passed on the core device since clock_at_start was taken
            self.identifier = self.run_timestamp + \
                np.int64((self.core.get_rtio_counter_mu() - self.clock_at_start)*self.core.ref_period*1e6)
            point.identifier = self.identifier
            self.step_counter = point.step_counter
            self.run_id = point.run_id
            self._set_identifiers_in_master(self.identifier, self.run_id, self.step_counter)

            self.core.break_realtime()
            # delay to have some slack when the step starts
            delay(0.5 * ms)

            # do what the user wants to be done...
            self._prestep(point)
            self.step(point)
            self._poststep(point)
            # NOTE(review): reconstructed as part of the loop body, i.e. every step is waited
            # for before the next one is set up; confirm against the upstream layout.
            self.core.wait_until_mu(now_mu())

        self._postchunk(points)
        self.core.wait_until_mu(now_mu())

    @kernel
    def prerun(self):
        """Kernel hook called by ``_prerun_core`` and by the generated ``_prerun`` dispatcher."""
        pass

    def prerun_host(self):
        """Host hook called once at the start of a run, before the kernel ``prerun`` hooks."""
        pass

    @kernel
    def postrun(self):
        """Kernel hook called once after all chunks have been processed."""
        pass

    def postrun_host(self):
        """Host hook called at the very end of :meth:`run`."""
        pass

    @kernel
    def prestep(self, point):
        """Kernel hook called right before :meth:`step` for every scan point."""
        pass

    @kernel
    def poststep(self, point):
        """Kernel hook called right after :meth:`step` for every scan point."""
        pass

    @kernel
    def prechunk(self, points):
        """Kernel hook called at the start of every chunk, before the first step."""
        pass

    @kernel
    def postchunk(self, points):
        """Kernel hook called at the end of every chunk, after the last step."""
        pass

    def prechunk_host(self, points):
        """Host hook called before the kernel for a chunk is started."""
        pass

    def postchunk_host(self, points):
        """Host hook called after the kernel for a chunk has returned."""
        pass

    @kernel
    def step(self, point):
        """The experiment itself; executed once per scan point. Must be overridden.

        Args:
            point: The scan point to run, carrying the argument values as attributes.
        """
        raise NotImplementedError
class AtomiqBlock(AtomiqExperiment):
    """An experiment part that is embedded into a parent experiment.

    A block is constructed with its parent as first argument and shares the parent's
    ``experiment`` attribute. If a name is passed, the instance is created from a
    dynamically generated subclass carrying that name.
    """

    def __new__(cls, *args, **kwargs):
        """Create the instance, from a class named like the alias if a name is given."""
        alias = kwargs.get("name")
        if alias is None:
            return super().__new__(cls)
        # The generated class is marked so that _getmro() can tell it apart from a real
        # class and skip it.
        return super().__new__(type(alias, (cls,), {"_atomiq_alias_class": True}))

    def __init__(self, *args, **kwargs):
        """Take over the ``experiment`` of the parent, then initialise like an experiment.

        The parent is the first positional argument; it is also accepted as keyword
        ``managers_or_parent``.
        """
        parent = args[0] if args else kwargs["managers_or_parent"]
        # has to be set before the parent constructor runs so that it is left untouched there
        self.experiment = parent.experiment
        super().__init__(*args, **kwargs)

    def _getmro(self):
        """Return the MRO without the dynamically generated alias class.

        The alias class declares nothing itself and is therefore excluded. If the instance
        was created without a name there is no alias class and the full MRO is returned, so
        that the declarations of the block class itself are not lost.
        """
        mro = inspect.getmro(type(self))
        if vars(type(self)).get("_atomiq_alias_class", False):
            return mro[1:]
        return mro