import inspect
import traceback
import time
import numpy as np
from artiq.experiment import kernel, rpc, delay, now_mu
from artiq.language.environment import EnvExperiment, EnumerationValue, BooleanValue, NumberValue, StringValue
from artiq.language.types import TInt32, TList
from artiq.language.scan import Scannable, NoScan, ScanObject, MultiScanManager
from artiq.language.units import ms
from artiq.language.core import TerminationRequested
from artiq.coredevice.exceptions import RTIOUnderflow
from atomiq.components import ComponentFactory
from atomiq.components.primitives import Component
from atomiq.arguments import NativeArgumentProvider
from atomiq.hooks import hooks
from atomiq.helper import component_dict, component_data, block_dict, rec_getattr, random_ascii_string
import logging
# Module-wide argument provider instance; AtomiqExperiment uses it as its class-level default.
default_argument_provider = NativeArgumentProvider()
# Configure the root logger with the stdlib defaults and create the logger of this module.
logging.basicConfig()
logger = logging.getLogger(__name__)
[docs]
class AtomiqExperiment(EnvExperiment):
    """
    Base class for atomiq experiments, built on top of the ARTIQ `EnvExperiment`.

    Subclasses declare the components they need in the class attribute `components` and may
    declare blocks in a class attribute `blocks`. The lists of all classes in the method
    resolution order are combined when the experiment is instantiated.
    """
    # Number of scan points handed to the kernel per chunk (passed to `chunker` by `run`).
    CHUNKSIZE = 10
    # Components required by this class.
    components = ["log"]
    # Provider that is queried for the argument definitions; can be replaced per instance.
    arg_provider = default_argument_provider
def __init__(self, managers_or_parent, name=None, arg_provider=None, component_map=None, *args, **kwargs):
    """
    Collect the declared components and blocks and initialise the run-time state.

    Args:
        managers_or_parent: Passed on unchanged to the constructor of the ARTIQ base class.
        name: Name of the experiment. Defaults to the name of the class.
        arg_provider: Argument provider to use instead of the class-level default.
        component_map: Optional mapping of component ids. It is applied when the components are
            built and is propagated to the blocks.
        *args: Accepted for compatibility, not used.
        **kwargs: Accepted for compatibility, not used.
    """
    # Combine components and blocks from the parent classes with our own. Only what a class
    # declares itself is taken into account: `hasattr` would also see inherited attributes, so
    # every subclass would contribute the lists of its parents once more. For blocks, which are
    # not de-duplicated, this would import and hook an inherited block several times.
    self._components = []
    self._blocks = []
    for cls in self._getmro():
        declared = vars(cls)
        self._components += declared.get("components", [])
        self._blocks += declared.get("blocks", [])
    # make the component entries unique
    self._components = list(set(self._components))

    self.name = name if name is not None else type(self).__name__
    self.component_map = component_map
    if arg_provider is not None:
        self.arg_provider = arg_provider

    # -1 marks "not set yet"; the real values are assigned during the run phase
    self.clock_at_start = np.int64(-1)
    self.identifier = np.int64(-1)
    self.step_counter = 0
    self.run_id = 0

    # if we are an atomiq block, leave the experiment attribute untouched
    if not hasattr(self, "experiment"):
        self.experiment = self

    self.__terminate_asap__ = False

    super().__init__(managers_or_parent)
def _getmro(self):
    """Return the method resolution order of the class of this instance as a tuple."""
    own_class = type(self)
    return inspect.getmro(own_class)
def _import_from_block(self, blkdata: dict):
    """
    Instantiate a block and make it available to the current experiment.

    The block object is attached to the class of this experiment (monkey patching) under its
    alias or, if no alias is given, under the name of the block class. The components of the
    block are added to the component list of this experiment.

    Args:
        blkdata: Block data with the keys "class", "map", "alias"
    """
    block_cls = blkdata["class"]
    block_map = blkdata["map"]
    alias = blkdata["alias"]
    # name under which the block (and thus its step function) is reachable after importing
    attr_name = alias if alias is not None else block_cls.__name__

    # route the mapping targets of the block through our own component mapping if we have one
    own_map = self.component_map
    if own_map is not None and len(own_map) > 0:
        prop_map = {}
        for key, val in block_map.items():
            prop_map[key] = own_map[val] if val in own_map else val
    else:
        prop_map = block_map

    # instantiate the block object and attach it to the class of self
    block_obj = block_cls(self, arg_provider=self.arg_provider, component_map=prop_map, name=attr_name)
    setattr(type(self), attr_name, block_obj)

    # nothing more to do if the block does not bring any components
    if not hasattr(block_obj, "_components"):
        return

    transferred = []
    for comp in map(component_dict, block_obj._components):
        if comp["id"] in prop_map:
            # the component is mapped: register it under the id of its mapping target
            comp = dict(comp, **{"id": prop_map[comp["id"]]})
        transferred.append(component_data(comp))
    logger.info(f"Transferring components {transferred} to parent")
    self._components += transferred
    self._components = list(set(self._components))
def _add_to_hook_handler(self, obj_list):
    """
    Generate the hook handlers `_<hook>` of this experiment for the given child objects.

    For every entry in `hooks` a function `_<hook>` is generated from source code, attached to
    the class and bound to this instance. The generated function calls the hook of every child
    object that possesses a method `_<hook>` (or `_do_<hook>` if the object provides it) and
    finally either the previously installed `_<hook>` handler of this experiment or, if there is
    none, the user method `<hook>`. Handlers whose name does not end in `_host` are turned into
    ARTIQ kernels.

    Args:
        obj_list: list of child objects relative to self (e.g. 'modulator.rfsource.switch') for
            which the hooks should be called
    """
    indent = "    "
    for hook, no_args in hooks:
        args = [f"arg{i}" for i in range(no_args)]
        call_args = ", ".join(args)

        lines = [
            f"def _{hook}({', '.join(['self'] + args)}):",
            f"{indent}self.log.debug('Doing {hook} for {{0}}', [self])",
        ]
        for obj_path in obj_list:
            obj = rec_getattr(self, obj_path)
            if not hasattr(obj, f"_{hook}"):
                continue
            wrapped_hook = f"do_{hook}" if hasattr(obj, f"_do_{hook}") else hook
            call = f"self.{obj_path}._{wrapped_hook}({call_args})"
            if isinstance(obj, Component):
                # Components are guarded by their `_<hook>_done` flag. The guarded statements
                # have to be indented one level deeper than the `if`, otherwise the generated
                # source is not valid Python.
                lines.append(f"{indent}if not self.{obj_path}._{hook}_done:")
                lines.append(f"{indent * 2}{call}")
                lines.append(f"{indent * 2}self.{obj_path}._{hook}_done = True")
            else:
                lines.append(f"{indent}{call}")

        if hasattr(self, f"_{hook}"):
            # we need to set an ephemeral name for the existing hook, otherwise we would recurse
            # into the method we are about to build
            rnd = random_ascii_string(8)
            setattr(self, f"_{hook}_{rnd}", getattr(self, f"_{hook}"))
            lines.append(f"{indent}self._{hook}_{rnd}({call_args})")
        else:
            lines.append(f"{indent}self.{hook}({call_args})")

        code = "\n".join(lines) + "\n"

        # Execute the definition in a fresh namespace; the generated function only needs the
        # module globals and none of the local variables of this method.
        namespace = {}
        exec(code, globals(), namespace)
        f = namespace[f"_{hook}"]
        if not hook.endswith("_host"):
            f = kernel(f)
            # Save source code for the compiler to pick up later.
            f.artiq_embedded = f.artiq_embedded._replace(function=code)
        setattr(self.__class__, f"_{hook}", f)
        setattr(self, f"_{hook}", f.__get__(self))
def _build_blocks(self):
    """Import all declared blocks and register them with the hook handlers of this experiment."""
    block_infos = [block_dict(blk) for blk in self._blocks]

    for info in block_infos:
        self._import_from_block(info)

    # the blocks are reachable under their alias or, without alias, under their class name
    hook_targets = []
    for info in block_infos:
        has_alias = "alias" in info and info["alias"] is not None
        hook_targets.append(info["alias"] if has_alias else info["class"].__name__)
    self._add_to_hook_handler(hook_targets)
