changeset 918:7f1da7e7b154

internal: The server's processing loop now uses the baker. Instead of reimplementing a custom way to run the pipelines, the loop simply calls the baker, restricted to asset pipelines only.
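
The change is two-sided: Baker.__init__ grows keyword-only allowed_sources and rotate_bake_records arguments, and the server's processing loop builds a Baker instead of driving the pipelines by hand. A minimal sketch of the resulting server-side call, mirroring the new _runPipelines in procloop.py (run_asset_pipelines and loop are illustrative stand-ins, not names from this changeset):

    from piecrust.baking.baker import Baker

    def run_asset_pipelines(loop, only_for_source=None):
        # Optionally restrict the bake to a single content source.
        allowed_sources = None
        if only_for_source is not None:
            allowed_sources = [only_for_source.name]

        baker = Baker(
            loop.appfactory, loop._app, loop.out_dir,
            allowed_pipelines=['asset'],      # skip page pipelines entirely
            allowed_sources=allowed_sources,  # new keyword in this changeset
            rotate_bake_records=False)        # leave the numbered .records backups alone
        return baker.bake()

Any exception is caught one level up in _runPipelinesSafe, so a failing asset pipeline logs an error instead of killing the watch loop.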
author Ludovic Chabant <ludovic@chabant.com>
date Fri, 29 Sep 2017 08:43:34 -0700
parents 33a89139c284
children 725744a4c42d
files piecrust/baking/baker.py piecrust/serving/procloop.py
diffstat 2 files changed, 51 insertions(+), 90 deletions(-)
--- a/piecrust/baking/baker.py	Fri Sep 29 08:42:38 2017 -0700
+++ b/piecrust/baking/baker.py	Fri Sep 29 08:43:34 2017 -0700
@@ -25,15 +25,20 @@
 
 
 class Baker(object):
-    def __init__(self, appfactory, app, out_dir,
-                 force=False, allowed_pipelines=None,
-                 forbidden_pipelines=None):
+    def __init__(self, appfactory, app, out_dir, *,
+                 force=False,
+                 allowed_pipelines=None,
+                 forbidden_pipelines=None,
+                 allowed_sources=None,
+                 rotate_bake_records=True):
         self.appfactory = appfactory
         self.app = app
         self.out_dir = out_dir
         self.force = force
         self.allowed_pipelines = allowed_pipelines
         self.forbidden_pipelines = forbidden_pipelines
+        self.allowed_sources = allowed_sources
+        self.rotate_bake_records = rotate_bake_records
 
     def bake(self):
         start_time = time.perf_counter()
@@ -89,7 +94,11 @@
             self.app, self.out_dir, record_histories)
         ok_pp = self.allowed_pipelines
         nok_pp = self.forbidden_pipelines
+        ok_src = self.allowed_sources
         for source in self.app.sources:
+            if ok_src is not None and source.name not in ok_src:
+                continue
+
             pname = get_pipeline_name_for_source(source)
             if ok_pp is not None and pname not in ok_pp:
                 continue
@@ -144,20 +153,21 @@
         ppmngr.shutdownPipelines()
 
         # Backup previous records.
-        records_dir, records_fn = os.path.split(records_path)
-        records_id, _ = os.path.splitext(records_fn)
-        for i in range(8, -1, -1):
-            suffix = '' if i == 0 else '.%d' % i
-            records_path_i = os.path.join(
-                records_dir,
-                '%s%s.records' % (records_id, suffix))
-            if os.path.exists(records_path_i):
-                records_path_next = os.path.join(
+        if self.rotate_bake_records:
+            records_dir, records_fn = os.path.split(records_path)
+            records_id, _ = os.path.splitext(records_fn)
+            for i in range(8, -1, -1):
+                suffix = '' if i == 0 else '.%d' % i
+                records_path_i = os.path.join(
                     records_dir,
-                    '%s.%s.records' % (records_id, i + 1))
-                if os.path.exists(records_path_next):
-                    os.remove(records_path_next)
-                os.rename(records_path_i, records_path_next)
+                    '%s%s.records' % (records_id, suffix))
+                if os.path.exists(records_path_i):
+                    records_path_next = os.path.join(
+                        records_dir,
+                        '%s.%s.records' % (records_id, i + 1))
+                    if os.path.exists(records_path_next):
+                        os.remove(records_path_next)
+                    os.rename(records_path_i, records_path_next)
 
         # Save the bake records.
         with format_timed_scope(logger, "saved bake records.",
--- a/piecrust/serving/procloop.py	Fri Sep 29 08:42:38 2017 -0700
+++ b/piecrust/serving/procloop.py	Fri Sep 29 08:43:34 2017 -0700
@@ -121,7 +121,7 @@
             return
 
         logger.debug("Doing initial processing loop bake...")
-        self._runPipelines()
+        self._runPipelinesSafe()
 
         logger.debug("Running processing loop...")
         self._last_config_mtime = os.path.getmtime(self._config_path)
@@ -156,7 +156,7 @@
                             logger,
                             "change detected, reprocessed '%s'." %
                             procinfo.source.name):
-                        self._runPipelines(procinfo.source)
+                        self._runPipelinesSafe(procinfo.source)
 
             time.sleep(self.interval)
 
@@ -176,30 +176,33 @@
             for item in src.getAllContents():
                 procinfo.paths.add(item.spec)
 
-    def _runPipelines(self, only_for_source=None):
-        current_records = MultiRecord()
-        record_histories = MultiRecordHistory(
-            self._last_records, current_records)
-        ppmngr = PipelineManager(
-            self._app, self.out_dir, record_histories)
+    def _runPipelinesSafe(self, only_for_source=None):
+        try:
+            self._runPipelines(only_for_source)
+        except Exception as ex:
+            logger.error("Error while running asset pipeline:")
+            logger.exception(ex)
+
+    def _runPipelines(self, only_for_source):
+        from piecrust.baking.baker import Baker
 
-        # Create the pipelines, but also remember some stuff for what
-        # we want to do.
-        for src in self._app.sources:
-            if src.config['pipeline'] != 'asset':
-                continue
-            if only_for_source is not None and src != only_for_source:
-                continue
+        allowed_sources = None
+        if only_for_source:
+            allowed_sources = [only_for_source.name]
+        baker = Baker(
+            self.appfactory, self._app, self.out_dir,
+            allowed_pipelines=['asset'],
+            allowed_sources=allowed_sources,
+            rotate_bake_records=False)
+        records = baker.bake()
 
-            ppmngr.createPipeline(src)
+        self._onPipelinesRun(records)
 
-        for ppinfo in ppmngr.getPipelines():
-            self._runPipeline(ppmngr, ppinfo)
-
+    def _onPipelinesRun(self, records):
         self.last_status_id += 1
 
-        if self._last_records.success:
-            for rec in self._last_records.records:
+        if records.success:
+            for rec in records.records:
                 changed = filter(
                     lambda i: not i.was_collapsed_from_last_run,
                     rec.getEntries())
@@ -217,7 +220,7 @@
                 'id': self.last_status_id,
                 'type': 'pipeline_error',
                 'assets': []}
-            for rec in self._last_records.records:
+            for rec in records.records:
                 for entry in rec.getEntries():
                     if entry.errors:
                         asset_item = {
@@ -227,58 +230,6 @@
 
                 self._notifyObservers(item)
 
-    def _runPipeline(self, ppmngr, ppinfo):
-        src = ppinfo.source
-        logger.debug("Running pipeline '%s' on: %s" %
-                     (ppinfo.pipeline_name, src.name))
-
-        # Set the time.
-        procinfo = self._proc_infos[src.name]
-        procinfo.last_bake_time = time.time()
-
-        # Process all items in the source.
-        pp = ppinfo.pipeline
-        cr = ppinfo.record_history.current
-        record_histories = ppmngr.record_histories
-        current_records = record_histories.current
-        jobctx = PipelineJobCreateContext(0, record_histories)
-        jobs = pp.createJobs(jobctx)
-        for job in jobs:
-            runctx = PipelineJobRunContext(
-                job, pp.record_name, record_histories)
-
-            ppres = PipelineJobResult()
-            ppres.record_entry = pp.createRecordEntry(job, runctx)
-
-            try:
-                pp.run(job, runctx, ppres)
-            except Exception as e:
-                ppres.record_entry.errors.append(str(e))
-
-            if ppres.next_step_job is not None:
-                logger.error("The processing loop for the server "
-                             "doesn't support multi-step pipelines.")
-
-            cr.addEntry(ppres.record_entry)
-            if not ppres.record_entry.success:
-                cr.success = False
-                current_records.success = False
-                logger.error("Errors found in %s:" % job.content_item.spec)
-                for e in ppres.record_entry.errors:
-                    logger.error("  " + e)
-
-        # Do all the final stuff.
-        ppmngr.postJobRun()
-        ppmngr.deleteStaleOutputs()
-        ppmngr.collapseRecords()
-        ppmngr.shutdownPipelines()
-
-        # Swap the old record with the next record.
-        pr = ppinfo.record_history.previous
-        logger.debug("Swapping record '%s' with '%s'." % (pr.name, cr.name))
-        self._last_records.records.remove(pr)
-        self._last_records.records.append(cr)
-
     def _notifyObservers(self, item):
         with self._obs_lock:
             observers = list(self._obs)