import artiq
import artiq.master.worker_db

from atomiq.helper import get_class_by_name
from atomiq.components.primitives import Component, Remote
from atomiq.heros import heros

import logging
import random
import string

if heros is not None:
    RemoteHERO = heros.RemoteHERO

logging.basicConfig()
logger = logging.getLogger(__name__)

# Cache of the component definitions; stays None until it is first needed.
components = None

# Device descriptions whose class name has to be rewritten so that Urukul devices used by
# an SUServo can coexist with freestanding ones (legacy compiler workaround).
suservo_replacements = []


def _create_device_replace(desc, device_mgr, *args, **kwargs):
    """Replacement for ``artiq.master.worker_db._create_device``.

    If ``desc`` is registered in ``suservo_replacements`` its ``"class"`` entry gets the
    suffix ``_Suservo`` (``desc`` is modified in place). Should ``device_mgr.active_devices``
    then already hold an entry with an equal description, that entry is removed from the list
    and its second element is returned instead of creating the device again.

    In all other cases the call is forwarded unchanged to the original ARTIQ function.

    Args:
        desc: device description dict as passed by ARTIQ.
        device_mgr: device manager providing the ``active_devices`` list.
        *args: forwarded to the original ``_create_device``.
        **kwargs: forwarded to the original ``_create_device``.

    Returns:
        The already active device or whatever the original ``_create_device`` returns.
    """
    if desc in suservo_replacements:
        desc["class"] += "_Suservo"
        for i, existing in enumerate(device_mgr.active_devices):
            # Avoid double init due to change in class name from original class as in ddb
            if desc == existing[0]:
                # NOTE(review): presumably the caller registers the returned device again,
                # hence the stale entry is dropped here -- confirm against ARTIQ's DeviceManager.
                del device_mgr.active_devices[i]
                return existing[1]
    return artiq.master.worker_db._create_device_original(desc, device_mgr, *args, **kwargs)


# Keep a handle on the genuine ARTIQ implementation only once. If this module body is executed
# a second time (e.g. on reload), ``_create_device`` is already our replacement; storing it as
# the "original" would make every device creation recurse endlessly.
if not hasattr(artiq.master.worker_db, "_create_device_original"):
    artiq.master.worker_db._create_device_original = artiq.master.worker_db._create_device
artiq.master.worker_db._create_device = _create_device_replace
def build_object_from_device_db(parent, object_id):
    """Fetch the device ``object_id`` from the ARTIQ device db via ``parent``.

    When the legacy ARTIQ compiler is present (``artiq.compiler`` exists) and the requested
    device is an ``SUServo``, the descriptions of its CPLD and DDS devices are registered in
    ``suservo_replacements`` before the device is requested.

    Args:
        parent: object providing ``get_device_db()`` and ``get_device()``.
        object_id: key of the device in the device db.

    Returns:
        The result of ``parent.get_device(object_id)``.
    """
    logger.debug(f"building device_db object {object_id} with parent {parent}")

    # workaround for https://forum.m-labs.hk/d/273-using-su-servo-mode-and-freestanding-urukul-in-the-same-experiment/3
    # this should not be necessary anymore once we run nac3
    def fixup_suservo_class(node):
        if "class" in node and node["class"] in ("CPLD", "AD9910"):
            suservo_replacements.append(node)
            logger.debug(f"fixing class for Urukul/SUServo coexistence in {node}")

    def register_suservo_children():
        # Only the legacy compiler needs the class fixup.
        if "compiler" not in dir(artiq):
            return
        device_db = parent.get_device_db()
        if object_id not in device_db:
            return
        description = device_db[object_id]
        logger.debug(f"checking {object_id} for Urukul/SUServo coexistance issues")
        if "class" not in description or description["class"] != "SUServo":
            return
        # CPLDs first, then the DDS channels.
        for group in ("cpld_devices", "dds_devices"):
            for child_id in description["arguments"][group]:
                fixup_suservo_class(device_db[child_id])

    register_suservo_children()
    return parent.get_device(object_id)
def build_object_hero(parent, object_id):
    """Build a component that wraps a remote HERO.

    ``object_id`` is either ``"<realm>/<hero name>"`` or just ``"<hero name>"``; in the latter
    case the realm ``"heros"`` is used. The HERO is contacted once to read which
    ``atomiq.components.*`` classes it claims to implement. A class deriving from ``RemoteHERO``,
    ``Remote`` and those classes is then created on the fly and instantiated. If none of the
    claimed classes can be loaded, the generic ``Component`` is used instead.

    Args:
        parent: parent handed to ``Component.__init__`` of the created object.
        object_id: HERO identifier, optionally prefixed by the realm and a ``/``.

    Returns:
        The instantiated remote component with an empty ``_kernel_invariants`` set.

    Raises:
        ImportError: if HEROS is not available (``heros`` is ``None``).
    """
    if heros is None:
        raise ImportError("Your experiment requires HEROS which is not available on your system")

    # Everything before the first "/" is the realm, the rest (may contain "/") is the name.
    prefix, separator, remainder = object_id.partition("/")
    if separator:
        realm, hero_name = prefix, remainder
    else:
        realm, hero_name = "heros", object_id

    hero = RemoteHERO(hero_name, realm=realm)

    # let's check if the hero implements some atomiq class
    class_list = []
    for impl_name in hero._hero_implements:
        if impl_name.startswith("atomiq.components."):
            try:
                class_list.append(get_class_by_name(impl_name))
            except Exception:
                # Best effort: a class that cannot be loaded is skipped, not fatal.
                logger.warning(f"Could not load the the class {impl_name} that the HERO {hero_name} claims to"
                               " implement.")
    if not class_list:
        class_list.append(Component)
        logger.warning(f"The HERO {hero_name} does not tell what it implements. Let's treat it as a generic Component"
                       " hope it provides the interfaces we need...")
    # The probing connection is not needed any longer.
    del hero

    # dynamically create HEROComponent class
    RemoteHEROComponent = type("RemoteHEROComponent", (RemoteHERO, Remote, *class_list), {})

    def _init_replacement(self, *args, **kwargs):
        # Initialise the bases explicitly; the arguments given to the constructor are ignored
        # in favour of the values captured from the enclosing call.
        RemoteHERO.__init__(self, hero_name, realm=realm)
        Component.__init__(self, parent, object_id)
        Remote.__init__(self, object_id)

    RemoteHEROComponent.__init__ = _init_replacement

    obj = RemoteHEROComponent(hero_name, realm=realm)
    # A HEROComponent lives completely outside the kernel and can thus not have a kernel invariant
    obj._kernel_invariants = set()

    return obj
