comparison piecrust/baking/baker.py @ 989:8adc27285d93
bake: Big pass on bake performance.
- Reduce the amount of data passed between processes.
- Make inter-process data simple objects to make it easier to test with
alternatives to pickle (see the sketch after this list).
- Make sources have the basic requirement to be able to find a content item
from an item spec (path).
- Make Hoedown the default Markdown formatter.
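
The second point is the most visible in this file: bake jobs and their results are no longer pipeline-specific objects but plain dictionaries, so they serialize with pickle or any drop-in replacement without custom reduce logic. Below is a minimal sketch of the shapes this diff relies on; the field names are taken from the hunks further down, while the example values and the code that actually builds these dicts (in piecrust.workerpool and the pipeline modules) are assumptions for illustration only.

    # Illustrative only: dict shapes inferred from job['job_spec'],
    # res.get('next_step_job') and exc_data['value'] in the hunks below.
    job = {
        'job_spec': ('posts', 'posts/2017/my-post.md'),  # (source_name, item_spec) -- hypothetical values
        'step_num': 0,  # assumed to be set by the master before queuing
    }

    # A worker result is another plain dict; 'next_step_job' is only present
    # when the pipeline wants a follow-up job queued for the next step.
    res = {
        'next_step_job': {'job_spec': ('posts', 'posts/2017/my-post.md')},
    }

    # The master process can then route results with simple lookups, as
    # _handleWorkerResult does in this changeset:
    cur_step = 0
    source_name, item_spec = job['job_spec']
    npj = res.get('next_step_job')
    if npj is not None:
        npj['step_num'] = cur_step + 1
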
author   | Ludovic Chabant <ludovic@chabant.com>
date     | Sun, 19 Nov 2017 14:29:17 -0800
parents  | 45ad976712ec
children | 09dc0240f08a
988:f83ae0a5d793 | 989:8adc27285d93
4 import logging | 4 import logging |
5 from piecrust.chefutil import ( | 5 from piecrust.chefutil import ( |
6 format_timed_scope, format_timed) | 6 format_timed_scope, format_timed) |
7 from piecrust.environment import ExecutionStats | 7 from piecrust.environment import ExecutionStats |
8 from piecrust.pipelines.base import ( | 8 from piecrust.pipelines.base import ( |
9 PipelineJobCreateContext, PipelineMergeRecordContext, PipelineManager, | 9 PipelineJobCreateContext, PipelineJobResultHandleContext, |
10 PipelineJobValidateContext, PipelineManager, | |
10 get_pipeline_name_for_source) | 11 get_pipeline_name_for_source) |
11 from piecrust.pipelines.records import ( | 12 from piecrust.pipelines.records import ( |
12 MultiRecordHistory, MultiRecord, RecordEntry, | 13 MultiRecordHistory, MultiRecord, RecordEntry, |
13 load_records) | 14 load_records) |
14 from piecrust.sources.base import REALM_USER, REALM_THEME, REALM_NAMES | 15 from piecrust.sources.base import REALM_USER, REALM_THEME, REALM_NAMES |
40 self.allowed_sources = allowed_sources | 41 self.allowed_sources = allowed_sources |
41 self.rotate_bake_records = rotate_bake_records | 42 self.rotate_bake_records = rotate_bake_records |
42 | 43 |
43 def bake(self): | 44 def bake(self): |
44 start_time = time.perf_counter() | 45 start_time = time.perf_counter() |
46 | |
47 # Setup baker. | |
45 logger.debug(" Bake Output: %s" % self.out_dir) | 48 logger.debug(" Bake Output: %s" % self.out_dir) |
46 logger.debug(" Root URL: %s" % self.app.config.get('site/root')) | 49 logger.debug(" Root URL: %s" % self.app.config.get('site/root')) |
47 | 50 |
48 # Get into bake mode. | 51 # Get into bake mode. |
49 self.app.config.set('baker/is_baking', True) | 52 self.app.config.set('baker/is_baking', True) |
50 self.app.config.set('site/asset_url_format', '%page_uri%/%filename%') | 53 self.app.config.set('site/asset_url_format', '%page_uri%/%filename%') |
51 | 54 |
52 stats = self.app.env.stats | 55 stats = self.app.env.stats |
53 stats.registerTimer('WorkerTaskPut') | 56 stats.registerTimer('LoadSourceContents', raise_if_registered=False) |
57 stats.registerTimer('MasterTaskPut_1', raise_if_registered=False) | |
58 stats.registerTimer('MasterTaskPut_2+', raise_if_registered=False) | |
54 | 59 |
55 # Make sure the output directory exists. | 60 # Make sure the output directory exists. |
56 if not os.path.isdir(self.out_dir): | 61 if not os.path.isdir(self.out_dir): |
57 os.makedirs(self.out_dir, 0o755) | 62 os.makedirs(self.out_dir, 0o755) |
58 | 63 |
87 ppmngr = self._createPipelineManager(record_histories) | 92 ppmngr = self._createPipelineManager(record_histories) |
88 | 93 |
89 # Create the worker processes. | 94 # Create the worker processes. |
90 pool_userdata = _PoolUserData(self, ppmngr) | 95 pool_userdata = _PoolUserData(self, ppmngr) |
91 pool = self._createWorkerPool(records_path, pool_userdata) | 96 pool = self._createWorkerPool(records_path, pool_userdata) |
97 | |
98 # Done with all the setup, let's start the actual work. | |
99 logger.info(format_timed(start_time, "setup baker")) | |
100 | |
101 # Load all sources. | |
102 self._loadSources(ppmngr) | |
92 | 103 |
93 # Bake the realms. | 104 # Bake the realms. |
94 self._bakeRealms(pool, ppmngr, record_histories) | 105 self._bakeRealms(pool, ppmngr, record_histories) |
95 | 106 |
96 # Handle deletions, collapse records, etc. | 107 # Handle deletions, collapse records, etc. |
147 # We have to bake everything from scratch. | 158 # We have to bake everything from scratch. |
148 self.app.cache.clearCaches(except_names=['app', 'baker']) | 159 self.app.cache.clearCaches(except_names=['app', 'baker']) |
149 self.force = True | 160 self.force = True |
150 current_records.incremental_count = 0 | 161 current_records.incremental_count = 0 |
151 previous_records = MultiRecord() | 162 previous_records = MultiRecord() |
152 logger.info(format_timed( | 163 logger.debug(format_timed( |
153 start_time, "cleaned cache (reason: %s)" % reason)) | 164 start_time, "cleaned cache (reason: %s)" % reason, |
165 colored=False)) | |
154 return False | 166 return False |
155 else: | 167 else: |
156 current_records.incremental_count += 1 | 168 current_records.incremental_count += 1 |
157 logger.debug(format_timed( | 169 logger.debug(format_timed( |
158 start_time, "cache is assumed valid", colored=False)) | 170 start_time, "cache is assumed valid", colored=False)) |
165 # realm). | 177 # realm). |
166 # | 178 # |
167 # Also, create and initialize each pipeline for each source. | 179 # Also, create and initialize each pipeline for each source. |
168 has_any_pp = False | 180 has_any_pp = False |
169 ppmngr = PipelineManager( | 181 ppmngr = PipelineManager( |
170 self.app, self.out_dir, record_histories) | 182 self.app, self.out_dir, |
183 record_histories=record_histories) | |
171 ok_pp = self.allowed_pipelines | 184 ok_pp = self.allowed_pipelines |
172 nok_pp = self.forbidden_pipelines | 185 nok_pp = self.forbidden_pipelines |
173 ok_src = self.allowed_sources | 186 ok_src = self.allowed_sources |
174 for source in self.app.sources: | 187 for source in self.app.sources: |
175 if ok_src is not None and source.name not in ok_src: | 188 if ok_src is not None and source.name not in ok_src: |
190 raise Exception("The website has no content sources, or the bake " | 203 raise Exception("The website has no content sources, or the bake " |
191 "command was invoked with all pipelines filtered " | 204 "command was invoked with all pipelines filtered " |
192 "out. There's nothing to do.") | 205 "out. There's nothing to do.") |
193 return ppmngr | 206 return ppmngr |
194 | 207 |
208 def _loadSources(self, ppmngr): | |
209 start_time = time.perf_counter() | |
210 | |
211 for ppinfo in ppmngr.getPipelineInfos(): | |
212 rec = ppinfo.record_history.current | |
213 rec_entries = ppinfo.pipeline.loadAllContents() | |
214 if rec_entries is not None: | |
215 for e in rec_entries: | |
216 rec.addEntry(e) | |
217 | |
218 stats = self.app.env.stats | |
219 stats.stepTimer('LoadSourceContents', | |
220 time.perf_counter() - start_time) | |
221 logger.info(format_timed(start_time, "loaded site content")) | |
222 | |
195 def _bakeRealms(self, pool, ppmngr, record_histories): | 223 def _bakeRealms(self, pool, ppmngr, record_histories): |
196 # Bake the realms -- user first, theme second, so that a user item | 224 # Bake the realms -- user first, theme second, so that a user item |
197 # can override a theme item. | 225 # can override a theme item. |
198 # Do this for as many times as we have pipeline passes left to do. | 226 # Do this for as many times as we have pipeline passes left to do. |
199 realm_list = [REALM_USER, REALM_THEME] | 227 realm_list = [REALM_USER, REALM_THEME] |
200 pp_by_pass_and_realm = _get_pipeline_infos_by_pass_and_realm( | 228 pp_by_pass_and_realm = _get_pipeline_infos_by_pass_and_realm( |
201 ppmngr.getPipelines()) | 229 ppmngr.getPipelineInfos()) |
202 | 230 |
203 for pp_pass_num in sorted(pp_by_pass_and_realm.keys()): | 231 for pp_pass_num in sorted(pp_by_pass_and_realm.keys()): |
204 logger.debug("Pipelines pass %d" % pp_pass_num) | 232 logger.debug("Pipelines pass %d" % pp_pass_num) |
205 pp_by_realm = pp_by_pass_and_realm[pp_pass_num] | 233 pp_by_realm = pp_by_pass_and_realm[pp_pass_num] |
206 for realm in realm_list: | 234 for realm in realm_list: |
207 pplist = pp_by_realm.get(realm) | 235 pplist = pp_by_realm.get(realm) |
208 if pplist is not None: | 236 if pplist is not None: |
209 self._bakeRealm( | 237 self._bakeRealm(pool, ppmngr, record_histories, |
210 pool, record_histories, pp_pass_num, realm, pplist) | 238 pp_pass_num, realm, pplist) |
211 | 239 |
212 def _bakeRealm(self, pool, record_histories, pp_pass_num, realm, pplist): | 240 def _bakeRealm(self, pool, ppmngr, record_histories, |
241 pp_pass_num, realm, pplist): | |
213 # Start with the first step, where we iterate on the content sources' | 242 # Start with the first step, where we iterate on the content sources' |
214 # items and run jobs on those. | 243 # items and run jobs on those. |
215 pool.userdata.cur_step = 0 | 244 pool.userdata.cur_step = 0 |
216 next_step_jobs = {} | 245 next_step_jobs = {} |
217 pool.userdata.next_step_jobs = next_step_jobs | 246 pool.userdata.next_step_jobs = next_step_jobs |
218 | 247 |
219 start_time = time.perf_counter() | 248 start_time = time.perf_counter() |
220 job_count = 0 | 249 job_count = 0 |
250 stats = self.app.env.stats | |
221 realm_name = REALM_NAMES[realm].lower() | 251 realm_name = REALM_NAMES[realm].lower() |
222 stats = self.app.env.stats | |
223 | 252 |
224 for ppinfo in pplist: | 253 for ppinfo in pplist: |
225 src = ppinfo.source | 254 src = ppinfo.source |
226 pp = ppinfo.pipeline | 255 pp = ppinfo.pipeline |
256 jcctx = PipelineJobCreateContext(pp_pass_num, pp.record_name, | |
257 record_histories) | |
227 | 258 |
228 next_step_jobs[src.name] = [] | 259 next_step_jobs[src.name] = [] |
229 jcctx = PipelineJobCreateContext(pp_pass_num, record_histories) | |
230 jobs = pp.createJobs(jcctx) | 260 jobs = pp.createJobs(jcctx) |
231 if jobs is not None: | 261 if jobs is not None: |
232 new_job_count = len(jobs) | 262 new_job_count = len(jobs) |
233 job_count += new_job_count | 263 job_count += new_job_count |
234 pool.queueJobs(jobs) | 264 pool.queueJobs(jobs) |
238 logger.debug( | 268 logger.debug( |
239 "Queued %d jobs for source '%s' using pipeline '%s' " | 269 "Queued %d jobs for source '%s' using pipeline '%s' " |
240 "(%s, step 0)." % | 270 "(%s, step 0)." % |
241 (new_job_count, src.name, pp.PIPELINE_NAME, realm_name)) | 271 (new_job_count, src.name, pp.PIPELINE_NAME, realm_name)) |
242 | 272 |
243 stats.stepTimer('WorkerTaskPut', time.perf_counter() - start_time) | 273 stats.stepTimer('MasterTaskPut_1', time.perf_counter() - start_time) |
244 | 274 |
245 if job_count == 0: | 275 if job_count == 0: |
246 logger.debug("No jobs queued! Bailing out of this bake pass.") | 276 logger.debug("No jobs queued! Bailing out of this bake pass.") |
247 return | 277 return |
248 | 278 |
268 for sn, jobs in next_step_jobs.items(): | 298 for sn, jobs in next_step_jobs.items(): |
269 if jobs: | 299 if jobs: |
270 logger.debug( | 300 logger.debug( |
271 "Queuing jobs for source '%s' (%s, step %d)." % | 301 "Queuing jobs for source '%s' (%s, step %d)." % |
272 (sn, realm_name, pool.userdata.cur_step)) | 302 (sn, realm_name, pool.userdata.cur_step)) |
303 | |
304 pp = ppmngr.getPipeline(sn) | |
305 valctx = PipelineJobValidateContext( | |
306 pp_pass_num, pool.userdata.cur_step, | |
307 pp.record_name, record_histories) | |
308 pp.validateNextStepJobs(jobs, valctx) | |
309 | |
273 job_count += len(jobs) | 310 job_count += len(jobs) |
274 pool.userdata.next_step_jobs[sn] = [] | 311 pool.userdata.next_step_jobs[sn] = [] |
275 pool.queueJobs(jobs) | 312 pool.queueJobs(jobs) |
276 | 313 |
277 stats.stepTimer('WorkerTaskPut', time.perf_counter() - start_time) | 314 stats.stepTimer('MasterTaskPut_2+', time.perf_counter() - start_time) |
278 | 315 |
279 if job_count == 0: | 316 if job_count == 0: |
280 break | 317 break |
281 | 318 |
282 pool.wait() | 319 pool.wait() |
290 | 327 |
291 def _logErrors(self, item_spec, errors): | 328 def _logErrors(self, item_spec, errors): |
292 logger.error("Errors found in %s:" % item_spec) | 329 logger.error("Errors found in %s:" % item_spec) |
293 for e in errors: | 330 for e in errors: |
294 logger.error(" " + e) | 331 logger.error(" " + e) |
332 | |
333 def _logWorkerException(self, item_spec, exc_data): | |
334 logger.error("Errors found in %s:" % item_spec) | |
335 logger.error(exc_data['value']) | |
336 if self.app.debug: | |
337 logger.error(exc_data['traceback']) | |
295 | 338 |
296 def _createWorkerPool(self, previous_records_path, pool_userdata): | 339 def _createWorkerPool(self, previous_records_path, pool_userdata): |
297 from piecrust.workerpool import WorkerPool | 340 from piecrust.workerpool import WorkerPool |
298 from piecrust.baking.worker import BakeWorkerContext, BakeWorker | 341 from piecrust.baking.worker import BakeWorkerContext, BakeWorker |
299 | 342 |
317 userdata=pool_userdata) | 360 userdata=pool_userdata) |
318 return pool | 361 return pool |
319 | 362 |
320 def _handleWorkerResult(self, job, res, userdata): | 363 def _handleWorkerResult(self, job, res, userdata): |
321 cur_step = userdata.cur_step | 364 cur_step = userdata.cur_step |
322 record = userdata.records.getRecord(job.record_name) | 365 source_name, item_spec = job['job_spec'] |
323 | 366 |
324 if cur_step == 0: | 367 # See if there's a next step to take. |
325 record.addEntry(res.record_entry) | 368 npj = res.get('next_step_job') |
326 else: | |
327 ppinfo = userdata.ppmngr.getPipeline(job.source_name) | |
328 ppmrctx = PipelineMergeRecordContext(record, job, cur_step) | |
329 ppinfo.pipeline.mergeRecordEntry(res.record_entry, ppmrctx) | |
330 | |
331 npj = res.next_step_job | |
332 if npj is not None: | 369 if npj is not None: |
333 npj.step_num = cur_step + 1 | 370 npj['step_num'] = cur_step + 1 |
334 userdata.next_step_jobs[job.source_name].append(npj) | 371 userdata.next_step_jobs[source_name].append(npj) |
335 | 372 |
336 if not res.record_entry.success: | 373 # Make the pipeline do custom handling to update the record entry. |
374 ppinfo = userdata.ppmngr.getPipelineInfo(source_name) | |
375 pipeline = ppinfo.pipeline | |
376 record = ppinfo.current_record | |
377 ppmrctx = PipelineJobResultHandleContext(record, job, cur_step) | |
378 pipeline.handleJobResult(res, ppmrctx) | |
379 | |
380 # Set the overall success flags if there was an error. | |
381 record_entry = ppmrctx.record_entry | |
382 if not record_entry.success: | |
337 record.success = False | 383 record.success = False |
338 userdata.records.success = False | 384 userdata.records.success = False |
339 self._logErrors(job.content_item.spec, res.record_entry.errors) | 385 self._logErrors(job['item_spec'], record_entry.errors) |
340 | 386 |
341 def _handleWorkerError(self, job, exc_data, userdata): | 387 def _handleWorkerError(self, job, exc_data, userdata): |
342 cur_step = userdata.cur_step | 388 # Set the overall success flag. |
343 record = userdata.records.getRecord(job.record_name) | 389 source_name, item_spec = job['job_spec'] |
344 | 390 ppinfo = userdata.ppmngr.getPipelineInfo(source_name) |
345 record_entry_spec = job.content_item.metadata.get( | 391 pipeline = ppinfo.pipeline |
346 'record_entry_spec', job.content_item.spec) | 392 record = ppinfo.current_record |
347 | |
348 if cur_step == 0: | |
349 ppinfo = userdata.ppmngr.getPipeline(job.source_name) | |
350 entry_class = ppinfo.pipeline.RECORD_ENTRY_CLASS or RecordEntry | |
351 e = entry_class() | |
352 e.item_spec = record_entry_spec | |
353 e.errors.append(str(exc_data)) | |
354 record.addEntry(e) | |
355 else: | |
356 e = record.getEntry(record_entry_spec) | |
357 e.errors.append(str(exc_data)) | |
358 | |
359 record.success = False | 393 record.success = False |
360 userdata.records.success = False | 394 userdata.records.success = False |
361 | 395 |
362 self._logErrors(job.content_item.spec, e.errors) | 396 # Add those errors to the record, if possible. |
397 record_entry_spec = job.get('record_entry_spec', item_spec) | |
398 e = record.getEntry(record_entry_spec) | |
399 if e: | |
400 e.errors.append(exc_data['value']) | |
401 self._logWorkerException(item_spec, exc_data) | |
402 | |
403 # Log debug stuff. | |
363 if self.app.debug: | 404 if self.app.debug: |
364 logger.error(exc_data.traceback) | 405 logger.error(exc_data.traceback) |
365 | 406 |
366 | 407 |
367 class _PoolUserData: | 408 class _PoolUserData: |