Mercurial > piecrust2
comparison piecrust/pipelines/base.py @ 854:08e02c2a2a1a
core: Keep refactoring, this time to prepare for generator sources.
- Make a few APIs simpler.
- Content pipelines create their own jobs, so that generator sources can
keep aborting in `getContents`, but rely on their pipeline to generate
pages for baking.
| author | Ludovic Chabant <ludovic@chabant.com> |
|---|---|
| date | Sun, 04 Jun 2017 23:34:28 -0700 |
| parents | f070a4fc033c |
| children | 448710d84121 |
comparison
equal
deleted
inserted
replaced
| 853:f070a4fc033c | 854:08e02c2a2a1a |
|---|---|
| 1 import os.path | 1 import os.path |
| 2 import logging | 2 import logging |
| 3 from werkzeug.utils import cached_property | |
| 4 from piecrust.configuration import ConfigurationError | |
| 3 | 5 |
| 4 | 6 |
| 5 logger = logging.getLogger(__name__) | 7 logger = logging.getLogger(__name__) |
| 6 | 8 |
| 7 | 9 |
| 8 class PipelineContext: | 10 class PipelineContext: |
| 9 """ The context for running a content pipeline. | 11 """ The context for running a content pipeline. |
| 10 """ | 12 """ |
| 11 def __init__(self, out_dir, record_history, *, | 13 def __init__(self, out_dir, *, |
| 12 worker_id=-1, force=None): | 14 worker_id=-1, force=None): |
| 13 self.out_dir = out_dir | 15 self.out_dir = out_dir |
| 14 self.record_history = record_history | |
| 15 self.worker_id = worker_id | 16 self.worker_id = worker_id |
| 16 self.force = force | 17 self.force = force |
| 17 | 18 |
| 18 @property | 19 @property |
| 19 def is_worker(self): | 20 def is_worker(self): |
| 28 the main process (and not a worker process). This is the case | 29 the main process (and not a worker process). This is the case |
| 29 if there are no worker processes at all. | 30 if there are no worker processes at all. |
| 30 """ | 31 """ |
| 31 return self.worker_id < 0 | 32 return self.worker_id < 0 |
| 32 | 33 |
| 33 @property | 34 |
class PipelineJob:
    """ Base class for a pipeline baking job.

        A job bundles a content item to bake together with the names of the
        source and record it belongs to, plus a free-form `data` dictionary
        that pipelines can use to carry extra per-job information.
    """
    def __init__(self, pipeline, content_item):
        # Only the *names* are copied, not the pipeline object itself —
        # NOTE(review): presumably so jobs stay lightweight when handed to
        # worker processes; confirm against the job dispatch code.
        self.source_name = pipeline.source.name
        self.record_name = pipeline.record_name
        self.content_item = content_item
        self.data = {}
| 43 | |
| 44 | |
class PipelineJobRunContext:
    """ Context for running pipeline baking jobs.

        Gives a job access to the previous bake's record and to its own
        previous record entry, both resolved lazily and cached.
    """
    def __init__(self, job, pipeline, record_histories):
        self.record_histories = record_histories
        # Remember only what the cached lookups below need.
        self._job_item_spec = job.content_item.spec
        self._record_name = pipeline.record_name

    @cached_property
    def previous_record(self):
        # Cached so the histories are queried at most once per job run.
        return self.record_histories.getPreviousRecord(self._record_name)

    @cached_property
    def previous_entry(self):
        # Cached entry for this job's item in the previous bake record.
        return self.previous_record.getEntry(self._job_item_spec)
| 60 | |
| 61 | |
class PipelineJobResult:
    """ Result of running a pipeline on a content item.
    """
    def __init__(self):
        # The record entry produced for this item; filled in by the pipeline.
        self.record_entry = None
        # Optional follow-up job to schedule for the next bake pass.
        self.next_pass_job = None
| 68 | |
| 69 | |
class PipelineMergeRecordContext:
    """ The context for merging a record entry for a second or higher pass
        into the bake record.
    """
    def __init__(self, record, job, pass_num):
        # The bake record being merged into.
        self.record = record
        # The job whose entry is being merged.
        self.job = job
        # Zero-based bake pass number (>= 1 for merge passes).
        self.pass_num = pass_num
| 78 | |
| 79 | |
class PipelineDeletionContext:
    """ Context handed to a pipeline when computing stale outputs to delete.
    """
    def __init__(self, record_history):
        self.record_history = record_history
| 83 | |
| 84 | |
class PipelineCollapseRecordContext:
    """ Context handed to a pipeline when collapsing record entries.
    """
    def __init__(self, record_history):
        self.record_history = record_history
