Mercurial > piecrust2
changeset 860:c71472e6537f
refactor: Get the processing loop in the server functional again.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Thu, 08 Jun 2017 08:51:27 -0700 |
parents | 86994e076be4 |
children | d214918d4d2c |
files | piecrust/serving/procloop.py |
diffstat | 1 files changed, 63 insertions(+), 57 deletions(-) [+] |
line wrap: on
line diff
--- a/piecrust/serving/procloop.py Thu Jun 08 08:51:00 2017 -0700 +++ b/piecrust/serving/procloop.py Thu Jun 08 08:51:27 2017 -0700 @@ -110,8 +110,19 @@ self._obs.remove(obs) def run(self): - self._init() + logger.debug("Initializing processing loop with output: %s" % + self.out_dir) + try: + self._init() + except Exception as ex: + logger.error("Error initializing processing loop:") + logger.exception(ex) + return + logger.debug("Doing initial processing loop bake...") + self._runPipelines() + + logger.debug("Running processing loop...") self._last_config_mtime = os.path.getmtime(self._config_path) while True: @@ -123,7 +134,7 @@ self._runPipelines() continue - for procinfo in self._proc_infos: + for procinfo in self._proc_infos.values(): # For each assets folder we try to find the first new or # modified file. If any, we just run the pipeline on # that source. @@ -140,7 +151,7 @@ found_new_or_modified = True break if found_new_or_modified: - self._runPipeline(procinfo) + self._runPipelines(procinfo.source) time.sleep(self.interval) @@ -148,21 +159,23 @@ self._app = self.appfactory.create() self._last_records = MultiRecord() - self._proc_infos = [] + self._proc_infos = {} for src in self._app.sources: if src.config['pipeline'] != 'asset': continue procinfo = _AssetProcessingInfo(src) - self._proc_infos.append(procinfo) + self._proc_infos[src.name] = procinfo # Build the list of initial asset files. for item in src.getAllContents(): procinfo.paths.add(item.spec) - def _runPipelines(self): - record_histories = MultiRecordHistory(MultiRecord(), self._records) - self._ppmngr = PipelineManager( + def _runPipelines(self, only_for_source=None): + current_records = MultiRecord() + record_histories = MultiRecordHistory( + self._last_records, current_records) + ppmngr = PipelineManager( self._app, self.out_dir, record_histories) # Create the pipelines, but also remember some stuff for what @@ -170,77 +183,69 @@ for src in self._app.sources: if src.config['pipeline'] != 'asset': continue - - ppinfo = self._ppmngr.createPipeline(src) - api = _AssetProcessingInfo() - ppinfo.userdata = api + if only_for_source is not None and src != only_for_source: + continue - current_records = MultiRecord() - record_histories = MultiRecordHistory( - self._records, current_records) + ppmngr.createPipeline(src) - for ppinfo, procinfo in self._pipelines: - self._runPipeline(ppinfo, procinfo, record_histories) + for ppinfo in ppmngr.getPipelines(): + self._runPipeline(ppmngr, ppinfo) - status_id = self.last_status_id + 1 self.last_status_id += 1 - if self._records.success: - changed = filter( - lambda i: not i.was_collapsed_from_last_run, - self._record.entries) - changed = itertools.chain.from_iterable( - map(lambda i: i.rel_outputs, changed)) - changed = list(changed) - item = { - 'id': status_id, - 'type': 'pipeline_success', - 'assets': changed} + if self._last_records.success: + for rec in self._last_records.records: + changed = filter( + lambda i: not i.was_collapsed_from_last_run, + rec.getEntries()) + changed = itertools.chain.from_iterable( + map(lambda i: i.out_paths, changed)) + changed = list(changed) + item = { + 'id': self.last_status_id, + 'type': 'pipeline_success', + 'assets': changed} - self._notifyObservers(item) + self._notifyObservers(item) else: item = { - 'id': status_id, + 'id': self.last_status_id, 'type': 'pipeline_error', 'assets': []} - for entry in self._record.entries: - if entry.errors: - asset_item = { - 'path': entry.path, - 'errors': list(entry.errors)} - item['assets'].append(asset_item) - - self._notifyObservers(item) + for rec in self._last_records.records: + for entry in rec.getEntries(): + if entry.errors: + asset_item = { + 'path': entry.item_spec, + 'errors': list(entry.errors)} + item['assets'].append(asset_item) - def _runPipeline(self, procinfo): - procinfo.last_bake_time = time.time() - - src = procinfo.source + self._notifyObservers(item) - current_records = MultiRecord() - record_histories = MultiRecordHistory( - self._last_records, current_records) - ppmngr = PipelineManager( - self._app, self.out_dir, record_histories) - ppinfo = ppmngr.createPipeline(src) - + def _runPipeline(self, ppmngr, ppinfo): + src = ppinfo.source logger.debug("Running pipeline '%s' on: %s" % (ppinfo.pipeline_name, src.name)) + # Set the time. + procinfo = self._proc_infos[src.name] + procinfo.last_bake_time = time.time() + # Process all items in the source. pp = ppinfo.pipeline cr = ppinfo.record_history.current - jobctx = PipelineJobCreateContext(src) - for item in src.getAllContents(): - job = pp.createJob(item, jobctx) - + record_histories = ppmngr.record_histories + current_records = record_histories.current + jobctx = PipelineJobCreateContext(0, record_histories) + jobs = pp.createJobs(jobctx) + for job in jobs: ppres = PipelineJobResult() ppres.record_entry = pp.createRecordEntry(job) runctx = PipelineJobRunContext( - ppinfo.pipeline_ctx, job, record_histories) + job, pp, record_histories) try: - pp.run(item, runctx, ppres) + pp.run(job, runctx, ppres) except Exception as e: ppres.record_entry.errors.append(str(e)) @@ -252,7 +257,7 @@ if not ppres.record_entry.success: cr.success = False current_records.success = False - logger.error("Errors found in %s:" % item.spec) + logger.error("Errors found in %s:" % job.content_item.spec) for e in ppres.record_entry.errors: logger.error(" " + e) @@ -264,6 +269,7 @@ # Swap the old record with the next record. pr = ppinfo.record_history.previous + logger.debug("Swapping record '%s' with '%s'." % (pr.name, cr.name)) self._last_records.records.remove(pr) self._last_records.records.append(cr)