[docs]defget_heros():""" To import heros, we have to play tricks here. Since the artiq worker calls our code in the artiq.master.worker_impl.examine function and afterwards removes the imported modules from sys.modules. The PyO3 used for the python zenoh bindings does not like this since then every subsequent `import heros` will reinitialize the PyO3 rust extension, leading to the error PyO3 modules compiled for CPython 3.8 or older may only be initialized once per interpreter process To work around this, we save a reference to the heros module in the sys module and use this reference if it exists. We use the sys module since it should always exist. """ifhasattr(sys,"heros"):returnsys.heroselse:try:importherossys.heros=herosreturnherosexceptImportError:returnNone
[docs]classAtomiqHERO(LocalHERO):name:str=""rid:int=-1chunksize=-1def__init__(self,experiment):self.experiment=experimentself.name=experiment.__class__.__name__self.rid=experiment.scheduler.ridself.chunksize=experiment.CHUNKSIZEsuper().__init__(f"atomiq-run-{self.experiment.scheduler.rid}")@propertydefstep_counter(self):returnint(self.experiment.step_counter)@propertydefidentifier(self):returnint(self.experiment.identifier)@propertydefsteps_total(self):""" Return the total number of steps. Since this is only available after the prepare stage of the experiment, the method returns -1 if the prepare was not yet run. """ifhasattr(self.experiment,"max_step_counter"):returnself.experiment.max_step_counter+1else:return-1