comparison piecrust/processing/base.py @ 129:3080b6d02f40

Add ability for the processing pipeline to only process new assets.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 15 Nov 2014 16:03:24 +0100
parents e5cba2622d26
children 9e4c2e68a129
comparing 128:28444014ce7d with 129:3080b6d02f40
@@ -135,11 +135,11 @@
     def filterProcessors(self, authorized_names):
         self.processors = list(filter(
                 lambda p: p.PROCESSOR_NAME in authorized_names,
                 self.processors))
 
-    def run(self, src_dir_or_file=None):
+    def run(self, src_dir_or_file=None, new_only=False):
         # Invoke pre-processors.
         for proc in self.processors:
             proc.onPipelineStart(self)
 
         # Sort our processors again in case the pre-process step involved
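The new `new_only` flag defaults to `False`, so existing callers of `run()` keep their behavior. A minimal usage sketch (the `pipeline` variable and its construction are assumed for illustration, not part of this changeset):

    # Hypothetical caller; `pipeline` is assumed to be an already-configured
    # ProcessorPipeline instance. Assets that already have an entry in the
    # previous bake record are skipped entirely.
    record = pipeline.run(new_only=True)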
@@ -184,31 +184,32 @@
                     (src_dir_or_file, self.mounts))
 
             ctx = ProcessingContext(base_dir, queue, record)
             logger.debug("Initiating processing pipeline on: %s" % src_dir_or_file)
             if os.path.isdir(src_dir_or_file):
-                self.processDirectory(ctx, src_dir_or_file)
+                self.processDirectory(ctx, src_dir_or_file, new_only)
             elif os.path.isfile(src_dir_or_file):
-                self.processFile(ctx, src_dir_or_file)
+                self.processFile(ctx, src_dir_or_file, new_only)
 
         else:
             # Process everything.
             for path in self.mounts:
                 ctx = ProcessingContext(path, queue, record)
                 logger.debug("Initiating processing pipeline on: %s" % path)
-                self.processDirectory(ctx, path)
+                self.processDirectory(ctx, path, new_only)
 
         # Wait on all workers.
         for w in pool:
             w.join()
         if abort.is_set():
             raise Exception("Worker pool was aborted.")
 
         # Handle deletions.
-        for path, reason in record.getDeletions():
-            logger.debug("Removing '%s': %s" % (path, reason))
-            os.remove(path)
-            logger.info('[delete] %s' % path)
+        if not new_only:
+            for path, reason in record.getDeletions():
+                logger.debug("Removing '%s': %s" % (path, reason))
+                os.remove(path)
+                logger.info('[delete] %s' % path)
 
         # Invoke post-processors.
         for proc in self.processors:
             proc.onPipelineEnd(self)
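The deletion pass has to be skipped in new-only mode: since a new-only run never adds current-record entries for assets it skips, a record-based deletion check would otherwise treat every skipped asset as removed and delete its outputs. A rough sketch of that kind of comparison logic (an assumption about the record internals, NOT the actual piecrust code; `rel_input`, `out_paths`, and `entries` are hypothetical names):

    def getDeletions(self):
        # Hypothetical sketch: outputs recorded in the previous run whose
        # source file no longer appears in the current run are slated for
        # deletion.
        previous = {e.rel_input: e.out_paths for e in self.previous.entries}
        current = set(e.rel_input for e in self.current.entries)
        for rel_input, out_paths in previous.items():
            if rel_input not in current:
                for p in out_paths:
                    yield p, "its source was removed: %s" % rel_input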
@@ -221,24 +222,25 @@
         record.saveCurrent(record_cache.getCachePath(record_name))
         logger.debug(format_timed(t, 'saved bake record', colored=False))
 
         return record
 
-    def processDirectory(self, ctx, start_dir):
+    def processDirectory(self, ctx, start_dir, new_only=False):
         for dirpath, dirnames, filenames in os.walk(start_dir):
             rel_dirpath = os.path.relpath(dirpath, start_dir)
             dirnames[:] = [d for d in dirnames
                            if not re_matchany(d, self.skip_patterns, rel_dirpath)]
 
             for filename in filenames:
                 if re_matchany(filename, self.skip_patterns, rel_dirpath):
                     continue
-                self.processFile(ctx, os.path.join(dirpath, filename))
+                self.processFile(ctx, os.path.join(dirpath, filename),
+                                 new_only)
 
-    def processFile(self, ctx, path):
+    def processFile(self, ctx, path, new_only=False):
         logger.debug("Queuing: %s" % path)
-        job = ProcessingWorkerJob(ctx.base_dir, path)
+        job = ProcessingWorkerJob(ctx.base_dir, path, new_only)
         ctx.job_queue.put_nowait(job)
 
 
 class ProcessingWorkerContext(object):
     def __init__(self, pipeline, record,
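An aside on the traversal above: the `dirnames[:]` slice assignment is what lets the skip patterns prune whole subtrees. A self-contained illustration of the idiom:

    import os

    for dirpath, dirnames, filenames in os.walk('.'):
        # In-place mutation: os.walk consults the filtered list to decide
        # which subdirectories to descend into, so pruned names are never
        # visited at all.
        dirnames[:] = [d for d in dirnames if not d.startswith('.')]
        # By contrast, `dirnames = [...]` would only rebind the local name,
        # and os.walk would still descend into every directory.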
@@ -249,13 +251,14 @@
         self.abort_event = abort_event
         self.pipeline_lock = pipeline_lock
 
 
 class ProcessingWorkerJob(object):
-    def __init__(self, base_dir, path):
+    def __init__(self, base_dir, path, new_only=False):
         self.base_dir = base_dir
         self.path = path
+        self.new_only = new_only
 
 
 class ProcessingWorker(threading.Thread):
     def __init__(self, wid, ctx):
         super(ProcessingWorker, self).__init__()
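The flag rides along on the job object rather than on the shared worker context, so each queued file carries its own skip decision across the thread boundary. In miniature (Python 3's `queue` module; the file paths are placeholders, `ProcessingWorkerJob` is from the diff):

    import queue

    q = queue.Queue()
    q.put_nowait(ProcessingWorkerJob('/site/assets', 'css/main.less',
                                     new_only=True))
    job = q.get()
    assert job.new_only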
@@ -285,10 +288,13 @@
         pipeline = self.ctx.pipeline
         record = self.ctx.record
 
         rel_path = os.path.relpath(job.path, job.base_dir)
         previous_entry = record.getPreviousEntry(rel_path)
+        if job.new_only and previous_entry:
+            return
+
         record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path)
         record.addEntry(record_entry)
 
         # Figure out if a previously processed file is overriding this one.
         # This can happen if a theme file (processed via a mount point)
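Note the semantics this last hunk gives "new": the worker skips a file as soon as any entry for its relative path exists in the previous bake record, regardless of modification time or content, so edited-but-previously-baked assets are not re-processed in new-only mode. Condensed, the whole feature reduces to this check (names from the diff above):

    rel_path = os.path.relpath(job.path, job.base_dir)
    if job.new_only and record.getPreviousEntry(rel_path):
        return  # path was seen in a previous bake: skip it entirely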