changeset 129:3080b6d02f40

Add the ability for the processing pipeline to process only new assets.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 15 Nov 2014 16:03:24 +0100
parents 28444014ce7d
children 7f81c84f7ddb
files piecrust/processing/base.py
diffstat 1 file changed, 19 insertions(+), 13 deletions(-)
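
For context, a sketch of how a caller might use the new flag (hypothetical caller; how the bake command wires this up is outside this changeset):

    # Sketch only: assumes `pipeline` is a ProcessorPipeline constructed
    # the usual way; this commit does not change the constructor.
    record = pipeline.run(new_only=True)  # skip assets seen in a previous run
    record = pipeline.run()               # default: full run, including deletions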
--- a/piecrust/processing/base.py	Fri Nov 14 22:49:50 2014 +0100
+++ b/piecrust/processing/base.py	Sat Nov 15 16:03:24 2014 +0100
@@ -137,7 +137,7 @@
             lambda p: p.PROCESSOR_NAME in authorized_names,
             self.processors))
 
-    def run(self, src_dir_or_file=None):
+    def run(self, src_dir_or_file=None, new_only=False):
         # Invoke pre-processors.
         for proc in self.processors:
             proc.onPipelineStart(self)
@@ -186,16 +186,16 @@
             ctx = ProcessingContext(base_dir, queue, record)
             logger.debug("Initiating processing pipeline on: %s" % src_dir_or_file)
             if os.path.isdir(src_dir_or_file):
-                self.processDirectory(ctx, src_dir_or_file)
+                self.processDirectory(ctx, src_dir_or_file, new_only)
             elif os.path.isfile(src_dir_or_file):
-                self.processFile(ctx, src_dir_or_file)
+                self.processFile(ctx, src_dir_or_file, new_only)
 
         else:
             # Process everything.
             for path in self.mounts:
                 ctx = ProcessingContext(path, queue, record)
                 logger.debug("Initiating processing pipeline on: %s" % path)
-                self.processDirectory(ctx, path)
+                self.processDirectory(ctx, path, new_only)
 
         # Wait on all workers.
         for w in pool:
@@ -204,10 +204,11 @@
             raise Exception("Worker pool was aborted.")
 
         # Handle deletions.
-        for path, reason in record.getDeletions():
-            logger.debug("Removing '%s': %s" % (path, reason))
-            os.remove(path)
-            logger.info('[delete] %s' % path)
+        if not new_only:
+            for path, reason in record.getDeletions():
+                logger.debug("Removing '%s': %s" % (path, reason))
+                os.remove(path)
+                logger.info('[delete] %s' % path)
 
         # Invoke post-processors.
         for proc in self.processors:
@@ -223,7 +224,7 @@
 
         return record
 
-    def processDirectory(self, ctx, start_dir):
+    def processDirectory(self, ctx, start_dir, new_only=False):
         for dirpath, dirnames, filenames in os.walk(start_dir):
             rel_dirpath = os.path.relpath(dirpath, start_dir)
             dirnames[:] = [d for d in dirnames
@@ -232,11 +233,12 @@
             for filename in filenames:
                 if re_matchany(filename, self.skip_patterns, rel_dirpath):
                     continue
-                self.processFile(ctx, os.path.join(dirpath, filename))
+                self.processFile(ctx, os.path.join(dirpath, filename),
+                                 new_only)
 
-    def processFile(self, ctx, path):
+    def processFile(self, ctx, path, new_only=False):
         logger.debug("Queuing: %s" % path)
-        job = ProcessingWorkerJob(ctx.base_dir, path)
+        job = ProcessingWorkerJob(ctx.base_dir, path, new_only)
         ctx.job_queue.put_nowait(job)
 
 
@@ -251,9 +253,10 @@
 
 
 class ProcessingWorkerJob(object):
-    def __init__(self, base_dir, path):
+    def __init__(self, base_dir, path, new_only=False):
         self.base_dir = base_dir
         self.path = path
+        self.new_only = new_only
 
 
 class ProcessingWorker(threading.Thread):
@@ -287,6 +290,9 @@
 
         rel_path = os.path.relpath(job.path, job.base_dir)
         previous_entry = record.getPreviousEntry(rel_path)
+        if job.new_only and previous_entry:
+            return
+
         record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path)
         record.addEntry(record_entry)
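
The worker-side check above is the crux of the change: an asset is skipped when it already has an entry in the previous record. A simplified restatement of that logic (`should_process` is a hypothetical helper, not part of this changeset; `record` and `rel_path` as in the diff):

    def should_process(record, rel_path, new_only):
        # With new_only, anything recorded in a previous run is skipped;
        # only genuinely new assets get a record entry and are processed.
        previous_entry = record.getPreviousEntry(rel_path)
        return not (new_only and previous_entry)

Note that deletions are also skipped in a new_only run (see the `if not new_only:` guard around record.getDeletions() above): a partial run cannot tell whether an output file with no matching source is truly stale.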