comparison piecrust/baking/baker.py @ 3:f485ba500df3

Gigantic change to basically make PieCrust 2 vaguely functional. - Serving works, with debug window. - Baking works, multi-threading, with dependency handling. - Various things not implemented yet.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 10 Aug 2014 23:43:16 -0700
parents
children 474c9882decf
comparison
equal deleted inserted replaced
2:40fa08b261b9 3:f485ba500df3
1 import time
2 import os.path
3 import codecs
4 import urllib2
5 import hashlib
6 import logging
7 import threading
8 from Queue import Queue, Empty
9 from piecrust.baking.records import TransitionalBakeRecord, BakeRecordPageEntry
10 from piecrust.chefutil import format_timed
11 from piecrust.data.filters import (PaginationFilter, HasFilterClause,
12 IsFilterClause, AndBooleanClause)
13 from piecrust.processing.base import ProcessorPipeline
14 from piecrust.rendering import PageRenderingContext, render_page
15 from piecrust.sources.base import (PageFactory,
16 REALM_NAMES, REALM_USER, REALM_THEME)
17
18
19 logger = logging.getLogger(__name__)
20
21
class PageBaker(object):
    """Bakes a single page (and its pagination sub-pages) to output files.

    Used by bake workers; one instance per worker. Not responsible for
    deciding *which* pages to bake -- see `Baker` for that.
    """
    def __init__(self, app, out_dir, force=False, record=None,
            copy_assets=False):
        self.app = app
        self.out_dir = out_dir
        self.force = force
        self.record = record
        self.copy_assets = copy_assets
        # Cache URL-shape settings once; they're read for every sub-page.
        self.pretty_urls = app.config.get('site/pretty_urls')
        self.pagination_suffix = app.config.get('site/pagination_suffix')

    def getOutputUri(self, uri, num):
        """Return the output URI for sub-page `num` of `uri`.

        Sub-page 1 is the page itself; higher numbers get the configured
        pagination suffix appended (shape depends on `pretty_urls`).
        """
        suffix = self.pagination_suffix.replace('%num%', str(num))
        if self.pretty_urls:
            # Output will be:
            # - `uri/name`
            # - `uri/name/2`
            # - `uri/name.ext`
            # - `uri/name.ext/2`
            if num <= 1:
                return uri
            return uri + suffix
        else:
            # Output will be:
            # - `uri/name.html`
            # - `uri/name/2.html`
            # - `uri/name.ext`
            # - `uri/name/2.ext`
            if uri == '/':
                if num <= 1:
                    return '/'
                return '/' + suffix.lstrip('/')
            else:
                if num <= 1:
                    return uri
                #TODO: watch out for tags with dots in them.
                base_uri, ext = os.path.splitext(uri)
                return base_uri + suffix + ext

    def getOutputPath(self, uri):
        """Return the on-disk path for the given (already sub-paged) URI."""
        bake_path = [self.out_dir]
        decoded_uri = urllib2.unquote(uri.lstrip('/')).decode('utf8')
        if self.pretty_urls:
            # Pretty URLs always bake to a directory with an index file.
            bake_path.append(decoded_uri)
            bake_path.append('index.html')
        else:
            # Keep an explicit extension if the URI already has one,
            # otherwise default to `.html`.
            name, ext = os.path.splitext(decoded_uri)
            if ext:
                bake_path.append(decoded_uri)
            else:
                bake_path.append(decoded_uri + '.html')

        return os.path.join(*bake_path)

    def bake(self, factory, route, taxonomy_name=None, taxonomy_term=None):
        """Bake the page built by `factory` through `route`.

        When `taxonomy_name`/`taxonomy_term` are given, the page is baked
        as a taxonomy listing filtered down to that term. Bakes every
        pagination sub-page, skipping ones that are up-to-date (unless
        `self.force`). Returns the `BakeRecordPageEntry` for this bake.
        """
        page = factory.buildPage()

        pagination_filter = None
        custom_data = None
        if taxonomy_name and taxonomy_term:
            # Must bake a taxonomy listing page... we'll have to add a
            # pagination filter to only get matching posts, and the output
            # URL will be a bit different.
            tax = self.app.getTaxonomy(taxonomy_name)
            pagination_filter = PaginationFilter()
            if tax.is_multiple:
                if isinstance(taxonomy_term, tuple):
                    # A combination of terms: pages must have all of them.
                    abc = AndBooleanClause()
                    for t in taxonomy_term:
                        abc.addClause(HasFilterClause(taxonomy_name, t))
                    pagination_filter.addClause(abc)
                    slugified_term = '/'.join(taxonomy_term)
                else:
                    pagination_filter.addClause(HasFilterClause(taxonomy_name,
                            taxonomy_term))
                    slugified_term = taxonomy_term
            else:
                pagination_filter.addClause(IsFilterClause(taxonomy_name,
                        taxonomy_term))
                slugified_term = taxonomy_term
            custom_data = {tax.term_name: taxonomy_term}
            uri = route.getUri({tax.term_name: slugified_term})
        else:
            # Normal page bake.
            uri = route.getUri(factory.metadata)

        cur_sub = 1
        has_more_subs = True
        cur_record_entry = BakeRecordPageEntry(page)
        cur_record_entry.taxonomy_name = taxonomy_name
        cur_record_entry.taxonomy_term = taxonomy_term
        prev_record_entry = self.record.getPreviousEntry(page, taxonomy_name,
                taxonomy_term)

        logger.debug("Baking '%s'..." % uri)
        while has_more_subs:
            sub_uri = self.getOutputUri(uri, cur_sub)
            out_path = self.getOutputPath(sub_uri)

            # Check for up-to-date outputs.
            do_bake = True
            if not self.force and prev_record_entry:
                try:
                    in_path_time = os.path.getmtime(page.path)
                    out_path_time = os.path.getmtime(out_path)
                    if out_path_time > in_path_time:
                        do_bake = False
                except OSError:
                    # File doesn't exist, we'll need to bake.
                    pass

            # If this page didn't bake because it's already up-to-date,
            # keep trying for as many subs as we know this page has.
            if not do_bake:
                if (prev_record_entry is not None and
                        prev_record_entry.num_subs > cur_sub):
                    # The previous bake produced more sub-pages than this
                    # one, so the next sub-page may still be stale -- check
                    # it too instead of stopping here.
                    cur_sub += 1
                    has_more_subs = True
                    logger.debug(" %s is up to date, skipping to next "
                            "sub-page." % out_path)
                    continue

                # We don't know how many subs to expect... just skip.
                logger.debug(" %s is up to date, skipping bake." % out_path)
                break

            # All good, proceed.
            try:
                logger.debug(" p%d -> %s" % (cur_sub, out_path))
                ctx, rp = self._bakeSingle(page, sub_uri, cur_sub, out_path,
                        pagination_filter, custom_data)
            except Exception as ex:
                logger.exception("Error baking page '%s' for URI '%s': %s" %
                        (page.ref_spec, uri, ex))
                raise

            # Accumulate what this sub-page produced and depended on.
            cur_record_entry.out_uris.append(sub_uri)
            cur_record_entry.out_paths.append(out_path)
            cur_record_entry.used_source_names |= ctx.used_source_names
            cur_record_entry.used_taxonomy_terms |= ctx.used_taxonomy_terms

            has_more_subs = False
            if ctx.used_pagination is not None:
                cur_record_entry.used_source_names.add(
                        ctx.used_pagination._source.name)
                if ctx.used_pagination.has_more:
                    cur_sub += 1
                    has_more_subs = True

        if self.record:
            self.record.addEntry(cur_record_entry)

        return cur_record_entry

    def _bakeSingle(self, page, sub_uri, num, out_path,
            pagination_filter=None, custom_data=None):
        """Render one sub-page and write it to `out_path`.

        Returns the `(PageRenderingContext, rendered_page)` pair so the
        caller can inspect pagination and dependency information.
        """
        ctx = PageRenderingContext(page, sub_uri)
        ctx.page_num = num
        if pagination_filter:
            ctx.pagination_filter = pagination_filter
        if custom_data:
            ctx.custom_data = custom_data

        rp = render_page(ctx)

        out_dir = os.path.dirname(out_path)
        if not os.path.isdir(out_dir):
            os.makedirs(out_dir, 0o755)

        with codecs.open(out_path, 'w', 'utf-8') as fp:
            fp.write(rp.content.decode('utf-8'))

        return ctx, rp
