comparison piecrust/processing/base.py @ 120:133845647083

Better error management and removal support in baking/processing.

* Baker and processor pipeline now store errors in their records.
* They also support deleting output files that are no longer valid.
* The basic transitional record class implements more boilerplate code.
* The processor pipeline is run from the `bake` command directly.
* New unit tests.
* Unit test mocking now mocks `os.remove` too.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 09 Nov 2014 14:46:23 -0800
parents 6827dcc9d3fb
children e5cba2622d26
comparison 119:0811f92cbdc7 → 120:133845647083
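The `TransitionalProcessorPipelineRecord` and `ProcessorPipelineRecordEntry` classes imported in the first hunk live in `piecrust/processing/records.py`, which is not part of this file's diff. As a rough, illustrative sketch only (the real class's internals are not shown here), the deletion support used by the pipeline's `getDeletions()` call could be derived by comparing the outputs of the previous bake against those of the current one:

# Sketch only: how a transitional record could compute stale outputs.
# The real TransitionalProcessorPipelineRecord is defined in
# piecrust/processing/records.py and may work differently.
import os.path


class TransitionalRecordSketch(object):
    def __init__(self, previous_entries, current_entries, out_dir):
        self.previous_entries = previous_entries  # entries loaded from the last bake
        self.current_entries = current_entries    # entries added during this bake
        self.out_dir = out_dir

    def getDeletions(self):
        # Any output produced by the previous bake that the current bake no
        # longer produces is stale and can be removed from the output folder.
        current_outputs = set()
        for entry in self.current_entries:
            current_outputs.update(entry.rel_outputs)
        for entry in self.previous_entries:
            for rel_out in entry.rel_outputs:
                if rel_out not in current_outputs:
                    yield (os.path.join(self.out_dir, rel_out),
                           'no longer generated')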
@@ -1,16 +1,19 @@
 import re
 import time
 import shutil
 import os.path
 import logging
+import hashlib
 import threading
 from queue import Queue, Empty
 from piecrust.chefutil import format_timed
+from piecrust.processing.records import (
+        ProcessorPipelineRecordEntry, TransitionalProcessorPipelineRecord,
+        FLAG_PROCESSED, FLAG_OVERRIDEN)
 from piecrust.processing.tree import (ProcessingTreeBuilder,
-        ProcessingTreeRunner, STATE_DIRTY, print_node)
-from piecrust.records import Record
+        ProcessingTreeRunner, ProcessingTreeError, STATE_DIRTY, print_node)
 
 
 logger = logging.getLogger(__name__)
 
 
@@ -92,45 +95,10 @@
         out_path = os.path.join(out_dir, out_name)
         return self._doProcess(path, out_path)
 
     def _doProcess(self, in_path, out_path):
         raise NotImplementedError()
-
-
-class ProcessorPipelineRecord(Record):
-    VERSION = 1
-
-    def __init__(self):
-        super(ProcessorPipelineRecord, self).__init__()
-
-    def addEntry(self, item):
-        self.entries.append(item)
-
-    def hasOverrideEntry(self, rel_path):
-        return self.findEntry(rel_path) is not None
-
-    def findEntry(self, rel_path):
-        rel_path = rel_path.lower()
-        for entry in self.entries:
-            for out_path in entry.rel_outputs:
-                if out_path.lower() == rel_path:
-                    return entry
-        return None
-
-
-class ProcessorPipelineRecordEntry(object):
-    def __init__(self, base_dir, rel_input, is_processed=False,
-                 is_overridden=False):
-        self.base_dir = base_dir
-        self.rel_input = rel_input
-        self.rel_outputs = []
-        self.is_processed = is_processed
-        self.is_overridden = is_overridden
-
-    @property
-    def path(self):
-        return os.path.join(self.base_dir, self.rel_input)
 
 
 class ProcessingContext(object):
     def __init__(self, base_dir, job_queue, record=None):
         self.base_dir = base_dir
@@ -175,19 +143,31 @@
 
         # Sort our processors again in case the pre-process step involved
         # patching the processors with some new ones.
         self.processors.sort(key=lambda p: p.priority)
 
+        # Create the pipeline record.
+        record = TransitionalProcessorPipelineRecord()
+        record_cache = self.app.cache.getCache('baker')
+        record_name = (
+                'assets_' +
+                hashlib.md5(self.out_dir.encode('utf8')).hexdigest() +
+                '.record')
+        if not self.force and record_cache.has(record_name):
+            t = time.clock()
+            record.loadPrevious(record_cache.getCachePath(record_name))
+            logger.debug(format_timed(t, 'loaded previous bake record',
+                                      colored=False))
+
         # Create the workers.
         pool = []
         queue = Queue()
         abort = threading.Event()
         pipeline_lock = threading.Lock()
-        record = ProcessorPipelineRecord()
         for i in range(self.num_workers):
-            ctx = ProcessingWorkerContext(self, record, queue, abort,
-                                          pipeline_lock)
+            ctx = ProcessingWorkerContext(self, record,
+                                          queue, abort, pipeline_lock)
             worker = ProcessingWorker(i, ctx)
             worker.start()
             pool.append(worker)
 
         if src_dir_or_file is not None:
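The record file name introduced above is keyed on the output directory, so separate bake destinations keep separate records in the 'baker' cache. For illustration (the directory path here is made up), the name is just an `assets_` prefix plus the MD5 of the output path:

import hashlib

out_dir = '/home/user/mysite/_counter'  # hypothetical output directory
record_name = ('assets_' +
               hashlib.md5(out_dir.encode('utf8')).hexdigest() +
               '.record')
# -> 'assets_' followed by 32 hex digits, then '.record'
print(record_name)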
@@ -220,13 +200,27 @@
         for w in pool:
             w.join()
         if abort.is_set():
             raise Exception("Worker pool was aborted.")
 
+        # Handle deletions.
+        for path, reason in record.getDeletions():
+            logger.debug("Removing '%s': %s" % (path, reason))
+            os.remove(path)
+            logger.info('[delete] %s' % path)
+
         # Invoke post-processors.
         for proc in self.processors:
             proc.onPipelineEnd(self)
+
+        # Save the process record.
+        t = time.clock()
+        record.current.process_time = time.time()
+        record.current.out_dir = self.out_dir
+        record.collapseRecords()
+        record.saveCurrent(record_cache.getCachePath(record_name))
+        logger.debug(format_timed(t, 'saved bake record', colored=False))
 
         return record
 
     def processDirectory(self, ctx, start_dir):
         for dirpath, dirnames, filenames in os.walk(start_dir):
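The deletion pass above calls `os.remove()` directly, which is why the commit message notes that unit-test mocking now covers `os.remove` as well. The project's tests use their own mock file-system helpers, so the snippet below is only a generic `unittest.mock` illustration of how stale-output removal could be asserted; `run_asset_pipeline()` and the path are hypothetical:

from unittest import mock


def test_stale_output_is_removed():
    removed = []
    # Patch os.remove so no real file is touched; just record the calls.
    with mock.patch('os.remove', side_effect=removed.append):
        run_asset_pipeline()  # hypothetical helper that triggers the deletion pass
    assert '/output/stale.css' in removed  # hypothetical stale output path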
@@ -244,12 +238,12 @@
                 job = ProcessingWorkerJob(ctx.base_dir, path)
                 ctx.job_queue.put_nowait(job)
 
 
 class ProcessingWorkerContext(object):
-    def __init__(self, pipeline, record, work_queue, abort_event,
-                 pipeline_lock):
+    def __init__(self, pipeline, record,
+                 work_queue, abort_event, pipeline_lock):
         self.pipeline = pipeline
         self.record = record
         self.work_queue = work_queue
         self.abort_event = abort_event
         self.pipeline_lock = pipeline_lock
@@ -289,42 +283,53 @@
         start_time = time.clock()
         pipeline = self.ctx.pipeline
         record = self.ctx.record
 
         rel_path = os.path.relpath(job.path, job.base_dir)
+        previous_entry = record.getPreviousEntry(rel_path)
+        record_entry = ProcessorPipelineRecordEntry(job.base_dir, rel_path)
+        record.addEntry(record_entry)
 
         # Figure out if a previously processed file is overriding this one.
         # This can happen if a theme file (processed via a mount point)
         # is overridden in the user's website.
-        if record.hasOverrideEntry(rel_path):
-            record.addEntry(ProcessorPipelineRecordEntry(
-                    job.base_dir, rel_path,
-                    is_processed=False, is_overridden=True))
+        if record.current.hasOverrideEntry(rel_path):
+            record_entry.flags |= FLAG_OVERRIDEN
             logger.info(format_timed(start_time,
                     '%s [not baked, overridden]' % rel_path))
             return
 
-        builder = ProcessingTreeBuilder(pipeline.processors)
-        tree_root = builder.build(rel_path)
+        try:
+            builder = ProcessingTreeBuilder(pipeline.processors)
+            tree_root = builder.build(rel_path)
+        except ProcessingTreeError as ex:
+            record_entry.errors.append(str(ex))
+            logger.error("Error processing %s: %s" % (rel_path, ex))
+            return
+
         print_node(tree_root, recursive=True)
         leaves = tree_root.getLeaves()
-        fi = ProcessorPipelineRecordEntry(job.base_dir, rel_path)
-        fi.rel_outputs = [l.path for l in leaves]
-        record.addEntry(fi)
-
-        force = pipeline.force
+        record_entry.rel_outputs = [l.path for l in leaves]
+
+        force = (pipeline.force or previous_entry is None or
+                 not previous_entry.was_processed_successfully)
         if not force:
             force = re_matchany(rel_path, pipeline.force_patterns)
 
         if force:
             tree_root.setState(STATE_DIRTY, True)
 
-        runner = ProcessingTreeRunner(job.base_dir, pipeline.tmp_dir,
-                pipeline.out_dir, self.ctx.pipeline_lock)
-        if runner.processSubTree(tree_root):
-            fi.is_processed = True
-        logger.info(format_timed(start_time, "[%d] %s" % (self.wid, rel_path)))
+        try:
+            runner = ProcessingTreeRunner(
+                    job.base_dir, pipeline.tmp_dir,
+                    pipeline.out_dir, self.ctx.pipeline_lock)
+            if runner.processSubTree(tree_root):
+                record_entry.flags |= FLAG_PROCESSED
+            logger.info(format_timed(start_time, "[%d] %s" % (self.wid, rel_path)))
+        except ProcessingTreeError as ex:
+            record_entry.errors.append(str(ex))
+            logger.error("Error processing %s: %s" % (rel_path, ex))
 
 
 def make_re(patterns):
     re_patterns = []
     for pat in patterns:
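The `force` computation in the last hunk relies on `previous_entry.was_processed_successfully`, a property of the new record entry class defined in `piecrust/processing/records.py` and therefore not visible in this diff. Assuming it is derived from the `flags` and `errors` fields that this changeset uses, it could look roughly like this (the flag values are assumptions; only the names appear in the diff):

FLAG_NONE = 0
FLAG_PROCESSED = 2 ** 0   # assumed value
FLAG_OVERRIDEN = 2 ** 1   # assumed value


class ProcessorPipelineRecordEntrySketch(object):
    def __init__(self, base_dir, rel_input):
        self.base_dir = base_dir
        self.rel_input = rel_input
        self.rel_outputs = []
        self.flags = FLAG_NONE
        self.errors = []

    @property
    def was_processed_successfully(self):
        # Processed, and no error was recorded against the entry.
        return (self.flags & FLAG_PROCESSED) != 0 and len(self.errors) == 0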