Mercurial > piecrust2
view piecrust/baking/worker.py @ 852:4850f8c21b6e
core: Start of the big refactor for PieCrust 3.0.
* Everything is a `ContentSource`, including asset directories.
* Most content sources are subclasses of the base file-system source.
* A source is processed by a "pipeline", and there are 2 built-in pipelines,
one for assets and one for pages. The asset pipeline is vaguely functional,
but the page pipeline is completely broken right now.
* Rewrite the baking process as just running appropriate pipelines on each
content item. This should allow for better parallelization.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Wed, 17 May 2017 00:11:48 -0700 |
parents | c3cb2f9df882 |
children | f070a4fc033c |
line wrap: on
line source
import time
import logging
from piecrust.pipelines.base import PipelineContext, PipelineResult
from piecrust.pipelines.records import (
    MultiRecordHistory, MultiRecord, Record, load_records)
from piecrust.sources.base import ContentItem
from piecrust.workerpool import IWorker


logger = logging.getLogger(__name__)


class BakeWorkerContext(object):
    """Settings handed to every `BakeWorker`.

    The worker reads these values in `BakeWorker.initialize` to build its
    own app instance and its pipeline context.
    """
    def __init__(self, appfactory, out_dir, *,
                 force=False, previous_records_path=None,
                 allowed_pipelines=None):
        # Object whose `create()` returns the app instance for one worker.
        self.appfactory = appfactory
        # Output directory, passed as-is to `PipelineContext`.
        self.out_dir = out_dir
        # Forwarded to `PipelineContext(force=...)`.
        self.force = force
        # Path passed to `load_records`; when falsy, the worker starts from
        # an empty `MultiRecord` instead.
        self.previous_records_path = previous_records_path
        # Optional container of pipeline names. Sources whose pipeline is
        # not in it are skipped. None means "no filtering".
        self.allowed_pipelines = allowed_pipelines


class BakeWorker(IWorker):
    """Worker that runs each source's pipeline on individual content items.

    NOTE(review): `self.wid` is read but never assigned here; presumably
    the `IWorker` base / worker pool sets it before `initialize` is called.
    """
    def __init__(self, ctx):
        self.ctx = ctx
        self.app = None
        self.record_history = None
        # Start of the timing window reported by `getStats` and used for
        # the "BakeWorkerInit" timer.
        self._work_start_time = time.perf_counter()
        # Source name -> (source, pipeline) for every enabled source.
        self._sources = {}
        self._ppctx = None

    def initialize(self):
        """Create this worker's app, record history and pipelines.

        Raises:
            KeyError: if a source's configured pipeline name does not
                match any pipeline class returned by the plugin loader.
        """
        # Create the app local to this worker.
        app = self.ctx.appfactory.create()
        app.config.set('baker/is_baking', True)
        app.config.set('baker/worker_id', self.wid)
        app.config.set('site/base_asset_url_format', '%uri')

        app.env.fs_cache_only_for_main_page = True

        stats = app.env.stats
        stats.registerTimer("BakeWorker_%d_Total" % self.wid)
        stats.registerTimer("BakeWorkerInit")
        stats.registerTimer("JobReceive")
        stats.registerTimer('LoadJob', raise_if_registered=False)
        stats.registerTimer('RenderFirstSubJob',
                            raise_if_registered=False)
        stats.registerTimer('BakeJob', raise_if_registered=False)

        stats.registerCounter("SourceUseAbortions")

        stats.registerManifest("LoadJobs")
        stats.registerManifest("RenderJobs")
        stats.registerManifest("BakeJobs")

        self.app = app

        # Load the previous records, or start from an empty set.
        if self.ctx.previous_records_path:
            previous_records = load_records(self.ctx.previous_records_path)
        else:
            previous_records = MultiRecord()
        current_records = MultiRecord()
        self.record_history = MultiRecordHistory(
            previous_records, current_records)

        # Cache sources and create pipelines. If two pipeline classes share
        # a name, the one returned last wins.
        ppclasses = {ppclass.PIPELINE_NAME: ppclass
                     for ppclass in app.plugin_loader.getPipelines()}

        self._ppctx = PipelineContext(self.ctx.out_dir, self.record_history,
                                      worker_id=self.wid,
                                      force=self.ctx.force)
        for src in app.sources:
            ppname = src.config['pipeline']
            if (self.ctx.allowed_pipelines is not None and
                    ppname not in self.ctx.allowed_pipelines):
                continue

            try:
                ppclass = ppclasses[ppname]
            except KeyError:
                # Name the offending source instead of raising a bare
                # KeyError that only contains the pipeline name.
                raise KeyError(
                    "Source '%s' uses unknown pipeline: %s" %
                    (src.name, ppname)) from None

            pp = ppclass(src)
            pp.initialize(self._ppctx)
            self._sources[src.name] = (src, pp)

        stats.stepTimerSince("BakeWorkerInit", self._work_start_time)

    def process(self, job):
        """Run the pipeline of `job.source_name` on one content item.

        Returns:
            The `PipelineResult`, whose record is an instance of the
            pipeline's `RECORD_CLASS` (or `Record` when that is falsy),
            with `item_spec` set from the job.

        Raises:
            KeyError: if the job's source was not set up by `initialize`
                (for example because its pipeline was not allowed).
        """
        # Lazy %-args: the message is only formatted if DEBUG is enabled.
        logger.debug("Received job: %s@%s", job.source_name, job.item_spec)

        _, pp = self._sources[job.source_name]
        item = ContentItem(job.item_spec, job.item_metadata)

        record_class = pp.RECORD_CLASS or Record
        ppres = PipelineResult(record_class())
        ppres.record.item_spec = job.item_spec
        pp.run(item, self._ppctx, ppres)
        return ppres

    def getStats(self):
        """Close this worker's total timer and return the app's stats."""
        stats = self.app.env.stats
        stats.stepTimerSince("BakeWorker_%d_Total" % self.wid,
                             self._work_start_time)
        return stats

    def shutdown(self):
        """Call `shutdown` on every pipeline created by `initialize`."""
        for _, pp in self._sources.values():
            pp.shutdown(self._ppctx)


class BakeJob:
    """A unit of work: one content item from one named source."""
    def __init__(self, source_name, item_spec, item_metadata):
        self.source_name = source_name
        self.item_spec = item_spec
        self.item_metadata = item_metadata


class JobHandler:
    """Base class for job handlers; subclasses implement `handleJob`."""
    def __init__(self, ctx):
        self.ctx = ctx

    @property
    def app(self):
        # Assumes `ctx` exposes an `app` attribute; TODO confirm with callers.
        return self.ctx.app

    def handleJob(self, job):
        raise NotImplementedError()

    def shutdown(self):
        pass


def _get_errors(ex):
    """Return `str()` of `ex` and of each exception in its `__cause__` chain.

    Messages are ordered from `ex` itself down to the innermost cause.
    Only explicit causes (`raise ... from ...`) are followed, not
    `__context__`. Returns an empty list when `ex` is None.
    """
    errors = []
    while ex is not None:
        errors.append(str(ex))
        ex = ex.__cause__
    return errors