from __future__ import annotations
import time
from atomiq.components.primitives import Component
from artiq.experiment import rpc
from artiq.language.types import TStr, TList
[docs]
class DataSink(Component):
"""
Represent an abstract data sink and define methods it must provide.
"""
def __init__(self, *args, **kwargs):
Component.__init__(self, *args, **kwargs)
[docs]
def submit_env(self, point, identifier=None, *args, **kwargs):
if identifier is None:
identifier = self.experiment.identifier
env = {s: getattr(point, s) for s in point.attr}
env.update({"seqTimestamp": float(identifier)*1e-6,
"run_id": point.run_id,
"run_timestamp": self.experiment.run_timestamp,
"step_counter": point.step_counter,
})
names = list(env.keys())
self.submit_data(names, [env[s] for s in names], identifier=identifier, *args, **kwargs)
[docs]
def submit_data(self, field_names: TList, values: TList, identifier=None, *args, **kwargs):
raise NotImplementedError("Implement submit_data()")
[docs]
class RPCPublisherSink(DataSink):
kernel_invariants = {"rpc_publisher", "default_topic", "topic_prefix"}
def __init__(self, rpc_publisher: Component, topic_prefix: TStr = "", default_topic: TStr = "newData", *args, **kwargs):
"""
Represent a data sink that calls an RPC to publish data. What happens to the data strongly depends
on the other end of the RPC, i.e. the service that handles the RPC. Examples could be writing to a
pubsub service or dumping to a database.
:param rpc_publisher: (Component) an RPC device that can handle the function publish()
"""
DataSink.__init__(self, *args, **kwargs)
self.rpc_publisher = rpc_publisher
self.default_topic = default_topic
self.topic_prefix = topic_prefix
[docs]
@rpc(flags={"async"})
def submit_env(self, point, identifier=None, *args, **kwargs):
# we redefine this only to make the rpc call asynchronous
DataSink.submit_env(self, point, identifier=None, *args, **kwargs)
[docs]
@rpc(flags={"async"})
def submit_data(self, field_names: TList, values: TList, identifier=None, topic=None):
if topic is None:
topic = self.default_topic
if identifier is None:
identifier = self.experiment.identifier
data = dict(zip(field_names, values))
data["identifier"] = identifier
self.rpc_publisher.publish(f"{self.topic_prefix}{topic}", args=[data])
[docs]
class DatabaseSink(DataSink):
pass
[docs]
class ARTIQDatasetSink(DataSink):
pass