comparison piecrust/baking/baker.py @ 854:08e02c2a2a1a
core: Keep refactoring, this time to prepare for generator sources.
- Make a few APIs simpler.
- Content pipelines create their own jobs, so that generator sources can
keep aborting in `getContents`, but rely on their pipeline to generate
pages for baking.
author      Ludovic Chabant <ludovic@chabant.com>
date        Sun, 04 Jun 2017 23:34:28 -0700
parents     f070a4fc033c
children    448710d84121
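
To illustrate the second point: before this change the baker itself turned every item returned by `source.getAllContents()` into a `BakeJob`; after it, each pipeline builds its own job list via `createJobs()`, so a generator source may yield nothing from `getContents()` and still get pages baked through its pipeline. The sketch below is illustrative only, not the actual piecrust API; the `ExamplePipeline` name and the dictionary-shaped jobs are invented, while `createJobs()`, `getAllContents()` and `PIPELINE_NAME` appear in the diff.

# Illustrative sketch only -- not the real piecrust classes. It shows the
# division of labor the commit message describes: the pipeline, not the
# baker, decides what jobs to run for its source.
class ExamplePipeline:
    PIPELINE_NAME = 'example'
    PASS_NUM = 0

    def __init__(self, source):
        self.source = source

    def createJobs(self):
        # A page pipeline can simply enumerate its source's items. A
        # generator source could instead return nothing from getContents()
        # and have its pipeline compute the jobs itself (e.g. one job per
        # taxonomy term).
        return [{'source_name': self.source.name, 'item_spec': item.spec}
                for item in self.source.getAllContents()]
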
--- piecrust/baking/baker.py (853:f070a4fc033c)
+++ piecrust/baking/baker.py (854:08e02c2a2a1a)
@@ -1,14 +1,15 @@
 import time
 import os.path
 import hashlib
 import logging
-from piecrust.baking.worker import BakeJob
 from piecrust.chefutil import (
     format_timed_scope, format_timed)
 from piecrust.environment import ExecutionStats
-from piecrust.pipelines.base import PipelineContext
+from piecrust.pipelines.base import (
+    PipelineMergeRecordContext, PipelineManager,
+    get_pipeline_name_for_source)
 from piecrust.pipelines.records import (
     MultiRecordHistory, MultiRecord, RecordEntry,
     load_records)
 from piecrust.sources.base import REALM_USER, REALM_THEME
 
@@ -29,19 +30,13 @@
         self.appfactory = appfactory
         self.app = app
         self.out_dir = out_dir
         self.force = force
 
-        self._pipeline_classes = {}
-        for pclass in app.plugin_loader.getPipelines():
-            self._pipeline_classes[pclass.PIPELINE_NAME] = pclass
-
         self.allowed_pipelines = allowed_pipelines
         if allowed_pipelines is None:
             self.allowed_pipelines = list(self._pipeline_classes.keys())
-
-        self._records = None
 
     def bake(self):
         start_time = time.perf_counter()
         logger.debug(" Bake Output: %s" % self.out_dir)
         logger.debug(" Root URL: %s" % self.app.config.get('site/root'))
@@ -61,22 +56,23 @@
             with format_timed_scope(logger, "loaded previous bake records",
                                     level=logging.DEBUG, colored=False):
                 previous_records = load_records(records_path)
         else:
             previous_records = MultiRecord()
-        self._records = MultiRecord()
+        current_records = MultiRecord()
 
         # Figure out if we need to clean the cache because important things
         # have changed.
         is_cache_valid = self._handleCacheValidity(previous_records,
-                                                   self._records)
+                                                   current_records)
         if not is_cache_valid:
             previous_records = MultiRecord()
 
         # Create the bake records history which tracks what's up-to-date
         # or not since last time we baked to the given output folder.
-        record_histories = MultiRecordHistory(previous_records, self._records)
+        record_histories = MultiRecordHistory(
+            previous_records, current_records)
 
         # Pre-create all caches.
         for cache_name in ['app', 'baker', 'pages', 'renders']:
             self.app.cache.getCache(cache_name)
 
@@ -84,68 +80,69 @@
         # separately so we can handle "overriding" (i.e. one realm overrides
         # another realm's pages, like the user realm overriding the theme
         # realm).
         #
         # Also, create and initialize each pipeline for each source.
-        sources_by_realm = {}
+        has_any_pp = False
+        ppmngr = PipelineManager(
+            self.app, self.out_dir, record_histories)
         for source in self.app.sources:
-            pname = source.config['pipeline']
+            pname = get_pipeline_name_for_source(source)
             if pname in self.allowed_pipelines:
-                srclist = sources_by_realm.setdefault(
-                    source.config['realm'], [])
-
-                pp = self._pipeline_classes[pname](source)
-
-                record_name = _get_record_name(source.name, pname)
-                record_history = record_histories.getHistory(record_name)
-                ppctx = PipelineContext(self.out_dir, record_history,
-                                        force=self.force)
-                pp.initialize(ppctx)
-
-                srclist.append((source, pp, ppctx))
+                ppinfo = ppmngr.createPipeline(source)
+                logger.debug(
+                    "Created pipeline '%s' for source: %s" %
+                    (ppinfo.pipeline.PIPELINE_NAME, source.name))
+                has_any_pp = True
             else:
                 logger.debug(
                     "Skip source '%s' because pipeline '%s' is ignored." %
                     (source.name, pname))
+        if not has_any_pp:
+            raise Exception("The website has no content sources, or the bake "
+                            "command was invoked with all pipelines filtered "
+                            "out. There's nothing to do.")
 
         # Create the worker processes.
-        pool = self._createWorkerPool(records_path)
+        pool_userdata = _PoolUserData(self, ppmngr, current_records)
+        pool = self._createWorkerPool(records_path, pool_userdata)
+        realm_list = [REALM_USER, REALM_THEME]
 
         # Bake the realms -- user first, theme second, so that a user item
         # can override a theme item.
-        realm_list = [REALM_USER, REALM_THEME]
-        for realm in realm_list:
-            srclist = sources_by_realm.get(realm)
-            if srclist is not None:
-                self._bakeRealm(pool, srclist)
-
-        # Handle deletions.
-        for realm in realm_list:
-            srclist = sources_by_realm.get(realm)
-            if srclist is not None:
-                self._deleteStaleOutputs(pool, srclist)
-
-        # Collapse records.
-        for realm in realm_list:
-            srclist = sources_by_realm.get(realm)
-            if srclist is not None:
-                self._collapseRecords(srclist)
+        # Do this for as many times as we have pipeline passes left to do.
+        pp_by_pass_and_realm = {}
+        for ppinfo in ppmngr.getPipelines():
+            pp_by_realm = pp_by_pass_and_realm.setdefault(
+                ppinfo.pipeline.PASS_NUM, {})
+            pplist = pp_by_realm.setdefault(
+                ppinfo.pipeline.source.config['realm'], [])
+            pplist.append(ppinfo)
+
+        for pp_pass in sorted(pp_by_pass_and_realm.keys()):
+            logger.debug("Pipelines pass %d" % pp_pass)
+            pp_by_realm = pp_by_pass_and_realm[pp_pass]
+            for realm in realm_list:
+                pplist = pp_by_realm.get(realm)
+                if pplist is not None:
+                    self._bakeRealm(pool, pplist)
+
+        # Handle deletions, collapse records, etc.
+        ppmngr.buildHistoryDiffs()
+        ppmngr.deleteStaleOutputs()
+        ppmngr.collapseRecords()
 
         # All done with the workers. Close the pool and get reports.
         pool_stats = pool.close()
         total_stats = ExecutionStats()
         for ps in pool_stats:
             if ps is not None:
                 total_stats.mergeStats(ps)
-        record_histories.current.stats = total_stats
+        current_records.stats = total_stats
 
         # Shutdown the pipelines.
-        for realm in realm_list:
-            srclist = sources_by_realm.get(realm)
-            if srclist is not None:
-                for _, pp, ppctx in srclist:
-                    pp.shutdown(ppctx)
+        ppmngr.shutdownPipelines()
 
         # Backup previous records.
         records_dir, records_fn = os.path.split(records_path)
         records_id, _ = os.path.splitext(records_fn)
         for i in range(8, -1, -1):
@@ -162,20 +159,19 @@
                 os.rename(records_path_i, records_path_next)
 
         # Save the bake records.
         with format_timed_scope(logger, "saved bake records.",
                                 level=logging.DEBUG, colored=False):
-            record_histories.current.bake_time = time.time()
-            record_histories.current.out_dir = self.out_dir
-            record_histories.current.save(records_path)
+            current_records.bake_time = time.time()
+            current_records.out_dir = self.out_dir
+            current_records.save(records_path)
 
         # All done.
         self.app.config.set('baker/is_baking', False)
         logger.debug(format_timed(start_time, 'done baking'))
 
-        self._records = None
-        return record_histories.current
+        return current_records
 
     def _handleCacheValidity(self, previous_records, current_records):
         start_time = time.perf_counter()
 
         reason = None
@@ -214,44 +210,62 @@
             current_records.incremental_count += 1
             logger.debug(format_timed(
                 start_time, "cache is assumed valid", colored=False))
             return True
 
-    def _bakeRealm(self, pool, srclist):
-        for source, pp, ppctx in srclist:
-            logger.debug("Queuing jobs for source '%s' using pipeline '%s'." %
-                         (source.name, pp.PIPELINE_NAME))
-            jobs = [BakeJob(source.name, item.spec, item.metadata)
-                    for item in source.getAllContents()]
+    def _bakeRealm(self, pool, pplist):
+        # Start with the first pass, where we iterate on the content sources'
+        # items and run jobs on those.
+        pool.userdata.cur_pass = 0
+        next_pass_jobs = {}
+        pool.userdata.next_pass_jobs = next_pass_jobs
+        for ppinfo in pplist:
+            src = ppinfo.source
+            pp = ppinfo.pipeline
+
+            logger.debug(
+                "Queuing jobs for source '%s' using pipeline '%s' (pass 0)." %
+                (src.name, pp.PIPELINE_NAME))
+
+            next_pass_jobs[src.name] = []
+            jobs = pp.createJobs()
             pool.queueJobs(jobs)
         pool.wait()
 
-    def _deleteStaleOutputs(self, pool, srclist):
-        for source, pp, ppctx in srclist:
-            ppctx.record_history.build()
-
-            to_delete = pp.getDeletions(ppctx)
-            if to_delete is not None:
-                for path, reason in to_delete:
-                    logger.debug("Removing '%s': %s" % (path, reason))
-                    ppctx.current_record.deleted_out_paths.append(path)
-                    try:
-                        os.remove(path)
-                    except FileNotFoundError:
-                        pass
-                    logger.info('[delete] %s' % path)
-
-    def _collapseRecords(self, srclist):
-        for source, pp, ppctx in srclist:
-            pp.collapseRecords(ppctx)
+        # Now let's see if any job created a follow-up job. Let's keep
+        # processing those jobs as long as they create new ones.
+        pool.userdata.cur_pass = 1
+        while True:
+            had_any_job = False
+
+            # Make a copy of out next pass jobs and reset the list, so
+            # the first jobs to be processed don't mess it up as we're
+            # still iterating on it.
+            next_pass_jobs = pool.userdata.next_pass_jobs
+            pool.userdata.next_pass_jobs = {}
+
+            for sn, jobs in next_pass_jobs.items():
+                if jobs:
+                    logger.debug(
+                        "Queuing jobs for source '%s' (pass %d)." %
+                        (sn, pool.userdata.cur_pass))
+                    pool.userdata.next_pass_jobs[sn] = []
+                    pool.queueJobs(jobs)
+                    had_any_job = True
+
+            if not had_any_job:
+                break
+
+            pool.wait()
+            pool.userdata.cur_pass += 1
 
     def _logErrors(self, item_spec, errors):
         logger.error("Errors found in %s:" % item_spec)
         for e in errors:
             logger.error(" " + e)
 
-    def _createWorkerPool(self, previous_records_path):
+    def _createWorkerPool(self, previous_records_path, pool_userdata):
         from piecrust.workerpool import WorkerPool
         from piecrust.baking.worker import BakeWorkerContext, BakeWorker
 
         worker_count = self.app.config.get('baker/workers')
         batch_size = self.app.config.get('baker/batch_size')
@@ -266,38 +280,61 @@
             worker_count=worker_count,
             batch_size=batch_size,
             worker_class=BakeWorker,
             initargs=(ctx,),
             callback=self._handleWorkerResult,
-            error_callback=self._handleWorkerError)
+            error_callback=self._handleWorkerError,
+            userdata=pool_userdata)
         return pool
 
-    def _handleWorkerResult(self, job, res):
-        record_name = _get_record_name(job.source_name, res.pipeline_name)
-        record = self._records.getRecord(record_name)
-        record.entries.append(res.record_entry)
+    def _handleWorkerResult(self, job, res, userdata):
+        cur_pass = userdata.cur_pass
+        record = userdata.records.getRecord(job.record_name)
+
+        if cur_pass == 0:
+            record.addEntry(res.record_entry)
+        else:
+            ppinfo = userdata.ppmngr.getPipeline(job.source_name)
+            ppmrctx = PipelineMergeRecordContext(
+                record, job, cur_pass)
+            ppinfo.pipeline.mergeRecordEntry(res.record_entry, ppmrctx)
+
+        npj = res.next_pass_job
+        if npj is not None:
+            npj.data['pass'] = cur_pass + 1
+            userdata.next_pass_jobs[job.source_name].append(npj)
+
         if not res.record_entry.success:
             record.success = False
-            self._records.success = False
-            self._logErrors(job.item_spec, res.record_entry.errors)
+            userdata.records.success = False
+            self._logErrors(job.content_item.spec, res.record_entry.errors)
 
-    def _handleWorkerError(self, job, exc_data):
-        e = RecordEntry()
-        e.item_spec = job.item_spec
-        e.errors.append(str(exc_data))
-
-        ppname = self.app.getSource(job.source_name).config['pipeline']
-        record_name = _get_record_name(job.source_name, ppname)
-        record_name = self._getRecordName(job)
-        record = self._records.getRecord(record_name)
-        record.entries.append(e)
+    def _handleWorkerError(self, job, exc_data, userdata):
+        cur_pass = userdata.cur_pass
+        record = userdata.records.getRecord(job.record_name)
+
+        if cur_pass == 0:
+            ppinfo = userdata.ppmngr.getPipeline(job.source_name)
+            entry_class = ppinfo.pipeline.RECORD_ENTRY_CLASS or RecordEntry
+            e = entry_class()
+            e.item_spec = job.content_item.spec
+            e.errors.append(str(exc_data))
+            record.addEntry(e)
+        else:
+            e = record.getEntry(job.content_item.spec)
+            e.errors.append(str(exc_data))
 
         record.success = False
-        self._records.success = False
+        userdata.records.success = False
 
-        self._logErrors(job.item_spec, e.errors)
+        self._logErrors(job.content_item.spec, e.errors)
         if self.app.debug:
             logger.error(exc_data.traceback)
 
 
-def _get_record_name(source_name, pipeline_name):
-    return '%s@%s' % (source_name, pipeline_name)
+class _PoolUserData:
+    def __init__(self, baker, ppmngr, current_records):
+        self.baker = baker
+        self.ppmngr = ppmngr
+        self.records = current_records
+        self.cur_pass = 0
+        self.next_pass_jobs = {}
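
As a reading aid for the new `_bakeRealm` / `_handleWorkerResult` flow above, here is a simplified, synchronous model of the pass loop; `run_job` and the dictionary-shaped jobs and results are invented stand-ins for the worker pool, while the `next_pass_jobs` bookkeeping and the per-pass queueing mirror the changeset.

# Simplified, synchronous model of the multi-pass bake loop above.
# run_job() and the dict-shaped jobs/results are invented stand-ins for
# the worker pool; only the control flow mirrors the changeset.
def bake_passes(pipelines, run_job):
    # Pass 0: each pipeline creates its own jobs.
    next_pass_jobs = {pp.source.name: pp.createJobs() for pp in pipelines}
    cur_pass = 0
    while any(next_pass_jobs.values()):
        queued = next_pass_jobs
        next_pass_jobs = {sn: [] for sn in queued}
        for sn, jobs in queued.items():
            for job in jobs:
                res = run_job(job)  # stand-in for pool.queueJobs() + pool.wait()
                npj = res.get('next_pass_job')
                if npj is not None:
                    # A result can schedule a follow-up job for the next pass.
                    npj['pass'] = cur_pass + 1
                    next_pass_jobs[sn].append(npj)
        cur_pass += 1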