piecrust2: comparison of piecrust/baking/baker.py @ 852:4850f8c21b6e
core: Start of the big refactor for PieCrust 3.0.
* Everything is a `ContentSource`, including assets directories.
* Most content sources are subclasses of the base file-system source.
* A source is processed by a "pipeline", and there are two built-in pipelines,
one for assets and one for pages. The asset pipeline is vaguely functional,
but the page pipeline is completely broken right now.
* Rewrite the baking process as just running appropriate pipelines on each
content item. This should allow for better parallelization.
author: Ludovic Chabant <ludovic@chabant.com>
date: Wed, 17 May 2017 00:11:48 -0700
parents: 9a92e2804562
children: f070a4fc033c
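The changeset description above summarizes the new architecture: every input (page folders and asset directories alike) is a content source, each source is bound to a pipeline, and a bake boils down to queueing one job per content item to a worker pool, user realm before theme realm so user content can override theme content. Before the file comparison, here is a minimal, self-contained sketch of that flow; all class and function names are simplified stand-ins for illustration, not the actual PieCrust 3.0 API.

```python
# Illustrative sketch only -- hypothetical stand-ins for the real ContentSource,
# pipeline, and worker-pool classes touched by this changeset.
from collections import namedtuple

ContentItem = namedtuple('ContentItem', 'spec metadata')
BakeJob = namedtuple('BakeJob', 'source_name item_spec item_metadata')


class ContentSource:
    """Anything that yields content items: a pages folder, an assets folder, etc."""
    def __init__(self, name, realm, pipeline_name, items):
        self.name = name
        self.config = {'realm': realm, 'pipeline': pipeline_name}
        self._items = items

    def getAllContents(self):
        yield from self._items


class Pipeline:
    """Processes every item of one source."""
    def __init__(self, source):
        self.source = source

    def run(self, job):
        print("processing %s via %s" % (job.item_spec, type(self).__name__))


class AssetPipeline(Pipeline):
    PIPELINE_NAME = 'asset'


class PagePipeline(Pipeline):
    PIPELINE_NAME = 'page'


PIPELINE_CLASSES = {c.PIPELINE_NAME: c for c in (AssetPipeline, PagePipeline)}
REALM_USER, REALM_THEME = 0, 1


def bake(sources):
    # Group (source, pipeline) pairs by realm, then bake the user realm first
    # so user content overrides theme content, as Baker.bake() does below.
    by_realm = {}
    for src in sources:
        pp = PIPELINE_CLASSES[src.config['pipeline']](src)
        by_realm.setdefault(src.config['realm'], []).append((src, pp))

    for realm in (REALM_USER, REALM_THEME):
        for src, pp in by_realm.get(realm, []):
            # One job per content item; in the real Baker these jobs go to a
            # multiprocessing worker pool instead of being run inline.
            for item in src.getAllContents():
                pp.run(BakeJob(src.name, item.spec, item.metadata))


if __name__ == '__main__':
    bake([
        ContentSource('theme_assets', REALM_THEME, 'asset',
                      [ContentItem('theme/css/main.css', {})]),
        ContentSource('posts', REALM_USER, 'page',
                      [ContentItem('posts/2017-05-17_hello.md', {'slug': 'hello'})]),
    ])
```

Because each job carries only a source name, an item spec, and its metadata, it serializes cheaply across worker processes, which is presumably what the commit message's point about better parallelization relies on.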
851:2c7e57d80bba | 852:4850f8c21b6e |
1 import time | 1 import time |
2 import os.path | 2 import os.path |
3 import hashlib | 3 import hashlib |
4 import logging | 4 import logging |
5 from piecrust.baking.records import ( | 5 from piecrust.baking.worker import BakeJob |
6 BakeRecordEntry, TransitionalBakeRecord) | |
7 from piecrust.baking.worker import ( | |
8 save_factory, | |
9 JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE) | |
10 from piecrust.chefutil import ( | 6 from piecrust.chefutil import ( |
11 format_timed_scope, format_timed) | 7 format_timed_scope, format_timed) |
12 from piecrust.environment import ExecutionStats | 8 from piecrust.environment import ExecutionStats |
13 from piecrust.generation.base import PageGeneratorBakeContext | 9 from piecrust.pipelines.base import PipelineContext |
14 from piecrust.routing import create_route_metadata | 10 from piecrust.pipelines.records import ( |
15 from piecrust.sources.base import ( | 11 MultiRecordHistory, MultiRecord, RecordEntry, |
16 REALM_NAMES, REALM_USER, REALM_THEME) | 12 load_records) |
13 from piecrust.sources.base import REALM_USER, REALM_THEME | |
17 | 14 |
18 | 15 |
19 logger = logging.getLogger(__name__) | 16 logger = logging.getLogger(__name__) |
20 | 17 |
21 | 18 |
19 def get_bake_records_path(app, out_dir): | |
20 records_cache = app.cache.getCache('baker') | |
21 records_id = hashlib.md5(out_dir.encode('utf8')).hexdigest() | |
22 records_name = records_id + '.record' | |
23 return records_cache.getCachePath(records_name) | |
24 | |
25 | |
22 class Baker(object): | 26 class Baker(object): |
23 def __init__(self, app, out_dir, force=False, | 27 def __init__(self, appfactory, app, out_dir, |
24 applied_config_variant=None, | 28 force=False, allowed_pipelines=None): |
25 applied_config_values=None): | 29 self.appfactory = appfactory |
26 assert app and out_dir | |
27 self.app = app | 30 self.app = app |
28 self.out_dir = out_dir | 31 self.out_dir = out_dir |
29 self.force = force | 32 self.force = force |
30 self.applied_config_variant = applied_config_variant | 33 |
31 self.applied_config_values = applied_config_values | 34 self._pipeline_classes = {} |
32 | 35 for pclass in app.plugin_loader.getPipelines(): |
33 # Remember what generator pages we should skip. | 36 self._pipeline_classes[pclass.PIPELINE_NAME] = pclass |
34 self.generator_pages = [] | 37 |
35 logger.debug("Gathering generator page paths:") | 38 self.allowed_pipelines = allowed_pipelines |
36 for gen in self.app.generators: | 39 if allowed_pipelines is None: |
37 for path in gen.page_ref.possible_paths: | 40 self.allowed_pipelines = list(self._pipeline_classes.keys()) |
38 self.generator_pages.append(path) | 41 |
39 logger.debug(" - %s" % path) | 42 self._records = None |
40 | |
41 # Register some timers. | |
42 self.app.env.registerTimer('LoadJob', raise_if_registered=False) | |
43 self.app.env.registerTimer('RenderFirstSubJob', | |
44 raise_if_registered=False) | |
45 self.app.env.registerTimer('BakeJob', raise_if_registered=False) | |
46 | 43 |
47 def bake(self): | 44 def bake(self): |
45 start_time = time.perf_counter() | |
48 logger.debug(" Bake Output: %s" % self.out_dir) | 46 logger.debug(" Bake Output: %s" % self.out_dir) |
49 logger.debug(" Root URL: %s" % self.app.config.get('site/root')) | 47 logger.debug(" Root URL: %s" % self.app.config.get('site/root')) |
50 | 48 |
51 # Get into bake mode. | 49 # Get into bake mode. |
52 start_time = time.perf_counter() | |
53 self.app.config.set('baker/is_baking', True) | 50 self.app.config.set('baker/is_baking', True) |
54 self.app.env.base_asset_url_format = '%uri%' | 51 self.app.config.set('site/base_asset_url_format', '%uri') |
55 | 52 |
56 # Make sure the output directory exists. | 53 # Make sure the output directory exists. |
57 if not os.path.isdir(self.out_dir): | 54 if not os.path.isdir(self.out_dir): |
58 os.makedirs(self.out_dir, 0o755) | 55 os.makedirs(self.out_dir, 0o755) |
59 | 56 |
60 # Load/create the bake record. | 57 # Load/create the bake records. |
61 record = TransitionalBakeRecord() | 58 records_path = get_bake_records_path( |
62 record_cache = self.app.cache.getCache('baker') | 59 self.app, self.out_dir) |
63 record_id = hashlib.md5(self.out_dir.encode('utf8')).hexdigest() | 60 if not self.force and os.path.isfile(records_path): |
64 record_name = record_id + '.record' | 61 with format_timed_scope(logger, "loaded previous bake records", |
65 previous_record_path = None | |
66 if not self.force and record_cache.has(record_name): | |
67 with format_timed_scope(logger, "loaded previous bake record", | |
68 level=logging.DEBUG, colored=False): | 62 level=logging.DEBUG, colored=False): |
69 previous_record_path = record_cache.getCachePath(record_name) | 63 previous_records = load_records(records_path) |
70 record.loadPrevious(previous_record_path) | 64 else: |
71 record.current.success = True | 65 previous_records = MultiRecord() |
66 self._records = MultiRecord() | |
72 | 67 |
73 # Figure out if we need to clean the cache because important things | 68 # Figure out if we need to clean the cache because important things |
74 # have changed. | 69 # have changed. |
75 is_cache_valid = self._handleCacheValidity(record) | 70 is_cache_valid = self._handleCacheValidity(previous_records, |
71 self._records) | |
76 if not is_cache_valid: | 72 if not is_cache_valid: |
77 previous_record_path = None | 73 previous_records = MultiRecord() |
74 | |
75 # Create the bake records history which tracks what's up-to-date | |
76 # or not since last time we baked to the given output folder. | |
77 record_history = MultiRecordHistory(previous_records, self._records) | |
78 | 78 |
79 # Pre-create all caches. | 79 # Pre-create all caches. |
80 for cache_name in ['app', 'baker', 'pages', 'renders']: | 80 for cache_name in ['app', 'baker', 'pages', 'renders']: |
81 self.app.cache.getCache(cache_name) | 81 self.app.cache.getCache(cache_name) |
82 | 82 |
83 # Gather all sources by realm -- we're going to bake each realm | 83 # Gather all sources by realm -- we're going to bake each realm |
84 # separately so we can handle "overriding" (i.e. one realm overrides | 84 # separately so we can handle "overriding" (i.e. one realm overrides |
85 # another realm's pages, like the user realm overriding the theme | 85 # another realm's pages, like the user realm overriding the theme |
86 # realm). | 86 # realm). |
87 # | |
88 # Also, create and initialize each pipeline for each source. | |
87 sources_by_realm = {} | 89 sources_by_realm = {} |
90 ppctx = PipelineContext(self.out_dir, record_history, | |
91 force=self.force) | |
88 for source in self.app.sources: | 92 for source in self.app.sources: |
89 srclist = sources_by_realm.setdefault(source.realm, []) | 93 pname = source.config['pipeline'] |
90 srclist.append(source) | 94 if pname in self.allowed_pipelines: |
95 srclist = sources_by_realm.setdefault( | |
96 source.config['realm'], []) | |
97 | |
98 pp = self._pipeline_classes[pname](source) | |
99 pp.initialize(ppctx) | |
100 srclist.append((source, pp)) | |
101 else: | |
102 logger.debug( | |
103 "Skip source '%s' because pipeline '%s' is ignored." % | |
104 (source.name, pname)) | |
91 | 105 |
92 # Create the worker processes. | 106 # Create the worker processes. |
93 pool = self._createWorkerPool(previous_record_path) | 107 pool = self._createWorkerPool(records_path) |
94 | 108 |
95 # Bake the realms. | 109 # Bake the realms -- user first, theme second, so that a user item |
110 # can override a theme item. | |
96 realm_list = [REALM_USER, REALM_THEME] | 111 realm_list = [REALM_USER, REALM_THEME] |
97 for realm in realm_list: | 112 for realm in realm_list: |
98 srclist = sources_by_realm.get(realm) | 113 srclist = sources_by_realm.get(realm) |
99 if srclist is not None: | 114 if srclist is not None: |
100 self._bakeRealm(record, pool, realm, srclist) | 115 self._bakeRealm(record_history, pool, realm, srclist) |
101 | |
102 # Call all the page generators. | |
103 self._bakePageGenerators(record, pool) | |
104 | 116 |
105 # All done with the workers. Close the pool and get reports. | 117 # All done with the workers. Close the pool and get reports. |
106 reports = pool.close() | 118 pool_stats = pool.close() |
107 total_stats = ExecutionStats() | 119 total_stats = ExecutionStats() |
108 record.current.stats['_Total'] = total_stats | 120 for ps in pool_stats: |
109 for i in range(len(reports)): | 121 if ps is not None: |
110 worker_stats = reports[i]['data'] | 122 total_stats.mergeStats(ps) |
111 if worker_stats is not None: | 123 record_history.current.stats = total_stats |
112 worker_name = 'BakeWorker_%d' % i | 124 |
113 record.current.stats[worker_name] = worker_stats | 125 # Shutdown the pipelines. |
114 total_stats.mergeStats(worker_stats) | 126 for realm in realm_list: |
115 | 127 srclist = sources_by_realm.get(realm) |
116 # Delete files from the output. | 128 if srclist is not None: |
117 self._handleDeletetions(record) | 129 for _, pp in srclist: |
130 pp.shutdown(ppctx) | |
118 | 131 |
119 # Backup previous records. | 132 # Backup previous records. |
133 records_dir, records_fn = os.path.split(records_path) | |
134 records_id, _ = os.path.splitext(records_fn) | |
120 for i in range(8, -1, -1): | 135 for i in range(8, -1, -1): |
121 suffix = '' if i == 0 else '.%d' % i | 136 suffix = '' if i == 0 else '.%d' % i |
122 record_path = record_cache.getCachePath( | 137 records_path_i = os.path.join( |
123 '%s%s.record' % (record_id, suffix)) | 138 records_dir, |
124 if os.path.exists(record_path): | 139 '%s%s.record' % (records_id, suffix)) |
125 record_path_next = record_cache.getCachePath( | 140 if os.path.exists(records_path_i): |
126 '%s.%s.record' % (record_id, i + 1)) | 141 records_path_next = os.path.join( |
127 if os.path.exists(record_path_next): | 142 records_dir, |
128 os.remove(record_path_next) | 143 '%s.%s.record' % (records_id, i + 1)) |
129 os.rename(record_path, record_path_next) | 144 if os.path.exists(records_path_next): |
145 os.remove(records_path_next) | |
146 os.rename(records_path_i, records_path_next) | |
130 | 147 |
131 # Save the bake record. | 148 # Save the bake record. |
132 with format_timed_scope(logger, "saved bake record.", | 149 with format_timed_scope(logger, "saved bake records.", |
133 level=logging.DEBUG, colored=False): | 150 level=logging.DEBUG, colored=False): |
134 record.current.bake_time = time.time() | 151 record_history.current.bake_time = time.time() |
135 record.current.out_dir = self.out_dir | 152 record_history.current.out_dir = self.out_dir |
136 record.saveCurrent(record_cache.getCachePath(record_name)) | 153 record_history.current.save(records_path) |
137 | 154 |
138 # All done. | 155 # All done. |
139 self.app.config.set('baker/is_baking', False) | 156 self.app.config.set('baker/is_baking', False) |
140 logger.debug(format_timed(start_time, 'done baking')) | 157 logger.debug(format_timed(start_time, 'done baking')) |
141 | 158 |
142 return record.detach() | 159 self._records = None |
143 | 160 return record_history.current |
144 def _handleCacheValidity(self, record): | 161 |
162 def _handleCacheValidity(self, previous_records, current_records): | |
145 start_time = time.perf_counter() | 163 start_time = time.perf_counter() |
146 | 164 |
147 reason = None | 165 reason = None |
148 if self.force: | 166 if self.force: |
149 reason = "ordered to" | 167 reason = "ordered to" |
150 elif not self.app.config.get('__cache_valid'): | 168 elif not self.app.config.get('__cache_valid'): |
151 # The configuration file was changed, or we're running a new | 169 # The configuration file was changed, or we're running a new |
152 # version of the app. | 170 # version of the app. |
153 reason = "not valid anymore" | 171 reason = "not valid anymore" |
154 elif (not record.previous.bake_time or | 172 elif previous_records.invalidated: |
155 not record.previous.hasLatestVersion()): | |
156 # We have no valid previous bake record. | 173 # We have no valid previous bake record. |
157 reason = "need bake record regeneration" | 174 reason = "need bake record regeneration" |
158 else: | 175 else: |
159 # Check if any template has changed since the last bake. Since | 176 # Check if any template has changed since the last bake. Since |
160 # there could be some advanced conditional logic going on, we'd | 177 # there could be some advanced conditional logic going on, we'd |
163 for d in self.app.templates_dirs: | 180 for d in self.app.templates_dirs: |
164 for dpath, _, filenames in os.walk(d): | 181 for dpath, _, filenames in os.walk(d): |
165 for fn in filenames: | 182 for fn in filenames: |
166 full_fn = os.path.join(dpath, fn) | 183 full_fn = os.path.join(dpath, fn) |
167 max_time = max(max_time, os.path.getmtime(full_fn)) | 184 max_time = max(max_time, os.path.getmtime(full_fn)) |
168 if max_time >= record.previous.bake_time: | 185 if max_time >= previous_records.bake_time: |
169 reason = "templates modified" | 186 reason = "templates modified" |
170 | 187 |
171 if reason is not None: | 188 if reason is not None: |
172 # We have to bake everything from scratch. | 189 # We have to bake everything from scratch. |
173 self.app.cache.clearCaches(except_names=['app', 'baker']) | 190 self.app.cache.clearCaches(except_names=['app', 'baker']) |
174 self.force = True | 191 self.force = True |
175 record.incremental_count = 0 | 192 current_records.incremental_count = 0 |
176 record.clearPrevious() | 193 previous_records = MultiRecord() |
177 logger.info(format_timed( | 194 logger.info(format_timed( |
178 start_time, | 195 start_time, "cleaned cache (reason: %s)" % reason)) |
179 "cleaned cache (reason: %s)" % reason)) | |
180 return False | 196 return False |
181 else: | 197 else: |
182 record.incremental_count += 1 | 198 current_records.incremental_count += 1 |
183 logger.debug(format_timed( | 199 logger.debug(format_timed( |
184 start_time, "cache is assumed valid", | 200 start_time, "cache is assumed valid", colored=False)) |
185 colored=False)) | |
186 return True | 201 return True |
187 | 202 |
188 def _bakeRealm(self, record, pool, realm, srclist): | 203 def _bakeRealm(self, record_history, pool, realm, srclist): |
189 start_time = time.perf_counter() | 204 for source, pp in srclist: |
190 try: | 205 logger.debug("Queuing jobs for source '%s' using pipeline '%s'." % |
191 record.current.baked_count[realm] = 0 | 206 (source.name, pp.PIPELINE_NAME)) |
192 record.current.total_baked_count[realm] = 0 | 207 jobs = [BakeJob(source.name, item.spec, item.metadata) |
193 | 208 for item in source.getAllContents()] |
194 all_factories = [] | 209 pool.queueJobs(jobs) |
195 for source in srclist: | 210 pool.wait() |
196 factories = source.getPageFactories() | 211 |
197 all_factories += [f for f in factories | 212 def _logErrors(self, item_spec, errors): |
198 if f.path not in self.generator_pages] | 213 logger.error("Errors found in %s:" % item_spec) |
199 | |
200 self._loadRealmPages(record, pool, all_factories) | |
201 self._renderRealmPages(record, pool, all_factories) | |
202 self._bakeRealmPages(record, pool, realm, all_factories) | |
203 finally: | |
204 page_count = record.current.baked_count[realm] | |
205 total_page_count = record.current.total_baked_count[realm] | |
206 logger.info(format_timed( | |
207 start_time, | |
208 "baked %d %s pages (%d total)." % | |
209 (page_count, REALM_NAMES[realm].lower(), | |
210 total_page_count))) | |
211 | |
212 def _loadRealmPages(self, record, pool, factories): | |
213 def _handler(res): | |
214 # Create the record entry for this page. | |
215 # This will also update the `dirty_source_names` for the record | |
216 # as we add page files whose last modification times are later | |
217 # than the last bake. | |
218 record_entry = BakeRecordEntry(res['source_name'], res['path']) | |
219 record_entry.config = res['config'] | |
220 record_entry.timestamp = res['timestamp'] | |
221 if res['errors']: | |
222 record_entry.errors += res['errors'] | |
223 record.current.success = False | |
224 self._logErrors(res['path'], res['errors']) | |
225 record.addEntry(record_entry) | |
226 | |
227 logger.debug("Loading %d realm pages..." % len(factories)) | |
228 with format_timed_scope(logger, | |
229 "loaded %d pages" % len(factories), | |
230 level=logging.DEBUG, colored=False, | |
231 timer_env=self.app.env, | |
232 timer_category='LoadJob'): | |
233 jobs = [] | |
234 for fac in factories: | |
235 job = { | |
236 'type': JOB_LOAD, | |
237 'job': save_factory(fac)} | |
238 jobs.append(job) | |
239 ar = pool.queueJobs(jobs, handler=_handler) | |
240 ar.wait() | |
241 | |
242 def _renderRealmPages(self, record, pool, factories): | |
243 def _handler(res): | |
244 entry = record.getCurrentEntry(res['path']) | |
245 if res['errors']: | |
246 entry.errors += res['errors'] | |
247 record.current.success = False | |
248 self._logErrors(res['path'], res['errors']) | |
249 | |
250 logger.debug("Rendering %d realm pages..." % len(factories)) | |
251 with format_timed_scope(logger, | |
252 "prepared %d pages" % len(factories), | |
253 level=logging.DEBUG, colored=False, | |
254 timer_env=self.app.env, | |
255 timer_category='RenderFirstSubJob'): | |
256 jobs = [] | |
257 for fac in factories: | |
258 record_entry = record.getCurrentEntry(fac.path) | |
259 if record_entry.errors: | |
260 logger.debug("Ignoring %s because it had previous " | |
261 "errors." % fac.ref_spec) | |
262 continue | |
263 | |
264 # Make sure the source and the route exist for this page, | |
265 # otherwise we add errors to the record entry and we'll skip | |
266 # this page for the rest of the bake. | |
267 source = self.app.getSource(fac.source.name) | |
268 if source is None: | |
269 record_entry.errors.append( | |
270 "Can't get source for page: %s" % fac.ref_spec) | |
271 logger.error(record_entry.errors[-1]) | |
272 continue | |
273 | |
274 route = self.app.getSourceRoute(fac.source.name, fac.metadata) | |
275 if route is None: | |
276 record_entry.errors.append( | |
277 "Can't get route for page: %s" % fac.ref_spec) | |
278 logger.error(record_entry.errors[-1]) | |
279 continue | |
280 | |
281 # All good, queue the job. | |
282 route_index = self.app.routes.index(route) | |
283 job = { | |
284 'type': JOB_RENDER_FIRST, | |
285 'job': { | |
286 'factory_info': save_factory(fac), | |
287 'route_index': route_index | |
288 } | |
289 } | |
290 jobs.append(job) | |
291 | |
292 ar = pool.queueJobs(jobs, handler=_handler) | |
293 ar.wait() | |
294 | |
295 def _bakeRealmPages(self, record, pool, realm, factories): | |
296 def _handler(res): | |
297 entry = record.getCurrentEntry(res['path']) | |
298 entry.subs = res['sub_entries'] | |
299 if res['errors']: | |
300 entry.errors += res['errors'] | |
301 self._logErrors(res['path'], res['errors']) | |
302 if entry.has_any_error: | |
303 record.current.success = False | |
304 if entry.subs and entry.was_any_sub_baked: | |
305 record.current.baked_count[realm] += 1 | |
306 record.current.total_baked_count[realm] += len(entry.subs) | |
307 | |
308 logger.debug("Baking %d realm pages..." % len(factories)) | |
309 with format_timed_scope(logger, | |
310 "baked %d pages" % len(factories), | |
311 level=logging.DEBUG, colored=False, | |
312 timer_env=self.app.env, | |
313 timer_category='BakeJob'): | |
314 jobs = [] | |
315 for fac in factories: | |
316 job = self._makeBakeJob(record, fac) | |
317 if job is not None: | |
318 jobs.append(job) | |
319 | |
320 ar = pool.queueJobs(jobs, handler=_handler) | |
321 ar.wait() | |
322 | |
323 def _bakePageGenerators(self, record, pool): | |
324 for gen in self.app.generators: | |
325 ctx = PageGeneratorBakeContext(self.app, record, pool, gen) | |
326 gen.bake(ctx) | |
327 | |
328 def _makeBakeJob(self, record, fac): | |
329 # Get the previous (if any) and current entry for this page. | |
330 pair = record.getPreviousAndCurrentEntries(fac.path) | |
331 assert pair is not None | |
332 prev_entry, cur_entry = pair | |
333 assert cur_entry is not None | |
334 | |
335 # Ignore if there were errors in the previous passes. | |
336 if cur_entry.errors: | |
337 logger.debug("Ignoring %s because it had previous " | |
338 "errors." % fac.ref_spec) | |
339 return None | |
340 | |
341 # Build the route metadata and find the appropriate route. | |
342 page = fac.buildPage() | |
343 route_metadata = create_route_metadata(page) | |
344 route = self.app.getSourceRoute(fac.source.name, route_metadata) | |
345 assert route is not None | |
346 | |
347 # Figure out if this page is overriden by another previously | |
348 # baked page. This happens for example when the user has | |
349 # made a page that has the same page/URL as a theme page. | |
350 uri = route.getUri(route_metadata) | |
351 override_entry = record.getOverrideEntry(page.path, uri) | |
352 if override_entry is not None: | |
353 override_source = self.app.getSource( | |
354 override_entry.source_name) | |
355 if override_source.realm == fac.source.realm: | |
356 cur_entry.errors.append( | |
357 "Page '%s' maps to URL '%s' but is overriden " | |
358 "by page '%s'." % | |
359 (fac.ref_spec, uri, override_entry.path)) | |
360 logger.error(cur_entry.errors[-1]) | |
361 cur_entry.flags |= BakeRecordEntry.FLAG_OVERRIDEN | |
362 return None | |
363 | |
364 route_index = self.app.routes.index(route) | |
365 job = { | |
366 'type': JOB_BAKE, | |
367 'job': { | |
368 'factory_info': save_factory(fac), | |
369 'generator_name': None, | |
370 'generator_record_key': None, | |
371 'route_index': route_index, | |
372 'route_metadata': route_metadata, | |
373 'dirty_source_names': record.dirty_source_names | |
374 } | |
375 } | |
376 return job | |
377 | |
378 def _handleDeletetions(self, record): | |
379 logger.debug("Handling deletions...") | |
380 for path, reason in record.getDeletions(): | |
381 logger.debug("Removing '%s': %s" % (path, reason)) | |
382 record.current.deleted.append(path) | |
383 try: | |
384 os.remove(path) | |
385 logger.info('[delete] %s' % path) | |
386 except OSError: | |
387 # Not a big deal if that file had already been removed | |
388 # by the user. | |
389 pass | |
390 | |
391 def _logErrors(self, path, errors): | |
392 rel_path = os.path.relpath(path, self.app.root_dir) | |
393 logger.error("Errors found in %s:" % rel_path) | |
394 for e in errors: | 214 for e in errors: |
395 logger.error(" " + e) | 215 logger.error(" " + e) |
396 | 216 |
397 def _createWorkerPool(self, previous_record_path): | 217 def _createWorkerPool(self, previous_records_path): |
398 from piecrust.app import PieCrustFactory | |
399 from piecrust.workerpool import WorkerPool | 218 from piecrust.workerpool import WorkerPool |
400 from piecrust.baking.worker import BakeWorkerContext, BakeWorker | 219 from piecrust.baking.worker import BakeWorkerContext, BakeWorker |
401 | 220 |
402 appfactory = PieCrustFactory( | |
403 self.app.root_dir, | |
404 cache=self.app.cache.enabled, | |
405 cache_key=self.app.cache_key, | |
406 config_variant=self.applied_config_variant, | |
407 config_values=self.applied_config_values, | |
408 debug=self.app.debug, | |
409 theme_site=self.app.theme_site) | |
410 | |
411 worker_count = self.app.config.get('baker/workers') | 221 worker_count = self.app.config.get('baker/workers') |
412 batch_size = self.app.config.get('baker/batch_size') | 222 batch_size = self.app.config.get('baker/batch_size') |
413 | 223 |
414 ctx = BakeWorkerContext( | 224 ctx = BakeWorkerContext( |
415 appfactory, | 225 self.appfactory, |
416 self.out_dir, | 226 self.out_dir, |
417 force=self.force, | 227 force=self.force, |
418 previous_record_path=previous_record_path) | 228 previous_records_path=previous_records_path, |
229 allowed_pipelines=self.allowed_pipelines) | |
419 pool = WorkerPool( | 230 pool = WorkerPool( |
420 worker_count=worker_count, | 231 worker_count=worker_count, |
421 batch_size=batch_size, | 232 batch_size=batch_size, |
422 worker_class=BakeWorker, | 233 worker_class=BakeWorker, |
423 initargs=(ctx,)) | 234 initargs=(ctx,), |
235 callback=self._handleWorkerResult, | |
236 error_callback=self._handleWorkerError) | |
424 return pool | 237 return pool |
425 | 238 |
239 def _handleWorkerResult(self, job, res): | |
240 record_name = self._getRecordName(job) | |
241 record = self._records.getRecord(record_name) | |
242 record.entries.append(res.record) | |
243 if not res.record.success: | |
244 record.success = False | |
245 self._records.success = False | |
246 self._logErrors(job.item_spec, res.record.errors) | |
247 | |
248 def _handleWorkerError(self, job, exc_data): | |
249 e = RecordEntry() | |
250 e.item_spec = job.item_spec | |
251 e.errors.append(str(exc_data)) | |
252 | |
253 record_name = self._getRecordName(job) | |
254 record = self._records.getRecord(record_name) | |
255 record.entries.append(e) | |
256 | |
257 record.success = False | |
258 self._records.success = False | |
259 | |
260 self._logErrors(job.item_spec, e.errors) | |
261 if self.app.debug: | |
262 logger.error(exc_data.traceback) | |
263 | |
264 def _getRecordName(self, job): | |
265 sn = job.source_name | |
266 ppn = self.app.getSource(sn).config['pipeline'] | |
267 return '%s@%s' % (sn, ppn) |