piecrust2: diff piecrust/baking/worker.py @ 852:4850f8c21b6e
core: Start of the big refactor for PieCrust 3.0.
* Everything is a `ContentSource`, including assets directories.
* Most content sources are subclasses of the base file-system source.
* A source is processed by a "pipeline", and there are 2 built-in pipelines,
one for assets and one for pages. The asset pipeline is vaguely functional,
but the page pipeline is completely broken right now.
* Rewrite the baking process as just running appropriate pipelines on each
content item. This should allow for better parallelization.
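To make the first two bullets concrete, here is a minimal sketch of the unified source model. Only `ContentItem` and the `pipeline` config key appear in the diff below; `FSContentSource` and its method names are illustrative assumptions, not code from this changeset.

```python
# Sketch only: what "everything is a ContentSource" looks like.
# ContentItem comes from the diff below; this class and its method
# names are hypothetical stand-ins for the file-system base source.
import os

from piecrust.sources.base import ContentItem


class FSContentSource:
    """A content provider rooted in a directory; page and asset
    directories alike are sources shaped like this."""
    def __init__(self, name, fs_endpoint_path, config):
        self.name = name
        self.fs_endpoint_path = fs_endpoint_path
        # config['pipeline'] names the pipeline ('page' or 'asset')
        # that will process this source's items.
        self.config = config

    def getAllContents(self):
        # Yield one ContentItem per file under the source's root.
        for dirpath, _, filenames in os.walk(self.fs_endpoint_path):
            for fname in filenames:
                path = os.path.join(dirpath, fname)
                yield ContentItem(path, {'path': path})
```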
author | Ludovic Chabant <ludovic@chabant.com>
---|---
date | Wed, 17 May 2017 00:11:48 -0700
parents | c3cb2f9df882
children | f070a4fc033c
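The heart of the rewrite shows up in `BakeWorker.initialize()` and `process()` below: each source is paired with the pipeline class named by `src.config['pipeline']`, and baking one content item is a single `pp.run()` call. Stripped of worker plumbing, the model is roughly the following serial loop (a sketch: the pipeline calls mirror the diff, while `getAllContents()` and `item.spec` are assumptions about the source API).

```python
# Serial sketch of "baking = run each source's pipeline on each item".
# PipelineResult, Record, RECORD_CLASS, initialize/run/shutdown and
# config['pipeline'] mirror the diff below; getAllContents() and
# item.spec are assumptions about the source API.
from piecrust.pipelines.base import PipelineResult
from piecrust.pipelines.records import Record


def bake_serially(app, ppctx, ppclasses):
    for src in app.sources:
        # Instantiate the pipeline named by the source's config.
        pp = ppclasses[src.config['pipeline']](src)
        pp.initialize(ppctx)
        for item in src.getAllContents():
            ppres = PipelineResult((pp.RECORD_CLASS or Record)())
            ppres.record.item_spec = item.spec
            pp.run(item, ppctx, ppres)
        pp.shutdown(ppctx)
```

Because every item is processed independently, each iteration of the inner loop can become one job handed to a worker, which is what enables the better parallelization mentioned in the commit message.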
```diff
--- a/piecrust/baking/worker.py	Sat Apr 29 21:42:22 2017 -0700
+++ b/piecrust/baking/worker.py	Wed May 17 00:11:48 2017 -0700
@@ -1,13 +1,9 @@
 import time
 import logging
-from piecrust.app import PieCrust, apply_variant_and_values
-from piecrust.baking.records import BakeRecord, _get_transition_key
-from piecrust.baking.single import PageBaker, BakingError
-from piecrust.environment import AbortedSourceUseError
-from piecrust.rendering import (
-    QualifiedPage, PageRenderingContext, render_page_segments)
-from piecrust.routing import create_route_metadata
-from piecrust.sources.base import PageFactory
+from piecrust.pipelines.base import PipelineContext, PipelineResult
+from piecrust.pipelines.records import (
+    MultiRecordHistory, MultiRecord, Record, load_records)
+from piecrust.sources.base import ContentItem
 from piecrust.workerpool import IWorker
 
 
@@ -16,80 +12,109 @@
 
 class BakeWorkerContext(object):
     def __init__(self, appfactory, out_dir, *,
-                 force=False, previous_record_path=None):
+                 force=False, previous_records_path=None,
+                 allowed_pipelines=None):
         self.appfactory = appfactory
         self.out_dir = out_dir
         self.force = force
-        self.previous_record_path = previous_record_path
-        self.app = None
-        self.previous_record = None
-        self.previous_record_index = None
+        self.previous_records_path = previous_records_path
+        self.allowed_pipelines = allowed_pipelines
 
 
 class BakeWorker(IWorker):
     def __init__(self, ctx):
         self.ctx = ctx
-        self.work_start_time = time.perf_counter()
+        self.app = None
+        self.record_history = None
+        self._work_start_time = time.perf_counter()
+        self._sources = {}
+        self._ppctx = None
 
     def initialize(self):
        # Create the app local to this worker.
         app = self.ctx.appfactory.create()
         app.config.set('baker/is_baking', True)
         app.config.set('baker/worker_id', self.wid)
-        app.env.base_asset_url_format = '%uri%'
+        app.config.set('site/base_asset_url_format', '%uri')
+
         app.env.fs_cache_only_for_main_page = True
-        app.env.registerTimer("BakeWorker_%d_Total" % self.wid)
-        app.env.registerTimer("BakeWorkerInit")
-        app.env.registerTimer("JobReceive")
-        app.env.registerCounter("SourceUseAbortions")
-        app.env.registerManifest("LoadJobs")
-        app.env.registerManifest("RenderJobs")
-        app.env.registerManifest("BakeJobs")
-        self.ctx.app = app
+
+        stats = app.env.stats
+        stats.registerTimer("BakeWorker_%d_Total" % self.wid)
+        stats.registerTimer("BakeWorkerInit")
+        stats.registerTimer("JobReceive")
+        stats.registerTimer('LoadJob', raise_if_registered=False)
+        stats.registerTimer('RenderFirstSubJob',
+                            raise_if_registered=False)
+        stats.registerTimer('BakeJob', raise_if_registered=False)
+
+        stats.registerCounter("SourceUseAbortions")
+
+        stats.registerManifest("LoadJobs")
+        stats.registerManifest("RenderJobs")
+        stats.registerManifest("BakeJobs")
+
+        self.app = app
 
         # Load previous record
-        if self.ctx.previous_record_path:
-            self.ctx.previous_record = BakeRecord.load(
-                self.ctx.previous_record_path)
-            self.ctx.previous_record_index = {}
-            for e in self.ctx.previous_record.entries:
-                key = _get_transition_key(e.path, e.extra_key)
-                self.ctx.previous_record_index[key] = e
+        if self.ctx.previous_records_path:
+            previous_records = load_records(self.ctx.previous_records_path)
+        else:
+            previous_records = MultiRecord()
+        current_records = MultiRecord()
+        self.record_history = MultiRecordHistory(
+            previous_records, current_records)
+
+        # Cache sources and create pipelines.
+        ppclasses = {}
+        for ppclass in app.plugin_loader.getPipelines():
+            ppclasses[ppclass.PIPELINE_NAME] = ppclass
 
-        # Create the job handlers.
-        job_handlers = {
-            JOB_LOAD: LoadJobHandler(self.ctx),
-            JOB_RENDER_FIRST: RenderFirstSubJobHandler(self.ctx),
-            JOB_BAKE: BakeJobHandler(self.ctx)}
-        for jt, jh in job_handlers.items():
-            app.env.registerTimer(type(jh).__name__)
-        self.job_handlers = job_handlers
+        self._ppctx = PipelineContext(self.ctx.out_dir, self.record_history,
+                                      worker_id=self.wid,
+                                      force=self.ctx.force)
+        for src in app.sources:
+            ppname = src.config['pipeline']
+            if (self.ctx.allowed_pipelines is not None and
+                    ppname not in self.ctx.allowed_pipelines):
+                continue
 
-        app.env.stepTimerSince("BakeWorkerInit", self.work_start_time)
+            pp = ppclasses[ppname](src)
+            pp.initialize(self._ppctx)
+            self._sources[src.name] = (src, pp)
+
+        stats.stepTimerSince("BakeWorkerInit", self._work_start_time)
 
     def process(self, job):
-        handler = self.job_handlers[job['type']]
-        with self.ctx.app.env.timerScope(type(handler).__name__):
-            return handler.handleJob(job['job'])
+        logger.debug("Received job: %s@%s" % (job.source_name, job.item_spec))
+        src, pp = self._sources[job.source_name]
+        item = ContentItem(job.item_spec, job.item_metadata)
 
-    def getReport(self, pool_reports):
-        self.ctx.app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid,
-                                        self.work_start_time)
-        data = self.ctx.app.env.getStats()
-        data.timers.update(pool_reports)
-        return {
-            'type': 'stats',
-            'data': data}
+        record_class = pp.RECORD_CLASS or Record
+        ppres = PipelineResult(record_class())
+        ppres.record.item_spec = job.item_spec
+        pp.run(item, self._ppctx, ppres)
+        return ppres
+
+    def getStats(self):
+        stats = self.app.env.stats
+        stats.stepTimerSince("BakeWorker_%d_Total" % self.wid,
+                             self._work_start_time)
+        return stats
 
     def shutdown(self):
-        for jh in self.job_handlers.values():
-            jh.shutdown()
+        for src, pp in self._sources.values():
+            pp.shutdown(self._ppctx)
 
 
-JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = range(0, 3)
+class BakeJob:
+    def __init__(self, source_name, item_spec, item_metadata):
+        self.source_name = source_name
+        self.item_spec = item_spec
+        self.item_metadata = item_metadata
 
 
-class JobHandler(object):
+class JobHandler:
     def __init__(self, ctx):
         self.ctx = ctx
@@ -111,131 +136,3 @@
         ex = ex.__cause__
     return errors
-
-
-def save_factory(fac):
-    return {
-        'source_name': fac.source.name,
-        'rel_path': fac.rel_path,
-        'metadata': fac.metadata}
-
-
-def load_factory(app, info):
-    source = app.getSource(info['source_name'])
-    return PageFactory(source, info['rel_path'], info['metadata'])
-
-
-class LoadJobHandler(JobHandler):
-    def handleJob(self, job):
-        # Just make sure the page has been cached.
-        fac = load_factory(self.app, job)
-        logger.debug("Loading page: %s" % fac.ref_spec)
-        self.app.env.addManifestEntry('LoadJobs', fac.ref_spec)
-        result = {
-            'source_name': fac.source.name,
-            'path': fac.path,
-            'config': None,
-            'timestamp': None,
-            'errors': None}
-        try:
-            page = fac.buildPage()
-            page._load()
-            result['config'] = page.config.getAll()
-            result['timestamp'] = page.datetime.timestamp()
-        except Exception as ex:
-            logger.debug("Got loading error. Sending it to master.")
-            result['errors'] = _get_errors(ex)
-            if self.ctx.app.debug:
-                logger.exception(ex)
-        return result
-
-
-class RenderFirstSubJobHandler(JobHandler):
-    def handleJob(self, job):
-        # Render the segments for the first sub-page of this page.
-        fac = load_factory(self.app, job['factory_info'])
-        self.app.env.addManifestEntry('RenderJobs', fac.ref_spec)
-
-        route_index = job['route_index']
-        route = self.app.routes[route_index]
-
-        page = fac.buildPage()
-        route_metadata = create_route_metadata(page)
-        qp = QualifiedPage(page, route, route_metadata)
-        ctx = PageRenderingContext(qp)
-        self.app.env.abort_source_use = True
-
-        result = {
-            'path': fac.path,
-            'aborted': False,
-            'errors': None}
-        logger.debug("Preparing page: %s" % fac.ref_spec)
-        try:
-            render_page_segments(ctx)
-        except AbortedSourceUseError:
-            logger.debug("Page %s was aborted." % fac.ref_spec)
-            self.app.env.stepCounter("SourceUseAbortions")
-            result['aborted'] = True
-        except Exception as ex:
-            logger.debug("Got rendering error. Sending it to master.")
-            result['errors'] = _get_errors(ex)
-            if self.ctx.app.debug:
-                logger.exception(ex)
-        finally:
-            self.app.env.abort_source_use = False
-        return result
-
-
-class BakeJobHandler(JobHandler):
-    def __init__(self, ctx):
-        super(BakeJobHandler, self).__init__(ctx)
-        self.page_baker = PageBaker(ctx.app, ctx.out_dir, ctx.force)
-
-    def shutdown(self):
-        self.page_baker.shutdown()
-
-    def handleJob(self, job):
-        # Actually bake the page and all its sub-pages to the output folder.
-        fac = load_factory(self.app, job['factory_info'])
-        self.app.env.addManifestEntry('BakeJobs', fac.ref_spec)
-
-        route_index = job['route_index']
-        route_metadata = job['route_metadata']
-        route = self.app.routes[route_index]
-
-        gen_name = job['generator_name']
-        gen_key = job['generator_record_key']
-        dirty_source_names = job['dirty_source_names']
-
-        page = fac.buildPage()
-        qp = QualifiedPage(page, route, route_metadata)
-
-        result = {
-            'path': fac.path,
-            'generator_name': gen_name,
-            'generator_record_key': gen_key,
-            'sub_entries': None,
-            'errors': None}
-
-        if job.get('needs_config', False):
-            result['config'] = page.config.getAll()
-
-        previous_entry = None
-        if self.ctx.previous_record_index is not None:
-            key = _get_transition_key(fac.path, gen_key)
-            previous_entry = self.ctx.previous_record_index.get(key)
-
-        logger.debug("Baking page: %s" % fac.ref_spec)
-        logger.debug("With route metadata: %s" % route_metadata)
-        try:
-            sub_entries = self.page_baker.bake(
-                qp, previous_entry, dirty_source_names, gen_name)
-            result['sub_entries'] = sub_entries
-
-        except Exception as ex:
-            logger.debug("Got baking error. Sending it to master.")
-            result['errors'] = _get_errors(ex)
-            if self.ctx.app.debug:
-                logger.exception(ex)
-
-        return result
-
```
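For context, a hypothetical master-side driver for the reworked worker could look like the sketch below. `BakeWorkerContext`, `BakeWorker`, and `BakeJob` are taken from the diff above; the job-spec tuples and the manual `wid` assignment (normally handled by the worker pool) are illustrative.

```python
# Hypothetical driver: builds jobs and feeds them through one worker.
# In the real baker this is fanned out across a worker pool.
from piecrust.baking.worker import BakeJob, BakeWorker, BakeWorkerContext


def bake_items(appfactory, out_dir, job_specs):
    ctx = BakeWorkerContext(appfactory, out_dir,
                            force=False,
                            previous_records_path=None,
                            allowed_pipelines=None)  # None = all pipelines
    worker = BakeWorker(ctx)
    worker.wid = 0  # normally assigned by the worker pool
    worker.initialize()
    try:
        results = []
        for source_name, item_spec, item_metadata in job_specs:
            job = BakeJob(source_name, item_spec, item_metadata)
            results.append(worker.process(job))  # one PipelineResult each
        return results
    finally:
        worker.shutdown()
```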