197
198
class Baker(object):
    """Orchestrates a full site bake: realm pages, taxonomy listings and
    assets, using a pool of `BakeWorker` threads and a transitional bake
    record for incremental (up-to-date) detection.
    """
    def __init__(self, app, out_dir=None, force=False, portable=False,
            no_assets=False):
        self.app = app
        self.out_dir = out_dir or os.path.join(app.root_dir, '_counter')
        self.force = force
        self.portable = portable
        self.no_assets = no_assets
        self.num_workers = app.config.get('baker/workers') or 4

        # Remember what taxonomy pages we should skip
        # (we'll bake them repeatedly later with each taxonomy term)
        self.taxonomy_pages = []
        logger.debug("Gathering taxonomy page paths:")
        for tax in self.app.taxonomies:
            for src in self.app.sources:
                path = tax.resolvePagePath(src.name)
                if path is not None:
                    self.taxonomy_pages.append(path)
                    logger.debug(" - %s" % path)

    def bake(self):
        """Bake the whole site into the output directory."""
        logger.debug(" Bake Output: %s" % self.out_dir)
        logger.debug(" Root URL: %s" % self.app.config.get('site/root'))

        # Get into bake mode.
        start_time = time.clock()
        self.app.config.set('baker/is_baking', True)
        self.app.env.base_asset_url_format = '%site_root%%uri%'

        # Make sure the output directory exists.
        if not os.path.isdir(self.out_dir):
            os.makedirs(self.out_dir, 0o755)

        # Load/create the bake record. The record name is keyed on the
        # output directory so different targets get different records.
        record = TransitionalBakeRecord()
        record_cache = self.app.cache.getCache('bake_r')
        record_name = hashlib.md5(self.out_dir).hexdigest() + '.record'
        if not self.force and record_cache.has(record_name):
            t = time.clock()
            record.loadPrevious(record_cache.getCachePath(record_name))
            logger.debug(format_timed(t, 'loaded previous bake record',
                    colored=False))

        # Gather all sources by realm -- we're going to bake each realm
        # separately so we can handle "overlaying" (i.e. one realm overrides
        # another realm's pages).
        sources_by_realm = {}
        for source in self.app.sources:
            srclist = sources_by_realm.setdefault(source.realm, [])
            srclist.append(source)

        # Bake the realms.
        realm_list = [REALM_USER, REALM_THEME]
        for realm in realm_list:
            srclist = sources_by_realm.get(realm)
            if srclist is not None:
                self._bakeRealm(record, realm, srclist)

        # Bake taxonomies.
        self._bakeTaxonomies(record)

        # Bake the assets.
        if not self.no_assets:
            self._bakeAssets(record)

        # Save the bake record.
        t = time.clock()
        record.collapseRecords()
        record.saveCurrent(record_cache.getCachePath(record_name))
        logger.debug(format_timed(t, 'saved bake record', colored=False))

        # All done.
        self.app.config.set('baker/is_baking', False)
        logger.info('-------------------------')
        logger.info(format_timed(start_time, 'done baking'))

    def _bakeRealm(self, record, realm, srclist):
        """Queue every non-taxonomy page of the given realm's sources and
        bake them with a worker pool.
        """
        # Gather all page factories from the sources and queue them
        # for the workers to pick up. Just skip taxonomy pages for now.
        logger.debug("Baking realm %s" % REALM_NAMES[realm])
        pool, queue, abort = self._createWorkerPool(record, self.num_workers)

        for source in srclist:
            factories = source.getPageFactories()
            for fac in factories:
                if fac.path in self.taxonomy_pages:
                    # Taxonomy pages are baked later, once per dirty term.
                    logger.debug("Skipping taxonomy page: %s:%s" %
                            (source.name, fac.ref_spec))
                    continue

                route = self.app.getRoute(source.name, fac.metadata)
                if route is None:
                    logger.error("Can't get route for page: %s" % fac.ref_spec)
                    continue

                logger.debug("Queuing: %s" % fac.ref_spec)
                queue.put_nowait(BakeWorkerJob(fac, route))

        self._waitOnWorkerPool(pool, abort)

    def _bakeTaxonomies(self, record):
        """Re-bake the taxonomy listing pages whose terms were touched by
        this bake (new, changed, or removed pages), per source.
        """
        logger.debug("Baking taxonomies")

        # Let's see all the taxonomy terms for which we must bake a
        # listing page... first, pre-populate our big map of used terms.
        # Layout: buckets[source_name][taxonomy_name] -> set of dirty terms.
        buckets = {}
        tax_names = [t.name for t in self.app.taxonomies]
        source_names = [s.name for s in self.app.sources]
        for sn in source_names:
            source_taxonomies = {}
            buckets[sn] = source_taxonomies
            for tn in tax_names:
                source_taxonomies[tn] = set()

        # Now see which ones are 'dirty' based on our bake record.
        logger.debug("Gathering dirty taxonomy terms")
        for prev_entry, cur_entry in record.transitions.itervalues():
            for tax in self.app.taxonomies:
                changed_terms = None
                # Re-bake all taxonomy pages that include new or changed
                # pages.
                if not prev_entry and cur_entry and cur_entry.was_baked:
                    # Brand new page: all its terms are dirty.
                    changed_terms = cur_entry.config.get(tax.name)
                elif prev_entry and cur_entry and cur_entry.was_baked:
                    # Re-baked page: both old and new terms are dirty.
                    changed_terms = []
                    prev_terms = prev_entry.config.get(tax.name)
                    cur_terms = cur_entry.config.get(tax.name)
                    if tax.is_multiple:
                        if prev_terms is not None:
                            changed_terms += prev_terms
                        if cur_terms is not None:
                            changed_terms += cur_terms
                    else:
                        if prev_terms is not None:
                            changed_terms.append(prev_terms)
                        if cur_terms is not None:
                            changed_terms.append(cur_terms)
                if changed_terms is not None:
                    if not isinstance(changed_terms, list):
                        changed_terms = [changed_terms]
                    buckets[cur_entry.source_name][tax.name] |= (
                            set(changed_terms))

        # Re-bake the combination pages for terms that are 'dirty'.
        known_combinations = set()
        logger.debug("Gathering dirty term combinations")
        for prev_entry, cur_entry in record.transitions.itervalues():
            if cur_entry:
                known_combinations |= cur_entry.used_taxonomy_terms
            elif prev_entry:
                known_combinations |= prev_entry.used_taxonomy_terms
        for sn, tn, terms in known_combinations:
            # A combination page is dirty if any of its terms is dirty.
            changed_terms = buckets[sn][tn]
            if not changed_terms.isdisjoint(set(terms)):
                changed_terms.add(terms)

        # Start baking those terms.
        pool, queue, abort = self._createWorkerPool(record, self.num_workers)
        for source_name, source_taxonomies in buckets.iteritems():
            for tax_name, terms in source_taxonomies.iteritems():
                if len(terms) == 0:
                    continue

                logger.debug("Baking '%s' for source '%s': %s" %
                        (tax_name, source_name, terms))
                tax = self.app.getTaxonomy(tax_name)
                route = self.app.getTaxonomyRoute(tax_name, source_name)
                tax_page_ref = tax.getPageRef(source_name)
                if not tax_page_ref.exists:
                    logger.debug("No taxonomy page found at '%s', skipping." %
                            tax.page_ref)
                    continue

                tax_page_source = tax_page_ref.source
                tax_page_rel_path = tax_page_ref.rel_path
                logger.debug("Using taxonomy page: %s:%s" %
                        (tax_page_source.name, tax_page_rel_path))

                for term in terms:
                    fac = PageFactory(tax_page_source, tax_page_rel_path,
                            {tax.term_name: term})
                    logger.debug("Queuing: %s [%s, %s]" %
                            (fac.ref_spec, tax_name, term))
                    queue.put_nowait(
                            BakeWorkerJob(fac, route, tax_name, term))

        self._waitOnWorkerPool(pool, abort)

    def _bakeAssets(self, record):
        """Run the asset processing pipeline over the site's assets."""
        baker_params = self.app.config.get('baker') or {}
        skip_patterns = baker_params.get('skip_patterns')
        force_patterns = baker_params.get('force_patterns')
        proc = ProcessorPipeline(
                self.app, self.out_dir, force=self.force,
                skip_patterns=skip_patterns, force_patterns=force_patterns,
                num_workers=self.num_workers)
        proc.run()

    def _createWorkerPool(self, record, pool_size=4):
        """Start `pool_size` bake workers sharing one job queue and one
        abort event. Returns `(pool, queue, abort)`.
        """
        pool = []
        queue = Queue()
        abort = threading.Event()
        for i in range(pool_size):
            ctx = BakeWorkerContext(self.app, self.out_dir, self.force,
                    record, queue, abort)
            worker = BakeWorker(i, ctx)
            worker.start()
            pool.append(worker)
        return pool, queue, abort

    def _waitOnWorkerPool(self, pool, abort):
        """Join all workers; raise if any worker signalled an abort."""
        for w in pool:
            w.join()
        if abort.is_set():
            raise Exception("Worker pool was aborted.")
