import functools
import os
import pprint
import sys
import textwrap
import traceback
import click
from pcvs.helpers import exceptions
from pcvs.helpers.exceptions import CommonException
[docs]def pretty_print_exception(e: exceptions.GenericError):
"""Display exceptions in a fancy way.
:param e: the execption to print
:type e: exceptions.GenericError.
"""
global manager
if isinstance(e, exceptions.GenericError):
manager.err([e.err, e.help])
manager.info("Extra infos:\n{}".format(e.dbg_str))
else:
manager.err(str(e))
[docs]class IOManager:
"""
Manager for Input/Output streams.
Contains methods for logging and printing in PCVS. IOManager handles
multiple outputs (file + standard output), logging levels (warning, error,
info, etc) and pretty banners. Colors are handled by click and color tags
are written in files (use less -r).
:param special_chars: dictionary for fancy bullet characters
:type special_chars: dict
:param verb_levels: verbosity level (normal, info, debug)
:type verb_levels: list
:param color_list: list of colors used by PCVS
:type color_list: list
"""
special_chars = {
"ascii": {
'copy': '(c)',
'item': '*',
'sec': '#',
'hdr': '=',
'star': '*',
'fail': "X",
'succ': "V",
'none': "-",
'git': '(git)',
'time': '(time)',
'full_pg': '#',
'empty_pg': '-',
'sep_v': " | ",
'sep_h': "-"
},
"unicode": {
'copy': '\u00A9',
'item': '\u27E2',
'sec': '\u2756',
'hdr': '\u23BC',
'star': '\u2605',
'fail': '\u2718',
'succ': '\u2714',
"none": '\u2205',
'git': '\u237F',
'time': '\U0000231A',
'full_pg': click.style("\u25CF", bold=True, fg="cyan"),
'empty_pg': click.style("\u25CB", bold=True, fg="bright_black"),
'sep_v': " \u237F ",
'sep_h': "\u23BC"
}
}
verb_levels = [(0, "normal"), (1, "info"), (2, "debug")]
color_list = [
"black",
"red",
"green",
"yellow",
"blue",
"magenta",
"cyan",
"white",
"bright_black",
"bright_red",
"bright_green",
"bright_yellow",
"bright_blue",
"bright_magenta",
"bright_cyan",
"bright_white",
]
@property
def verbose(self):
"""getter for verbosity level
:return: verbosity level (0, 1, 2)
:rtype: int
"""
return self._verbose
@property
def tty(self):
"""getter for tty information
:return: False if tty not used, 1 if tty used
:rtype: bool
"""
return self._tty
[docs] def set_tty(self, enable):
"""[summary]
:param enable: [description]
:type enable: [type]
"""
self._tty = enable
[docs] def enable_tty(self):
"""enables tty
"""
self.set_tty(enable=True)
[docs] def disable_tty(self):
"""disables tty
"""
self.set_tty(enable=False)
@property
def log_filename(self):
"""getter for logfile path
:return: logfile path
:rtype: str
"""
return os.path.abspath(self._logfile.name) if self._logfile else None
[docs] def set_logfile(self, enable, logfile=None):
"""setter for logfile path
:param logfile: logfile name, defaults to None
:type logfile: str, optional
"""
if logfile is not None:
if not os.access(os.path.dirname(logfile), os.W_OK):
raise CommonException.IOError(
"{} is not writable !".format(logfile))
if os.path.abspath(logfile) != self.log_filename:
self._logfile = open(os.path.abspath(logfile), 'w+')
self._logenabled = enable
def __init__(self, verbose=0, enable_unicode=True, length=80, logfile=None, tty=True):
"""constructor for IOManager object
:param verbose: verbosity level (0 : low, 1: info, 2: debug), defaults
to 0
:type verbose: int, optional
:param enable_unicode: True if unicode alphabet usage is authorised,
defaults to True
:type enable_unicode: bool, optional
:param length: length of terminal (character number), defaults to 80
:type length: int, optional
:param logfile: logfile name, file logging disabled if logfile=None,
defaults to None
:type logfile: str, optional
:param tty: True if logs must be in stdout, defaults to True
:type tty: bool, optional
:raises CommonException.AlreadyExistError: only one IOManager can
exist in a session
"""
self._linelength = 93
self._wrapper = None
self._tty = tty
self._logfile = None
self._verbose = verbose
self._unicode = enable_unicode
self._logbuffer = ""
self._logenabled = False
self.enable_unicode(self._unicode)
if length is not None:
self._linelength = length if length > 0 else click.get_terminal_size()[
0]
self._wrapper = textwrap.TextWrapper(width=self._linelength)
if logfile is not None:
logfile = os.path.abspath(logfile)
if os.path.isfile(logfile):
raise CommonException.AlreadyExistError(logfile)
self.set_logfile(True, logfile)
def __del__(self):
"""desctuctor for IOManager (closes streams)
"""
if self._logfile and self._logfile.name:
self._logfile.close()
def __print_rawline(self, msg, err=False):
"""print a line as text
:param msg: message to be printed
:type msg: str
:param err: True if the message is an error message, defaults to False
:type err: bool, optional
"""
if self._tty:
click.echo(msg, err=err)
content = '{}{}'.format(msg, '\n' if msg[-1] != "\n" else "")
if self._logenabled and self._logfile:
self._logfile.write(self._logbuffer)
self._logfile.write(content)
self._logfile.flush()
self._logbuffer = ""
else:
self._logbuffer += content
[docs] def has_verb_level(self, match):
""" returns true if the verbosity level is activated.
:param match: verbosity level to check
:type match: str or int
:return: True if "match" verbosity level is supposed to be printed by
the IOManager
:rtype: bool
"""
req_idx = 0
for e in self.verb_levels:
if match.lower() == e[1].lower():
req_idx = e[0]
break
return req_idx <= self._verbose
[docs] def get_verbosity_str(self):
"""[summary]
:return: [description]
:rtype: [type]
"""
for e in self.verb_levels:
if self._verbose == e[0]:
return e[1]
return self.verb_levels[0][1]
[docs] def print(self, *msg):
"""prints a raw line. Takes multiple arguments.
"""
for i in msg:
self.__print_rawline(i)
[docs] def write(self, txt):
"""print a string.
:param txt: message to be printed
:type txt: str
"""
self.__print_rawline(txt)
[docs] def capture_exception(self, e_type, user_func=None):
"""wraps functions to capture unhandled exceptions for high-level
function not to crash.
:param *e_type: errors to be caught
"""
def inner_function(func):
"""wrapper for inner function using try/except to avoid crashing
:param func: function to wrap
:type func: function
:raises e: exceptions to catch
:return: wrapper
:rtype: function
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
"""functools wrapping function
:raises e: exception to catch
:return: result of wrapped function
:rtype: any
"""
try:
return func(*args, **kwargs)
except e_type as e:
#raise e
if user_func is None:
pretty_print_exception(e)
manager.debug(
traceback.format_exception(*sys.exc_info()))
sys.exit(1)
else:
user_func(e)
return wrapper
return inner_function
[docs] def enable_unicode(self, e=True):
"""enables/disables unicode alphabet usage
:param e: True to enable unicode usage, defaults to True
:type e: bool, optional
"""
self.glyphs = self.special_chars["unicode"] if e is True else self.special_chars["ascii"]
[docs] def avail_chars(self):
"""lists allowed bullet characters
:return: a list of characters
:rtype: list
"""
return self.glyphs.keys()
[docs] def utf(self, k):
"""returns the corresponding character to a bullet character
:param k: character used as bullet character
:type k: char
:return: fancy bullet character
:rtype: char
"""
assert(k in self.glyphs.keys())
return self.glyphs[k]
[docs] def style(self, *args, **kwargs):
"""returns a string style using click
:return: a string style
:rtype: click.style
"""
return click.style(*args, **kwargs)
[docs] def print_section(self, s, out=True):
"""prints a section
:param s: content of the section
:type s: str
:param out: True if the section has to be logged, False if it has to be
returned, defaults to True
:type out: bool, optional
:return: section string if out=False, Nothing otherwise
:rtype: str
"""
f = "{} {}".format(self.utf('sec'), s)
self._wrapper.subsequent_indent = " "
s = self._wrapper.fill(click.style(f, fg='yellow'))
if out:
self.__print_rawline(s)
else:
return s
[docs] def print_item(self, s, depth=1, out=True, with_bullet=True):
"""prints an item
:param s: item content
:type s: str
:param depth: number of tabulations used for indentation, defaults to 1
:type depth: int, optional
:param out: True if the item has to be logged, False if it has to be
returned, defaults to True
:type out: bool, optional
:param with_bullet: True if the item should have a bullet, defaults to True
:type with_bullet: bool, optional
:return: item string if out=False, Nothing otherwise
:rtype: str
"""
indent = (" " * depth)
bullet = indent + \
"{} ".format(self.utf('item')) if with_bullet is True else ""
content = "{}".format(s)
self._wrapper.subsequent_indent = indent + " "
s = self._wrapper.fill(click.style(
bullet, fg="red") + click.style(content, fg="reset"))
if out:
self.__print_rawline(s)
else:
return s
[docs] def print_job(self, label, time, name, colorname="red", icon=None):
"""prints a job description
:param label: job label
:type label: str
:param time: time elapsed since the job launch
:type time: float
:param name: name of the job
:type name: str
:param colorname: color of the job log, defaults to "red"
:type colorname: str, optional
:param icon: bullet, defaults to None
:type icon: str, optional
"""
if icon is not None:
icon = self.utf(icon)
self.__print_rawline(click.style(" {} {:8.2f}s{}{:7}{}{}".format(
icon,
time,
self.utf("sep_v"),
label,
self.utf("sep_v"),
name),
fg=colorname, bold=True))
[docs] def debug(self, msg):
"""prints a debug message
"""
if(self._verbose >= 2):
if type(msg) != list:
msg = [msg]
for elt in msg:
for line in elt.split('\n'):
self.__print_rawline("DEBUG: {}".format(
click.style(line, fg="bright_black")), err=True)
[docs] def info(self, msg):
"""prints an info message
"""
if(self._verbose >= 1):
if type(msg) != list:
msg = [msg]
for elt in msg:
for line in elt.split('\n'):
self.__print_rawline("INFO : {}".format(
click.style(line, fg="cyan")), err=True)
[docs] def warn(self, msg):
"""prints a warning message
"""
if type(msg) != list:
msg = [msg]
for elt in msg:
for line in elt.split('\n'):
self.__print_rawline("WARN : {}".format(
click.style(line, fg="yellow", bold=True)), err=True)
[docs] def err(self, msg):
"""prints an error message
"""
if type(msg) != list:
msg = [msg]
enclosing_line = click.style(self.utf('hdr') * self._linelength,
fg="red",
bold=True)
self.__print_rawline("{}".format(enclosing_line), err=True)
for elt in msg:
for line in elt.split('\n'):
self.__print_rawline("ERROR: {}".format(
click.style(line, fg="red", bold=True)), err=True)
self.__print_rawline("{}".format(enclosing_line), err=True)
[docs] def print_short_banner(self, string=False):
"""prints a little banner
:param string: True if the banner has to be returned, False if it has to be logged
:type string: bool
"""
logo = [
r""" ____ ______ _ __ _____ """,
r""" / __ \ / ____/ | | / / / ___/ """,
r""" / /_/ / / / | | / / \__ \ """,
r""" / ____/ / /___ | |/ / ___/ / """,
r""" /_/ \____/ |___/ /____/ """,
r""" """,
r""" Parallel Computing -- Validation System """,
r""" Copyright {} 2017 -- CEA """.format(
self.utf('copy')),
r""""""
]
s = []
if self._linelength < max(map(lambda x: len(x), logo)):
s = [
click.style("{}".format(self.utf("star")*14), fg="green"),
click.style("{} -- PCVS -- {}".format(self.utf("star"),
self.utf('star')), fg="yellow"),
click.style("{} CEA {} 2017 {}".format(
self.utf('star'), self.utf('copy'), self.utf('star')), fg="red"),
click.style("{}".format(self.utf("star")*14), fg="green")
]
else:
start = " " * ((self._linelength - len(logo[0]))//2-1)
newline = "\n" + start
s = [
click.style(start + newline.join(logo[0:3]), fg="green"),
click.style(start + newline.join(logo[3:4]), fg="yellow"),
click.style(start + newline.join(logo[4:5]), fg="red"),
click.style(start + newline.join(logo[5:]))
]
if string is True:
return "\n".join(s)
else:
self.__print_rawline("\n".join(s))
[docs] def nimpl(self, *msg): # pragma: no cover
"""prints the "not implemented" error
"""
self.err("This is not implemented (yet)!")
[docs] def print_n_stop(self, **kwargs): # pragma: no cover
"""prints a message, then exits the program
"""
# not replacing these prints (for debug only)
for k, v in kwargs.items():
click.secho("{}: ".format(k), fg="yellow", nl=False)
click.secho(pprint.pformat(v), fg="blue")
sys.exit(0)
[docs] def print_banner(self):
"""prints a large banner
"""
# ok, this is ugly but the only way to make flake/pylint happy with
# source file formatting AND keeping a nicely logo printed out witout
# having to load a file.
# But, it is not trivial to edit. A single terminal line is split in
# half. Each 'logo' value is a line, created from the implicit
# concatenation of multiple raw strings (ex: logo= [r"a" r"b", r"c"])
#
# the full header can be found under the /utils/ source dir.
logo = [
r""" ____ ____ __ ______ __ _ """,
r""" / __ \____ __________ _/ / /__ / / / ____/___ ____ ___ ____ __ __/ /_(_)___ ____ _ """,
r""" / /_/ / __ `/ ___/ __ `/ / / _ \/ / / / / __ \/ __ `__ \/ __ \/ / / / __/ / __ \/ __ `/ """,
r""" / ____/ /_/ / / / /_/ / / / __/ / / /___/ /_/ / / / / / / /_/ / /_/ / /_/ / / / / /_/ / """,
r"""/_/ \__,_/_/ \__,_/_/_/\___/_/ \____/\____/_/ /_/ /_/ .___/\__,_/\__/_/_/ /_/\__, / """,
r""" /_/ /____/ """,
r""" {} (PCVS) {}""".format(
self.utf('star'), self.utf('star')),
r""" _ __ ___ __ __ _ _____ __ """,
r""" | | / /___ _/ (_)___/ /___ _/ /_(_)___ ____ / ___/__ _______/ /____ ____ ___ """,
r""" | | / / __ `/ / / __ / __ `/ __/ / __ \/ __ \ \__ \/ / / / ___/ __/ _ \/ __ `__ \ """,
r""" | |/ / /_/ / / / /_/ / /_/ / /_/ / /_/ / / / / ___/ / /_/ /__ / /_/ __/ / / / / / """,
r""" |___/\__,_/_/_/\__,_/\__,_/\__/_/\____/_/ /_/ /____/\__, /____/\__/\___/_/ /_/ /_/ """,
r""" /____/ """,
r""" """,
r""" Copyright {} 2017 Commissariat à l'Énergie Atomique et aux Énergies Alternatives (CEA) """.format(
self.utf('copy')),
r""" """,
r""" This program comes with ABSOLUTELY NO WARRANTY; """,
r""" This is free software, and you are welcome to redistribute it """,
r""" under certain conditions; Please see COPYING for details. """,
r""" """,
]
if self._linelength < max(map(lambda x: len(x), logo)):
self.print_short_banner()
return
else:
start = " " * ((self._linelength - len(logo[0]))//2-1)
newline = "\n" + start
self.__print_rawline(click.style(
start + newline.join(logo[0:6]), fg="green"))
self.__print_rawline(click.style(start + newline.join(logo[6:7])))
self.__print_rawline(click.style(start +
newline.join(logo[7:10]), fg="green"))
self.__print_rawline(click.style(start +
newline.join(logo[10:11]), fg="yellow"))
self.__print_rawline(click.style(
start + newline.join(logo[11:13]), fg="red"))
self.__print_rawline(click.style(start + newline.join(logo[13:])))
manager = IOManager()
[docs]def init(v=0, e=False, l=100, quiet=False):
"""initializes a global manager for everyone to use
:param v: verbosity level, defaults to 0
:type v: int, optional
:param e: True to enable unicode alphabet, False to use ascii, defaults to
False
:type e: bool, optional
:param l: length of the terminal, defaults to 100
:type l: int, optional
:param quiet: False to write to stdout, defaults to False
:type quiet: bool, optional
"""
global manager
manager = IOManager(verbose=v, enable_unicode=e, length=l, tty=(not quiet))
[docs]def progbar(it, print_func=None, man=None, **kargs):
"""prints a progress bar using click
:param it: iterable on which the progress bar has to iterate
:type it: iterable
:param print_func: method used to show the item next to the progress bar,
defaults to None
:type print_func: function, optional
:param man: manager used to describe the bullets, defaults to None
:type man: log.IOManager, optional
:return: a click progress bar (iterable)
:rtype: click.ProgressBar
"""
if man is None:
man = manager
return click.progressbar(
it, empty_char=man.utf('empty_pg'),
info_sep=man.utf('sep_v'), fill_char=man.utf('full_pg'),
show_percent=False, show_eta=False, show_pos=False,
item_show_func=print_func,
**kargs)