Source code for atomiq.components.sinara.dac

from __future__ import annotations

from atomiq.components.electronics.voltagesource import DAC, DACChannel

from artiq.experiment import kernel, delay, delay_mu, portable, at_mu
from artiq.language.types import TFloat, TBool, TInt64, TInt32, TStr
from artiq.language.core import now_mu
from artiq.coredevice.ad53xx import voltage_to_mu
import math

import numpy as np


[docs] class Zotino(DAC): """ This class represents the Sinara Zotino 32 Channel DAC. Hint: The Zotino provides the ``Zotino.parallel_arb`` context manager which allows to play arbitrary waveforms and ramps on multiple channels in parallel. Example code using this functionallity can be found :ref:`here <Coils example>` Args: zotino_device: The ARTIQ zotino device to use from the device db, e.g. `@zotino_0` max_parallel_arbs: Number of channels that can be ramped or played an arbitrary waveform on in parallel. Keep this as low as possible as higher numbers increase the prerun phase length. max_arb_samples: Number of samples a arbitrary waveform/ramp can maximally have. Keep this as low as possible as higher numbers increase the prerun phase length. """ kernel_invariants = {"_zotino_device"} hold_arbs = False _last_update = np.int64(0) _last_dac_write = np.int64(0) _presched_dac_counter = 0 def __init__(self, zotino_device:TStr, *args, max_parallel_arbs:TInt32=5, max_arb_samples:TInt32=128, **kwargs): DAC.__init__(self, num_chan=32, *args, **kwargs) self._zotino_device = zotino_device self.parallel_arb = self._ParallelArbsContext(self) self.current_parallel_arb = self._ParallelArb(self, max_parallel_arbs, max_arb_samples) self.update_i = 0 self.update_times = [np.int64(0)]*200 try: self._schedule_dac_delay = self._zotino_device.bus.xfer_duration_mu + 25 except AttributeError: # can not be set at experiment variable build as bus is not available pass @kernel def _prerun(self): self._zotino_device.init()
[docs] @kernel def update(self): """ Applys the voltages in the DAC register to the outputs. Hint: If an update is already scheduled at the current time, this command does nothing. This allows for setting multiple DAC channels in a `parallel` statement without multiple slow updates. """ now = now_mu() if self._last_update < now - self._zotino_device.bus.xfer_duration_mu: self._zotino_device.load() self._last_update = now
[docs] @kernel def write_dac(self, channel: TInt32, voltage: TFloat) -> None: """ Writes the voltage value `voltage` to the DAC channel `channel`. This method schedules the operation in the past and does not advance the time cursor. Args: channel: Channel number voltage: Voltage in V Hint: This command uses a scheduler which takes care that no write operations occure at the same time to prevent undefined behavior which often do not result in a noticable error. """ now = now_mu() upper_busy = self._last_dac_write + self._schedule_dac_delay lower_busy = self._last_dac_write - (self._presched_dac_counter+1)*self._schedule_dac_delay if (now < upper_busy) and (now > lower_busy): # collision, schedule one write operation time earlier self._presched_dac_counter += 1 else: # no collisions, reset counter and write now self._last_dac_write = now self._presched_dac_counter = 0 sched_delay = (1+self._presched_dac_counter)*self._schedule_dac_delay # origin of delay is described in artiq/coredevice/ad53xx.py in method AD53xx.set_dac delay_mu(-1500 - sched_delay) self._zotino_device.write_dac(channel, voltage) # returns the time curser back delay_mu(1500 + sched_delay - self._zotino_device.bus.xfer_duration_mu)
class _SingleChannelArb: """ Represents an arbitrary voltage signal on a single Zotino channel. """ active = False channel = 0 len = 0 step_offset = np.int64(0) i_step = 0 def __init__(self, zotino_instance:Zotino, max_samples:TInt32): self.zotino = zotino_instance self.values = [0]*max_samples @kernel def activate(self, channel:TInt32, step_offset:TInt64, values): self.channel = channel self.active = True self.step_offset = step_offset try: self.len = len(values) self.values[:self.len] = values except IndexError: self.zotino.experiment.log.error(self.zotino.identifier + ": arbitrary waveform with {} samples "\ "requested on channel {} but only {} samples initiallized. See Zotino 'max_arb_samples' "\ "device argument for more information. Truncating waveform...", [self.len, channel, len(self.values)]) self.len = len(self.values) self.values[:self.len] = values self.i_step = 0 @kernel def step(self): self.zotino._zotino_device.write_dac_mu(self.channel, self.values[self.i_step]) self.i_step += 1 if self.i_step >= self.len: self.active = False class _ParallelArb: """ Represents a voltage arbitrary signal on multiple channels in parallel """ def __init__(self, zotino_instance:Zotino, max_parallel:TInt32, max_samples:TInt32): self.zotino = zotino_instance self.arbs = [self.zotino._SingleChannelArb(self.zotino, max_samples) for i in range(max_parallel)] self.start_mu = np.int64(0) self.timestep_mu = np.int64(0) self.n_arbs = 0 @kernel def start_record(self): """ Defines the starting point of the parallel arbitrary waveform at the current time cursor position and resets previously scheduled arbs """ self.timestep_mu = np.int64(0) self.n_arbs = 0 self.start_mu = now_mu() @portable(flags={"fast-math"}) def register_arb(self, channel:TInt32, arb_values, timestep_mu: TInt64, start_mu: TInt64): """ Add a single channel arbitrary waveform to the parallel arb scheduler Args: channel: Zotino channel arb_values: List of values which are played sequentially in machine units timestep_mu: Time between arb samples. Must be equal for all arbs which are scheduled in parallel start_mu: Time offset of the arb in reference to the point where `start_record` was called. Can be negative but must be a multiple of `timestep_mu`. """ if self.timestep_mu == 0: self.timestep_mu = timestep_mu elif self.timestep_mu != timestep_mu: self.zotino.experiment.log.warning(self.zotino.identifier + "Arb signals in parallel Zotino mode "\ "must have the same step size but have {0} (mu) and {1} (mu)", [self.timestep_mu, timestep_mu]) step_offset = (start_mu - self.start_mu)/timestep_mu if abs(step_offset - round(step_offset)) > 0.01: self.zotino.experiment.log.warning(self.zotino.identifier + "Arb signal in parallel Zotino mode "\ "must have a time offset which is a multiple of the step size {0} (mu) but has {1} (mu)", [timestep_mu, start_mu]) try: self.arbs[self.n_arbs].activate(channel, np.int64(step_offset), arb_values) except IndexError: self.zotino.experiment.log.error(self.zotino.identifier + ": {} parallel arbitrary waveform signals "\ "requested but only {} initiallized. See Zotino 'max_parallel_arbs' device argument for "\ "more information.", [self.n_arbs+1, len(self.arbs)]) self.n_arbs += 1 @kernel def start(self): """ Start the playback of all arbitrary volage signals merged by the `register_arb` method """ i_step = 0 arb_running = True while arb_running: arb_running = False at_mu(self.start_mu+(i_step-1)*self.timestep_mu-self.n_arbs*self.zotino._zotino_device.bus.xfer_duration_mu) for arb in self.arbs[:self.n_arbs]: if arb.active: arb_running = True if i_step > arb.step_offset: arb.step() at_mu(self.start_mu+(i_step-1)*self.timestep_mu) self.zotino.update() i_step += 1 class _ParallelArbsContext: """ Context manager which makes all arbitrary waveforms defined within played in parallel. For an usage example, see :ref:`here <Coils example>`. """ def __init__(self, zotino_instance:Zotino): self.zotino = zotino_instance @kernel def __enter__(self): self.zotino.hold_arbs = True self.zotino.current_parallel_arb.start_record() @kernel def __exit__(self, exc_type, exc_value, exc_traceback): self.zotino.hold_arbs = False self.zotino.current_parallel_arb.start()
[docs] class ZotinoChannel(DACChannel): @kernel def _set_voltage(self, voltage: TFloat, update_dac: TBool = True): self.dac_device.write_dac(self.channel, voltage) if self.debug_output: self.experiment.log.info(self.identifier + ": Written to channel {0}", [self.channel]) self.experiment.log.info(self.identifier + ": Voltage: {0}", [voltage]) if update_dac: self.dac_device.update() @kernel(flags={"fast-math"}) def _ramp_voltage(self, duration: TFloat, voltage_start: TFloat, voltage_end: TFloat, ramp_timestep: TFloat = 0.0002): n = int(duration / ramp_timestep) timestep_mu = self.core.seconds_to_mu(ramp_timestep) voltage_start_mu = voltage_to_mu( voltage_start,self.dac_device._zotino_device.offset_dacs, self.dac_device._zotino_device.vref) voltage_step_mu = voltage_to_mu( (voltage_end-voltage_start)/(n-1), self.dac_device._zotino_device.offset_dacs, self.dac_device._zotino_device.vref) - 2**15 ramp_array = [voltage_start_mu+voltage_step_mu*i for i in range(n)] if self.dac_device.hold_arbs: self.dac_device.current_parallel_arb.register_arb(self.channel, ramp_array, timestep_mu, now_mu()) else: for val in ramp_array: self.dac_device._zotino_device.set_dac_mu([val],channels=[self.channel]) delay_mu(timestep_mu)
[docs] class Fastino(DAC): """The Sinara Fast ADC called Fastino Args: fastino_device: The ARTIQ fastino device from the `device_db` parallel_event_delay: Time by which occuring parallel events in ramps are moved [s] (default: 10ns) """ kernel_invariants = {"_fastino_device", "parallel_event_delay"} def __init__(self, fastino_device:TStr, parallel_event_delay: TBool = 10e-9, *args, **kwargs): DAC.__init__(self, num_chan=32, *args, **kwargs) self._fastino_device = fastino_device # variables for detecting parallel events self.event_slots = np.array([-1]*32, dtype=np.int64) self.event_slots_max_idx = -1 self.parallel_event_delay = parallel_event_delay @kernel def _prerun(self): # self._fastino_device.init() pass
[docs] @kernel def update(self): pass
[docs] @kernel def register_event(self, time: TInt64) -> TFloat: last_used = self.event_slots_max_idx first_free = last_used + 1 shift_needed = False for i in range(self.event_slots_max_idx, -1, -1): event = self.event_slots[i] # check if the time of the event already passed if event < now_mu(): self.event_slots[i] = -1 if last_used == self.event_slots_max_idx: last_used = i first_free = i # check for collision with the new event if event == time: shift_needed = True self.event_slots_max_idx = last_used if shift_needed: return self.register_event(time + self.core.seconds_to_mu(self.parallel_event_delay)) +\ self.parallel_event_delay else: self.event_slots[first_free] = time if first_free >= last_used: self.event_slots_max_idx = first_free return 0.0
[docs] class FastinoChannel(DACChannel): @kernel(flags={"fast-math"}) def _ramp_voltage(self, duration: TFloat, voltage_start: TFloat, voltage_end: TFloat, ramp_timestep: TFloat = 200e-6, relocate_parallel: TBool = True): """ This method implements a stupid ramp on an abstract level. This will most likely work but be slow. If your hardware has native support for ramping, please override this function when you inherit from VoltageSource """ if relocate_parallel: offset = self.dac_device.register_event(now_mu()) if offset > 0.0: self.experiment.log.warning(self.identifier + ": Found paralell ramp, delaying..") delay(offset) n = int(duration / ramp_timestep) ramp_timestep = duration / float(n) for i in range(n + 1): self._set_voltage(voltage_start + i * (voltage_end - voltage_start) / n, zero_time=True) delay(ramp_timestep) @kernel def _set_voltage(self, voltage: TFloat, zero_time: TBool = False): self.dac_device._fastino_device.set_dac(self.channel, voltage) # self.experiment.log.info(self.identifier + ": setting voltage to {0} V", [voltage]) if not zero_time: delay(0.1e-6)