def _build_components(self):
    """
    Produce all required components, attach them to self and register their hooks.

    Every component id is translated through `component_map` (if given) before the component is
    requested from the `ComponentFactory`. The produced object is attached to this experiment
    under the untranslated id.

    Raises:
        TypeError: If a produced component is not an instance of the type requested for it.
    """
    recursive_components = []
    for comp in map(component_dict, self._components):
        comp_id = comp["id"]
        if self.component_map is not None and comp_id in self.component_map:
            target_id = self.component_map[comp_id]
        else:
            target_id = comp_id
        comp_obj = ComponentFactory.produce(target_id, self)
        if not isinstance(comp_obj, comp["type"]):
            raise TypeError(f"Component {comp_id} does not have specified type {comp['type']}, "
                            f"instead it has {type(comp_obj)}")
        setattr(self, comp_id, comp_obj)
        recursive_components += comp_obj.required_components(ancestors=[comp_id])

    # In the following we add all hooks of the components to the experiment hooks.
    # Remove duplicates first, keeping the first path under which an object was found. The
    # membership test on a list keeps the comparison semantics of `in` and does not require the
    # objects to be hashable.
    seen_objects = []
    dedup_components = []
    for obj, path in recursive_components:
        if obj not in seen_objects:
            seen_objects.append(obj)
            dedup_components.append((obj, path))

    # only components whose class overrides `Component._prerun` are registered
    hook_paths = [".".join(path) for obj, path in dedup_components
                  if obj.__class__._prerun != Component._prerun]
    self._add_to_hook_handler(hook_paths)
def _build_arguments(self):
    """
    Register the arguments delivered by the argument provider with ARTIQ.

    The kind of argument is derived from the argument definition: an "options" entry gives an
    enumeration, a bool default a boolean, a str default a string. All other defaults give a
    number, which is scannable unless "scannable" is explicitly set to False. Arguments without
    "options" and without "default" are reported as an error and skipped.
    """
    number_keys = ('unit', 'scale', 'ndecimals')
    for argname, argdict in self.arg_provider.get_arguments(self.__class__).items():
        group = argdict['group'] if "group" in argdict else self.name

        if "options" in argdict:
            processor = EnumerationValue(argdict["options"])
        elif "default" not in argdict:
            logger.error(f"no default value set for argument {argname}")
            continue
        else:
            default = argdict["default"]
            # bool has to be tested before falling through to the number types
            if isinstance(default, bool):
                processor = BooleanValue(default)
            elif isinstance(default, str):
                processor = StringValue(default)
            else:
                number_opts = {k: argdict[k] for k in number_keys if k in argdict}
                if "scannable" in argdict and argdict["scannable"] is False:
                    processor = NumberValue(default, **number_opts)
                else:
                    processor = Scannable(default=NoScan(default, 1), **number_opts)

        self.setattr_argument(argname, processor, group=group)
[docs]
def prepare(self):
    """
    Prepare the components, the blocks and the scan structure.

    Called by ARTIQ in the prepare phase, see ARTIQ documentation for more information on
    experiment phases.

    Note:
        If you overwrite this method in your experiment, make sure to call `super().prepare()`
    """
    for component in self._components:
        comp_id = component_dict(component)["id"]
        getattr(self, comp_id)._recursive_prepare()

    def _is_scan(member):
        return isinstance(member, ScanObject)

    # auto-detect all scans among the members of this experiment
    scan_names = [varname for varname, _ in inspect.getmembers(self, _is_scan)]
    self.scannables = [(varname, getattr(self, varname)) for varname in scan_names]

    # prepare the attached blocks and take over their scans
    for blkdata in map(block_dict, self._blocks):
        alias = blkdata["alias"]
        block_name = alias if alias is not None else blkdata["class"].__name__
        block_obj = getattr(type(self), block_name)
        block_obj.prepare()
        if hasattr(block_obj, "scannables"):
            self.scannables += block_obj.scannables

    self.msm = MultiScanManager(*self.scannables)