415
416
class BakeWorkerContext(object):
    """Shared state handed to every bake worker thread.

    Bundles the app, output settings, the transitional bake record, the
    job queue the workers drain, and the event used to signal an abort.
    """
    def __init__(self, app, out_dir, force, record, work_queue,
            abort_event):
        # Coordination primitives shared across the worker pool.
        self.abort_event = abort_event
        self.work_queue = work_queue
        # Bake record and settings shared by all workers.
        self.record = record
        self.force = force
        self.out_dir = out_dir
        self.app = app
426
427
class BakeWorkerJob(object):
    """One unit of bake work: a page factory to bake through a route,
    optionally as a taxonomy listing for a specific term.
    """
    def __init__(self, factory, route, taxonomy_name=None, taxonomy_term=None):
        # Taxonomy info is only set for taxonomy listing bakes.
        self.taxonomy_term = taxonomy_term
        self.taxonomy_name = taxonomy_name
        self.route = route
        self.factory = factory

    @property
    def source(self):
        # Convenience accessor for the page source behind the factory.
        return self.factory.source
438
439
class BakeWorker(threading.Thread):
    """Worker thread that drains bake jobs from the shared queue until the
    queue runs dry or an abort is signalled.
    """
    def __init__(self, wid, ctx):
        super(BakeWorker, self).__init__()
        # Worker id, used only for log prefixes.
        self.wid = wid
        self.ctx = ctx
        self.num_bakes = 0
        # Each worker owns its own page baker.
        self._page_baker = PageBaker(ctx.app, ctx.out_dir, ctx.force,
                ctx.record)

    def run(self):
        """Main worker loop: pull jobs until empty or aborted."""
        ctx = self.ctx
        while True:
            # Another worker may have hit a critical error; bail out.
            if ctx.abort_event.is_set():
                break

            try:
                job = ctx.work_queue.get(True, 0.1)
            except Empty:
                # Queue stayed empty past the timeout: no work left.
                logger.debug("[%d] No more work... shutting down." % self.wid)
                break

            try:
                self._unsafeRun(job)
                logger.debug("[%d] Done with page." % self.wid)
                ctx.work_queue.task_done()
            except Exception as ex:
                # Tell the other workers to stop, then die ourselves.
                ctx.abort_event.set()
                logger.error("[%d] Critical error, aborting." % self.wid)
                logger.exception(ex)
                break

    def _unsafeRun(self, job):
        """Bake one job; may raise (hence 'unsafe'), caller handles it."""
        start_time = time.clock()

        bake_res = self._page_baker.bake(job.factory, job.route,
                taxonomy_name=job.taxonomy_name,
                taxonomy_term=job.taxonomy_term)

        if not bake_res.was_baked:
            return

        # Log a friendly one-liner about what was baked and how long it took.
        uri = bake_res.out_uris[0]
        friendly_uri = '[main page]' if uri == '' else uri
        friendly_count = ''
        if bake_res.num_subs > 1:
            friendly_count = ' (%d pages)' % bake_res.num_subs
        logger.info(format_timed(start_time, '[%d] %s%s' %
                (self.wid, friendly_uri, friendly_count)))
        self.num_bakes += 1
483