diff piecrust/baking/baker.py @ 853:f070a4fc033c
core: Continue PieCrust3 refactor, simplify pages.
The asset pipeline is still the only functional pipeline at this point.
* No more `QualifiedPage`, and several other pieces of code deleted.
* Data providers are simpler and more focused. For instance, the page iterator
doesn't try to support other types of items.
* Route parameters are now proper, known source metadata, which removes the
  confusion between route parameters and source metadata.
* Make the baker and pipeline manage records and record histories more
  correctly.
* Add support for record collapsing and for deleting stale outputs in the
  asset pipeline (see the sketch after this list).
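As context for the diff below, bake records are now keyed per source/pipeline pair: the new `_get_record_name()` helper builds a `source@pipeline` name, and each source gets its own record history out of the shared `MultiRecordHistory`. Here is a minimal standalone sketch of that keying; only the naming scheme comes from the commit itself, and the `Toy*` classes are illustrative stand-ins, not PieCrust's actual API:

def get_record_name(source_name, pipeline_name):
    # Same scheme as the commit's _get_record_name() helper.
    return '%s@%s' % (source_name, pipeline_name)


class ToyRecordHistory:
    # Toy stand-in for one record history (previous vs. current bake).
    def __init__(self, name):
        self.name = name
        self.previous_entries = []
        self.current_entries = []


class ToyMultiRecordHistory:
    # Toy stand-in for MultiRecordHistory; not PieCrust's real class.
    def __init__(self):
        self._histories = {}

    def getHistory(self, record_name):
        # One history per record name, created lazily, so every
        # source/pipeline pair tracks its own previous/current state.
        if record_name not in self._histories:
            self._histories[record_name] = ToyRecordHistory(record_name)
        return self._histories[record_name]


histories = ToyMultiRecordHistory()
for source_name, pipeline_name in [('pages', 'page'), ('assets', 'asset')]:
    name = get_record_name(source_name, pipeline_name)
    assert histories.getHistory(name).name == name  # e.g. 'pages@page'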
author    Ludovic Chabant <ludovic@chabant.com>
date      Sun, 21 May 2017 00:06:59 -0700
parents   4850f8c21b6e
children  08e02c2a2a1a
--- a/piecrust/baking/baker.py	Wed May 17 00:11:48 2017 -0700
+++ b/piecrust/baking/baker.py	Sun May 21 00:06:59 2017 -0700
@@ -16,10 +16,10 @@
 logger = logging.getLogger(__name__)
 
 
-def get_bake_records_path(app, out_dir):
+def get_bake_records_path(app, out_dir, *, suffix=''):
     records_cache = app.cache.getCache('baker')
     records_id = hashlib.md5(out_dir.encode('utf8')).hexdigest()
-    records_name = records_id + '.record'
+    records_name = '%s%s.records' % (records_id, suffix)
     return records_cache.getCachePath(records_name)
 
 
@@ -74,7 +74,7 @@
 
         # Create the bake records history which tracks what's up-to-date
         # or not since last time we baked to the given output folder.
-        record_history = MultiRecordHistory(previous_records, self._records)
+        record_histories = MultiRecordHistory(previous_records, self._records)
 
         # Pre-create all caches.
         for cache_name in ['app', 'baker', 'pages', 'renders']:
@@ -87,8 +87,6 @@
         #
         # Also, create and initialize each pipeline for each source.
         sources_by_realm = {}
-        ppctx = PipelineContext(self.out_dir, record_history,
-                                force=self.force)
         for source in self.app.sources:
             pname = source.config['pipeline']
             if pname in self.allowed_pipelines:
@@ -96,8 +94,14 @@
                     source.config['realm'], [])
 
                 pp = self._pipeline_classes[pname](source)
+
+                record_name = _get_record_name(source.name, pname)
+                record_history = record_histories.getHistory(record_name)
+                ppctx = PipelineContext(self.out_dir, record_history,
+                                        force=self.force)
                 pp.initialize(ppctx)
-                srclist.append((source, pp))
+
+                srclist.append((source, pp, ppctx))
             else:
                 logger.debug(
                     "Skip source '%s' because pipeline '%s' is ignored." %
@@ -112,7 +116,19 @@
         for realm in realm_list:
             srclist = sources_by_realm.get(realm)
             if srclist is not None:
-                self._bakeRealm(record_history, pool, realm, srclist)
+                self._bakeRealm(pool, srclist)
+
+        # Handle deletions.
+        for realm in realm_list:
+            srclist = sources_by_realm.get(realm)
+            if srclist is not None:
+                self._deleteStaleOutputs(pool, srclist)
+
+        # Collapse records.
+        for realm in realm_list:
+            srclist = sources_by_realm.get(realm)
+            if srclist is not None:
+                self._collapseRecords(srclist)
 
         # All done with the workers. Close the pool and get reports.
         pool_stats = pool.close()
@@ -120,13 +136,13 @@
         for ps in pool_stats:
             if ps is not None:
                 total_stats.mergeStats(ps)
-        record_history.current.stats = total_stats
+        record_histories.current.stats = total_stats
 
         # Shutdown the pipelines.
         for realm in realm_list:
             srclist = sources_by_realm.get(realm)
             if srclist is not None:
-                for _, pp in srclist:
+                for _, pp, ppctx in srclist:
                     pp.shutdown(ppctx)
 
         # Backup previous records.
@@ -136,28 +152,28 @@
             suffix = '' if i == 0 else '.%d' % i
             records_path_i = os.path.join(
                 records_dir,
-                '%s%s.record' % (records_id, suffix))
+                '%s%s.records' % (records_id, suffix))
             if os.path.exists(records_path_i):
                 records_path_next = os.path.join(
                     records_dir,
-                    '%s.%s.record' % (records_id, i + 1))
+                    '%s.%s.records' % (records_id, i + 1))
                 if os.path.exists(records_path_next):
                     os.remove(records_path_next)
                 os.rename(records_path_i, records_path_next)
 
-        # Save the bake record.
+        # Save the bake records.
         with format_timed_scope(logger, "saved bake records.",
                                 level=logging.DEBUG, colored=False):
-            record_history.current.bake_time = time.time()
-            record_history.current.out_dir = self.out_dir
-            record_history.current.save(records_path)
+            record_histories.current.bake_time = time.time()
+            record_histories.current.out_dir = self.out_dir
+            record_histories.current.save(records_path)
 
         # All done.
         self.app.config.set('baker/is_baking', False)
         logger.debug(format_timed(start_time, 'done baking'))
 
         self._records = None
-        return record_history.current
+        return record_histories.current
 
     def _handleCacheValidity(self, previous_records, current_records):
         start_time = time.perf_counter()
@@ -170,8 +186,8 @@
             # version of the app.
             reason = "not valid anymore"
         elif previous_records.invalidated:
-            # We have no valid previous bake record.
-            reason = "need bake record regeneration"
+            # We have no valid previous bake records.
+            reason = "need bake records regeneration"
         else:
             # Check if any template has changed since the last bake. Since
             # there could be some advanced conditional logic going on, we'd
@@ -200,8 +216,8 @@
                 start_time, "cache is assumed valid", colored=False))
             return True
 
-    def _bakeRealm(self, record_history, pool, realm, srclist):
-        for source, pp in srclist:
+    def _bakeRealm(self, pool, srclist):
+        for source, pp, ppctx in srclist:
             logger.debug("Queuing jobs for source '%s' using pipeline '%s'." %
                          (source.name, pp.PIPELINE_NAME))
             jobs = [BakeJob(source.name, item.spec, item.metadata)
@@ -209,6 +225,25 @@
             pool.queueJobs(jobs)
 
         pool.wait()
 
+    def _deleteStaleOutputs(self, pool, srclist):
+        for source, pp, ppctx in srclist:
+            ppctx.record_history.build()
+
+            to_delete = pp.getDeletions(ppctx)
+            if to_delete is not None:
+                for path, reason in to_delete:
+                    logger.debug("Removing '%s': %s" % (path, reason))
+                    ppctx.current_record.deleted_out_paths.append(path)
+                    try:
+                        os.remove(path)
+                    except FileNotFoundError:
+                        pass
+                    logger.info('[delete] %s' % path)
+
+    def _collapseRecords(self, srclist):
+        for source, pp, ppctx in srclist:
+            pp.collapseRecords(ppctx)
+
     def _logErrors(self, item_spec, errors):
         logger.error("Errors found in %s:" % item_spec)
         for e in errors:
@@ -237,19 +272,21 @@
         return pool
 
     def _handleWorkerResult(self, job, res):
-        record_name = self._getRecordName(job)
+        record_name = _get_record_name(job.source_name, res.pipeline_name)
         record = self._records.getRecord(record_name)
-        record.entries.append(res.record)
-        if not res.record.success:
+        record.entries.append(res.record_entry)
+        if not res.record_entry.success:
             record.success = False
             self._records.success = False
-            self._logErrors(job.item_spec, res.record.errors)
+            self._logErrors(job.item_spec, res.record_entry.errors)
 
     def _handleWorkerError(self, job, exc_data):
         e = RecordEntry()
         e.item_spec = job.item_spec
         e.errors.append(str(exc_data))
 
+        ppname = self.app.getSource(job.source_name).config['pipeline']
+        record_name = _get_record_name(job.source_name, ppname)
         record_name = self._getRecordName(job)
         record = self._records.getRecord(record_name)
         record.entries.append(e)
@@ -261,7 +298,6 @@
         if self.app.debug:
             logger.error(exc_data.traceback)
 
-    def _getRecordName(self, job):
-        sn = job.source_name
-        ppn = self.app.getSource(sn).config['pipeline']
-        return '%s@%s' % (sn, ppn)
+
+def _get_record_name(source_name, pipeline_name):
+    return '%s@%s' % (source_name, pipeline_name)
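After the bake phase, the baker now drives two new hooks on each pipeline: `getDeletions()` reports previously baked outputs that have gone stale, and `collapseRecords()` lets the pipeline carry forward whatever it needs from the previous record. The asset pipeline's actual implementations are not part of this file, so the following is only a guessed sketch of what such hooks could look like; the context and record shapes (`previous`/`current` dicts of item spec to output paths) are assumptions for illustration, not PieCrust's API:

class ToyRecordHistory:
    # Toy record model: item spec -> list of output paths.
    def __init__(self, previous, current):
        self.previous = previous
        self.current = current

    def build(self):
        # The baker calls build() before asking for deletions; this
        # toy model has nothing to precompute.
        pass


class ToyPipelineContext:
    def __init__(self, record_history):
        self.record_history = record_history


class ToyAssetPipeline:
    def __init__(self, source_specs):
        # Item specs the content source currently contains.
        self.source_specs = set(source_specs)

    def getDeletions(self, ppctx):
        # Previous-record entries whose source item no longer exists
        # point at stale output files.
        for spec, out_paths in ppctx.record_history.previous.items():
            if spec not in self.source_specs:
                for path in out_paths:
                    yield path, 'source item was removed'

    def collapseRecords(self, ppctx):
        # Items that still exist but were not re-baked this run keep
        # their previous entry in the current record.
        for spec, out_paths in ppctx.record_history.previous.items():
            if spec in self.source_specs:
                ppctx.record_history.current.setdefault(spec, out_paths)


history = ToyRecordHistory(
    previous={'posts/old.md': ['out/old.html'],
              'posts/keep.md': ['out/keep.html']},
    current={})
ctx = ToyPipelineContext(history)
pp = ToyAssetPipeline(source_specs=['posts/keep.md'])

history.build()
assert list(pp.getDeletions(ctx)) == \
    [('out/old.html', 'source item was removed')]
pp.collapseRecords(ctx)
assert history.current == {'posts/keep.md': ['out/keep.html']}

Note how the ordering in bake() above matters for a sketch like this: deletions run against the freshly built record history first, and only afterwards are surviving previous entries collapsed into the current record.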