comparison piecrust/baking/baker.py @ 853:f070a4fc033c

core: Continue PieCrust3 refactor, simplify pages.

The asset pipeline is still the only functional pipeline at this point.

* No more `QualifiedPage`, and several other pieces of code deleted.
* Data providers are simpler and more focused. For instance, the page iterator doesn't try to support other types of items.
* Route parameters are now proper, known source metadata, which removes the confusion between the two.
* Make the baker and pipelines manage records and record histories more correctly.
* Add support for record collapsing and for deleting stale outputs in the asset pipeline.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 21 May 2017 00:06:59 -0700
parents 4850f8c21b6e
children 08e02c2a2a1a
comparing 852:4850f8c21b6e (parent) with 853:f070a4fc033c
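The record-keeping changes described above are the heart of this diff: every source/pipeline pair now gets its own record, keyed as `source@pipeline` (see `_get_record_name` at the end of the diff). Below is a minimal, self-contained sketch of that keying scheme; the `Record`/`MultiRecord` classes are simplified stand-ins for piecrust's own, and the source names are made up for the example.

    # Simplified stand-ins for piecrust's Record/MultiRecord classes, just to
    # illustrate the 'source@pipeline' record keying introduced here. These
    # are not the real implementations.
    class Record:
        def __init__(self, name):
            self.name = name
            self.entries = []
            self.success = True


    class MultiRecord:
        def __init__(self):
            self.records = []

        def getRecord(self, record_name):
            for r in self.records:
                if r.name == record_name:
                    return r
            r = Record(record_name)
            self.records.append(r)
            return r


    def get_record_name(source_name, pipeline_name):
        # Same format as the module-level _get_record_name added in this diff.
        return '%s@%s' % (source_name, pipeline_name)


    # Hypothetical source names, one record per source/pipeline pair:
    records = MultiRecord()
    for src, pp in [('assets', 'asset'), ('theme_assets', 'asset')]:
        print(records.getRecord(get_record_name(src, pp)).name)
    # -> assets@asset
    # -> theme_assets@asset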
[...]


 logger = logging.getLogger(__name__)


-def get_bake_records_path(app, out_dir):
+def get_bake_records_path(app, out_dir, *, suffix=''):
     records_cache = app.cache.getCache('baker')
     records_id = hashlib.md5(out_dir.encode('utf8')).hexdigest()
-    records_name = records_id + '.record'
+    records_name = '%s%s.records' % (records_id, suffix)
     return records_cache.getCachePath(records_name)


 class Baker(object):
     def __init__(self, appfactory, app, out_dir,
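The old code derived a single `<md5-of-out_dir>.record` file name; the new version adds a keyword-only `suffix` and switches to a plural `.records` extension, presumably so that more than one set of records can be kept per output directory. A rough standalone equivalent of the path derivation, using a plain directory in place of piecrust's baker cache (which is not part of this hunk); the example paths and suffix value are made up.

    import hashlib
    import os


    def get_bake_records_path(cache_dir, out_dir, *, suffix=''):
        # Hash the output directory so each output location gets its own
        # records file, mirroring the right-hand version above.
        records_id = hashlib.md5(out_dir.encode('utf8')).hexdigest()
        records_name = '%s%s.records' % (records_id, suffix)
        return os.path.join(cache_dir, records_name)


    print(get_bake_records_path('/tmp/cache/baker', '/srv/www/mysite'))
    print(get_bake_records_path('/tmp/cache/baker', '/srv/www/mysite',
                                suffix='.new'))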
[...]

         if not is_cache_valid:
             previous_records = MultiRecord()

         # Create the bake records history which tracks what's up-to-date
         # or not since last time we baked to the given output folder.
-        record_history = MultiRecordHistory(previous_records, self._records)
+        record_histories = MultiRecordHistory(previous_records, self._records)

         # Pre-create all caches.
         for cache_name in ['app', 'baker', 'pages', 'renders']:
             self.app.cache.getCache(cache_name)

[...]

         # another realm's pages, like the user realm overriding the theme
         # realm).
         #
         # Also, create and initialize each pipeline for each source.
         sources_by_realm = {}
-        ppctx = PipelineContext(self.out_dir, record_history,
-                                force=self.force)
         for source in self.app.sources:
             pname = source.config['pipeline']
             if pname in self.allowed_pipelines:
                 srclist = sources_by_realm.setdefault(
                     source.config['realm'], [])

                 pp = self._pipeline_classes[pname](source)
+
+                record_name = _get_record_name(source.name, pname)
+                record_history = record_histories.getHistory(record_name)
+                ppctx = PipelineContext(self.out_dir, record_history,
+                                        force=self.force)
                 pp.initialize(ppctx)
-                srclist.append((source, pp))
+
+                srclist.append((source, pp, ppctx))
             else:
                 logger.debug(
                     "Skip source '%s' because pipeline '%s' is ignored." %
                     (source.name, pname))

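Where the old code created a single `PipelineContext` shared by every pipeline, each source now gets its own record history and its own context, which travels with the source and pipeline in `(source, pp, ppctx)` tuples. A condensed sketch of that setup step follows; only the `PipelineContext` constructor arguments and the `getHistory()` call come from the diff, the source and pipeline objects are placeholders, and the allowed-pipelines filter is omitted.

    # Condensed, illustrative version of the per-source pipeline setup above.
    class PipelineContext:
        def __init__(self, out_dir, record_history, *, force=False):
            self.out_dir = out_dir
            self.record_history = record_history
            self.force = force


    def setup_pipelines(sources, pipeline_classes, record_histories,
                        out_dir, *, force=False):
        sources_by_realm = {}
        for source in sources:
            pname = source.config['pipeline']
            srclist = sources_by_realm.setdefault(source.config['realm'], [])

            pp = pipeline_classes[pname](source)

            # One record history, and therefore one context, per
            # source/pipeline pair instead of one shared by all of them.
            record_name = '%s@%s' % (source.name, pname)
            record_history = record_histories.getHistory(record_name)
            ppctx = PipelineContext(out_dir, record_history, force=force)
            pp.initialize(ppctx)

            srclist.append((source, pp, ppctx))
        return sources_by_realm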
[...]

         # can override a theme item.
         realm_list = [REALM_USER, REALM_THEME]
         for realm in realm_list:
             srclist = sources_by_realm.get(realm)
             if srclist is not None:
-                self._bakeRealm(record_history, pool, realm, srclist)
+                self._bakeRealm(pool, srclist)
+
+        # Handle deletions.
+        for realm in realm_list:
+            srclist = sources_by_realm.get(realm)
+            if srclist is not None:
+                self._deleteStaleOutputs(pool, srclist)
+
+        # Collapse records.
+        for realm in realm_list:
+            srclist = sources_by_realm.get(realm)
+            if srclist is not None:
+                self._collapseRecords(srclist)

         # All done with the workers. Close the pool and get reports.
         pool_stats = pool.close()
         total_stats = ExecutionStats()
         for ps in pool_stats:
             if ps is not None:
                 total_stats.mergeStats(ps)
-        record_history.current.stats = total_stats
+        record_histories.current.stats = total_stats

         # Shutdown the pipelines.
         for realm in realm_list:
             srclist = sources_by_realm.get(realm)
             if srclist is not None:
-                for _, pp in srclist:
+                for _, pp, ppctx in srclist:
                     pp.shutdown(ppctx)

         # Backup previous records.
         records_dir, records_fn = os.path.split(records_path)
         records_id, _ = os.path.splitext(records_fn)
         for i in range(8, -1, -1):
             suffix = '' if i == 0 else '.%d' % i
             records_path_i = os.path.join(
                 records_dir,
-                '%s%s.record' % (records_id, suffix))
+                '%s%s.records' % (records_id, suffix))
             if os.path.exists(records_path_i):
                 records_path_next = os.path.join(
                     records_dir,
-                    '%s.%s.record' % (records_id, i + 1))
+                    '%s.%s.records' % (records_id, i + 1))
                 if os.path.exists(records_path_next):
                     os.remove(records_path_next)
                 os.rename(records_path_i, records_path_next)

-        # Save the bake record.
+        # Save the bake records.
         with format_timed_scope(logger, "saved bake records.",
                                 level=logging.DEBUG, colored=False):
-            record_history.current.bake_time = time.time()
-            record_history.current.out_dir = self.out_dir
-            record_history.current.save(records_path)
+            record_histories.current.bake_time = time.time()
+            record_histories.current.out_dir = self.out_dir
+            record_histories.current.save(records_path)

         # All done.
         self.app.config.set('baker/is_baking', False)
         logger.debug(format_timed(start_time, 'done baking'))

         self._records = None
-        return record_history.current
+        return record_histories.current

     def _handleCacheValidity(self, previous_records, current_records):
         start_time = time.perf_counter()

         reason = None
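The backup loop above rotates older record files before the new ones are saved: `<id>.records` becomes `<id>.1.records`, `.1` becomes `.2`, and so on up to `.9.records`, with the oldest backup dropped. A standalone, runnable version of just that rotation (the `keep` parameter is an addition for the example; the original hard-codes nine backups):

    import os


    def rotate_records(records_path, keep=9):
        # Shift foo.records -> foo.1.records -> ... -> foo.<keep>.records,
        # deleting the oldest backup if it is already present.
        records_dir, records_fn = os.path.split(records_path)
        records_id, _ = os.path.splitext(records_fn)
        for i in range(keep - 1, -1, -1):
            suffix = '' if i == 0 else '.%d' % i
            path_i = os.path.join(records_dir,
                                  '%s%s.records' % (records_id, suffix))
            if os.path.exists(path_i):
                path_next = os.path.join(records_dir,
                                         '%s.%d.records' % (records_id, i + 1))
                if os.path.exists(path_next):
                    os.remove(path_next)
                os.rename(path_i, path_next)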
[...]

         elif not self.app.config.get('__cache_valid'):
             # The configuration file was changed, or we're running a new
             # version of the app.
             reason = "not valid anymore"
         elif previous_records.invalidated:
-            # We have no valid previous bake record.
-            reason = "need bake record regeneration"
+            # We have no valid previous bake records.
+            reason = "need bake records regeneration"
         else:
             # Check if any template has changed since the last bake. Since
             # there could be some advanced conditional logic going on, we'd
             # better just force a bake from scratch if that's the case.
             max_time = 0
[...]

             current_records.incremental_count += 1
             logger.debug(format_timed(
                 start_time, "cache is assumed valid", colored=False))
             return True

-    def _bakeRealm(self, record_history, pool, realm, srclist):
-        for source, pp in srclist:
+    def _bakeRealm(self, pool, srclist):
+        for source, pp, ppctx in srclist:
             logger.debug("Queuing jobs for source '%s' using pipeline '%s'." %
                          (source.name, pp.PIPELINE_NAME))
             jobs = [BakeJob(source.name, item.spec, item.metadata)
                     for item in source.getAllContents()]
             pool.queueJobs(jobs)
         pool.wait()
+
+    def _deleteStaleOutputs(self, pool, srclist):
+        for source, pp, ppctx in srclist:
+            ppctx.record_history.build()
+
+            to_delete = pp.getDeletions(ppctx)
+            if to_delete is not None:
+                for path, reason in to_delete:
+                    logger.debug("Removing '%s': %s" % (path, reason))
+                    ppctx.current_record.deleted_out_paths.append(path)
+                    try:
+                        os.remove(path)
+                    except FileNotFoundError:
+                        pass
+                    logger.info('[delete] %s' % path)
+
+    def _collapseRecords(self, srclist):
+        for source, pp, ppctx in srclist:
+            pp.collapseRecords(ppctx)

     def _logErrors(self, item_spec, errors):
         logger.error("Errors found in %s:" % item_spec)
         for e in errors:
             logger.error("  " + e)
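`_deleteStaleOutputs` builds each pipeline's record history and then asks the pipeline for `getDeletions(ppctx)`; the asset pipeline's implementation of that hook is not part of this file. The sketch below shows one plausible way deletions could be computed, by diffing the output paths recorded in the previous bake against the current ones; it is an assumption, not the actual pipeline code.

    # Hypothetical sketch of how a pipeline might compute stale outputs by
    # diffing the previous bake record against the current one. This is NOT
    # the asset pipeline's actual getDeletions(); that code lives elsewhere.
    def get_deletions(previous_entries, current_entries):
        # Collect every output path produced by the current bake.
        current_paths = set()
        for e in current_entries:
            current_paths.update(getattr(e, 'out_paths', []))

        # Anything the previous bake produced that is no longer produced
        # now is considered stale and can be removed from the output dir.
        for e in previous_entries:
            for path in getattr(e, 'out_paths', []):
                if path not in current_paths:
                    yield path, 'not baked anymore'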
[...]

             callback=self._handleWorkerResult,
             error_callback=self._handleWorkerError)
         return pool

     def _handleWorkerResult(self, job, res):
-        record_name = self._getRecordName(job)
+        record_name = _get_record_name(job.source_name, res.pipeline_name)
         record = self._records.getRecord(record_name)
-        record.entries.append(res.record)
-        if not res.record.success:
+        record.entries.append(res.record_entry)
+        if not res.record_entry.success:
             record.success = False
             self._records.success = False
-            self._logErrors(job.item_spec, res.record.errors)
+            self._logErrors(job.item_spec, res.record_entry.errors)

     def _handleWorkerError(self, job, exc_data):
         e = RecordEntry()
         e.item_spec = job.item_spec
         e.errors.append(str(exc_data))

+        ppname = self.app.getSource(job.source_name).config['pipeline']
+        record_name = _get_record_name(job.source_name, ppname)
         record_name = self._getRecordName(job)
         record = self._records.getRecord(record_name)
         record.entries.append(e)

         record.success = False
[...]

         self._logErrors(job.item_spec, e.errors)
         if self.app.debug:
             logger.error(exc_data.traceback)

-    def _getRecordName(self, job):
-        sn = job.source_name
-        ppn = self.app.getSource(sn).config['pipeline']
-        return '%s@%s' % (sn, ppn)
+
+def _get_record_name(source_name, pipeline_name):
+    return '%s@%s' % (source_name, pipeline_name)
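With one record per source/pipeline pair, worker results have to be routed back to the right record: `_handleWorkerResult` rebuilds the name from `job.source_name` and `res.pipeline_name`, while the error handler looks the pipeline name up from the source's configuration. A small illustration of that routing; the job/result objects and the dict standing in for `MultiRecord` are made up, and only the attribute names that appear in the diff are taken from it.

    from types import SimpleNamespace

    # Stand-in job/result objects; in piecrust these come from the worker pool.
    job = SimpleNamespace(source_name='assets', item_spec='assets/logo.png')
    res = SimpleNamespace(pipeline_name='asset',
                          record_entry=SimpleNamespace(success=True, errors=[]))

    # Route the result to the record for this source/pipeline pair, using the
    # same 'source@pipeline' format as _get_record_name above. The dict stands
    # in for piecrust's MultiRecord.
    records = {}
    record_name = '%s@%s' % (job.source_name, res.pipeline_name)
    records.setdefault(record_name, []).append(res.record_entry)
    assert record_name == 'assets@asset'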