diff piecrust/baking/baker.py @ 3:f485ba500df3
Gigantic change to basically make PieCrust 2 vaguely functional.
- Serving works, with debug window.
- Baking works, multi-threading, with dependency handling.
- Various things not implemented yet.
| author | Ludovic Chabant <ludovic@chabant.com> |
| --- | --- |
| date | Sun, 10 Aug 2014 23:43:16 -0700 |
| parents | |
| children | 474c9882decf |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/piecrust/baking/baker.py	Sun Aug 10 23:43:16 2014 -0700
@@ -0,0 +1,483 @@
+import time
+import os.path
+import codecs
+import urllib2
+import hashlib
+import logging
+import threading
+from Queue import Queue, Empty
+from piecrust.baking.records import TransitionalBakeRecord, BakeRecordPageEntry
+from piecrust.chefutil import format_timed
+from piecrust.data.filters import (PaginationFilter, HasFilterClause,
+        IsFilterClause, AndBooleanClause)
+from piecrust.processing.base import ProcessorPipeline
+from piecrust.rendering import PageRenderingContext, render_page
+from piecrust.sources.base import (PageFactory,
+        REALM_NAMES, REALM_USER, REALM_THEME)
+
+
+logger = logging.getLogger(__name__)
+
+
+class PageBaker(object):
+    def __init__(self, app, out_dir, force=False, record=None,
+            copy_assets=False):
+        self.app = app
+        self.out_dir = out_dir
+        self.force = force
+        self.record = record
+        self.copy_assets = copy_assets
+        self.pretty_urls = app.config.get('site/pretty_urls')
+        self.pagination_suffix = app.config.get('site/pagination_suffix')
+
+    def getOutputUri(self, uri, num):
+        suffix = self.pagination_suffix.replace('%num%', str(num))
+        if self.pretty_urls:
+            # Output will be:
+            # - `uri/name`
+            # - `uri/name/2`
+            # - `uri/name.ext`
+            # - `uri/name.ext/2`
+            if num <= 1:
+                return uri
+            return uri + suffix
+        else:
+            # Output will be:
+            # - `uri/name.html`
+            # - `uri/name/2.html`
+            # - `uri/name.ext`
+            # - `uri/name/2.ext`
+            if uri == '/':
+                if num <= 1:
+                    return '/'
+                return '/' + suffix.lstrip('/')
+            else:
+                if num <= 1:
+                    return uri
+                # TODO: watch out for tags with dots in them.
+                base_uri, ext = os.path.splitext(uri)
+                return base_uri + suffix + ext
+
+    def getOutputPath(self, uri):
+        bake_path = [self.out_dir]
+        decoded_uri = urllib2.unquote(uri.lstrip('/')).decode('utf8')
+        if self.pretty_urls:
+            bake_path.append(decoded_uri)
+            bake_path.append('index.html')
+        else:
+            name, ext = os.path.splitext(decoded_uri)
+            if ext:
+                bake_path.append(decoded_uri)
+            else:
+                bake_path.append(decoded_uri + '.html')
+
+        return os.path.join(*bake_path)
+
+    def bake(self, factory, route, taxonomy_name=None, taxonomy_term=None):
+        page = factory.buildPage()
+
+        pagination_filter = None
+        custom_data = None
+        if taxonomy_name and taxonomy_term:
+            # Must bake a taxonomy listing page... we'll have to add a
+            # pagination filter to only get matching posts, and the output
+            # URL will be a bit different.
+            tax = self.app.getTaxonomy(taxonomy_name)
+            pagination_filter = PaginationFilter()
+            if tax.is_multiple:
+                if isinstance(taxonomy_term, tuple):
+                    abc = AndBooleanClause()
+                    for t in taxonomy_term:
+                        abc.addClause(HasFilterClause(taxonomy_name, t))
+                    pagination_filter.addClause(abc)
+                    slugified_term = '/'.join(taxonomy_term)
+                else:
+                    pagination_filter.addClause(HasFilterClause(taxonomy_name,
+                            taxonomy_term))
+                    slugified_term = taxonomy_term
+            else:
+                pagination_filter.addClause(IsFilterClause(taxonomy_name,
+                        taxonomy_term))
+                slugified_term = taxonomy_term
+            custom_data = {tax.term_name: taxonomy_term}
+            uri = route.getUri({tax.term_name: slugified_term})
+        else:
+            # Normal page bake.
+            uri = route.getUri(factory.metadata)
+
+        cur_sub = 1
+        has_more_subs = True
+        cur_record_entry = BakeRecordPageEntry(page)
+        cur_record_entry.taxonomy_name = taxonomy_name
+        cur_record_entry.taxonomy_term = taxonomy_term
+        prev_record_entry = self.record.getPreviousEntry(page, taxonomy_name,
+                taxonomy_term)
+
+        logger.debug("Baking '%s'..." % uri)
+        while has_more_subs:
+            sub_uri = self.getOutputUri(uri, cur_sub)
+            out_path = self.getOutputPath(sub_uri)
+
+            # Check for up-to-date outputs.
+            do_bake = True
+            if not self.force and prev_record_entry:
+                try:
+                    in_path_time = os.path.getmtime(page.path)
+                    out_path_time = os.path.getmtime(out_path)
+                    if out_path_time > in_path_time:
+                        do_bake = False
+                except OSError:
+                    # File doesn't exist, we'll need to bake.
+                    pass
+
+            # If this page didn't bake because it's already up-to-date,
+            # keep trying for as many subs as we know this page has.
+            if not do_bake:
+                if (prev_record_entry is not None and
+                        prev_record_entry.num_subs > cur_sub):
+                    logger.debug("  %s is up to date, skipping to next "
+                            "sub-page." % out_path)
+                    cur_sub += 1
+                    has_more_subs = True
+                    continue
+
+                # We don't know how many subs to expect... just skip.
+                logger.debug("  %s is up to date, skipping bake." % out_path)
+                break
+
+            # All good, proceed.
+            try:
+                logger.debug("  p%d -> %s" % (cur_sub, out_path))
+                ctx, rp = self._bakeSingle(page, sub_uri, cur_sub, out_path,
+                        pagination_filter, custom_data)
+            except Exception as ex:
+                logger.exception("Error baking page '%s' for URI '%s': %s" %
+                        (page.ref_spec, uri, ex))
+                raise
+
+            cur_record_entry.out_uris.append(sub_uri)
+            cur_record_entry.out_paths.append(out_path)
+            cur_record_entry.used_source_names |= ctx.used_source_names
+            cur_record_entry.used_taxonomy_terms |= ctx.used_taxonomy_terms
+
+            has_more_subs = False
+            if ctx.used_pagination is not None:
+                cur_record_entry.used_source_names.add(
+                        ctx.used_pagination._source.name)
+                if ctx.used_pagination.has_more:
+                    cur_sub += 1
+                    has_more_subs = True
+
+        if self.record:
+            self.record.addEntry(cur_record_entry)
+
+        return cur_record_entry
+
+    def _bakeSingle(self, page, sub_uri, num, out_path,
+            pagination_filter=None, custom_data=None):
+        ctx = PageRenderingContext(page, sub_uri)
+        ctx.page_num = num
+        if pagination_filter:
+            ctx.pagination_filter = pagination_filter
+        if custom_data:
+            ctx.custom_data = custom_data
+
+        rp = render_page(ctx)
+
+        out_dir = os.path.dirname(out_path)
+        if not os.path.isdir(out_dir):
+            os.makedirs(out_dir, 0755)
+
+        with codecs.open(out_path, 'w', 'utf-8') as fp:
+            fp.write(rp.content.decode('utf-8'))
+
+        return ctx, rp
+
+
+class Baker(object):
+    def __init__(self, app, out_dir=None, force=False, portable=False,
+            no_assets=False):
+        self.app = app
+        self.out_dir = out_dir or os.path.join(app.root_dir, '_counter')
+        self.force = force
+        self.portable = portable
+        self.no_assets = no_assets
+        self.num_workers = app.config.get('baker/workers') or 4
+
+        # Remember which taxonomy pages we should skip
+        # (we'll bake them repeatedly later, once for each taxonomy term).
+        self.taxonomy_pages = []
+        logger.debug("Gathering taxonomy page paths:")
+        for tax in self.app.taxonomies:
+            for src in self.app.sources:
+                path = tax.resolvePagePath(src.name)
+                if path is not None:
+                    self.taxonomy_pages.append(path)
+                    logger.debug(" - %s" % path)
+
+    def bake(self):
+        logger.debug("  Bake Output: %s" % self.out_dir)
+        logger.debug("  Root URL: %s" % self.app.config.get('site/root'))
+
+        # Get into bake mode.
+        start_time = time.clock()
+        self.app.config.set('baker/is_baking', True)
+        self.app.env.base_asset_url_format = '%site_root%%uri%'
+
+        # Make sure the output directory exists.
+        if not os.path.isdir(self.out_dir):
+            os.makedirs(self.out_dir, 0755)
+
+        # Load/create the bake record.
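+        # (The "transitional" record pairs every entry from the previous
+        # bake with the matching entry from the current bake, so each page
+        # can be compared against what happened to it last time.)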
+        record = TransitionalBakeRecord()
+        record_cache = self.app.cache.getCache('bake_r')
+        record_name = hashlib.md5(self.out_dir).hexdigest() + '.record'
+        if not self.force and record_cache.has(record_name):
+            t = time.clock()
+            record.loadPrevious(record_cache.getCachePath(record_name))
+            logger.debug(format_timed(t, 'loaded previous bake record',
+                    colored=False))
+
+        # Gather all sources by realm -- we're going to bake each realm
+        # separately so we can handle "overlaying" (i.e. one realm overrides
+        # another realm's pages).
+        sources_by_realm = {}
+        for source in self.app.sources:
+            srclist = sources_by_realm.setdefault(source.realm, [])
+            srclist.append(source)
+
+        # Bake the realms.
+        realm_list = [REALM_USER, REALM_THEME]
+        for realm in realm_list:
+            srclist = sources_by_realm.get(realm)
+            if srclist is not None:
+                self._bakeRealm(record, realm, srclist)
+
+        # Bake taxonomies.
+        self._bakeTaxonomies(record)
+
+        # Bake the assets.
+        if not self.no_assets:
+            self._bakeAssets(record)
+
+        # Save the bake record.
+        t = time.clock()
+        record.collapseRecords()
+        record.saveCurrent(record_cache.getCachePath(record_name))
+        logger.debug(format_timed(t, 'saved bake record', colored=False))
+
+        # All done.
+        self.app.config.set('baker/is_baking', False)
+        logger.info('-------------------------')
+        logger.info(format_timed(start_time, 'done baking'))
+
+    def _bakeRealm(self, record, realm, srclist):
+        # Gather all page factories from the sources and queue them
+        # for the workers to pick up. Just skip taxonomy pages for now.
+        logger.debug("Baking realm %s" % REALM_NAMES[realm])
+        pool, queue, abort = self._createWorkerPool(record, self.num_workers)
+
+        for source in srclist:
+            factories = source.getPageFactories()
+            for fac in factories:
+                if fac.path in self.taxonomy_pages:
+                    logger.debug("Skipping taxonomy page: %s:%s" %
+                            (source.name, fac.ref_spec))
+                    continue
+
+                route = self.app.getRoute(source.name, fac.metadata)
+                if route is None:
+                    logger.error("Can't get route for page: %s" % fac.ref_spec)
+                    continue
+
+                logger.debug("Queuing: %s" % fac.ref_spec)
+                queue.put_nowait(BakeWorkerJob(fac, route))
+
+        self._waitOnWorkerPool(pool, abort)
+
+    def _bakeTaxonomies(self, record):
+        logger.debug("Baking taxonomies")
+
+        # Let's see all the taxonomy terms for which we must bake a
+        # listing page... first, pre-populate our big map of used terms.
+        buckets = {}
+        tax_names = [t.name for t in self.app.taxonomies]
+        source_names = [s.name for s in self.app.sources]
+        for sn in source_names:
+            source_taxonomies = {}
+            buckets[sn] = source_taxonomies
+            for tn in tax_names:
+                source_taxonomies[tn] = set()
+
+        # Now see which ones are 'dirty' based on our bake record.
+        logger.debug("Gathering dirty taxonomy terms")
+        for prev_entry, cur_entry in record.transitions.itervalues():
+            for tax in self.app.taxonomies:
+                changed_terms = None
+                # Re-bake all taxonomy pages that include new or changed
+                # pages.
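+                # (A page that's new in this bake contributes all of its
+                # terms; a page that was already there contributes the union
+                # of its old and new terms, so removed or renamed terms get
+                # their listing pages re-baked too.)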
+                if not prev_entry and cur_entry and cur_entry.was_baked:
+                    changed_terms = cur_entry.config.get(tax.name)
+                elif prev_entry and cur_entry and cur_entry.was_baked:
+                    changed_terms = []
+                    prev_terms = prev_entry.config.get(tax.name)
+                    cur_terms = cur_entry.config.get(tax.name)
+                    if tax.is_multiple:
+                        if prev_terms is not None:
+                            changed_terms += prev_terms
+                        if cur_terms is not None:
+                            changed_terms += cur_terms
+                    else:
+                        if prev_terms is not None:
+                            changed_terms.append(prev_terms)
+                        if cur_terms is not None:
+                            changed_terms.append(cur_terms)
+                if changed_terms is not None:
+                    if not isinstance(changed_terms, list):
+                        changed_terms = [changed_terms]
+                    buckets[cur_entry.source_name][tax.name] |= (
+                            set(changed_terms))
+
+        # Re-bake the combination pages for terms that are 'dirty'.
+        known_combinations = set()
+        logger.debug("Gathering dirty term combinations")
+        for prev_entry, cur_entry in record.transitions.itervalues():
+            if cur_entry:
+                known_combinations |= cur_entry.used_taxonomy_terms
+            elif prev_entry:
+                known_combinations |= prev_entry.used_taxonomy_terms
+        for sn, tn, terms in known_combinations:
+            changed_terms = buckets[sn][tn]
+            if not changed_terms.isdisjoint(set(terms)):
+                changed_terms.add(terms)
+
+        # Start baking those terms.
+        pool, queue, abort = self._createWorkerPool(record, self.num_workers)
+        for source_name, source_taxonomies in buckets.iteritems():
+            for tax_name, terms in source_taxonomies.iteritems():
+                if len(terms) == 0:
+                    continue
+
+                logger.debug("Baking '%s' for source '%s': %s" %
+                        (tax_name, source_name, terms))
+                tax = self.app.getTaxonomy(tax_name)
+                route = self.app.getTaxonomyRoute(tax_name, source_name)
+                tax_page_ref = tax.getPageRef(source_name)
+                if not tax_page_ref.exists:
+                    logger.debug("No taxonomy page found at '%s', skipping." %
+                            tax.page_ref)
+                    continue
+
+                tax_page_source = tax_page_ref.source
+                tax_page_rel_path = tax_page_ref.rel_path
+                logger.debug("Using taxonomy page: %s:%s" %
+                        (tax_page_source.name, tax_page_rel_path))
+
+                for term in terms:
+                    fac = PageFactory(tax_page_source, tax_page_rel_path,
+                            {tax.term_name: term})
+                    logger.debug("Queuing: %s [%s, %s]" %
+                            (fac.ref_spec, tax_name, term))
+                    queue.put_nowait(
+                            BakeWorkerJob(fac, route, tax_name, term))
+
+        self._waitOnWorkerPool(pool, abort)
+
+    def _bakeAssets(self, record):
+        baker_params = self.app.config.get('baker') or {}
+        skip_patterns = baker_params.get('skip_patterns')
+        force_patterns = baker_params.get('force_patterns')
+        proc = ProcessorPipeline(
+                self.app, self.out_dir, force=self.force,
+                skip_patterns=skip_patterns, force_patterns=force_patterns,
+                num_workers=self.num_workers)
+        proc.run()
+
+    def _createWorkerPool(self, record, pool_size=4):
+        pool = []
+        queue = Queue()
+        abort = threading.Event()
+        for i in range(pool_size):
+            ctx = BakeWorkerContext(self.app, self.out_dir, self.force,
+                    record, queue, abort)
+            worker = BakeWorker(i, ctx)
+            worker.start()
+            pool.append(worker)
+        return pool, queue, abort
+
+    def _waitOnWorkerPool(self, pool, abort):
+        for w in pool:
+            w.join()
+        if abort.is_set():
+            raise Exception("Worker pool was aborted.")
+
+
+class BakeWorkerContext(object):
+    def __init__(self, app, out_dir, force, record, work_queue,
+            abort_event):
+        self.app = app
+        self.out_dir = out_dir
+        self.force = force
+        self.record = record
+        self.work_queue = work_queue
+        self.abort_event = abort_event
+
+
+class BakeWorkerJob(object):
+    def __init__(self, factory, route, taxonomy_name=None, taxonomy_term=None):
+        self.factory = factory
+        self.route = route
+        self.taxonomy_name = taxonomy_name
+        self.taxonomy_term = taxonomy_term
+
+    @property
+    def source(self):
+        return self.factory.source
+
+
+class BakeWorker(threading.Thread):
+    def __init__(self, wid, ctx):
+        super(BakeWorker, self).__init__()
+        self.wid = wid
+        self.ctx = ctx
+        self.num_bakes = 0
+        self._page_baker = PageBaker(ctx.app, ctx.out_dir, ctx.force,
+                ctx.record)
+
+    def run(self):
+        while not self.ctx.abort_event.is_set():
+            try:
+                job = self.ctx.work_queue.get(True, 0.1)
+            except Empty:
+                logger.debug("[%d] No more work... shutting down." % self.wid)
+                break
+
+            try:
+                self._unsafeRun(job)
+                logger.debug("[%d] Done with page." % self.wid)
+                self.ctx.work_queue.task_done()
+            except Exception as ex:
+                self.ctx.abort_event.set()
+                logger.error("[%d] Critical error, aborting." % self.wid)
+                logger.exception(ex)
+                break
+
+    def _unsafeRun(self, job):
+        start_time = time.clock()
+
+        bake_res = self._page_baker.bake(job.factory, job.route,
+                taxonomy_name=job.taxonomy_name,
+                taxonomy_term=job.taxonomy_term)
+
+        if bake_res.was_baked:
+            uri = bake_res.out_uris[0]
+            friendly_uri = uri if uri != '' else '[main page]'
+            friendly_count = ''
+            if bake_res.num_subs > 1:
+                friendly_count = ' (%d pages)' % bake_res.num_subs
+            logger.info(format_timed(start_time, '[%d] %s%s' %
+                    (self.wid, friendly_uri, friendly_count)))
+            self.num_bakes += 1
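For reference, here is a minimal, hypothetical sketch of how the Baker class added above could be driven from a script. The `piecrust.app.PieCrust` constructor shown here is an assumption based on how `app.root_dir`, `app.config`, and the app's sources and taxonomies are used in this diff; it is not defined by this changeset.

    # Hypothetical driver for the Baker above. `PieCrust(root_dir)` is an
    # assumed application factory; only Baker and its arguments appear in
    # the diff itself.
    import logging
    from piecrust.app import PieCrust
    from piecrust.baking.baker import Baker

    logging.basicConfig(level=logging.INFO)

    app = PieCrust('/path/to/website')    # assumed application object
    baker = Baker(app,
                  out_dir='_counter',     # matches the default used above
                  force=False)            # force=True skips up-to-date checks
    baker.bake()                          # bakes realms, taxonomies, assets

The worker-pool design is visible in `_createWorkerPool`: one shared `Queue` of `BakeWorkerJob` objects, a pool of `BakeWorker` threads that drain it, and a shared abort `Event` so one worker's failure stops the whole bake.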