diff piecrust/pipelines/page.py @ 854:08e02c2a2a1a
core: Keep refactoring, this time to prepare for generator sources.
- Make a few APIs simpler.
- Content pipelines create their own jobs, so that generator sources can
  keep aborting in `getContents`, but rely on their pipeline to generate
  pages for baking.
author      Ludovic Chabant <ludovic@chabant.com>
date        Sun, 04 Jun 2017 23:34:28 -0700
parents     4850f8c21b6e
children    504ddb370df8
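The new `PagePipeline.run()` in the diff below dispatches on a `pass` value stored in the job data: pass 0 pre-renders a page's segments (aborting if the page pulls data from another source), then queues a pass-1 job that performs the full render and bake. As a rough illustration of that two-pass flow, here is a minimal, hypothetical driver loop; `JobResult` and `run_two_pass_bake` are illustrative stand-ins rather than PieCrust APIs, while `createJob()` and the `run(job, ctx, result)` signature come from the change itself.

    # Hypothetical sketch: a simplified driver for the two-pass jobs that
    # PagePipeline.run() handles. JobResult stands in for whatever result
    # object the real bake worker passes around.

    class JobResult:
        def __init__(self):
            # the real worker supplies a proper record entry here
            self.record_entry = None
            self.next_pass_job = None  # set by the pipeline when a pass-1 job is needed


    def run_two_pass_bake(pipeline, content_items, ctx):
        # Pass 0: render page segments so they get cached. Pages that use
        # another source abort here and rely on the second pass instead.
        second_pass = []
        for item in content_items:
            job = pipeline.createJob(item)  # the pipeline creates its own jobs
            result = JobResult()
            pipeline.run(job, ctx, result)
            if result.next_pass_job is not None:
                second_pass.append(result.next_pass_job)

        # Pass 1: segments are now cached (or get rendered here), so the
        # full render and bake can proceed without aborting.
        for job in second_pass:
            pipeline.run(job, ctx, JobResult())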
--- a/piecrust/pipelines/page.py	Sun May 21 00:06:59 2017 -0700
+++ b/piecrust/pipelines/page.py	Sun Jun 04 23:34:28 2017 -0700
@@ -1,25 +1,48 @@
-import hashlib
+import logging
 
 from piecrust.pipelines.base import ContentPipeline
+from piecrust.pipelines._pagebaker import PageBaker
+from piecrust.pipelines._pagerecords import PagePipelineRecordEntry
+from piecrust.rendering import (
+    RenderingContext, render_page_segments)
+from piecrust.sources.base import AbortedSourceUseError
+
+
+logger = logging.getLogger(__name__)
 
 
 class PagePipeline(ContentPipeline):
     PIPELINE_NAME = 'page'
-    PIPELINE_PASSES = 3
+    RECORD_ENTRY_CLASS = PagePipelineRecordEntry
+
+    def __init__(self, source, ppctx):
+        super().__init__(source, ppctx)
+        self._pagebaker = None
+        self._stats = source.app.env.stats
 
-    def initialize(self, ctx):
-        pass
+    def initialize(self):
+        stats = self.app.env.stats
+        stats.registerCounter('SourceUseAbortions', raise_if_registered=False)
 
-    def run(self, content_item, ctx):
-        raise NotImplementedError()
+        self._pagebaker = PageBaker(self.app,
+                                    self.ctx.out_dir,
+                                    force=self.ctx.force)
+        self._pagebaker.startWriterQueue()
 
-    def shutdown(self, ctx):
-        pass
+    def mergeRecordEntry(self, record_entry, ctx):
+        existing = ctx.record.getEntry(record_entry.item_spec)
+        existing.errors += record_entry.errors
+        existing.flags |= record_entry.flags
+        existing.subs = record_entry.subs
 
-    def collapseRecords(self, record_history):
-        pass
+    def run(self, job, ctx, result):
+        pass_name = job.data.get('pass', 0)
+        if pass_name == 0:
+            self._renderSegmentsOrPostpone(job.content_item, ctx, result)
+        elif pass_name == 1:
+            self._fullRender(job.content_item, ctx, result)
 
-    def getDeletions(self, record_history):
-        for prev, cur in record_history.diffs():
+    def getDeletions(self, ctx):
+        for prev, cur in ctx.record_history.diffs:
             if prev and not cur:
                 for sub in prev.subs:
                     yield (sub.out_path, 'previous source file was removed')
@@ -30,344 +53,44 @@
                 for p in diff:
                     yield (p, 'source file changed outputs')
 
-
-JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = range(0, 3)
-
-
-def _get_transition_key(path, extra_key=None):
-    key = path
-    if extra_key:
-        key += '+%s' % extra_key
-    return hashlib.md5(key.encode('utf8')).hexdigest()
-
+    def collapseRecords(self, ctx):
+        pass
 
-# def getOverrideEntry(self, path, uri):
-#     for pair in self.transitions.values():
-#         cur = pair[1]
-#         if cur and cur.path != path:
-#             for o in cur.subs:
-#                 if o.out_uri == uri:
-#                     return cur
-#     return None
-
-
-# # Create the job handlers.
-# job_handlers = {
-#     JOB_LOAD: LoadJobHandler(self.ctx),
-#     JOB_RENDER_FIRST: RenderFirstSubJobHandler(self.ctx),
-#     JOB_BAKE: BakeJobHandler(self.ctx)}
-# for jt, jh in job_handlers.items():
-#     app.env.registerTimer(type(jh).__name__)
-# self.job_handlers = job_handlers
-#
-# def process(self, job):
-#     handler = self.job_handlers[job['type']]
-#     with self.ctx.app.env.timerScope(type(handler).__name__):
-#         return handler.handleJob(job['job'])
+    def shutdown(self):
+        self._pagebaker.stopWriterQueue()
 
-# def _loadRealmPages(self, record_history, pool, factories):
-#     def _handler(res):
-#         # Create the record entry for this page.
-#         # This will also update the `dirty_source_names` for the record
-#         # as we add page files whose last modification times are later
-#         # than the last bake.
-#         record_entry = BakeRecordEntry(res['source_name'], res['path'])
-#         record_entry.config = res['config']
-#         record_entry.timestamp = res['timestamp']
-#         if res['errors']:
-#             record_entry.errors += res['errors']
-#             record_history.current.success = False
-#             self._logErrors(res['path'], res['errors'])
-#         record_history.addEntry(record_entry)
-#
-#     logger.debug("Loading %d realm pages..." % len(factories))
-#     with format_timed_scope(logger,
-#                             "loaded %d pages" % len(factories),
-#                             level=logging.DEBUG, colored=False,
-#                             timer_env=self.app.env,
-#                             timer_category='LoadJob'):
-#         jobs = []
-#         for fac in factories:
-#             job = {
-#                 'type': JOB_LOAD,
-#                 'job': save_factory(fac)}
-#             jobs.append(job)
-#         ar = pool.queueJobs(jobs, handler=_handler)
-#         ar.wait()
-#
-# def _renderRealmPages(self, record_history, pool, factories):
-#     def _handler(res):
-#         entry = record_history.getCurrentEntry(res['path'])
-#         if res['errors']:
-#             entry.errors += res['errors']
-#             record_history.current.success = False
-#             self._logErrors(res['path'], res['errors'])
-#
-#     logger.debug("Rendering %d realm pages..." % len(factories))
-#     with format_timed_scope(logger,
-#                             "prepared %d pages" % len(factories),
-#                             level=logging.DEBUG, colored=False,
-#                             timer_env=self.app.env,
-#                             timer_category='RenderFirstSubJob'):
-#         jobs = []
-#         for fac in factories:
-#             record_entry = record_history.getCurrentEntry(fac.path)
-#             if record_entry.errors:
-#                 logger.debug("Ignoring %s because it had previous "
-#                              "errors." % fac.ref_spec)
-#                 continue
-#
-#             # Make sure the source and the route exist for this page,
-#             # otherwise we add errors to the record entry and we'll skip
-#             # this page for the rest of the bake.
-#             source = self.app.getSource(fac.source.name)
-#             if source is None:
-#                 record_entry.errors.append(
-#                     "Can't get source for page: %s" % fac.ref_spec)
-#                 logger.error(record_entry.errors[-1])
-#                 continue
-#
-#             route = self.app.getSourceRoute(fac.source.name, fac.metadata)
-#             if route is None:
-#                 record_entry.errors.append(
-#                     "Can't get route for page: %s" % fac.ref_spec)
-#                 logger.error(record_entry.errors[-1])
-#                 continue
-#
-#             # All good, queue the job.
-#             route_index = self.app.routes.index(route)
-#             job = {
-#                 'type': JOB_RENDER_FIRST,
-#                 'job': {
-#                     'factory_info': save_factory(fac),
-#                     'route_index': route_index
-#                 }
-#             }
-#             jobs.append(job)
-#
-#         ar = pool.queueJobs(jobs, handler=_handler)
-#         ar.wait()
-#
-# def _bakeRealmPages(self, record_history, pool, realm, factories):
-#     def _handler(res):
-#         entry = record_history.getCurrentEntry(res['path'])
-#         entry.subs = res['sub_entries']
-#         if res['errors']:
-#             entry.errors += res['errors']
-#             self._logErrors(res['path'], res['errors'])
-#         if entry.has_any_error:
-#             record_history.current.success = False
-#         if entry.subs and entry.was_any_sub_baked:
-#             record_history.current.baked_count[realm] += 1
-#             record_history.current.total_baked_count[realm] += len(entry.subs)
-#
-#     logger.debug("Baking %d realm pages..." % len(factories))
-#     with format_timed_scope(logger,
-#                             "baked %d pages" % len(factories),
-#                             level=logging.DEBUG, colored=False,
-#                             timer_env=self.app.env,
-#                             timer_category='BakeJob'):
-#         jobs = []
-#         for fac in factories:
-#             job = self._makeBakeJob(record_history, fac)
-#             if job is not None:
-#                 jobs.append(job)
-#
-#         ar = pool.queueJobs(jobs, handler=_handler)
-#         ar.wait()
-#
+    def _renderSegmentsOrPostpone(self, content_item, ctx, result):
+        # Here our job is to render the page's segments so that they're
+        # cached in memory and on disk... unless we detect that the page
+        # is using some other sources, in which case we abort and we'll try
+        # again on the second pass.
+        logger.debug("Rendering segments for: %s" % content_item.spec)
+        record_entry = result.record_entry
+        stats = self.app.env.stats
+        page = self.app.getPage(self.source, content_item)
+        record_entry.config = page.config.getAll()
 
-# def _makeBakeJob(self, record_history, fac):
-#     # Get the previous (if any) and current entry for this page.
-#     pair = record_history.getPreviousAndCurrentEntries(fac.path)
-#     assert pair is not None
-#     prev_entry, cur_entry = pair
-#     assert cur_entry is not None
-#
-#     # Ignore if there were errors in the previous passes.
-#     if cur_entry.errors:
-#         logger.debug("Ignoring %s because it had previous "
-#                      "errors." % fac.ref_spec)
-#         return None
-#
-#     # Build the route metadata and find the appropriate route.
-#     page = fac.buildPage()
-#     route_metadata = create_route_metadata(page)
-#     route = self.app.getSourceRoute(fac.source.name, route_metadata)
-#     assert route is not None
-#
-#     # Figure out if this page is overriden by another previously
-#     # baked page. This happens for example when the user has
-#     # made a page that has the same page/URL as a theme page.
-#     uri = route.getUri(route_metadata)
-#     override_entry = record_history.getOverrideEntry(page.path, uri)
-#     if override_entry is not None:
-#         override_source = self.app.getSource(
-#             override_entry.source_name)
-#         if override_source.realm == fac.source.realm:
-#             cur_entry.errors.append(
-#                 "Page '%s' maps to URL '%s' but is overriden "
-#                 "by page '%s'." %
-#                 (fac.ref_spec, uri, override_entry.path))
-#             logger.error(cur_entry.errors[-1])
-#         cur_entry.flags |= BakeRecordEntry.FLAG_OVERRIDEN
-#         return None
-#
-#     route_index = self.app.routes.index(route)
-#     job = {
-#         'type': JOB_BAKE,
-#         'job': {
-#             'factory_info': save_factory(fac),
-#             'generator_name': None,
-#             'generator_record_key': None,
-#             'route_index': route_index,
-#             'route_metadata': route_metadata,
-#             'dirty_source_names': record_history.dirty_source_names
-#         }
-#     }
-#     return job
-#
-# def _handleDeletetions(self, record_history):
-#     logger.debug("Handling deletions...")
-#     for path, reason in record_history.getDeletions():
-#         logger.debug("Removing '%s': %s" % (path, reason))
-#         record_history.current.deleted.append(path)
-#         try:
-#             os.remove(path)
-#             logger.info('[delete] %s' % path)
-#         except OSError:
-#             # Not a big deal if that file had already been removed
-#             # by the user.
-#             pass
-#
-
-
+        rdrctx = RenderingContext(page)
+        self.app.env.abort_source_use = True
+        try:
+            render_page_segments(rdrctx)
+        except AbortedSourceUseError:
+            logger.debug("Page was aborted for using source: %s" %
+                         content_item.spec)
+            stats.stepCounter("SourceUseAbortions")
+        finally:
+            self.app.env.abort_source_use = False
 
-#def save_factory(fac):
-#    return {
-#        'source_name': fac.source.name,
-#        'rel_path': fac.rel_path,
-#        'metadata': fac.metadata}
-#
-#
-#def load_factory(app, info):
-#    source = app.getSource(info['source_name'])
-#    return PageFactory(source, info['rel_path'], info['metadata'])
-#
-#
-#class LoadJobHandler(JobHandler):
-#    def handleJob(self, job):
-#        # Just make sure the page has been cached.
-#        fac = load_factory(self.app, job)
-#        logger.debug("Loading page: %s" % fac.ref_spec)
-#        self.app.env.addManifestEntry('LoadJobs', fac.ref_spec)
-#        result = {
-#            'source_name': fac.source.name,
-#            'path': fac.path,
-#            'config': None,
-#            'timestamp': None,
-#            'errors': None}
-#        try:
-#            page = fac.buildPage()
-#            page._load()
-#            result['config'] = page.config.getAll()
-#            result['timestamp'] = page.datetime.timestamp()
-#        except Exception as ex:
-#            logger.debug("Got loading error. Sending it to master.")
-#            result['errors'] = _get_errors(ex)
-#            if self.ctx.app.debug:
-#                logger.exception(ex)
-#        return result
-#
-#
-#class RenderFirstSubJobHandler(JobHandler):
-#    def handleJob(self, job):
-#        # Render the segments for the first sub-page of this page.
-#        fac = load_factory(self.app, job['factory_info'])
-#        self.app.env.addManifestEntry('RenderJobs', fac.ref_spec)
-#
-#        route_index = job['route_index']
-#        route = self.app.routes[route_index]
-#
-#        page = fac.buildPage()
-#        qp = QualifiedPage(page, route, route_metadata)
-#        ctx = RenderingContext(qp)
-#        self.app.env.abort_source_use = True
-#
-#        result = {
-#            'path': fac.path,
-#            'aborted': False,
-#            'errors': None}
-#        logger.debug("Preparing page: %s" % fac.ref_spec)
-#        try:
-#            render_page_segments(ctx)
-#        except AbortedSourceUseError:
-#            logger.debug("Page %s was aborted." % fac.ref_spec)
-#            self.app.env.stepCounter("SourceUseAbortions")
-#            result['aborted'] = True
-#        except Exception as ex:
-#            logger.debug("Got rendering error. Sending it to master.")
-#            result['errors'] = _get_errors(ex)
-#            if self.ctx.app.debug:
-#                logger.exception(ex)
-#        finally:
-#            self.app.env.abort_source_use = False
-#        return result
-#
-#
-#class BakeJobHandler(JobHandler):
-#    def __init__(self, ctx):
-#        super(BakeJobHandler, self).__init__(ctx)
-#        self.page_baker = PageBaker(ctx.app, ctx.out_dir, ctx.force)
-#
-#    def shutdown(self):
-#        self.page_baker.shutdown()
-#
-#    def handleJob(self, job):
-#        # Actually bake the page and all its sub-pages to the output folder.
-#        fac = load_factory(self.app, job['factory_info'])
-#        self.app.env.addManifestEntry('BakeJobs', fac.ref_spec)
-#
-#        route_index = job['route_index']
-#        route_metadata = job['route_metadata']
-#        route = self.app.routes[route_index]
-#
-#        gen_name = job['generator_name']
-#        gen_key = job['generator_record_key']
-#        dirty_source_names = job['dirty_source_names']
-#
-#        page = fac.buildPage()
-#        qp = QualifiedPage(page, route, route_metadata)
-#
-#        result = {
-#            'path': fac.path,
-#            'generator_name': gen_name,
-#            'generator_record_key': gen_key,
-#            'sub_entries': None,
-#            'errors': None}
-#
-#        if job.get('needs_config', False):
-#            result['config'] = page.config.getAll()
-#
-#        previous_entry = None
-#        if self.ctx.previous_record_index is not None:
-#            key = _get_transition_key(fac.path, gen_key)
-#            previous_entry = self.ctx.previous_record_index.get(key)
-#
-#        logger.debug("Baking page: %s" % fac.ref_spec)
-#        logger.debug("With route metadata: %s" % route_metadata)
-#        try:
-#            sub_entries = self.page_baker.bake(
-#                qp, previous_entry, dirty_source_names, gen_name)
-#            result['sub_entries'] = sub_entries
-#
-#        except Exception as ex:
-#            logger.debug("Got baking error. Sending it to master.")
-#            result['errors'] = _get_errors(ex)
-#            if self.ctx.app.debug:
-#                logger.exception(ex)
-#
-#        return result
-#
+        result.next_pass_job = self.createJob(content_item)
+        result.next_pass_job.data.update({
+            'pass': 1,
+            'record_entry': record_entry
+        })
+
+    def _fullRender(self, content_item, ctx, result):
+        logger.debug("Full render for: %s" % content_item.spec)
+        page = self.app.getPage(self.source, content_item)
+        prev_entry = ctx.previous_entry
+        cur_entry = result.record_entry
+        self._pagebaker.bake(page, prev_entry, cur_entry, [])
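Only the pipeline side of the source-abort mechanism appears in this file: `initialize()` registers a `SourceUseAbortions` counter, and `_renderSegmentsOrPostpone()` flips `app.env.abort_source_use` around `render_page_segments()` and catches `AbortedSourceUseError`. The source side lives elsewhere; the sketch below is only a guess at how a generator source's `getContents()` might cooperate, and the class name and the exact location of the check are assumptions.

    # Hypothetical sketch of the source-side half of the abort mechanism;
    # the check shown here is an assumption, not code from this changeset.
    from piecrust.sources.base import AbortedSourceUseError


    class HypotheticalGeneratorSource:
        def __init__(self, app, name):
            self.app = app
            self.name = name

        def getContents(self, group):
            # On the first bake pass, other pages' segments may not be
            # rendered yet, so generated items could be built from stale or
            # missing data. Abort; the page pipeline catches this and runs
            # the page again on the second pass.
            if self.app.env.abort_source_use:
                raise AbortedSourceUseError()
            # On the second pass, yield the real generated content items.
            return []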