piecrust2: comparison of piecrust/pipelines/base.py @ 989:8adc27285d93
bake: Big pass on bake performance.
- Reduce the amount of data passed between processes.
- Make inter-process data simple objects to make it easier to test with
alternatives to pickle (see the sketch after this list).
- Make sources have the basic requirement to be able to find a content item
from an item spec (path).
- Make Hoedown the default Markdown formatter.
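
The first two bullets are visible in the new `create_job()` helper further down this diff: a bake job becomes a plain dict whose `'job_spec'` key holds a `(source name, item spec)` tuple, instead of a `PipelineJob` instance wrapping a full `ContentItem`. A minimal sketch of why that helps, assuming nothing beyond built-in types (the spec string is an invented example):

```python
# A job in the new format contains only built-in types, so it
# round-trips through pickle, or any pickle alternative, without
# custom serialization support for pipeline or content classes.
import pickle

job = {'job_spec': ('posts', 'pages/2017/11/a-post.md')}
assert pickle.loads(pickle.dumps(job)) == job
```
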
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sun, 19 Nov 2017 14:29:17 -0800 |
parents | d6d35b2efd04 |
children | fa489c5e829e |
988:f83ae0a5d793 | 989:8adc27285d93 |
---|---|
1 import os.path | 1 import os.path |
2 import logging | 2 import logging |
3 from werkzeug.utils import cached_property | 3 from werkzeug.utils import cached_property |
4 from piecrust.configuration import ConfigurationError | 4 from piecrust.configuration import ConfigurationError |
| 5 from piecrust.sources.base import ContentItem |
5 | 6 |
6 | 7 |
7 logger = logging.getLogger(__name__) | 8 logger = logging.getLogger(__name__) |
8 | 9 |
9 | 10 |
30 if there are no worker processes at all. | 31 if there are no worker processes at all. |
31 """ | 32 """ |
32 return self.worker_id < 0 | 33 return self.worker_id < 0 |
33 | 34 |
34 | 35 |
35 class PipelineJob: | 36 class _PipelineMasterProcessJobContextBase: |
36 """ Base class for a pipline baking job. | 37 def __init__(self, record_name, record_histories): |
37 """ | 38 self.record_name = record_name |
38 def __init__(self, pipeline, content_item): | 39 self.record_histories = record_histories |
39 self.source_name = pipeline.source.name | 40 |
40 self.record_name = pipeline.record_name | 41 @property |
41 self.content_item = content_item | 42 def previous_record(self): |
42 self.step_num = 0 | 43 return self.record_histories.getPreviousRecord(self.record_name) |
43 self.data = {} | 44 |
44 | 45 @property |
45 | 46 def current_record(self): |
46 class PipelineJobCreateContext: | 47 return self.record_histories.getCurrentRecord(self.record_name) |
47 """ Context for create pipeline baking jobs. | 48 |
48 """ | 49 |
49 def __init__(self, step_num, record_histories): | 50 class PipelineJobCreateContext(_PipelineMasterProcessJobContextBase): |
51 """ Context for creating pipeline baking jobs. | |
52 | |
53 This is run on the master process, so it can access both the | |
54 previous and current records. | |
55 """ | |
56 def __init__(self, pass_num, record_name, record_histories): | |
57 super().__init__(record_name, record_histories) | |
58 self.pass_num = pass_num | |
59 | |
60 | |
61 class PipelineJobValidateContext(_PipelineMasterProcessJobContextBase): | |
62 """ Context for validating jobs on subsequent step runs (i.e. validating | |
63 the list of jobs to run starting with the second step). | |
64 | |
65 This is run on the master process, so it can access both the | |
66 previous and current records. | |
67 """ | |
68 def __init__(self, pass_num, step_num, record_name, record_histories): | |
69 super().__init__(record_name, record_histories) | |
70 self.pass_num = pass_num | |
50 self.step_num = step_num | 71 self.step_num = step_num |
51 self.record_histories = record_histories | |
52 | 72 |
53 | 73 |
54 class PipelineJobRunContext: | 74 class PipelineJobRunContext: |
55 """ Context for running pipeline baking jobs. | 75 """ Context for running pipeline baking jobs. |
56 """ | 76 |
57 def __init__(self, job, record_name, record_histories): | 77 This is run on the worker processes, so it can only access the |
| 78 previous records. |
| 79 """ |
| 80 def __init__(self, job, record_name, previous_records): |
58 self.job = job | 81 self.job = job |
59 self.record_name = record_name | 82 self.record_name = record_name |
60 self.record_histories = record_histories | 83 self.previous_records = previous_records |
61 | 84 |
62 @property | 85 @cached_property |
63 def content_item(self): | 86 def record_entry_spec(self): |
64 return self.job.content_item | 87 return self.job.get('record_entry_spec', |
| 88 self.job['job_spec'][1]) |
65 | 89 |
66 @cached_property | 90 @cached_property |
67 def previous_record(self): | 91 def previous_record(self): |
68 return self.record_histories.getPreviousRecord(self.record_name) | 92 return self.previous_records.getRecord(self.record_name) |
69 | |
70 @cached_property | |
71 def record_entry_spec(self): | |
72 content_item = self.content_item | |
73 return content_item.metadata.get('record_entry_spec', | |
74 content_item.spec) | |
75 | 93 |
76 @cached_property | 94 @cached_property |
77 def previous_entry(self): | 95 def previous_entry(self): |
78 return self.previous_record.getEntry(self.record_entry_spec) | 96 return self.previous_record.getEntry(self.record_entry_spec) |
79 | 97 |
80 | 98 |
81 class PipelineJobResult: | 99 class PipelineJobResultHandleContext: |
82 """ Result of running a pipeline on a content item. | 100 """ The context for handling the result from a job that ran in a |
83 """ | 101 worker process. |
84 def __init__(self): | 102 |
85 self.record_entry = None | 103 This is run on the master process, so it can access the current |
86 self.next_step_job = None | 104 record. |
87 | |
88 | |
89 class PipelineMergeRecordContext: | |
90 """ The context for merging a record entry for a second or higher pass | |
91 into the bake record. | |
92 """ | 105 """ |
93 def __init__(self, record, job, step_num): | 106 def __init__(self, record, job, step_num): |
94 self.record = record | 107 self.record = record |
95 self.job = job | 108 self.job = job |
96 self.step_num = step_num | 109 self.step_num = step_num |
| 110 |
| 111 @cached_property |
| 112 def record_entry(self): |
| 113 record_entry_spec = self.job.get('record_entry_spec', |
| 114 self.job['job_spec'][1]) |
| 115 return self.record.getEntry(record_entry_spec) |
97 | 116 |
98 | 117 |
99 class PipelinePostJobRunContext: | 118 class PipelinePostJobRunContext: |
100 def __init__(self, record_history): | 119 def __init__(self, record_history): |
101 self.record_history = record_history | 120 self.record_history = record_history |
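
The hunk above splits the job contexts by process: `PipelineJobCreateContext` and `PipelineJobValidateContext` run on the master, where both the previous and current records are reachable, while `PipelineJobRunContext` runs on workers and only carries the previous records. The record entry spec also moves off the old `ContentItem` metadata and into the job dict itself. A minimal sketch of that lookup, mirroring `PipelineJobRunContext.record_entry_spec` above (the spec strings are invented):

```python
# An explicit 'record_entry_spec' key wins; otherwise the item spec
# stored in the ('source name', 'item spec') job tuple is used.
job = {'job_spec': ('posts', 'pages/foo.md')}
assert job.get('record_entry_spec', job['job_spec'][1]) == 'pages/foo.md'

job['record_entry_spec'] = 'pages/foo.md;rss'
assert job.get('record_entry_spec', job['job_spec'][1]) == 'pages/foo.md;rss'
```
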
135 return self.source.app | 154 return self.source.app |
136 | 155 |
137 def initialize(self): | 156 def initialize(self): |
138 pass | 157 pass |
139 | 158 |
| 159 def loadAllContents(self): |
| 160 return None |
| 161 |
140 def createJobs(self, ctx): | 162 def createJobs(self, ctx): |
141 return [ | 163 return [ |
142 self.createJob(item) | 164 create_job(self, item.spec) |
143 for item in self.source.getAllContents()] | 165 for item in self.source.getAllContents()] |
144 | 166 |
145 def createJob(self, content_item): | 167 def createRecordEntry(self, item_spec): |
146 return PipelineJob(self, content_item) | |
147 | |
148 def createRecordEntry(self, job, ctx): | |
149 entry_class = self.RECORD_ENTRY_CLASS | 168 entry_class = self.RECORD_ENTRY_CLASS |
150 record_entry = entry_class() | 169 record_entry = entry_class() |
151 record_entry.item_spec = ctx.record_entry_spec | 170 record_entry.item_spec = item_spec |
152 return record_entry | 171 return record_entry |
153 | 172 |
154 def mergeRecordEntry(self, record_entry, ctx): | 173 def handleJobResult(self, result, ctx): |
155 raise NotImplementedError() | 174 raise NotImplementedError() |
| 175 |
| 176 def validateNextStepJobs(self, jobs, ctx): |
| 177 pass |
156 | 178 |
157 def run(self, job, ctx, result): | 179 def run(self, job, ctx, result): |
158 raise NotImplementedError() | 180 raise NotImplementedError() |
159 | 181 |
160 def postJobRun(self, ctx): | 182 def postJobRun(self, ctx): |
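
This hunk reshapes the pipeline base API: the default `createJobs()` now emits spec-only jobs through `create_job()`, `createRecordEntry()` takes a bare `item_spec`, and `mergeRecordEntry()` gives way to `handleJobResult()` plus a new `validateNextStepJobs()` hook for multi-step bakes. A skeleton of a subclass against these hooks; it assumes the base class defined earlier in this file is `ContentPipeline` (its definition sits outside the hunks shown), and the record-entry type is a placeholder:

```python
from piecrust.pipelines.base import ContentPipeline, content_item_from_job

class NoopRecordEntry:
    # Placeholder; real pipelines point RECORD_ENTRY_CLASS at their own
    # record entry type, and createRecordEntry() fills in item_spec.
    item_spec = None

class NoopPipeline(ContentPipeline):
    PIPELINE_NAME = 'noop'
    RECORD_ENTRY_CLASS = NoopRecordEntry

    def run(self, job, ctx, result):
        # Worker side: the job is a plain dict, so re-find the content
        # item from its spec instead of receiving it pre-loaded.
        item = content_item_from_job(self, job)

    def handleJobResult(self, result, ctx):
        # Master side: fold the worker's plain-data result into the
        # current record entry (ctx.record_entry in this changeset).
        pass
```
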
166 def collapseRecords(self, ctx): | 188 def collapseRecords(self, ctx): |
167 pass | 189 pass |
168 | 190 |
169 def shutdown(self): | 191 def shutdown(self): |
170 pass | 192 pass |
| 193 |
| 194 |
| 195 def create_job(pipeline, item_spec, **kwargs): |
| 196 job = { |
| 197 'job_spec': (pipeline.source.name, item_spec) |
| 198 } |
| 199 job.update(kwargs) |
| 200 return job |
| 201 |
| 202 |
| 203 def content_item_from_job(pipeline, job): |
| 204 return pipeline.source.findContentFromSpec(job['job_spec'][1]) |
171 | 205 |
172 | 206 |
173 def get_record_name_for_source(source): | 207 def get_record_name_for_source(source): |
174 ppname = get_pipeline_name_for_source(source) | 208 ppname = get_pipeline_name_for_source(source) |
175 return '%s@%s' % (source.name, ppname) | 209 return '%s@%s' % (source.name, ppname) |
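
The module-level helpers above replace the removed `PipelineJob` class and encode the commit's third point: only the `(source name, item spec)` pair crosses the process boundary, and the worker re-finds the content item, which is why every source must now support spec-based lookup via `findContentFromSpec()`. A runnable stand-in for the round-trip (the `Fake*` classes are illustrations, not piecrust types):

```python
from piecrust.pipelines.base import create_job, content_item_from_job

class FakeItem:
    def __init__(self, spec):
        self.spec = spec

class FakeSource:
    name = 'posts'

    def findContentFromSpec(self, spec):
        # The new baseline requirement on sources: spec/path -> item.
        return FakeItem(spec)

class FakePipeline:
    source = FakeSource()

pp = FakePipeline()
job = create_job(pp, 'pages/foo.md')    # master process side
item = content_item_from_job(pp, job)   # worker process side
assert item.spec == 'pages/foo.md'
```
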
184 "Source '%s' doesn't specify any pipeline." % source.name) | 218 "Source '%s' doesn't specify any pipeline." % source.name) |
185 return pname | 219 return pname |
186 | 220 |
187 | 221 |
188 class PipelineManager: | 222 class PipelineManager: |
189 def __init__(self, app, out_dir, record_histories, *, | 223 def __init__(self, app, out_dir, *, |
190 worker_id=-1, force=False): | 224 record_histories=None, worker_id=-1, force=False): |
191 self.app = app | 225 self.app = app |
192 self.record_histories = record_histories | 226 self.record_histories = record_histories |
193 self.out_dir = out_dir | 227 self.out_dir = out_dir |
194 self.worker_id = worker_id | 228 self.worker_id = worker_id |
195 self.force = force | 229 self.force = force |
199 self._pipeline_classes[pclass.PIPELINE_NAME] = pclass | 233 self._pipeline_classes[pclass.PIPELINE_NAME] = pclass |
200 | 234 |
201 self._pipelines = {} | 235 self._pipelines = {} |
202 | 236 |
203 def getPipeline(self, source_name): | 237 def getPipeline(self, source_name): |
| 238 return self.getPipelineInfo(source_name).pipeline |
| 239 |
| 240 def getPipelineInfo(self, source_name): |
204 return self._pipelines[source_name] | 241 return self._pipelines[source_name] |
205 | 242 |
206 def getPipelines(self): | 243 def getPipelineInfos(self): |
207 return self._pipelines.values() | 244 return self._pipelines.values() |
208 | 245 |
209 def createPipeline(self, source): | 246 def createPipeline(self, source): |
210 if source.name in self._pipelines: | 247 if source.name in self._pipelines: |
211 raise ValueError("Pipeline for source '%s' was already created." % | 248 raise ValueError("Pipeline for source '%s' was already created." % |
215 ppctx = PipelineContext(self.out_dir, | 252 ppctx = PipelineContext(self.out_dir, |
216 worker_id=self.worker_id, force=self.force) | 253 worker_id=self.worker_id, force=self.force) |
217 pp = self._pipeline_classes[pname](source, ppctx) | 254 pp = self._pipeline_classes[pname](source, ppctx) |
218 pp.initialize() | 255 pp.initialize() |
219 | 256 |
220 record_history = self.record_histories.getHistory(pp.record_name) | 257 record_history = None |
| 258 if self.record_histories: |
| 259 record_history = self.record_histories.getHistory(pp.record_name) |
221 | 260 |
222 info = _PipelineInfo(pp, record_history) | 261 info = _PipelineInfo(pp, record_history) |
223 self._pipelines[source.name] = info | 262 self._pipelines[source.name] = info |
224 return info | 263 return info |
225 | 264 |
226 def postJobRun(self): | 265 def postJobRun(self): |
227 for ppinfo in self.getPipelines(): | 266 for ppinfo in self.getPipelineInfos(): |
228 ppinfo.record_history.build() | 267 ppinfo.record_history.build() |
229 | 268 |
230 for ppinfo in self.getPipelines(): | 269 for ppinfo in self.getPipelineInfos(): |
231 ctx = PipelinePostJobRunContext(ppinfo.record_history) | 270 ctx = PipelinePostJobRunContext(ppinfo.record_history) |
232 ppinfo.pipeline.postJobRun(ctx) | 271 ppinfo.pipeline.postJobRun(ctx) |
233 | 272 |
234 def deleteStaleOutputs(self): | 273 def deleteStaleOutputs(self): |
235 for ppinfo in self.getPipelines(): | 274 for ppinfo in self.getPipelineInfos(): |
236 ctx = PipelineDeletionContext(ppinfo.record_history) | 275 ctx = PipelineDeletionContext(ppinfo.record_history) |
237 to_delete = ppinfo.pipeline.getDeletions(ctx) | 276 to_delete = ppinfo.pipeline.getDeletions(ctx) |
238 current_record = ppinfo.record_history.current | 277 current_record = ppinfo.record_history.current |
239 if to_delete is not None: | 278 if to_delete is not None: |
240 for path, reason in to_delete: | 279 for path, reason in to_delete: |
245 except FileNotFoundError: | 284 except FileNotFoundError: |
246 pass | 285 pass |
247 logger.info('[delete] %s' % path) | 286 logger.info('[delete] %s' % path) |
248 | 287 |
249 def collapseRecords(self): | 288 def collapseRecords(self): |
250 for ppinfo in self.getPipelines(): | 289 for ppinfo in self.getPipelineInfos(): |
251 ctx = PipelineCollapseRecordContext(ppinfo.record_history) | 290 ctx = PipelineCollapseRecordContext(ppinfo.record_history) |
252 ppinfo.pipeline.collapseRecords(ctx) | 291 ppinfo.pipeline.collapseRecords(ctx) |
253 | 292 |
254 def shutdownPipelines(self): | 293 def shutdownPipelines(self): |
255 for ppinfo in self.getPipelines(): | 294 for ppinfo in self.getPipelineInfos(): |
256 ppinfo.pipeline.shutdown() | 295 ppinfo.pipeline.shutdown() |
257 | 296 |
258 self._pipelines = {} | 297 self._pipelines = {} |
259 | 298 |
260 | 299 |
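
`PipelineManager` now accepts `record_histories=None`, which is what lets slimmed-down worker processes build pipelines without the record machinery; in that case `_PipelineInfo.record_history` stays `None` and, per the final hunk below, reading `current_record` raises. A small sketch of the guard added in `createPipeline()` (the record name is an invented example in the `source@pipeline` format produced by `get_record_name_for_source()`):

```python
# Mirrors the new guard in createPipeline(): only resolve a record
# history when histories are available, i.e. on the master process.
def resolve_record_history(record_histories, record_name):
    if record_histories:
        return record_histories.getHistory(record_name)
    return None  # worker processes run without record histories

assert resolve_record_history(None, 'posts@page') is None
```
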
267 @property | 306 @property |
268 def source(self): | 307 def source(self): |
269 return self.pipeline.source | 308 return self.pipeline.source |
270 | 309 |
271 @property | 310 @property |
| 311 def current_record(self): |
| 312 if self.record_history is not None: |
| 313 return self.record_history.current |
| 314 raise Exception("The current record is not available.") |
| 315 |
| 316 @property |
272 def pipeline_name(self): | 317 def pipeline_name(self): |
273 return self.pipeline.PIPELINE_NAME | 318 return self.pipeline.PIPELINE_NAME |
274 | 319 |