Mercurial > piecrust2
comparison piecrust/processing/base.py @ 129:3080b6d02f40
Add ability for the processing pipeline to only process new assets.
| author | Ludovic Chabant <ludovic@chabant.com> |
|---|---|
| date | Sat, 15 Nov 2014 16:03:24 +0100 |
| parents | e5cba2622d26 |
| children | 9e4c2e68a129 |
comparison
equal
deleted
inserted
replaced
| 128:28444014ce7d | 129:3080b6d02f40 |
|---|---|
| 135 def filterProcessors(self, authorized_names): | 135 def filterProcessors(self, authorized_names): |
| 136 self.processors = list(filter( | 136 self.processors = list(filter( |
| 137 lambda p: p.PROCESSOR_NAME in authorized_names, | 137 lambda p: p.PROCESSOR_NAME in authorized_names, |
| 138 self.processors)) | 138 self.processors)) |
| 139 | 139 |
| 140 def run(self, src_dir_or_file=None): | 140 def run(self, src_dir_or_file=None, new_only=False): |
| 141 # Invoke pre-processors. | 141 # Invoke pre-processors. |
| 142 for proc in self.processors: | 142 for proc in self.processors: |
| 143 proc.onPipelineStart(self) | 143 proc.onPipelineStart(self) |
| 144 | 144 |
| 145 # Sort our processors again in case the pre-process step involved | 145 # Sort our processors again in case the pre-process step involved |
| 184 (src_dir_or_file, self.mounts)) | 184 (src_dir_or_file, self.mounts)) |
| 185 | 185 |
| 186 ctx = ProcessingContext(base_dir, queue, record) | 186 ctx = ProcessingContext(base_dir, queue, record) |
| 187 logger.debug("Initiating processing pipeline on: %s" % src_dir_or_file) | 187 logger.debug("Initiating processing pipeline on: %s" % src_dir_or_file) |
| 188 if os.path.isdir(src_dir_or_file): | 188 if os.path.isdir(src_dir_or_file): |
| 189 self.processDirectory(ctx, src_dir_or_file) | 189 self.processDirectory(ctx, src_dir_or_file, new_only) |
| 190 elif os.path.isfile(src_dir_or_file): | 190 elif os.path.isfile(src_dir_or_file): |
| 191 self.processFile(ctx, src_dir_or_file) | 191 self.processFile(ctx, src_dir_or_file, new_only) |
| 192 | 192 |
| 193 else: | 193 else: |
| 194 # Process everything. | 194 # Process everything. |
| 195 for path in self.mounts: | 195 for path in self.mounts: |
| 196 ctx = ProcessingContext(path, queue, record) | 196 ctx = ProcessingContext(path, queue, record) |
| 197 logger.debug("Initiating processing pipeline on: %s" % path) | 197 logger.debug("Initiating processing pipeline on: %s" % path) |
| 198 self.processDirectory(ctx, path) | 198 self.processDirectory(ctx, path, new_only) |
| 199 | 199 |
| 200 # Wait on all workers. | 200 # Wait on all workers. |
| 201 for w in pool: | 201 for w in pool: |
| 202 w.join() | 202 w.join() |
| 203 if abort.is_set(): | 203 if abort.is_set(): |
| 204 raise Exception("Worker pool was aborted.") | 204 raise Exception("Worker pool was aborted.") |
| 205 | 205 |
| 206 # Handle deletions. | 206 # Handle deletions. |
| 207 for path, reason in record.getDeletions(): | 207 if not new_only: |
| 208 logger.debug("Removing '%s': %s" % (path, reason)) | 208 for path, reason in record.getDeletions(): |
| 209 os.remove(path) | 209 logger.debug("Removing '%s': %s" % (path, reason)) |
| 210 logger.info('[delete] %s' % path) | 210 os.remove(path) |
| 211 logger.info('[delete] %s' % path) | |
| 211 | 212 |
| 212 # Invoke post-processors. | 213 # Invoke post-processors. |
| 213 for proc in self.processors: | 214 for proc in self.processors: |
| 214 proc.onPipelineEnd(self) | 215 proc.onPipelineEnd(self) |
| 215 | 216 |
| 221 record.saveCurrent(record_cache.getCachePath(record_name)) | 222 record.saveCurrent(record_cache.getCachePath(record_name)) |
| 222 logger.debug(format_timed(t, 'saved bake record', colored=False)) | 223 logger.debug(format_timed(t, 'saved bake record', colored=False)) |
| 223 | 224 |
| 224 return record | 225 return record |
| 225 | 226 |
| 226 def processDirectory(self, ctx, start_dir): | 227 def processDirectory(self, ctx, start_dir, new_only=False): |
| 227 for dirpath, dirnames, filenames in os.walk(start_dir): | 228 for dirpath, dirnames, filenames in os.walk(start_dir): |
| 228 rel_dirpath = os.path.relpath(dirpath, start_dir) | 229 rel_dirpath = os.path.relpath(dirpath, start_dir) |
| 229 dirnames[:] = [d for d in dirnames | 230 dirnames[:] = [d for d in dirnames |
| 230 if not re_matchany(d, self.skip_patterns, rel_dirpath)] | 231 if not re_matchany(d, self.skip_patterns, rel_dirpath)] |
| 231 | 232 |
| 232 for filename in filenames: | 233 for filename in filenames: |
| 233 if re_matchany(filename, self.skip_patterns, rel_dirpath): | 234 if re_matchany(filename, self.skip_patterns, rel_dirpath): |
| 234 continue | 235 continue |
| 235 self.processFile(ctx, os.path.join(dirpath, filename)) | 236 self.processFile(ctx, os.path.join(dirpath, filename), |
| 236 | 237 new_only) |
| 237 def processFile(self, ctx, path): | 238 |
| 239 def processFile(self, ctx, path, new_only=False): | |
| 238 logger.debug("Queuing: %s" % path) | 240 logger.debug("Queuing: %s" % path) |
| 239 job = ProcessingWorkerJob(ctx.base_dir, path) | 241 job = ProcessingWorkerJob(ctx.base_dir, path, new_only) |
| 240 ctx.job_queue.put_nowait(job) | 242 ctx.job_queue.put_nowait(job) |
| 241 | 243 |
| 242 | 244 |
| 243 class ProcessingWorkerContext(object): | 245 class ProcessingWorkerContext(object): |
| 244 def __init__(self, pipeline, record, | 246 def __init__(self, pipeline, record, |
| 249 self.abort_event = abort_event | 251 self.abort_event = abort_event |
| 250 self.pipeline_lock = pipeline_lock | 252 self.pipeline_lock = pipeline_lock |
| 251 | 253 |
| 252 | 254 |
| 253 class ProcessingWorkerJob(object): | 255 class ProcessingWorkerJob(object): |
| 254 def __init__(self, base_dir, path): | 256 def __init__(self, base_dir, path, new_only=False): |
| 255 self.base_dir = base_dir | 257 self.base_dir = base_dir |
| 256 self.path = path | 258 self.path = path |
| 259 self.new_only = new_only | |
| 257 | 260 |
| 258 | 261 |
| 259 class ProcessingWorker(threading.Thread): | 262 class ProcessingWorker(threading.Thread): |
| 260 def __init__(self, wid, ctx): | 263 def __init__(self, wid, ctx): |
| 261 super(ProcessingWorker, self).__init__() | 264 super(ProcessingWorker, self).__init__() |
| 285 pipeline = self.ctx.pipeline | 288 pipeline = self.ctx.pipeline |
| 286 record = self.ctx.record | 289 record = self.ctx.record |
| 287 | 290 |
| 288 rel_path = os.path.relpath(job.path, job.base_dir) | 291 rel_path = os.path.relpath(job.path, job.base_dir) |
| 289 previous_entry = record.getPreviousEntry(rel_path) | 292 previous_entry = record.getPreviousEntry(rel_path) |
| 293 if job.new_only and previous_entry: | |
| 294 return | |
| 295 | |
| 290 record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path) | 296 record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path) |
| 291 record.addEntry(record_entry) | 297 record.addEntry(record_entry) |
| 292 | 298 |
| 293 # Figure out if a previously processed file is overriding this one. | 299 # Figure out if a previously processed file is overriding this one. |
| 294 # This can happen if a theme file (processed via a mount point) | 300 # This can happen if a theme file (processed via a mount point) |
