piecrust2: diff piecrust/pipelines/base.py @ 989:8adc27285d93
bake: Big pass on bake performance.
- Reduce the amount of data passed between processes.
- Make inter-process data simple objects to make it easier to test with
alternatives to pickle.
- Make sources have the basic requirement to be able to find a content item
from an item spec (path).
- Make Hoedown the default Markdown formatter.
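As a rough illustration of the first two points: a bake job is now just a small dict built from a source name and an item spec, so it can cross process boundaries with pickle or with any alternative serializer. The sketch below is not from the changeset; only the 'job_spec' layout comes from the new create_job() helper, and the source name and path are invented.

```python
import json
import pickle

# New-style job: a plain dict of built-in types, mirroring the
# create_job() helper added in this changeset ('pages' and the
# item spec/path are made-up example values).
job = {'job_spec': ('pages', 'pages/2017/11/hello-world.md')}

# Because it contains only built-in types, it round-trips through
# pickle or through pickle alternatives such as JSON without any
# custom reducers.
assert pickle.loads(pickle.dumps(job)) == job
assert json.loads(json.dumps(job)) == {
    'job_spec': ['pages', 'pages/2017/11/hello-world.md']}
```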
author   | Ludovic Chabant <ludovic@chabant.com>
date     | Sun, 19 Nov 2017 14:29:17 -0800
parents  | d6d35b2efd04
children | fa489c5e829e
--- a/piecrust/pipelines/base.py	Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/pipelines/base.py	Sun Nov 19 14:29:17 2017 -0800
@@ -2,6 +2,7 @@
 import logging
 from werkzeug.utils import cached_property
 from piecrust.configuration import ConfigurationError
+from piecrust.sources.base import ContentItem

 logger = logging.getLogger(__name__)

@@ -32,69 +33,87 @@
         return self.worker_id < 0


-class PipelineJob:
-    """ Base class for a pipline baking job.
-    """
-    def __init__(self, pipeline, content_item):
-        self.source_name = pipeline.source.name
-        self.record_name = pipeline.record_name
-        self.content_item = content_item
-        self.step_num = 0
-        self.data = {}
+class _PipelineMasterProcessJobContextBase:
+    def __init__(self, record_name, record_histories):
+        self.record_name = record_name
+        self.record_histories = record_histories
+
+    @property
+    def previous_record(self):
+        return self.record_histories.getPreviousRecord(self.record_name)
+
+    @property
+    def current_record(self):
+        return self.record_histories.getCurrentRecord(self.record_name)


-class PipelineJobCreateContext:
-    """ Context for create pipeline baking jobs.
+class PipelineJobCreateContext(_PipelineMasterProcessJobContextBase):
+    """ Context for creating pipeline baking jobs.
+
+        This is run on the master process, so it can access both the
+        previous and current records.
     """
-    def __init__(self, step_num, record_histories):
+    def __init__(self, pass_num, record_name, record_histories):
+        super().__init__(record_name, record_histories)
+        self.pass_num = pass_num
+
+
+class PipelineJobValidateContext(_PipelineMasterProcessJobContextBase):
+    """ Context for validating jobs on subsequent step runs (i.e. validating
+        the list of jobs to run starting with the second step).
+
+        This is run on the master process, so it can access both the
+        previous and current records.
+    """
+    def __init__(self, pass_num, step_num, record_name, record_histories):
+        super().__init__(record_name, record_histories)
+        self.pass_num = pass_num
         self.step_num = step_num
-        self.record_histories = record_histories


 class PipelineJobRunContext:
     """ Context for running pipeline baking jobs.
+
+        This is run on the worker processes, so it can only access the
+        previous records.
     """
-    def __init__(self, job, record_name, record_histories):
+    def __init__(self, job, record_name, previous_records):
         self.job = job
         self.record_name = record_name
-        self.record_histories = record_histories
+        self.previous_records = previous_records

-    @property
-    def content_item(self):
-        return self.job.content_item
+    @cached_property
+    def record_entry_spec(self):
+        return self.job.get('record_entry_spec',
+                            self.job['job_spec'][1])

     @cached_property
     def previous_record(self):
-        return self.record_histories.getPreviousRecord(self.record_name)
-
-    @cached_property
-    def record_entry_spec(self):
-        content_item = self.content_item
-        return content_item.metadata.get('record_entry_spec',
-                                         content_item.spec)
+        return self.previous_records.getRecord(self.record_name)

     @cached_property
     def previous_entry(self):
         return self.previous_record.getEntry(self.record_entry_spec)


-class PipelineJobResult:
-    """ Result of running a pipeline on a content item.
-    """
-    def __init__(self):
-        self.record_entry = None
-        self.next_step_job = None
+class PipelineJobResultHandleContext:
+    """ The context for handling the result from a job that ran in a
+        worker process.

-
-class PipelineMergeRecordContext:
-    """ The context for merging a record entry for a second or higher pass
-        into the bake record.
+        This is run on the master process, so it can access the current
+        record.
     """
     def __init__(self, record, job, step_num):
         self.record = record
         self.job = job
         self.step_num = step_num

+    @cached_property
+    def record_entry(self):
+        record_entry_spec = self.job.get('record_entry_spec',
+                                         self.job['job_spec'][1])
+        return self.record.getEntry(record_entry_spec)
+

 class PipelinePostJobRunContext:
     def __init__(self, record_history):
@@ -137,23 +156,26 @@
     def initialize(self):
         pass

+    def loadAllContents(self):
+        return None
+
     def createJobs(self, ctx):
         return [
-            self.createJob(item)
+            create_job(self, item.spec)
             for item in self.source.getAllContents()]

-    def createJob(self, content_item):
-        return PipelineJob(self, content_item)
-
-    def createRecordEntry(self, job, ctx):
+    def createRecordEntry(self, item_spec):
         entry_class = self.RECORD_ENTRY_CLASS
         record_entry = entry_class()
-        record_entry.item_spec = ctx.record_entry_spec
+        record_entry.item_spec = item_spec
         return record_entry

-    def mergeRecordEntry(self, record_entry, ctx):
+    def handleJobResult(self, result, ctx):
         raise NotImplementedError()

+    def validateNextStepJobs(self, jobs, ctx):
+        pass
+
     def run(self, job, ctx, result):
         raise NotImplementedError()

@@ -170,6 +192,18 @@
         pass


+def create_job(pipeline, item_spec, **kwargs):
+    job = {
+        'job_spec': (pipeline.source.name, item_spec)
+    }
+    job.update(kwargs)
+    return job
+
+
+def content_item_from_job(pipeline, job):
+    return pipeline.source.findContentFromSpec(job['job_spec'][1])
+
+
 def get_record_name_for_source(source):
     ppname = get_pipeline_name_for_source(source)
     return '%s@%s' % (source.name, ppname)
@@ -186,8 +220,8 @@


 class PipelineManager:
-    def __init__(self, app, out_dir, record_histories, *,
-                 worker_id=-1, force=False):
+    def __init__(self, app, out_dir, *,
+                 record_histories=None, worker_id=-1, force=False):
         self.app = app
         self.record_histories = record_histories
         self.out_dir = out_dir
@@ -201,9 +235,12 @@
         self._pipelines = {}

     def getPipeline(self, source_name):
+        return self.getPipelineInfo(source_name).pipeline
+
+    def getPipelineInfo(self, source_name):
         return self._pipelines[source_name]

-    def getPipelines(self):
+    def getPipelineInfos(self):
         return self._pipelines.values()

     def createPipeline(self, source):
@@ -217,22 +254,24 @@
         pp = self._pipeline_classes[pname](source, ppctx)
         pp.initialize()

-        record_history = self.record_histories.getHistory(pp.record_name)
+        record_history = None
+        if self.record_histories:
+            record_history = self.record_histories.getHistory(pp.record_name)

         info = _PipelineInfo(pp, record_history)
         self._pipelines[source.name] = info
         return info

     def postJobRun(self):
-        for ppinfo in self.getPipelines():
+        for ppinfo in self.getPipelineInfos():
             ppinfo.record_history.build()

-        for ppinfo in self.getPipelines():
+        for ppinfo in self.getPipelineInfos():
             ctx = PipelinePostJobRunContext(ppinfo.record_history)
             ppinfo.pipeline.postJobRun(ctx)

     def deleteStaleOutputs(self):
-        for ppinfo in self.getPipelines():
+        for ppinfo in self.getPipelineInfos():
             ctx = PipelineDeletionContext(ppinfo.record_history)
             to_delete = ppinfo.pipeline.getDeletions(ctx)
             current_record = ppinfo.record_history.current
@@ -247,12 +286,12 @@
                 logger.info('[delete] %s' % path)

     def collapseRecords(self):
-        for ppinfo in self.getPipelines():
+        for ppinfo in self.getPipelineInfos():
             ctx = PipelineCollapseRecordContext(ppinfo.record_history)
             ppinfo.pipeline.collapseRecords(ctx)

     def shutdownPipelines(self):
-        for ppinfo in self.getPipelines():
+        for ppinfo in self.getPipelineInfos():
             ppinfo.pipeline.shutdown()

         self._pipelines = {}

@@ -269,6 +308,12 @@
         return self.pipeline.source

     @property
+    def current_record(self):
+        if self.record_history is not None:
+            return self.record_history.current
+        raise Exception("The current record is not available.")
+
+    @property
     def pipeline_name(self):
         return self.pipeline.PIPELINE_NAME
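For context, here is a minimal sketch of how a concrete pipeline could sit on top of the reworked API: the worker-side run() resolves the content item back from the job spec, and handleJobResult() runs on the master process to fill in the record entry. This is not code from the changeset; the DummyPipeline class, the 'out_spec' result key, and the out_spec record-entry attribute are invented, result is assumed to behave like a dict, and the ContentPipeline base-class name is assumed to be the one defined in this module.

```python
from piecrust.pipelines.base import ContentPipeline, content_item_from_job


class DummyPipeline(ContentPipeline):  # hypothetical pipeline subclass
    PIPELINE_NAME = 'dummy'

    def run(self, job, ctx, result):
        # Worker process: re-resolve the content item from the job spec
        # instead of receiving a pickled ContentItem.
        item = content_item_from_job(self, job)
        result['out_spec'] = item.spec  # invented result key

    def handleJobResult(self, result, ctx):
        # Master process: copy what the worker reported onto the record
        # entry looked up through PipelineJobResultHandleContext.
        ctx.record_entry.out_spec = result['out_spec']  # invented attribute
```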