from __future__ import annotations
from atomiq.components.electronics.voltagesource import DAC, DACChannel
from artiq.experiment import kernel, delay, delay_mu, portable, at_mu
from artiq.language.types import TFloat, TBool, TInt64, TInt32, TStr
from artiq.language.core import now_mu
from artiq.coredevice.ad53xx import voltage_to_mu
import math
import numpy as np
[docs]
class Zotino(DAC):
    """
    This class represents the Sinara Zotino 32 Channel DAC.

    Hint:
        The Zotino provides the ``Zotino.parallel_arb`` context manager, which allows playing arbitrary
        waveforms and ramps on multiple channels in parallel.
        Example code using this functionality can be found :ref:`here <Coils example>`

    Args:
        zotino_device: The ARTIQ zotino device to use from the device db, e.g. `@zotino_0`
        max_parallel_arbs: Number of channels that can be ramped or played an arbitrary waveform on in parallel.
            Keep this as low as possible as higher numbers increase the prerun phase length.
        max_arb_samples: Maximum number of samples an arbitrary waveform/ramp can have.
            Keep this as low as possible as higher numbers increase the prerun phase length.
    """
    # presumably read by the ARTIQ compiler as the set of attributes that never change inside kernels -- TODO confirm
    kernel_invariants = {"_zotino_device"}
    # True while inside the `parallel_arb` context; channel ramps are then registered instead of played directly
    hold_arbs = False
    # time cursor position (mu) at which `update` last issued a load
    _last_update = np.int64(0)
    # time cursor position (mu) that anchors the write scheduler in `write_dac`
    _last_dac_write = np.int64(0)
    # number of writes `write_dac` has already stacked in front of `_last_dac_write`
    _presched_dac_counter = 0
def __init__(self, zotino_device:TStr, *args, max_parallel_arbs:TInt32=5, max_arb_samples:TInt32=128, **kwargs):
    """
    Set up the 32 channel DAC base class and the helpers needed for parallel arbitrary waveforms.

    Args:
        zotino_device: The ARTIQ zotino device to use from the device db, e.g. `@zotino_0`
        max_parallel_arbs: Number of single channel waveform slots that are pre-allocated.
        max_arb_samples: Number of samples that are pre-allocated for each waveform slot.
    """
    DAC.__init__(self, num_chan=32, *args, **kwargs)
    self._zotino_device = zotino_device

    # Bookkeeping buffers; they are not read anywhere in this part of the module.
    self.update_times = [np.int64(0)]*200
    self.update_i = 0

    # The waveform slots are allocated once here, so nothing has to be allocated later on.
    self.current_parallel_arb = self._ParallelArb(self, max_parallel_arbs, max_arb_samples)
    self.parallel_arb = self._ParallelArbsContext(self)

    try:
        spi_bus = self._zotino_device.bus
        # time reserved per DAC write: one bus transfer plus 25 mu of margin
        self._schedule_dac_delay = spi_bus.xfer_duration_mu + 25
    except AttributeError:
        # can not be set at experiment variable build as bus is not available
        pass
@kernel
def _prerun(self):
    """
    Prerun hook of the component: calls `init()` on the wrapped ARTIQ zotino device.
    """
    self._zotino_device.init()
[docs]
@kernel
def update(self):
    """
    Applies the voltages in the DAC registers to the outputs.

    Hint:
        A load is only issued if the previous one lies more than one bus transfer duration before the current
        time cursor position; otherwise this command does nothing. This allows for setting multiple DAC
        channels in a `parallel` statement without multiple slow updates.
    """
    t_cursor = now_mu()
    # any load issued at or after this point in time makes the current request redundant
    redundant_since = t_cursor - self._zotino_device.bus.xfer_duration_mu
    if self._last_update < redundant_since:
        self._zotino_device.load()
        self._last_update = t_cursor
