comparison piecrust/baking/baker.py @ 1015:fa489c5e829e

bake: Load pages in parallel again.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 29 Nov 2017 20:37:57 -0800
parents 071f30aa04bb
children 3c6e6e7b9639
comparison
equal deleted inserted replaced
1014:071f30aa04bb 1015:fa489c5e829e
88 self.app.cache.getCache(cache_name) 88 self.app.cache.getCache(cache_name)
89 89
90 # Create the pipelines. 90 # Create the pipelines.
91 ppmngr = self._createPipelineManager(record_histories) 91 ppmngr = self._createPipelineManager(record_histories)
92 92
93 # Done with all the setup, let's start the actual work.
94 logger.info(format_timed(start_time, "setup baker"))
95
96 # Load all sources, pre-cache templates.
97 load_start_time = time.perf_counter()
98 self._populateTemplateCaches()
99 logger.info(format_timed(load_start_time, "cache templates"))
100
93 # Create the worker processes. 101 # Create the worker processes.
94 pool_userdata = _PoolUserData(self, ppmngr) 102 pool_userdata = _PoolUserData(self, ppmngr)
95 pool = self._createWorkerPool(records_path, pool_userdata) 103 pool = self._createWorkerPool(records_path, pool_userdata)
96
97 # Done with all the setup, let's start the actual work.
98 logger.info(format_timed(start_time, "setup baker"))
99
100 # Load all sources, pre-cache templates.
101 load_start_time = time.perf_counter()
102 self._startPopulateTemplateCaches(pool)
103 self._loadSources(ppmngr)
104 self._endPopulateTemplateCache(pool)
105 logger.info(format_timed(load_start_time, "loaded site content"))
106 104
107 # Bake the realms. 105 # Bake the realms.
108 self._bakeRealms(pool, ppmngr, record_histories) 106 self._bakeRealms(pool, ppmngr, record_histories)
109 107
110 # Handle deletions, collapse records, etc. 108 # Handle deletions, collapse records, etc.
206 raise Exception("The website has no content sources, or the bake " 204 raise Exception("The website has no content sources, or the bake "
207 "command was invoked with all pipelines filtered " 205 "command was invoked with all pipelines filtered "
208 "out. There's nothing to do.") 206 "out. There's nothing to do.")
209 return ppmngr 207 return ppmngr
210 208
211 def _loadSources(self, ppmngr): 209 def _populateTemplateCaches(self):
212 for ppinfo in ppmngr.getPipelineInfos(): 210 for eng in self.app.plugin_loader.getTemplateEngines():
213 rec = ppinfo.record_history.current 211 eng.populateCache()
214 rec_entries = ppinfo.pipeline.loadAllContents()
215 if rec_entries is not None:
216 for e in rec_entries:
217 rec.addEntry(e)
218
219 def _startPopulateTemplateCaches(self, pool):
220 # If we can, cache templates in a worker process, so we can load
221 # the sources' pages in the main process in the meantime.
222 # But if we don't have any workers, well, we'll have to make do
223 # in the `_endPopulateTemplateCache` method.
224 if pool.pool_size == 0:
225 return
226
227 pool._callback = None
228 pool._error_callback = None
229 job = {'job_spec': ('__special__', 'populate_template_cache')}
230 pool.queueJobs([job])
231
232 def _endPopulateTemplateCache(self, pool):
233 if pool.pool_size == 0:
234 # No workers... load the templates synchronously.
235 for eng in self.app.plugin_loader.getTemplateEngines():
236 eng.populateCache()
237 else:
238 # Wait for the job to finish.
239 pool.wait()
240 pool._callback = self._handleWorkerResult
241 pool._error_callback = self._handleWorkerError
242 212
243 def _bakeRealms(self, pool, ppmngr, record_histories): 213 def _bakeRealms(self, pool, ppmngr, record_histories):
244 # Bake the realms -- user first, theme second, so that a user item 214 # Bake the realms -- user first, theme second, so that a user item
245 # can override a theme item. 215 # can override a theme item.
246 # Do this for as many times as we have pipeline passes left to do. 216 # Do this for as many times as we have pipeline passes left to do.
259 229
260 def _bakeRealm(self, pool, ppmngr, record_histories, 230 def _bakeRealm(self, pool, ppmngr, record_histories,
261 pp_pass_num, realm, pplist): 231 pp_pass_num, realm, pplist):
262 # Start with the first step, where we iterate on the content sources' 232 # Start with the first step, where we iterate on the content sources'
263 # items and run jobs on those. 233 # items and run jobs on those.
234 pool.userdata.cur_pass = pp_pass_num
264 pool.userdata.cur_step = 0 235 pool.userdata.cur_step = 0
265 next_step_jobs = {} 236 next_step_jobs = {}
266 pool.userdata.next_step_jobs = next_step_jobs 237 pool.userdata.next_step_jobs = next_step_jobs
267 238
268 start_time = time.perf_counter() 239 start_time = time.perf_counter()
379 error_callback=self._handleWorkerError, 350 error_callback=self._handleWorkerError,
380 userdata=pool_userdata) 351 userdata=pool_userdata)
381 return pool 352 return pool
382 353
383 def _handleWorkerResult(self, job, res, userdata): 354 def _handleWorkerResult(self, job, res, userdata):
355 cur_pass = userdata.cur_pass
384 cur_step = userdata.cur_step 356 cur_step = userdata.cur_step
385 source_name, item_spec = job['job_spec'] 357 source_name, item_spec = job['job_spec']
386 358
387 # See if there's a next step to take. 359 # See if there's a next step to take.
388 npj = res.get('next_step_job') 360 npj = res.get('next_step_job')
392 364
393 # Make the pipeline do custom handling to update the record entry. 365 # Make the pipeline do custom handling to update the record entry.
394 ppinfo = userdata.ppmngr.getPipelineInfo(source_name) 366 ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
395 pipeline = ppinfo.pipeline 367 pipeline = ppinfo.pipeline
396 record = ppinfo.current_record 368 record = ppinfo.current_record
397 ppmrctx = PipelineJobResultHandleContext(record, job, cur_step) 369 ppmrctx = PipelineJobResultHandleContext(record, job, cur_pass,
370 cur_step)
398 pipeline.handleJobResult(res, ppmrctx) 371 pipeline.handleJobResult(res, ppmrctx)
399 372
400 # Set the overall success flags if there was an error. 373 # Set the overall success flags if there was an error.
401 record_entry = ppmrctx.record_entry 374 record_entry = ppmrctx.record_entry
402 if not record_entry.success: 375 if not record_entry.success:
428 class _PoolUserData: 401 class _PoolUserData:
429 def __init__(self, baker, ppmngr): 402 def __init__(self, baker, ppmngr):
430 self.baker = baker 403 self.baker = baker
431 self.ppmngr = ppmngr 404 self.ppmngr = ppmngr
432 self.records = ppmngr.record_histories.current 405 self.records = ppmngr.record_histories.current
406 self.cur_pass = 0
433 self.cur_step = 0 407 self.cur_step = 0
434 self.next_step_jobs = {} 408 self.next_step_jobs = {}
435 409
436 410
437 def _get_pipeline_infos_by_pass_and_realm(pp_infos): 411 def _get_pipeline_infos_by_pass_and_realm(pp_infos):