diff piecrust/serving/procloop.py @ 918:7f1da7e7b154
internal: The processing loop for the server is now using the baker.
Instead of reimplementing a custom way to run the pipelines, the
loop now simply calls the baker, but only for the asset pipelines.
author    Ludovic Chabant <ludovic@chabant.com>
date      Fri, 29 Sep 2017 08:43:34 -0700
parents   342e3ea24b5d
children  89d94955b818
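In short, some fifty lines of hand-rolled pipeline running collapse into a guarded call to the baker. The new flow, condensed from the '+' lines of the diff below (the inline comments are editorial, not part of the changeset):

    def _runPipelinesSafe(self, only_for_source=None):
        # Never let a failed asset bake take down the server's loop.
        try:
            self._runPipelines(only_for_source)
        except Exception as ex:
            logger.error("Error while running asset pipeline:")
            logger.exception(ex)

    def _runPipelines(self, only_for_source):
        from piecrust.baking.baker import Baker

        # When reacting to a change in one source, narrow the bake
        # to that source only.
        allowed_sources = None
        if only_for_source:
            allowed_sources = [only_for_source.name]

        # Bake only the asset pipelines, and keep the previous bake
        # records in place so each run can diff against the last one.
        baker = Baker(
            self.appfactory, self._app, self.out_dir,
            allowed_pipelines=['asset'],
            allowed_sources=allowed_sources,
            rotate_bake_records=False)
        records = baker.bake()

        self._onPipelinesRun(records)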
--- a/piecrust/serving/procloop.py	Fri Sep 29 08:42:38 2017 -0700
+++ b/piecrust/serving/procloop.py	Fri Sep 29 08:43:34 2017 -0700
@@ -121,7 +121,7 @@
             return
 
         logger.debug("Doing initial processing loop bake...")
-        self._runPipelines()
+        self._runPipelinesSafe()
 
         logger.debug("Running processing loop...")
         self._last_config_mtime = os.path.getmtime(self._config_path)
@@ -156,7 +156,7 @@
                             logger,
                             "change detected, reprocessed '%s'." %
                             procinfo.source.name):
-                        self._runPipelines(procinfo.source)
+                        self._runPipelinesSafe(procinfo.source)
 
             time.sleep(self.interval)
 
@@ -176,30 +176,33 @@
         for item in src.getAllContents():
             procinfo.paths.add(item.spec)
 
-    def _runPipelines(self, only_for_source=None):
-        current_records = MultiRecord()
-        record_histories = MultiRecordHistory(
-            self._last_records, current_records)
-        ppmngr = PipelineManager(
-            self._app, self.out_dir, record_histories)
+    def _runPipelinesSafe(self, only_for_source=None):
+        try:
+            self._runPipelines(only_for_source)
+        except Exception as ex:
+            logger.error("Error while running asset pipeline:")
+            logger.exception(ex)
+
+    def _runPipelines(self, only_for_source):
+        from piecrust.baking.baker import Baker
 
-        # Create the pipelines, but also remember some stuff for what
-        # we want to do.
-        for src in self._app.sources:
-            if src.config['pipeline'] != 'asset':
-                continue
-            if only_for_source is not None and src != only_for_source:
-                continue
+        allowed_sources = None
+        if only_for_source:
+            allowed_sources = [only_for_source.name]
+        baker = Baker(
+            self.appfactory, self._app, self.out_dir,
+            allowed_pipelines=['asset'],
+            allowed_sources=allowed_sources,
+            rotate_bake_records=False)
+        records = baker.bake()
 
-            ppmngr.createPipeline(src)
+        self._onPipelinesRun(records)
 
-        for ppinfo in ppmngr.getPipelines():
-            self._runPipeline(ppmngr, ppinfo)
-
+    def _onPipelinesRun(self, records):
         self.last_status_id += 1
 
-        if self._last_records.success:
-            for rec in self._last_records.records:
+        if records.success:
+            for rec in records.records:
                 changed = filter(
                     lambda i: not i.was_collapsed_from_last_run,
                     rec.getEntries())
@@ -217,7 +220,7 @@
                 'id': self.last_status_id,
                 'type': 'pipeline_error',
                 'assets': []}
-            for rec in self._last_records.records:
+            for rec in records.records:
                 for entry in rec.getEntries():
                     if entry.errors:
                         asset_item = {
@@ -227,58 +230,6 @@
 
             self._notifyObservers(item)
 
-    def _runPipeline(self, ppmngr, ppinfo):
-        src = ppinfo.source
-        logger.debug("Running pipeline '%s' on: %s" %
-                     (ppinfo.pipeline_name, src.name))
-
-        # Set the time.
-        procinfo = self._proc_infos[src.name]
-        procinfo.last_bake_time = time.time()
-
-        # Process all items in the source.
-        pp = ppinfo.pipeline
-        cr = ppinfo.record_history.current
-        record_histories = ppmngr.record_histories
-        current_records = record_histories.current
-        jobctx = PipelineJobCreateContext(0, record_histories)
-        jobs = pp.createJobs(jobctx)
-        for job in jobs:
-            runctx = PipelineJobRunContext(
-                job, pp.record_name, record_histories)
-
-            ppres = PipelineJobResult()
-            ppres.record_entry = pp.createRecordEntry(job, runctx)
-
-            try:
-                pp.run(job, runctx, ppres)
-            except Exception as e:
-                ppres.record_entry.errors.append(str(e))
-
-            if ppres.next_step_job is not None:
-                logger.error("The processing loop for the server "
-                             "doesn't support multi-step pipelines.")
-
-            cr.addEntry(ppres.record_entry)
-            if not ppres.record_entry.success:
-                cr.success = False
-                current_records.success = False
-                logger.error("Errors found in %s:" % job.content_item.spec)
-                for e in ppres.record_entry.errors:
-                    logger.error("  " + e)
-
-        # Do all the final stuff.
-        ppmngr.postJobRun()
-        ppmngr.deleteStaleOutputs()
-        ppmngr.collapseRecords()
-        ppmngr.shutdownPipelines()
-
-        # Swap the old record with the next record.
-        pr = ppinfo.record_history.previous
-        logger.debug("Swapping record '%s' with '%s'." % (pr.name, cr.name))
-        self._last_records.records.remove(pr)
-        self._last_records.records.append(cr)
-
     def _notifyObservers(self, item):
         with self._obs_lock:
             observers = list(self._obs)
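Taken together, the first two hunks give the loop a single exception-safe entry point, called once at startup and then once per changed source; a condensed sketch of the two call sites from the hunks above:

        # Initial bake when the processing loop starts up:
        self._runPipelinesSafe()

        # Inside the watch loop, after a change in one source:
        self._runPipelinesSafe(procinfo.source)

Because _runPipelinesSafe() logs and swallows exceptions, a broken asset bake no longer kills the loop; errors instead surface through _onPipelinesRun(), which pushes a 'pipeline_error' status item to the loop's observers.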