[docs]
@kernel
def write_dac(self, channel: TInt32, voltage: TFloat) -> None:
    """
    Writes the voltage value `voltage` to the DAC channel `channel`. This method schedules the operation in the
    past and does not advance the time cursor.

    Args:
        channel: Channel number
        voltage: Voltage in V

    Hint:
        This command uses a scheduler which takes care that no write operations occur at the same time, to
        prevent undefined behavior which often does not result in a noticeable error.
    """
    t_cursor = now_mu()
    slot_mu = self._schedule_dac_delay

    # Window around the last anchor that is already occupied by previously scheduled writes.
    busy_end = self._last_dac_write + slot_mu
    busy_start = self._last_dac_write - (self._presched_dac_counter + 1)*slot_mu

    if (busy_start < t_cursor) and (t_cursor < busy_end):
        # collision: place this write one more slot ahead of the ones already scheduled
        self._presched_dac_counter += 1
    else:
        # free: this call becomes the new anchor and the stack of pre-scheduled writes is empty again
        self._last_dac_write = t_cursor
        self._presched_dac_counter = 0

    # origin of the 1500 mu is described in artiq/coredevice/ad53xx.py in method AD53xx.set_dac
    lead_mu = 1500 + (1 + self._presched_dac_counter)*slot_mu
    delay_mu(-lead_mu)
    self._zotino_device.write_dac(channel, voltage)
    # move the time cursor back to where it was; the subtraction presumably accounts for the write having
    # advanced the cursor by one bus transfer -- TODO confirm against the ad53xx driver
    delay_mu(lead_mu - self._zotino_device.bus.xfer_duration_mu)
class _SingleChannelArb:
    """
    Represents an arbitrary voltage signal on a single Zotino channel.

    The sample buffer is allocated once in the constructor with `max_samples` entries and is reused for every
    waveform that is activated on this slot.
    """
    # True while the waveform still has samples left to play
    active = False
    # Zotino channel the samples are written to
    channel = 0
    # number of valid samples at the beginning of `values`
    len = 0
    # step index (as counted by the parallel player) after which this waveform starts
    step_offset = np.int64(0)
    # index of the next sample that `step` writes
    i_step = 0

    def __init__(self, zotino_instance:Zotino, max_samples:TInt32):
        self.zotino = zotino_instance
        self.values = [0]*max_samples

    @kernel
    def activate(self, channel:TInt32, step_offset:TInt64, values):
        """
        Load a waveform into this slot and arm it for playback.

        Args:
            channel: Zotino channel the waveform is played on
            step_offset: Step index after which the parallel player starts this waveform
            values: Samples in machine units. If more samples are given than were pre-allocated, an error is
                logged and only the first `max_samples` samples are kept.
        """
        self.channel = channel
        self.step_offset = step_offset
        self.i_step = 0
        self.len = len(values)
        capacity = len(self.values)
        if self.len > capacity:
            self.zotino.experiment.log.error(self.zotino.identifier + ": arbitrary waveform with {} samples "
                "requested on channel {} but only {} samples initiallized. See Zotino 'max_arb_samples' "
                "device argument for more information. Truncating waveform...",
                [self.len, channel, capacity])
            self.len = capacity
        # Both sides are sliced to the same length so that an oversized waveform is really truncated instead
        # of failing a second time on a length mismatch.
        self.values[:self.len] = values[:self.len]
        # An empty waveform has nothing to play; leaving it active would make `step` write a stale sample.
        self.active = self.len > 0

    @kernel
    def step(self):
        """
        Write the next sample to the DAC register of the channel and deactivate the slot after the last one.
        """
        self.zotino._zotino_device.write_dac_mu(self.channel, self.values[self.i_step])
        self.i_step += 1
        if self.i_step >= self.len:
            self.active = False
class _ParallelArb:
    """
    Represents a voltage arbitrary signal on multiple channels in parallel

    A fixed number of single channel slots is allocated in the constructor. Waveforms are collected with
    `register_arb` between `start_record` and `start`, and `start` then plays them step by step.
    """
    def __init__(self, zotino_instance:Zotino, max_parallel:TInt32, max_samples:TInt32):
        self.zotino = zotino_instance
        self.arbs = [self.zotino._SingleChannelArb(self.zotino, max_samples) for _ in range(max_parallel)]
        # time cursor position at which recording started
        self.start_mu = np.int64(0)
        # common sample spacing of all registered waveforms; 0 means "not fixed yet"
        self.timestep_mu = np.int64(0)
        # number of slots of `arbs` that are in use
        self.n_arbs = 0

    @kernel
    def start_record(self):
        """
        Defines the starting point of the parallel arbitrary waveform at the current time cursor position and
        resets previously scheduled arbs
        """
        self.timestep_mu = np.int64(0)
        self.n_arbs = 0
        self.start_mu = now_mu()

    @portable(flags={"fast-math"})
    def register_arb(self, channel:TInt32, arb_values, timestep_mu: TInt64, start_mu: TInt64):
        """
        Add a single channel arbitrary waveform to the parallel arb scheduler

        If all pre-allocated slots are already in use, an error is logged and the waveform is dropped.

        Args:
            channel: Zotino channel
            arb_values: List of values which are played sequentially in machine units
            timestep_mu: Time between arb samples. Must be equal for all arbs which are scheduled in parallel
            start_mu: Time offset of the arb in reference to the point where `start_record` was called. Can be
                negative but must be a multiple of `timestep_mu`.
        """
        if self.timestep_mu == 0:
            # the first registered waveform fixes the common step size
            self.timestep_mu = timestep_mu
        elif self.timestep_mu != timestep_mu:
            self.zotino.experiment.log.warning(self.zotino.identifier + "Arb signals in parallel Zotino mode "
                "must have the same step size but have {0} (mu) and {1} (mu)", [self.timestep_mu, timestep_mu])

        step_offset = (start_mu - self.start_mu)/timestep_mu
        if abs(step_offset - round(step_offset)) > 0.01:
            self.zotino.experiment.log.warning(self.zotino.identifier + "Arb signal in parallel Zotino mode "
                "must have a time offset which is a multiple of the step size {0} (mu) but has {1} (mu)",
                [timestep_mu, start_mu])

        if self.n_arbs < len(self.arbs):
            self.arbs[self.n_arbs].activate(channel, np.int64(step_offset), arb_values)
            # Only count slots that were really filled: `start` slices `arbs` with this counter and uses it
            # to compute the lead time of the register writes.
            self.n_arbs += 1
        else:
            self.zotino.experiment.log.error(self.zotino.identifier + ": {} parallel arbitrary waveform signals "
                "requested but only {} initiallized. See Zotino 'max_parallel_arbs' device argument for "
                "more information.", [self.n_arbs+1, len(self.arbs)])

    @kernel
    def start(self):
        """
        Start the playback of all arbitrary voltage signals merged by the `register_arb` method

        For every step the register writes of all running waveforms are placed ahead of the step time and a
        single `update` at the step time then applies them together. The loop ends after the first step in
        which no waveform was active anymore.
        """
        # NOTE(review): counting starts at 0 and a waveform is stepped once `i_step > step_offset`, so a
        # waveform with a step offset below -1 begins at `start_mu - timestep_mu` and not at its own
        # (earlier) start time -- confirm whether that is intended.
        i_step = 0
        arb_running = True
        while arb_running:
            arb_running = False
            t_step = self.start_mu + (i_step-1)*self.timestep_mu
            # reserve one bus transfer per registered waveform in front of the update
            at_mu(t_step - self.n_arbs*self.zotino._zotino_device.bus.xfer_duration_mu)
            for arb in self.arbs[:self.n_arbs]:
                if arb.active:
                    arb_running = True
                    if i_step > arb.step_offset:
                        arb.step()
            at_mu(t_step)
            self.zotino.update()
            i_step += 1
class _ParallelArbsContext:
    """
    Context manager which makes all arbitrary waveforms defined within played in parallel.
    For a usage example, see :ref:`here <Coils example>`.
    """
    def __init__(self, zotino_instance:Zotino):
        # the Zotino whose `hold_arbs` flag and `current_parallel_arb` are driven by this context
        self.zotino = zotino_instance

    @kernel
    def __enter__(self):
        """
        Switch the Zotino into hold mode and start recording at the current time cursor position.
        """
        self.zotino.hold_arbs = True
        self.zotino.current_parallel_arb.start_record()

    @kernel
    def __exit__(self, exc_type, exc_value, exc_traceback):
        """
        Leave hold mode and play back everything that was registered inside the context.
        """
        # the flag is cleared before playback starts, so it is already reset if `start` fails
        self.zotino.hold_arbs = False
        self.zotino.current_parallel_arb.start()
