Mercurial > piecrust2
comparison piecrust/processing/base.py @ 213:e34a6826a3d4
internal: Remove the (unused) `new_only` flag for pipeline processing.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sat, 31 Jan 2015 15:33:18 -0800 |
parents | 989d0abd7c17 |
children | a47580a0955b |
comparison
equal
deleted
inserted
replaced
212:701591ebfcba | 213:e34a6826a3d4 |
---|---|
169 if has_star: | 169 if has_star: |
170 procs.append(p) | 170 procs.append(p) |
171 return procs | 171 return procs |
172 | 172 |
173 def run(self, src_dir_or_file=None, *, | 173 def run(self, src_dir_or_file=None, *, |
174 new_only=False, delete=True, | 174 delete=True, previous_record=None, save_record=True): |
175 previous_record=None, save_record=True): | |
176 # Invoke pre-processors. | 175 # Invoke pre-processors. |
177 for proc in self.processors: | 176 for proc in self.processors: |
178 proc.onPipelineStart(self) | 177 proc.onPipelineStart(self) |
179 | 178 |
180 # Sort our processors again in case the pre-process step involved | 179 # Sort our processors again in case the pre-process step involved |
225 (src_dir_or_file, known_roots)) | 224 (src_dir_or_file, known_roots)) |
226 | 225 |
227 ctx = ProcessingContext(base_dir, mount_info, queue, record) | 226 ctx = ProcessingContext(base_dir, mount_info, queue, record) |
228 logger.debug("Initiating processing pipeline on: %s" % src_dir_or_file) | 227 logger.debug("Initiating processing pipeline on: %s" % src_dir_or_file) |
229 if os.path.isdir(src_dir_or_file): | 228 if os.path.isdir(src_dir_or_file): |
230 self.processDirectory(ctx, src_dir_or_file, new_only) | 229 self.processDirectory(ctx, src_dir_or_file) |
231 elif os.path.isfile(src_dir_or_file): | 230 elif os.path.isfile(src_dir_or_file): |
232 self.processFile(ctx, src_dir_or_file, new_only) | 231 self.processFile(ctx, src_dir_or_file) |
233 | 232 |
234 else: | 233 else: |
235 # Process everything. | 234 # Process everything. |
236 for name, info in self.mounts.items(): | 235 for name, info in self.mounts.items(): |
237 path = info['path'] | 236 path = info['path'] |
238 ctx = ProcessingContext(path, info, queue, record) | 237 ctx = ProcessingContext(path, info, queue, record) |
239 logger.debug("Initiating processing pipeline on: %s" % path) | 238 logger.debug("Initiating processing pipeline on: %s" % path) |
240 self.processDirectory(ctx, path, new_only) | 239 self.processDirectory(ctx, path) |
241 | 240 |
242 # Wait on all workers. | 241 # Wait on all workers. |
243 for w in pool: | 242 for w in pool: |
244 w.join() | 243 w.join() |
245 if abort.is_set(): | 244 if abort.is_set(): |
246 raise Exception("Worker pool was aborted.") | 245 raise Exception("Worker pool was aborted.") |
247 | 246 |
248 # Handle deletions. | 247 # Handle deletions. |
249 if delete and not new_only: | 248 if delete: |
250 for path, reason in record.getDeletions(): | 249 for path, reason in record.getDeletions(): |
251 logger.debug("Removing '%s': %s" % (path, reason)) | 250 logger.debug("Removing '%s': %s" % (path, reason)) |
252 try: | 251 try: |
253 os.remove(path) | 252 os.remove(path) |
254 except FileNotFoundError: | 253 except FileNotFoundError: |
270 record.saveCurrent(record_cache.getCachePath(record_name)) | 269 record.saveCurrent(record_cache.getCachePath(record_name)) |
271 logger.debug(format_timed(t, 'saved bake record', colored=False)) | 270 logger.debug(format_timed(t, 'saved bake record', colored=False)) |
272 | 271 |
273 return record.detach() | 272 return record.detach() |
274 | 273 |
275 def processDirectory(self, ctx, start_dir, new_only=False): | 274 def processDirectory(self, ctx, start_dir): |
276 for dirpath, dirnames, filenames in os.walk(start_dir): | 275 for dirpath, dirnames, filenames in os.walk(start_dir): |
277 rel_dirpath = os.path.relpath(dirpath, start_dir) | 276 rel_dirpath = os.path.relpath(dirpath, start_dir) |
278 dirnames[:] = [d for d in dirnames | 277 dirnames[:] = [d for d in dirnames |
279 if not re_matchany(d, self.skip_patterns, rel_dirpath)] | 278 if not re_matchany(d, self.skip_patterns, rel_dirpath)] |
280 | 279 |
281 for filename in filenames: | 280 for filename in filenames: |
282 if re_matchany(filename, self.skip_patterns, rel_dirpath): | 281 if re_matchany(filename, self.skip_patterns, rel_dirpath): |
283 continue | 282 continue |
284 self.processFile(ctx, os.path.join(dirpath, filename), | 283 self.processFile(ctx, os.path.join(dirpath, filename)) |
285 new_only) | 284 |
286 | 285 def processFile(self, ctx, path): |
287 def processFile(self, ctx, path, new_only=False): | |
288 logger.debug("Queuing: %s" % path) | 286 logger.debug("Queuing: %s" % path) |
289 job = ProcessingWorkerJob(ctx.base_dir, ctx.mount_info, path, new_only) | 287 job = ProcessingWorkerJob(ctx.base_dir, ctx.mount_info, path) |
290 ctx.job_queue.put_nowait(job) | 288 ctx.job_queue.put_nowait(job) |
291 | 289 |
292 | 290 |
293 class ProcessingWorkerContext(object): | 291 class ProcessingWorkerContext(object): |
294 def __init__(self, pipeline, record, | 292 def __init__(self, pipeline, record, |
299 self.abort_event = abort_event | 297 self.abort_event = abort_event |
300 self.pipeline_lock = pipeline_lock | 298 self.pipeline_lock = pipeline_lock |
301 | 299 |
302 | 300 |
303 class ProcessingWorkerJob(object): | 301 class ProcessingWorkerJob(object): |
304 def __init__(self, base_dir, mount_info, path, new_only=False): | 302 def __init__(self, base_dir, mount_info, path): |
305 self.base_dir = base_dir | 303 self.base_dir = base_dir |
306 self.mount_info = mount_info | 304 self.mount_info = mount_info |
307 self.path = path | 305 self.path = path |
308 self.new_only = new_only | |
309 | 306 |
310 | 307 |
311 class ProcessingWorker(threading.Thread): | 308 class ProcessingWorker(threading.Thread): |
312 def __init__(self, wid, ctx): | 309 def __init__(self, wid, ctx): |
313 super(ProcessingWorker, self).__init__() | 310 super(ProcessingWorker, self).__init__() |
337 pipeline = self.ctx.pipeline | 334 pipeline = self.ctx.pipeline |
338 record = self.ctx.record | 335 record = self.ctx.record |
339 | 336 |
340 rel_path = os.path.relpath(job.path, job.base_dir) | 337 rel_path = os.path.relpath(job.path, job.base_dir) |
341 previous_entry = record.getPreviousEntry(rel_path) | 338 previous_entry = record.getPreviousEntry(rel_path) |
342 if job.new_only and previous_entry: | |
343 return | |
344 | 339 |
345 record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path) | 340 record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path) |
346 record.addEntry(record_entry) | 341 record.addEntry(record_entry) |
347 | 342 |
348 # Figure out if a previously processed file is overriding this one. | 343 # Figure out if a previously processed file is overriding this one. |