def build_object(classname, arg_dict):
    """Instantiate the class named ``classname`` with ``arg_dict`` as keyword arguments.

    ARTIQ compilers prior to nac3 complain during type inference when two objects of the same
    class differ in the types of their attributes. If the legacy compiler is present (the
    ``artiq.compiler`` submodule exists), the object is therefore created from a throw-away
    subclass with a unique, randomly suffixed name.

    Args:
        classname: name of the class, resolved through ``get_class_by_name``.
        arg_dict: keyword arguments for the constructor.

    Returns:
        The newly created object.
    """
    logger.debug(f"building object of class {classname}")

    if "compiler" not in dir(artiq):
        # No legacy compiler: the class can be used as it is.
        return get_class_by_name(classname)(**arg_dict)

    suffix = "".join(random.choice(string.ascii_lowercase) for _ in range(8))
    ephemeral_classname = f"{classname}_{suffix}"
    logger.debug(f"apply workaround for old artiq type inferral {classname} -> {ephemeral_classname}")
    unique_class = type(ephemeral_classname, (get_class_by_name(classname),), {})
    return unique_class(**arg_dict)
@staticmethod
def produce(name, parent):
    """Return the component called ``name``.

    On the first call the component definitions are read from the ``"components"`` entry of
    the device db obtained from ``parent`` (an empty dict if there is no such entry) and kept
    in the module-level ``components``.

    Args:
        name: component identifier. A string starting with ``$`` refers to a HERO and is
            handed to ``build_object_hero`` without the prefix.
        parent: object providing ``get_device_db()``; passed on to the builders.

    Returns:
        The requested component object.

    Raises:
        KeyError: if ``name`` is not contained in the component definitions.
    """
    global components
    if components is None:
        device_db = parent.get_device_db()
        if "components" in device_db:
            components = device_db["components"]
        else:
            components = {}

    logger.debug(f"Try to produce {name}")

    if type(name) is str and name.startswith("$"):
        return build_object_hero(parent, name[1:])

    if name not in components:
        raise KeyError(f"component '{name}' required by {parent.__class__.__name__} does not exist in"
                       " component definition dict")

    return ComponentFactory._produce_from_dict(name, parent, components[name])
@staticmethod
def _produce_from_dict(identifier, parent, dictionary):
    """Build the component described by ``dictionary`` or return the already built one.

    The finished object is stored under the key ``"obj"`` of ``dictionary`` so that later
    requests get the same instance. String arguments with a prefix are resolved first:

    * ``&name``: another component, produced through ``ComponentFactory.produce``
    * ``@name``: a device from the device db (``build_object_from_device_db``)
    * ``$name``: a HERO (``build_object_hero``)

    All other argument values are passed on unchanged.

    Args:
        identifier: name of the component; handed to the constructor as ``identifier``.
        parent: handed to the constructor as ``parent`` and to the builders of references.
        dictionary: component definition with the mandatory key ``classname`` and the
            optional dict ``arguments``.

    Returns:
        The component object.

    Raises:
        KeyError: if ``dictionary`` has no ``classname`` entry.
        Exception: whatever the construction of the component raises; logged and re-raised.
    """
    if "obj" in dictionary:
        # component was already built. Return it
        return dictionary["obj"]

    # Build component. If arguments are themselves components, build them recursively
    if "classname" not in dictionary:
        raise KeyError(f"component '{identifier}' required by {parent.__class__.__name__} does not have the"
                       " mandatory argument `classname`")

    arg_dict = {"identifier": identifier, "parent": parent}
    if "arguments" in dictionary and type(dictionary["arguments"]) is dict:
        for argname, argval in dictionary["arguments"].items():
            if type(argval) is str and argval.startswith("&"):
                arg_dict[argname] = ComponentFactory.produce(argval[1:], parent)
            elif type(argval) is str and argval.startswith("@"):
                arg_dict[argname] = build_object_from_device_db(parent, argval[1:])
            elif type(argval) is str and argval.startswith("$"):
                arg_dict[argname] = build_object_hero(parent, argval[1:])
            else:
                arg_dict[argname] = argval

    try:
        obj = build_object(dictionary["classname"], arg_dict)
    except Exception as e:
        # Make a more user friendly output that names the component that cannot be built
        logger.error(f"Cannot build component {identifier} due to following error: {e}")
        raise

    dictionary["obj"] = obj
    return obj