[docs]
class ZotinoChannel(DACChannel):
    """
    A single output channel of a :class:`Zotino`.
    """
    @kernel
    def _set_voltage(self, voltage: TFloat, update_dac: TBool = True):
        """
        Write `voltage` to the DAC register of this channel and optionally apply it to the output.

        Args:
            voltage: Voltage in V
            update_dac: If True, `update` is called on the DAC device after the register write.
        """
        self.dac_device.write_dac(self.channel, voltage)
        if self.debug_output:
            self.experiment.log.info(self.identifier + ": Written to channel {0}", [self.channel])
            self.experiment.log.info(self.identifier + ": Voltage: {0}", [voltage])
        if update_dac:
            self.dac_device.update()

    @kernel(flags={"fast-math"})
    def _ramp_voltage(self, duration: TFloat, voltage_start: TFloat, voltage_end: TFloat, ramp_timestep: TFloat = 0.0002):
        """
        Ramp the channel linearly from `voltage_start` to `voltage_end`.

        The ramp consists of `int(duration / ramp_timestep)` samples, the first one being `voltage_start` and
        the last one `voltage_end`. Inside the `parallel_arb` context of the DAC the samples are only
        registered for parallel playback, otherwise they are played immediately.

        Args:
            duration: Duration of the ramp in s
            voltage_start: Voltage of the first sample in V
            voltage_end: Voltage of the last sample in V
            ramp_timestep: Time between two samples in s
        """
        n = int(duration / ramp_timestep)
        timestep_mu = self.core.seconds_to_mu(ramp_timestep)
        offset_dacs = self.dac_device._zotino_device.offset_dacs
        vref = self.dac_device._zotino_device.vref

        # A ramp with a single sample has no increment; this also avoids dividing by zero.
        voltage_step = 0.0
        if n > 1:
            voltage_step = (voltage_end - voltage_start)/(n-1)

        # Every sample is converted on its own. Converting the increment once and multiplying it lets the
        # rounding error grow with the sample index (small ramps degenerate to a constant) and only works
        # for one particular DAC offset setting.
        ramp_array = [voltage_to_mu(voltage_start + voltage_step*i, offset_dacs, vref) for i in range(n)]

        if self.dac_device.hold_arbs:
            self.dac_device.current_parallel_arb.register_arb(self.channel, ramp_array, timestep_mu, now_mu())
        else:
            for val in ramp_array:
                self.dac_device._zotino_device.set_dac_mu([val], channels=[self.channel])
                delay_mu(timestep_mu)
[docs]
class Fastino(DAC):
    """The Sinara fast DAC called Fastino

    Args:
        fastino_device: The ARTIQ fastino device from the `device_db`
        parallel_event_delay: Time by which occurring parallel events in ramps are moved [s] (default: 10ns)
    """
    # presumably read by the ARTIQ compiler as the set of attributes that never change inside kernels -- TODO confirm
    kernel_invariants = {"_fastino_device", "parallel_event_delay"}
def __init__(self, fastino_device:TStr, parallel_event_delay: TFloat = 10e-9, *args, **kwargs):
    """
    Set up the 32 channel DAC base class and the table used to detect events at identical times.

    Args:
        fastino_device: The ARTIQ fastino device from the `device_db`
        parallel_event_delay: Time in s by which an event is moved if another event is already registered
            at the same time
    """
    DAC.__init__(self, num_chan=32, *args, **kwargs)
    self._fastino_device = fastino_device
    # variables for detecting parallel events: one timestamp per slot, -1 marks a free slot
    self.event_slots = np.array([-1]*32, dtype=np.int64)
    # bookkeeping index into `event_slots` maintained by `register_event`; -1 while the table is empty
    self.event_slots_max_idx = -1
    self.parallel_event_delay = parallel_event_delay
@kernel
def _prerun(self):
    """
    Prerun hook of the component. Does nothing at the moment; the device `init()` call is commented out.
    """
    # self._fastino_device.init()
    pass
