Mercurial > piecrust2
diff piecrust/serving/procloop.py @ 854:08e02c2a2a1a
core: Keep refactoring, this time to prepare for generator sources.
- Make a few APIs simpler.
- Content pipelines create their own jobs, so that generator sources can
keep aborting in `getContents`, but rely on their pipeline to generate
pages for baking.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sun, 04 Jun 2017 23:34:28 -0700 |
parents | c2ea75e37540 |
children | 448710d84121 |
line wrap: on
line diff
--- a/piecrust/serving/procloop.py Sun May 21 00:06:59 2017 -0700 +++ b/piecrust/serving/procloop.py Sun Jun 04 23:34:28 2017 -0700 @@ -7,8 +7,11 @@ import itertools import threading from piecrust import CONFIG_PATH, THEME_CONFIG_PATH -from piecrust.app import PieCrust -from piecrust.processing.pipeline import ProcessorPipeline +from piecrust.pipelines.base import ( + PipelineJobCreateContext, PipelineJobRunContext, PipelineJobResult, + PipelineManager) +from piecrust.pipelines.records import ( + MultiRecord, MultiRecordHistory) logger = logging.getLogger(__name__) @@ -74,25 +77,28 @@ self._running = 2 +class _AssetProcessingInfo: + def __init__(self, source): + self.source = source + self.paths = set() + self.last_bake_time = time.time() + + class ProcessingLoop(threading.Thread): def __init__(self, appfactory, out_dir): - super(ProcessingLoop, self).__init__( - name='pipeline-reloader', daemon=True) + super().__init__(name='pipeline-reloader', daemon=True) self.appfactory = appfactory self.out_dir = out_dir self.last_status_id = 0 self.interval = 1 - self.app = None - self._roots = [] - self._monitor_assets_root = False - self._paths = set() - self._record = None - self._last_bake = 0 + self._app = None + self._proc_infos = None + self._last_records = None self._last_config_mtime = 0 self._obs = [] self._obs_lock = threading.Lock() config_name = ( - THEME_CONFIG_PATH if appfactory.theme_site else CONFIG_PATH) + THEME_CONFIG_PATH if appfactory.theme_site else CONFIG_PATH) self._config_path = os.path.join(appfactory.root_dir, config_name) def addObserver(self, obs): @@ -104,116 +110,162 @@ self._obs.remove(obs) def run(self): - self._initPipeline() + self._init() - self._last_bake = time.time() self._last_config_mtime = os.path.getmtime(self._config_path) - self._record = self.pipeline.run() while True: cur_config_time = os.path.getmtime(self._config_path) if self._last_config_mtime < cur_config_time: logger.info("Site configuration changed, reloading pipeline.") self._last_config_mtime = cur_config_time - self._initPipeline() - for root in self._roots: - self._runPipeline(root) + self._init() + self._runPipelines() continue - if self._monitor_assets_root: - assets_dir = os.path.join(self.app.root_dir, 'assets') - if os.path.isdir(assets_dir): - logger.info("Assets directory was created, reloading " - "pipeline.") - self._initPipeline() - self._runPipeline(assets_dir) - continue - - for root in self._roots: - # For each mount root we try to find the first new or + for procinfo in self._proc_infos: + # For each assets folder we try to find the first new or # modified file. If any, we just run the pipeline on - # that mount. + # that source. found_new_or_modified = False - for dirpath, dirnames, filenames in os.walk(root): - for filename in filenames: - path = os.path.join(dirpath, filename) - if path not in self._paths: - logger.debug("Found new asset: %s" % path) - self._paths.add(path) - found_new_or_modified = True - break - if os.path.getmtime(path) > self._last_bake: - logger.debug("Found modified asset: %s" % path) - found_new_or_modified = True - break - - if found_new_or_modified: + for item in procinfo.source.getAllContents(): + path = item.spec + if path not in procinfo.paths: + logger.debug("Found new asset: %s" % path) + procinfo.paths.add(path) + found_new_or_modified = True break - + if os.path.getmtime(path) > procinfo.last_bake_time: + logger.debug("Found modified asset: %s" % path) + found_new_or_modified = True + break if found_new_or_modified: - self._runPipeline(root) + self._runPipeline(procinfo) time.sleep(self.interval) - def _initPipeline(self): - # Create the app and pipeline. - self.app = self.appfactory.create() - self.pipeline = ProcessorPipeline(self.app, self.out_dir) + def _init(self): + self._app = self.appfactory.create() + self._last_records = MultiRecord() + + self._proc_infos = [] + for src in self._app.sources: + if src.config['pipeline'] != 'asset': + continue - # Get the list of assets directories. - self._roots = list(self.pipeline.mounts.keys()) + procinfo = _AssetProcessingInfo(src) + self._proc_infos.append(procinfo) - # The 'assets' folder may not be in the mounts list if it doesn't - # exist yet, but we want to monitor for when the user creates it. - default_root = os.path.join(self.app.root_dir, 'assets') - self._monitor_assets_root = (default_root not in self._roots) + # Build the list of initial asset files. + for item in src.getAllContents(): + procinfo.paths.add(item.spec) + + def _runPipelines(self): + record_histories = MultiRecordHistory(MultiRecord(), self._records) + self._ppmngr = PipelineManager( + self._app, self.out_dir, record_histories) - # Build the list of initial asset files. - self._paths = set() - for root in self._roots: - for dirpath, dirnames, filenames in os.walk(root): - self._paths |= set([os.path.join(dirpath, f) - for f in filenames]) + # Create the pipelines, but also remember some stuff for what + # we want to do. + for src in self._app.sources: + if src.config['pipeline'] != 'asset': + continue + + ppinfo = self._ppmngr.createPipeline(src) + api = _AssetProcessingInfo() + ppinfo.userdata = api + + current_records = MultiRecord() + record_histories = MultiRecordHistory( + self._records, current_records) - def _runPipeline(self, root): - self._last_bake = time.time() - try: - self._record = self.pipeline.run( - root, - previous_record=self._record, - save_record=False) + for ppinfo, procinfo in self._pipelines: + self._runPipeline(ppinfo, procinfo, record_histories) + + status_id = self.last_status_id + 1 + self.last_status_id += 1 + + if self._records.success: + changed = filter( + lambda i: not i.was_collapsed_from_last_run, + self._record.entries) + changed = itertools.chain.from_iterable( + map(lambda i: i.rel_outputs, changed)) + changed = list(changed) + item = { + 'id': status_id, + 'type': 'pipeline_success', + 'assets': changed} - status_id = self.last_status_id + 1 - self.last_status_id += 1 + self._notifyObservers(item) + else: + item = { + 'id': status_id, + 'type': 'pipeline_error', + 'assets': []} + for entry in self._record.entries: + if entry.errors: + asset_item = { + 'path': entry.path, + 'errors': list(entry.errors)} + item['assets'].append(asset_item) + + self._notifyObservers(item) - if self._record.success: - changed = filter( - lambda i: not i.was_collapsed_from_last_run, - self._record.entries) - changed = itertools.chain.from_iterable( - map(lambda i: i.rel_outputs, changed)) - changed = list(changed) - item = { - 'id': status_id, - 'type': 'pipeline_success', - 'assets': changed} + def _runPipeline(self, procinfo): + procinfo.last_bake_time = time.time() + + src = procinfo.source + + current_records = MultiRecord() + record_histories = MultiRecordHistory( + self._last_records, current_records) + ppmngr = PipelineManager( + self._app, self.out_dir, record_histories) + ppinfo = ppmngr.createPipeline(src) + + logger.debug("Running pipeline '%s' on: %s" % + (ppinfo.pipeline_name, src.name)) - self._notifyObservers(item) - else: - item = { - 'id': status_id, - 'type': 'pipeline_error', - 'assets': []} - for entry in self._record.entries: - if entry.errors: - asset_item = { - 'path': entry.path, - 'errors': list(entry.errors)} - item['assets'].append(asset_item) + # Process all items in the source. + pp = ppinfo.pipeline + cr = ppinfo.record_history.current + jobctx = PipelineJobCreateContext(src) + for item in src.getAllContents(): + job = pp.createJob(item, jobctx) + + ppres = PipelineJobResult() + ppres.record_entry = pp.createRecordEntry(job) + + runctx = PipelineJobRunContext( + ppinfo.pipeline_ctx, job, record_histories) + try: + pp.run(item, runctx, ppres) + except Exception as e: + ppres.record_entry.errors.append(str(e)) - self._notifyObservers(item) - except Exception as ex: - logger.exception(ex) + if ppres.next_pass_job is not None: + logger.error("The processing loop for the server " + "doesn't support multi-pass pipelines.") + + cr.addEntry(ppres.record_entry) + if not ppres.record_entry.success: + cr.success = False + current_records.success = False + logger.error("Errors found in %s:" % item.spec) + for e in ppres.record_entry.errors: + logger.error(" " + e) + + # Do all the final stuff. + ppmngr.buildHistoryDiffs() + ppmngr.deleteStaleOutputs() + ppmngr.collapseRecords() + ppmngr.shutdownPipelines() + + # Swap the old record with the next record. + pr = ppinfo.record_history.previous + self._last_records.records.remove(pr) + self._last_records.records.append(cr) def _notifyObservers(self, item): with self._obs_lock: