comparison piecrust/pipelines/base.py @ 989:8adc27285d93

bake: Big pass on bake performance.

- Reduce the amount of data passed between processes.
- Make inter-process data simple objects to make it easier to test with alternatives to pickle.
- Make sources have the basic requirement to be able to find a content item from an item spec (path).
- Make Hoedown the default Markdown formatter.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 19 Nov 2017 14:29:17 -0800
parents d6d35b2efd04
children fa489c5e829e
comparison
equal deleted inserted replaced
988:f83ae0a5d793 989:8adc27285d93
1 import os.path 1 import os.path
2 import logging 2 import logging
3 from werkzeug.utils import cached_property 3 from werkzeug.utils import cached_property
4 from piecrust.configuration import ConfigurationError 4 from piecrust.configuration import ConfigurationError
5 from piecrust.sources.base import ContentItem
5 6
6 7
7 logger = logging.getLogger(__name__) 8 logger = logging.getLogger(__name__)
8 9
9 10
30 if there are no worker processes at all. 31 if there are no worker processes at all.
31 """ 32 """
32 return self.worker_id < 0 33 return self.worker_id < 0
33 34
34 35
35 class PipelineJob: 36 class _PipelineMasterProcessJobContextBase:
36 """ Base class for a pipline baking job. 37 def __init__(self, record_name, record_histories):
37 """ 38 self.record_name = record_name
38 def __init__(self, pipeline, content_item): 39 self.record_histories = record_histories
39 self.source_name = pipeline.source.name 40
40 self.record_name = pipeline.record_name 41 @property
41 self.content_item = content_item 42 def previous_record(self):
42 self.step_num = 0 43 return self.record_histories.getPreviousRecord(self.record_name)
43 self.data = {} 44
44 45 @property
45 46 def current_record(self):
46 class PipelineJobCreateContext: 47 return self.record_histories.getCurrentRecord(self.record_name)
47 """ Context for create pipeline baking jobs. 48
48 """ 49
49 def __init__(self, step_num, record_histories): 50 class PipelineJobCreateContext(_PipelineMasterProcessJobContextBase):
51 """ Context for creating pipeline baking jobs.
52
53 This is run on the master process, so it can access both the
54 previous and current records.
55 """
56 def __init__(self, pass_num, record_name, record_histories):
57 super().__init__(record_name, record_histories)
58 self.pass_num = pass_num
59
60
61 class PipelineJobValidateContext(_PipelineMasterProcessJobContextBase):
62 """ Context for validating jobs on subsequent step runs (i.e. validating
63 the list of jobs to run starting with the second step).
64
65 This is run on the master process, so it can access both the
66 previous and current records.
67 """
68 def __init__(self, pass_num, step_num, record_name, record_histories):
69 super().__init__(record_name, record_histories)
70 self.pass_num = pass_num
50 self.step_num = step_num 71 self.step_num = step_num
51 self.record_histories = record_histories
52 72
53 73
54 class PipelineJobRunContext: 74 class PipelineJobRunContext:
55 """ Context for running pipeline baking jobs. 75 """ Context for running pipeline baking jobs.
56 """ 76
57 def __init__(self, job, record_name, record_histories): 77 This is run on the worker processes, so it can only access the
78 previous records.
79 """
80 def __init__(self, job, record_name, previous_records):
58 self.job = job 81 self.job = job
59 self.record_name = record_name 82 self.record_name = record_name
60 self.record_histories = record_histories 83 self.previous_records = previous_records
61 84
62 @property 85 @cached_property
63 def content_item(self): 86 def record_entry_spec(self):
64 return self.job.content_item 87 return self.job.get('record_entry_spec',
88 self.job['job_spec'][1])
65 89
66 @cached_property 90 @cached_property
67 def previous_record(self): 91 def previous_record(self):
68 return self.record_histories.getPreviousRecord(self.record_name) 92 return self.previous_records.getRecord(self.record_name)
69
70 @cached_property
71 def record_entry_spec(self):
72 content_item = self.content_item
73 return content_item.metadata.get('record_entry_spec',
74 content_item.spec)
75 93
76 @cached_property 94 @cached_property
77 def previous_entry(self): 95 def previous_entry(self):
78 return self.previous_record.getEntry(self.record_entry_spec) 96 return self.previous_record.getEntry(self.record_entry_spec)
79 97
80 98
81 class PipelineJobResult: 99 class PipelineJobResultHandleContext:
82 """ Result of running a pipeline on a content item. 100 """ The context for handling the result from a job that ran in a
83 """ 101 worker process.
84 def __init__(self): 102
85 self.record_entry = None 103 This is run on the master process, so it can access the current
86 self.next_step_job = None 104 record.
87
88
89 class PipelineMergeRecordContext:
90 """ The context for merging a record entry for a second or higher pass
91 into the bake record.
92 """ 105 """
93 def __init__(self, record, job, step_num): 106 def __init__(self, record, job, step_num):
94 self.record = record 107 self.record = record
95 self.job = job 108 self.job = job
96 self.step_num = step_num 109 self.step_num = step_num
110
111 @cached_property
112 def record_entry(self):
113 record_entry_spec = self.job.get('record_entry_spec',
114 self.job['job_spec'][1])
115 return self.record.getEntry(record_entry_spec)
97 116
98 117
99 class PipelinePostJobRunContext: 118 class PipelinePostJobRunContext:
100 def __init__(self, record_history): 119 def __init__(self, record_history):
101 self.record_history = record_history 120 self.record_history = record_history
135 return self.source.app 154 return self.source.app
136 155
137 def initialize(self): 156 def initialize(self):
138 pass 157 pass
139 158
159 def loadAllContents(self):
160 return None
161
140 def createJobs(self, ctx): 162 def createJobs(self, ctx):
141 return [ 163 return [
142 self.createJob(item) 164 create_job(self, item.spec)
143 for item in self.source.getAllContents()] 165 for item in self.source.getAllContents()]
144 166
145 def createJob(self, content_item): 167 def createRecordEntry(self, item_spec):
146 return PipelineJob(self, content_item)
147
148 def createRecordEntry(self, job, ctx):
149 entry_class = self.RECORD_ENTRY_CLASS 168 entry_class = self.RECORD_ENTRY_CLASS
150 record_entry = entry_class() 169 record_entry = entry_class()
151 record_entry.item_spec = ctx.record_entry_spec 170 record_entry.item_spec = item_spec
152 return record_entry 171 return record_entry
153 172
154 def mergeRecordEntry(self, record_entry, ctx): 173 def handleJobResult(self, result, ctx):
155 raise NotImplementedError() 174 raise NotImplementedError()
175
176 def validateNextStepJobs(self, jobs, ctx):
177 pass
156 178
157 def run(self, job, ctx, result): 179 def run(self, job, ctx, result):
158 raise NotImplementedError() 180 raise NotImplementedError()
159 181
160 def postJobRun(self, ctx): 182 def postJobRun(self, ctx):
166 def collapseRecords(self, ctx): 188 def collapseRecords(self, ctx):
167 pass 189 pass
168 190
169 def shutdown(self): 191 def shutdown(self):
170 pass 192 pass
193
194
195 def create_job(pipeline, item_spec, **kwargs):
196 job = {
197 'job_spec': (pipeline.source.name, item_spec)
198 }
199 job.update(kwargs)
200 return job
201
202
203 def content_item_from_job(pipeline, job):
204 return pipeline.source.findContentFromSpec(job['job_spec'][1])
171 205
172 206
173 def get_record_name_for_source(source): 207 def get_record_name_for_source(source):
174 ppname = get_pipeline_name_for_source(source) 208 ppname = get_pipeline_name_for_source(source)
175 return '%s@%s' % (source.name, ppname) 209 return '%s@%s' % (source.name, ppname)
184 "Source '%s' doesn't specify any pipeline." % source.name) 218 "Source '%s' doesn't specify any pipeline." % source.name)
185 return pname 219 return pname
186 220
187 221
188 class PipelineManager: 222 class PipelineManager:
189 def __init__(self, app, out_dir, record_histories, *, 223 def __init__(self, app, out_dir, *,
190 worker_id=-1, force=False): 224 record_histories=None, worker_id=-1, force=False):
191 self.app = app 225 self.app = app
192 self.record_histories = record_histories 226 self.record_histories = record_histories
193 self.out_dir = out_dir 227 self.out_dir = out_dir
194 self.worker_id = worker_id 228 self.worker_id = worker_id
195 self.force = force 229 self.force = force
199 self._pipeline_classes[pclass.PIPELINE_NAME] = pclass 233 self._pipeline_classes[pclass.PIPELINE_NAME] = pclass
200 234
201 self._pipelines = {} 235 self._pipelines = {}
202 236
203 def getPipeline(self, source_name): 237 def getPipeline(self, source_name):
238 return self.getPipelineInfo(source_name).pipeline
239
240 def getPipelineInfo(self, source_name):
204 return self._pipelines[source_name] 241 return self._pipelines[source_name]
205 242
206 def getPipelines(self): 243 def getPipelineInfos(self):
207 return self._pipelines.values() 244 return self._pipelines.values()
208 245
209 def createPipeline(self, source): 246 def createPipeline(self, source):
210 if source.name in self._pipelines: 247 if source.name in self._pipelines:
211 raise ValueError("Pipeline for source '%s' was already created." % 248 raise ValueError("Pipeline for source '%s' was already created." %
215 ppctx = PipelineContext(self.out_dir, 252 ppctx = PipelineContext(self.out_dir,
216 worker_id=self.worker_id, force=self.force) 253 worker_id=self.worker_id, force=self.force)
217 pp = self._pipeline_classes[pname](source, ppctx) 254 pp = self._pipeline_classes[pname](source, ppctx)
218 pp.initialize() 255 pp.initialize()
219 256
220 record_history = self.record_histories.getHistory(pp.record_name) 257 record_history = None
258 if self.record_histories:
259 record_history = self.record_histories.getHistory(pp.record_name)
221 260
222 info = _PipelineInfo(pp, record_history) 261 info = _PipelineInfo(pp, record_history)
223 self._pipelines[source.name] = info 262 self._pipelines[source.name] = info
224 return info 263 return info
225 264
226 def postJobRun(self): 265 def postJobRun(self):
227 for ppinfo in self.getPipelines(): 266 for ppinfo in self.getPipelineInfos():
228 ppinfo.record_history.build() 267 ppinfo.record_history.build()
229 268
230 for ppinfo in self.getPipelines(): 269 for ppinfo in self.getPipelineInfos():
231 ctx = PipelinePostJobRunContext(ppinfo.record_history) 270 ctx = PipelinePostJobRunContext(ppinfo.record_history)
232 ppinfo.pipeline.postJobRun(ctx) 271 ppinfo.pipeline.postJobRun(ctx)
233 272
234 def deleteStaleOutputs(self): 273 def deleteStaleOutputs(self):
235 for ppinfo in self.getPipelines(): 274 for ppinfo in self.getPipelineInfos():
236 ctx = PipelineDeletionContext(ppinfo.record_history) 275 ctx = PipelineDeletionContext(ppinfo.record_history)
237 to_delete = ppinfo.pipeline.getDeletions(ctx) 276 to_delete = ppinfo.pipeline.getDeletions(ctx)
238 current_record = ppinfo.record_history.current 277 current_record = ppinfo.record_history.current
239 if to_delete is not None: 278 if to_delete is not None:
240 for path, reason in to_delete: 279 for path, reason in to_delete:
245 except FileNotFoundError: 284 except FileNotFoundError:
246 pass 285 pass
247 logger.info('[delete] %s' % path) 286 logger.info('[delete] %s' % path)
248 287
249 def collapseRecords(self): 288 def collapseRecords(self):
250 for ppinfo in self.getPipelines(): 289 for ppinfo in self.getPipelineInfos():
251 ctx = PipelineCollapseRecordContext(ppinfo.record_history) 290 ctx = PipelineCollapseRecordContext(ppinfo.record_history)
252 ppinfo.pipeline.collapseRecords(ctx) 291 ppinfo.pipeline.collapseRecords(ctx)
253 292
254 def shutdownPipelines(self): 293 def shutdownPipelines(self):
255 for ppinfo in self.getPipelines(): 294 for ppinfo in self.getPipelineInfos():
256 ppinfo.pipeline.shutdown() 295 ppinfo.pipeline.shutdown()
257 296
258 self._pipelines = {} 297 self._pipelines = {}
259 298
260 299
267 @property 306 @property
268 def source(self): 307 def source(self):
269 return self.pipeline.source 308 return self.pipeline.source
270 309
271 @property 310 @property
311 def current_record(self):
312 if self.record_history is not None:
313 return self.record_history.current
314 raise Exception("The current record is not available.")
315
316 @property
272 def pipeline_name(self): 317 def pipeline_name(self):
273 return self.pipeline.PIPELINE_NAME 318 return self.pipeline.PIPELINE_NAME
274 319