piecrust2: diff piecrust/pipelines/_pagebaker.py @ 852:4850f8c21b6e
core: Start of the big refactor for PieCrust 3.0.
* Everything is a `ContentSource`, including assets directories.
* Most content sources are subclasses of the base file-system source.
* A source is processed by a "pipeline", and there are 2 built-in pipelines,
one for assets and one for pages. The asset pipeline is vaguely functional,
but the page pipeline is completely broken right now.
* Rewrite the baking process as just running appropriate pipelines on each
content item. This should allow for better parallelization.
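
The model described above reduces to a small contract: a source enumerates content items, and a pipeline processes one item at a time. Below is a minimal sketch of that idea, not PieCrust's actual API; all names in it (ContentSource, Pipeline, ListSource, PrintPipeline, run_pipelines) are hypothetical.

class ContentSource:
    """Anything that can enumerate content items (pages, assets, ...)."""
    def getAllContents(self):
        raise NotImplementedError


class Pipeline:
    """Processes one content item at a time."""
    def run(self, item):
        raise NotImplementedError


class ListSource(ContentSource):
    def __init__(self, items):
        self.items = items

    def getAllContents(self):
        return iter(self.items)


class PrintPipeline(Pipeline):
    def run(self, item):
        print("processing", item)


def run_pipelines(sources_and_pipelines):
    # Baking is just: feed every item from each source to that source's
    # pipeline. Each call is independent, so this loop could hand items
    # to a worker pool instead of running them sequentially.
    for source, pipeline in sources_and_pipelines:
        for item in source.getAllContents():
            pipeline.run(item)


run_pipelines([
    (ListSource(['pages/foo.md', 'pages/bar.md']), PrintPipeline()),
    (ListSource(['assets/style.css']), PrintPipeline()),
])

Because each run() call depends only on its item, the sequential loop can be swapped for a worker pool, which is the better parallelization the last point refers to.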
author      Ludovic Chabant <ludovic@chabant.com>
date        Wed, 17 May 2017 00:11:48 -0700
parents
children    08e02c2a2a1a
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/piecrust/pipelines/_pagebaker.py	Wed May 17 00:11:48 2017 -0700
@@ -0,0 +1,272 @@
+import os.path
+import queue
+import logging
+import threading
+import urllib.parse
+from piecrust.pipelines._pagerecords import SubPagePipelineRecordEntry
+from piecrust.rendering import RenderingContext, render_page, PASS_FORMATTING
+from piecrust.uriutil import split_uri
+
+
+logger = logging.getLogger(__name__)
+
+
+class BakingError(Exception):
+    pass
+
+
+class PageBaker(object):
+    def __init__(self, app, out_dir, force=False, copy_assets=True):
+        self.app = app
+        self.out_dir = out_dir
+        self.force = force
+        self.copy_assets = copy_assets
+        self.site_root = app.config.get('site/root')
+        self.pretty_urls = app.config.get('site/pretty_urls')
+        self._writer_queue = None
+        self._writer = None
+
+    def startWriterQueue(self):
+        self._writer_queue = queue.Queue()
+        self._writer = threading.Thread(
+            name='PageSerializer',
+            target=_text_writer,
+            args=(self._writer_queue,))
+        self._writer.start()
+
+    def stopWriterQueue(self):
+        # A `None` item is the sentinel that tells the writer thread to exit.
+        self._writer_queue.put_nowait(None)
+        self._writer.join()
+
+    def getOutputPath(self, uri, pretty_urls):
+        uri_root, uri_path = split_uri(self.app, uri)
+
+        bake_path = [self.out_dir]
+        decoded_uri = urllib.parse.unquote(uri_path)
+        if pretty_urls:
+            bake_path.append(decoded_uri)
+            bake_path.append('index.html')
+        elif decoded_uri == '':
+            bake_path.append('index.html')
+        else:
+            bake_path.append(decoded_uri)
+
+        return os.path.normpath(os.path.join(*bake_path))
+
+    def bake(self, qualified_page, prev_entry, dirty_source_names):
+        # Start baking the sub-pages.
+        cur_sub = 1
+        has_more_subs = True
+        sub_entries = []
+        pretty_urls = qualified_page.config.get(
+            'pretty_urls', self.pretty_urls)
+
+        while has_more_subs:
+            sub_page = qualified_page.getSubPage(cur_sub)
+            sub_uri = sub_page.uri
+            logger.debug("Baking '%s' [%d]..." % (sub_uri, cur_sub))
+
+            out_path = self.getOutputPath(sub_uri, pretty_urls)
+
+            # Create the sub-entry for the bake record.
+            sub_entry = SubPagePipelineRecordEntry(sub_uri, out_path)
+            sub_entries.append(sub_entry)
+
+            # Find a corresponding sub-entry in the previous bake record.
+            prev_sub_entry = None
+            if prev_entry is not None:
+                try:
+                    prev_sub_entry = prev_entry.getSub(cur_sub)
+                except IndexError:
+                    pass
+
+            # Figure out if we need to invalidate or force anything.
+            force_this_sub, invalidate_formatting = _compute_force_flags(
+                prev_sub_entry, sub_entry, dirty_source_names)
+            force_this_sub = force_this_sub or self.force
+
+            # Check for up-to-date outputs.
+            do_bake = True
+            if not force_this_sub:
+                try:
+                    in_path_time = qualified_page.path_mtime
+                    out_path_time = os.path.getmtime(out_path)
+                    if out_path_time >= in_path_time:
+                        do_bake = False
+                except OSError:
+                    # File doesn't exist, we'll need to bake.
+                    pass
+
+            # If this page didn't bake because it's already up-to-date,
+            # keep trying for as many subs as we know this page has.
+            if not do_bake:
+                sub_entry.render_info = prev_sub_entry.copyRenderInfo()
+                sub_entry.flags = SubPagePipelineRecordEntry.FLAG_NONE
+
+                if prev_entry.num_subs >= cur_sub + 1:
+                    cur_sub += 1
+                    has_more_subs = True
+                    logger.debug("  %s is up to date, skipping to next "
+                                 "sub-page." % out_path)
+                    continue
+
+                logger.debug("  %s is up to date, skipping bake." % out_path)
+                break
+
+            # All good, proceed.
+            try:
+                if invalidate_formatting:
+                    cache_key = sub_uri
+                    self.app.env.rendered_segments_repository.invalidate(
+                        cache_key)
+                    sub_entry.flags |= \
+                        SubPagePipelineRecordEntry.FLAG_FORMATTING_INVALIDATED
+
+                logger.debug("  p%d -> %s" % (cur_sub, out_path))
+                rp = self._bakeSingle(qualified_page, out_path)
+            except Exception as ex:
+                logger.exception(ex)
+                page_rel_path = os.path.relpath(qualified_page.path,
+                                                self.app.root_dir)
+                raise BakingError("%s: error baking '%s'." %
+                                  (page_rel_path, sub_uri)) from ex
+
+            # Record what we did.
+            sub_entry.flags |= SubPagePipelineRecordEntry.FLAG_BAKED
+            sub_entry.render_info = rp.copyRenderInfo()
+
+            # Copy page assets.
+            if (cur_sub == 1 and self.copy_assets and
+                    sub_entry.anyPass(lambda p: p.used_assets)):
+                if pretty_urls:
+                    out_assets_dir = os.path.dirname(out_path)
+                else:
+                    out_assets_dir, out_name = os.path.split(out_path)
+                    if sub_uri != self.site_root:
+                        out_name_noext, _ = os.path.splitext(out_name)
+                        out_assets_dir = os.path.join(out_assets_dir,
+                                                      out_name_noext)
+
+                logger.debug("Copying page assets to: %s" % out_assets_dir)
+                _ensure_dir_exists(out_assets_dir)
+
+                qualified_page.source.buildAssetor(
+                    qualified_page, sub_uri).copyAssets(out_assets_dir)
+
+            # Figure out if we have more work.
+            has_more_subs = False
+            if sub_entry.anyPass(lambda p: p.pagination_has_more):
+                cur_sub += 1
+                has_more_subs = True
+
+        return sub_entries
+
+    def _bakeSingle(self, qp, out_path):
+        ctx = RenderingContext(qp)
+        qp.source.prepareRenderContext(ctx)
+
+        with self.app.env.timerScope("PageRender"):
+            rp = render_page(ctx)
+
+        with self.app.env.timerScope("PageSerialize"):
+            if self._writer_queue is not None:
+                # Hand the serialized page off to the writer thread.
+                self._writer_queue.put_nowait((out_path, rp.content))
+            else:
+                with open(out_path, 'w', encoding='utf8') as fp:
+                    fp.write(rp.content)
+
+        return rp
+
+
+def _text_writer(q):
+    while True:
+        item = q.get()
+        if item is not None:
+            out_path, txt = item
+            out_dir = os.path.dirname(out_path)
+            _ensure_dir_exists(out_dir)
+
+            with open(out_path, 'w', encoding='utf8') as fp:
+                fp.write(txt)
+
+            q.task_done()
+        else:
+            # Sentinel object, terminate the thread.
+            q.task_done()
+            break
+
+
+def _compute_force_flags(prev_sub_entry, sub_entry, dirty_source_names):
+    # Figure out what to do with this page.
+    force_this_sub = False
+    invalidate_formatting = False
+    sub_uri = sub_entry.out_uri
+    if (prev_sub_entry and
+            (prev_sub_entry.was_baked_successfully or
+                prev_sub_entry.was_clean)):
+        # If the current page is known to use pages from other sources,
+        # see if any of those got baked, or are going to be baked for
+        # some reason. If so, we need to bake this one too.
+        # (this happens for instance with the main page of a blog)
+        dirty_for_this, invalidated_render_passes = (
+            _get_dirty_source_names_and_render_passes(
+                prev_sub_entry, dirty_source_names))
+        if len(invalidated_render_passes) > 0:
+            logger.debug(
+                "'%s' is known to use sources %s, which have "
+                "items that got (re)baked. Will force bake this "
+                "page." % (sub_uri, dirty_for_this))
+            sub_entry.flags |= \
+                SubPagePipelineRecordEntry.FLAG_FORCED_BY_SOURCE
+            force_this_sub = True
+
+            if PASS_FORMATTING in invalidated_render_passes:
+                logger.debug(
+                    "Will invalidate cached formatting for '%s' "
+                    "since sources were used during that pass."
+                    % sub_uri)
+                invalidate_formatting = True
+    elif (prev_sub_entry and
+            prev_sub_entry.errors):
+        # Previous bake failed. We'll have to bake it again.
+        logger.debug(
+            "Previous record entry indicates baking failed for "
+            "'%s'. Will bake it again." % sub_uri)
+        sub_entry.flags |= \
+            SubPagePipelineRecordEntry.FLAG_FORCED_BY_PREVIOUS_ERRORS
+        force_this_sub = True
+    elif not prev_sub_entry:
+        # No previous record. We'll have to bake it.
+        logger.debug("No previous record entry found for '%s'. Will "
+                     "force bake it." % sub_uri)
+        sub_entry.flags |= \
+            SubPagePipelineRecordEntry.FLAG_FORCED_BY_NO_PREVIOUS
+        force_this_sub = True
+
+    return force_this_sub, invalidate_formatting
+
+
+def _get_dirty_source_names_and_render_passes(sub_entry, dirty_source_names):
+    dirty_for_this = set()
+    invalidated_render_passes = set()
+    # `render_info` has one slot per render pass; remember which passes
+    # used a dirty source.
+    for p, pinfo in enumerate(sub_entry.render_info):
+        if pinfo:
+            for src_name in pinfo.used_source_names:
+                is_dirty = (src_name in dirty_source_names)
+                if is_dirty:
+                    invalidated_render_passes.add(p)
+                    dirty_for_this.add(src_name)
+                    break
+    return dirty_for_this, invalidated_render_passes
+
+
+def _ensure_dir_exists(path):
+    try:
+        os.makedirs(path, mode=0o755, exist_ok=True)
+    except OSError:
+        # In a multiprocess environment, several processes may very
+        # occasionally try to create the same directory at the same time.
+        # Let's ignore any error; if something's really wrong (like file
+        # access permissions or whatever), it will more legitimately fail
+        # just after this when we try to write files.
+        pass
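
As a usage note: startWriterQueue, stopWriterQueue, and _text_writer above implement a classic single-writer pattern, where render workers hand (path, text) tuples to one thread that owns all disk I/O, and a None sentinel shuts it down. Here is a standalone sketch of the same pattern, assuming nothing beyond the Python standard library; the file name and content are made up for the example.

import os
import queue
import threading


def writer(q):
    # Mirror _text_writer above: drain the queue until the None sentinel
    # arrives, so a single thread owns all disk writes and rendering code
    # never blocks on file I/O.
    while True:
        item = q.get()
        if item is None:
            q.task_done()
            break
        out_path, txt = item
        os.makedirs(os.path.dirname(out_path) or '.', exist_ok=True)
        with open(out_path, 'w', encoding='utf8') as fp:
            fp.write(txt)
        q.task_done()


q = queue.Queue()
t = threading.Thread(name='PageSerializer', target=writer, args=(q,))
t.start()
q.put_nowait(('out/index.html', '<html></html>'))
q.put_nowait(None)  # sentinel: ask the writer to exit
t.join()

Decoupling serialization from rendering this way means the bake loop can keep rendering the next sub-page while the previous one is still being written out.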