Source code for pcvs.backend.session

import copy
import os
from datetime import datetime
from enum import IntEnum
from multiprocessing import Process

from ruamel.yaml import YAML
from ruamel.yaml.main import yaml_object

from pcvs import PATH_SESSION, io
from pcvs.helpers import log, utils

yml = YAML()


[docs]def unlock_session_file(): """Release the lock after manipulating the session.yml file. The call won't fail if the lockfile is not taken before unlocking. """ utils.unlock_file(PATH_SESSION)
[docs]def lock_session_file(timeout=None): """Acquire the lockfil before manipulating the session.yml file. This ensure safety between multiple PCVS instances. Be sure to call `unlock_session_file()` once completed :param timeout: return from blocking once timeout is expired (raising TimeoutError) :type timeout: int """ utils.lock_file(PATH_SESSION, timeout=timeout)
[docs]def store_session_to_file(c): """Save a new session into the session file (in HOME dir). :param c: session infos to store :type c: dict :return: the sid associated to new create session id. :rtype: int """ all_sessions = None sid = -1 global yml lock_session_file() try: # to operate, PCVS needs to full-read and then full-write the whole file if os.path.isfile(PATH_SESSION): with open(PATH_SESSION, 'r') as fh: all_sessions = yml.load(fh) # compute the session id, incrementally done from the highest session id # currently running and registered. # Please not a session is not flushed away from logs until the user # explicitly does it if all_sessions is None: all_sessions = {"__metadata": {"next": 0}} # Lookup for an available session id number sid = all_sessions["__metadata"]["next"] while sid in all_sessions.keys() or sid == -1: sid += 1 all_sessions["__metadata"]["next"] = (sid + 1) % 1000000 assert (sid not in all_sessions.keys()) all_sessions[sid] = c # dump the file back with open(PATH_SESSION, 'w') as fh: yml.dump(all_sessions, fh) finally: unlock_session_file() return sid
[docs]def update_session_from_file(sid, update): """Update data from a running session from the global file. This only add/replace keys present in argument dict. Other keys remain. :param sid: the session id :type sid: int :param update: the keys to update. If already existing, content is replaced :type: dict """ global yml lock_session_file() try: all_sessions = None if os.path.isfile(PATH_SESSION): with open(PATH_SESSION, 'r') as fh: all_sessions = yml.load(fh) if all_sessions is not None and sid in all_sessions: for k, v in update.items(): all_sessions[sid][k] = v # only if editing is done, flush the file back with open(PATH_SESSION, 'w') as fh: yml.dump(all_sessions, fh) finally: unlock_session_file()
[docs]def remove_session_from_file(sid): """clear a session from logs. :param sid: the session id to remove. :type sid: int """ global yml lock_session_file() try: all_sessions = None if os.path.isfile(PATH_SESSION): with open(PATH_SESSION, 'r') as fh: all_sessions = yml.load(fh) if all_sessions is not None and sid in all_sessions: del all_sessions[sid] with open(PATH_SESSION, 'w') as fh: if len(all_sessions) > 0: yml.dump(all_sessions, fh) # else, truncate the file to zero -> open(w) with no data finally: unlock_session_file()
[docs]def list_alive_sessions(): """Load and return the complete dict from session.yml file :return: the session dict :rtype: dict """ global yml lock_session_file(timeout=15) try: with open(PATH_SESSION, 'r') as fh: all_sessions = yml.load(fh) if all_sessions: del all_sessions["__metadata"] except FileNotFoundError as e: all_sessions = {} finally: unlock_session_file() return all_sessions
[docs]def main_detached_session(sid, user_func, *args, **kwargs): """Main function processed when running in detached mode. This function is called by Session.run_detached() and is launched from cloned process (same global env, new main function). :raises Exception: any error occuring during the main process is re-raised. :param sid: the session id :param user_func: the Python function used as the new main() :param args: user_func() arguments :type args: tuple :param kwargs: user_func() arguments :type kwargs: dict """ # When calling a subprocess, the parent is attached to its child # Parent won't terminate if a single child is still running. # Setting a child 'deamon' will allow parent to terminate children at exit # (still not what we want) # the trick is to double fork: the parent creates a child, also crating a # a child (child2). When the process is run, the first child completes # immediately, relasing the parent. if os.fork() != 0: return ret = 0 try: # run the code in detached mode # beware: this function should only raises exception to stop. # a sys.exit() will bypass the rest here. ret = user_func(*args, **kwargs) update_session_from_file(sid, { 'state': Session.State.COMPLETED, 'ended': datetime.now() }) except Exception as e: update_session_from_file(sid, { 'state': Session.State.ERROR, 'ended': datetime.now() }) raise e return ret
[docs]class Session: """Object representing a running validation (detached or not). Despite the fact it is designed for manage concurrent runs, it takes a callback and can be derived for other needs. :param _func: user function to be called once the session starts :type _func: Callable :param _sid: session id, automatically generated :type _sid: int :param _session_infos: session infos dict :type _session_infos: dict """
[docs] @yaml_object(yml) class State(IntEnum): """Enum of possible Session states.""" WAITING = 0 IN_PROGRESS = 1 COMPLETED = 2 ERROR = 3
[docs] @classmethod def to_yaml(cls, representer, data): """Convert a Test.State to a valid YAML representation. A new tag is created: 'Session.State' as a scalar (str). :param dumper: the YAML dumper object :type dumper: :class:`YAML().dumper` :param data: the object to represent :type data: class:`Session.State` :return: the YAML representation :rtype: Any """ return representer.represent_scalar(u'!State', u'{}||{}'.format(data.name, data.value))
[docs] @classmethod def from_yaml(cls, constructor, node): """Construct a :class:`Session.State` from its YAML representation. Relies on the fact the node contains a 'Session.State' tag. :param loader: the YAML loader :type loader: :class:`yaml.FullLoader` :param node: the YAML representation :type node: Any :return: The session State as an object :rtype: :class:`Session.State` """ s = constructor.construct_scalar(node) name, value = s.split('||') obj = Session.State(int(value)) assert (obj.name == name) return obj
def __str__(self): """Stringify the state. :return: the enum name. :rtype: str """ return self.name
@property def state(self): """Getter to session status. :return: session status :rtype: int """ return self._session_infos['state'] @property def id(self): """Getter to session id. :return: session id :rtype: int """ return self._sid @property def rc(self): """Gett to final RC. :return: rc :rtype: int """ return self._rc @property def infos(self): """Getter to session infos. :return: session infos :rtype: dict """ return self._session_infos
[docs] def property(self, kw): """Access specific data from the session stored info session.yml. :param kw: the information to retrieve. kw must be a valid key :type kw: str :return: the requested session infos if exist :rtype: Any """ assert (kw in self._session_infos) return self._session_infos[kw]
def __init__(self, date=None, path="."): """constructor method. :param date: the start timestamp :type date: datetime.datetime :param path: the build directory :type path: str """ self._func = None self._rc = -1 self._sid = -1 # this dict is then flushed to the session.yml self._session_infos = { "path": path, "log": io.console.logfile, "io": io.console.outfile, "progress": 0, "state": Session.State.WAITING, "started": date, "ended": None }
[docs] def load_from(self, sid, data): """Update the current object with session infos read from global file. :param sid: session id read from file :type sid: int :param data: session infos read from file :type data: dict """ self._sid = sid self._session_infos = data
[docs] def register_callback(self, callback): """Register the callback used as main function once the session is started. :param callback: function to invoke :type callback: Callable """ self._func = callback
[docs] def run_detached(self, *args, **kwargs): """Run the session is detached mode. Arguments are for user function only. :param args: user function positional arguments :type args: tuple :param kwargs user function keyword-based arguments. :type kwargs: tuple :return: the Session id created for this run. :rtype: int """ io.detach_console() self._session_infos['io'] = io.console.outfile if self._func is not None: # some sessions can have their starting time set directly when # initializing the object. # for instance for runs, elapsed time not session time but wall time""" if self.property('started') == None: self._session_infos['started'] = datetime.now() # flag it as running & make the info public self._session_infos['state'] = self.State.IN_PROGRESS self._sid = store_session_to_file(self._session_infos) # run the new process child = Process(target=main_detached_session, args=(self._sid, self._func, *args), kwargs=kwargs) child.start() return self._sid
[docs] def run(self, *args, **kwargs): """Run the session normally, without detaching the focus. Arguments are user function ones. This function is also in charge of redirecting I/O properly (stdout, file, logs) :param args: user function positional arguments :type args: tuple :param kwargs user function keyword-based arguments. :type kwargs: tuple """ if self._func is not None: # same as above, shifted starting time or not if self.property('started') == None: self._session_infos['started'] = datetime.now() self._session_infos['state'] = self.State.IN_PROGRESS self._sid = store_session_to_file(self._session_infos) # run the code try: self._rc = self._func(*args, **kwargs) finally: # in that mode, no information is left to users once the session # is complete. remove_session_from_file(self._sid) return self._sid