def _build_core(self):
    """Request the scheduler, the core device and the core DMA device from ARTIQ."""
    for device_name in ("scheduler", "core", "core_dma"):
        self.setattr_device(device_name)
def _build(self):
    """Hook called at the very end of `build`. Does nothing by default."""
    return None
[docs]
def build(self):
    """
    Set up devices, blocks, components and arguments.

    Called by ARTIQ in the build phase, see ARTIQ documentation for more information on
    experiment phases.

    Note:
        If you overwrite this method in your experiment, make sure to call `super().build()`
    """
    self._build_core()

    # The blocks have to be built before the components because the blocks modify the
    # component list.
    self._build_blocks()
    self._build_components()

    # give every component the chance to do its own work in the build phase
    for component in self._components:
        comp_id = component_dict(component)["id"]
        comp_obj = getattr(self, comp_id)
        comp_obj._recursive_build()

    self._build_arguments()
    self._build()
@kernel
def _prerun_core(self, reset=False):
    """
    Kernel part of the run initialisation; finishes by calling the user method `prerun`.

    Args:
        reset: If true, the timeline is advanced by 500 ms and `core.reset()` is called before
            realtime is broken. `_loop_chunks` passes `reset=True` after an RTIOUnderflow.
    """
    # Remember the RTIO counter at start; `_runchunk` derives the step identifiers from it.
    self.clock_at_start = self.core.get_rtio_counter_mu()
    self.log.info("setting up coredevice")
    if reset:
        delay(500 * ms)
        self.core.reset()
    self.core.break_realtime()
    self.prerun()
@rpc(flags={"async"})
def _set_identifiers_in_master(self, identifier, run_id, step_counter):
    """
    Asynchronous RPC that mirrors the identifiers of the current step to the master.

    Besides updating the attributes of this object, the step counter is broadcast as dataset
    "step_counter".
    """
    self.identifier, self.run_id, self.step_counter = identifier, run_id, step_counter
    self.set_dataset("step_counter", step_counter, broadcast=True)
[docs]
def chunker(self, mult_scan, size: TInt32 = 100) -> TList:
    """
    Generator to call a kernel with chunks of scan points.

    Every scan point is extended by the values of the non-scanned arguments and by the
    attributes `step_counter`, `run_id` and `identifier` (initialised to -1). The number of the
    last step is broadcast as dataset "max_step_counter" before the first chunk is produced.

    Args:
        mult_scan: Iterable of scan points. It is iterated twice: once to count the points and
            once to produce the chunks.
        size: Maximum number of scan points per chunk.

    Yields:
        Lists of at most `size` scan points. Only the last chunk can be shorter and an empty
        chunk is never produced.
    """
    noscan_args = {name: getattr(self, name)
                   for name, value in self._HasEnvironment__argument_mgr.unprocessed_arguments.items()
                   if hasattr(self, name) and not isinstance(value, dict)}

    # first count the number of points for time estimation and broadcast this information
    n_point = sum(1 for _ in mult_scan)
    self.set_dataset("max_step_counter", n_point - 1, broadcast=True)

    chunk = []
    chunk_number = 1
    for step_counter, point in enumerate(mult_scan):
        point.__dict__.update(noscan_args)
        # set after the update so that these attributes always win over argument names
        point.step_counter = step_counter
        point.run_id = self.scheduler.rid
        point.identifier = np.int64(-1)
        chunk.append(point)
        if len(chunk) >= size:
            logger.info(f"starting kernel with chunk {chunk_number}")
            yield chunk
            chunk = []
            chunk_number += 1

    # hand out the remaining points; announce a kernel start only if there is one
    if chunk:
        logger.info(f"starting kernel with chunk {chunk_number}")
        yield chunk
