import inspect
import logging
import time
import traceback

import numpy as np

from artiq.experiment import kernel, rpc, delay, now_mu
from artiq.language.environment import EnvExperiment, EnumerationValue, BooleanValue, NumberValue, StringValue
from artiq.language.types import TInt32, TList
from artiq.language.scan import Scannable, NoScan, ScanObject, MultiScanManager
from artiq.language.units import ms
from artiq.language.core import TerminationRequested
from artiq.coredevice.exceptions import RTIOUnderflow

from atomiq.components import ComponentFactory
from atomiq.arguments import NativeArgumentProvider
from atomiq.helper import component_dict, component_data, block_dict
default_argument_provider = NativeArgumentProvider()
logging.basicConfig()
logger = logging.getLogger(__name__)
class AtomiqExperiment(EnvExperiment):
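    """
    Base class for atomiq experiments.

    It collects the `components` and `blocks` lists from all classes in the
    hierarchy, builds the corresponding component objects and ARTIQ arguments,
    and runs the scan on the core device in chunks of CHUNKSIZE points.
    """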
CHUNKSIZE = 10
components = ["log"]
arg_provider = default_argument_provider
def __init__(self, managers_or_parent, name=None, arg_provider=None, component_map=None, *args, **kwargs):
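        """
        Args:
            managers_or_parent: the ARTIQ managers or the parent experiment/block
            name: name of the experiment; defaults to the class name
            arg_provider: argument provider used to generate the ARTIQ arguments
                (defaults to a NativeArgumentProvider)
            component_map: mapping to replace component ids with other ids
        """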
# combine components from parent class with our own and make entries unique
self._components = []
for cls in self._getmro():
if hasattr(cls, "components"):
self._components += cls.components
self._components = list(set(self._components))
self._blocks = []
for cls in self._getmro():
if hasattr(cls, "blocks"):
self._blocks += cls.blocks
self.name = name if name is not None else type(self).__name__
self.component_map = component_map
if arg_provider is not None:
self.arg_provider = arg_provider
self.clock_at_start = np.int64(-1)
self.identifier = np.int64(-1)
self.step_counter = 0
self.run_id = 0
# if we are an atomiq block, leave the experiment attribute untouched
if not hasattr(self, "experiment"):
self.experiment = self
super().__init__(managers_or_parent)
def _getmro(self):
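        """Return the method resolution order of this instance's class."""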
return inspect.getmro(type(self))
def _import_from_block(self, blkdata: dict):
"""
This uses monkey patching of the class to make members available to
the current experiment
Args:
blkdata: Block data with the keys "class", "map", "alias"
"""
        # determine the name under which the block is attached
alias = blkdata["alias"]
cls_name = blkdata["class"].__name__ if alias is None else alias
# propagate our component mappings if necessary
if self.component_map is not None and len(self.component_map) > 0:
prop_map = {key: (self.component_map[val] if val in self.component_map else val)
for key, val in blkdata["map"].items()}
else:
prop_map = blkdata["map"]
        # instantiate the block object and attach it to the experiment class
obj = blkdata["class"](self, arg_provider=self.arg_provider,
component_map=prop_map,
name=cls_name
)
setattr(type(self), cls_name, obj)
# check if we need any components from the attached block object
if hasattr(obj, "_components"):
cmp = [component_data(dict(comp, **{"id": prop_map[comp["id"]]}))
if comp["id"] in prop_map else component_data(comp)
for comp in map(component_dict, obj._components)]
logger.info(f"Transferring components {cmp} to parent")
self._components += cmp
self._components = list(set(self._components))
def _build_hook_handler(self):
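        """
        Generate the `_<hook>` dispatcher methods.

        For every hook (e.g. `prerun`, `poststep`) a method is generated that
        first calls the hook on all attached blocks and then on the experiment
        itself. Hooks not ending in `_host` are compiled as kernels.
        """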
blks = [blkdata["alias"] if "alias" in blkdata and blkdata["alias"] is not None else blkdata["class"].__name__
for blkdata in map(block_dict, self._blocks)]
        logger.debug(f"building hook dispatchers for blocks {blks}")
for hook, no_args in [("prerun", 0), ("postrun", 0), ("prechunk", 1), ("postchunk", 1), ("prestep", 1),
("poststep", 1), ("prerun_host", 0), ("postrun_host", 0), ("prechunk_host", 1),
("postchunk_host", 1)]:
args = [f"arg{i}" for i in range(no_args)]
code = f"def _{hook}({', '.join(['self']+args)}):"+"\n"
code += " "+'\n '.join([f"self.{blk}._{hook}({', '.join(args)})" for blk in blks])+"\n"
code += f" self.{hook}({', '.join(args)})"+"\n"
loc = locals()
exec(code, globals(), loc)
f = loc[f'_{hook}']
if not hook.endswith("_host"):
f = kernel(f)
# Save source code for the compiler to pick up later.
f.artiq_embedded = f.artiq_embedded._replace(function=code)
setattr(self.__class__, f"_{hook}", f)
setattr(self, f"_{hook}", f.__get__(self))
def _build_blocks(self):
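        """Attach all blocks listed in self._blocks and generate the hook dispatchers."""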
for blkdata in map(block_dict, self._blocks):
self._import_from_block(blkdata)
self._build_hook_handler()
def _build_components(self):
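        """Instantiate all collected components via the ComponentFactory and attach them as attributes, honoring component_map."""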
for comp in map(component_dict, self._components):
if self.component_map is not None and comp["id"] in self.component_map:
target_id = self.component_map[comp["id"]]
else:
target_id = comp["id"]
comp_obj = ComponentFactory.produce(target_id, self)
if isinstance(comp_obj, comp["type"]):
setattr(self, comp["id"], comp_obj)
else:
                raise TypeError(f"Component {comp['id']} does not have the specified type {comp['type']}, "
                                f"instead it has {type(comp_obj)}")
def _build_arguments(self):
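        """Create ARTIQ arguments for all entries returned by the argument provider."""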
for argname, argdict in self.arg_provider.get_arguments(self.__class__).items():
            group = argdict.get("group", self.name)
if "options" in argdict:
self.setattr_argument(argname, EnumerationValue(argdict["options"]), group=group)
else:
if "default" in argdict:
if isinstance(argdict["default"], bool):
self.setattr_argument(argname, BooleanValue(argdict["default"]), group=group)
elif isinstance(argdict["default"], str):
self.setattr_argument(argname, StringValue(argdict["default"]), group=group)
elif "scannable" in argdict and argdict["scannable"] is False:
self.setattr_argument(argname, NumberValue(argdict["default"],
**{k: argdict[k]
for k in ('unit', 'scale', 'ndecimals')
if k in argdict}),
group=group
)
else:
self.setattr_argument(argname, Scannable(default=NoScan(argdict["default"], 1),
**{k: argdict[k]
for k in ('unit', 'scale', 'ndecimals')
if k in argdict}),
group=group
)
else:
logger.error(f"no default value set for argument {argname}")
def prepare(self):
for component in self._components:
getattr(self, component_dict(component)["id"])._recursive_prepare()
        # autodetect all scannable arguments attached to this experiment
self.scannables = [
(varname, getattr(self, varname))
for varname, _ in inspect.getmembers(self, lambda x: isinstance(x, ScanObject))
]
        # prepare the attached blocks and collect their scannables
for blkdata in map(block_dict, self._blocks):
alias = blkdata["alias"]
cls_name = blkdata["class"].__name__ if alias is None else alias
obj = getattr(type(self), cls_name)
obj.prepare()
if hasattr(obj, "scannables"):
self.scannables += obj.scannables
self.msm = MultiScanManager(*self.scannables)
def _build_core(self):
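        """Request the scheduler, core, and core DMA devices."""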
self.setattr_device("scheduler")
self.setattr_device("core")
self.setattr_device("core_dma")
def _build(self):
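        """Hook for subclasses to extend the build phase; called after core devices, blocks, components, and arguments are built."""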
pass
def build(self):
self._build_core()
# It's important to do this before the components are built since
# the blocks modify the component list
self._build_blocks()
self._build_components()
        # the components themselves may need to do work during the build phase
for component in self._components:
getattr(self, component_dict(component)["id"])._recursive_build()
self._build_arguments()
self._build()
@kernel
def _prerun_core(self, reset=False):
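        """Set up the core device, optionally resetting it first, and call the prerun hook on the kernel."""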
self.clock_at_start = self.core.get_rtio_counter_mu()
self.log.info("setting up coredevice")
if reset:
delay(500 * ms)
self.core.reset()
self.core.break_realtime()
self.prerun()
@rpc(flags={"async"})
def _set_identifiers_in_master(self, identifier, run_id, step_counter):
"""
makes an rpc call to set the identifier in the master
"""
self.identifier = identifier
self.run_id = run_id
self.step_counter = step_counter
self.set_dataset("step_counter", step_counter, broadcast=True)
def chunker(self, mult_scan, size: TInt32 = 100) -> TList:
"""
generator to call a kernel with chunks of scan points
"""
noscan_args = {name: getattr(self, name)
for name, value in self._HasEnvironment__argument_mgr.unprocessed_arguments.items()
if hasattr(self, name) and not isinstance(value, dict)}
# first count the number of points for time estimation
n_point = 0
for _p in mult_scan:
n_point += 1
# broadcast this information
self.set_dataset("max_step_counter", n_point - 1, broadcast=True)
chunk = []
i = 1
counter = 0
for _p in mult_scan:
step_counter = counter
_p.__dict__.update(noscan_args)
setattr(_p, "step_counter", step_counter)
setattr(_p, "run_id", self.scheduler.rid)
setattr(_p, "identifier", np.int64(-1))
chunk.append(_p)
counter += 1
if len(chunk) >= size:
logger.info(f"starting kernel with chunk {i}")
yield chunk
chunk = []
i += 1
        if len(chunk) > 0:
            logger.info(f"starting kernel with chunk {i}")
            yield chunk
def _loop_chunks(self, chunk_list: TList):
"""
        Loop over a list of chunks on the host, i.e. start one kernel per chunk holding CHUNKSIZE experiment steps.
        :param chunk_list: (list) list of chunks. Each entry is an iterable of CHUNKSIZE scan point objects.
"""
current_chunk = 0
for points in chunk_list:
try:
self._prechunk_host(points)
self._runchunk(points)
self._postchunk_host(points)
except RTIOUnderflow as ex:
self.log.warning(
f"RTIOUnderflow during chunk {current_chunk}")
self.log.warning(ex)
self.log.warning(traceback.format_exc())
self._prerun_core(reset=True)
finally:
current_chunk += 1
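            # if the scheduler requests a pause (e.g. for a higher-priority run),
            # release the core device connection and yield until we are resumed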
if self.scheduler.check_pause():
self.core.comm.close()
self.scheduler.pause()
self.log.info(
f"Come with me if you want to live - RID {self.scheduler.rid}"
)
def _run_init(self):
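        """Initialize the run: log the run identifier, record the start timestamp, and execute all prerun hooks."""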
# generate and log run id
exp_identifier = f"{self.__class__.__name__}_{self.scheduler.rid}"
self.log.info(f"starting run {exp_identifier}")
# get timestamp of the run
self.run_timestamp = int(time.time()*1e6)
self._prerun_core()
# run _prerun methods of the components
for comp in map(component_dict, self._components):
getattr(self, comp["id"])._recursive_prerun()
self._prerun_host()
self._prerun()
logger.info(f"running with block list {self._blocks}")
logger.info(f"running with component list {self._components}")
def run(self):
self._run_init()
try:
self._loop_chunks(self.chunker(self.msm, self.CHUNKSIZE))
except TerminationRequested:
logger.info(f"I'll be back - RID {self.scheduler.rid}")
self._postrun()
self._postrun_host()
@kernel
def _runchunk(self, points):
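        """Run one chunk on the core device: assign identifiers to every point and execute the prestep/step/poststep sequence."""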
# make everything settle
self.core.wait_until_mu(now_mu())
self.core.reset()
self._prechunk(points)
# execute the step function for every point in the chunk.
for point in points:
# set the identifier in the master and the running core device
self.identifier = self.run_timestamp + \
np.int64((self.core.get_rtio_counter_mu() - self.clock_at_start)*self.core.ref_period*1e6)
point.identifier = self.identifier
self.step_counter = point.step_counter
self.run_id = point.run_id
self._set_identifiers_in_master(self.identifier, self.run_id, self.step_counter)
self.core.break_realtime()
# delay to have some slack when the step starts
delay(0.5 * ms)
            # run the user-defined step, wrapped by the prestep/poststep hooks
self._prestep(point)
self.step(point)
self._poststep(point)
self.core.wait_until_mu(now_mu())
self._postchunk(points)
self.core.wait_until_mu(now_mu())
@kernel
def prerun(self):
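        """User hook executed as a kernel on the core device at the start of the run."""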
pass
def prerun_host(self):
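        """User hook executed on the host at the start of the run."""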
pass
@kernel
def postrun(self):
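        """User hook executed as a kernel on the core device at the end of the run."""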
pass
def postrun_host(self):
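        """User hook executed on the host at the end of the run."""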
pass
@kernel
def prestep(self, point):
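        """User hook executed as a kernel before each step."""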
pass
@kernel
def poststep(self, point):
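        """User hook executed as a kernel after each step."""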
pass
@kernel
def prechunk(self, points):
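        """User hook executed as a kernel before each chunk of scan points."""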
pass
@kernel
def postchunk(self, points):
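        """User hook executed as a kernel after each chunk of scan points."""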
pass
def prechunk_host(self, points):
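        """User hook executed on the host before each chunk of scan points."""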
pass
def postchunk_host(self, points):
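        """User hook executed on the host after each chunk of scan points."""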
pass
@kernel
def step(self, point):
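        """The experiment step executed as a kernel for every scan point. Must be implemented by subclasses."""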
raise NotImplementedError
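

# Minimal usage sketch (not part of the library): a subclass only needs to
# implement step(); the "log" component is inherited from AtomiqExperiment.
#
#   class MyExperiment(AtomiqExperiment):
#
#       @kernel
#       def step(self, point):
#           self.log.info("running one scan point")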
class AtomiqBlock(AtomiqExperiment):
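    """
    An AtomiqExperiment that can be embedded as a block into another
    experiment, optionally under an alias name. Blocks share the
    `experiment` attribute of their parent.
    """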
def __new__(cls, *args, **kwargs):
        # if we get a name in the constructor, make a class with that name and instantiate it
if "name" in kwargs and kwargs["name"] is not None:
return super().__new__(type(kwargs["name"], (cls,), {}))
else:
return super().__new__(cls)
def __init__(self, *args, **kwargs):
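        # blocks always refer to the top-level experiment of their parent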
self.experiment = args[0].experiment
super().__init__(*args, **kwargs)
def _getmro(self):
# since we dynamically add the dummy class with the alias name, we need to exclude that
# from the mro here again.
return inspect.getmro(type(self))[1:]