Source code for atomiq.components.basics.log
from __future__ import annotations
from artiq.experiment import rpc
from atomiq.components.primitives import Component
import logging
logging.basicConfig()
logger = logging.getLogger(__name__)
[docs]
def msg_str(msg, vars):
if len(vars) == 0:
return msg
else:
return msg.format(*vars)
[docs]
class KernelLogger(Component):
"""
This components a python logger from within artiq
"""
def __init__(self, *args, **kwargs):
self.logger = logging.getLogger(__name__)
super().__init__(*args, **kwargs)
[docs]
@rpc(flags={"async"})
def debug(self, msg, vars=[]):
"""
Logs a message with level DEBUG.
:param obj msg: what to log
"""
self.logger.debug(msg_str(msg, vars))
[docs]
@rpc(flags={"async"})
def info(self, msg, vars=[]):
"""
Logs a message with level INFO.
:param obj msg: what to log
"""
self.logger.info(msg_str(msg, vars))
[docs]
@rpc(flags={"async"})
def warning(self, msg, vars=[]):
"""
Logs a message with level WARNING.
:param obj msg: what to log
"""
self.logger.warning(msg_str(msg, vars))
[docs]
@rpc(flags={"async"})
def error(self, msg, vars=[]):
"""
Logs a message with level ERROR.
:param obj msg: what to log
"""
self.logger.error(msg_str(msg, vars))
[docs]
@rpc(flags={"async"})
def critical(self, msg, vars=[]):
"""
Logs a message with level CRITICAL.
:param obj msg: what to log
"""
self.logger.critical(msg_str(msg, vars))