diff piecrust/baking/baker.py @ 853:f070a4fc033c

core: Continue PieCrust3 refactor, simplify pages. The asset pipeline is
still the only functional pipeline at this point.

* No more `QualifiedPage`, and several other pieces of code deleted.
* Data providers are simpler and more focused. For instance, the page
  iterator doesn't try to support other types of items.
* Route parameters are proper known source metadata, which removes the
  confusion between the two.
* Make the baker and pipeline manage records and record histories more
  correctly.
* Add support for record collapsing and for deleting stale outputs in the
  asset pipeline.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 21 May 2017 00:06:59 -0700
parents 4850f8c21b6e
children 08e02c2a2a1a
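
Before the diff, a note on the record layout this changeset moves to: bake records are now keyed per (source, pipeline) pair, so each pipeline gets its own record and its own history slice. A minimal, self-contained sketch of the naming scheme introduced as `_get_record_name` at the bottom of this diff (the source and pipeline names in the example are illustrative, not taken from this changeset):

def _get_record_name(source_name, pipeline_name):
    return '%s@%s' % (source_name, pipeline_name)

# e.g. an 'assets' source baked by the 'asset' pipeline:
print(_get_record_name('assets', 'asset'))   # -> assets@asset

Each such name addresses one record in the current MultiRecord and one history in the MultiRecordHistory, via the `record_histories.getHistory(record_name)` call shown below.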
--- a/piecrust/baking/baker.py	Wed May 17 00:11:48 2017 -0700
+++ b/piecrust/baking/baker.py	Sun May 21 00:06:59 2017 -0700
@@ -16,10 +16,10 @@
 logger = logging.getLogger(__name__)
 
 
-def get_bake_records_path(app, out_dir):
+def get_bake_records_path(app, out_dir, *, suffix=''):
     records_cache = app.cache.getCache('baker')
     records_id = hashlib.md5(out_dir.encode('utf8')).hexdigest()
-    records_name = records_id + '.record'
+    records_name = '%s%s.records' % (records_id, suffix)
     return records_cache.getCachePath(records_name)
 
 
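
The new keyword-only `suffix` argument lets callers address the rotated backup files as well as the primary one. A small usage sketch, assuming `app` and `out_dir` are a live PieCrust app and an output directory (the `.1` generation matches the backup loop in a later hunk):

current = get_bake_records_path(app, out_dir)               # <md5>.records
previous = get_bake_records_path(app, out_dir, suffix='.1') # <md5>.1.records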
@@ -74,7 +74,7 @@
 
         # Create the bake records history which tracks what's up-to-date
         # or not since last time we baked to the given output folder.
-        record_history = MultiRecordHistory(previous_records, self._records)
+        record_histories = MultiRecordHistory(previous_records, self._records)
 
         # Pre-create all caches.
         for cache_name in ['app', 'baker', 'pages', 'renders']:
@@ -87,8 +87,6 @@
         #
         # Also, create and initialize each pipeline for each source.
         sources_by_realm = {}
-        ppctx = PipelineContext(self.out_dir, record_history,
-                                force=self.force)
         for source in self.app.sources:
             pname = source.config['pipeline']
             if pname in self.allowed_pipelines:
@@ -96,8 +94,14 @@
                     source.config['realm'], [])
 
                 pp = self._pipeline_classes[pname](source)
+
+                record_name = _get_record_name(source.name, pname)
+                record_history = record_histories.getHistory(record_name)
+                ppctx = PipelineContext(self.out_dir, record_history,
+                                        force=self.force)
                 pp.initialize(ppctx)
-                srclist.append((source, pp))
+
+                srclist.append((source, pp, ppctx))
             else:
                 logger.debug(
                     "Skip source '%s' because pipeline '%s' is ignored." %
@@ -112,7 +116,19 @@
         for realm in realm_list:
             srclist = sources_by_realm.get(realm)
             if srclist is not None:
-                self._bakeRealm(record_history, pool, realm, srclist)
+                self._bakeRealm(pool, srclist)
+
+        # Handle deletions.
+        for realm in realm_list:
+            srclist = sources_by_realm.get(realm)
+            if srclist is not None:
+                self._deleteStaleOutputs(pool, srclist)
+
+        # Collapse records.
+        for realm in realm_list:
+            srclist = sources_by_realm.get(realm)
+            if srclist is not None:
+                self._collapseRecords(srclist)
 
         # All done with the workers. Close the pool and get reports.
         pool_stats = pool.close()
@@ -120,13 +136,13 @@
         for ps in pool_stats:
             if ps is not None:
                 total_stats.mergeStats(ps)
-        record_history.current.stats = total_stats
+        record_histories.current.stats = total_stats
 
         # Shutdown the pipelines.
         for realm in realm_list:
             srclist = sources_by_realm.get(realm)
             if srclist is not None:
-                for _, pp in srclist:
+                for _, pp, ppctx in srclist:
                     pp.shutdown(ppctx)
 
         # Backup previous records.
@@ -136,28 +152,28 @@
             suffix = '' if i == 0 else '.%d' % i
             records_path_i = os.path.join(
                 records_dir,
-                '%s%s.record' % (records_id, suffix))
+                '%s%s.records' % (records_id, suffix))
             if os.path.exists(records_path_i):
                 records_path_next = os.path.join(
                     records_dir,
-                    '%s.%s.record' % (records_id, i + 1))
+                    '%s.%s.records' % (records_id, i + 1))
                 if os.path.exists(records_path_next):
                     os.remove(records_path_next)
                 os.rename(records_path_i, records_path_next)
 
-        # Save the bake record.
+        # Save the bake records.
         with format_timed_scope(logger, "saved bake records.",
                                 level=logging.DEBUG, colored=False):
-            record_history.current.bake_time = time.time()
-            record_history.current.out_dir = self.out_dir
-            record_history.current.save(records_path)
+            record_histories.current.bake_time = time.time()
+            record_histories.current.out_dir = self.out_dir
+            record_histories.current.save(records_path)
 
         # All done.
         self.app.config.set('baker/is_baking', False)
         logger.debug(format_timed(start_time, 'done baking'))
 
         self._records = None
-        return record_history.current
+        return record_histories.current
 
     def _handleCacheValidity(self, previous_records, current_records):
         start_time = time.perf_counter()
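
The backup block in the hunk above rotates records files before the new ones are saved: `<id>.records` moves to `<id>.1.records`, `.1` to `.2`, and so on. The loop header sits outside the hunk context, but the body only works if `i` counts downward, so that each older generation is moved before a younger one lands on its name. A hedged restatement, with the retention count as an assumption:

import os

def rotate_records(records_dir, records_id, keep=9):
    # 'keep' is assumed; the real bound is outside this hunk's context.
    for i in range(keep - 1, -1, -1):
        suffix = '' if i == 0 else '.%d' % i
        src = os.path.join(records_dir, '%s%s.records' % (records_id, suffix))
        if not os.path.exists(src):
            continue
        dst = os.path.join(records_dir, '%s.%d.records' % (records_id, i + 1))
        if os.path.exists(dst):
            os.remove(dst)
        os.rename(src, dst)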
@@ -170,8 +186,8 @@
             # version of the app.
             reason = "not valid anymore"
         elif previous_records.invalidated:
-            # We have no valid previous bake record.
-            reason = "need bake record regeneration"
+            # We have no valid previous bake records.
+            reason = "need bake records regeneration"
         else:
             # Check if any template has changed since the last bake. Since
             # there could be some advanced conditional logic going on, we'd
@@ -200,8 +216,8 @@
                 start_time, "cache is assumed valid", colored=False))
             return True
 
-    def _bakeRealm(self, record_history, pool, realm, srclist):
-        for source, pp in srclist:
+    def _bakeRealm(self, pool, srclist):
+        for source, pp, ppctx in srclist:
             logger.debug("Queuing jobs for source '%s' using pipeline '%s'." %
                          (source.name, pp.PIPELINE_NAME))
             jobs = [BakeJob(source.name, item.spec, item.metadata)
@@ -209,6 +225,25 @@
             pool.queueJobs(jobs)
         pool.wait()
 
+    def _deleteStaleOutputs(self, pool, srclist):
+        for source, pp, ppctx in srclist:
+            ppctx.record_history.build()
+
+            to_delete = pp.getDeletions(ppctx)
+            if to_delete is not None:
+                for path, reason in to_delete:
+                    logger.debug("Removing '%s': %s" % (path, reason))
+                    ppctx.current_record.deleted_out_paths.append(path)
+                    try:
+                        os.remove(path)
+                    except FileNotFoundError:
+                        pass
+                    logger.info('[delete] %s' % path)
+
+    def _collapseRecords(self, srclist):
+        for source, pp, ppctx in srclist:
+            pp.collapseRecords(ppctx)
+
     def _logErrors(self, item_spec, errors):
         logger.error("Errors found in %s:" % item_spec)
         for e in errors:
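
The baker drives deletion but delegates the decision: after `record_history.build()`, `pp.getDeletions(ppctx)` must return `None` or an iterable of `(path, reason)` pairs, as `_deleteStaleOutputs` above shows. What follows is a hypothetical pipeline-side sketch, not the asset pipeline's actual code; it assumes the built history can be walked as `(previous, current)` entry pairs and that entries carry an `out_paths` list:

def getDeletions(self, ctx):
    # Assumed accessor: the built history pairs up previous and
    # current record entries.
    for prev, cur in ctx.record_history.diffs:
        if prev and not cur:
            # The source item disappeared; all of its outputs are stale.
            for p in prev.out_paths:
                yield (p, 'previous source item was removed')
        elif prev and cur:
            # The item still exists but may produce fewer outputs now.
            for p in set(prev.out_paths) - set(cur.out_paths):
                yield (p, 'output no longer produced')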
@@ -237,19 +272,20 @@
         return pool
 
     def _handleWorkerResult(self, job, res):
-        record_name = self._getRecordName(job)
+        record_name = _get_record_name(job.source_name, res.pipeline_name)
         record = self._records.getRecord(record_name)
-        record.entries.append(res.record)
-        if not res.record.success:
+        record.entries.append(res.record_entry)
+        if not res.record_entry.success:
             record.success = False
             self._records.success = False
-            self._logErrors(job.item_spec, res.record.errors)
+            self._logErrors(job.item_spec, res.record_entry.errors)
 
     def _handleWorkerError(self, job, exc_data):
         e = RecordEntry()
         e.item_spec = job.item_spec
         e.errors.append(str(exc_data))
 
+        ppname = self.app.getSource(job.source_name).config['pipeline']
+        record_name = _get_record_name(job.source_name, ppname)
-        record_name = self._getRecordName(job)
         record = self._records.getRecord(record_name)
         record.entries.append(e)
@@ -261,7 +297,6 @@
         if self.app.debug:
             logger.error(exc_data.traceback)
 
-    def _getRecordName(self, job):
-        sn = job.source_name
-        ppn = self.app.getSource(sn).config['pipeline']
-        return '%s@%s' % (sn, ppn)
+
+def _get_record_name(source_name, pipeline_name):
+    return '%s@%s' % (source_name, pipeline_name)
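
End to end, a bake now leaves a single `<md5>.records` file holding one record per source/pipeline pair. A hypothetical post-bake inspection; only `getRecord()`, `success`, and `entries` are confirmed by this diff, while the loader and the record name are assumptions:

records_path = get_bake_records_path(app, out_dir)
records = load_records(records_path)          # assumed loader function
rec = records.getRecord('assets@asset')       # illustrative record name
print(rec.success, len(rec.entries))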