Mercurial > piecrust2
view piecrust/pipelines/base.py @ 1079:00a0a65d08e6
data: Fix debug rendering of the family data.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Thu, 15 Feb 2018 21:15:30 -0800 |
parents | 971b4d67e82a |
children | 5f97b5b59dfe |
line wrap: on
line source
import os.path
import logging
from werkzeug.utils import cached_property
from piecrust.configuration import ConfigurationError
from piecrust.sources.base import ContentItem


logger = logging.getLogger(__name__)


class PipelineContext:
    """ The context for running a content pipeline.
    """
    def __init__(self, out_dir, *, worker_id=-1, force=None):
        self.out_dir = out_dir
        self.worker_id = worker_id
        self.force = force

    @property
    def is_worker(self):
        """ Returns `True` if the content pipeline is running inside
            a worker process.
        """
        # A non-negative worker id means we were spawned as a worker.
        return self.worker_id >= 0

    @property
    def is_main_process(self):
        """ Returns `True` if the content pipeline is running inside
            the main process (and not a worker process). This is the
            case if there are no worker processes at all.
        """
        return self.worker_id < 0


class _PipelineMasterProcessJobContextBase:
    """ Base class for job contexts that only exist on the master
        process, where both the previous and current bake records are
        available through the record histories.
    """
    def __init__(self, record_name, record_histories):
        self.record_name = record_name
        self.record_histories = record_histories

    @property
    def previous_record(self):
        return self.record_histories.getPreviousRecord(self.record_name)

    @property
    def current_record(self):
        return self.record_histories.getCurrentRecord(self.record_name)


class PipelineJobCreateContext(_PipelineMasterProcessJobContextBase):
    """ Context for creating pipeline baking jobs.

        This is run on the master process, so it can access both
        the previous and current records.
    """
    def __init__(self, pass_num, record_name, record_histories):
        super().__init__(record_name, record_histories)
        self.pass_num = pass_num


class PipelineJobValidateContext(_PipelineMasterProcessJobContextBase):
    """ Context for validating jobs on subsequent step runs (i.e.
        validating the list of jobs to run starting with the second
        step).

        This is run on the master process, so it can access both
        the previous and current records.
    """
    def __init__(self, pass_num, step_num, record_name, record_histories):
        super().__init__(record_name, record_histories)
        self.pass_num = pass_num
        self.step_num = step_num


class PipelineJobRunContext:
    """ Context for running pipeline baking jobs.

        This is run on the worker processes, so it can only access the
        previous records.
    """
    def __init__(self, job, record_name, previous_records):
        self.job = job
        self.record_name = record_name
        self.previous_records = previous_records

    @cached_property
    def record_entry_spec(self):
        # Jobs may carry an explicit record entry spec; otherwise the
        # item spec from the job spec tuple is used.
        return self.job.get('record_entry_spec',
                            self.job['job_spec'][1])

    @cached_property
    def previous_record(self):
        return self.previous_records.getRecord(self.record_name)

    @cached_property
    def previous_entry(self):
        return self.previous_record.getEntry(self.record_entry_spec)


class PipelineJobResultHandleContext:
    """ The context for handling the result from a job that ran in a
        worker process.

        This is run on the master process, so it can access the current
        record.
    """
    def __init__(self, record, job, pass_num, step_num):
        self.record = record
        self.job = job
        self.pass_num = pass_num
        self.step_num = step_num

    @cached_property
    def record_entry(self):
        # Same fallback logic as `PipelineJobRunContext.record_entry_spec`.
        record_entry_spec = self.job.get('record_entry_spec',
                                         self.job['job_spec'][1])
        return self.record.getEntry(record_entry_spec)


class PipelinePostJobRunContext:
    """ Context passed to `ContentPipeline.postJobRun`. """
    def __init__(self, record_history):
        self.record_history = record_history


class PipelineDeletionContext:
    """ Context passed to `ContentPipeline.getDeletions`. """
    def __init__(self, record_history):
        self.record_history = record_history


class PipelineCollapseRecordContext:
    """ Context passed to `ContentPipeline.collapseRecords`. """
    def __init__(self, record_history):
        self.record_history = record_history


class ContentPipeline:
    """ A pipeline that processes content from a `ContentSource`.
    """
    PIPELINE_NAME = None
    RECORD_ENTRY_CLASS = None
    PASS_NUM = 0

    def __init__(self, source, ctx):
        self.source = source
        self.ctx = ctx
        self.record_name = '%s@%s' % (source.name, self.PIPELINE_NAME)

        app = source.app
        tmp_dir = app.cache_dir
        if not tmp_dir:
            # No app cache available -- fall back to a system temp dir.
            import tempfile
            tmp_dir = os.path.join(tempfile.gettempdir(), 'piecrust')
        self.tmp_dir = os.path.join(tmp_dir, self.PIPELINE_NAME)

    @property
    def app(self):
        return self.source.app

    def initialize(self):
        pass

    def createJobs(self, ctx):
        """ Returns one baking job per content item in the source. """
        return [
            create_job(self, item.spec)
            for item in self.source.getAllContents()]

    def createRecordEntry(self, item_spec):
        """ Creates a new record entry (of type `RECORD_ENTRY_CLASS`)
            for the given item spec.
        """
        entry_class = self.RECORD_ENTRY_CLASS
        record_entry = entry_class()
        record_entry.item_spec = item_spec
        return record_entry

    def handleJobResult(self, result, ctx):
        raise NotImplementedError()

    def validateNextStepJobs(self, jobs, ctx):
        pass

    def run(self, job, ctx, result):
        raise NotImplementedError()

    def postJobRun(self, ctx):
        pass

    def getDeletions(self, ctx):
        pass

    def collapseRecords(self, ctx):
        pass

    def shutdown(self):
        pass


