Mercurial > piecrust2
comparison piecrust/processing/base.py @ 129:3080b6d02f40
Add ability for the processing pipeline to only process new assets.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sat, 15 Nov 2014 16:03:24 +0100 |
parents | e5cba2622d26 |
children | 9e4c2e68a129 |
comparison
equal
deleted
inserted
replaced
128:28444014ce7d | 129:3080b6d02f40 |
---|---|
135 def filterProcessors(self, authorized_names): | 135 def filterProcessors(self, authorized_names): |
136 self.processors = list(filter( | 136 self.processors = list(filter( |
137 lambda p: p.PROCESSOR_NAME in authorized_names, | 137 lambda p: p.PROCESSOR_NAME in authorized_names, |
138 self.processors)) | 138 self.processors)) |
139 | 139 |
140 def run(self, src_dir_or_file=None): | 140 def run(self, src_dir_or_file=None, new_only=False): |
141 # Invoke pre-processors. | 141 # Invoke pre-processors. |
142 for proc in self.processors: | 142 for proc in self.processors: |
143 proc.onPipelineStart(self) | 143 proc.onPipelineStart(self) |
144 | 144 |
145 # Sort our processors again in case the pre-process step involved | 145 # Sort our processors again in case the pre-process step involved |
184 (src_dir_or_file, self.mounts)) | 184 (src_dir_or_file, self.mounts)) |
185 | 185 |
186 ctx = ProcessingContext(base_dir, queue, record) | 186 ctx = ProcessingContext(base_dir, queue, record) |
187 logger.debug("Initiating processing pipeline on: %s" % src_dir_or_file) | 187 logger.debug("Initiating processing pipeline on: %s" % src_dir_or_file) |
188 if os.path.isdir(src_dir_or_file): | 188 if os.path.isdir(src_dir_or_file): |
189 self.processDirectory(ctx, src_dir_or_file) | 189 self.processDirectory(ctx, src_dir_or_file, new_only) |
190 elif os.path.isfile(src_dir_or_file): | 190 elif os.path.isfile(src_dir_or_file): |
191 self.processFile(ctx, src_dir_or_file) | 191 self.processFile(ctx, src_dir_or_file, new_only) |
192 | 192 |
193 else: | 193 else: |
194 # Process everything. | 194 # Process everything. |
195 for path in self.mounts: | 195 for path in self.mounts: |
196 ctx = ProcessingContext(path, queue, record) | 196 ctx = ProcessingContext(path, queue, record) |
197 logger.debug("Initiating processing pipeline on: %s" % path) | 197 logger.debug("Initiating processing pipeline on: %s" % path) |
198 self.processDirectory(ctx, path) | 198 self.processDirectory(ctx, path, new_only) |
199 | 199 |
200 # Wait on all workers. | 200 # Wait on all workers. |
201 for w in pool: | 201 for w in pool: |
202 w.join() | 202 w.join() |
203 if abort.is_set(): | 203 if abort.is_set(): |
204 raise Exception("Worker pool was aborted.") | 204 raise Exception("Worker pool was aborted.") |
205 | 205 |
206 # Handle deletions. | 206 # Handle deletions. |
207 for path, reason in record.getDeletions(): | 207 if not new_only: |
208 logger.debug("Removing '%s': %s" % (path, reason)) | 208 for path, reason in record.getDeletions(): |
209 os.remove(path) | 209 logger.debug("Removing '%s': %s" % (path, reason)) |
210 logger.info('[delete] %s' % path) | 210 os.remove(path) |
211 logger.info('[delete] %s' % path) | |
211 | 212 |
212 # Invoke post-processors. | 213 # Invoke post-processors. |
213 for proc in self.processors: | 214 for proc in self.processors: |
214 proc.onPipelineEnd(self) | 215 proc.onPipelineEnd(self) |
215 | 216 |
221 record.saveCurrent(record_cache.getCachePath(record_name)) | 222 record.saveCurrent(record_cache.getCachePath(record_name)) |
222 logger.debug(format_timed(t, 'saved bake record', colored=False)) | 223 logger.debug(format_timed(t, 'saved bake record', colored=False)) |
223 | 224 |
224 return record | 225 return record |
225 | 226 |
226 def processDirectory(self, ctx, start_dir): | 227 def processDirectory(self, ctx, start_dir, new_only=False): |
227 for dirpath, dirnames, filenames in os.walk(start_dir): | 228 for dirpath, dirnames, filenames in os.walk(start_dir): |
228 rel_dirpath = os.path.relpath(dirpath, start_dir) | 229 rel_dirpath = os.path.relpath(dirpath, start_dir) |
229 dirnames[:] = [d for d in dirnames | 230 dirnames[:] = [d for d in dirnames |
230 if not re_matchany(d, self.skip_patterns, rel_dirpath)] | 231 if not re_matchany(d, self.skip_patterns, rel_dirpath)] |
231 | 232 |
232 for filename in filenames: | 233 for filename in filenames: |
233 if re_matchany(filename, self.skip_patterns, rel_dirpath): | 234 if re_matchany(filename, self.skip_patterns, rel_dirpath): |
234 continue | 235 continue |
235 self.processFile(ctx, os.path.join(dirpath, filename)) | 236 self.processFile(ctx, os.path.join(dirpath, filename), |
236 | 237 new_only) |
237 def processFile(self, ctx, path): | 238 |
239 def processFile(self, ctx, path, new_only=False): | |
238 logger.debug("Queuing: %s" % path) | 240 logger.debug("Queuing: %s" % path) |
239 job = ProcessingWorkerJob(ctx.base_dir, path) | 241 job = ProcessingWorkerJob(ctx.base_dir, path, new_only) |
240 ctx.job_queue.put_nowait(job) | 242 ctx.job_queue.put_nowait(job) |
241 | 243 |
242 | 244 |
243 class ProcessingWorkerContext(object): | 245 class ProcessingWorkerContext(object): |
244 def __init__(self, pipeline, record, | 246 def __init__(self, pipeline, record, |
249 self.abort_event = abort_event | 251 self.abort_event = abort_event |
250 self.pipeline_lock = pipeline_lock | 252 self.pipeline_lock = pipeline_lock |
251 | 253 |
252 | 254 |
253 class ProcessingWorkerJob(object): | 255 class ProcessingWorkerJob(object): |
254 def __init__(self, base_dir, path): | 256 def __init__(self, base_dir, path, new_only=False): |
255 self.base_dir = base_dir | 257 self.base_dir = base_dir |
256 self.path = path | 258 self.path = path |
259 self.new_only = new_only | |
257 | 260 |
258 | 261 |
259 class ProcessingWorker(threading.Thread): | 262 class ProcessingWorker(threading.Thread): |
260 def __init__(self, wid, ctx): | 263 def __init__(self, wid, ctx): |
261 super(ProcessingWorker, self).__init__() | 264 super(ProcessingWorker, self).__init__() |
285 pipeline = self.ctx.pipeline | 288 pipeline = self.ctx.pipeline |
286 record = self.ctx.record | 289 record = self.ctx.record |
287 | 290 |
288 rel_path = os.path.relpath(job.path, job.base_dir) | 291 rel_path = os.path.relpath(job.path, job.base_dir) |
289 previous_entry = record.getPreviousEntry(rel_path) | 292 previous_entry = record.getPreviousEntry(rel_path) |
293 if job.new_only and previous_entry: | |
294 return | |
295 | |
290 record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path) | 296 record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path) |
291 record.addEntry(record_entry) | 297 record.addEntry(record_entry) |
292 | 298 |
293 # Figure out if a previously processed file is overriding this one. | 299 # Figure out if a previously processed file is overriding this one. |
294 # This can happen if a theme file (processed via a mount point) | 300 # This can happen if a theme file (processed via a mount point) |