[docs]
@kernel
def update(self):
    """
    Does nothing. Unlike :class:`Zotino`, this class performs no separate load step here.
    """
    pass
[docs]
@kernel
def register_event(self, time: TInt64) -> TFloat:
    """
    Register an event at `time` and resolve collisions with events that are already registered.

    Slots whose event lies before the current time cursor position are released first. If another event is
    registered at exactly `time`, the new event is moved back in steps of `parallel_event_delay` until its
    time is unique. The (possibly moved) time is then stored in the first free slot. If no slot is free,
    the event is not stored.

    Args:
        time: Time of the event in machine units

    Returns:
        The time in s by which the event had to be moved, 0.0 if there was no collision.

    Note:
        `parallel_event_delay` must correspond to at least one machine unit, otherwise a collision can
        never be resolved.
    """
    now = now_mu()
    shift_mu = self.core.seconds_to_mu(self.parallel_event_delay)
    n_slots = len(self.event_slots)

    # Release every slot whose event has already passed.
    for i in range(n_slots):
        if self.event_slots[i] < now:
            self.event_slots[i] = -1

    # Move the event back until no pending event has the same time. All slots are inspected, so a pending
    # event can not be overlooked because of stale bookkeeping.
    event_time = time
    offset = 0.0
    collision = True
    while collision:
        collision = False
        for i in range(n_slots):
            if self.event_slots[i] == event_time:
                collision = True
        if collision:
            event_time += shift_mu
            offset += self.parallel_event_delay

    # Find the first free slot and the highest slot that is still occupied.
    first_free = -1
    last_used = -1
    for i in range(n_slots):
        if self.event_slots[i] < 0:
            if first_free < 0:
                first_free = i
        else:
            last_used = i

    if first_free >= 0:
        self.event_slots[first_free] = event_time
        if first_free > last_used:
            last_used = first_free
    self.event_slots_max_idx = last_used
    return offset
[docs]
class FastinoChannel(DACChannel):
    """
    A single output channel of a :class:`Fastino`.
    """
    @kernel(flags={"fast-math"})
    def _ramp_voltage(self,
                      duration: TFloat,
                      voltage_start: TFloat,
                      voltage_end: TFloat,
                      ramp_timestep: TFloat = 200e-6,
                      relocate_parallel: TBool = True):
        """
        Ramp the channel linearly from `voltage_start` to `voltage_end` by setting one voltage per step.

        The number of steps is `int(duration / ramp_timestep)`, but at least one. The step time is adjusted
        so that the steps span `duration` exactly. One voltage is written per step boundary, including both
        end points, and the time cursor is advanced by one step after each of them.

        Args:
            duration: Duration of the ramp in s
            voltage_start: Voltage at the beginning of the ramp in V
            voltage_end: Voltage at the end of the ramp in V
            ramp_timestep: Requested time between two voltages in s
            relocate_parallel: If True, the ramp is registered at the DAC device and delayed if another event
                is already registered at the same time.
        """
        if relocate_parallel:
            offset = self.dac_device.register_event(now_mu())
            if offset > 0.0:
                self.experiment.log.warning(self.identifier + ": Found paralell ramp, delaying..")
                delay(offset)
        n = int(duration / ramp_timestep)
        # A ramp shorter than one timestep still needs one step from start to end; without this the two
        # divisions by `n` below fail.
        if n < 1:
            n = 1
        ramp_timestep = duration / float(n)
        for i in range(n + 1):
            self._set_voltage(voltage_start + i * (voltage_end - voltage_start) / n, zero_time=True)
            delay(ramp_timestep)

    @kernel
    def _set_voltage(self, voltage: TFloat, zero_time: TBool = False):
        """
        Set the output voltage of this channel.

        Args:
            voltage: Voltage in V
            zero_time: If False, the time cursor is advanced by 0.1 us after the voltage has been set.
        """
        self.dac_device._fastino_device.set_dac(self.channel, voltage)
        # self.experiment.log.info(self.identifier + ": setting voltage to {0} V", [voltage])
        if not zero_time:
            delay(0.1e-6)