import json
import os
from pcvs import NAME_BUILD_RESDIR
from pcvs.helpers.system import MetaConfig, ValidationScheme
from pcvs.plugins import Plugin
[docs]class Publisher:
"""Manage result publication and storage on disk.
Jobs are submitted to a publisher, forming a set of ready-to-be-flushed
elements. Every time `generate_file` is invoked, tests are dumped to a file
named after ``pcvs_rawdat<id>.json``, where ``id`` is a automatic increment.
Then, pool is emptied and waiting for new tests. This way, a single manager
manages multiple files.
:cvar scheme: path to test result scheme
:type scheme: str
:cvar increment: used within filename
:type increment: int
:cvar fn_fmt: filename format string
:type fn_fmt: str
:ivar _layout: hierarchical representation of tests within the file
:type _layout: dict
:ivar _destpath: target filepath
:type _destpath: str
"""
scheme = None
increment = 0
fn_fmt = "pcvs_rawdat{:>04d}.json"
def __init__(self, prefix=None):
"""constructor method.
:param prefix: where result file will be stored in
:type prefix: str
"""
super().__init__()
self._layout = {
"tests": []
}
if not prefix:
prefix = "."
self._destpath = os.path.join(prefix, NAME_BUILD_RESDIR)
assert(os.path.isdir(self._destpath))
@property
def format(self):
"""Return format type (currently only 'json' is supported).
:return: format as printable string
:rtype: str
"""
return "json"
[docs] def empty_entries(self):
"""Empty the publisher from all saved jobs."""
self._layout['tests'] = list()
[docs] def validate(self, stream):
"""Ensure the test results layout saved is compliant with standards.
:param stream: content to validate against publisher scheme.
:type: stream: dict or str
"""
if not self.scheme:
self.scheme = ValidationScheme("test-result")
self.scheme.validate(stream)
[docs] def add(self, json):
"""Add a new job to be published.
:param json: the Test() JSON.
:type json: json
"""
self.validate(json)
self._layout['tests'].append(json)
[docs] def flush(self):
"""Flush down saved JSON-based jobs to a single file.
The Publisher is then reset for the next flush (next file).
"""
MetaConfig.root.get_internal('pColl').invoke_plugins(
Plugin.Step.SCHED_PUBLISH_BEFORE)
# nothing to flush since the last call
if len(self._layout['tests']) <= 0:
return
filename = os.path.join(
self._destpath, self.fn_fmt.format(Publisher.increment))
assert(not os.path.isfile(filename))
Publisher.increment += 1
with open(filename, 'w+') as fh:
json.dump(self._layout, fh)
self.empty_entries()
MetaConfig.root.get_internal('pColl').invoke_plugins(
Plugin.Step.SCHED_PUBLISH_BEFORE)