def _loop_chunks(self, chunk_list: TList):
    """
    Loop over the chunks on the host, i.e. start one kernel per chunk.

    For every chunk `_prechunk_host`, `_runchunk` and `_postchunk_host` are called. An
    RTIOUnderflow is logged and answered with a reset of the core device; the loop then
    continues with the next chunk. Any other exception is propagated to the caller.

    Args:
        chunk_list: Iterable of chunks. Each chunk is again an iterable of scan points.
    """
    for current_chunk, points in enumerate(chunk_list):
        try:
            self._prechunk_host(points)
            self._runchunk(points)
            self._postchunk_host(points)
        except RTIOUnderflow as ex:
            self.log.warning(
                f"RTIOUnderflow during chunk {current_chunk}")
            self.log.warning(ex)
            self.log.warning(traceback.format_exc())
            self._prerun_core(reset=True)

        # The pause and termination handling deliberately lives after the try statement and not
        # in a `finally` clause: a `break` (or an exception raised by `pause()`) inside
        # `finally` would silently discard an exception that is propagating out of the chunk.
        if self.scheduler.check_pause():
            self.core.comm.close()
            self.scheduler.pause()

        # emergency stop if required
        if self.__terminate_asap__:
            logger.warning("Terminating NOW")
            break

    self.log.info(
        f"Come with me if you want to live - RID {self.scheduler.rid}"
    )
def _run_init(self):
    """Log the start of the run, take the run timestamp and execute all prerun phases."""
    # generate and log run id
    exp_identifier = f"{self.__class__.__name__}_{self.scheduler.rid}"
    self.log.info(f"starting run {exp_identifier}")

    # timestamp of the run: host time in seconds scaled by 1e6 and truncated to an integer
    now_scaled = time.time()*1e6
    self.run_timestamp = int(now_scaled)

    self._prerun_core()
    self._prerun_host()
    self._prerun()

    for message in (f"running with block list {self._blocks}",
                    f"running with component list {self._components}"):
        logger.info(message)
[docs]
def run(self):
    """
    Run entry point for ARTIQ, see ARTIQ documentation for more information on experiment phases.

    A requested termination ends the chunk loop; the postrun phases are executed afterwards in
    any case.

    Warning:
        Do not implement this entry point in your experiment. Use the provided sub-phases
        (`prerun`, `step`, etc.). More information can be found in the :ref:`phases_chunking`
        documentation.
    """
    self._run_init()

    try:
        chunks = self.chunker(self.msm, self.CHUNKSIZE)
        self._loop_chunks(chunks)
    except TerminationRequested:
        logger.info(f"I'll be back - RID {self.scheduler.rid}")

    self._postrun()
    self._postrun_host()
@kernel
def _runchunk(self, points):
    """
    Kernel that executes one chunk of scan points.

    Calls `_prechunk` once, then `_prestep`, `step` and `_poststep` for every point and finally
    `_postchunk`.

    Args:
        points: The scan points of this chunk. `step_counter` and `run_id` are read from every
            point and the identifier computed for the step is written to it.
    """
    # make everything settle
    self.core.wait_until_mu(now_mu())
    self.core.reset()
    self._prechunk(points)
    # execute the step function for every point in the chunk.
    for point in points:
        # set the identifier in the master and the running core device
        # The identifier is the run timestamp plus the RTIO counter ticks elapsed since
        # `clock_at_start`, scaled by `ref_period * 1e6`.
        # NOTE(review): presumably microseconds, assuming `ref_period` is in seconds - confirm.
        self.identifier = self.run_timestamp + \
            np.int64((self.core.get_rtio_counter_mu() - self.clock_at_start)*self.core.ref_period*1e6)
        point.identifier = self.identifier
        self.step_counter = point.step_counter
        self.run_id = point.run_id
        self._set_identifiers_in_master(self.identifier, self.run_id, self.step_counter)
        self.core.break_realtime()
        # delay to have some slack when the step starts
        delay(0.5 * ms)
        # do what the user wants to be done...
        self._prestep(point)
        self.step(point)
        self._poststep(point)
        # wait until the timeline cursor of this step is reached before starting the next one
        self.core.wait_until_mu(now_mu())
    self._postchunk(points)
    self.core.wait_until_mu(now_mu())
[docs]
@kernel
def prerun(self):
    """
    Kernel entry point, run once at the beginning of the run phase of an experiment.

    Override this method in your experiment; the default implementation does nothing.
    Details can be found in the :ref:`phases_chunking` documentation.
    """