| 44 | 88 |
| 45 | 89 |
class ContentPipeline:
    """ A pipeline that processes content from a `ContentSource`.

        Subclasses set `PIPELINE_NAME` and `RECORD_ENTRY_CLASS`, and must
        implement at least `mergeRecordEntry` and `run`.
    """
    PIPELINE_NAME = None
    RECORD_ENTRY_CLASS = None
    PASS_NUM = 0

    def __init__(self, source, ctx):
        self.source = source
        self.ctx = ctx
        # Record name pairs the source with the pipeline processing it.
        self.record_name = '%s@%s' % (source.name, self.PIPELINE_NAME)

        app = source.app
        tmp_dir = app.cache_dir
        if not tmp_dir:
            import tempfile
            # NOTE(review): the next two lines were elided from the diff
            # context in this view and are reconstructed — confirm them
            # against the repository before relying on them.
            tmp_dir = os.path.join(tempfile.gettempdir(), 'piecrust')
        self.tmp_dir = os.path.join(tmp_dir, self.PIPELINE_NAME)

    @property
    def app(self):
        # Convenience accessor for the source's application.
        return self.source.app

    def initialize(self):
        """ Hook called once before any jobs run; default does nothing. """
        pass

    def createJobs(self):
        """ Create one baking job per content item in the source. """
        return [self.createJob(i) for i in self.source.getAllContents()]

    def createJob(self, content_item):
        """ Wrap a single content item into a `PipelineJob`. """
        return PipelineJob(self, content_item)

    def createRecordEntry(self, job):
        """ Create a fresh record entry keyed on the job's item spec. """
        record_entry = self.RECORD_ENTRY_CLASS()
        record_entry.item_spec = job.content_item.spec
        return record_entry

    def mergeRecordEntry(self, record_entry, ctx):
        """ Merge a record entry from a later pass; must be overridden. """
        raise NotImplementedError()

    def run(self, job, ctx, result):
        """ Process one job; must be overridden. """
        raise NotImplementedError()

    def getDeletions(self, ctx):
        """ Yield `(path, reason)` pairs to delete; default: nothing. """
        pass

    def collapseRecords(self, ctx):
        """ Hook for collapsing record entries; default does nothing. """
        pass

    def shutdown(self):
        """ Hook called once after baking is done; default does nothing. """
        pass
| 144 | |
| 145 | |
def get_pipeline_name_for_source(source):
    """ Return the pipeline name configured for `source`.

        Falls back to the source's `DEFAULT_PIPELINE_NAME`; raises
        `ConfigurationError` when neither yields a usable name.
    """
    pname = source.config['pipeline'] or source.DEFAULT_PIPELINE_NAME
    if not pname:
        raise ConfigurationError(
            "Source '%s' doesn't specify any pipeline." % source.name)
    return pname
| 154 | |
| 155 | |
class PipelineManager:
    """ Creates and tracks the content pipelines for a site's sources,
        and drives their shared lifecycle (history diffs, stale-output
        deletion, record collapsing, shutdown).
    """
    def __init__(self, app, out_dir, record_histories, *,
                 worker_id=-1, force=False):
        self.app = app
        self.record_histories = record_histories
        self.out_dir = out_dir
        self.worker_id = worker_id
        self.force = force

        # Map pipeline names to their classes, from the plugin loader.
        self._pipeline_classes = {
            pclass.PIPELINE_NAME: pclass
            for pclass in app.plugin_loader.getPipelines()}

        # One `_PipelineInfo` per source name, created on demand.
        self._pipelines = {}

    def getPipeline(self, source_name):
        """ Return the pipeline info created for the given source name. """
        return self._pipelines[source_name]

    def getPipelines(self):
        """ Return all pipeline infos created so far. """
        return self._pipelines.values()

    def createPipeline(self, source):
        """ Create, initialize and register the pipeline for `source`. """
        if source.name in self._pipelines:
            raise ValueError("Pipeline for source '%s' was already created." %
                             source.name)

        pname = get_pipeline_name_for_source(source)
        pctx = PipelineContext(self.out_dir,
                               worker_id=self.worker_id, force=self.force)
        pipeline = self._pipeline_classes[pname](source, pctx)
        pipeline.initialize()

        history = self.record_histories.getHistory(pipeline.record_name)

        info = _PipelineInfo(pipeline, history)
        self._pipelines[source.name] = info
        return info

    def buildHistoryDiffs(self):
        """ Build the previous-vs-current diff for every record history. """
        for ppinfo in self.getPipelines():
            ppinfo.record_history.build()

    def deleteStaleOutputs(self):
        """ Remove output files the pipelines report as stale. """
        for ppinfo in self.getPipelines():
            del_ctx = PipelineDeletionContext(ppinfo.record_history)
            stale = ppinfo.pipeline.getDeletions(del_ctx)
            current_record = ppinfo.record_history.current
            if stale is None:
                continue
            for path, reason in stale:
                logger.debug("Removing '%s': %s" % (path, reason))
                current_record.deleted_out_paths.append(path)
                try:
                    # Best-effort: the file may already be gone.
                    os.remove(path)
                except FileNotFoundError:
                    pass
                logger.info('[delete] %s' % path)

    def collapseRecords(self):
        """ Let every pipeline collapse its record history. """
        for ppinfo in self.getPipelines():
            collapse_ctx = PipelineCollapseRecordContext(ppinfo.record_history)
            ppinfo.pipeline.collapseRecords(collapse_ctx)

    def shutdownPipelines(self):
        """ Shut every pipeline down and forget about them. """
        for ppinfo in self.getPipelines():
            ppinfo.pipeline.shutdown()

        self._pipelines = {}
| 223 | |
| 224 | |
class _PipelineInfo:
    """ Bundles a pipeline with its record history, plus a slot for
        caller-owned data.
    """
    def __init__(self, pipeline, record_history):
        self.pipeline = pipeline
        self.record_history = record_history
        # Free-form slot for whoever drives the bake.
        self.userdata = None

    @property
    def source(self):
        # The content source this pipeline processes.
        return self.pipeline.source

    @property
    def pipeline_name(self):
        return self.pipeline.PIPELINE_NAME
