diff piecrust/serving/procloop.py @ 854:08e02c2a2a1a

core: Keep refactoring, this time to prepare for generator sources.

- Make a few APIs simpler.
- Content pipelines create their own jobs, so that generator sources can
  keep aborting in `getContents`, but rely on their pipeline to generate
  pages for baking.
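
A sketch of the new contract, as the processing loop below uses it
(minimal and hypothetical: `source`, `pipeline`, `ppinfo` and
`record_histories` stand for an asset source, its pipeline, the info
returned by `PipelineManager.createPipeline`, and the record history
for the run; only the `Pipeline*` classes come from the actual API):

    jobctx = PipelineJobCreateContext(source)
    for item in source.getAllContents():
        # The pipeline, not the caller, decides what a job looks like.
        job = pipeline.createJob(item, jobctx)
        res = PipelineJobResult()
        res.record_entry = pipeline.createRecordEntry(job)
        runctx = PipelineJobRunContext(
            ppinfo.pipeline_ctx, job, record_histories)
        pipeline.run(item, runctx, res)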
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 04 Jun 2017 23:34:28 -0700
parents c2ea75e37540
children 448710d84121
--- a/piecrust/serving/procloop.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/serving/procloop.py	Sun Jun 04 23:34:28 2017 -0700
@@ -7,8 +7,11 @@
 import itertools
 import threading
 from piecrust import CONFIG_PATH, THEME_CONFIG_PATH
-from piecrust.app import PieCrust
-from piecrust.processing.pipeline import ProcessorPipeline
+from piecrust.pipelines.base import (
+    PipelineJobCreateContext, PipelineJobRunContext, PipelineJobResult,
+    PipelineManager)
+from piecrust.pipelines.records import (
+    MultiRecord, MultiRecordHistory)
 
 
 logger = logging.getLogger(__name__)
@@ -74,25 +77,28 @@
         self._running = 2
 
 
+class _AssetProcessingInfo:
+    def __init__(self, source):
+        self.source = source
+        self.paths = set()
+        self.last_bake_time = time.time()
+
+
 class ProcessingLoop(threading.Thread):
     def __init__(self, appfactory, out_dir):
-        super(ProcessingLoop, self).__init__(
-                name='pipeline-reloader', daemon=True)
+        super().__init__(name='pipeline-reloader', daemon=True)
         self.appfactory = appfactory
         self.out_dir = out_dir
         self.last_status_id = 0
         self.interval = 1
-        self.app = None
-        self._roots = []
-        self._monitor_assets_root = False
-        self._paths = set()
-        self._record = None
-        self._last_bake = 0
+        self._app = None
+        self._proc_infos = None
+        self._last_records = None
         self._last_config_mtime = 0
         self._obs = []
         self._obs_lock = threading.Lock()
         config_name = (
-                THEME_CONFIG_PATH if appfactory.theme_site else CONFIG_PATH)
+            THEME_CONFIG_PATH if appfactory.theme_site else CONFIG_PATH)
         self._config_path = os.path.join(appfactory.root_dir, config_name)
 
     def addObserver(self, obs):
@@ -104,116 +110,162 @@
             self._obs.remove(obs)
 
     def run(self):
-        self._initPipeline()
+        self._init()
 
-        self._last_bake = time.time()
         self._last_config_mtime = os.path.getmtime(self._config_path)
-        self._record = self.pipeline.run()
 
         while True:
             cur_config_time = os.path.getmtime(self._config_path)
             if self._last_config_mtime < cur_config_time:
                 logger.info("Site configuration changed, reloading pipeline.")
                 self._last_config_mtime = cur_config_time
-                self._initPipeline()
-                for root in self._roots:
-                    self._runPipeline(root)
+                self._init()
+                self._runPipelines()
                 continue
 
-            if self._monitor_assets_root:
-                assets_dir = os.path.join(self.app.root_dir, 'assets')
-                if os.path.isdir(assets_dir):
-                    logger.info("Assets directory was created, reloading "
-                                "pipeline.")
-                    self._initPipeline()
-                    self._runPipeline(assets_dir)
-                    continue
-
-            for root in self._roots:
-                # For each mount root we try to find the first new or
+            for procinfo in self._proc_infos:
+                # For each asset source we try to find the first new or
                 # modified file. If any, we just run the pipeline on
-                # that mount.
+                # that source.
                 found_new_or_modified = False
-                for dirpath, dirnames, filenames in os.walk(root):
-                    for filename in filenames:
-                        path = os.path.join(dirpath, filename)
-                        if path not in self._paths:
-                            logger.debug("Found new asset: %s" % path)
-                            self._paths.add(path)
-                            found_new_or_modified = True
-                            break
-                        if os.path.getmtime(path) > self._last_bake:
-                            logger.debug("Found modified asset: %s" % path)
-                            found_new_or_modified = True
-                            break
-
-                    if found_new_or_modified:
+                for item in procinfo.source.getAllContents():
+                    path = item.spec
+                    if path not in procinfo.paths:
+                        logger.debug("Found new asset: %s" % path)
+                        procinfo.paths.add(path)
+                        found_new_or_modified = True
                         break
-
+                    if os.path.getmtime(path) > procinfo.last_bake_time:
+                        logger.debug("Found modified asset: %s" % path)
+                        found_new_or_modified = True
+                        break
                 if found_new_or_modified:
-                    self._runPipeline(root)
+                    self._runPipeline(procinfo)
 
             time.sleep(self.interval)
 
-    def _initPipeline(self):
-        # Create the app and pipeline.
-        self.app = self.appfactory.create()
-        self.pipeline = ProcessorPipeline(self.app, self.out_dir)
+    def _init(self):
+        self._app = self.appfactory.create()
+        self._last_records = MultiRecord()
+
+        self._proc_infos = []
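+        # Only watch sources that go through the asset pipeline.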
+        for src in self._app.sources:
+            if src.config['pipeline'] != 'asset':
+                continue
 
-        # Get the list of assets directories.
-        self._roots = list(self.pipeline.mounts.keys())
+            procinfo = _AssetProcessingInfo(src)
+            self._proc_infos.append(procinfo)
 
-        # The 'assets' folder may not be in the mounts list if it doesn't
-        # exist yet, but we want to monitor for when the user creates it.
-        default_root = os.path.join(self.app.root_dir, 'assets')
-        self._monitor_assets_root = (default_root not in self._roots)
+            # Build the list of initial asset files.
+            for item in src.getAllContents():
+                procinfo.paths.add(item.spec)
+
+    def _runPipelines(self):
+        record_histories = MultiRecordHistory(
+            MultiRecord(), self._last_records)
+        self._ppmngr = PipelineManager(
+            self._app, self.out_dir, record_histories)
 
-        # Build the list of initial asset files.
-        self._paths = set()
-        for root in self._roots:
-            for dirpath, dirnames, filenames in os.walk(root):
-                self._paths |= set([os.path.join(dirpath, f)
-                                    for f in filenames])
+        # Create a pipeline for each asset source, and remember its
+        # processing info so we can run them below.
+        procinfos = []
+        for src in self._app.sources:
+            if src.config['pipeline'] != 'asset':
+                continue
+
+            ppinfo = self._ppmngr.createPipeline(src)
+            procinfo = _AssetProcessingInfo(src)
+            ppinfo.userdata = procinfo
+            procinfos.append(procinfo)
+
+        current_records = MultiRecord()
+        record_histories = MultiRecordHistory(
+            self._last_records, current_records)
 
-    def _runPipeline(self, root):
-        self._last_bake = time.time()
-        try:
-            self._record = self.pipeline.run(
-                    root,
-                    previous_record=self._record,
-                    save_record=False)
+        for procinfo in procinfos:
+            self._runPipeline(procinfo)
+
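+        # Let the observers know how the run went.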
+        status_id = self.last_status_id + 1
+        self.last_status_id += 1
+
+        if self._last_records.success:
+            all_entries = itertools.chain.from_iterable(
+                r.entries for r in self._last_records.records)
+            changed = filter(
+                lambda i: not i.was_collapsed_from_last_run,
+                all_entries)
+            changed = itertools.chain.from_iterable(
+                map(lambda i: i.rel_outputs, changed))
+            changed = list(changed)
+            item = {
+                'id': status_id,
+                'type': 'pipeline_success',
+                'assets': changed}
 
-            status_id = self.last_status_id + 1
-            self.last_status_id += 1
+            self._notifyObservers(item)
+        else:
+            item = {
+                'id': status_id,
+                'type': 'pipeline_error',
+                'assets': []}
+            for entry in itertools.chain.from_iterable(
+                    r.entries for r in self._last_records.records):
+                if entry.errors:
+                    asset_item = {
+                        'path': entry.path,
+                        'errors': list(entry.errors)}
+                    item['assets'].append(asset_item)
+
+            self._notifyObservers(item)
 
-            if self._record.success:
-                changed = filter(
-                        lambda i: not i.was_collapsed_from_last_run,
-                        self._record.entries)
-                changed = itertools.chain.from_iterable(
-                        map(lambda i: i.rel_outputs, changed))
-                changed = list(changed)
-                item = {
-                        'id': status_id,
-                        'type': 'pipeline_success',
-                        'assets': changed}
+    def _runPipeline(self, procinfo):
+        procinfo.last_bake_time = time.time()
+
+        src = procinfo.source
+
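+        # Each run gets a fresh record history: the previous run's records
+        # are the baseline, this run's records are the current state.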
+        current_records = MultiRecord()
+        record_histories = MultiRecordHistory(
+            self._last_records, current_records)
+        ppmngr = PipelineManager(
+            self._app, self.out_dir, record_histories)
+        ppinfo = ppmngr.createPipeline(src)
+
+        logger.debug("Running pipeline '%s' on: %s" %
+                     (ppinfo.pipeline_name, src.name))
 
-                self._notifyObservers(item)
-            else:
-                item = {
-                        'id': status_id,
-                        'type': 'pipeline_error',
-                        'assets': []}
-                for entry in self._record.entries:
-                    if entry.errors:
-                        asset_item = {
-                                'path': entry.path,
-                                'errors': list(entry.errors)}
-                        item['assets'].append(asset_item)
+        # Process all items in the source.
+        pp = ppinfo.pipeline
+        cr = ppinfo.record_history.current
+        jobctx = PipelineJobCreateContext(src)
+        for item in src.getAllContents():
+            job = pp.createJob(item, jobctx)
+
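+            # Run the job right here, collecting its output into a fresh
+            # record entry.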
+            ppres = PipelineJobResult()
+            ppres.record_entry = pp.createRecordEntry(job)
+
+            runctx = PipelineJobRunContext(
+                ppinfo.pipeline_ctx, job, record_histories)
+            try:
+                pp.run(item, runctx, ppres)
+            except Exception as e:
+                ppres.record_entry.errors.append(str(e))
 
-                self._notifyObservers(item)
-        except Exception as ex:
-            logger.exception(ex)
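+            # The server's processing loop only does a single pass per job.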
+            if ppres.next_pass_job is not None:
+                logger.error("The processing loop for the server "
+                             "doesn't support multi-pass pipelines.")
+
+            cr.addEntry(ppres.record_entry)
+            if not ppres.record_entry.success:
+                cr.success = False
+                current_records.success = False
+                logger.error("Errors found in %s:" % item.spec)
+                for e in ppres.record_entry.errors:
+                    logger.error("  " + e)
+
+        # Finalize the run: compute history diffs, remove stale outputs,
+        # collapse records, and shut the pipelines down.
+        ppmngr.buildHistoryDiffs()
+        ppmngr.deleteStaleOutputs()
+        ppmngr.collapseRecords()
+        ppmngr.shutdownPipelines()
+
+        # Replace this source's previous record with the one from this run.
+        pr = ppinfo.record_history.previous
+        self._last_records.records.remove(pr)
+        self._last_records.records.append(cr)
 
     def _notifyObservers(self, item):
         with self._obs_lock: