comparison piecrust/pipelines/page.py @ 989:8adc27285d93

bake: Big pass on bake performance.

- Reduce the amount of data passed between processes.
- Make inter-process data simple objects to make it easier to test with
  alternatives to pickle.
- Make sources have the basic requirement to be able to find a content item
  from an item spec (path).
- Make Hoedown the default Markdown formatter.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 19 Nov 2017 14:29:17 -0800
parents 45ad976712ec
children 1857dbd4580f
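
The "simple objects" from the commit message show up below as the new create_job / content_item_from_job helpers imported from piecrust.pipelines.base. Their bodies are not part of this diff; here is a minimal sketch of what they plausibly do, given the call sites below (the 'job_spec' key and the findContentFromSpec method name are assumptions, not confirmed API):

# Minimal sketch, not the actual piecrust.pipelines.base code: jobs are
# plain dicts so any serializer (pickle or an alternative) can move them
# between processes cheaply.
def create_job(pipeline, item_spec, **kwargs):
    job = {'job_spec': (pipeline.source.name, item_spec)}
    job.update(kwargs)  # e.g. pass_num=1, force_bake=True
    return job

def content_item_from_job(pipeline, job):
    # Assumes the source can resolve an item spec (path) back into a
    # content item -- the "basic requirement" from the commit message.
    # `findContentFromSpec` is a hypothetical name.
    return pipeline.source.findContentFromSpec(job['job_spec'][1])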
--- piecrust/pipelines/page.py @ 988:f83ae0a5d793
+++ piecrust/pipelines/page.py @ 989:8adc27285d93
+import time
 import logging
-from piecrust.pipelines.base import ContentPipeline
+from piecrust.pipelines.base import (
+    ContentPipeline, create_job, content_item_from_job)
 from piecrust.pipelines._pagebaker import PageBaker, get_output_path
-from piecrust.pipelines._pagerecords import PagePipelineRecordEntry
+from piecrust.pipelines._pagerecords import (
+    PagePipelineRecordEntry,
+    add_page_job_result, merge_job_result_into_record_entry)
 from piecrust.sources.base import AbortedSourceUseError
 
 
 logger = logging.getLogger(__name__)
 
 
 class PagePipeline(ContentPipeline):
     PIPELINE_NAME = 'page'
     RECORD_ENTRY_CLASS = PagePipelineRecordEntry
+    PASS_NUM = [0, 1]
 
     def __init__(self, source, ppctx):
         super().__init__(source, ppctx)
         self._pagebaker = None
         self._stats = source.app.env.stats
         self._draft_setting = self.app.config['baker/no_bake_setting']
 
     def initialize(self):
-        stats = self.app.env.stats
+        stats = self._stats
         stats.registerCounter('SourceUseAbortions', raise_if_registered=False)
         stats.registerManifest('SourceUseAbortions', raise_if_registered=False)
 
         self._pagebaker = PageBaker(self.app,
                                     self.ctx.out_dir,
                                     force=self.ctx.force)
         self._pagebaker.startWriterQueue()
 
+    def loadAllContents(self):
+        # Here we load all the pages in the source, making sure they all
+        # have a valid cache for their configuration and contents.
+        # We also create the record entries while we're at it.
+        source = self.source
+        page_fac = self.app.getPage
+        record_fac = self.createRecordEntry
+        for item in source.getAllContents():
+            page = page_fac(source, item)
+
+            cur_entry = record_fac(item.spec)
+            cur_entry.config = page.config.getAll()
+            cur_entry.route_params = item.metadata['route_params']
+            cur_entry.timestamp = page.datetime.timestamp()
+
+            if page.config.get(self._draft_setting):
+                cur_entry.flags |= PagePipelineRecordEntry.FLAG_IS_DRAFT
+
+            yield cur_entry
+
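
The new loadAllContents() generator is presumably driven by the baker, which is not part of this diff. A hypothetical usage sketch (pipeline and current_record are assumed names for the driver's objects; addEntry is the record API visible in the old code below):

# Hypothetical driver-side loop: populate the current record up front so
# _createFirstPassJobs can diff record histories instead of re-reading
# every page's configuration.
for record_entry in pipeline.loadAllContents():
    current_record.addEntry(record_entry)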
     def createJobs(self, ctx):
-        used_paths = {}
-        for rec in ctx.record_histories.current.records:
-            src_name = rec.name.split('@')[0]
-            for e in rec.getEntries():
-                paths = e.getAllOutputPaths()
-                if paths is not None:
-                    for p in paths:
-                        used_paths[p] = (src_name, e)
-
+        if ctx.pass_num == 0:
+            return self._createFirstPassJobs(ctx)
+        return self._createSecondPassJobs(ctx)
+
+    def _createFirstPassJobs(self, ctx):
         jobs = []
+
         app = self.app
-        route = self.source.route
         out_dir = self.ctx.out_dir
+        uri_getter = self.source.route.getUri
         pretty_urls = app.config.get('site/pretty_urls')
-        record = ctx.record_histories.current.getRecord(self.record_name)
-
-        for item in self.source.getAllContents():
-            route_params = item.metadata['route_params']
-            uri = route.getUri(route_params)
+
+        used_paths = _get_used_paths_from_records(
+            ctx.record_histories.current.records)
+        history = ctx.record_histories.getHistory(ctx.record_name).copy()
+        history.build()
+
+        record = ctx.current_record
+        record.user_data['dirty_source_names'] = set()
+
+        for prev, cur in history.diffs:
+            # Ignore pages that disappeared since last bake.
+            if cur is None:
+                continue
+
+            # Skip draft pages.
+            if cur.flags & PagePipelineRecordEntry.FLAG_IS_DRAFT:
+                continue
+
+            # Skip pages that are known to use other sources... we'll
+            # schedule them in the second pass.
+            if prev and prev.getAllUsedSourceNames():
+                continue
+
+            # Check if this item has been overridden by a previous pipeline
+            # run... for instance, we could be the pipeline for a "theme pages"
+            # source, and some of our pages have been overridden by a user
+            # page that writes out to the same URL.
+            uri = uri_getter(cur.route_params)
             path = get_output_path(app, out_dir, uri, pretty_urls)
+
             override = used_paths.get(path)
-
             if override is not None:
                 override_source_name, override_entry = override
                 override_source = app.getSource(override_source_name)
                 if override_source.config['realm'] == \
                         self.source.config['realm']:
                     logger.error(
                         "Page '%s' would get baked to '%s' "
                         "but is overridden by '%s'." %
-                        (item.spec, path, override_entry.item_spec))
+                        (cur.item_spec, path, override_entry.item_spec))
                 else:
                     logger.debug(
                         "Page '%s' would get baked to '%s' "
                         "but is overridden by '%s'." %
-                        (item.spec, path, override_entry.item_spec))
+                        (cur.item_spec, path, override_entry.item_spec))
 
-                entry = PagePipelineRecordEntry()
-                entry.item_spec = item.spec
-                entry.flags |= PagePipelineRecordEntry.FLAG_OVERRIDEN
-                record.addEntry(entry)
-
-                continue
-
-            jobs.append(self.createJob(item))
+                cur.flags |= PagePipelineRecordEntry.FLAG_OVERRIDEN
+                continue
+
+            # Nope, all good, let's create a job for this item.
+            jobs.append(create_job(self, cur.item_spec))
 
         if len(jobs) > 0:
             return jobs
         return None
 
-    def mergeRecordEntry(self, record_entry, ctx):
-        existing = ctx.record.getEntry(record_entry.item_spec)
-        existing.flags |= record_entry.flags
-        existing.errors += record_entry.errors
-        existing.subs += record_entry.subs
+    def _createSecondPassJobs(self, ctx):
+        # Get the list of all sources that had anything baked.
+        dirty_source_names = set()
+        all_records = ctx.record_histories.current.records
+        for rec in all_records:
+            rec_dsn = rec.user_data.get('dirty_source_names')
+            if rec_dsn:
+                dirty_source_names |= rec_dsn
+
+        # Now look at the stuff we bake for our own source on the first pass.
+        # For anything that wasn't baked (i.e. it was considered 'up to date')
+        # we look at the records from last time, and if they say that some
+        # page was using a source that is "dirty", then we force bake it.
+        #
+        # The common example for this is a blog index page which hasn't been
+        # touched, but needs to be re-baked because someone added or edited
+        # a post.
+        jobs = []
+        pass_num = ctx.pass_num
+        history = ctx.record_histories.getHistory(ctx.record_name).copy()
+        history.build()
+        for prev, cur in history.diffs:
+            if cur and cur.was_any_sub_baked:
+                continue
+            if prev and any(map(
+                    lambda usn: usn in dirty_source_names,
+                    prev.getAllUsedSourceNames())):
+                jobs.append(create_job(self, prev.item_spec,
+                                       pass_num=pass_num,
+                                       force_bake=True))
+        if len(jobs) > 0:
+            return jobs
+        return None
+
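The comment above gives the blog-index example; the trigger itself reduces to a set-membership test. An illustrative snippet with hypothetical values:

# Hypothetical values: a blog index page that wasn't touched, but whose
# last bake recorded that it pulled pages from the 'posts' source.
dirty_source_names = {'posts'}           # sources that baked anything
prev_used = {'posts', 'theme_pages'}     # prev.getAllUsedSourceNames()
force_bake = any(usn in dirty_source_names for usn in prev_used)
assert force_bake  # so a force_bake=True job is created for the index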
+    def handleJobResult(self, result, ctx):
+        existing = ctx.record_entry
+        merge_job_result_into_record_entry(existing, result)
+        if existing.was_any_sub_baked:
+            ctx.record.user_data['dirty_source_names'].add(self.source.name)
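
add_page_job_result and merge_job_result_into_record_entry replace the deleted mergeRecordEntry override. Their bodies live in piecrust.pipelines._pagerecords and are not shown in this diff; inferring from the deleted code, they presumably look roughly like this sketch:

# Sketch inferred from the deleted mergeRecordEntry; the real bodies may
# differ (e.g. in how the initial flags value is expressed).
def add_page_job_result(result):
    result.update({'flags': 0, 'errors': [], 'subs': []})

def merge_job_result_into_record_entry(record_entry, result):
    record_entry.flags |= result['flags']
    record_entry.errors += result['errors']
    record_entry.subs += result['subs']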
 
     def run(self, job, ctx, result):
-        step_num = job.step_num
-        if step_num == 0:
-            self._loadPage(job.content_item, ctx, result)
-        elif step_num == 1:
-            self._renderOrPostpone(job.content_item, ctx, result)
-        elif step_num == 2:
-            self._renderAlways(job.content_item, ctx, result)
+        pass_num = job.get('pass_num', 0)
+        step_num = job.get('step_num', 0)
+        if pass_num == 0:
+            if step_num == 0:
+                self._renderOrPostpone(job, ctx, result)
+            elif step_num == 1:
+                self._renderAlways(job, ctx, result)
+        elif pass_num == 1:
+            self._renderAlways(job, ctx, result)
 
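Following the job-dict sketch near the top, here is what jobs plausibly look like by the time they reach run() (item specs and the 'job_spec' key are hypothetical):

first_pass_job = {'job_spec': ('pages', 'pages/about.md')}
# Queued via result['next_step_job'] after a postponed render; the baker
# presumably bumps step_num when requeuing (assumption, not shown here).
retry_job = {'job_spec': ('pages', 'pages/blog.md'), 'step_num': 1}
second_pass_job = {'job_spec': ('pages', 'pages/blog.md'),
                   'pass_num': 1, 'force_bake': True}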
     def getDeletions(self, ctx):
         for prev, cur in ctx.record_history.diffs:
             if prev and not cur:
                 for sub in prev.subs:
-                    yield (sub.out_path, 'previous source file was removed')
+                    yield (sub['out_path'], 'previous source file was removed')
             elif prev and cur:
-                prev_out_paths = [o.out_path for o in prev.subs]
-                cur_out_paths = [o.out_path for o in cur.subs]
+                prev_out_paths = [o['out_path'] for o in prev.subs]
+                cur_out_paths = [o['out_path'] for o in cur.subs]
                 diff = set(prev_out_paths) - set(cur_out_paths)
                 for p in diff:
                     yield (p, 'source file changed outputs')
 
     def collapseRecords(self, ctx):
         pass
 
     def shutdown(self):
         self._pagebaker.stopWriterQueue()
 
-    def _loadPage(self, content_item, ctx, result):
-        logger.debug("Loading page: %s" % content_item.spec)
-        page = self.app.getPage(self.source, content_item)
-        record_entry = result.record_entry
-        record_entry.config = page.config.getAll()
-        record_entry.timestamp = page.datetime.timestamp()
-
-        if not page.config.get(self._draft_setting):
-            result.next_step_job = self.createJob(content_item)
-        else:
-            record_entry.flags |= PagePipelineRecordEntry.FLAG_IS_DRAFT
-
-    def _renderOrPostpone(self, content_item, ctx, result):
+    def _renderOrPostpone(self, job, ctx, result):
         # Here our job is to render the page's segments so that they're
         # cached in memory and on disk... unless we detect that the page
         # is using some other sources, in which case we abort and we'll try
         # again on the second pass.
+        content_item = content_item_from_job(self, job)
         logger.debug("Conditional render for: %s" % content_item.spec)
         page = self.app.getPage(self.source, content_item)
+        if page.config.get(self._draft_setting):
+            return
+
         prev_entry = ctx.previous_entry
-        cur_entry = result.record_entry
-        self.app.env.abort_source_use = True
+
+        env = self.app.env
+        env.abort_source_use = True
+        add_page_job_result(result)
         try:
-            self._pagebaker.bake(page, prev_entry, cur_entry)
+            rdr_subs = self._pagebaker.bake(page, prev_entry)
+            result['subs'] = rdr_subs
         except AbortedSourceUseError:
             logger.debug("Page was aborted for using source: %s" %
                          content_item.spec)
-            self.app.env.stats.stepCounter("SourceUseAbortions")
-            self.app.env.stats.addManifestEntry("SourceUseAbortions",
-                                                content_item.spec)
-            result.next_step_job = self.createJob(content_item)
+            result['flags'] |= \
+                PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE
+            env.stats.stepCounter("SourceUseAbortions")
+            env.stats.addManifestEntry("SourceUseAbortions", content_item.spec)
+            result['next_step_job'] = create_job(self, content_item.spec)
         finally:
-            self.app.env.abort_source_use = False
+            env.abort_source_use = False
 
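The abort_source_use flag is what turns "this page iterates another source" into an AbortedSourceUseError during the conditional render. The sources' side of that contract is not in this diff; it presumably amounts to a guard along these lines (SomeSource and _doGetAllContents are hypothetical, and the exception's constructor arguments are assumed):

from piecrust.sources.base import AbortedSourceUseError

class SomeSource:
    # Assumed guard: while the bake has abort_source_use set, enumerating
    # a source from within a render raises, and _renderOrPostpone above
    # catches it to postpone the page to a later step.
    def getAllContents(self):
        if self.app.env.abort_source_use:
            raise AbortedSourceUseError(self.name)
        yield from self._doGetAllContents()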
-    def _renderAlways(self, content_item, ctx, result):
+    def _renderAlways(self, job, ctx, result):
+        content_item = content_item_from_job(self, job)
         logger.debug("Full render for: %s" % content_item.spec)
         page = self.app.getPage(self.source, content_item)
         prev_entry = ctx.previous_entry
-        cur_entry = result.record_entry
-        self._pagebaker.bake(page, prev_entry, cur_entry)
+        rdr_subs = self._pagebaker.bake(page, prev_entry,
+                                        force=job.get('force_bake'))
+
+        add_page_job_result(result)
+        result['subs'] = rdr_subs
+
+
+def _get_used_paths_from_records(records):
+    used_paths = {}
+    for rec in records:
+        src_name = rec.name.split('@')[0]
+        for e in rec.getEntries():
+            paths = e.getAllOutputPaths()
+            if paths is not None:
+                for p in paths:
+                    used_paths[p] = (src_name, e)
+    return used_paths