# Source code for atomiq.heros
import sys
from atomiq.helper import dummy_decorator
# [docs]
def get_heros():
    """
    Return the ``heros`` module, importing it at most once per interpreter process.

    The artiq worker runs our code through ``artiq.master.worker_impl.examine`` and removes the
    imported modules from ``sys.modules`` afterwards. Every later ``import heros`` would therefore
    re-initialise the PyO3 rust extension behind the python zenoh bindings, which fails with

        PyO3 modules compiled for CPython 3.8 or older may only be initialized once per interpreter process

    As a workaround, a reference to the imported module is stored as the attribute ``heros`` on the
    ``sys`` module (which should always exist) and that reference is handed out on subsequent calls.

    Returns:
        The ``heros`` module, or ``None`` if it cannot be imported.
    """
    # Fast path: a previous call already stored the module on ``sys``.
    try:
        return sys.heros
    except AttributeError:
        pass

    try:
        import heros as module
    except ImportError:
        # heros is not available; callers have to cope with None.
        return None

    # Keep the reference alive independently of what happens to sys.modules.
    sys.heros = module
    return module
# Resolve heros once at import time. get_heros() returns None when the package cannot be imported.
heros = get_heros()
if heros is None:
    # Placeholders so that the class definition and the @event decorator below still work
    # without heros.
    LocalHERO = object
    RemoteHERO = object
    event = dummy_decorator
else:
    LocalHERO = heros.heros.LocalHERO
    RemoteHERO = heros.heros.RemoteHERO
    event = heros.event
# [docs]
class AtomiqHERO(LocalHERO):
    """
    ``LocalHERO`` subclass that wraps an experiment object.

    On construction a set of descriptive attributes is copied from the experiment and the base
    class is initialised with the name ``atomiq-run-<rid>`` and the experiment's ``TAGS`` as tags.
    The properties and :meth:`terminate` forward to the wrapped experiment at access time.

    Args:
        experiment: Object providing ``scheduler.rid``, ``CHUNKSIZE`` and ``TAGS``. The attributes
            ``components``, ``blocks`` and ``arguments`` are optional and replaced by empty
            containers when missing.
    """

    # Class-level defaults; all of them are overwritten per instance in __init__.
    name: str = ""
    rid: int = -1
    chunksize = -1
    components: tuple[str] = ()
    blocks: tuple[str] = ()
    default_arguments: dict[dict] | None = None

    def __init__(self, experiment):
        self.experiment = experiment
        self.name = experiment.__class__.__name__
        self.rid = experiment.scheduler.rid
        self.chunksize = experiment.CHUNKSIZE
        self.components = tuple(getattr(experiment, "components", ()))
        # Materialise as a tuple: a bare generator expression could only be iterated once and
        # does not match the declared tuple type of the attribute.
        self.blocks = tuple(str(block) for block in getattr(experiment, "blocks", ()))
        self.default_arguments = getattr(experiment, "arguments", {})
        super().__init__(f"atomiq-run-{self.experiment.scheduler.rid}", tags=list(experiment.TAGS))

    @property
    def step_counter(self):
        """Return the experiment's current ``step_counter`` converted to ``int``."""
        return int(self.experiment.step_counter)

    @property
    def identifier(self):
        """Return the experiment's ``identifier`` converted to ``int``."""
        return int(self.experiment.identifier)

    @property
    def steps_total(self):
        """
        Return the total number of steps. Since this is only available after the prepare stage of the
        experiment, the method returns -1 if the prepare was not yet run.
        """
        # max_step_counter only exists once it has been set on the experiment; the total is one
        # more than that value.
        if hasattr(self.experiment, "max_step_counter"):
            return self.experiment.max_step_counter + 1
        return -1

    def terminate(self):
        """Forward the call to the wrapped experiment's ``terminate()``."""
        self.experiment.terminate()

    @event
    def emit_data(self, data):
        """
        Return ``data`` unchanged.

        The method is wrapped by ``event``; presumably heros publishes the returned value as an
        event payload -- confirm against the heros documentation.
        """
        return data