def create_job(pipeline, item_spec, **kwargs):
    """ Creates a job dictionary for the given pipeline and item spec.
        Extra keyword arguments are merged into the job.
    """
    job = {
        'job_spec': (pipeline.source.name, item_spec)
    }
    job.update(kwargs)
    return job


def content_item_from_job(pipeline, job):
    """ Finds the content item a job refers to in the pipeline's source. """
    return pipeline.source.findContentFromSpec(job['job_spec'][1])


def get_record_name_for_source(source):
    """ Returns the `source_name@pipeline_name` record name for a source. """
    ppname = get_pipeline_name_for_source(source)
    return '%s@%s' % (source.name, ppname)


def get_pipeline_name_for_source(source):
    """ Returns the name of the pipeline to use for a source, falling
        back to the source's default pipeline.

        Raises `ConfigurationError` if neither the source config nor
        the source class specify a pipeline.
    """
    pname = source.config['pipeline']
    if not pname:
        pname = source.DEFAULT_PIPELINE_NAME
    if not pname:
        raise ConfigurationError(
            "Source '%s' doesn't specify any pipeline." % source.name)
    return pname


class PipelineManager:
    """ Creates, caches, and manages the content pipelines for an
        application's sources.
    """
    def __init__(self, app, out_dir, *,
                 record_histories=None, worker_id=-1, force=False):
        self.app = app
        self.record_histories = record_histories
        self.out_dir = out_dir
        self.worker_id = worker_id
        self.force = force

        # Map of pipeline name -> pipeline class, from plugins.
        self._pipeline_classes = {}
        for pclass in app.plugin_loader.getPipelines():
            self._pipeline_classes[pclass.PIPELINE_NAME] = pclass

        # Map of source name -> `_PipelineInfo`.
        self._pipelines = {}

    def getPipeline(self, source_name):
        return self.getPipelineInfo(source_name).pipeline

    def getPipelineInfo(self, source_name):
        return self._pipelines[source_name]

    def getPipelineInfos(self):
        return self._pipelines.values()

    def createPipeline(self, source):
        """ Creates, initializes, and caches the pipeline for a source.

            Raises `ValueError` if a pipeline was already created for it.
        """
        if source.name in self._pipelines:
            raise ValueError("Pipeline for source '%s' was already created." %
                             source.name)

        pname = get_pipeline_name_for_source(source)
        ppctx = PipelineContext(self.out_dir,
                                worker_id=self.worker_id, force=self.force)
        pp = self._pipeline_classes[pname](source, ppctx)
        pp.initialize()

        record_history = None
        if self.record_histories:
            record_history = self.record_histories.getHistory(pp.record_name)

        info = _PipelineInfo(pp, record_history)
        self._pipelines[source.name] = info
        return info

    def postJobRun(self):
        # Build all record histories first so every pipeline's
        # `postJobRun` sees fully-built histories.
        for ppinfo in self.getPipelineInfos():
            ppinfo.record_history.build()

        for ppinfo in self.getPipelineInfos():
            ctx = PipelinePostJobRunContext(ppinfo.record_history)
            ppinfo.pipeline.postJobRun(ctx)

    def deleteStaleOutputs(self):
        """ Asks each pipeline for stale output paths and removes them,
            recording the deletions in the current record.
        """
        for ppinfo in self.getPipelineInfos():
            ctx = PipelineDeletionContext(ppinfo.record_history)
            to_delete = ppinfo.pipeline.getDeletions(ctx)
            current_record = ppinfo.record_history.current
            if to_delete is not None:
                for path, reason in to_delete:
                    logger.debug("Removing '%s': %s" % (path, reason))
                    current_record.deleted_out_paths.append(path)
                    try:
                        os.remove(path)
                    except FileNotFoundError:
                        # Already gone -- deletion is best-effort.
                        pass
                    logger.info('[delete] %s' % path)

    def collapseRecords(self, keep_unused_records=False):
        """ Lets each pipeline collapse its record history. If
            `keep_unused_records` is true, previous records with no
            matching pipeline are carried over into the current records.
        """
        seen_records = []
        for ppinfo in self.getPipelineInfos():
            ctx = PipelineCollapseRecordContext(ppinfo.record_history)
            ppinfo.pipeline.collapseRecords(ctx)
            seen_records.append(ppinfo.pipeline.record_name)

        if keep_unused_records:
            cur_recs = self.record_histories.current
            prev_recs = self.record_histories.previous
            for prev_rec in prev_recs.records:
                if prev_rec.name in seen_records:
                    continue
                logger.debug("Keeping record: %s" % prev_rec.name)
                cur_recs.records.append(prev_rec)

    def shutdownPipelines(self):
        for ppinfo in self.getPipelineInfos():
            ppinfo.pipeline.shutdown()
        self._pipelines = {}


class _PipelineInfo:
    """ Bundles a pipeline with its record history and optional
        caller-owned user data.
    """
    def __init__(self, pipeline, record_history):
        self.pipeline = pipeline
        self.record_history = record_history
        self.userdata = None

    @property
    def source(self):
        return self.pipeline.source

    @property
    def current_record(self):
        if self.record_history is not None:
            return self.record_history.current
        raise Exception("The current record is not available.")

    @property
    def pipeline_name(self):
        return self.pipeline.PIPELINE_NAME