Source code for atomiq.components

import artiq
import artiq.master.worker_db
from atomiq.helper import get_class_by_name
from atomiq.components.primitives import Component, Remote
from atomiq.heros import heros

import logging
import random
import string

# RemoteHERO is only bound when the `heros` object imported from atomiq.heros is not None.
# build_object_hero() checks `heros` for None before it touches RemoteHERO.
if heros is not None:
    RemoteHERO = heros.RemoteHERO

logging.basicConfig()
logger = logging.getLogger(__name__)

# Component definitions, read from the "components" entry of the device_db on the first call
# to ComponentFactory.produce(). None means "not loaded yet".
components = None

# device_db entries collected by build_object_from_device_db(). _create_device_replace() appends
# "_Suservo" to the class name of every device description that is equal to one of these entries.
suservo_replacements = []


def _create_device_replace(desc, device_mgr, *args, **kwargs):
    """Replacement for ``artiq.master.worker_db._create_device``.

    Device descriptions that are listed in ``suservo_replacements`` get ``"_Suservo"`` appended to
    their class name. If a device with the resulting description is already active, it is removed
    from ``device_mgr.active_devices`` and returned instead of being created a second time.
    Everything else is forwarded to the original ARTIQ factory.

    Args:
        desc: Device description dict. It is modified in place when it is a SUServo replacement.
        device_mgr: Device manager whose ``active_devices`` list is searched.
        *args: Forwarded unchanged to the original factory.
        **kwargs: Forwarded unchanged to the original factory.

    Returns:
        The already active device or the result of the original factory.
    """
    create_original = artiq.master.worker_db._create_device_original

    if desc not in suservo_replacements:
        return create_original(desc, device_mgr, *args, **kwargs)

    desc["class"] += "_Suservo"
    # The renamed description no longer matches the original one from the device_db, so look for
    # an already active device ourselves to avoid a double init.
    for index, active in enumerate(device_mgr.active_devices):
        if desc == active[0]:
            del device_mgr.active_devices[index]
            return active[1]

    return create_original(desc, device_mgr, *args, **kwargs)


# Monkeypatch ARTIQ's device factory so that _create_device_replace() is consulted first.
# The original factory is stashed only once: if this module body is executed again (e.g. through
# importlib.reload), stashing it a second time would store the replacement itself and make
# _create_device_replace() call itself forever.
if not hasattr(artiq.master.worker_db, "_create_device_original"):
    artiq.master.worker_db._create_device_original = artiq.master.worker_db._create_device
artiq.master.worker_db._create_device = _create_device_replace


def build_object_from_device_db(parent, object_id):
    """Fetch the device ``object_id`` from the device_db through ``parent``.

    When the legacy ARTIQ compiler is in use (``artiq.compiler`` is present) and the device_db entry
    of ``object_id`` has the class ``SUServo``, the device_db entries of its ``cpld_devices`` and
    ``dds_devices`` are inspected. Those with class ``CPLD`` or ``AD9910`` are appended to
    ``suservo_replacements``.

    This is a workaround for
    https://forum.m-labs.hk/d/273-using-su-servo-mode-and-freestanding-urukul-in-the-same-experiment/3
    and should not be necessary anymore once nac3 is used.

    Args:
        parent: Object providing ``get_device_db()`` and ``get_device()``.
        object_id: Key of the device in the device_db.

    Returns:
        The result of ``parent.get_device(object_id)``.
    """
    logger.debug(f"building device_db object {object_id} with parent {parent}")

    if "compiler" in dir(artiq):
        device_db = parent.get_device_db()
        if object_id in device_db:
            entry = device_db[object_id]
            logger.debug(f"checking {object_id} for Urukul/SUServo coexistance issues")
            if "class" in entry and entry["class"] == "SUServo":
                # CPLDs are handled first, then the DDS channels.
                for argument_name in ("cpld_devices", "dds_devices"):
                    for device_name in entry["arguments"][argument_name]:
                        node = device_db[device_name]
                        if "class" in node and node["class"] in ("CPLD", "AD9910"):
                            suservo_replacements.append(node)
                            logger.debug(f"fixing class for Urukul/SUServo coexistence in {node}")

    return parent.get_device(object_id)


def build_object_hero(parent, object_id):
    """Build a component that wraps a remote HERO.

    ``object_id`` is either ``"<realm>/<hero name>"`` or just ``"<hero name>"``. Without a realm
    prefix the realm ``"heros"`` is used. Everything after the first ``/`` belongs to the HERO name.

    The HERO is contacted once to read ``_hero_implements``. Every entry starting with
    ``atomiq.components.`` is resolved with ``get_class_by_name`` and mixed into a dynamically
    created ``RemoteHEROComponent`` class. If no class could be resolved, ``Component`` is used.

    Args:
        parent: Passed on to ``Component.__init__`` as parent of the new component.
        object_id: Name of the HERO, optionally prefixed by its realm.

    Returns:
        An instance of the dynamically created ``RemoteHEROComponent`` class whose
        ``_kernel_invariants`` is an empty set.

    Raises:
        ImportError: If ``heros`` is ``None``, i.e. HEROS is not available.
    """
    if heros is None:
        raise ImportError("Your experiment requires HEROS which is not available on your system")

    realm, separator, hero_name = object_id.partition("/")
    if not separator:
        # no realm given, fall back to the default one
        realm, hero_name = "heros", object_id

    hero = RemoteHERO(hero_name, realm=realm)

    # let's check if the hero implements some atomiq class
    class_list = []
    for impl_name in hero._hero_implements:
        if not impl_name.startswith("atomiq.components."):
            continue
        try:
            class_list.append(get_class_by_name(impl_name))
        except Exception:
            # A class that cannot be loaded is skipped on purpose; the HERO may still be usable.
            logger.warning(f"Could not load the class {impl_name} that the HERO {hero_name} claims to"
                           " implement.")

    if not class_list:
        class_list.append(Component)
        logger.warning(f"The HERO {hero_name} does not tell what it implements. Let's treat it as a generic"
                       " Component and hope it provides the interfaces we need...")

    # The first instance was only needed for introspection. Drop our reference before the actual
    # component is created.
    del hero

    # dynamically create HEROComponent class
    RemoteHEROComponent = type("RemoteHEROComponent", tuple([RemoteHERO, Remote] + class_list), {})

    def _init_replacement(self, *args, **kwargs):
        # The arguments passed to the constructor are ignored. Each base class is initialized
        # explicitly with the values captured from the enclosing function.
        RemoteHERO.__init__(self, hero_name, realm=realm)
        Component.__init__(self, parent, object_id)
        Remote.__init__(self, object_id)

    RemoteHEROComponent.__init__ = _init_replacement

    obj = RemoteHEROComponent(hero_name, realm=realm)

    # A HEROComponent lives completely outside the kernel and can thus not have a kernel invariant
    obj._kernel_invariants = set()
    return obj


