import importlib
import artiq.master.scheduler
import artiq.master.experiments
from atomiq.heros import heros, LocalHERO, RemoteHERO, event
[docs]
def argument_factory(arg_description: dict):
    """
    Turn an ARTIQ argument description into a concrete argument value.

    For a ``Scannable`` description, the first entry of its ``"default"`` list
    selects a scan class from ``artiq.language.scan``; that class is
    instantiated and the result of its ``describe()`` is returned.
    For every other type, the class of that name is looked up in
    ``artiq.language.environment``, instantiated, and the result of its
    ``default()`` is returned.

    Args:
        arg_description: Description dict. Its ``"ty"`` entry names the
            argument class. For non-scannable types all remaining entries are
            passed as keyword arguments to that class; for ``Scannable`` the
            same holds for the first entry of ``arg_description["default"]``.

    Returns:
        The ``describe()`` result of the scan object for ``Scannable``
        descriptions, otherwise the value returned by the argument
        processor's ``default()``.

    Raises:
        KeyError: If a required key (``"ty"``, or ``"default"`` for a
            ``Scannable``) is missing.
        IndexError: If a ``Scannable`` description has an empty ``"default"`` list.
        AttributeError: If the named class does not exist in the ARTIQ module.
    """
    # Imported lazily inside the function; only the two modules that are
    # actually looked up are needed.
    import artiq.language.scan
    import artiq.language.environment

    arg_type = arg_description["ty"]
    is_scannable = arg_type == "Scannable"

    if is_scannable:
        # The scan to build is described by the first default of the Scannable.
        description = arg_description["default"][0]
        namespace = artiq.language.scan
    else:
        description = arg_description
        namespace = artiq.language.environment

    # Everything except the type tag is a constructor keyword argument.
    constructor_kwargs = {key: value for key, value in description.items() if key != "ty"}
    instance = getattr(namespace, description["ty"])(**constructor_kwargs)

    if is_scannable:
        return instance.describe()

    # We further need to convert these into "real" values,
    # i.e. we need "some_var": {ty: "BooleanValue", ...} -> "some_var": True.
    # For simplicity, we just take the default.
    return instance.default()
# If HEROS is available, replace the artiq Scheduler with our HEROS-enabled version
if heros is not None:
# session_manager is used in the __main__ section of this file to update the zenoh connect endpoints
from heros.zenoh import session_manager
# Optional name suffix for the HEROs created below; the __main__ section sets it from the parsed
# "name" argument when that argument is present
_instance_name = None
# Keep references to the original ARTIQ classes; the HERO-enabled subclasses below derive from them
_ArtiqScheduler = artiq.master.scheduler.Scheduler
_ArtiqExperimentDB = artiq.master.experiments.ExperimentDB
class Scheduler(_ArtiqScheduler, LocalHERO):
    """
    Patches the ARTIQ master Scheduler (`artiq.master.scheduler.Scheduler`) to become a HERO.

    On top of the original scheduler, three additional worker handlers
    (``run_created``, ``run_started``, ``run_ended``) are registered. Each of
    them is decorated with ``@event`` and returns the run metadata with the
    run id added.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize the ARTIQ scheduler, register the run handlers and set up the HERO.

        All positional and keyword arguments are forwarded unchanged to the
        original ARTIQ scheduler.
        """
        _ArtiqScheduler.__init__(self, *args, **kwargs)
        self._worker_handlers.update({
            "run_started": self.run_started,
            "run_created": self.run_created,
            "run_ended": self.run_ended,
        })
        # _instance_name is a module-level name that is only read here,
        # so no ``global`` declaration is required.
        postfix = f"-{_instance_name}" if _instance_name is not None else ""
        LocalHERO.__init__(self, f"atomiq-scheduler{postfix}")

    @event
    def run_created(self, rid, metadata=None):
        """
        Event handler for a created run.

        Args:
            rid: The run id.
            metadata: Optional mapping with additional information on the run.

        Returns:
            A new dict holding the entries of ``metadata`` plus ``"rid"``.
        """
        # Build a fresh dict: neither the caller's mapping nor a shared
        # default object must be mutated across calls.
        return {**(metadata or {}), "rid": rid}

    @event
    def run_started(self, rid, metadata=None):
        """
        Event handler for a started run.

        Args:
            rid: The run id.
            metadata: Optional mapping with additional information on the run.

        Returns:
            A new dict holding the entries of ``metadata`` plus ``"rid"``.
        """
        return {**(metadata or {}), "rid": rid}

    @event
    def run_ended(self, rid, metadata=None):
        """
        Event handler for an ended run.

        Args:
            rid: The run id.
            metadata: Optional mapping with additional information on the run.

        Returns:
            A new dict holding the entries of ``metadata`` plus ``"rid"``.
        """
        return {**(metadata or {}), "rid": rid}

    def get_current_run(self) -> RemoteHERO:
        """
        Return a handle to the current run or None.

        The scheduler status is searched for the first entry whose
        ``"status"`` is ``"running"``.

        Returns:
            A ``RemoteHERO`` named ``atomiq-run-<rid>`` for that run, or
            ``None`` if no run is currently in the ``"running"`` state.
        """
        for rid, run in self.get_status().items():
            if run["status"] == "running":
                # construct HERO
                return RemoteHERO(f"atomiq-run-{rid}")
        return None

    def get_expid(self, rid: int) -> dict:
        """
        Return the expid dict of a scheduled experiment.

        Args:
            rid: A run_id. It is converted with ``int()`` before the lookup.

        Returns:
            expid dictionary if the run id is currently in the schedule,
            else returns an empty dict
        """
        rid = int(rid)
        status = self.get_status()
        return status[rid]["expid"] if rid in status else {}
