comparison piecrust/processing/base.py @ 213:e34a6826a3d4

internal: Remove the (unused) `new_only` flag for pipeline processing.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 31 Jan 2015 15:33:18 -0800
parents 989d0abd7c17
children a47580a0955b
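
The change is mechanical: the `new_only` keyword is dropped from
ProcessorPipeline.run() and is no longer threaded through processDirectory(),
processFile(), or ProcessingWorkerJob. A minimal sketch of the visible API
difference, using a stub class whose body is an illustrative stand-in, not
PieCrust's implementation:

    class Pipeline(object):
        # New signature: 'new_only' is gone; the '*' keeps the remaining
        # flags keyword-only, as in the real method below.
        def run(self, src_dir_or_file=None, *,
                delete=True, previous_record=None, save_record=True):
            return (src_dir_or_file, delete, previous_record, save_record)

    p = Pipeline()
    p.run('assets', delete=False)      # still valid
    # p.run('assets', new_only=True)   # now a TypeError: unexpected keyword
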
--- piecrust/processing/base.py (212:701591ebfcba)
+++ piecrust/processing/base.py (213:e34a6826a3d4)
@@ -169,12 +169,11 @@
                 if has_star:
                     procs.append(p)
         return procs
 
     def run(self, src_dir_or_file=None, *,
-            new_only=False, delete=True,
-            previous_record=None, save_record=True):
+            delete=True, previous_record=None, save_record=True):
         # Invoke pre-processors.
         for proc in self.processors:
             proc.onPipelineStart(self)
 
         # Sort our processors again in case the pre-process step involved
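
The hunk above also shows the pre-processor hook that fires before any file
is queued: each processor receives onPipelineStart() with the pipeline
itself. A minimal sketch of that protocol; EchoProcessor is a hypothetical
stand-in, not a PieCrust class:

    class EchoProcessor(object):
        def onPipelineStart(self, pipeline):
            # A real processor would configure itself against the pipeline
            # here, before any jobs exist.
            print('starting against %r' % pipeline)

    for proc in [EchoProcessor(), EchoProcessor()]:
        proc.onPipelineStart('<pipeline>')
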
@@ -225,30 +224,30 @@
                                 (src_dir_or_file, known_roots))
 
             ctx = ProcessingContext(base_dir, mount_info, queue, record)
             logger.debug("Initiating processing pipeline on: %s" % src_dir_or_file)
             if os.path.isdir(src_dir_or_file):
-                self.processDirectory(ctx, src_dir_or_file, new_only)
+                self.processDirectory(ctx, src_dir_or_file)
             elif os.path.isfile(src_dir_or_file):
-                self.processFile(ctx, src_dir_or_file, new_only)
+                self.processFile(ctx, src_dir_or_file)
 
         else:
             # Process everything.
             for name, info in self.mounts.items():
                 path = info['path']
                 ctx = ProcessingContext(path, info, queue, record)
                 logger.debug("Initiating processing pipeline on: %s" % path)
-                self.processDirectory(ctx, path, new_only)
+                self.processDirectory(ctx, path)
 
         # Wait on all workers.
         for w in pool:
             w.join()
         if abort.is_set():
             raise Exception("Worker pool was aborted.")
 
         # Handle deletions.
-        if delete and not new_only:
+        if delete:
             for path, reason in record.getDeletions():
                 logger.debug("Removing '%s': %s" % (path, reason))
                 try:
                     os.remove(path)
                 except FileNotFoundError:
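
The deletion guard simplifies from `if delete and not new_only:` to plain
`if delete:`. That follows directly from the flag's removal: a new_only run
skipped known files, so its record could not distinguish a deleted source
from a merely skipped one, and deletions had to be suppressed. Now every run
sees the full file set and the record's deletion list can always be acted
on. The remove-and-tolerate pattern itself, as a runnable sketch where
`deletions` stands in for record.getDeletions():

    import os

    deletions = [('/tmp/out/stale.css', 'source file was removed')]
    for path, reason in deletions:
        print("Removing '%s': %s" % (path, reason))
        try:
            os.remove(path)
        except FileNotFoundError:
            pass  # already gone; nothing to do
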
@@ -270,25 +269,24 @@
             record.saveCurrent(record_cache.getCachePath(record_name))
             logger.debug(format_timed(t, 'saved bake record', colored=False))
 
         return record.detach()
 
-    def processDirectory(self, ctx, start_dir, new_only=False):
+    def processDirectory(self, ctx, start_dir):
         for dirpath, dirnames, filenames in os.walk(start_dir):
             rel_dirpath = os.path.relpath(dirpath, start_dir)
             dirnames[:] = [d for d in dirnames
                            if not re_matchany(d, self.skip_patterns, rel_dirpath)]
 
             for filename in filenames:
                 if re_matchany(filename, self.skip_patterns, rel_dirpath):
                     continue
-                self.processFile(ctx, os.path.join(dirpath, filename),
-                                 new_only)
+                self.processFile(ctx, os.path.join(dirpath, filename))
 
-    def processFile(self, ctx, path, new_only=False):
+    def processFile(self, ctx, path):
         logger.debug("Queuing: %s" % path)
-        job = ProcessingWorkerJob(ctx.base_dir, ctx.mount_info, path, new_only)
+        job = ProcessingWorkerJob(ctx.base_dir, ctx.mount_info, path)
         ctx.job_queue.put_nowait(job)
 
 
 class ProcessingWorkerContext(object):
     def __init__(self, pipeline, record,
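
processDirectory() keeps its walk-and-prune behavior, just without the extra
parameter. One detail worth calling out: the slice assignment
`dirnames[:] = ...` mutates the list os.walk() recurses on, which is what
actually stops the descent into skipped directories; rebinding the name
would have no effect. A self-contained sketch, where skip_patterns is a
hypothetical stand-in for the pipeline's compiled patterns:

    import os
    import re

    skip_patterns = [re.compile(r'^\.'), re.compile(r'^_cache$')]

    def find_files(start_dir):
        for dirpath, dirnames, filenames in os.walk(start_dir):
            # Prune in place so os.walk() skips these subtrees entirely.
            dirnames[:] = [d for d in dirnames
                           if not any(p.match(d) for p in skip_patterns)]
            for filename in filenames:
                if any(p.match(filename) for p in skip_patterns):
                    continue
                yield os.path.join(dirpath, filename)
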
@@ -299,15 +297,14 @@
         self.abort_event = abort_event
         self.pipeline_lock = pipeline_lock
 
 
 class ProcessingWorkerJob(object):
-    def __init__(self, base_dir, mount_info, path, new_only=False):
+    def __init__(self, base_dir, mount_info, path):
         self.base_dir = base_dir
         self.mount_info = mount_info
         self.path = path
-        self.new_only = new_only
 
 
 class ProcessingWorker(threading.Thread):
     def __init__(self, wid, ctx):
         super(ProcessingWorker, self).__init__()
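
With the flag gone, ProcessingWorkerJob is a plain value object: base
directory, mount info, path. Jobs go onto a queue via put_nowait() and are
consumed by ProcessingWorker threads. A self-contained sketch of that
producer/consumer layout; the names and the shutdown sentinel are
illustrative, not PieCrust's actual mechanics:

    import queue
    import threading

    class Job(object):
        def __init__(self, base_dir, mount_info, path):
            self.base_dir = base_dir
            self.mount_info = mount_info
            self.path = path

    job_queue = queue.Queue()

    def worker():
        while True:
            job = job_queue.get()
            if job is None:  # sentinel: shut this worker down
                break
            print('processing %s' % job.path)

    t = threading.Thread(target=worker)
    t.start()
    job_queue.put_nowait(Job('/site/assets', {}, '/site/assets/style.less'))
    job_queue.put_nowait(None)
    t.join()
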
@@ -337,12 +334,10 @@
         pipeline = self.ctx.pipeline
         record = self.ctx.record
 
         rel_path = os.path.relpath(job.path, job.base_dir)
         previous_entry = record.getPreviousEntry(rel_path)
-        if job.new_only and previous_entry:
-            return
 
         record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path)
         record.addEntry(record_entry)
 
         # Figure out if a previously processed file is overriding this one.
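
This last hunk deletes the flag's only consumer: the worker's early return
for files that already had an entry in the previous bake record. The removed
logic, reconstructed as a plain function for clarity (a sketch, not code
that survives in the file):

    def skipped_by_new_only(new_only, previous_entry):
        # Under new_only, any file with a previous record entry was
        # dropped before any processor could look at it.
        return bool(new_only and previous_entry)

    assert skipped_by_new_only(True, object())       # known file: skipped
    assert not skipped_by_new_only(False, object())  # flag off: processed
    assert not skipped_by_new_only(True, None)       # new file: processed
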