comparison piecrust/baking/baker.py @ 852:4850f8c21b6e

core: Start of the big refactor for PieCrust 3.0.

* Everything is a `ContentSource`, including assets directories.
* Most content sources are subclasses of the base file-system source.
* A source is processed by a "pipeline", and there are two built-in
  pipelines: one for assets and one for pages. The asset pipeline is
  vaguely functional, but the page pipeline is completely broken right now.
* Rewrite the baking process as just running the appropriate pipelines on
  each content item. This should allow for better parallelization.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 17 May 2017 00:11:48 -0700
parents 9a92e2804562
children f070a4fc033c
diff -r 2c7e57d80bba -r 4850f8c21b6e piecrust/baking/baker.py
--- a/piecrust/baking/baker.py	(851:2c7e57d80bba)
+++ b/piecrust/baking/baker.py	(852:4850f8c21b6e)
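Review note: everything below routes content through "pipeline" objects, but this changeset only shows the call sites (`PIPELINE_NAME`, construction with a source, `initialize()`, `shutdown()`). Here is a minimal sketch of the contract a pipeline class appears to satisfy; the class name and method bodies are illustrative assumptions, not code from this revision:

    from piecrust.pipelines.base import PipelineContext  # imported by the new baker

    class HypotheticalPipeline:
        # The baker indexes pipeline classes by this name, and each source
        # selects one via `source.config['pipeline']`.
        PIPELINE_NAME = 'hypothetical'

        def __init__(self, source):
            # One pipeline instance is created per content source.
            self.source = source

        def initialize(self, ctx):
            # `ctx` is the PipelineContext built in `bake()` below: it
            # carries the output directory, the record history, and the
            # `force` flag.
            pass

        def shutdown(self, ctx):
            # Called once after all realms have been baked.
            pass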
 import time
 import os.path
 import hashlib
 import logging
-from piecrust.baking.records import (
-    BakeRecordEntry, TransitionalBakeRecord)
-from piecrust.baking.worker import (
-    save_factory,
-    JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE)
+from piecrust.baking.worker import BakeJob
 from piecrust.chefutil import (
     format_timed_scope, format_timed)
 from piecrust.environment import ExecutionStats
-from piecrust.generation.base import PageGeneratorBakeContext
-from piecrust.routing import create_route_metadata
-from piecrust.sources.base import (
-    REALM_NAMES, REALM_USER, REALM_THEME)
+from piecrust.pipelines.base import PipelineContext
+from piecrust.pipelines.records import (
+    MultiRecordHistory, MultiRecord, RecordEntry,
+    load_records)
+from piecrust.sources.base import REALM_USER, REALM_THEME


 logger = logging.getLogger(__name__)


+def get_bake_records_path(app, out_dir):
+    records_cache = app.cache.getCache('baker')
+    records_id = hashlib.md5(out_dir.encode('utf8')).hexdigest()
+    records_name = records_id + '.record'
+    return records_cache.getCachePath(records_name)
+
+
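Note: the records file name is derived from an MD5 hash of the output directory, so each distinct output directory gets its own set of bake records inside the `baker` cache. For example (the path is illustrative):

    records_path = get_bake_records_path(app, '/path/to/output')
    # -> <site cache>/baker/<md5 of '/path/to/output'>.record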
 class Baker(object):
-    def __init__(self, app, out_dir, force=False,
-                 applied_config_variant=None,
-                 applied_config_values=None):
-        assert app and out_dir
+    def __init__(self, appfactory, app, out_dir,
+                 force=False, allowed_pipelines=None):
+        self.appfactory = appfactory
         self.app = app
         self.out_dir = out_dir
         self.force = force
-        self.applied_config_variant = applied_config_variant
-        self.applied_config_values = applied_config_values

-        # Remember what generator pages we should skip.
-        self.generator_pages = []
-        logger.debug("Gathering generator page paths:")
-        for gen in self.app.generators:
-            for path in gen.page_ref.possible_paths:
-                self.generator_pages.append(path)
-                logger.debug(" - %s" % path)
-
-        # Register some timers.
-        self.app.env.registerTimer('LoadJob', raise_if_registered=False)
-        self.app.env.registerTimer('RenderFirstSubJob',
-                                   raise_if_registered=False)
-        self.app.env.registerTimer('BakeJob', raise_if_registered=False)
+        self._pipeline_classes = {}
+        for pclass in app.plugin_loader.getPipelines():
+            self._pipeline_classes[pclass.PIPELINE_NAME] = pclass
+
+        self.allowed_pipelines = allowed_pipelines
+        if allowed_pipelines is None:
+            self.allowed_pipelines = list(self._pipeline_classes.keys())
+
+        self._records = None

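With `allowed_pipelines`, a caller can now restrict the bake to sources whose configured pipeline is in the list. A hypothetical invocation (the pipeline name 'asset' is an assumption; the changeset doesn't spell out the built-in names):

    # bake only the sources whose configured pipeline is the asset one
    baker = Baker(appfactory, app, out_dir,
                  allowed_pipelines=['asset'])
    records = baker.bake()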
     def bake(self):
+        start_time = time.perf_counter()
         logger.debug(" Bake Output: %s" % self.out_dir)
         logger.debug(" Root URL: %s" % self.app.config.get('site/root'))

         # Get into bake mode.
-        start_time = time.perf_counter()
         self.app.config.set('baker/is_baking', True)
-        self.app.env.base_asset_url_format = '%uri%'
+        self.app.config.set('site/base_asset_url_format', '%uri')

         # Make sure the output directory exists.
         if not os.path.isdir(self.out_dir):
             os.makedirs(self.out_dir, 0o755)

-        # Load/create the bake record.
-        record = TransitionalBakeRecord()
-        record_cache = self.app.cache.getCache('baker')
-        record_id = hashlib.md5(self.out_dir.encode('utf8')).hexdigest()
-        record_name = record_id + '.record'
-        previous_record_path = None
-        if not self.force and record_cache.has(record_name):
-            with format_timed_scope(logger, "loaded previous bake record",
+        # Load/create the bake records.
+        records_path = get_bake_records_path(
+            self.app, self.out_dir)
+        if not self.force and os.path.isfile(records_path):
+            with format_timed_scope(logger, "loaded previous bake records",
                                     level=logging.DEBUG, colored=False):
-                previous_record_path = record_cache.getCachePath(record_name)
-                record.loadPrevious(previous_record_path)
-        record.current.success = True
+                previous_records = load_records(records_path)
+        else:
+            previous_records = MultiRecord()
+        self._records = MultiRecord()

         # Figure out if we need to clean the cache because important things
         # have changed.
-        is_cache_valid = self._handleCacheValidity(record)
+        is_cache_valid = self._handleCacheValidity(previous_records,
+                                                   self._records)
         if not is_cache_valid:
-            previous_record_path = None
+            previous_records = MultiRecord()
+
+        # Create the bake records history which tracks what's up-to-date
+        # or not since last time we baked to the given output folder.
+        record_history = MultiRecordHistory(previous_records, self._records)

         # Pre-create all caches.
         for cache_name in ['app', 'baker', 'pages', 'renders']:
             self.app.cache.getCache(cache_name)

         # Gather all sources by realm -- we're going to bake each realm
         # separately so we can handle "overriding" (i.e. one realm overrides
         # another realm's pages, like the user realm overriding the theme
         # realm).
+        #
+        # Also, create and initialize each pipeline for each source.
         sources_by_realm = {}
+        ppctx = PipelineContext(self.out_dir, record_history,
+                                force=self.force)
         for source in self.app.sources:
-            srclist = sources_by_realm.setdefault(source.realm, [])
-            srclist.append(source)
+            pname = source.config['pipeline']
+            if pname in self.allowed_pipelines:
+                srclist = sources_by_realm.setdefault(
+                    source.config['realm'], [])
+
+                pp = self._pipeline_classes[pname](source)
+                pp.initialize(ppctx)
+                srclist.append((source, pp))
+            else:
+                logger.debug(
+                    "Skip source '%s' because pipeline '%s' is ignored." %
+                    (source.name, pname))

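After this loop, each realm maps to a list of `(source, pipeline)` pairs, roughly like this for a default site (the source and class names are illustrative, not from this changeset):

    # sources_by_realm = {
    #     REALM_USER:  [(pages_src,  PagePipeline(pages_src)),
    #                   (assets_src, AssetPipeline(assets_src))],
    #     REALM_THEME: [(theme_pages_src, PagePipeline(theme_pages_src))],
    # }

Sources whose pipeline isn't in `allowed_pipelines` are skipped entirely rather than baked without a pipeline.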
         # Create the worker processes.
-        pool = self._createWorkerPool(previous_record_path)
+        pool = self._createWorkerPool(records_path)

-        # Bake the realms.
+        # Bake the realms -- user first, theme second, so that a user item
+        # can override a theme item.
         realm_list = [REALM_USER, REALM_THEME]
         for realm in realm_list:
             srclist = sources_by_realm.get(realm)
             if srclist is not None:
-                self._bakeRealm(record, pool, realm, srclist)
-
-        # Call all the page generators.
-        self._bakePageGenerators(record, pool)
+                self._bakeRealm(record_history, pool, realm, srclist)

         # All done with the workers. Close the pool and get reports.
-        reports = pool.close()
+        pool_stats = pool.close()
         total_stats = ExecutionStats()
-        record.current.stats['_Total'] = total_stats
-        for i in range(len(reports)):
-            worker_stats = reports[i]['data']
-            if worker_stats is not None:
-                worker_name = 'BakeWorker_%d' % i
-                record.current.stats[worker_name] = worker_stats
-                total_stats.mergeStats(worker_stats)
-
-        # Delete files from the output.
-        self._handleDeletetions(record)
+        for ps in pool_stats:
+            if ps is not None:
+                total_stats.mergeStats(ps)
+        record_history.current.stats = total_stats
+
+        # Shutdown the pipelines.
+        for realm in realm_list:
+            srclist = sources_by_realm.get(realm)
+            if srclist is not None:
+                for _, pp in srclist:
+                    pp.shutdown(ppctx)

         # Backup previous records.
+        records_dir, records_fn = os.path.split(records_path)
+        records_id, _ = os.path.splitext(records_fn)
         for i in range(8, -1, -1):
             suffix = '' if i == 0 else '.%d' % i
-            record_path = record_cache.getCachePath(
-                '%s%s.record' % (record_id, suffix))
-            if os.path.exists(record_path):
-                record_path_next = record_cache.getCachePath(
-                    '%s.%s.record' % (record_id, i + 1))
-                if os.path.exists(record_path_next):
-                    os.remove(record_path_next)
-                os.rename(record_path, record_path_next)
+            records_path_i = os.path.join(
+                records_dir,
+                '%s%s.record' % (records_id, suffix))
+            if os.path.exists(records_path_i):
+                records_path_next = os.path.join(
+                    records_dir,
+                    '%s.%s.record' % (records_id, i + 1))
+                if os.path.exists(records_path_next):
+                    os.remove(records_path_next)
+                os.rename(records_path_i, records_path_next)

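The rotation above runs `i` from 8 down to 0: `<id>.8.record` becomes `<id>.9.record`, then `<id>.7.record` becomes `<id>.8.record`, and so on until `<id>.record` becomes `<id>.1.record`. Up to nine numbered backups are kept, and whatever sits in the `.9` slot is deleted when `.8` rotates into it. For example:

    before a bake:  x.record    x.1.record  x.2.record
    after rotation: x.1.record  x.2.record  x.3.record
    (the bake then writes a fresh x.record)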
         # Save the bake record.
-        with format_timed_scope(logger, "saved bake record.",
+        with format_timed_scope(logger, "saved bake records.",
                                 level=logging.DEBUG, colored=False):
-            record.current.bake_time = time.time()
-            record.current.out_dir = self.out_dir
-            record.saveCurrent(record_cache.getCachePath(record_name))
+            record_history.current.bake_time = time.time()
+            record_history.current.out_dir = self.out_dir
+            record_history.current.save(records_path)

         # All done.
         self.app.config.set('baker/is_baking', False)
         logger.debug(format_timed(start_time, 'done baking'))

-        return record.detach()
+        self._records = None
+        return record_history.current
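`bake()` now returns the current `MultiRecord` rather than a detached copy of the old single record. A caller might check the overall outcome like this (a sketch; it assumes `MultiRecord.success` starts out true and is only cleared by the worker callbacks defined further down):

    records = Baker(appfactory, app, out_dir).bake()
    if not records.success:
        raise Exception("The bake failed; see the errors above.")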

-    def _handleCacheValidity(self, record):
+    def _handleCacheValidity(self, previous_records, current_records):
         start_time = time.perf_counter()

         reason = None
         if self.force:
             reason = "ordered to"
         elif not self.app.config.get('__cache_valid'):
             # The configuration file was changed, or we're running a new
             # version of the app.
             reason = "not valid anymore"
-        elif (not record.previous.bake_time or
-              not record.previous.hasLatestVersion()):
+        elif previous_records.invalidated:
             # We have no valid previous bake record.
             reason = "need bake record regeneration"
         else:
             # Check if any template has changed since the last bake. Since
             # there could be some advanced conditional logic going on, we'd
     [... 2 unchanged lines skipped in the comparison view ...]
             for d in self.app.templates_dirs:
                 for dpath, _, filenames in os.walk(d):
                     for fn in filenames:
                         full_fn = os.path.join(dpath, fn)
                         max_time = max(max_time, os.path.getmtime(full_fn))
-            if max_time >= record.previous.bake_time:
+            if max_time >= previous_records.bake_time:
                 reason = "templates modified"

         if reason is not None:
             # We have to bake everything from scratch.
             self.app.cache.clearCaches(except_names=['app', 'baker'])
             self.force = True
-            record.incremental_count = 0
-            record.clearPrevious()
+            current_records.incremental_count = 0
+            previous_records = MultiRecord()
             logger.info(format_timed(
-                start_time,
-                "cleaned cache (reason: %s)" % reason))
+                start_time, "cleaned cache (reason: %s)" % reason))
             return False
         else:
-            record.incremental_count += 1
+            current_records.incremental_count += 1
             logger.debug(format_timed(
-                start_time, "cache is assumed valid",
-                colored=False))
+                start_time, "cache is assumed valid", colored=False))
             return True

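The staleness check moved out of the baker: where the old code compared `bake_time` and called `hasLatestVersion()`, the new code just tests `previous_records.invalidated`. Presumably `load_records()` sets that flag when the stored records can't be reused (for instance, written by a different record-format or app version); that behavior isn't shown in this changeset, so the following is only a guess:

    def load_records(path):
        records = _unpickle(path)  # hypothetical deserialization helper
        # `record_version` / `RECORD_VERSION` are assumed attribute names.
        if records.record_version != MultiRecord.RECORD_VERSION:
            records = MultiRecord()
            records.invalidated = True
        return records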
-    def _bakeRealm(self, record, pool, realm, srclist):
-        start_time = time.perf_counter()
-        try:
-            record.current.baked_count[realm] = 0
-            record.current.total_baked_count[realm] = 0
-
-            all_factories = []
-            for source in srclist:
-                factories = source.getPageFactories()
-                all_factories += [f for f in factories
-                                  if f.path not in self.generator_pages]
-
-            self._loadRealmPages(record, pool, all_factories)
-            self._renderRealmPages(record, pool, all_factories)
-            self._bakeRealmPages(record, pool, realm, all_factories)
-        finally:
-            page_count = record.current.baked_count[realm]
-            total_page_count = record.current.total_baked_count[realm]
-            logger.info(format_timed(
-                start_time,
-                "baked %d %s pages (%d total)." %
-                (page_count, REALM_NAMES[realm].lower(),
-                 total_page_count)))
-
-    def _loadRealmPages(self, record, pool, factories):
-        def _handler(res):
-            # Create the record entry for this page.
-            # This will also update the `dirty_source_names` for the record
-            # as we add page files whose last modification times are later
-            # than the last bake.
-            record_entry = BakeRecordEntry(res['source_name'], res['path'])
-            record_entry.config = res['config']
-            record_entry.timestamp = res['timestamp']
-            if res['errors']:
-                record_entry.errors += res['errors']
-                record.current.success = False
-                self._logErrors(res['path'], res['errors'])
-            record.addEntry(record_entry)
-
-        logger.debug("Loading %d realm pages..." % len(factories))
-        with format_timed_scope(logger,
-                                "loaded %d pages" % len(factories),
-                                level=logging.DEBUG, colored=False,
-                                timer_env=self.app.env,
-                                timer_category='LoadJob'):
-            jobs = []
-            for fac in factories:
-                job = {
-                    'type': JOB_LOAD,
-                    'job': save_factory(fac)}
-                jobs.append(job)
-            ar = pool.queueJobs(jobs, handler=_handler)
-            ar.wait()
-
-    def _renderRealmPages(self, record, pool, factories):
-        def _handler(res):
-            entry = record.getCurrentEntry(res['path'])
-            if res['errors']:
-                entry.errors += res['errors']
-                record.current.success = False
-                self._logErrors(res['path'], res['errors'])
-
-        logger.debug("Rendering %d realm pages..." % len(factories))
-        with format_timed_scope(logger,
-                                "prepared %d pages" % len(factories),
-                                level=logging.DEBUG, colored=False,
-                                timer_env=self.app.env,
-                                timer_category='RenderFirstSubJob'):
-            jobs = []
-            for fac in factories:
-                record_entry = record.getCurrentEntry(fac.path)
-                if record_entry.errors:
-                    logger.debug("Ignoring %s because it had previous "
-                                 "errors." % fac.ref_spec)
-                    continue
-
-                # Make sure the source and the route exist for this page,
-                # otherwise we add errors to the record entry and we'll skip
-                # this page for the rest of the bake.
-                source = self.app.getSource(fac.source.name)
-                if source is None:
-                    record_entry.errors.append(
-                        "Can't get source for page: %s" % fac.ref_spec)
-                    logger.error(record_entry.errors[-1])
-                    continue
-
-                route = self.app.getSourceRoute(fac.source.name, fac.metadata)
-                if route is None:
-                    record_entry.errors.append(
-                        "Can't get route for page: %s" % fac.ref_spec)
-                    logger.error(record_entry.errors[-1])
-                    continue
-
-                # All good, queue the job.
-                route_index = self.app.routes.index(route)
-                job = {
-                    'type': JOB_RENDER_FIRST,
-                    'job': {
-                        'factory_info': save_factory(fac),
-                        'route_index': route_index
-                    }
-                }
-                jobs.append(job)
-
-            ar = pool.queueJobs(jobs, handler=_handler)
-            ar.wait()
-
-    def _bakeRealmPages(self, record, pool, realm, factories):
-        def _handler(res):
-            entry = record.getCurrentEntry(res['path'])
-            entry.subs = res['sub_entries']
-            if res['errors']:
-                entry.errors += res['errors']
-                self._logErrors(res['path'], res['errors'])
-            if entry.has_any_error:
-                record.current.success = False
-            if entry.subs and entry.was_any_sub_baked:
-                record.current.baked_count[realm] += 1
-                record.current.total_baked_count[realm] += len(entry.subs)
-
-        logger.debug("Baking %d realm pages..." % len(factories))
-        with format_timed_scope(logger,
-                                "baked %d pages" % len(factories),
-                                level=logging.DEBUG, colored=False,
-                                timer_env=self.app.env,
-                                timer_category='BakeJob'):
-            jobs = []
-            for fac in factories:
-                job = self._makeBakeJob(record, fac)
-                if job is not None:
-                    jobs.append(job)
-
-            ar = pool.queueJobs(jobs, handler=_handler)
-            ar.wait()
-
-    def _bakePageGenerators(self, record, pool):
-        for gen in self.app.generators:
-            ctx = PageGeneratorBakeContext(self.app, record, pool, gen)
-            gen.bake(ctx)
-
-    def _makeBakeJob(self, record, fac):
-        # Get the previous (if any) and current entry for this page.
-        pair = record.getPreviousAndCurrentEntries(fac.path)
-        assert pair is not None
-        prev_entry, cur_entry = pair
-        assert cur_entry is not None
-
-        # Ignore if there were errors in the previous passes.
-        if cur_entry.errors:
-            logger.debug("Ignoring %s because it had previous "
-                         "errors." % fac.ref_spec)
-            return None
-
-        # Build the route metadata and find the appropriate route.
-        page = fac.buildPage()
-        route_metadata = create_route_metadata(page)
-        route = self.app.getSourceRoute(fac.source.name, route_metadata)
-        assert route is not None
-
-        # Figure out if this page is overriden by another previously
-        # baked page. This happens for example when the user has
-        # made a page that has the same page/URL as a theme page.
-        uri = route.getUri(route_metadata)
-        override_entry = record.getOverrideEntry(page.path, uri)
-        if override_entry is not None:
-            override_source = self.app.getSource(
-                override_entry.source_name)
-            if override_source.realm == fac.source.realm:
-                cur_entry.errors.append(
-                    "Page '%s' maps to URL '%s' but is overriden "
-                    "by page '%s'." %
-                    (fac.ref_spec, uri, override_entry.path))
-                logger.error(cur_entry.errors[-1])
-                cur_entry.flags |= BakeRecordEntry.FLAG_OVERRIDEN
-                return None
-
-        route_index = self.app.routes.index(route)
-        job = {
-            'type': JOB_BAKE,
-            'job': {
-                'factory_info': save_factory(fac),
-                'generator_name': None,
-                'generator_record_key': None,
-                'route_index': route_index,
-                'route_metadata': route_metadata,
-                'dirty_source_names': record.dirty_source_names
-            }
-        }
-        return job
-
-    def _handleDeletetions(self, record):
-        logger.debug("Handling deletions...")
-        for path, reason in record.getDeletions():
-            logger.debug("Removing '%s': %s" % (path, reason))
-            record.current.deleted.append(path)
-            try:
-                os.remove(path)
-                logger.info('[delete] %s' % path)
-            except OSError:
-                # Not a big deal if that file had already been removed
-                # by the user.
-                pass
-
-    def _logErrors(self, path, errors):
-        rel_path = os.path.relpath(path, self.app.root_dir)
-        logger.error("Errors found in %s:" % rel_path)
+    def _bakeRealm(self, record_history, pool, realm, srclist):
+        for source, pp in srclist:
+            logger.debug("Queuing jobs for source '%s' using pipeline '%s'." %
+                         (source.name, pp.PIPELINE_NAME))
+            jobs = [BakeJob(source.name, item.spec, item.metadata)
+                    for item in source.getAllContents()]
+            pool.queueJobs(jobs)
+        pool.wait()
+
+    def _logErrors(self, item_spec, errors):
+        logger.error("Errors found in %s:" % item_spec)
         for e in errors:
             logger.error("  " + e)

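The net effect: the three-phase load/render-first/bake orchestration, the generator handling, and the override bookkeeping are all gone from the baker, and each content item becomes a single `BakeJob` handled by a worker-side pipeline. The whole realm bake now reduces to (a simplified restatement of the code above, not new behavior):

    for realm in (REALM_USER, REALM_THEME):
        for source, pp in sources_by_realm.get(realm, []):
            pool.queueJobs([BakeJob(source.name, item.spec, item.metadata)
                            for item in source.getAllContents()])
        pool.wait()  # one synchronization point per realm

Note that output-file deletion (the old `_handleDeletetions`) disappears with no replacement in this file; presumably the pipelines become responsible for it.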
-    def _createWorkerPool(self, previous_record_path):
-        from piecrust.app import PieCrustFactory
+    def _createWorkerPool(self, previous_records_path):
         from piecrust.workerpool import WorkerPool
         from piecrust.baking.worker import BakeWorkerContext, BakeWorker

-        appfactory = PieCrustFactory(
-            self.app.root_dir,
-            cache=self.app.cache.enabled,
-            cache_key=self.app.cache_key,
-            config_variant=self.applied_config_variant,
-            config_values=self.applied_config_values,
-            debug=self.app.debug,
-            theme_site=self.app.theme_site)
-
         worker_count = self.app.config.get('baker/workers')
         batch_size = self.app.config.get('baker/batch_size')

         ctx = BakeWorkerContext(
-            appfactory,
+            self.appfactory,
             self.out_dir,
             force=self.force,
-            previous_record_path=previous_record_path)
+            previous_records_path=previous_records_path,
+            allowed_pipelines=self.allowed_pipelines)
         pool = WorkerPool(
             worker_count=worker_count,
             batch_size=batch_size,
             worker_class=BakeWorker,
-            initargs=(ctx,))
+            initargs=(ctx,),
+            callback=self._handleWorkerResult,
+            error_callback=self._handleWorkerError)
         return pool

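`_createWorkerPool` no longer builds the `PieCrustFactory` itself; the factory is injected through the constructor, so the caller creates it once and the config-variant parameters drop out of the `Baker` entirely. A sketch of the caller side, reusing the signature visible in the deleted lines above:

    from piecrust.app import PieCrustFactory

    appfactory = PieCrustFactory(
        app.root_dir,
        cache=app.cache.enabled,
        cache_key=app.cache_key,
        debug=app.debug,
        theme_site=app.theme_site)
    baker = Baker(appfactory, app, out_dir)

Results are also no longer consumed through per-call handlers and `ar.wait()`; the pool now reports every job through the `callback`/`error_callback` pair defined below.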
+    def _handleWorkerResult(self, job, res):
+        record_name = self._getRecordName(job)
+        record = self._records.getRecord(record_name)
+        record.entries.append(res.record)
+        if not res.record.success:
+            record.success = False
+            self._records.success = False
+            self._logErrors(job.item_spec, res.record.errors)
+
+    def _handleWorkerError(self, job, exc_data):
+        e = RecordEntry()
+        e.item_spec = job.item_spec
+        e.errors.append(str(exc_data))
+
+        record_name = self._getRecordName(job)
+        record = self._records.getRecord(record_name)
+        record.entries.append(e)
+
+        record.success = False
+        self._records.success = False
+
+        self._logErrors(job.item_spec, e.errors)
+        if self.app.debug:
+            logger.error(exc_data.traceback)
+
+    def _getRecordName(self, job):
+        sn = job.source_name
+        ppn = self.app.getSource(sn).config['pipeline']
+        return '%s@%s' % (sn, ppn)
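`_getRecordName` keys each record on both the source and its pipeline, so one `MultiRecord` can hold a separate record per (source, pipeline) pair. For a source named `pages` configured with a pipeline named `page` (illustrative names, not from this changeset), the record would be named:

    >>> '%s@%s' % ('pages', 'page')
    'pages@page'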