Source code for atomiq.components.basics.log

from __future__ import annotations

from artiq.experiment import rpc

from atomiq.components.primitives import Component

import logging

logging.basicConfig()
logger = logging.getLogger(__name__)


[docs] def msg_str(msg, vars): if len(vars) == 0: return msg else: return msg.format(*vars)
[docs] class KernelLogger(Component): """ This components a python logger from within artiq """ def __init__(self, *args, **kwargs): self.logger = logging.getLogger(__name__) super().__init__(*args, **kwargs)
[docs] @rpc(flags={"async"}) def debug(self, msg, vars=[]): """ Logs a message with level DEBUG. :param obj msg: what to log """ self.logger.debug(msg_str(msg, vars))
[docs] @rpc(flags={"async"}) def info(self, msg, vars=[]): """ Logs a message with level INFO. :param obj msg: what to log """ self.logger.info(msg_str(msg, vars))
[docs] @rpc(flags={"async"}) def warning(self, msg, vars=[]): """ Logs a message with level WARNING. :param obj msg: what to log """ self.logger.warning(msg_str(msg, vars))
[docs] @rpc(flags={"async"}) def error(self, msg, vars=[]): """ Logs a message with level ERROR. :param obj msg: what to log """ self.logger.error(msg_str(msg, vars))
[docs] @rpc(flags={"async"}) def critical(self, msg, vars=[]): """ Logs a message with level CRITICAL. :param obj msg: what to log """ self.logger.critical(msg_str(msg, vars))