def build_object(classname, arg_dict):
    """Instantiate the class called ``classname`` with ``arg_dict`` as keyword arguments.

    ARTIQ compilers prior to nac3 complain during type inference when two objects of the same class
    differ in the types of their attributes. On such a legacy compiler (recognized by the presence
    of ``artiq.compiler``) every object therefore gets its own throwaway subclass whose name is the
    original class name plus eight random lowercase letters.

    Args:
        classname: Name of the class, resolved through ``get_class_by_name``.
        arg_dict: Keyword arguments for the constructor.

    Returns:
        The newly created object.
    """
    logger.debug(f"building object of class {classname}")

    if "compiler" not in dir(artiq):
        # nac3: the class can be used as it is
        return get_class_by_name(classname)(**arg_dict)

    suffix = "".join(random.choice(string.ascii_lowercase) for _ in range(8))
    ephemeral_classname = f"{classname}_{suffix}"
    logger.debug(f"apply workaround for old artiq type inferral {classname} -> {ephemeral_classname}")
    base_class = get_class_by_name(classname)
    unique_class = type(ephemeral_classname, (base_class,), {})
    return unique_class(**arg_dict)


class ComponentFactory():
    """Builds components from the ``components`` entry of the device_db.

    A component that has been built is stored under the key ``"obj"`` of its definition dict and is
    returned from there on every later request.
    """

    @staticmethod
    def produce(name, parent):
        """Return the component called ``name``, building it if necessary.

        On the first call the component definitions are read from the ``"components"`` entry of
        ``parent.get_device_db()`` and kept in the module-level ``components`` dict.

        Args:
            name: Name of the component in the component definition dict. A string starting with
                ``$`` is built as a HERO by ``build_object_hero`` instead.
            parent: Object requesting the component. It is handed to the component as parent.

        Returns:
            The requested component.

        Raises:
            KeyError: If ``name`` is not part of the component definition dict.
        """
        global components
        if components is None:
            ddb = parent.get_device_db()
            components = ddb["components"] if "components" in ddb else {}

        logger.debug(f"Try to produce {name}")

        if isinstance(name, str) and name.startswith("$"):
            return build_object_hero(parent, name[1:])
        if name in components:
            return ComponentFactory._produce_from_dict(name, parent, components[name])
        raise KeyError(f"component '{name}' required by {parent.__class__.__name__} does not exist in"
                       " component definition dict")

    @staticmethod
    def _resolve_argument(argval, parent):
        """Turn one argument value of a component definition into the value passed to the constructor.

        Strings starting with ``&`` reference another component, ``@`` references a device_db
        device and ``$`` references a HERO. Every other value is returned unchanged.
        """
        if not isinstance(argval, str):
            return argval
        if argval.startswith("&"):
            return ComponentFactory.produce(argval[1:], parent)
        if argval.startswith("@"):
            return build_object_from_device_db(parent, argval[1:])
        if argval.startswith("$"):
            return build_object_hero(parent, argval[1:])
        return argval

    @staticmethod
    def _produce_from_dict(identifier, parent, dictionary):
        """Build the component described by ``dictionary`` or return the already built one.

        Args:
            identifier: Name of the component. Passed to the constructor as ``identifier``.
            parent: Passed to the constructor as ``parent``.
            dictionary: Definition of the component. ``classname`` is mandatory, ``arguments`` is
                an optional dict of further constructor arguments. The built object is stored in
                this dict under the key ``"obj"``.

        Returns:
            The component object.

        Raises:
            KeyError: If ``dictionary`` has no ``classname`` entry.
            Exception: Whatever building the object raises, after it has been logged.
        """
        if "obj" in dictionary:
            # component was already built. Return it
            return dictionary["obj"]

        if "classname" not in dictionary:
            raise KeyError(f"component '{identifier}' required by {parent.__class__.__name__} does not have the"
                           " mandatory argument `classname`")

        # Build component. If arguments are themselves components, build them recursively
        arg_dict = {"identifier": identifier, "parent": parent}
        if "arguments" in dictionary and isinstance(dictionary["arguments"], dict):
            for argname, argval in dictionary["arguments"].items():
                arg_dict[argname] = ComponentFactory._resolve_argument(argval, parent)

        try:
            obj = build_object(dictionary["classname"], arg_dict)
        except Exception as e:
            # Make a more user friendly output that names the component that cannot be built
            logger.error(f"Cannot build component {identifier} due to following error: {e}")
            raise

        dictionary["obj"] = obj
        return obj