piecrust2: comparison of piecrust/baking/baker.py @ 411:e7b865f8f335
bake: Enable multiprocess baking.
Baking is now done by running one worker per CPU and sending jobs to those workers.
This changes several things across the codebase:
* Add the ability to not cache things related to pages other than the 'main' page
(i.e. the page at the bottom of the execution stack).
* Decouple the baking process from the bake records, so only the main process
keeps track of (and modifies) the bake record.
* Remove the need for 'batch page getters' and loading a page directly from
the page factories.
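The worker-per-CPU setup described above comes down to a standard multiprocessing pattern, which the new `_createWorkerPool`/`_WorkerPool` code in the diff wires up: a `JoinableQueue` of jobs, a plain `Queue` of results, and an `Event` for aborting, with one `Process` per CPU. Below is a condensed, self-contained sketch of that pattern, not the actual piecrust code; the squaring "job" is just a placeholder for real baking work.

```python
import multiprocessing
import queue


def worker_func(wid, job_queue, result_queue, abort_event):
    # Each worker pulls jobs until told to abort, pushing one result per job.
    while not abort_event.is_set():
        try:
            job = job_queue.get(True, 0.1)
        except queue.Empty:
            continue
        result_queue.put_nowait((wid, job * job))  # stand-in for baking a page
        job_queue.task_done()


def main():
    job_queue = multiprocessing.JoinableQueue()
    result_queue = multiprocessing.Queue()
    abort_event = multiprocessing.Event()

    # One worker per CPU, like `baker/workers` defaulting to cpu_count().
    workers = []
    for wid in range(multiprocessing.cpu_count()):
        w = multiprocessing.Process(
                target=worker_func,
                args=(wid, job_queue, result_queue, abort_event))
        w.start()
        workers.append(w)

    # Queue some jobs and wait until every one has been processed.
    for job in range(20):
        job_queue.put_nowait(job)
    job_queue.join()

    # Collect the results, then shut the workers down.
    results = [result_queue.get() for _ in range(20)]
    abort_event.set()
    for w in workers:
        w.join()
    print(sorted(r for _, r in results))


if __name__ == '__main__':
    main()
```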
Various smaller changes are included here too, such as support for scoped
performance timers that are saved with the bake record and can be
printed out to the console. Yes, I got carried away.
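The scoped timers show up in the diff as a `format_timed_scope` helper imported from `piecrust.chefutil` and used as a `with` block. Its implementation isn't part of this file's diff; the following is only a rough approximation of what such a context-manager timer can look like, not the real helper.

```python
import contextlib
import logging
import time


@contextlib.contextmanager
def format_timed_scope(logger, message, level=logging.INFO, colored=True):
    # Time the enclosed block and log how long it took on exit.
    # `colored` is accepted for signature parity with the usage in the diff,
    # but this sketch ignores it.
    start_time = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start_time
        logger.log(level, "%s (%.1f ms)" % (message, elapsed * 1000.0))


logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

with format_timed_scope(logger, "loaded previous bake record",
                        level=logging.DEBUG, colored=False):
    time.sleep(0.05)  # stand-in for the actual work being timed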
For testing, the in-memory 'mock' file-system doesn't work anymore since we're
now spawning processes, so it is replaced by a 'tmpfs' file-system that is
backed by temporary files on disk and deleted after the tests have run.
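For illustration, a disk-backed test helper along those lines could look like the sketch below; the class name and methods are hypothetical and not taken from piecrust's actual test suite. The point is simply that real temporary files are visible to spawned worker processes, unlike an in-memory mock.

```python
import os
import shutil
import tempfile


class TmpFileSystem:
    # Hypothetical sketch of a disk-backed 'tmpfs' test helper; the real
    # helper in piecrust's tests may differ.
    def __init__(self):
        # Everything lives under a throw-away directory on the real disk.
        self.root = tempfile.mkdtemp(prefix='piecrust_test_')

    def write(self, rel_path, contents):
        path = os.path.join(self.root, rel_path)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'w', encoding='utf8') as fp:
            fp.write(contents)
        return path

    def cleanup(self):
        # Delete the whole tree once the test is done.
        shutil.rmtree(self.root, ignore_errors=True)


fs = TmpFileSystem()
fs.write('kitchen/pages/foo.md', "---\ntitle: Foo\n---\nHello!")
# ... run a bake against fs.root here, possibly from worker processes ...
fs.cleanup()
```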
author | Ludovic Chabant <ludovic@chabant.com>
date | Fri, 12 Jun 2015 17:09:19 -0700
parents | c2ca72fb7f0b
children | 0e9a94b7fdfa
410:d1a472464e57 | 411:e7b865f8f335 |
---|---|
1 import copy | |
1 import time | 2 import time |
2 import os.path | 3 import os.path |
4 import queue | |
3 import hashlib | 5 import hashlib |
4 import logging | 6 import logging |
5 import threading | 7 import multiprocessing |
6 from piecrust.baking.records import ( | 8 from piecrust.baking.records import ( |
7 TransitionalBakeRecord, BakeRecordPageEntry) | 9 BakeRecordEntry, TransitionalBakeRecord, TaxonomyInfo, FirstRenderInfo) |
8 from piecrust.baking.scheduler import BakeScheduler | 10 from piecrust.baking.worker import ( |
9 from piecrust.baking.single import (BakingError, PageBaker) | 11 BakeWorkerJob, LoadJobPayload, RenderFirstSubJobPayload, |
10 from piecrust.chefutil import format_timed, log_friendly_exception | 12 BakeJobPayload, |
13 JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE) | |
14 from piecrust.chefutil import ( | |
15 format_timed_scope, format_timed) | |
11 from piecrust.sources.base import ( | 16 from piecrust.sources.base import ( |
12 REALM_NAMES, REALM_USER, REALM_THEME) | 17 REALM_NAMES, REALM_USER, REALM_THEME) |
13 | 18 |
14 | 19 |
15 logger = logging.getLogger(__name__) | 20 logger = logging.getLogger(__name__) |
19 def __init__(self, app, out_dir, force=False): | 24 def __init__(self, app, out_dir, force=False): |
20 assert app and out_dir | 25 assert app and out_dir |
21 self.app = app | 26 self.app = app |
22 self.out_dir = out_dir | 27 self.out_dir = out_dir |
23 self.force = force | 28 self.force = force |
24 self.num_workers = app.config.get('baker/workers', 4) | 29 self.num_workers = app.config.get('baker/workers', |
30 multiprocessing.cpu_count()) | |
25 | 31 |
26 # Remember what taxonomy pages we should skip | 32 # Remember what taxonomy pages we should skip |
27 # (we'll bake them repeatedly later with each taxonomy term) | 33 # (we'll bake them repeatedly later with each taxonomy term) |
28 self.taxonomy_pages = [] | 34 self.taxonomy_pages = [] |
29 logger.debug("Gathering taxonomy page paths:") | 35 logger.debug("Gathering taxonomy page paths:") |
30 for tax in self.app.taxonomies: | 36 for tax in self.app.taxonomies: |
31 for src in self.app.sources: | 37 for src in self.app.sources: |
32 path = tax.resolvePagePath(src.name) | 38 tax_page_ref = tax.getPageRef(src) |
33 if path is not None: | 39 for path in tax_page_ref.possible_paths: |
34 self.taxonomy_pages.append(path) | 40 self.taxonomy_pages.append(path) |
35 logger.debug(" - %s" % path) | 41 logger.debug(" - %s" % path) |
36 | 42 |
37 def bake(self): | 43 def bake(self): |
38 logger.debug(" Bake Output: %s" % self.out_dir) | 44 logger.debug(" Bake Output: %s" % self.out_dir) |
39 logger.debug(" Root URL: %s" % self.app.config.get('site/root')) | 45 logger.debug(" Root URL: %s" % self.app.config.get('site/root')) |
40 | 46 |
41 # Get into bake mode. | 47 # Get into bake mode. |
42 start_time = time.clock() | 48 start_time = time.perf_counter() |
43 self.app.config.set('baker/is_baking', True) | 49 self.app.config.set('baker/is_baking', True) |
44 self.app.env.base_asset_url_format = '%uri%' | 50 self.app.env.base_asset_url_format = '%uri%' |
45 | 51 |
46 # Make sure the output directory exists. | 52 # Make sure the output directory exists. |
47 if not os.path.isdir(self.out_dir): | 53 if not os.path.isdir(self.out_dir): |
50 # Load/create the bake record. | 56 # Load/create the bake record. |
51 record = TransitionalBakeRecord() | 57 record = TransitionalBakeRecord() |
52 record_cache = self.app.cache.getCache('baker') | 58 record_cache = self.app.cache.getCache('baker') |
53 record_id = hashlib.md5(self.out_dir.encode('utf8')).hexdigest() | 59 record_id = hashlib.md5(self.out_dir.encode('utf8')).hexdigest() |
54 record_name = record_id + '.record' | 60 record_name = record_id + '.record' |
61 previous_record_path = None | |
55 if not self.force and record_cache.has(record_name): | 62 if not self.force and record_cache.has(record_name): |
56 t = time.clock() | 63 with format_timed_scope(logger, "loaded previous bake record", |
57 record.loadPrevious(record_cache.getCachePath(record_name)) | 64 level=logging.DEBUG, colored=False): |
58 logger.debug(format_timed( | 65 previous_record_path = record_cache.getCachePath(record_name) |
59 t, 'loaded previous bake record', | 66 record.loadPrevious(previous_record_path) |
60 colored=False)) | |
61 record.current.success = True | 67 record.current.success = True |
62 | 68 |
63 # Figure out if we need to clean the cache because important things | 69 # Figure out if we need to clean the cache because important things |
64 # have changed. | 70 # have changed. |
65 self._handleCacheValidity(record) | 71 self._handleCacheValidity(record) |
66 | 72 |
73 # Pre-create all caches. | |
74 for cache_name in ['app', 'baker', 'pages', 'renders']: | |
75 self.app.cache.getCache(cache_name) | |
76 | |
67 # Gather all sources by realm -- we're going to bake each realm | 77 # Gather all sources by realm -- we're going to bake each realm |
68 # separately so we can handle "overlaying" (i.e. one realm overrides | 78 # separately so we can handle "overriding" (i.e. one realm overrides |
69 # another realm's pages). | 79 # another realm's pages, like the user realm overriding the theme |
80 # realm). | |
70 sources_by_realm = {} | 81 sources_by_realm = {} |
71 for source in self.app.sources: | 82 for source in self.app.sources: |
72 srclist = sources_by_realm.setdefault(source.realm, []) | 83 srclist = sources_by_realm.setdefault(source.realm, []) |
73 srclist.append(source) | 84 srclist.append(source) |
85 | |
86 # Create the worker processes. | |
87 pool = self._createWorkerPool() | |
74 | 88 |
75 # Bake the realms. | 89 # Bake the realms. |
76 realm_list = [REALM_USER, REALM_THEME] | 90 realm_list = [REALM_USER, REALM_THEME] |
77 for realm in realm_list: | 91 for realm in realm_list: |
78 srclist = sources_by_realm.get(realm) | 92 srclist = sources_by_realm.get(realm) |
79 if srclist is not None: | 93 if srclist is not None: |
80 self._bakeRealm(record, realm, srclist) | 94 self._bakeRealm(record, pool, realm, srclist) |
81 | 95 |
82 # Bake taxonomies. | 96 # Bake taxonomies. |
83 self._bakeTaxonomies(record) | 97 self._bakeTaxonomies(record, pool) |
98 | |
99 # All done with the workers. | |
100 self._terminateWorkerPool(pool) | |
101 | |
102 # Get the timing information from the workers. | |
103 record.current.timers = {} | |
104 for _ in range(len(pool.workers)): | |
105 try: | |
106 timers = pool.results.get(True, 0.1) | |
107 except queue.Empty: | |
108 logger.error("Didn't get timing information from all workers.") | |
109 break | |
110 | |
111 for name, val in timers.items(): | |
112 main_val = record.current.timers.setdefault(name, 0) | |
113 record.current.timers[name] = main_val + val | |
84 | 114 |
85 # Delete files from the output. | 115 # Delete files from the output. |
86 self._handleDeletetions(record) | 116 self._handleDeletetions(record) |
87 | 117 |
88 # Backup previous records. | 118 # Backup previous records. |
96 if os.path.exists(record_path_next): | 126 if os.path.exists(record_path_next): |
97 os.remove(record_path_next) | 127 os.remove(record_path_next) |
98 os.rename(record_path, record_path_next) | 128 os.rename(record_path, record_path_next) |
99 | 129 |
100 # Save the bake record. | 130 # Save the bake record. |
101 t = time.clock() | 131 with format_timed_scope(logger, "saved bake record.", |
102 record.current.bake_time = time.time() | 132 level=logging.DEBUG, colored=False): |
103 record.current.out_dir = self.out_dir | 133 record.current.bake_time = time.time() |
104 record.saveCurrent(record_cache.getCachePath(record_name)) | 134 record.current.out_dir = self.out_dir |
105 logger.debug(format_timed(t, 'saved bake record', colored=False)) | 135 record.saveCurrent(record_cache.getCachePath(record_name)) |
106 | 136 |
107 # All done. | 137 # All done. |
108 self.app.config.set('baker/is_baking', False) | 138 self.app.config.set('baker/is_baking', False) |
109 logger.debug(format_timed(start_time, 'done baking')) | 139 logger.debug(format_timed(start_time, 'done baking')) |
110 | 140 |
111 return record.detach() | 141 return record.detach() |
112 | 142 |
113 def _handleCacheValidity(self, record): | 143 def _handleCacheValidity(self, record): |
114 start_time = time.clock() | 144 start_time = time.perf_counter() |
115 | 145 |
116 reason = None | 146 reason = None |
117 if self.force: | 147 if self.force: |
118 reason = "ordered to" | 148 reason = "ordered to" |
119 elif not self.app.config.get('__cache_valid'): | 149 elif not self.app.config.get('__cache_valid'): |
150 record.incremental_count += 1 | 180 record.incremental_count += 1 |
151 logger.debug(format_timed( | 181 logger.debug(format_timed( |
152 start_time, "cache is assumed valid", | 182 start_time, "cache is assumed valid", |
153 colored=False)) | 183 colored=False)) |
154 | 184 |
155 def _bakeRealm(self, record, realm, srclist): | 185 def _bakeRealm(self, record, pool, realm, srclist): |
156 # Gather all page factories from the sources and queue them | 186 start_time = time.perf_counter() |
157 # for the workers to pick up. Just skip taxonomy pages for now. | 187 try: |
158 logger.debug("Baking realm %s" % REALM_NAMES[realm]) | 188 all_factories = [] |
159 pool, queue, abort = self._createWorkerPool(record, self.num_workers) | 189 for source in srclist: |
160 | 190 factories = source.getPageFactories() |
161 for source in srclist: | 191 all_factories += [f for f in factories |
162 factories = source.getPageFactories() | 192 if f.path not in self.taxonomy_pages] |
193 | |
194 self._loadRealmPages(record, pool, all_factories) | |
195 self._renderRealmPages(record, pool, all_factories) | |
196 self._bakeRealmPages(record, pool, all_factories) | |
197 finally: | |
198 page_count = len(all_factories) | |
199 logger.info(format_timed( | |
200 start_time, | |
201 "baked %d %s pages" % | |
202 (page_count, REALM_NAMES[realm].lower()))) | |
203 | |
204 def _loadRealmPages(self, record, pool, factories): | |
205 with format_timed_scope(logger, | |
206 "loaded %d pages" % len(factories), | |
207 level=logging.DEBUG, colored=False): | |
163 for fac in factories: | 208 for fac in factories: |
164 if fac.path in self.taxonomy_pages: | 209 job = BakeWorkerJob( |
165 logger.debug( | 210 JOB_LOAD, |
166 "Skipping taxonomy page: %s:%s" % | 211 LoadJobPayload(fac)) |
167 (source.name, fac.ref_spec)) | 212 pool.queue.put_nowait(job) |
213 | |
214 def _handler(res): | |
215 # Create the record entry for this page. | |
216 record_entry = BakeRecordEntry(res.source_name, res.path) | |
217 record_entry.config = res.config | |
218 if res.errors: | |
219 record_entry.errors += res.errors | |
220 record.current.success = False | |
221 record.addEntry(record_entry) | |
222 | |
223 self._waitOnWorkerPool( | |
224 pool, | |
225 expected_result_count=len(factories), | |
226 result_handler=_handler) | |
227 | |
228 def _renderRealmPages(self, record, pool, factories): | |
229 with format_timed_scope(logger, | |
230 "prepared %d pages" % len(factories), | |
231 level=logging.DEBUG, colored=False): | |
232 expected_result_count = 0 | |
233 for fac in factories: | |
234 record_entry = record.getCurrentEntry(fac.path) | |
235 if record_entry.errors: | |
236 logger.debug("Ignoring %s because it had previous " | |
237 "errors." % fac.ref_spec) | |
168 continue | 238 continue |
169 | 239 |
170 entry = BakeRecordPageEntry(fac.source.name, fac.rel_path, | 240 # Make sure the source and the route exist for this page, |
171 fac.path) | 241 # otherwise we add errors to the record entry and we'll skip |
172 record.addEntry(entry) | 242 # this page for the rest of the bake. |
173 | 243 source = self.app.getSource(fac.source.name) |
174 route = self.app.getRoute(source.name, fac.metadata, | 244 if source is None: |
245 record_entry.errors.append( | |
246 "Can't get source for page: %s" % fac.ref_spec) | |
247 logger.error(record_entry.errors[-1]) | |
248 continue | |
249 | |
250 route = self.app.getRoute(fac.source.name, fac.metadata, | |
175 skip_taxonomies=True) | 251 skip_taxonomies=True) |
176 if route is None: | 252 if route is None: |
177 entry.errors.append( | 253 record_entry.errors.append( |
178 "Can't get route for page: %s" % fac.ref_spec) | 254 "Can't get route for page: %s" % fac.ref_spec) |
179 logger.error(entry.errors[-1]) | 255 logger.error(record_entry.errors[-1]) |
180 continue | 256 continue |
181 | 257 |
182 queue.addJob(BakeWorkerJob(fac, route, entry)) | 258 # All good, queue the job. |
183 | 259 job = BakeWorkerJob( |
184 success = self._waitOnWorkerPool(pool, abort) | 260 JOB_RENDER_FIRST, |
185 record.current.success &= success | 261 RenderFirstSubJobPayload(fac)) |
186 | 262 pool.queue.put_nowait(job) |
187 def _bakeTaxonomies(self, record): | 263 expected_result_count += 1 |
188 logger.debug("Baking taxonomies") | 264 |
189 | 265 def _handler(res): |
266 entry = record.getCurrentEntry(res.path) | |
267 | |
268 entry.first_render_info = FirstRenderInfo() | |
269 entry.first_render_info.used_assets = res.used_assets | |
270 entry.first_render_info.used_pagination = \ | |
271 res.used_pagination | |
272 entry.first_render_info.pagination_has_more = \ | |
273 res.pagination_has_more | |
274 | |
275 if res.errors: | |
276 entry.errors += res.errors | |
277 record.current.success = False | |
278 | |
279 self._waitOnWorkerPool( | |
280 pool, | |
281 expected_result_count=expected_result_count, | |
282 result_handler=_handler) | |
283 | |
284 def _bakeRealmPages(self, record, pool, factories): | |
285 with format_timed_scope(logger, | |
286 "baked %d pages" % len(factories), | |
287 level=logging.DEBUG, colored=False): | |
288 expected_result_count = 0 | |
289 for fac in factories: | |
290 if self._queueBakeJob(record, pool, fac): | |
291 expected_result_count += 1 | |
292 | |
293 def _handler(res): | |
294 entry = record.getCurrentEntry(res.path, res.taxonomy_info) | |
295 entry.bake_info = res.bake_info | |
296 if res.errors: | |
297 entry.errors += res.errors | |
298 if entry.has_any_error: | |
299 record.current.success = False | |
300 | |
301 self._waitOnWorkerPool( | |
302 pool, | |
303 expected_result_count=expected_result_count, | |
304 result_handler=_handler) | |
305 | |
306 def _bakeTaxonomies(self, record, pool): | |
307 with format_timed_scope(logger, 'built taxonomy buckets', | |
308 level=logging.DEBUG, colored=False): | |
309 buckets = self._buildTaxonomyBuckets(record) | |
310 | |
311 start_time = time.perf_counter() | |
312 page_count = self._bakeTaxonomyBuckets(record, pool, buckets) | |
313 logger.info(format_timed(start_time, | |
314 "baked %d taxonomy pages." % page_count)) | |
315 | |
316 def _buildTaxonomyBuckets(self, record): | |
190 # Let's see all the taxonomy terms for which we must bake a | 317 # Let's see all the taxonomy terms for which we must bake a |
191 # listing page... first, pre-populate our big map of used terms. | 318 # listing page... first, pre-populate our big map of used terms. |
192 # For each source name, we have a list of taxonomies, and for each | 319 # For each source name, we have a list of taxonomies, and for each |
193 # taxonomies, a list of terms, some being 'dirty', some used last | 320 # taxonomies, a list of terms, some being 'dirty', some used last |
194 # time, etc. | 321 # time, etc. |
248 tt_info = buckets[sn][tn] | 375 tt_info = buckets[sn][tn] |
249 tt_info.all_terms.add(terms) | 376 tt_info.all_terms.add(terms) |
250 if not tt_info.dirty_terms.isdisjoint(set(terms)): | 377 if not tt_info.dirty_terms.isdisjoint(set(terms)): |
251 tt_info.dirty_terms.add(terms) | 378 tt_info.dirty_terms.add(terms) |
252 | 379 |
380 return buckets | |
381 | |
382 def _bakeTaxonomyBuckets(self, record, pool, buckets): | |
253 # Start baking those terms. | 383 # Start baking those terms. |
254 pool, queue, abort = self._createWorkerPool(record, self.num_workers) | 384 expected_result_count = 0 |
255 for source_name, source_taxonomies in buckets.items(): | 385 for source_name, source_taxonomies in buckets.items(): |
256 for tax_name, tt_info in source_taxonomies.items(): | 386 for tax_name, tt_info in source_taxonomies.items(): |
257 terms = tt_info.dirty_terms | 387 terms = tt_info.dirty_terms |
258 if len(terms) == 0: | 388 if len(terms) == 0: |
259 continue | 389 continue |
260 | 390 |
261 logger.debug( | 391 logger.debug( |
262 "Baking '%s' for source '%s': %s" % | 392 "Baking '%s' for source '%s': %s" % |
263 (tax_name, source_name, terms)) | 393 (tax_name, source_name, terms)) |
264 tax = self.app.getTaxonomy(tax_name) | 394 tax = self.app.getTaxonomy(tax_name) |
265 route = self.app.getTaxonomyRoute(tax_name, source_name) | 395 source = self.app.getSource(source_name) |
266 tax_page_ref = tax.getPageRef(source_name) | 396 tax_page_ref = tax.getPageRef(source) |
267 if not tax_page_ref.exists: | 397 if not tax_page_ref.exists: |
268 logger.debug( | 398 logger.debug( |
269 "No taxonomy page found at '%s', skipping." % | 399 "No taxonomy page found at '%s', skipping." % |
270 tax.page_ref) | 400 tax.page_ref) |
271 continue | 401 continue |
272 | 402 |
273 logger.debug( | 403 logger.debug( |
274 "Using taxonomy page: %s:%s" % | 404 "Using taxonomy page: %s:%s" % |
275 (tax_page_ref.source_name, tax_page_ref.rel_path)) | 405 (tax_page_ref.source_name, tax_page_ref.rel_path)) |
406 fac = tax_page_ref.getFactory() | |
407 | |
276 for term in terms: | 408 for term in terms: |
277 fac = tax_page_ref.getFactory() | |
278 logger.debug( | 409 logger.debug( |
279 "Queuing: %s [%s=%s]" % | 410 "Queuing: %s [%s=%s]" % |
280 (fac.ref_spec, tax_name, term)) | 411 (fac.ref_spec, tax_name, term)) |
281 entry = BakeRecordPageEntry( | 412 tax_info = TaxonomyInfo(tax_name, source_name, term) |
282 fac.source.name, fac.rel_path, fac.path, | 413 |
283 (tax_name, term, source_name)) | 414 cur_entry = BakeRecordEntry( |
284 record.addEntry(entry) | 415 fac.source.name, fac.path, tax_info) |
285 queue.addJob(BakeWorkerJob(fac, route, entry)) | 416 record.addEntry(cur_entry) |
286 | 417 |
287 success = self._waitOnWorkerPool(pool, abort) | 418 if self._queueBakeJob(record, pool, fac, tax_info): |
288 record.current.success &= success | 419 expected_result_count += 1 |
420 | |
421 def _handler(res): | |
422 entry = record.getCurrentEntry(res.path, res.taxonomy_info) | |
423 entry.bake_info = res.bake_info | |
424 if res.errors: | |
425 entry.errors += res.errors | |
426 if entry.has_any_error: | |
427 record.current.success = False | |
428 | |
429 self._waitOnWorkerPool( | |
430 pool, | |
431 expected_result_count=expected_result_count, | |
432 result_handler=_handler) | |
289 | 433 |
290 # Now we create bake entries for all the terms that were *not* dirty. | 434 # Now we create bake entries for all the terms that were *not* dirty. |
291 # This is because otherwise, on the next incremental bake, we wouldn't | 435 # This is because otherwise, on the next incremental bake, we wouldn't |
292 # find any entry for those things, and figure that we need to delete | 436 # find any entry for those things, and figure that we need to delete |
293 # their outputs. | 437 # their outputs. |
294 for prev_entry, cur_entry in record.transitions.values(): | 438 for prev_entry, cur_entry in record.transitions.values(): |
295 # Only consider taxonomy-related entries that don't have any | 439 # Only consider taxonomy-related entries that don't have any |
296 # current version. | 440 # current version. |
297 if (prev_entry and prev_entry.taxonomy_info and | 441 if (prev_entry and prev_entry.taxonomy_info and |
298 not cur_entry): | 442 not cur_entry): |
299 sn = prev_entry.source_name | 443 ti = prev_entry.taxonomy_info |
300 tn, tt, tsn = prev_entry.taxonomy_info | 444 tt_info = buckets[ti.source_name][ti.taxonomy_name] |
301 tt_info = buckets[tsn][tn] | 445 if ti.term in tt_info.all_terms: |
302 if tt in tt_info.all_terms: | |
303 logger.debug("Creating unbaked entry for taxonomy " | 446 logger.debug("Creating unbaked entry for taxonomy " |
304 "term '%s:%s'." % (tn, tt)) | 447 "term '%s:%s'." % (ti.taxonomy_name, ti.term)) |
305 record.collapseEntry(prev_entry) | 448 record.collapseEntry(prev_entry) |
306 else: | 449 else: |
307 logger.debug("Taxonomy term '%s:%s' isn't used anymore." % | 450 logger.debug("Taxonomy term '%s:%s' isn't used anymore." % |
308 (tn, tt)) | 451 (ti.taxonomy_name, ti.term)) |
452 | |
453 return expected_result_count | |
454 | |
455 def _queueBakeJob(self, record, pool, fac, tax_info=None): | |
456 # Get the previous (if any) and current entry for this page. | |
457 pair = record.getPreviousAndCurrentEntries(fac.path, tax_info) | |
458 assert pair is not None | |
459 prev_entry, cur_entry = pair | |
460 assert cur_entry is not None | |
461 | |
462 # Ignore if there were errors in the previous passes. | |
463 if cur_entry.errors: | |
464 logger.debug("Ignoring %s because it had previous " | |
465 "errors." % fac.ref_spec) | |
466 return False | |
467 | |
468 # Build the route metadata and find the appropriate route. | |
469 route_metadata = copy.deepcopy(fac.metadata) | |
470 if tax_info is not None: | |
471 tax = self.app.getTaxonomy(tax_info.taxonomy_name) | |
472 route = self.app.getTaxonomyRoute(tax_info.taxonomy_name, | |
473 tax_info.source_name) | |
474 | |
475 slugified_term = route.slugifyTaxonomyTerm(tax_info.term) | |
476 route_metadata[tax.term_name] = slugified_term | |
477 else: | |
478 route = self.app.getRoute(fac.source.name, route_metadata, | |
479 skip_taxonomies=True) | |
480 assert route is not None | |
481 | |
482 # Figure out if this page is overriden by another previously | |
483 # baked page. This happens for example when the user has | |
484 # made a page that has the same page/URL as a theme page. | |
485 page = fac.buildPage() | |
486 uri = route.getUri(route_metadata, provider=page) | |
487 override_entry = record.getOverrideEntry(page.path, uri) | |
488 if override_entry is not None: | |
489 override_source = self.app.getSource( | |
490 override_entry.source_name) | |
491 if override_source.realm == fac.source.realm: | |
492 cur_entry.errors.append( | |
493 "Page '%s' maps to URL '%s' but is overriden " | |
494 "by page '%s'." % | |
495 (fac.ref_spec, uri, override_entry.path)) | |
496 logger.error(cur_entry.errors[-1]) | |
497 cur_entry.flags |= BakeRecordEntry.FLAG_OVERRIDEN | |
498 return False | |
499 | |
500 job = BakeWorkerJob( | |
501 JOB_BAKE, | |
502 BakeJobPayload(fac, route_metadata, prev_entry, | |
503 cur_entry.first_render_info, | |
504 record.dirty_source_names, | |
505 tax_info)) | |
506 pool.queue.put_nowait(job) | |
507 return True | |
309 | 508 |
310 def _handleDeletetions(self, record): | 509 def _handleDeletetions(self, record): |
311 for path, reason in record.getDeletions(): | 510 for path, reason in record.getDeletions(): |
312 logger.debug("Removing '%s': %s" % (path, reason)) | 511 logger.debug("Removing '%s': %s" % (path, reason)) |
313 try: | 512 try: |
316 except OSError: | 515 except OSError: |
317 # Not a big deal if that file had already been removed | 516 # Not a big deal if that file had already been removed |
318 # by the user. | 517 # by the user. |
319 pass | 518 pass |
320 | 519 |
321 def _createWorkerPool(self, record, pool_size=4): | 520 def _createWorkerPool(self): |
322 pool = [] | 521 from piecrust.baking.worker import BakeWorkerContext, worker_func |
323 queue = BakeScheduler(record) | 522 |
324 abort = threading.Event() | 523 pool = _WorkerPool() |
325 for i in range(pool_size): | 524 for i in range(self.num_workers): |
326 ctx = BakeWorkerContext( | 525 ctx = BakeWorkerContext( |
327 self.app, self.out_dir, self.force, | 526 self.app.root_dir, self.out_dir, |
328 record, queue, abort) | 527 pool.queue, pool.results, pool.abort_event, |
329 worker = BakeWorker(i, ctx) | 528 force=self.force, debug=self.app.debug) |
330 pool.append(worker) | 529 w = multiprocessing.Process( |
331 return pool, queue, abort | 530 target=worker_func, args=(i, ctx)) |
332 | |
333 def _waitOnWorkerPool(self, pool, abort): | |
334 for w in pool: | |
335 w.start() | 531 w.start() |
336 | 532 pool.workers.append(w) |
337 success = True | 533 return pool |
534 | |
535 def _terminateWorkerPool(self, pool): | |
536 pool.abort_event.set() | |
537 for w in pool.workers: | |
538 w.join() | |
539 | |
540 def _waitOnWorkerPool(self, pool, | |
541 expected_result_count=-1, result_handler=None): | |
542 assert result_handler is None or expected_result_count >= 0 | |
543 abort_with_exception = None | |
338 try: | 544 try: |
339 for w in pool: | 545 if result_handler is None: |
340 w.join() | 546 pool.queue.join() |
341 success &= w.success | 547 else: |
342 except KeyboardInterrupt: | 548 got_count = 0 |
549 while got_count < expected_result_count: | |
550 try: | |
551 res = pool.results.get(True, 10) | |
552 except queue.Empty: | |
553 logger.error( | |
554 "Got %d results, expected %d, and timed-out " | |
555 "for 10 seconds. A worker might be stuck?" % | |
556 (got_count, expected_result_count)) | |
557 abort_with_exception = Exception("Worker time-out.") | |
558 break | |
559 | |
560 got_count += 1 | |
561 result_handler(res) | |
562 except KeyboardInterrupt as kiex: | |
343 logger.warning("Bake aborted by user... " | 563 logger.warning("Bake aborted by user... " |
344 "waiting for workers to stop.") | 564 "waiting for workers to stop.") |
345 abort.set() | 565 abort_with_exception = kiex |
346 for w in pool: | 566 |
347 w.join() | 567 if abort_with_exception: |
348 raise | 568 pool.abort_event.set() |
349 | 569 for w in pool.workers: |
350 if abort.is_set(): | 570 w.join(2) |
351 excs = [w.abort_exception for w in pool | 571 raise abort_with_exception |
352 if w.abort_exception is not None] | 572 |
353 logger.error("Baking was aborted due to %s error(s):" % len(excs)) | 573 |
354 if self.app.debug: | 574 class _WorkerPool(object): |
355 for e in excs: | 575 def __init__(self): |
356 logger.exception(e) | 576 self.queue = multiprocessing.JoinableQueue() |
357 else: | 577 self.results = multiprocessing.Queue() |
358 for e in excs: | 578 self.abort_event = multiprocessing.Event() |
359 log_friendly_exception(logger, e) | 579 self.workers = [] |
360 raise BakingError("Baking was aborted due to errors.") | |
361 | |
362 return success | |
363 | |
364 | |
365 class BakeWorkerContext(object): | |
366 def __init__(self, app, out_dir, force, record, work_queue, | |
367 abort_event): | |
368 self.app = app | |
369 self.out_dir = out_dir | |
370 self.force = force | |
371 self.record = record | |
372 self.work_queue = work_queue | |
373 self.abort_event = abort_event | |
374 | |
375 | |
376 class BakeWorkerJob(object): | |
377 def __init__(self, factory, route, record_entry): | |
378 self.factory = factory | |
379 self.route = route | |
380 self.record_entry = record_entry | |
381 | |
382 @property | |
383 def source(self): | |
384 return self.factory.source | |
385 | |
386 | |
387 class BakeWorker(threading.Thread): | |
388 def __init__(self, wid, ctx): | |
389 super(BakeWorker, self).__init__(name=('worker%d' % wid)) | |
390 self.wid = wid | |
391 self.ctx = ctx | |
392 self.abort_exception = None | |
393 self.success = True | |
394 self._page_baker = PageBaker( | |
395 ctx.app, ctx.out_dir, ctx.force, | |
396 ctx.record) | |
397 | |
398 def run(self): | |
399 while(not self.ctx.abort_event.is_set()): | |
400 try: | |
401 job = self.ctx.work_queue.getNextJob(wait_timeout=1) | |
402 if job is None: | |
403 logger.debug( | |
404 "[%d] No more work... shutting down." % | |
405 self.wid) | |
406 break | |
407 success = self._unsafeRun(job) | |
408 logger.debug("[%d] Done with page." % self.wid) | |
409 self.ctx.work_queue.onJobFinished(job) | |
410 self.success &= success | |
411 except Exception as ex: | |
412 self.ctx.abort_event.set() | |
413 self.abort_exception = ex | |
414 self.success = False | |
415 logger.debug("[%d] Critical error, aborting." % self.wid) | |
416 if self.ctx.app.debug: | |
417 logger.exception(ex) | |
418 break | |
419 | |
420 def _unsafeRun(self, job): | |
421 start_time = time.clock() | |
422 | |
423 entry = job.record_entry | |
424 try: | |
425 self._page_baker.bake(job.factory, job.route, entry) | |
426 except BakingError as ex: | |
427 logger.debug("Got baking error. Adding it to the record.") | |
428 while ex: | |
429 entry.errors.append(str(ex)) | |
430 ex = ex.__cause__ | |
431 | |
432 has_error = False | |
433 for e in entry.getAllErrors(): | |
434 has_error = True | |
435 logger.error(e) | |
436 if has_error: | |
437 return False | |
438 | |
439 if entry.was_any_sub_baked: | |
440 first_sub = entry.subs[0] | |
441 | |
442 friendly_uri = first_sub.out_uri | |
443 if friendly_uri == '': | |
444 friendly_uri = '[main page]' | |
445 | |
446 friendly_count = '' | |
447 if entry.num_subs > 1: | |
448 friendly_count = ' (%d pages)' % entry.num_subs | |
449 logger.info(format_timed( | |
450 start_time, '[%d] %s%s' % | |
451 (self.wid, friendly_uri, friendly_count))) | |
452 | |
453 return True | |
454 | 580 |
455 | 581 |
456 class _TaxonomyTermsInfo(object): | 582 class _TaxonomyTermsInfo(object): |
457 def __init__(self): | 583 def __init__(self): |
458 self.dirty_terms = set() | 584 self.dirty_terms = set() |
461 def __str__(self): | 587 def __str__(self): |
462 return 'dirty:%s, all:%s' % (self.dirty_terms, self.all_terms) | 588 return 'dirty:%s, all:%s' % (self.dirty_terms, self.all_terms) |
463 | 589 |
464 def __repr__(self): | 590 def __repr__(self): |
465 return 'dirty:%s, all:%s' % (self.dirty_terms, self.all_terms) | 591 return 'dirty:%s, all:%s' % (self.dirty_terms, self.all_terms) |
592 |