comparison piecrust/processing/base.py @ 120:133845647083
Better error management and removal support in baking/processing.
* Baker and processor pipeline now store errors in their records.
* They also support deleting output files that are no longer valid.
* The basic transitional record class implements more boilerplate code.
* The processor pipeline is run from the `bake` command directly.
* New unit tests.
* Unit test mocking now mocks `os.remove` too (see the test sketch below).
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sun, 09 Nov 2014 14:46:23 -0800 |
parents | 6827dcc9d3fb |
children | e5cba2622d26 |
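
The commit message's last bullet concerns the test harness rather than this file. A minimal sketch of what mocking `os.remove` can look like, assuming a pytest-style test and a hypothetical `delete_stale_outputs` helper standing in for the pipeline's deletion pass:

```python
import os
import unittest.mock


def delete_stale_outputs(paths):
    # Hypothetical stand-in for the pipeline's deletion pass.
    for p in paths:
        os.remove(p)


def test_deletions_do_not_touch_the_filesystem():
    removed = []
    # Patch os.remove so the test records deletions instead of performing them.
    with unittest.mock.patch('os.remove', side_effect=removed.append):
        delete_stale_outputs(['/out/stale.css', '/out/old.js'])
    assert removed == ['/out/stale.css', '/out/old.js']
```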
119:0811f92cbdc7 | 120:133845647083 |
---|---|
1 import re | 1 import re |
2 import time | 2 import time |
3 import shutil | 3 import shutil |
4 import os.path | 4 import os.path |
5 import logging | 5 import logging |
| 6 import hashlib |
6 import threading | 7 import threading |
7 from queue import Queue, Empty | 8 from queue import Queue, Empty |
8 from piecrust.chefutil import format_timed | 9 from piecrust.chefutil import format_timed |
| 10 from piecrust.processing.records import ( |
| 11 ProcessorPipelineRecordEntry, TransitionalProcessorPipelineRecord, |
| 12 FLAG_PROCESSED, FLAG_OVERRIDEN) |
9 from piecrust.processing.tree import (ProcessingTreeBuilder, | 13 from piecrust.processing.tree import (ProcessingTreeBuilder, |
10 ProcessingTreeRunner, STATE_DIRTY, print_node) | 14 ProcessingTreeRunner, ProcessingTreeError, STATE_DIRTY, print_node) |
11 from piecrust.records import Record | |
12 | 15 |
13 | 16 |
14 logger = logging.getLogger(__name__) | 17 logger = logging.getLogger(__name__) |
15 | 18 |
16 | 19 |
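
The new `hashlib` import supports naming the pipeline record cache file after the output directory, as seen further down in this changeset. A standalone sketch of that derivation (the example path is made up):

```python
import hashlib


def record_name_for(out_dir):
    # One record file per output directory: 'assets_' plus an MD5 of the
    # directory path, matching the naming used later in this changeset.
    return ('assets_' +
            hashlib.md5(out_dir.encode('utf8')).hexdigest() +
            '.record')


print(record_name_for('/home/user/mysite/_counter'))  # hypothetical path
```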
92 out_path = os.path.join(out_dir, out_name) | 95 out_path = os.path.join(out_dir, out_name) |
93 return self._doProcess(path, out_path) | 96 return self._doProcess(path, out_path) |
94 | 97 |
95 def _doProcess(self, in_path, out_path): | 98 def _doProcess(self, in_path, out_path): |
96 raise NotImplementedError() | 99 raise NotImplementedError() |
97 | |
98 | |
99 class ProcessorPipelineRecord(Record): | |
100 VERSION = 1 | |
101 | |
102 def __init__(self): | |
103 super(ProcessorPipelineRecord, self).__init__() | |
104 | |
105 def addEntry(self, item): | |
106 self.entries.append(item) | |
107 | |
108 def hasOverrideEntry(self, rel_path): | |
109 return self.findEntry(rel_path) is not None | |
110 | |
111 def findEntry(self, rel_path): | |
112 rel_path = rel_path.lower() | |
113 for entry in self.entries: | |
114 for out_path in entry.rel_outputs: | |
115 if out_path.lower() == rel_path: | |
116 return entry | |
117 return None | |
118 | |
119 | |
120 class ProcessorPipelineRecordEntry(object): | |
121 def __init__(self, base_dir, rel_input, is_processed=False, | |
122 is_overridden=False): | |
123 self.base_dir = base_dir | |
124 self.rel_input = rel_input | |
125 self.rel_outputs = [] | |
126 self.is_processed = is_processed | |
127 self.is_overridden = is_overridden | |
128 | |
129 @property | |
130 def path(self): | |
131 return os.path.join(self.base_dir, self.rel_input) | |
132 | 100 |
133 | 101 |
134 class ProcessingContext(object): | 102 class ProcessingContext(object): |
135 def __init__(self, base_dir, job_queue, record=None): | 103 def __init__(self, base_dir, job_queue, record=None): |
136 self.base_dir = base_dir | 104 self.base_dir = base_dir |
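
The deleted `ProcessorPipelineRecordEntry` above tracked state with booleans (`is_processed`, `is_overridden`); its replacement, imported from `piecrust/processing/records.py`, uses bit flags and an error list instead. A rough sketch of the new shape, with assumed flag values and assumed semantics for `was_processed_successfully`:

```python
FLAG_NONE = 0
FLAG_PROCESSED = 2 ** 0   # assumed values; the real constants live in
FLAG_OVERRIDEN = 2 ** 1   # piecrust/processing/records.py


class RecordEntrySketch:
    def __init__(self, base_dir, rel_input):
        self.base_dir = base_dir
        self.rel_input = rel_input
        self.rel_outputs = []
        self.flags = FLAG_NONE
        self.errors = []

    @property
    def was_processed_successfully(self):
        # Assumed meaning: processed without any recorded errors. The
        # pipeline uses this on the previous entry to decide about forcing.
        return (self.flags & FLAG_PROCESSED) != 0 and not self.errors
```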
175 | 143 |
176 # Sort our processors again in case the pre-process step involved | 144 # Sort our processors again in case the pre-process step involved |
177 # patching the processors with some new ones. | 145 # patching the processors with some new ones. |
178 self.processors.sort(key=lambda p: p.priority) | 146 self.processors.sort(key=lambda p: p.priority) |
179 | 147 |
| 148 # Create the pipeline record. |
| 149 record = TransitionalProcessorPipelineRecord() |
| 150 record_cache = self.app.cache.getCache('baker') |
| 151 record_name = ( |
| 152 'assets_' + |
| 153 hashlib.md5(self.out_dir.encode('utf8')).hexdigest() + |
| 154 '.record') |
| 155 if not self.force and record_cache.has(record_name): |
| 156 t = time.clock() |
| 157 record.loadPrevious(record_cache.getCachePath(record_name)) |
| 158 logger.debug(format_timed(t, 'loaded previous bake record', |
| 159 colored=False)) |
| 160 |
180 # Create the workers. | 161 # Create the workers. |
181 pool = [] | 162 pool = [] |
182 queue = Queue() | 163 queue = Queue() |
183 abort = threading.Event() | 164 abort = threading.Event() |
184 pipeline_lock = threading.Lock() | 165 pipeline_lock = threading.Lock() |
185 record = ProcessorPipelineRecord() | |
186 for i in range(self.num_workers): | 166 for i in range(self.num_workers): |
187 ctx = ProcessingWorkerContext(self, record, queue, abort, | 167 ctx = ProcessingWorkerContext(self, record, |
188 pipeline_lock) | 168 queue, abort, pipeline_lock) |
189 worker = ProcessingWorker(i, ctx) | 169 worker = ProcessingWorker(i, ctx) |
190 worker.start() | 170 worker.start() |
191 pool.append(worker) | 171 pool.append(worker) |
192 | 172 |
193 if src_dir_or_file is not None: | 173 if src_dir_or_file is not None: |
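
The `TransitionalProcessorPipelineRecord` created in the hunk above is defined in the new records module, not here; conceptually it pairs the record loaded from the previous bake with the one being built now. A simplified sketch of that idea (only `current` appears in this changeset, the other names are assumed):

```python
class TransitionalRecordSketch:
    # Simplified stand-in for TransitionalProcessorPipelineRecord:
    # it keeps the previous bake's entries next to the record being
    # built during this bake.
    def __init__(self):
        self.previous_entries = []  # loaded by loadPrevious(), if a record exists
        self.current = None         # record being filled by the workers

    def getPreviousEntry(self, rel_path):
        # Look up how this input fared last time; the pipeline uses this
        # to decide whether the file must be forced through again.
        for entry in self.previous_entries:
            if entry.rel_input == rel_path:
                return entry
        return None
```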
220 for w in pool: | 200 for w in pool: |
221 w.join() | 201 w.join() |
222 if abort.is_set(): | 202 if abort.is_set(): |
223 raise Exception("Worker pool was aborted.") | 203 raise Exception("Worker pool was aborted.") |
224 | 204 |
| 205 # Handle deletions. |
| 206 for path, reason in record.getDeletions(): |
| 207 logger.debug("Removing '%s': %s" % (path, reason)) |
| 208 os.remove(path) |
| 209 logger.info('[delete] %s' % path) |
| 210 |
225 # Invoke post-processors. | 211 # Invoke post-processors. |
226 for proc in self.processors: | 212 for proc in self.processors: |
227 proc.onPipelineEnd(self) | 213 proc.onPipelineEnd(self) |
| 214 |
| 215 # Save the process record. |
| 216 t = time.clock() |
| 217 record.current.process_time = time.time() |
| 218 record.current.out_dir = self.out_dir |
| 219 record.collapseRecords() |
| 220 record.saveCurrent(record_cache.getCachePath(record_name)) |
| 221 logger.debug(format_timed(t, 'saved bake record', colored=False)) |
228 | 222 |
229 return record | 223 return record |
230 | 224 |
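
`record.getDeletions()` is defined in the new records module rather than in this file. One plausible way to compute it is to diff the previous record's outputs against the current one's; a sketch under that assumption, not the actual implementation:

```python
import os


def compute_deletions(previous_outputs, current_outputs, out_dir):
    # Outputs that the last bake produced but this one did not are stale
    # and can safely be removed from the output directory.
    for rel_path in sorted(set(previous_outputs) - set(current_outputs)):
        yield os.path.join(out_dir, rel_path), 'no longer produced'


# Example:
#   list(compute_deletions({'a.css', 'b.js'}, {'a.css'}, '/out'))
#   -> [('/out/b.js', 'no longer produced')]
```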
231 def processDirectory(self, ctx, start_dir): | 225 def processDirectory(self, ctx, start_dir): |
232 for dirpath, dirnames, filenames in os.walk(start_dir): | 226 for dirpath, dirnames, filenames in os.walk(start_dir): |
244 job = ProcessingWorkerJob(ctx.base_dir, path) | 238 job = ProcessingWorkerJob(ctx.base_dir, path) |
245 ctx.job_queue.put_nowait(job) | 239 ctx.job_queue.put_nowait(job) |
246 | 240 |
247 | 241 |
248 class ProcessingWorkerContext(object): | 242 class ProcessingWorkerContext(object): |
249 def __init__(self, pipeline, record, work_queue, abort_event, | 243 def __init__(self, pipeline, record, |
250 pipeline_lock): | 244 work_queue, abort_event, pipeline_lock): |
251 self.pipeline = pipeline | 245 self.pipeline = pipeline |
252 self.record = record | 246 self.record = record |
253 self.work_queue = work_queue | 247 self.work_queue = work_queue |
254 self.abort_event = abort_event | 248 self.abort_event = abort_event |
255 self.pipeline_lock = pipeline_lock | 249 self.pipeline_lock = pipeline_lock |
289 start_time = time.clock() | 283 start_time = time.clock() |
290 pipeline = self.ctx.pipeline | 284 pipeline = self.ctx.pipeline |
291 record = self.ctx.record | 285 record = self.ctx.record |
292 | 286 |
293 rel_path = os.path.relpath(job.path, job.base_dir) | 287 rel_path = os.path.relpath(job.path, job.base_dir) |
| 288 previous_entry = record.getPreviousEntry(rel_path) |
| 289 record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path) |
| 290 record.addEntry(record_entry) |
294 | 291 |
295 # Figure out if a previously processed file is overriding this one. | 292 # Figure out if a previously processed file is overriding this one. |
296 # This can happen if a theme file (processed via a mount point) | 293 # This can happen if a theme file (processed via a mount point) |
297 # is overridden in the user's website. | 294 # is overridden in the user's website. |
298 if record.hasOverrideEntry(rel_path): | 295 if record.current.hasOverrideEntry(rel_path): |
299 record.addEntry(ProcessorPipelineRecordEntry( | 296 record_entry.flags |= FLAG_OVERRIDEN |
300 job.base_dir, rel_path, | |
301 is_processed=False, is_overridden=True)) | |
302 logger.info(format_timed(start_time, | 297 logger.info(format_timed(start_time, |
303 '%s [not baked, overridden]' % rel_path)) | 298 '%s [not baked, overridden]' % rel_path)) |
304 return | 299 return |
305 | 300 |
306 builder = ProcessingTreeBuilder(pipeline.processors) | 301 try: |
307 tree_root = builder.build(rel_path) | 302 builder = ProcessingTreeBuilder(pipeline.processors) |
| 303 tree_root = builder.build(rel_path) |
| 304 except ProcessingTreeError as ex: |
| 305 record_entry.errors.append(str(ex)) |
| 306 logger.error("Error processing %s: %s" % (rel_path, ex)) |
| 307 return |
| 308 |
308 print_node(tree_root, recursive=True) | 309 print_node(tree_root, recursive=True) |
309 leaves = tree_root.getLeaves() | 310 leaves = tree_root.getLeaves() |
310 fi = ProcessorPipelineRecordEntry(job.base_dir, rel_path) | 311 record_entry.rel_outputs = [l.path for l in leaves] |
311 fi.rel_outputs = [l.path for l in leaves] | 312 |
312 record.addEntry(fi) | 313 force = (pipeline.force or previous_entry is None or |
313 | 314 not previous_entry.was_processed_successfully) |
314 force = pipeline.force | |
315 if not force: | 315 if not force: |
316 force = re_matchany(rel_path, pipeline.force_patterns) | 316 force = re_matchany(rel_path, pipeline.force_patterns) |
317 | 317 |
318 if force: | 318 if force: |
319 tree_root.setState(STATE_DIRTY, True) | 319 tree_root.setState(STATE_DIRTY, True) |
320 | 320 |
321 runner = ProcessingTreeRunner(job.base_dir, pipeline.tmp_dir, | 321 try: |
322 pipeline.out_dir, self.ctx.pipeline_lock) | 322 runner = ProcessingTreeRunner( |
323 if runner.processSubTree(tree_root): | 323 job.base_dir, pipeline.tmp_dir, |
324 fi.is_processed = True | 324 pipeline.out_dir, self.ctx.pipeline_lock) |
325 logger.info(format_timed(start_time, "[%d] %s" % (self.wid, rel_path))) | 325 if runner.processSubTree(tree_root): |
| 326 record_entry.flags |= FLAG_PROCESSED |
| 327 logger.info(format_timed(start_time, "[%d] %s" % (self.wid, rel_path))) |
| 328 except ProcessingTreeError as ex: |
| 329 record_entry.errors.append(str(ex)) |
| 330 logger.error("Error processing %s: %s" % (rel_path, ex)) |
326 | 331 |
327 | 332 |
328 def make_re(patterns): | 333 def make_re(patterns): |
329 re_patterns = [] | 334 re_patterns = [] |
330 for pat in patterns: | 335 for pat in patterns: |