comparison piecrust/baking/baker.py @ 989:8adc27285d93

bake: Big pass on bake performance.

- Reduce the amount of data passed between processes.
- Make inter-process data simple objects so it is easier to test with
  alternatives to pickle (sketched below, after the changeset metadata).
- Require sources to be able to find a content item from an item spec
  (path).
- Make Hoedown the default Markdown formatter.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 19 Nov 2017 14:29:17 -0800
parents 45ad976712ec
children 09dc0240f08a
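
The most visible effect of the "simple objects" change in this file is that jobs, results, and worker exception data are now plain dicts instead of custom classes. The keys below are the ones the handler code in this diff actually reads; the values are illustrative only, and anything else carried in these dicts is left up to each pipeline.

# Message shapes inferred from the handlers in this changeset (the values
# are made up; real jobs and results may carry extra pipeline-specific keys).
job = {
    'job_spec': ('pages', 'pages/2017/example.md'),  # (source name, item spec)
    'item_spec': 'pages/2017/example.md',            # also read directly when logging errors
    'step_num': 0,                                   # bumped by the master for follow-up jobs
    # 'record_entry_spec': ...                       # optional, consulted on worker errors
}

result = {
    'next_step_job': None,  # optional follow-up job dict, if another step is needed
    # ...plus whatever the pipeline's handleJobResult() expects
}

exc_data = {
    'value': 'formatted exception message',
    'traceback': 'formatted traceback text',
}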
comparing 988:f83ae0a5d793 with 989:8adc27285d93

--- a/piecrust/baking/baker.py
+++ b/piecrust/baking/baker.py
@@ -4,11 +4,12 @@
 import logging
 from piecrust.chefutil import (
     format_timed_scope, format_timed)
 from piecrust.environment import ExecutionStats
 from piecrust.pipelines.base import (
-    PipelineJobCreateContext, PipelineMergeRecordContext, PipelineManager,
+    PipelineJobCreateContext, PipelineJobResultHandleContext,
+    PipelineJobValidateContext, PipelineManager,
     get_pipeline_name_for_source)
 from piecrust.pipelines.records import (
     MultiRecordHistory, MultiRecord, RecordEntry,
     load_records)
 from piecrust.sources.base import REALM_USER, REALM_THEME, REALM_NAMES
@@ -40,19 +41,23 @@
         self.allowed_sources = allowed_sources
         self.rotate_bake_records = rotate_bake_records

     def bake(self):
         start_time = time.perf_counter()
+
+        # Setup baker.
         logger.debug(" Bake Output: %s" % self.out_dir)
         logger.debug(" Root URL: %s" % self.app.config.get('site/root'))

         # Get into bake mode.
         self.app.config.set('baker/is_baking', True)
         self.app.config.set('site/asset_url_format', '%page_uri%/%filename%')

         stats = self.app.env.stats
-        stats.registerTimer('WorkerTaskPut')
+        stats.registerTimer('LoadSourceContents', raise_if_registered=False)
+        stats.registerTimer('MasterTaskPut_1', raise_if_registered=False)
+        stats.registerTimer('MasterTaskPut_2+', raise_if_registered=False)

         # Make sure the output directory exists.
         if not os.path.isdir(self.out_dir):
             os.makedirs(self.out_dir, 0o755)

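The hunk above replaces the single 'WorkerTaskPut' timer with three finer-grained ones, registered with raise_if_registered=False, presumably so that a second bake() in the same process does not trip over timers that already exist. Below is a tiny stand-in for the calling pattern used in this file; _StatsSketch is a hypothetical class for illustration, not piecrust's ExecutionStats, and the "raises on double registration by default" behavior is only inferred from the parameter name.

import time

class _StatsSketch:
    # Stand-in mirroring the two ExecutionStats methods baker.py calls;
    # the real class lives in piecrust.environment and does more than this.
    def __init__(self):
        self.timers = {}

    def registerTimer(self, name, raise_if_registered=True):
        if name in self.timers and raise_if_registered:
            raise Exception("Timer '%s' is already registered." % name)
        self.timers.setdefault(name, 0.0)

    def stepTimer(self, name, delta):
        self.timers[name] += delta

stats = _StatsSketch()
for _ in range(2):  # registering twice is harmless with raise_if_registered=False
    stats.registerTimer('MasterTaskPut_1', raise_if_registered=False)

start = time.perf_counter()
stats.stepTimer('MasterTaskPut_1', time.perf_counter() - start)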
@@ -87,10 +92,16 @@
         ppmngr = self._createPipelineManager(record_histories)

         # Create the worker processes.
         pool_userdata = _PoolUserData(self, ppmngr)
         pool = self._createWorkerPool(records_path, pool_userdata)
+
+        # Done with all the setup, let's start the actual work.
+        logger.info(format_timed(start_time, "setup baker"))
+
+        # Load all sources.
+        self._loadSources(ppmngr)

         # Bake the realms.
         self._bakeRealms(pool, ppmngr, record_histories)

         # Handle deletions, collapse records, etc.
@@ -147,12 +158,13 @@
             # We have to bake everything from scratch.
             self.app.cache.clearCaches(except_names=['app', 'baker'])
             self.force = True
             current_records.incremental_count = 0
             previous_records = MultiRecord()
-            logger.info(format_timed(
-                start_time, "cleaned cache (reason: %s)" % reason))
+            logger.debug(format_timed(
+                start_time, "cleaned cache (reason: %s)" % reason,
+                colored=False))
             return False
         else:
             current_records.incremental_count += 1
             logger.debug(format_timed(
                 start_time, "cache is assumed valid", colored=False))
@@ -165,11 +177,12 @@
         # realm).
         #
         # Also, create and initialize each pipeline for each source.
         has_any_pp = False
         ppmngr = PipelineManager(
-            self.app, self.out_dir, record_histories)
+            self.app, self.out_dir,
+            record_histories=record_histories)
         ok_pp = self.allowed_pipelines
         nok_pp = self.forbidden_pipelines
         ok_src = self.allowed_sources
         for source in self.app.sources:
             if ok_src is not None and source.name not in ok_src:
@@ -190,45 +203,62 @@
         raise Exception("The website has no content sources, or the bake "
                         "command was invoked with all pipelines filtered "
                         "out. There's nothing to do.")
         return ppmngr

+    def _loadSources(self, ppmngr):
+        start_time = time.perf_counter()
+
+        for ppinfo in ppmngr.getPipelineInfos():
+            rec = ppinfo.record_history.current
+            rec_entries = ppinfo.pipeline.loadAllContents()
+            if rec_entries is not None:
+                for e in rec_entries:
+                    rec.addEntry(e)
+
+        stats = self.app.env.stats
+        stats.stepTimer('LoadSourceContents',
+                        time.perf_counter() - start_time)
+        logger.info(format_timed(start_time, "loaded site content"))
+
     def _bakeRealms(self, pool, ppmngr, record_histories):
         # Bake the realms -- user first, theme second, so that a user item
         # can override a theme item.
         # Do this for as many times as we have pipeline passes left to do.
         realm_list = [REALM_USER, REALM_THEME]
         pp_by_pass_and_realm = _get_pipeline_infos_by_pass_and_realm(
-            ppmngr.getPipelines())
+            ppmngr.getPipelineInfos())

         for pp_pass_num in sorted(pp_by_pass_and_realm.keys()):
             logger.debug("Pipelines pass %d" % pp_pass_num)
             pp_by_realm = pp_by_pass_and_realm[pp_pass_num]
             for realm in realm_list:
                 pplist = pp_by_realm.get(realm)
                 if pplist is not None:
-                    self._bakeRealm(
-                        pool, record_histories, pp_pass_num, realm, pplist)
+                    self._bakeRealm(pool, ppmngr, record_histories,
+                                    pp_pass_num, realm, pplist)

-    def _bakeRealm(self, pool, record_histories, pp_pass_num, realm, pplist):
+    def _bakeRealm(self, pool, ppmngr, record_histories,
+                   pp_pass_num, realm, pplist):
         # Start with the first step, where we iterate on the content sources'
         # items and run jobs on those.
         pool.userdata.cur_step = 0
         next_step_jobs = {}
         pool.userdata.next_step_jobs = next_step_jobs

         start_time = time.perf_counter()
         job_count = 0
+        stats = self.app.env.stats
         realm_name = REALM_NAMES[realm].lower()
-        stats = self.app.env.stats

         for ppinfo in pplist:
             src = ppinfo.source
             pp = ppinfo.pipeline
+            jcctx = PipelineJobCreateContext(pp_pass_num, pp.record_name,
+                                             record_histories)

             next_step_jobs[src.name] = []
-            jcctx = PipelineJobCreateContext(pp_pass_num, record_histories)
             jobs = pp.createJobs(jcctx)
             if jobs is not None:
                 new_job_count = len(jobs)
                 job_count += new_job_count
                 pool.queueJobs(jobs)
@@ -238,11 +268,11 @@
                 logger.debug(
                     "Queued %d jobs for source '%s' using pipeline '%s' "
                     "(%s, step 0)." %
                     (new_job_count, src.name, pp.PIPELINE_NAME, realm_name))

-        stats.stepTimer('WorkerTaskPut', time.perf_counter() - start_time)
+        stats.stepTimer('MasterTaskPut_1', time.perf_counter() - start_time)

         if job_count == 0:
             logger.debug("No jobs queued! Bailing out of this bake pass.")
             return

@@ -268,15 +298,22 @@
             for sn, jobs in next_step_jobs.items():
                 if jobs:
                     logger.debug(
                         "Queuing jobs for source '%s' (%s, step %d)." %
                         (sn, realm_name, pool.userdata.cur_step))
+
+                    pp = ppmngr.getPipeline(sn)
+                    valctx = PipelineJobValidateContext(
+                        pp_pass_num, pool.userdata.cur_step,
+                        pp.record_name, record_histories)
+                    pp.validateNextStepJobs(jobs, valctx)
+
                     job_count += len(jobs)
                     pool.userdata.next_step_jobs[sn] = []
                     pool.queueJobs(jobs)

-            stats.stepTimer('WorkerTaskPut', time.perf_counter() - start_time)
+            stats.stepTimer('MasterTaskPut_2+', time.perf_counter() - start_time)

             if job_count == 0:
                 break

             pool.wait()
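Stripped of piecrust specifics, the scheduling that _bakeRealm implements across the two hunks above is: create step-0 jobs from the content sources, collect any next_step_job dicts that come back from the workers, validate them, and re-queue until nothing new is produced. The sketch below shows that control flow in isolation; DummyPipeline, run_passes and fake_worker are stand-in names for illustration, not piecrust APIs.

class DummyPipeline:
    def create_jobs(self):
        # Step 0: one plain-dict job per content item (hypothetical item).
        return [{'job_spec': ('pages', 'pages/example.md'), 'step_num': 0}]

    def validate_next_step_jobs(self, jobs):
        # Equivalent hook point to validateNextStepJobs() in the diff.
        assert all('job_spec' in j for j in jobs)


def run_passes(pipelines, run_job):
    # Queue the step-0 jobs created straight from the content sources...
    jobs = [j for pp in pipelines for j in pp.create_jobs()]
    step = 0
    while jobs:
        next_jobs = []
        for job in jobs:
            # ...then let each result optionally spawn a follow-up job,
            # mirroring res.get('next_step_job') above.
            res = run_job(job)
            npj = res.get('next_step_job')
            if npj is not None:
                npj['step_num'] = step + 1
                next_jobs.append(npj)
        step += 1
        for pp in pipelines:
            pp.validate_next_step_jobs(next_jobs)
        jobs = next_jobs


def fake_worker(job):
    # Pretend every step-0 job needs exactly one more step.
    if job['step_num'] == 0:
        return {'next_step_job': dict(job)}
    return {}


run_passes([DummyPipeline()], fake_worker)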
@@ -290,10 +327,16 @@

     def _logErrors(self, item_spec, errors):
         logger.error("Errors found in %s:" % item_spec)
         for e in errors:
             logger.error(" " + e)
+
+    def _logWorkerException(self, item_spec, exc_data):
+        logger.error("Errors found in %s:" % item_spec)
+        logger.error(exc_data['value'])
+        if self.app.debug:
+            logger.error(exc_data['traceback'])

     def _createWorkerPool(self, previous_records_path, pool_userdata):
         from piecrust.workerpool import WorkerPool
         from piecrust.baking.worker import BakeWorkerContext, BakeWorker

@@ -317,51 +360,49 @@
             userdata=pool_userdata)
         return pool

     def _handleWorkerResult(self, job, res, userdata):
         cur_step = userdata.cur_step
-        record = userdata.records.getRecord(job.record_name)
+        source_name, item_spec = job['job_spec']

-        if cur_step == 0:
-            record.addEntry(res.record_entry)
-        else:
-            ppinfo = userdata.ppmngr.getPipeline(job.source_name)
-            ppmrctx = PipelineMergeRecordContext(record, job, cur_step)
-            ppinfo.pipeline.mergeRecordEntry(res.record_entry, ppmrctx)
-
-        npj = res.next_step_job
+        # See if there's a next step to take.
+        npj = res.get('next_step_job')
         if npj is not None:
-            npj.step_num = cur_step + 1
-            userdata.next_step_jobs[job.source_name].append(npj)
+            npj['step_num'] = cur_step + 1
+            userdata.next_step_jobs[source_name].append(npj)

-        if not res.record_entry.success:
+        # Make the pipeline do custom handling to update the record entry.
+        ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
+        pipeline = ppinfo.pipeline
+        record = ppinfo.current_record
+        ppmrctx = PipelineJobResultHandleContext(record, job, cur_step)
+        pipeline.handleJobResult(res, ppmrctx)
+
+        # Set the overall success flags if there was an error.
+        record_entry = ppmrctx.record_entry
+        if not record_entry.success:
             record.success = False
             userdata.records.success = False
-            self._logErrors(job.content_item.spec, res.record_entry.errors)
+            self._logErrors(job['item_spec'], record_entry.errors)

     def _handleWorkerError(self, job, exc_data, userdata):
-        cur_step = userdata.cur_step
-        record = userdata.records.getRecord(job.record_name)
-
-        record_entry_spec = job.content_item.metadata.get(
-            'record_entry_spec', job.content_item.spec)
-
-        if cur_step == 0:
-            ppinfo = userdata.ppmngr.getPipeline(job.source_name)
-            entry_class = ppinfo.pipeline.RECORD_ENTRY_CLASS or RecordEntry
-            e = entry_class()
-            e.item_spec = record_entry_spec
-            e.errors.append(str(exc_data))
-            record.addEntry(e)
-        else:
-            e = record.getEntry(record_entry_spec)
-            e.errors.append(str(exc_data))
-
+        # Set the overall success flag.
+        source_name, item_spec = job['job_spec']
+        ppinfo = userdata.ppmngr.getPipelineInfo(source_name)
+        pipeline = ppinfo.pipeline
+        record = ppinfo.current_record
         record.success = False
         userdata.records.success = False

-        self._logErrors(job.content_item.spec, e.errors)
+        # Add those errors to the record, if possible.
+        record_entry_spec = job.get('record_entry_spec', item_spec)
+        e = record.getEntry(record_entry_spec)
+        if e:
+            e.errors.append(exc_data['value'])
+        self._logWorkerException(item_spec, exc_data)
+
+        # Log debug stuff.
         if self.app.debug:
             logger.error(exc_data.traceback)


 class _PoolUserData:
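
A closing note on the commit message's "alternatives to pickle" point: because the jobs and results handled above are plain dicts of primitives, they can be round-tripped by serializers that know nothing about piecrust's classes. A small illustration, with made-up job values:

import json
import pickle

# Shaped like the jobs read by _handleWorkerResult() above; the values
# are illustrative only.
job = {
    'job_spec': ['pages', 'pages/2017/example.md'],
    'step_num': 0,
}

# Plain data still round-trips through pickle...
assert pickle.loads(pickle.dumps(job)) == job

# ...but also through a serializer with no knowledge of piecrust, which
# is what makes swapping in alternatives to pickle easy to test.
assert json.loads(json.dumps(job)) == job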