Source code for atomiq.frontend.atomiq_master

import importlib

import artiq.master.scheduler
import artiq.master.experiments

from atomiq.heros import heros, LocalHERO, RemoteHERO, event


def argument_factory(arg_description: dict):
    """
    Instantiate a ScanObject or BooleanValue, StringValue etc. from type and default arguments
    """
    import artiq.language.scan
    import artiq.language.environment
    from artiq.language.environment import _SimpleArgProcessor

    arg_type = arg_description["ty"]

    if arg_type == "Scannable":
        inner_arg_type = arg_description["default"][0]["ty"]
        inner_description = arg_description["default"][0]
        inner_kwargs = {k: v for k, v in inner_description.items() if k != "ty"}
        arg_cls = getattr(artiq.language.scan, inner_arg_type)
        return arg_cls(**inner_kwargs).describe()
    else:
        # _SimpleArgProcessor
        inner_arg_type = arg_type
        inner_description = arg_description
        inner_kwargs = {k: v for k, v in inner_description.items() if k != "ty"}
        arg_cls = getattr(artiq.language.environment, inner_arg_type)
        arg = arg_cls(**inner_kwargs)
        # we further need to convert these into "real" values,
        # i.e. we need "some_var": {ty: "BooleanValue", ...} -> "some_var": True
        # for simplicity, we just take the default
        return arg.default()
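
# Illustrative usage of argument_factory (not part of the original module):
# a plain _SimpleArgProcessor description collapses to its default value,
# while a Scannable description is rebuilt into a ScanObject and returned as
# its describe() dict. The argument descriptions shown here are hypothetical
# examples of the arginfo format produced by ARTIQ's examine machinery.
#
# >>> argument_factory({"ty": "NumberValue", "default": 10.0})
# 10.0
# >>> argument_factory({"ty": "Scannable", "default": [
# ...     {"ty": "RangeScan", "start": 0.0, "stop": 1.0, "npoints": 10}]})
# {'ty': 'RangeScan', 'start': 0.0, 'stop': 1.0, 'npoints': 10, ...}
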
# If HEROS is available, replace the artiq Scheduler with our HEROS-enabled version
if heros is not None:
    from heros.zenoh import session_manager

    _instance_name = None

    _ArtiqScheduler = artiq.master.scheduler.Scheduler
    _ArtiqExperimentDB = artiq.master.experiments.ExperimentDB

    class Scheduler(_ArtiqScheduler, LocalHERO):
        """
        Patches the ARTIQ master Scheduler (`artiq.master.scheduler.Scheduler`) to become a HERO.
        """
        def __init__(self, *args, **kwargs):
            global _instance_name
            _ArtiqScheduler.__init__(self, *args, **kwargs)
            self._worker_handlers.update({
                "run_started": self.run_started,
                "run_created": self.run_created,
                "run_ended": self.run_ended
            })
            postfix = f"-{_instance_name}" if _instance_name is not None else ""
            LocalHERO.__init__(self, f"atomiq-scheduler{postfix}")

        @event
        def run_created(self, rid, metadata={}):
            metadata.update({"rid": rid})
            return metadata

        @event
        def run_started(self, rid, metadata={}):
            metadata.update({"rid": rid})
            return metadata

        @event
        def run_ended(self, rid, metadata={}):
            metadata.update({"rid": rid})
            return metadata

        def get_current_run(self) -> RemoteHERO:
            """
            Return a handle to the current run or None.
            """
            for rid, run in self.get_status().items():
                if run["status"] == "running":
                    # construct HERO
                    return RemoteHERO(f"atomiq-run-{rid}")
            return None

        def get_expid(self, rid: int) -> dict:
            """
            Return the expid dict of a running experiment.

            Args:
                rid: A run id.

            Returns:
                The expid dictionary if the run id is currently in the schedule, else an empty dict.
            """
            rid = int(rid)
            status = self.get_status()
            return status[rid]["expid"] if rid in status else {}

    class ExperimentDB(_ArtiqExperimentDB, LocalHERO):
        """
        Patches the ARTIQ ExperimentDB (`artiq.master.experiments.ExperimentDB`) to become a HERO.
        """
        def __init__(self, *args, **kwargs):
            global _instance_name
            _ArtiqExperimentDB.__init__(self, *args, **kwargs)
            postfix = f"-{_instance_name}" if _instance_name is not None else ""
            LocalHERO.__init__(self, f"atomiq-experimentdb{postfix}")

        def scan_repository_sync(self, new_cur_rev=None) -> None:
            """
            Trigger a repository scan on the ARTIQ master.

            .. note:: This method is blocking and likely to time out for larger repositories.
            """
            import asyncio
            asyncio.run(self.scan_repository(new_cur_rev))

        def get_experiment_examination(self, filename, use_repository=True, revision=None) -> dict:
            """
            Examine an experiment.
            """
            import asyncio
            experiment = asyncio.run(self.examine(filename, use_repository, revision))
            return experiment

        def get_experiment(self, experiment_name: str) -> dict:
            """
            Construct an expid dict for a given experiment. This dict can then be used to
            start an experiment via the Scheduler HERO.
            """
            assert experiment_name in self.list_experiments(), \
                f"Experiment {experiment_name} is not in {list(self.list_experiments().keys())}"
            full_description = {**self.list_experiments()[experiment_name]}  # make a copy

            expid = {
                "devarg_override": {},
                "repo_rev": "N/A",
                "log_level": 20,  # INFO
                "file": full_description["file"],
                "class_name": full_description["class_name"],
                "arguments": {},
            }

            # handle args
            arguments = {}
            for arg_name, arg_descriptions in full_description["arginfo"].items():
                arg = argument_factory(arg_descriptions[0])
                arguments[arg_name] = arg
            expid["arguments"] = arguments

            return expid

        def list_experiments(self) -> dict:
            """
            List all known experiments.
            """
            return self.explist.raw_view

    # apply patches
    artiq.master.scheduler.Scheduler = Scheduler
    artiq.master.experiments.ExperimentDB = ExperimentDB

    del artiq
    del Scheduler
    del ExperimentDB


if __name__ == '__main__':
    from artiq.frontend import artiq_master

    _artiq_get_argparser = artiq_master.get_argparser

    def get_argparser():
        """
        We extend the artiq master argparser
        """
        parser = _artiq_get_argparser()
        parser.add_argument("--heros-connect",
                            help="zenoh host to connect to for discovery of heros")
        return parser

    args = get_argparser().parse_args()
    artiq_master.get_argparser = get_argparser

    # if we get a host to connect to for zenoh discovery, we insert it into our config
    if "heros_connect" in args and args.heros_connect is not None and heros is not None:
        session_manager.update_config({"connect": {"endpoints": [args.heros_connect]}})

    if "name" in args:
        _instance_name = args.name

    artiq_master.main()