comparison piecrust/baking/baker.py @ 854:08e02c2a2a1a

core: Keep refactoring, this time to prepare for generator sources.

- Make a few APIs simpler.
- Content pipelines create their own jobs, so that generator sources can keep
  aborting in `getContents`, but rely on their pipeline to generate pages for
  baking.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 04 Jun 2017 23:34:28 -0700
parents f070a4fc033c
children 448710d84121
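The description above captures the key API change: a source may return nothing from `getContents`, and its pipeline takes over job creation through `createJobs` (see `_bakeRealm` in the diff below). The following is a minimal, hypothetical sketch of that contract; `GeneratorSource`, `SimplePipeline` and `BakeJob` are illustrative stand-ins, not the actual PieCrust classes.

# Hypothetical sketch of the "pipelines create their own jobs" contract.
# GeneratorSource, SimplePipeline and BakeJob are illustrative stand-ins,
# not the real PieCrust classes.

class BakeJob:
    def __init__(self, source_name, item_spec, data=None):
        self.source_name = source_name
        self.item_spec = item_spec
        self.data = data or {}


class GeneratorSource:
    # A generator-like source has nothing to return from getContents();
    # it aborts early and leaves job creation to its pipeline.
    name = 'generated'

    def getContents(self):
        return None


class SimplePipeline:
    PIPELINE_NAME = 'simple'
    PASS_NUM = 0

    def __init__(self, source):
        self.source = source

    def createJobs(self):
        # The pipeline, not the baker, decides what jobs exist for baking.
        specs = ['tags/foo', 'tags/bar']  # e.g. computed from earlier records
        return [BakeJob(self.source.name, spec) for spec in specs]


if __name__ == '__main__':
    jobs = SimplePipeline(GeneratorSource()).createJobs()
    print([j.item_spec for j in jobs])  # ['tags/foo', 'tags/bar']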
--- piecrust/baking/baker.py (853:f070a4fc033c)
+++ piecrust/baking/baker.py (854:08e02c2a2a1a)
@@ -1,14 +1,15 @@
 import time
 import os.path
 import hashlib
 import logging
-from piecrust.baking.worker import BakeJob
 from piecrust.chefutil import (
     format_timed_scope, format_timed)
 from piecrust.environment import ExecutionStats
-from piecrust.pipelines.base import PipelineContext
+from piecrust.pipelines.base import (
+    PipelineMergeRecordContext, PipelineManager,
+    get_pipeline_name_for_source)
 from piecrust.pipelines.records import (
     MultiRecordHistory, MultiRecord, RecordEntry,
     load_records)
 from piecrust.sources.base import REALM_USER, REALM_THEME
 
@@ -29,19 +30,13 @@
         self.appfactory = appfactory
         self.app = app
         self.out_dir = out_dir
         self.force = force
 
-        self._pipeline_classes = {}
-        for pclass in app.plugin_loader.getPipelines():
-            self._pipeline_classes[pclass.PIPELINE_NAME] = pclass
-
         self.allowed_pipelines = allowed_pipelines
         if allowed_pipelines is None:
             self.allowed_pipelines = list(self._pipeline_classes.keys())
-
-        self._records = None
 
     def bake(self):
         start_time = time.perf_counter()
         logger.debug(" Bake Output: %s" % self.out_dir)
         logger.debug(" Root URL: %s" % self.app.config.get('site/root'))
@@ -61,22 +56,23 @@
             with format_timed_scope(logger, "loaded previous bake records",
                                     level=logging.DEBUG, colored=False):
                 previous_records = load_records(records_path)
         else:
             previous_records = MultiRecord()
-        self._records = MultiRecord()
+        current_records = MultiRecord()
 
         # Figure out if we need to clean the cache because important things
         # have changed.
         is_cache_valid = self._handleCacheValidity(previous_records,
-                                                   self._records)
+                                                   current_records)
         if not is_cache_valid:
             previous_records = MultiRecord()
 
         # Create the bake records history which tracks what's up-to-date
         # or not since last time we baked to the given output folder.
-        record_histories = MultiRecordHistory(previous_records, self._records)
+        record_histories = MultiRecordHistory(
+            previous_records, current_records)
 
         # Pre-create all caches.
         for cache_name in ['app', 'baker', 'pages', 'renders']:
             self.app.cache.getCache(cache_name)
 
@@ -84,68 +80,69 @@
         # separately so we can handle "overriding" (i.e. one realm overrides
         # another realm's pages, like the user realm overriding the theme
         # realm).
         #
         # Also, create and initialize each pipeline for each source.
-        sources_by_realm = {}
+        has_any_pp = False
+        ppmngr = PipelineManager(
+            self.app, self.out_dir, record_histories)
         for source in self.app.sources:
-            pname = source.config['pipeline']
+            pname = get_pipeline_name_for_source(source)
             if pname in self.allowed_pipelines:
-                srclist = sources_by_realm.setdefault(
-                    source.config['realm'], [])
-
-                pp = self._pipeline_classes[pname](source)
-
-                record_name = _get_record_name(source.name, pname)
-                record_history = record_histories.getHistory(record_name)
-                ppctx = PipelineContext(self.out_dir, record_history,
-                                        force=self.force)
-                pp.initialize(ppctx)
-
-                srclist.append((source, pp, ppctx))
+                ppinfo = ppmngr.createPipeline(source)
+                logger.debug(
+                    "Created pipeline '%s' for source: %s" %
+                    (ppinfo.pipeline.PIPELINE_NAME, source.name))
+                has_any_pp = True
             else:
                 logger.debug(
                     "Skip source '%s' because pipeline '%s' is ignored." %
                     (source.name, pname))
+        if not has_any_pp:
+            raise Exception("The website has no content sources, or the bake "
+                            "command was invoked with all pipelines filtered "
+                            "out. There's nothing to do.")
 
         # Create the worker processes.
-        pool = self._createWorkerPool(records_path)
+        pool_userdata = _PoolUserData(self, ppmngr, current_records)
+        pool = self._createWorkerPool(records_path, pool_userdata)
+        realm_list = [REALM_USER, REALM_THEME]
 
         # Bake the realms -- user first, theme second, so that a user item
         # can override a theme item.
-        realm_list = [REALM_USER, REALM_THEME]
-        for realm in realm_list:
-            srclist = sources_by_realm.get(realm)
-            if srclist is not None:
-                self._bakeRealm(pool, srclist)
-
-        # Handle deletions.
-        for realm in realm_list:
-            srclist = sources_by_realm.get(realm)
-            if srclist is not None:
-                self._deleteStaleOutputs(pool, srclist)
-
-        # Collapse records.
-        for realm in realm_list:
-            srclist = sources_by_realm.get(realm)
-            if srclist is not None:
-                self._collapseRecords(srclist)
+        # Do this for as many times as we have pipeline passes left to do.
+        pp_by_pass_and_realm = {}
+        for ppinfo in ppmngr.getPipelines():
+            pp_by_realm = pp_by_pass_and_realm.setdefault(
+                ppinfo.pipeline.PASS_NUM, {})
+            pplist = pp_by_realm.setdefault(
+                ppinfo.pipeline.source.config['realm'], [])
+            pplist.append(ppinfo)
+
+        for pp_pass in sorted(pp_by_pass_and_realm.keys()):
+            logger.debug("Pipelines pass %d" % pp_pass)
+            pp_by_realm = pp_by_pass_and_realm[pp_pass]
+            for realm in realm_list:
+                pplist = pp_by_realm.get(realm)
+                if pplist is not None:
+                    self._bakeRealm(pool, pplist)
+
+        # Handle deletions, collapse records, etc.
+        ppmngr.buildHistoryDiffs()
+        ppmngr.deleteStaleOutputs()
+        ppmngr.collapseRecords()
 
         # All done with the workers. Close the pool and get reports.
         pool_stats = pool.close()
         total_stats = ExecutionStats()
         for ps in pool_stats:
             if ps is not None:
                 total_stats.mergeStats(ps)
-        record_histories.current.stats = total_stats
+        current_records.stats = total_stats
 
         # Shutdown the pipelines.
-        for realm in realm_list:
-            srclist = sources_by_realm.get(realm)
-            if srclist is not None:
-                for _, pp, ppctx in srclist:
-                    pp.shutdown(ppctx)
+        ppmngr.shutdownPipelines()
 
         # Backup previous records.
         records_dir, records_fn = os.path.split(records_path)
         records_id, _ = os.path.splitext(records_fn)
         for i in range(8, -1, -1):
@@ -162,20 +159,19 @@
                 os.rename(records_path_i, records_path_next)
 
         # Save the bake records.
         with format_timed_scope(logger, "saved bake records.",
                                 level=logging.DEBUG, colored=False):
-            record_histories.current.bake_time = time.time()
-            record_histories.current.out_dir = self.out_dir
-            record_histories.current.save(records_path)
+            current_records.bake_time = time.time()
+            current_records.out_dir = self.out_dir
+            current_records.save(records_path)
 
         # All done.
         self.app.config.set('baker/is_baking', False)
         logger.debug(format_timed(start_time, 'done baking'))
 
-        self._records = None
-        return record_histories.current
+        return current_records
 
     def _handleCacheValidity(self, previous_records, current_records):
         start_time = time.perf_counter()
 
         reason = None
@@ -214,44 +210,62 @@
             current_records.incremental_count += 1
             logger.debug(format_timed(
                 start_time, "cache is assumed valid", colored=False))
             return True
 
-    def _bakeRealm(self, pool, srclist):
-        for source, pp, ppctx in srclist:
-            logger.debug("Queuing jobs for source '%s' using pipeline '%s'." %
-                         (source.name, pp.PIPELINE_NAME))
-            jobs = [BakeJob(source.name, item.spec, item.metadata)
-                    for item in source.getAllContents()]
+    def _bakeRealm(self, pool, pplist):
+        # Start with the first pass, where we iterate on the content sources'
+        # items and run jobs on those.
+        pool.userdata.cur_pass = 0
+        next_pass_jobs = {}
+        pool.userdata.next_pass_jobs = next_pass_jobs
+        for ppinfo in pplist:
+            src = ppinfo.source
+            pp = ppinfo.pipeline
+
+            logger.debug(
+                "Queuing jobs for source '%s' using pipeline '%s' (pass 0)." %
+                (src.name, pp.PIPELINE_NAME))
+
+            next_pass_jobs[src.name] = []
+            jobs = pp.createJobs()
             pool.queueJobs(jobs)
         pool.wait()
 
-    def _deleteStaleOutputs(self, pool, srclist):
-        for source, pp, ppctx in srclist:
-            ppctx.record_history.build()
-
-            to_delete = pp.getDeletions(ppctx)
-            if to_delete is not None:
-                for path, reason in to_delete:
-                    logger.debug("Removing '%s': %s" % (path, reason))
-                    ppctx.current_record.deleted_out_paths.append(path)
-                    try:
-                        os.remove(path)
-                    except FileNotFoundError:
-                        pass
-                    logger.info('[delete] %s' % path)
-
-    def _collapseRecords(self, srclist):
-        for source, pp, ppctx in srclist:
-            pp.collapseRecords(ppctx)
+        # Now let's see if any job created a follow-up job. Let's keep
+        # processing those jobs as long as they create new ones.
+        pool.userdata.cur_pass = 1
+        while True:
+            had_any_job = False
+
+            # Make a copy of out next pass jobs and reset the list, so
+            # the first jobs to be processed don't mess it up as we're
+            # still iterating on it.
+            next_pass_jobs = pool.userdata.next_pass_jobs
+            pool.userdata.next_pass_jobs = {}
+
+            for sn, jobs in next_pass_jobs.items():
+                if jobs:
+                    logger.debug(
+                        "Queuing jobs for source '%s' (pass %d)." %
+                        (sn, pool.userdata.cur_pass))
+                    pool.userdata.next_pass_jobs[sn] = []
+                    pool.queueJobs(jobs)
+                    had_any_job = True
+
+            if not had_any_job:
+                break
+
+            pool.wait()
+            pool.userdata.cur_pass += 1
 
     def _logErrors(self, item_spec, errors):
        logger.error("Errors found in %s:" % item_spec)
        for e in errors:
            logger.error(" " + e)
 
-    def _createWorkerPool(self, previous_records_path):
+    def _createWorkerPool(self, previous_records_path, pool_userdata):
         from piecrust.workerpool import WorkerPool
         from piecrust.baking.worker import BakeWorkerContext, BakeWorker
 
         worker_count = self.app.config.get('baker/workers')
         batch_size = self.app.config.get('baker/batch_size')
@@ -266,38 +280,61 @@
             worker_count=worker_count,
             batch_size=batch_size,
             worker_class=BakeWorker,
             initargs=(ctx,),
             callback=self._handleWorkerResult,
-            error_callback=self._handleWorkerError)
+            error_callback=self._handleWorkerError,
+            userdata=pool_userdata)
         return pool
 
-    def _handleWorkerResult(self, job, res):
-        record_name = _get_record_name(job.source_name, res.pipeline_name)
-        record = self._records.getRecord(record_name)
-        record.entries.append(res.record_entry)
+    def _handleWorkerResult(self, job, res, userdata):
+        cur_pass = userdata.cur_pass
+        record = userdata.records.getRecord(job.record_name)
+
+        if cur_pass == 0:
+            record.addEntry(res.record_entry)
+        else:
+            ppinfo = userdata.ppmngr.getPipeline(job.source_name)
+            ppmrctx = PipelineMergeRecordContext(
+                record, job, cur_pass)
+            ppinfo.pipeline.mergeRecordEntry(res.record_entry, ppmrctx)
+
+        npj = res.next_pass_job
+        if npj is not None:
+            npj.data['pass'] = cur_pass + 1
+            userdata.next_pass_jobs[job.source_name].append(npj)
+
         if not res.record_entry.success:
             record.success = False
-            self._records.success = False
-            self._logErrors(job.item_spec, res.record_entry.errors)
+            userdata.records.success = False
+            self._logErrors(job.content_item.spec, res.record_entry.errors)
 
-    def _handleWorkerError(self, job, exc_data):
-        e = RecordEntry()
-        e.item_spec = job.item_spec
-        e.errors.append(str(exc_data))
-
-        ppname = self.app.getSource(job.source_name).config['pipeline']
-        record_name = _get_record_name(job.source_name, ppname)
-        record_name = self._getRecordName(job)
-        record = self._records.getRecord(record_name)
-        record.entries.append(e)
+    def _handleWorkerError(self, job, exc_data, userdata):
+        cur_pass = userdata.cur_pass
+        record = userdata.records.getRecord(job.record_name)
+
+        if cur_pass == 0:
+            ppinfo = userdata.ppmngr.getPipeline(job.source_name)
+            entry_class = ppinfo.pipeline.RECORD_ENTRY_CLASS or RecordEntry
+            e = entry_class()
+            e.item_spec = job.content_item.spec
+            e.errors.append(str(exc_data))
+            record.addEntry(e)
+        else:
+            e = record.getEntry(job.content_item.spec)
+            e.errors.append(str(exc_data))
 
         record.success = False
-        self._records.success = False
+        userdata.records.success = False
 
-        self._logErrors(job.item_spec, e.errors)
+        self._logErrors(job.content_item.spec, e.errors)
         if self.app.debug:
             logger.error(exc_data.traceback)
 
 
-def _get_record_name(source_name, pipeline_name):
-    return '%s@%s' % (source_name, pipeline_name)
+class _PoolUserData:
+    def __init__(self, baker, ppmngr, current_records):
+        self.baker = baker
+        self.ppmngr = ppmngr
+        self.records = current_records
+        self.cur_pass = 0
+        self.next_pass_jobs = {}
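For readers who want the new multi-pass flow in isolation: `_bakeRealm` queues each pipeline's pass-0 jobs, the result callback collects any follow-up jobs into `userdata.next_pass_jobs`, and the loop keeps queueing those until a pass produces no new work. Below is a minimal, standalone sketch of that drain loop (plain functions, no worker pool); it is an illustration under those assumptions, not PieCrust code.

# Minimal sketch (not PieCrust code) of the multi-pass job loop implemented
# by the new _bakeRealm: run the current batch of jobs, gather the follow-up
# jobs they produce, and repeat until a pass yields no new work.

def run_passes(initial_jobs, run_job):
    """run_job(job, cur_pass) returns an optional follow-up job."""
    cur_pass = 0
    pending = list(initial_jobs)
    while pending:
        next_jobs = []
        for job in pending:
            follow_up = run_job(job, cur_pass)
            if follow_up is not None:
                next_jobs.append(follow_up)
        pending = next_jobs
        cur_pass += 1
    return cur_pass


# Toy example: each numeric "job" spawns one follow-up until it reaches zero.
def demo_job(job, cur_pass):
    return job - 1 if job > 0 else None


print(run_passes([2, 0, 1], demo_job))  # prints 3: three passes drain the queue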