class ExperimentDB(_ArtiqExperimentDB, LocalHERO):
    """
    Patches the ARTIQ ExperimentDB (`artiq.master.experiments.ExperimentDB`) to become a HERO.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialize the ARTIQ experiment database and set up the HERO.

        All positional and keyword arguments are forwarded unchanged to the
        original ARTIQ ExperimentDB.
        """
        _ArtiqExperimentDB.__init__(self, *args, **kwargs)
        # _instance_name is a module-level name that is only read here,
        # so no ``global`` declaration is required.
        postfix = f"-{_instance_name}" if _instance_name is not None else ""
        LocalHERO.__init__(self, f"atomiq-experimentdb{postfix}")

    def scan_repository_sync(self, new_cur_rev=None) -> None:
        """
        Trigger a repository scan on the ARTIQ master.

        The ``scan_repository`` coroutine is run to completion with
        ``asyncio.run``.

        Args:
            new_cur_rev: Passed through to ``scan_repository``.

        .. note::
            This method is blocking and likely to timeout for larger repositories.
        """
        import asyncio

        asyncio.run(self.scan_repository(new_cur_rev))

    def get_experiment_examination(self, filename, use_repository=True, revision=None) -> dict:
        """
        Examine an experiment.

        The ``examine`` coroutine is run to completion with ``asyncio.run``.

        Args:
            filename: Passed through to ``examine``.
            use_repository: Passed through to ``examine``.
            revision: Passed through to ``examine``.

        Returns:
            The result of ``examine``.
        """
        import asyncio

        return asyncio.run(self.examine(filename, use_repository, revision))

    def get_experiment(self, experiment_name: str) -> dict:
        """
        Construct an expid dict for a given experiment.
        This dict can then be used to start an experiment via the Scheduler-HERO.

        Args:
            experiment_name: Key of the experiment in :meth:`list_experiments`.

        Returns:
            An expid dict with the experiment's file and class name, log level
            INFO and one entry in ``"arguments"`` per argument of the
            experiment, built by ``argument_factory``.

        Raises:
            AssertionError: If ``experiment_name`` is not a known experiment.
        """
        experiments = self.list_experiments()
        # list_experiments is a method; its result has to be used to list the
        # known names in the failure message.
        assert experiment_name in experiments, (
            f"Experiment {experiment_name} is not in {list(experiments.keys())}"
        )

        full_description = {**experiments[experiment_name]}  # make a copy

        # Each arginfo value is indexed at 0 to obtain the argument description.
        arguments = {
            arg_name: argument_factory(arg_descriptions[0])
            for arg_name, arg_descriptions in full_description["arginfo"].items()
        }

        return {
            "devarg_override": {},
            "repo_rev": 'N/A',
            "log_level": 20,  # INFO
            "file": full_description["file"],
            "class_name": full_description["class_name"],
            "arguments": arguments,
        }

    def list_experiments(self) -> dict:
        """
        List all known experiments.

        Returns:
            The ``raw_view`` of the experiment list (``self.explist``).
        """
        return self.explist.raw_view
# apply patches
# Rebind the ARTIQ module attributes to the HERO-enabled subclasses defined in this file
artiq.master.scheduler.Scheduler = Scheduler
artiq.master.experiments.ExperimentDB = ExperimentDB
# Remove the helper names from this module's namespace; the patched classes remain
# reachable through the artiq module attributes assigned above
del artiq
del Scheduler
del ExperimentDB
if __name__ == '__main__':
    from artiq.frontend import artiq_master

    # Keep the original parser factory so the extended one can delegate to it.
    _artiq_get_argparser = artiq_master.get_argparser

    def get_argparser():
        """
        Extend the ARTIQ master argument parser with the ``--heros-connect`` option.

        Returns:
            The parser created by the original ``artiq_master.get_argparser``
            with the additional argument registered.
        """
        parser = _artiq_get_argparser()
        # Note the trailing space: the two literals are concatenated.
        parser.add_argument("--heros-connect",
                            help="zenoh host to connect to for discovery "
                                 "of heros")
        return parser

    # Parse the command line once ourselves to read the options needed below.
    args = get_argparser().parse_args()
    # Install the extended factory in artiq_master before calling its main()
    # (presumably main() builds its parser through this function - confirm).
    artiq_master.get_argparser = get_argparser

    # if we get a host to connect to for zenoh discovery, we insert it into our config
    if "heros_connect" in args and args.heros_connect is not None and heros is not None:
        session_manager.update_config({"connect": {"endpoints": [args.heros_connect]}})

    # The instance name becomes the suffix of the HERO names created by the patched classes.
    if "name" in args:
        _instance_name = args.name

    artiq_master.main()