diff piecrust/serving/procloop.py @ 918:7f1da7e7b154

internal: The processing loop for the server is now using the baker. Instead of reimplementing a custom way to run the pipelines, the loop is just calling the baker, but only for asset pipelines.
author Ludovic Chabant <ludovic@chabant.com>
date Fri, 29 Sep 2017 08:43:34 -0700
parents 342e3ea24b5d
children 89d94955b818
line wrap: on
line diff
--- a/piecrust/serving/procloop.py	Fri Sep 29 08:42:38 2017 -0700
+++ b/piecrust/serving/procloop.py	Fri Sep 29 08:43:34 2017 -0700
@@ -121,7 +121,7 @@
             return
 
         logger.debug("Doing initial processing loop bake...")
-        self._runPipelines()
+        self._runPipelinesSafe()
 
         logger.debug("Running processing loop...")
         self._last_config_mtime = os.path.getmtime(self._config_path)
@@ -156,7 +156,7 @@
                             logger,
                             "change detected, reprocessed '%s'." %
                             procinfo.source.name):
-                        self._runPipelines(procinfo.source)
+                        self._runPipelinesSafe(procinfo.source)
 
             time.sleep(self.interval)
 
@@ -176,30 +176,33 @@
             for item in src.getAllContents():
                 procinfo.paths.add(item.spec)
 
-    def _runPipelines(self, only_for_source=None):
-        current_records = MultiRecord()
-        record_histories = MultiRecordHistory(
-            self._last_records, current_records)
-        ppmngr = PipelineManager(
-            self._app, self.out_dir, record_histories)
+    def _runPipelinesSafe(self, only_for_source=None):
+        try:
+            self._runPipelines(only_for_source)
+        except Exception as ex:
+            logger.error("Error while running asset pipeline:")
+            logger.exception(ex)
+
+    def _runPipelines(self, only_for_source):
+        from piecrust.baking.baker import Baker
 
-        # Create the pipelines, but also remember some stuff for what
-        # we want to do.
-        for src in self._app.sources:
-            if src.config['pipeline'] != 'asset':
-                continue
-            if only_for_source is not None and src != only_for_source:
-                continue
+        allowed_sources = None
+        if only_for_source:
+            allowed_sources = [only_for_source.name]
+        baker = Baker(
+            self.appfactory, self._app, self.out_dir,
+            allowed_pipelines=['asset'],
+            allowed_sources=allowed_sources,
+            rotate_bake_records=False)
+        records = baker.bake()
 
-            ppmngr.createPipeline(src)
+        self._onPipelinesRun(records)
 
-        for ppinfo in ppmngr.getPipelines():
-            self._runPipeline(ppmngr, ppinfo)
-
+    def _onPipelinesRun(self, records):
         self.last_status_id += 1
 
-        if self._last_records.success:
-            for rec in self._last_records.records:
+        if records.success:
+            for rec in records.records:
                 changed = filter(
                     lambda i: not i.was_collapsed_from_last_run,
                     rec.getEntries())
@@ -217,7 +220,7 @@
                 'id': self.last_status_id,
                 'type': 'pipeline_error',
                 'assets': []}
-            for rec in self._last_records.records:
+            for rec in records.records:
                 for entry in rec.getEntries():
                     if entry.errors:
                         asset_item = {
@@ -227,58 +230,6 @@
 
                 self._notifyObservers(item)
 
-    def _runPipeline(self, ppmngr, ppinfo):
-        src = ppinfo.source
-        logger.debug("Running pipeline '%s' on: %s" %
-                     (ppinfo.pipeline_name, src.name))
-
-        # Set the time.
-        procinfo = self._proc_infos[src.name]
-        procinfo.last_bake_time = time.time()
-
-        # Process all items in the source.
-        pp = ppinfo.pipeline
-        cr = ppinfo.record_history.current
-        record_histories = ppmngr.record_histories
-        current_records = record_histories.current
-        jobctx = PipelineJobCreateContext(0, record_histories)
-        jobs = pp.createJobs(jobctx)
-        for job in jobs:
-            runctx = PipelineJobRunContext(
-                job, pp.record_name, record_histories)
-
-            ppres = PipelineJobResult()
-            ppres.record_entry = pp.createRecordEntry(job, runctx)
-
-            try:
-                pp.run(job, runctx, ppres)
-            except Exception as e:
-                ppres.record_entry.errors.append(str(e))
-
-            if ppres.next_step_job is not None:
-                logger.error("The processing loop for the server "
-                             "doesn't support multi-step pipelines.")
-
-            cr.addEntry(ppres.record_entry)
-            if not ppres.record_entry.success:
-                cr.success = False
-                current_records.success = False
-                logger.error("Errors found in %s:" % job.content_item.spec)
-                for e in ppres.record_entry.errors:
-                    logger.error("  " + e)
-
-        # Do all the final stuff.
-        ppmngr.postJobRun()
-        ppmngr.deleteStaleOutputs()
-        ppmngr.collapseRecords()
-        ppmngr.shutdownPipelines()
-
-        # Swap the old record with the next record.
-        pr = ppinfo.record_history.previous
-        logger.debug("Swapping record '%s' with '%s'." % (pr.name, cr.name))
-        self._last_records.records.remove(pr)
-        self._last_records.records.append(cr)
-
     def _notifyObservers(self, item):
         with self._obs_lock:
             observers = list(self._obs)