comparison piecrust/baking/baker.py @ 411:e7b865f8f335

bake: Enable multiprocess baking.

Baking is now done by running one worker per CPU and sending jobs to them.
This changes several things across the codebase:

* Ability to not cache things related to pages other than the 'main' page
  (i.e. the page at the bottom of the execution stack).
* Decouple the baking process from the bake records, so only the main
  process keeps track of (and modifies) the bake record.
* Remove the need for 'batch page getters'; pages are loaded directly from
  the page factories.

There are various smaller changes included here too, such as support for
scoped performance timers that are saved with the bake record and can be
printed out to the console. Yes, I got carried away.

For testing, the in-memory 'mock' file-system doesn't work anymore since
we're spawning processes, so it is replaced by a 'tmpfs' file-system which
is saved in temporary files on disk and deleted after the tests have run.
author Ludovic Chabant <ludovic@chabant.com>
date Fri, 12 Jun 2015 17:09:19 -0700
parents c2ca72fb7f0b
children 0e9a94b7fdfa
comparing 410:d1a472464e57 with 411:e7b865f8f335
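The commit message mentions scoped performance timers; the new code below wraps its load/render/bake passes in "with format_timed_scope(...)" blocks. That helper lives in piecrust/chefutil.py and is not part of this comparison, so purely as an illustration, here is a minimal, hypothetical sketch of what such a timing context manager could look like (the name timed_scope and its exact output format are assumptions, not the actual implementation):

    import contextlib
    import logging
    import time


    @contextlib.contextmanager
    def timed_scope(logger, message, level=logging.INFO):
        # Hypothetical stand-in for piecrust.chefutil.format_timed_scope:
        # time the body of the 'with' block, then log one summary line.
        start = time.perf_counter()
        try:
            yield
        finally:
            elapsed_ms = (time.perf_counter() - start) * 1000.0
            logger.log(level, "%s (%d ms)" % (message, elapsed_ms))


    # Usage, analogous to the call sites in the diff below:
    #     with timed_scope(logger, "loaded previous bake record",
    #                      level=logging.DEBUG):
    #         record.loadPrevious(previous_record_path)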
@@ -1,15 +1,20 @@
+import copy
 import time
 import os.path
+import queue
 import hashlib
 import logging
-import threading
+import multiprocessing
 from piecrust.baking.records import (
-        TransitionalBakeRecord, BakeRecordPageEntry)
-from piecrust.baking.scheduler import BakeScheduler
-from piecrust.baking.single import (BakingError, PageBaker)
-from piecrust.chefutil import format_timed, log_friendly_exception
+        BakeRecordEntry, TransitionalBakeRecord, TaxonomyInfo, FirstRenderInfo)
+from piecrust.baking.worker import (
+        BakeWorkerJob, LoadJobPayload, RenderFirstSubJobPayload,
+        BakeJobPayload,
+        JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE)
+from piecrust.chefutil import (
+        format_timed_scope, format_timed)
 from piecrust.sources.base import (
         REALM_NAMES, REALM_USER, REALM_THEME)
 
 
 logger = logging.getLogger(__name__)
@@ -19,29 +24,30 @@
     def __init__(self, app, out_dir, force=False):
         assert app and out_dir
         self.app = app
         self.out_dir = out_dir
         self.force = force
-        self.num_workers = app.config.get('baker/workers', 4)
+        self.num_workers = app.config.get('baker/workers',
+                                          multiprocessing.cpu_count())
 
         # Remember what taxonomy pages we should skip
         # (we'll bake them repeatedly later with each taxonomy term)
         self.taxonomy_pages = []
         logger.debug("Gathering taxonomy page paths:")
         for tax in self.app.taxonomies:
             for src in self.app.sources:
-                path = tax.resolvePagePath(src.name)
-                if path is not None:
+                tax_page_ref = tax.getPageRef(src)
+                for path in tax_page_ref.possible_paths:
                     self.taxonomy_pages.append(path)
                     logger.debug(" - %s" % path)
 
     def bake(self):
         logger.debug(" Bake Output: %s" % self.out_dir)
         logger.debug(" Root URL: %s" % self.app.config.get('site/root'))
 
         # Get into bake mode.
-        start_time = time.clock()
+        start_time = time.perf_counter()
         self.app.config.set('baker/is_baking', True)
         self.app.env.base_asset_url_format = '%uri%'
 
         # Make sure the output directory exists.
         if not os.path.isdir(self.out_dir):
@@ -50,39 +56,63 @@
         # Load/create the bake record.
         record = TransitionalBakeRecord()
         record_cache = self.app.cache.getCache('baker')
         record_id = hashlib.md5(self.out_dir.encode('utf8')).hexdigest()
         record_name = record_id + '.record'
+        previous_record_path = None
         if not self.force and record_cache.has(record_name):
-            t = time.clock()
-            record.loadPrevious(record_cache.getCachePath(record_name))
-            logger.debug(format_timed(
-                    t, 'loaded previous bake record',
-                    colored=False))
+            with format_timed_scope(logger, "loaded previous bake record",
+                                    level=logging.DEBUG, colored=False):
+                previous_record_path = record_cache.getCachePath(record_name)
+                record.loadPrevious(previous_record_path)
         record.current.success = True
 
         # Figure out if we need to clean the cache because important things
         # have changed.
         self._handleCacheValidity(record)
 
+        # Pre-create all caches.
+        for cache_name in ['app', 'baker', 'pages', 'renders']:
+            self.app.cache.getCache(cache_name)
+
         # Gather all sources by realm -- we're going to bake each realm
-        # separately so we can handle "overlaying" (i.e. one realm overrides
-        # another realm's pages).
+        # separately so we can handle "overriding" (i.e. one realm overrides
+        # another realm's pages, like the user realm overriding the theme
+        # realm).
         sources_by_realm = {}
         for source in self.app.sources:
             srclist = sources_by_realm.setdefault(source.realm, [])
             srclist.append(source)
+
+        # Create the worker processes.
+        pool = self._createWorkerPool()
 
         # Bake the realms.
         realm_list = [REALM_USER, REALM_THEME]
         for realm in realm_list:
             srclist = sources_by_realm.get(realm)
             if srclist is not None:
-                self._bakeRealm(record, realm, srclist)
+                self._bakeRealm(record, pool, realm, srclist)
 
         # Bake taxonomies.
-        self._bakeTaxonomies(record)
+        self._bakeTaxonomies(record, pool)
+
+        # All done with the workers.
+        self._terminateWorkerPool(pool)
+
+        # Get the timing information from the workers.
+        record.current.timers = {}
+        for _ in range(len(pool.workers)):
+            try:
+                timers = pool.results.get(True, 0.1)
+            except queue.Empty:
+                logger.error("Didn't get timing information from all workers.")
+                break
+
+            for name, val in timers.items():
+                main_val = record.current.timers.setdefault(name, 0)
+                record.current.timers[name] = main_val + val
 
         # Delete files from the output.
         self._handleDeletetions(record)
 
         # Backup previous records.
@@ -96,24 +126,24 @@
         if os.path.exists(record_path_next):
             os.remove(record_path_next)
         os.rename(record_path, record_path_next)
 
         # Save the bake record.
-        t = time.clock()
-        record.current.bake_time = time.time()
-        record.current.out_dir = self.out_dir
-        record.saveCurrent(record_cache.getCachePath(record_name))
-        logger.debug(format_timed(t, 'saved bake record', colored=False))
+        with format_timed_scope(logger, "saved bake record.",
+                                level=logging.DEBUG, colored=False):
+            record.current.bake_time = time.time()
+            record.current.out_dir = self.out_dir
+            record.saveCurrent(record_cache.getCachePath(record_name))
 
         # All done.
         self.app.config.set('baker/is_baking', False)
         logger.debug(format_timed(start_time, 'done baking'))
 
         return record.detach()
 
     def _handleCacheValidity(self, record):
-        start_time = time.clock()
+        start_time = time.perf_counter()
 
         reason = None
         if self.force:
             reason = "ordered to"
         elif not self.app.config.get('__cache_valid'):
@@ -150,45 +180,142 @@
             record.incremental_count += 1
             logger.debug(format_timed(
                     start_time, "cache is assumed valid",
                     colored=False))
 
-    def _bakeRealm(self, record, realm, srclist):
-        # Gather all page factories from the sources and queue them
-        # for the workers to pick up. Just skip taxonomy pages for now.
-        logger.debug("Baking realm %s" % REALM_NAMES[realm])
-        pool, queue, abort = self._createWorkerPool(record, self.num_workers)
-
-        for source in srclist:
-            factories = source.getPageFactories()
+    def _bakeRealm(self, record, pool, realm, srclist):
+        start_time = time.perf_counter()
+        try:
+            all_factories = []
+            for source in srclist:
+                factories = source.getPageFactories()
+                all_factories += [f for f in factories
+                                  if f.path not in self.taxonomy_pages]
+
+            self._loadRealmPages(record, pool, all_factories)
+            self._renderRealmPages(record, pool, all_factories)
+            self._bakeRealmPages(record, pool, all_factories)
+        finally:
+            page_count = len(all_factories)
+            logger.info(format_timed(
+                    start_time,
+                    "baked %d %s pages" %
+                    (page_count, REALM_NAMES[realm].lower())))
+
+    def _loadRealmPages(self, record, pool, factories):
+        with format_timed_scope(logger,
+                                "loaded %d pages" % len(factories),
+                                level=logging.DEBUG, colored=False):
             for fac in factories:
-                if fac.path in self.taxonomy_pages:
-                    logger.debug(
-                            "Skipping taxonomy page: %s:%s" %
-                            (source.name, fac.ref_spec))
+                job = BakeWorkerJob(
+                        JOB_LOAD,
+                        LoadJobPayload(fac))
+                pool.queue.put_nowait(job)
+
+            def _handler(res):
+                # Create the record entry for this page.
+                record_entry = BakeRecordEntry(res.source_name, res.path)
+                record_entry.config = res.config
+                if res.errors:
+                    record_entry.errors += res.errors
+                    record.current.success = False
+                record.addEntry(record_entry)
+
+            self._waitOnWorkerPool(
+                    pool,
+                    expected_result_count=len(factories),
+                    result_handler=_handler)
+
+    def _renderRealmPages(self, record, pool, factories):
+        with format_timed_scope(logger,
+                                "prepared %d pages" % len(factories),
+                                level=logging.DEBUG, colored=False):
+            expected_result_count = 0
+            for fac in factories:
+                record_entry = record.getCurrentEntry(fac.path)
+                if record_entry.errors:
+                    logger.debug("Ignoring %s because it had previous "
+                                 "errors." % fac.ref_spec)
                     continue
 
-                entry = BakeRecordPageEntry(fac.source.name, fac.rel_path,
-                                            fac.path)
-                record.addEntry(entry)
-
-                route = self.app.getRoute(source.name, fac.metadata,
+                # Make sure the source and the route exist for this page,
+                # otherwise we add errors to the record entry and we'll skip
+                # this page for the rest of the bake.
+                source = self.app.getSource(fac.source.name)
+                if source is None:
+                    record_entry.errors.append(
+                            "Can't get source for page: %s" % fac.ref_spec)
+                    logger.error(record_entry.errors[-1])
+                    continue
+
+                route = self.app.getRoute(fac.source.name, fac.metadata,
                                           skip_taxonomies=True)
                 if route is None:
-                    entry.errors.append(
+                    record_entry.errors.append(
                             "Can't get route for page: %s" % fac.ref_spec)
-                    logger.error(entry.errors[-1])
+                    logger.error(record_entry.errors[-1])
                     continue
 
-                queue.addJob(BakeWorkerJob(fac, route, entry))
-
-        success = self._waitOnWorkerPool(pool, abort)
-        record.current.success &= success
-
-    def _bakeTaxonomies(self, record):
-        logger.debug("Baking taxonomies")
-
+                # All good, queue the job.
+                job = BakeWorkerJob(
+                        JOB_RENDER_FIRST,
+                        RenderFirstSubJobPayload(fac))
+                pool.queue.put_nowait(job)
+                expected_result_count += 1
+
+            def _handler(res):
+                entry = record.getCurrentEntry(res.path)
+
+                entry.first_render_info = FirstRenderInfo()
+                entry.first_render_info.used_assets = res.used_assets
+                entry.first_render_info.used_pagination = \
+                    res.used_pagination
+                entry.first_render_info.pagination_has_more = \
+                    res.pagination_has_more
+
+                if res.errors:
+                    entry.errors += res.errors
+                    record.current.success = False
+
+            self._waitOnWorkerPool(
+                    pool,
+                    expected_result_count=expected_result_count,
+                    result_handler=_handler)
+
+    def _bakeRealmPages(self, record, pool, factories):
+        with format_timed_scope(logger,
+                                "baked %d pages" % len(factories),
+                                level=logging.DEBUG, colored=False):
+            expected_result_count = 0
+            for fac in factories:
+                if self._queueBakeJob(record, pool, fac):
+                    expected_result_count += 1
+
+            def _handler(res):
+                entry = record.getCurrentEntry(res.path, res.taxonomy_info)
+                entry.bake_info = res.bake_info
+                if res.errors:
+                    entry.errors += res.errors
+                if entry.has_any_error:
+                    record.current.success = False
+
+            self._waitOnWorkerPool(
+                    pool,
+                    expected_result_count=expected_result_count,
+                    result_handler=_handler)
+
+    def _bakeTaxonomies(self, record, pool):
+        with format_timed_scope(logger, 'built taxonomy buckets',
+                                level=logging.DEBUG, colored=False):
+            buckets = self._buildTaxonomyBuckets(record)
+
+        start_time = time.perf_counter()
+        page_count = self._bakeTaxonomyBuckets(record, pool, buckets)
+        logger.info(format_timed(start_time,
+                                 "baked %d taxonomy pages." % page_count))
+
+    def _buildTaxonomyBuckets(self, record):
         # Let's see all the taxonomy terms for which we must bake a
         # listing page... first, pre-populate our big map of used terms.
         # For each source name, we have a list of taxonomies, and for each
         # taxonomies, a list of terms, some being 'dirty', some used last
         # time, etc.
@@ -248,66 +375,138 @@
                     tt_info = buckets[sn][tn]
                     tt_info.all_terms.add(terms)
                     if not tt_info.dirty_terms.isdisjoint(set(terms)):
                         tt_info.dirty_terms.add(terms)
 
+        return buckets
+
+    def _bakeTaxonomyBuckets(self, record, pool, buckets):
         # Start baking those terms.
-        pool, queue, abort = self._createWorkerPool(record, self.num_workers)
+        expected_result_count = 0
         for source_name, source_taxonomies in buckets.items():
             for tax_name, tt_info in source_taxonomies.items():
                 terms = tt_info.dirty_terms
                 if len(terms) == 0:
                     continue
 
                 logger.debug(
                         "Baking '%s' for source '%s': %s" %
                         (tax_name, source_name, terms))
                 tax = self.app.getTaxonomy(tax_name)
-                route = self.app.getTaxonomyRoute(tax_name, source_name)
-                tax_page_ref = tax.getPageRef(source_name)
+                source = self.app.getSource(source_name)
+                tax_page_ref = tax.getPageRef(source)
                 if not tax_page_ref.exists:
                     logger.debug(
                             "No taxonomy page found at '%s', skipping." %
                            tax.page_ref)
                     continue
 
                 logger.debug(
                         "Using taxonomy page: %s:%s" %
                         (tax_page_ref.source_name, tax_page_ref.rel_path))
+                fac = tax_page_ref.getFactory()
+
                 for term in terms:
-                    fac = tax_page_ref.getFactory()
                     logger.debug(
                             "Queuing: %s [%s=%s]" %
                             (fac.ref_spec, tax_name, term))
-                    entry = BakeRecordPageEntry(
-                            fac.source.name, fac.rel_path, fac.path,
-                            (tax_name, term, source_name))
-                    record.addEntry(entry)
-                    queue.addJob(BakeWorkerJob(fac, route, entry))
+                    tax_info = TaxonomyInfo(tax_name, source_name, term)
+
+                    cur_entry = BakeRecordEntry(
+                            fac.source.name, fac.path, tax_info)
+                    record.addEntry(cur_entry)
 
-        success = self._waitOnWorkerPool(pool, abort)
-        record.current.success &= success
+                    if self._queueBakeJob(record, pool, fac, tax_info):
+                        expected_result_count += 1
+
+        def _handler(res):
+            entry = record.getCurrentEntry(res.path, res.taxonomy_info)
+            entry.bake_info = res.bake_info
+            if res.errors:
+                entry.errors += res.errors
+            if entry.has_any_error:
+                record.current.success = False
+
+        self._waitOnWorkerPool(
+                pool,
+                expected_result_count=expected_result_count,
+                result_handler=_handler)
 
         # Now we create bake entries for all the terms that were *not* dirty.
         # This is because otherwise, on the next incremental bake, we wouldn't
         # find any entry for those things, and figure that we need to delete
         # their outputs.
         for prev_entry, cur_entry in record.transitions.values():
             # Only consider taxonomy-related entries that don't have any
             # current version.
             if (prev_entry and prev_entry.taxonomy_info and
                     not cur_entry):
-                sn = prev_entry.source_name
-                tn, tt, tsn = prev_entry.taxonomy_info
-                tt_info = buckets[tsn][tn]
-                if tt in tt_info.all_terms:
+                ti = prev_entry.taxonomy_info
+                tt_info = buckets[ti.source_name][ti.taxonomy_name]
+                if ti.term in tt_info.all_terms:
                     logger.debug("Creating unbaked entry for taxonomy "
-                                 "term '%s:%s'." % (tn, tt))
+                                 "term '%s:%s'." % (ti.taxonomy_name, ti.term))
                     record.collapseEntry(prev_entry)
                 else:
                     logger.debug("Taxonomy term '%s:%s' isn't used anymore." %
-                                 (tn, tt))
+                                 (ti.taxonomy_name, ti.term))
+
+        return expected_result_count
+
+    def _queueBakeJob(self, record, pool, fac, tax_info=None):
+        # Get the previous (if any) and current entry for this page.
+        pair = record.getPreviousAndCurrentEntries(fac.path, tax_info)
+        assert pair is not None
+        prev_entry, cur_entry = pair
+        assert cur_entry is not None
+
+        # Ignore if there were errors in the previous passes.
+        if cur_entry.errors:
+            logger.debug("Ignoring %s because it had previous "
+                         "errors." % fac.ref_spec)
+            return False
+
+        # Build the route metadata and find the appropriate route.
+        route_metadata = copy.deepcopy(fac.metadata)
+        if tax_info is not None:
+            tax = self.app.getTaxonomy(tax_info.taxonomy_name)
+            route = self.app.getTaxonomyRoute(tax_info.taxonomy_name,
+                                              tax_info.source_name)
+
+            slugified_term = route.slugifyTaxonomyTerm(tax_info.term)
+            route_metadata[tax.term_name] = slugified_term
+        else:
+            route = self.app.getRoute(fac.source.name, route_metadata,
+                                      skip_taxonomies=True)
+        assert route is not None
+
+        # Figure out if this page is overriden by another previously
+        # baked page. This happens for example when the user has
+        # made a page that has the same page/URL as a theme page.
+        page = fac.buildPage()
+        uri = route.getUri(route_metadata, provider=page)
+        override_entry = record.getOverrideEntry(page.path, uri)
+        if override_entry is not None:
+            override_source = self.app.getSource(
+                    override_entry.source_name)
+            if override_source.realm == fac.source.realm:
+                cur_entry.errors.append(
+                        "Page '%s' maps to URL '%s' but is overriden "
+                        "by page '%s'." %
+                        (fac.ref_spec, uri, override_entry.path))
+                logger.error(cur_entry.errors[-1])
+                cur_entry.flags |= BakeRecordEntry.FLAG_OVERRIDEN
+                return False
+
+        job = BakeWorkerJob(
+                JOB_BAKE,
+                BakeJobPayload(fac, route_metadata, prev_entry,
+                               cur_entry.first_render_info,
+                               record.dirty_source_names,
+                               tax_info))
+        pool.queue.put_nowait(job)
+        return True
 
     def _handleDeletetions(self, record):
         for path, reason in record.getDeletions():
             logger.debug("Removing '%s': %s" % (path, reason))
             try:
@@ -316,143 +515,70 @@
             except OSError:
                 # Not a big deal if that file had already been removed
                 # by the user.
                 pass
 
-    def _createWorkerPool(self, record, pool_size=4):
-        pool = []
-        queue = BakeScheduler(record)
-        abort = threading.Event()
-        for i in range(pool_size):
+    def _createWorkerPool(self):
+        from piecrust.baking.worker import BakeWorkerContext, worker_func
+
+        pool = _WorkerPool()
+        for i in range(self.num_workers):
             ctx = BakeWorkerContext(
-                    self.app, self.out_dir, self.force,
-                    record, queue, abort)
-            worker = BakeWorker(i, ctx)
-            pool.append(worker)
-        return pool, queue, abort
-
-    def _waitOnWorkerPool(self, pool, abort):
-        for w in pool:
+                    self.app.root_dir, self.out_dir,
+                    pool.queue, pool.results, pool.abort_event,
+                    force=self.force, debug=self.app.debug)
+            w = multiprocessing.Process(
+                    target=worker_func, args=(i, ctx))
             w.start()
-
-        success = True
+            pool.workers.append(w)
+        return pool
+
+    def _terminateWorkerPool(self, pool):
+        pool.abort_event.set()
+        for w in pool.workers:
+            w.join()
+
+    def _waitOnWorkerPool(self, pool,
+                          expected_result_count=-1, result_handler=None):
+        assert result_handler is None or expected_result_count >= 0
+        abort_with_exception = None
         try:
-            for w in pool:
-                w.join()
-                success &= w.success
-        except KeyboardInterrupt:
+            if result_handler is None:
+                pool.queue.join()
+            else:
+                got_count = 0
+                while got_count < expected_result_count:
+                    try:
+                        res = pool.results.get(True, 10)
+                    except queue.Empty:
+                        logger.error(
+                                "Got %d results, expected %d, and timed-out "
+                                "for 10 seconds. A worker might be stuck?" %
+                                (got_count, expected_result_count))
+                        abort_with_exception = Exception("Worker time-out.")
+                        break
+
+                    got_count += 1
+                    result_handler(res)
+        except KeyboardInterrupt as kiex:
             logger.warning("Bake aborted by user... "
                            "waiting for workers to stop.")
-            abort.set()
-            for w in pool:
-                w.join()
-            raise
-
-        if abort.is_set():
-            excs = [w.abort_exception for w in pool
-                    if w.abort_exception is not None]
-            logger.error("Baking was aborted due to %s error(s):" % len(excs))
-            if self.app.debug:
-                for e in excs:
-                    logger.exception(e)
-            else:
-                for e in excs:
-                    log_friendly_exception(logger, e)
-            raise BakingError("Baking was aborted due to errors.")
-
-        return success
-
-
-class BakeWorkerContext(object):
-    def __init__(self, app, out_dir, force, record, work_queue,
-                 abort_event):
-        self.app = app
-        self.out_dir = out_dir
-        self.force = force
-        self.record = record
-        self.work_queue = work_queue
-        self.abort_event = abort_event
-
-
-class BakeWorkerJob(object):
-    def __init__(self, factory, route, record_entry):
-        self.factory = factory
-        self.route = route
-        self.record_entry = record_entry
-
-    @property
-    def source(self):
-        return self.factory.source
-
-
-class BakeWorker(threading.Thread):
-    def __init__(self, wid, ctx):
-        super(BakeWorker, self).__init__(name=('worker%d' % wid))
-        self.wid = wid
-        self.ctx = ctx
-        self.abort_exception = None
-        self.success = True
-        self._page_baker = PageBaker(
-                ctx.app, ctx.out_dir, ctx.force,
-                ctx.record)
-
-    def run(self):
-        while(not self.ctx.abort_event.is_set()):
-            try:
-                job = self.ctx.work_queue.getNextJob(wait_timeout=1)
-                if job is None:
-                    logger.debug(
-                            "[%d] No more work... shutting down." %
-                            self.wid)
-                    break
-                success = self._unsafeRun(job)
-                logger.debug("[%d] Done with page." % self.wid)
-                self.ctx.work_queue.onJobFinished(job)
-                self.success &= success
-            except Exception as ex:
-                self.ctx.abort_event.set()
-                self.abort_exception = ex
-                self.success = False
-                logger.debug("[%d] Critical error, aborting." % self.wid)
-                if self.ctx.app.debug:
-                    logger.exception(ex)
-                break
-
-    def _unsafeRun(self, job):
-        start_time = time.clock()
-
-        entry = job.record_entry
-        try:
-            self._page_baker.bake(job.factory, job.route, entry)
-        except BakingError as ex:
-            logger.debug("Got baking error. Adding it to the record.")
-            while ex:
-                entry.errors.append(str(ex))
-                ex = ex.__cause__
-
-        has_error = False
-        for e in entry.getAllErrors():
-            has_error = True
-            logger.error(e)
-        if has_error:
-            return False
-
-        if entry.was_any_sub_baked:
-            first_sub = entry.subs[0]
-
-            friendly_uri = first_sub.out_uri
-            if friendly_uri == '':
-                friendly_uri = '[main page]'
-
-            friendly_count = ''
-            if entry.num_subs > 1:
-                friendly_count = ' (%d pages)' % entry.num_subs
-            logger.info(format_timed(
-                    start_time, '[%d] %s%s' %
-                    (self.wid, friendly_uri, friendly_count)))
-
-        return True
+            abort_with_exception = kiex
+
+        if abort_with_exception:
+            pool.abort_event.set()
+            for w in pool.workers:
+                w.join(2)
+            raise abort_with_exception
+
+
+class _WorkerPool(object):
+    def __init__(self):
+        self.queue = multiprocessing.JoinableQueue()
+        self.results = multiprocessing.Queue()
+        self.abort_event = multiprocessing.Event()
+        self.workers = []
 
 
 class _TaxonomyTermsInfo(object):
     def __init__(self):
         self.dirty_terms = set()
@@ -461,5 +587,6 @@
     def __str__(self):
         return 'dirty:%s, all:%s' % (self.dirty_terms, self.all_terms)
 
     def __repr__(self):
         return 'dirty:%s, all:%s' % (self.dirty_terms, self.all_terms)
+
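The worker side of the new pool lives in piecrust/baking/worker.py and is not part of this comparison. To show the queue protocol the Baker relies on above (jobs on a joinable queue, one result per job plus a final timer dictionary on the results queue, and a shared abort event), here is a stripped-down, hypothetical worker loop; the attribute names on 'ctx' and the '_handle_job' helper are illustrative guesses, not the real worker_func, which dispatches on JOB_LOAD, JOB_RENDER_FIRST and JOB_BAKE and returns much richer result objects.

    import queue
    import time


    def _handle_job(job):
        # Placeholder for the real dispatch on JOB_LOAD / JOB_RENDER_FIRST /
        # JOB_BAKE; the actual handlers live in piecrust/baking/worker.py.
        return job


    def example_worker_func(wid, ctx):
        # 'ctx' is assumed to expose the objects the Baker passes into
        # BakeWorkerContext above: the job queue, the results queue and the
        # abort event (the attribute names here are guesses).
        timers = {}
        while not ctx.abort_event.is_set():
            try:
                job = ctx.work_queue.get(True, 0.1)
            except queue.Empty:
                continue
            try:
                start = time.perf_counter()
                res = _handle_job(job)
                timers['worker'] = timers.get('worker', 0) + (
                        time.perf_counter() - start)
                # One result per job; _waitOnWorkerPool() counts and
                # handles them in the main process.
                ctx.results.put_nowait(res)
            finally:
                # Lets the main process's pool.queue.join() call return.
                ctx.work_queue.task_done()
        # After _terminateWorkerPool() sets the abort event, report this
        # worker's timers so they can be merged into record.current.timers.
        ctx.results.put_nowait(timers)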