[docs]
def prerun_host(self):
    """
    Host entry point, run once at the beginning of the run phase of an experiment.

    Override this method in your experiment; the default implementation does nothing.
    Details can be found in the :ref:`phases_chunking` documentation.
    """
[docs]
@kernel
def postrun(self):
    """
    Kernel entry point, run once at the end of the run phase of an experiment.

    Override this method in your experiment; the default implementation does nothing.
    Details can be found in the :ref:`phases_chunking` documentation.
    """
[docs]
def postrun_host(self):
    """
    Host entry point, run once at the end of the run phase of an experiment.

    Override this method in your experiment; the default implementation does nothing.
    Details can be found in the :ref:`phases_chunking` documentation.
    """
[docs]
@kernel
def prestep(self, point):
    """
    Kernel entry point, run before every step.

    Override this method in your experiment; the default implementation does nothing.
    Details can be found in the :ref:`phases_chunking` documentation.

    Args:
        point: The scan point of the step that is about to be executed.
    """
[docs]
@kernel
def poststep(self, point):
    """
    Kernel entry point, run after every step.

    Override this method in your experiment; the default implementation does nothing.
    Details can be found in the :ref:`phases_chunking` documentation.

    Args:
        point: The scan point of the step that has just been executed.
    """
[docs]
@kernel
def prechunk(self, points):
    """
    Kernel entry point, run once at the beginning of a chunk.

    Override this method in your experiment; the default implementation does nothing.
    Details can be found in the :ref:`phases_chunking` documentation.

    Args:
        points: The scan points of the chunk.
    """
[docs]
@kernel
def postchunk(self, points):
    """
    Kernel entry point, run once at the end of a chunk.

    Override this method in your experiment; the default implementation does nothing.
    Details can be found in the :ref:`phases_chunking` documentation.

    Args:
        points: The scan points of the chunk.
    """
[docs]
def prechunk_host(self, points):
    """
    Host entry point, run once at the beginning of a chunk.

    Override this method in your experiment; the default implementation does nothing.
    Details can be found in the :ref:`phases_chunking` documentation.

    Args:
        points: The scan points of the chunk.
    """
[docs]
def postchunk_host(self, points):
    """
    Host entry point, run once at the end of a chunk.

    Override this method in your experiment; the default implementation does nothing.
    Details can be found in the :ref:`phases_chunking` documentation.

    Args:
        points: The scan points of the chunk.
    """
[docs]
@kernel
def step(self, point):
    """
    Kernel entry point, for the main experiment sequence code.

    This method **must** be overridden by the user.
    Details can be found in the :ref:`phases_chunking` documentation.

    Args:
        point: The scan point to execute the sequence for.

    Raises:
        NotImplementedError: Always, unless the method is overridden.
    """
    raise NotImplementedError
[docs]
class AtomiqBlock(AtomiqExperiment):
    """
    A part of an experiment that can be imported into an `AtomiqExperiment` or another block.

    If a name is passed as keyword argument, the instance is created from a dynamically
    generated subclass carrying that name, so that the same block class can be attached under
    different aliases.
    """

    def __new__(cls, *args, **kwargs):
        # if we get a name in the constructor make a class with that name and instantiate it
        alias = kwargs.get("name")
        if alias is None:
            return super().__new__(cls)
        # The marker lets `_getmro` recognise the generated class. It is stored in the class
        # dict of the generated class only and is therefore not seen as "own" by subclasses.
        alias_cls = type(alias, (cls,), {"_is_alias_class": True})
        return super().__new__(alias_cls)

    def __init__(self, *args, **kwargs):
        # the parent is normally the first positional argument, but accept the keyword as well
        parent = args[0] if args else kwargs["managers_or_parent"]
        self.experiment = parent.experiment
        super().__init__(*args, **kwargs)

    def _getmro(self):
        """
        Return the method resolution order without the dynamically generated alias class.

        The first entry is only dropped if it really is a generated alias class. A block that
        was created without a name keeps its own class in the result, so that the components
        and blocks it declares are not lost.
        """
        mro = inspect.getmro(type(self))
        if vars(mro[0]).get("_is_alias_class", False):
            return mro[1:]
        return mro