Source code for atomiq.heros

import sys
from atomiq.helper import dummy_decorator


[docs] def get_heros(): """ To import heros, we have to play tricks here. Since the artiq worker calls our code in the artiq.master.worker_impl.examine function and afterwards removes the imported modules from sys.modules. The PyO3 used for the python zenoh bindings does not like this since then every subsequent `import heros` will reinitialize the PyO3 rust extension, leading to the error PyO3 modules compiled for CPython 3.8 or older may only be initialized once per interpreter process To work around this, we save a reference to the heros module in the sys module and use this reference if it exists. We use the sys module since it should always exist. """ if hasattr(sys, "heros"): return sys.heros else: try: import heros sys.heros = heros return heros except ImportError: return None
heros = get_heros() if heros is not None: LocalHERO = heros.heros.LocalHERO event = heros.event else: LocalHERO = object event = dummy_decorator
[docs] class AtomiqHERO(LocalHERO): name: str = "" rid: int = -1 chunksize = -1 def __init__(self, experiment): self.experiment = experiment self.name = experiment.__class__.__name__ self.rid = experiment.scheduler.rid self.chunksize = experiment.CHUNKSIZE super().__init__(f"atomiq-run-{self.experiment.scheduler.rid}") @property def step_counter(self): return int(self.experiment.step_counter) @property def identifier(self): return int(self.experiment.identifier) @property def steps_total(self): """ Return the total number of steps. Since this is only available after the prepare stage of the experiment, the method returns -1 if the prepare was not yet run. """ if hasattr(self.experiment, "max_step_counter"): return self.experiment.max_step_counter + 1 else: return -1
[docs] def terminate(self): self.experiment.terminate()
@event def emit_data(self, data): return data