changeset 860:c71472e6537f

refactor: Get the processing loop in the server functional again.
author Ludovic Chabant <ludovic@chabant.com>
date Thu, 08 Jun 2017 08:51:27 -0700
parents 86994e076be4
children d214918d4d2c
files piecrust/serving/procloop.py
diffstat 1 file changed, 63 insertions(+), 57 deletions(-)
--- a/piecrust/serving/procloop.py	Thu Jun 08 08:51:00 2017 -0700
+++ b/piecrust/serving/procloop.py	Thu Jun 08 08:51:27 2017 -0700
@@ -110,8 +110,19 @@
             self._obs.remove(obs)
 
     def run(self):
-        self._init()
+        logger.debug("Initializing processing loop with output: %s" %
+                     self.out_dir)
+        try:
+            self._init()
+        except Exception as ex:
+            logger.error("Error initializing processing loop:")
+            logger.exception(ex)
+            return
 
+        logger.debug("Doing initial processing loop bake...")
+        self._runPipelines()
+
+        logger.debug("Running processing loop...")
         self._last_config_mtime = os.path.getmtime(self._config_path)
 
         while True:
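
Taken together, this hunk gives run() a guard-then-loop shape: a failing _init() is logged and aborts the thread cleanly, a full bake runs once up front, and only then does the watch loop start. A condensed sketch of that flow, with names as in the diff and the rest of the ProcessingLoop class elided:

    def run(self):
        try:
            self._init()                 # builds app, records, proc infos
        except Exception as ex:
            logger.exception(ex)
            return                       # never loop on a broken setup

        self._runPipelines()             # initial full bake
        self._last_config_mtime = os.path.getmtime(self._config_path)
        while True:
            # re-bake everything on config changes, or only the
            # source whose assets changed (see the hunks below)
            time.sleep(self.interval)
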
@@ -123,7 +134,7 @@
                 self._runPipelines()
                 continue
 
-            for procinfo in self._proc_infos:
+            for procinfo in self._proc_infos.values():
                 # For each assets folder we try to find the first new or
                 # modified file. If any, we just run the pipeline on
                 # that source.
@@ -140,7 +151,7 @@
                         found_new_or_modified = True
                         break
                 if found_new_or_modified:
-                    self._runPipeline(procinfo)
+                    self._runPipelines(procinfo.source)
 
             time.sleep(self.interval)
 
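
With _proc_infos now a dict keyed by source name, the watch loop iterates its values, and a change in any asset folder re-runs the pipelines for just that source. Roughly, with the per-file mtime scan condensed into a hypothetical helper:

    for procinfo in self._proc_infos.values():
        # _sourceHasNewOrModifiedFiles is a hypothetical condensation
        # of the mtime/path scan shown in the hunk above
        if _sourceHasNewOrModifiedFiles(procinfo):
            self._runPipelines(only_for_source=procinfo.source)
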
@@ -148,21 +159,23 @@
         self._app = self.appfactory.create()
         self._last_records = MultiRecord()
 
-        self._proc_infos = []
+        self._proc_infos = {}
         for src in self._app.sources:
             if src.config['pipeline'] != 'asset':
                 continue
 
             procinfo = _AssetProcessingInfo(src)
-            self._proc_infos.append(procinfo)
+            self._proc_infos[src.name] = procinfo
 
             # Build the list of initial asset files.
             for item in src.getAllContents():
                 procinfo.paths.add(item.spec)
 
-    def _runPipelines(self):
-        record_histories = MultiRecordHistory(MultiRecord(), self._records)
-        self._ppmngr = PipelineManager(
+    def _runPipelines(self, only_for_source=None):
+        current_records = MultiRecord()
+        record_histories = MultiRecordHistory(
+            self._last_records, current_records)
+        ppmngr = PipelineManager(
             self._app, self.out_dir, record_histories)
 
         # Create the pipelines, but also remember some stuff for what
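
The _init() change swaps the list of _AssetProcessingInfo objects for a dict keyed by source name, which _runPipeline() later uses for direct lookup; _runPipelines() meanwhile pairs the previous bake's records with a fresh MultiRecord so pipelines can diff against prior output. The dict-building step, consolidated from the interleaved diff lines:

    self._proc_infos = {}
    for src in self._app.sources:
        if src.config['pipeline'] != 'asset':
            continue
        procinfo = _AssetProcessingInfo(src)
        self._proc_infos[src.name] = procinfo
        for item in src.getAllContents():
            procinfo.paths.add(item.spec)    # seed the known-paths set
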
@@ -170,77 +183,69 @@
         for src in self._app.sources:
             if src.config['pipeline'] != 'asset':
                 continue
-
-            ppinfo = self._ppmngr.createPipeline(src)
-            api = _AssetProcessingInfo()
-            ppinfo.userdata = api
+            if only_for_source is not None and src != only_for_source:
+                continue
 
-        current_records = MultiRecord()
-        record_histories = MultiRecordHistory(
-            self._records, current_records)
+            ppmngr.createPipeline(src)
 
-        for ppinfo, procinfo in self._pipelines:
-            self._runPipeline(ppinfo, procinfo, record_histories)
+        for ppinfo in ppmngr.getPipelines():
+            self._runPipeline(ppmngr, ppinfo)
 
-        status_id = self.last_status_id + 1
         self.last_status_id += 1
 
-        if self._records.success:
-            changed = filter(
-                lambda i: not i.was_collapsed_from_last_run,
-                self._record.entries)
-            changed = itertools.chain.from_iterable(
-                map(lambda i: i.rel_outputs, changed))
-            changed = list(changed)
-            item = {
-                'id': status_id,
-                'type': 'pipeline_success',
-                'assets': changed}
+        if self._last_records.success:
+            for rec in self._last_records.records:
+                changed = filter(
+                    lambda i: not i.was_collapsed_from_last_run,
+                    rec.getEntries())
+                changed = itertools.chain.from_iterable(
+                    map(lambda i: i.out_paths, changed))
+                changed = list(changed)
+                item = {
+                    'id': self.last_status_id,
+                    'type': 'pipeline_success',
+                    'assets': changed}
 
-            self._notifyObservers(item)
+                self._notifyObservers(item)
         else:
             item = {
-                'id': status_id,
+                'id': self.last_status_id,
                 'type': 'pipeline_error',
                 'assets': []}
-            for entry in self._record.entries:
-                if entry.errors:
-                    asset_item = {
-                        'path': entry.path,
-                        'errors': list(entry.errors)}
-                    item['assets'].append(asset_item)
-
-            self._notifyObservers(item)
+            for rec in self._last_records.records:
+                for entry in rec.getEntries():
+                    if entry.errors:
+                        asset_item = {
+                            'path': entry.item_spec,
+                            'errors': list(entry.errors)}
+                        item['assets'].append(asset_item)
 
-    def _runPipeline(self, procinfo):
-        procinfo.last_bake_time = time.time()
-
-        src = procinfo.source
+                self._notifyObservers(item)
 
-        current_records = MultiRecord()
-        record_histories = MultiRecordHistory(
-            self._last_records, current_records)
-        ppmngr = PipelineManager(
-            self._app, self.out_dir, record_histories)
-        ppinfo = ppmngr.createPipeline(src)
-
+    def _runPipeline(self, ppmngr, ppinfo):
+        src = ppinfo.source
         logger.debug("Running pipeline '%s' on: %s" %
                      (ppinfo.pipeline_name, src.name))
 
+        # Set the time.
+        procinfo = self._proc_infos[src.name]
+        procinfo.last_bake_time = time.time()
+
         # Process all items in the source.
         pp = ppinfo.pipeline
         cr = ppinfo.record_history.current
-        jobctx = PipelineJobCreateContext(src)
-        for item in src.getAllContents():
-            job = pp.createJob(item, jobctx)
-
+        record_histories = ppmngr.record_histories
+        current_records = record_histories.current
+        jobctx = PipelineJobCreateContext(0, record_histories)
+        jobs = pp.createJobs(jobctx)
+        for job in jobs:
             ppres = PipelineJobResult()
             ppres.record_entry = pp.createRecordEntry(job)
 
             runctx = PipelineJobRunContext(
-                ppinfo.pipeline_ctx, job, record_histories)
+                job, pp, record_histories)
             try:
-                pp.run(item, runctx, ppres)
+                pp.run(job, runctx, ppres)
             except Exception as e:
                 ppres.record_entry.errors.append(str(e))
 
@@ -252,7 +257,7 @@
             if not ppres.record_entry.success:
                 cr.success = False
                 current_records.success = False
-                logger.error("Errors found in %s:" % item.spec)
+                logger.error("Errors found in %s:" % job.content_item.spec)
                 for e in ppres.record_entry.errors:
                     logger.error("  " + e)
 
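
For reference, the status items that _runPipelines() pushes to observers (one per record on success, a single aggregate item on failure) have roughly this shape; the ids, paths, and error strings below are illustrative:

    success_item = {'id': 7, 'type': 'pipeline_success',
                    'assets': ['css/main.css']}
    error_item = {'id': 8, 'type': 'pipeline_error',
                  'assets': [{'path': 'assets/css/main.scss',
                              'errors': ['...']}]}
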
@@ -264,6 +269,7 @@
 
         # Swap the old record with the next record.
         pr = ppinfo.record_history.previous
+        logger.debug("Swapping record '%s' with '%s'." % (pr.name, cr.name))
         self._last_records.records.remove(pr)
         self._last_records.records.append(cr)
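
Finally, once a pipeline has run, its freshly built record replaces the previous one inside self._last_records, so the next incremental bake compares against the state just produced. The swap, in isolation:

    pr = ppinfo.record_history.previous
    cr = ppinfo.record_history.current
    self._last_records.records.remove(pr)   # drop the stale record
    self._last_records.records.append(cr)   # fresh record becomes the baseline
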