comparison piecrust/pipelines/page.py @ 1136:5f97b5b59dfe

bake: Optimize cache handling for the baking process.

- Get rid of the 2-level pipeline runs... handle a single set of passes.
- Go back to load/render segments/layout passes for pages.
- Add descriptions of what each job batch does.
- Improve the taxonomy pipeline so it doesn't re-bake terms that don't need to be re-baked.
- Simplify some of the code.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 23 Apr 2018 21:47:49 -0700
parents 3bcb2d446397
children
comparison
equal deleted inserted replaced
1135:6350ee084273 1136:5f97b5b59dfe
1 import copy 1 import copy
2 import time
3 import logging 2 import logging
4 from piecrust.pipelines.base import ( 3 from piecrust.pipelines.base import (
5 ContentPipeline, create_job, content_item_from_job) 4 ContentPipeline, create_job, content_item_from_job)
6 from piecrust.pipelines._pagebaker import PageBaker, get_output_path 5 from piecrust.pipelines._pagebaker import PageBaker, get_output_path
7 from piecrust.pipelines._pagerecords import ( 6 from piecrust.pipelines._pagerecords import (
8 PagePipelineRecordEntry, 7 PagePipelineRecordEntry, SubPageFlags)
9 add_page_job_result, merge_job_result_into_record_entry) 8 from piecrust.rendering import RenderingContext, render_page_segments
10 from piecrust.sources.base import AbortedSourceUseError 9 from piecrust.sources.base import AbortedSourceUseError
11 10
12 11
13 logger = logging.getLogger(__name__) 12 logger = logging.getLogger(__name__)
14 13
36 35
37 def createJobs(self, ctx): 36 def createJobs(self, ctx):
38 pass_num = ctx.pass_num 37 pass_num = ctx.pass_num
39 if pass_num == 0: 38 if pass_num == 0:
40 ctx.current_record.user_data['dirty_source_names'] = set() 39 ctx.current_record.user_data['dirty_source_names'] = set()
41 return self._createLoadJobs(ctx) 40 return self._createLoadJobs(ctx), "load"
42 if pass_num == 1: 41 if pass_num == 1:
43 return self._createSecondPassJobs(ctx) 42 return self._createSegmentJobs(ctx), "render"
44 if pass_num == 2: 43 if pass_num == 2:
45 return self._createThirdPassJobs(ctx) 44 return self._createLayoutJobs(ctx), "layout"
46 raise Exception("Unexpected pipeline pass: %d" % pass_num) 45 raise Exception("Unexpected pipeline pass: %d" % pass_num)
47 46
48 def _createLoadJobs(self, ctx): 47 def _createLoadJobs(self, ctx):
49 # Here we load all the pages in the source, making sure they all 48 # Here we load all the pages in the source, making sure they all
50 # have a valid cache for their configuration and contents. 49 # have a valid cache for their configuration and contents.
53 jobs.append(create_job(self, item.spec)) 52 jobs.append(create_job(self, item.spec))
54 if len(jobs) > 0: 53 if len(jobs) > 0:
55 return jobs 54 return jobs
56 return None 55 return None
57 56
58 def _createSecondPassJobs(self, ctx): 57 def _createSegmentJobs(self, ctx):
59 jobs = [] 58 jobs = []
60 59
61 app = self.app 60 app = self.app
61 pass_num = ctx.pass_num
62 out_dir = self.ctx.out_dir 62 out_dir = self.ctx.out_dir
63 uri_getter = self.source.route.getUri 63 uri_getter = self.source.route.getUri
64 pretty_urls = app.config.get('site/pretty_urls') 64 pretty_urls = app.config.get('site/pretty_urls')
65 65
66 used_paths = _get_used_paths_from_records(
67 ctx.record_histories.current.records)
68 history = ctx.record_histories.getHistory(ctx.record_name).copy() 66 history = ctx.record_histories.getHistory(ctx.record_name).copy()
69 history.build() 67 history.build()
70 68
71 pass_num = ctx.pass_num 69 cur_rec_used_paths = {}
70 history.current.user_data['used_paths'] = cur_rec_used_paths
71 all_records = ctx.record_histories.current.records
72 72
73 for prev, cur in history.diffs: 73 for prev, cur in history.diffs:
74 # Ignore pages that disappeared since last bake. 74 # Ignore pages that disappeared since last bake.
75 if cur is None: 75 if cur is None:
76 continue 76 continue
77 77
78 # Skip draft pages. 78 # Skip draft pages.
79 if cur.flags & PagePipelineRecordEntry.FLAG_IS_DRAFT: 79 if cur.hasFlag(PagePipelineRecordEntry.FLAG_IS_DRAFT):
80 continue 80 continue
81 81
82 # For pages that are known to use other sources, we make a dummy 82 # Skip pages that haven't changed since last bake.
83 # job that will effectively get directly passed on to the next 83 if (prev and not cur.hasFlag(
84 # step. 84 PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED)):
85 continue
86
87 # For pages that are known to use other sources in their own
88 # content segments (we don't care about the layout yet), we
89 # postpone them to the next pipeline pass immediately, because they
90 # might need populated render caches for those sources' pages.
85 if prev: 91 if prev:
86 usn1, usn2 = prev.getAllUsedSourceNames() 92 usn1, _ = prev.getAllUsedSourceNames()
87 if usn1 or usn2: 93 if usn1:
88 jobs.append(create_job(self, cur.item_spec, 94 logger.debug("Postponing: %s" % cur.item_spec)
89 pass_num=pass_num, 95 cur.flags |= \
90 uses_sources=True)) 96 PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE
91 continue 97 continue
92 98
93 # Check if this item has been overriden by a previous pipeline 99 # Check if this item has been overriden by a previous pipeline
94 # run... for instance, we could be the pipeline for a "theme pages" 100 # run... for instance, we could be the pipeline for a "theme pages"
95 # source, and some of our pages have been overriden by a user 101 # source, and some of our pages have been overriden by a user
96 # page that writes out to the same URL. 102 # page that writes out to the same URL.
97 uri = uri_getter(cur.route_params) 103 uri = uri_getter(cur.route_params)
98 path = get_output_path(app, out_dir, uri, pretty_urls) 104 out_path = get_output_path(app, out_dir, uri, pretty_urls)
99 override = used_paths.get(path) 105 override = _find_used_path_spec(all_records, out_path)
100 if override is not None: 106 if override is not None:
101 override_source_name, override_entry = override 107 override_source_name, override_entry_spec = override
102 override_source = app.getSource(override_source_name) 108 override_source = app.getSource(override_source_name)
103 if override_source.config['realm'] == \ 109 if override_source.config['realm'] == \
104 self.source.config['realm']: 110 self.source.config['realm']:
105 logger.error( 111 logger.error(
106 "Page '%s' would get baked to '%s' " 112 "Page '%s' would get baked to '%s' "
107 "but is overriden by '%s'." % 113 "but is overriden by '%s'." %
108 (cur.item_spec, path, override_entry.item_spec)) 114 (cur.item_spec, out_path, override_entry_spec))
109 else: 115 else:
110 logger.debug( 116 logger.debug(
111 "Page '%s' would get baked to '%s' " 117 "Page '%s' would get baked to '%s' "
112 "but is overriden by '%s'." % 118 "but is overriden by '%s'." %
113 (cur.item_spec, path, override_entry.item_spec)) 119 (cur.item_spec, out_path, override_entry_spec))
114 120
115 cur.flags |= PagePipelineRecordEntry.FLAG_OVERRIDEN 121 cur.flags |= PagePipelineRecordEntry.FLAG_OVERRIDEN
116 continue 122 continue
117 123
118 # Nope, all good, let's create a job for this item. 124 # Nope, all good, let's create a job for this item.
125 cur.flags |= PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED
126 cur_rec_used_paths[out_path] = cur.item_spec
127
119 jobs.append(create_job(self, cur.item_spec, 128 jobs.append(create_job(self, cur.item_spec,
120 pass_num=pass_num)) 129 pass_num=pass_num))
121 130
122 if len(jobs) > 0: 131 if len(jobs) > 0:
123 return jobs 132 return jobs
124 return None 133 return None
125 134
126 def _createThirdPassJobs(self, ctx): 135 def _createLayoutJobs(self, ctx):
127 # Get the list of all sources that had anything baked. 136 # Get the list of all sources that had anything baked.
128 dirty_source_names = set() 137 dirty_source_names = set()
129 all_records = ctx.record_histories.current.records 138 all_records = ctx.record_histories.current.records
130 for rec in all_records: 139 for rec in all_records:
131 rec_dsn = rec.user_data.get('dirty_source_names') 140 rec_dsn = rec.user_data.get('dirty_source_names')
132 if rec_dsn: 141 if rec_dsn:
133 dirty_source_names |= rec_dsn 142 dirty_source_names |= rec_dsn
134 143
135 # Now look at the stuff we bake for our own source on the first pass.
136 # For anything that wasn't baked (i.e. it was considered 'up to date')
137 # we look at the records from last time, and if they say that some
138 # page was using a source that is "dirty", then we force bake it.
139 #
140 # The common example for this is a blog index page which hasn't been
141 # touched, but needs to be re-baked because someone added or edited
142 # a post.
143 jobs = [] 144 jobs = []
144 pass_num = ctx.pass_num 145 pass_num = ctx.pass_num
145 history = ctx.record_histories.getHistory(ctx.record_name).copy() 146 history = ctx.record_histories.getHistory(ctx.record_name).copy()
146 history.build() 147 history.build()
147 for prev, cur in history.diffs: 148 for prev, cur in history.diffs:
148 if not cur: 149 if not cur or cur.hasFlag(PagePipelineRecordEntry.FLAG_OVERRIDEN):
149 continue 150 continue
150 if cur.was_any_sub_baked: 151
151 continue 152 do_bake = False
153 force_segments = False
154 force_layout = False
155
156 # Make sure we bake the layout for pages that got their segments
157 # re-rendered.
158 if cur.hasFlag(PagePipelineRecordEntry.FLAG_SEGMENTS_RENDERED):
159 do_bake = True
160
161 # Now look at the stuff we baked for our own source on the second
162 # pass. For anything that wasn't baked (i.e. it was considered 'up
163 # to date') we look at the records from last time, and if they say
164 # that some page was using a source that is "dirty", then we force
165 # bake it.
166 #
167 # The common example for this is a blog index page which hasn't
168 # been touched, but needs to be re-baked because someone added or
169 # edited a post.
152 if prev: 170 if prev:
153 usn1, usn2 = prev.getAllUsedSourceNames() 171 usn1, usn2 = prev.getAllUsedSourceNames()
154 force_segments = any(map(lambda u: u in dirty_source_names, 172 force_segments = any(map(lambda u: u in dirty_source_names,
155 usn1)) 173 usn1))
156 force_layout = any(map(lambda u: u in dirty_source_names, 174 force_layout = any(map(lambda u: u in dirty_source_names,
157 usn2)) 175 usn2))
176
158 if force_segments or force_layout: 177 if force_segments or force_layout:
159 jobs.append(create_job(self, prev.item_spec, 178 # Yep, we need to force-rebake some aspect of this page.
160 pass_num=pass_num, 179 do_bake = True
161 force_segments=force_segments, 180
162 force_layout=force_layout)) 181 elif not do_bake:
163 else:
164 # This page uses other sources, but no source was dirty 182 # This page uses other sources, but no source was dirty
165 # this time around (it was a null build, maybe). We 183 # this time around (it was a null build, maybe). We
166 # don't have any work to do, but we need to carry over 184 # don't have any work to do, but we need to carry over
167 # any information we have, otherwise the post bake step 185 # any information we have, otherwise the post bake step
168 # will think we need to delete last bake's outputs. 186 # will think we need to delete last bake's outputs.
169 cur.subs = copy.deepcopy(prev.subs) 187 cur.subs = copy.deepcopy(prev.subs)
188 for cur_sub in cur.subs:
189 cur_sub['flags'] = \
190 SubPageFlags.FLAG_COLLAPSED_FROM_LAST_RUN
191
192 if do_bake:
193 jobs.append(create_job(self, cur.item_spec,
194 pass_num=pass_num,
195 force_segments=force_segments,
196 force_layout=force_layout))
170 197
171 if len(jobs) > 0: 198 if len(jobs) > 0:
172 return jobs 199 return jobs
173 return None 200 return None
174 201
175 def handleJobResult(self, result, ctx): 202 def handleJobResult(self, result, ctx):
176 pass_num = ctx.pass_num 203 pass_num = ctx.pass_num
177 step_num = ctx.step_num
178 204
179 if pass_num == 0: 205 if pass_num == 0:
180 # Just went through a "load page" job. Let's create a record 206 # Just went through a "load page" job. Let's create a record
181 # entry with the information we got from the worker. 207 # entry with the information we got from the worker.
182 new_entry = self.createRecordEntry(result['item_spec']) 208 new_entry = self.createRecordEntry(result['item_spec'])
186 new_entry.timestamp = result['timestamp'] 212 new_entry.timestamp = result['timestamp']
187 ctx.record.addEntry(new_entry) 213 ctx.record.addEntry(new_entry)
188 214
189 # If this page was modified, flag its entire source as "dirty", 215 # If this page was modified, flag its entire source as "dirty",
190 # so any pages using that source can be re-baked. 216 # so any pages using that source can be re-baked.
191 if (new_entry.flags & PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED): 217 if new_entry.flags & PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED:
192 ctx.record.user_data['dirty_source_names'].add( 218 ctx.record.user_data['dirty_source_names'].add(
193 self.source.name) 219 self.source.name)
220
221 # If this page is new
222
223 elif pass_num == 1:
224 # Just went through the "render segments" job.
225 existing = ctx.record_entry
226 existing.flags |= result.get('flags',
227 PagePipelineRecordEntry.FLAG_NONE)
228
194 else: 229 else:
195 # Update the entry with the new information. 230 # Update the entry with the new information.
196 existing = ctx.record_entry 231 existing = ctx.record_entry
197 if not result.get('postponed', False): 232 existing.flags |= result.get('flags',
198 merge_job_result_into_record_entry(existing, result) 233 PagePipelineRecordEntry.FLAG_NONE)
234 existing.errors += result.get('errors', [])
235 existing.subs += result.get('subs', [])
199 236
200 def run(self, job, ctx, result): 237 def run(self, job, ctx, result):
201 pass_num = job.get('pass_num', 0) 238 pass_num = job.get('pass_num', 0)
202 step_num = job.get('step_num', 0)
203 239
204 if pass_num == 0: 240 if pass_num == 0:
205 if step_num == 0: 241 return self._loadPage(job, ctx, result)
206 return self._loadPage(job, ctx, result)
207 242
208 elif pass_num == 1: 243 elif pass_num == 1:
209 if step_num == 0: 244 return self._renderSegments(job, ctx, result)
210 return self._renderOrPostpone(job, ctx, result) 245
211 elif step_num == 1: 246 elif pass_num >= 2:
212 return self._renderAlways(job, ctx, result) 247 return self._renderLayout(job, ctx, result)
213
214 elif pass_num == 2:
215 if step_num == 0:
216 return self._renderAlways(job, ctx, result)
217
218 raise Exception("Unexpected pipeline pass/step: %d/%d" %
219 (pass_num, step_num))
220 248
221 def getDeletions(self, ctx): 249 def getDeletions(self, ctx):
222 for prev, cur in ctx.record_history.diffs: 250 for prev, cur in ctx.record_history.diffs:
223 if prev and not cur: 251 if prev and not cur:
224 for sub in prev.subs: 252 for sub in prev.subs:
248 if page.was_modified: 276 if page.was_modified:
249 result['flags'] |= PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED 277 result['flags'] |= PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED
250 if page.config.get(self._draft_setting): 278 if page.config.get(self._draft_setting):
251 result['flags'] |= PagePipelineRecordEntry.FLAG_IS_DRAFT 279 result['flags'] |= PagePipelineRecordEntry.FLAG_IS_DRAFT
252 280
253 def _renderOrPostpone(self, job, ctx, result): 281 def _renderSegments(self, job, ctx, result):
254 # See if we should immediately kick this job off to the next step.
255 if job.get('uses_sources', False):
256 result['postponed'] = True
257 result['next_step_job'] = create_job(self, job['job_spec'][1])
258 return
259
260 # Here our job is to render the page's segments so that they're 282 # Here our job is to render the page's segments so that they're
261 # cached in memory and on disk... unless we detect that the page 283 # cached in memory and on disk... unless we detect that the page
262 # is using some other sources, in which case we abort and we'll try 284 # is using some other sources, in which case we abort and we'll try
263 # again on the second pass. 285 # again on the second pass.
264 content_item = content_item_from_job(self, job) 286 content_item = content_item_from_job(self, job)
265 logger.debug("Conditional render for: %s" % content_item.spec) 287 logger.debug("Render segments for: %s" % content_item.spec)
266 page = self.app.getPage(self.source, content_item) 288 page = self.app.getPage(self.source, content_item)
267 if page.config.get(self._draft_setting): 289 if page.config.get(self._draft_setting):
268 raise Exception("Shouldn't have a draft page in a render job!") 290 raise Exception("Shouldn't have a draft page in a render job!")
269 291
270 prev_entry = ctx.previous_entry
271
272 env = self.app.env 292 env = self.app.env
273 env.abort_source_use = True 293 env.abort_source_use = True
274 add_page_job_result(result)
275 try: 294 try:
276 rdr_subs = self._pagebaker.bake(page, prev_entry) 295 rdr_ctx = RenderingContext(page)
277 result['subs'] = rdr_subs 296 render_page_segments(rdr_ctx)
278 except AbortedSourceUseError: 297 except AbortedSourceUseError:
279 logger.debug("Page was aborted for using source: %s" % 298 logger.debug("Page was aborted for using source: %s" %
280 content_item.spec) 299 content_item.spec)
281 result['flags'] |= \ 300 result['flags'] = \
282 PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE 301 PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE
283 env.stats.stepCounter("SourceUseAbortions") 302 env.stats.stepCounter("SourceUseAbortions")
284 env.stats.addManifestEntry("SourceUseAbortions", content_item.spec) 303 env.stats.addManifestEntry("SourceUseAbortions", content_item.spec)
285 result['next_step_job'] = create_job(self, content_item.spec)
286 finally: 304 finally:
287 env.abort_source_use = False 305 env.abort_source_use = False
288 306
289 def _renderAlways(self, job, ctx, result): 307 def _renderLayout(self, job, ctx, result):
290 content_item = content_item_from_job(self, job) 308 content_item = content_item_from_job(self, job)
291 logger.debug("Full render for: %s" % content_item.spec) 309 logger.debug("Render layout for: %s" % content_item.spec)
292 page = self.app.getPage(self.source, content_item) 310 page = self.app.getPage(self.source, content_item)
293 prev_entry = ctx.previous_entry 311 prev_entry = ctx.previous_entry
294 rdr_subs = self._pagebaker.bake( 312 rdr_subs = self._pagebaker.bake(
295 page, prev_entry, 313 page, prev_entry,
296 force_segments=job.get('force_segments'), 314 force_segments=job.get('force_segments'),
297 force_layout=job.get('force_layout')) 315 force_layout=job.get('force_layout'))
298
299 add_page_job_result(result)
300 result['subs'] = rdr_subs 316 result['subs'] = rdr_subs
301 317
302 def _get_used_paths_from_records(records): 318
303 used_paths = {} 319 def _find_used_path_spec(records, path):
304 for rec in records: 320 for rec in records:
305 src_name = rec.name.split('@')[0] 321 up = rec.user_data.get('used_paths')
306 for e in rec.getEntries(): 322 if up is not None:
307 paths = e.getAllOutputPaths() 323 entry_spec = up.get(path)
308 if paths is not None: 324 if entry_spec is not None:
309 for p in paths: 325 src_name = rec.name.split('@')[0]
310 used_paths[p] = (src_name, e) 326 return (src_name, entry_spec)
311 return used_paths 327 return None