piecrust2: comparison of piecrust/pipelines/page.py @ 989:8adc27285d93
bake: Big pass on bake performance.
- Reduce the amount of data passed between processes.
- Make inter-process data simple objects to make it easier to test with
alternatives to pickle.
- Make sources have the basic requirement to be able to find a content item
from an item spec (path).
- Make Hoedown the default Markdown formatter.
author: Ludovic Chabant <ludovic@chabant.com>
date: Sun, 19 Nov 2017 14:29:17 -0800
parents: 45ad976712ec
children: 1857dbd4580f
comparing 988:f83ae0a5d793 (parent) with 989:8adc27285d93
```diff
@@ -1,5 +1,9 @@
+import time
 import logging
-from piecrust.pipelines.base import ContentPipeline
+from piecrust.pipelines.base import (
+    ContentPipeline, create_job, content_item_from_job)
 from piecrust.pipelines._pagebaker import PageBaker, get_output_path
-from piecrust.pipelines._pagerecords import PagePipelineRecordEntry
+from piecrust.pipelines._pagerecords import (
+    PagePipelineRecordEntry,
+    add_page_job_result, merge_job_result_into_record_entry)
 from piecrust.sources.base import AbortedSourceUseError
@@ -11,19 +15,20 @@
 class PagePipeline(ContentPipeline):
     PIPELINE_NAME = 'page'
     RECORD_ENTRY_CLASS = PagePipelineRecordEntry
+    PASS_NUM = [0, 1]
 
     def __init__(self, source, ppctx):
         super().__init__(source, ppctx)
         self._pagebaker = None
         self._stats = source.app.env.stats
         self._draft_setting = self.app.config['baker/no_bake_setting']
 
     def initialize(self):
-        stats = self.app.env.stats
+        stats = self._stats
        stats.registerCounter('SourceUseAbortions', raise_if_registered=False)
         stats.registerManifest('SourceUseAbortions', raise_if_registered=False)
 
         self._pagebaker = PageBaker(self.app,
                                     self.ctx.out_dir,
                                     force=self.ctx.force)
         self._pagebaker.startWriterQueue()
```
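The new imports pull `create_job` and `content_item_from_job` out of `piecrust.pipelines.base`. Their definitions are not part of this file, but the commit message (inter-process data as simple objects; sources required to find a content item from an item spec) and the `job.get(...)` calls later in the diff suggest roughly the following shape. This is a hypothetical sketch: the `job_spec` key and the `findContentFromSpec()` method are assumptions, not code from this changeset.

```python
# Hypothetical sketch of the two helpers imported above. The 'job_spec'
# key and source.findContentFromSpec() are assumptions inferred from how
# this file uses jobs; they are not definitions from the changeset.

def create_job(pipeline, item_spec, **kwargs):
    # A job is a plain dict of plain values (strings, ints, bools), so
    # any serializer that handles basic types can ship it to a worker.
    job = {'job_spec': [pipeline.source.name, item_spec]}
    job.update(kwargs)
    return job


def content_item_from_job(pipeline, job):
    # The commit makes "find a content item from an item spec (path)" a
    # basic requirement of sources; that is what lets a worker process
    # rebuild a full content item from the bare spec string in the job.
    return pipeline.source.findContentFromSpec(job['job_spec'][1])
```

Keeping jobs this dumb is what the first bullet of the commit message is about: less data crosses process boundaries, and nothing in a job requires pickling custom classes.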
```diff
@@ -31,56 +36,127 @@
+    def loadAllContents(self):
+        # Here we load all the pages in the source, making sure they all
+        # have a valid cache for their configuration and contents.
+        # We also create the record entries while we're at it.
+        source = self.source
+        page_fac = self.app.getPage
+        record_fac = self.createRecordEntry
+        for item in source.getAllContents():
+            page = page_fac(source, item)
+
+            cur_entry = record_fac(item.spec)
+            cur_entry.config = page.config.getAll()
+            cur_entry.route_params = item.metadata['route_params']
+            cur_entry.timestamp = page.datetime.timestamp()
+
+            if page.config.get(self._draft_setting):
+                cur_entry.flags |= PagePipelineRecordEntry.FLAG_IS_DRAFT
+
+            yield cur_entry
+
     def createJobs(self, ctx):
-        used_paths = {}
-        for rec in ctx.record_histories.current.records:
-            src_name = rec.name.split('@')[0]
-            for e in rec.getEntries():
-                paths = e.getAllOutputPaths()
-                if paths is not None:
-                    for p in paths:
-                        used_paths[p] = (src_name, e)
-
+        if ctx.pass_num == 0:
+            return self._createFirstPassJobs(ctx)
+        return self._createSecondPassJobs(ctx)
+
+    def _createFirstPassJobs(self, ctx):
         jobs = []
+
         app = self.app
-        route = self.source.route
         out_dir = self.ctx.out_dir
+        uri_getter = self.source.route.getUri
         pretty_urls = app.config.get('site/pretty_urls')
-        record = ctx.record_histories.current.getRecord(self.record_name)
-
-        for item in self.source.getAllContents():
-            route_params = item.metadata['route_params']
-            uri = route.getUri(route_params)
+
+        used_paths = _get_used_paths_from_records(
+            ctx.record_histories.current.records)
+        history = ctx.record_histories.getHistory(ctx.record_name).copy()
+        history.build()
+
+        record = ctx.current_record
+        record.user_data['dirty_source_names'] = set()
+
+        for prev, cur in history.diffs:
+            # Ignore pages that disappeared since last bake.
+            if cur is None:
+                continue
+
+            # Skip draft pages.
+            if cur.flags & PagePipelineRecordEntry.FLAG_IS_DRAFT:
+                continue
+
+            # Skip pages that are known to use other sources... we'll
+            # schedule them in the second pass.
+            if prev and prev.getAllUsedSourceNames():
+                continue
+
+            # Check if this item has been overridden by a previous pipeline
+            # run... for instance, we could be the pipeline for a "theme pages"
+            # source, and some of our pages have been overridden by a user
+            # page that writes out to the same URL.
+            uri = uri_getter(cur.route_params)
             path = get_output_path(app, out_dir, uri, pretty_urls)
+
             override = used_paths.get(path)
-
             if override is not None:
                 override_source_name, override_entry = override
                 override_source = app.getSource(override_source_name)
                 if override_source.config['realm'] == \
                         self.source.config['realm']:
                     logger.error(
                         "Page '%s' would get baked to '%s' "
                         "but is overridden by '%s'." %
-                        (item.spec, path, override_entry.item_spec))
+                        (cur.item_spec, path, override_entry.item_spec))
                 else:
                     logger.debug(
                         "Page '%s' would get baked to '%s' "
                         "but is overridden by '%s'." %
-                        (item.spec, path, override_entry.item_spec))
+                        (cur.item_spec, path, override_entry.item_spec))
 
-                entry = PagePipelineRecordEntry()
-                entry.item_spec = item.spec
-                entry.flags |= PagePipelineRecordEntry.FLAG_OVERRIDEN
-                record.addEntry(entry)
-
+                cur.flags |= PagePipelineRecordEntry.FLAG_OVERRIDEN
                 continue
 
-            jobs.append(self.createJob(item))
+            # Nope, all good, let's create a job for this item.
+            jobs.append(create_job(self, cur.item_spec))
 
         if len(jobs) > 0:
             return jobs
         return None
 
-    def mergeRecordEntry(self, record_entry, ctx):
-        existing = ctx.record.getEntry(record_entry.item_spec)
-        existing.flags |= record_entry.flags
-        existing.errors += record_entry.errors
-        existing.subs += record_entry.subs
+    def _createSecondPassJobs(self, ctx):
+        # Get the list of all sources that had anything baked.
+        dirty_source_names = set()
+        all_records = ctx.record_histories.current.records
+        for rec in all_records:
+            rec_dsn = rec.user_data.get('dirty_source_names')
+            if rec_dsn:
+                dirty_source_names |= rec_dsn
+
+        # Now look at the stuff we bake for our own source on the first pass.
+        # For anything that wasn't baked (i.e. it was considered 'up to date')
+        # we look at the records from last time, and if they say that some
+        # page was using a source that is "dirty", then we force bake it.
+        #
+        # The common example for this is a blog index page which hasn't been
+        # touched, but needs to be re-baked because someone added or edited
+        # a post.
+        jobs = []
+        pass_num = ctx.pass_num
+        history = ctx.record_histories.getHistory(ctx.record_name).copy()
+        history.build()
+        for prev, cur in history.diffs:
+            if cur and cur.was_any_sub_baked:
+                continue
+            if prev and any(map(
+                    lambda usn: usn in dirty_source_names,
+                    prev.getAllUsedSourceNames())):
+                jobs.append(create_job(self, prev.item_spec,
+                                       pass_num=pass_num,
+                                       force_bake=True))
+        if len(jobs) > 0:
+            return jobs
+        return None
+
+    def handleJobResult(self, result, ctx):
+        existing = ctx.record_entry
+        merge_job_result_into_record_entry(existing, result)
+        if existing.was_any_sub_baked:
+            ctx.record.user_data['dirty_source_names'].add(self.source.name)
```
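The comment in `_createSecondPassJobs` is the heart of the two-pass design: pages that were skipped as up to date still get force-baked when a source they consumed last time had output this time. A self-contained toy version of that rule, with entirely made-up specs and sources, looks like this:

```python
# Toy illustration of the second-pass rule, with made-up data:
# 'pages/blog.html' wasn't re-baked in the first pass, but the previous
# bake's record says it used the 'posts' source, and 'posts' had
# something baked this time, so it gets a forced job.
dirty_source_names = {'posts'}

last_bake_used_sources = {
    'pages/blog.html': {'posts'},   # blog index iterates over posts
    'pages/about.html': set(),      # standalone page, uses no source
}
baked_this_pass = set()             # nothing was re-baked in pass 0

for spec, used in last_bake_used_sources.items():
    if spec in baked_this_pass:
        continue
    if used & dirty_source_names:
        print('force bake:', spec)   # -> force bake: pages/blog.html
```

`handleJobResult` is what feeds `dirty_source_names`: any source whose pipeline actually baked a sub-page marks itself dirty, and the second pass of every other pipeline picks that up.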
```diff
@@ -87,68 +163,81 @@
 
     def run(self, job, ctx, result):
-        step_num = job.step_num
-        if step_num == 0:
-            self._loadPage(job.content_item, ctx, result)
-        elif step_num == 1:
-            self._renderOrPostpone(job.content_item, ctx, result)
-        elif step_num == 2:
-            self._renderAlways(job.content_item, ctx, result)
+        pass_num = job.get('pass_num', 0)
+        step_num = job.get('step_num', 0)
+        if pass_num == 0:
+            if step_num == 0:
+                self._renderOrPostpone(job, ctx, result)
+            elif step_num == 1:
+                self._renderAlways(job, ctx, result)
+        elif pass_num == 1:
+            self._renderAlways(job, ctx, result)
 
     def getDeletions(self, ctx):
         for prev, cur in ctx.record_history.diffs:
             if prev and not cur:
                 for sub in prev.subs:
-                    yield (sub.out_path, 'previous source file was removed')
+                    yield (sub['out_path'], 'previous source file was removed')
             elif prev and cur:
-                prev_out_paths = [o.out_path for o in prev.subs]
-                cur_out_paths = [o.out_path for o in cur.subs]
+                prev_out_paths = [o['out_path'] for o in prev.subs]
+                cur_out_paths = [o['out_path'] for o in cur.subs]
                 diff = set(prev_out_paths) - set(cur_out_paths)
                 for p in diff:
                     yield (p, 'source file changed outputs')
 
     def collapseRecords(self, ctx):
         pass
 
     def shutdown(self):
         self._pagebaker.stopWriterQueue()
 
-    def _loadPage(self, content_item, ctx, result):
-        logger.debug("Loading page: %s" % content_item.spec)
-        page = self.app.getPage(self.source, content_item)
-        record_entry = result.record_entry
-        record_entry.config = page.config.getAll()
-        record_entry.timestamp = page.datetime.timestamp()
-
-        if not page.config.get(self._draft_setting):
-            result.next_step_job = self.createJob(content_item)
-        else:
-            record_entry.flags |= PagePipelineRecordEntry.FLAG_IS_DRAFT
-
-    def _renderOrPostpone(self, content_item, ctx, result):
+    def _renderOrPostpone(self, job, ctx, result):
         # Here our job is to render the page's segments so that they're
         # cached in memory and on disk... unless we detect that the page
         # is using some other sources, in which case we abort and we'll try
         # again on the second pass.
+        content_item = content_item_from_job(self, job)
         logger.debug("Conditional render for: %s" % content_item.spec)
         page = self.app.getPage(self.source, content_item)
+        if page.config.get(self._draft_setting):
+            return
+
         prev_entry = ctx.previous_entry
-        cur_entry = result.record_entry
-        self.app.env.abort_source_use = True
+
+        env = self.app.env
+        env.abort_source_use = True
+        add_page_job_result(result)
         try:
-            self._pagebaker.bake(page, prev_entry, cur_entry)
+            rdr_subs = self._pagebaker.bake(page, prev_entry)
+            result['subs'] = rdr_subs
         except AbortedSourceUseError:
             logger.debug("Page was aborted for using source: %s" %
                          content_item.spec)
-            self.app.env.stats.stepCounter("SourceUseAbortions")
-            self.app.env.stats.addManifestEntry("SourceUseAbortions",
-                                                content_item.spec)
-            result.next_step_job = self.createJob(content_item)
+            result['flags'] |= \
+                PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE
+            env.stats.stepCounter("SourceUseAbortions")
+            env.stats.addManifestEntry("SourceUseAbortions", content_item.spec)
+            result['next_step_job'] = create_job(self, content_item.spec)
         finally:
-            self.app.env.abort_source_use = False
+            env.abort_source_use = False
 
-    def _renderAlways(self, content_item, ctx, result):
+    def _renderAlways(self, job, ctx, result):
+        content_item = content_item_from_job(self, job)
         logger.debug("Full render for: %s" % content_item.spec)
         page = self.app.getPage(self.source, content_item)
         prev_entry = ctx.previous_entry
-        cur_entry = result.record_entry
-        self._pagebaker.bake(page, prev_entry, cur_entry)
+        rdr_subs = self._pagebaker.bake(page, prev_entry,
+                                        force=job.get('force_bake'))
+
+        add_page_job_result(result)
+        result['subs'] = rdr_subs
+
+def _get_used_paths_from_records(records):
+    used_paths = {}
+    for rec in records:
+        src_name = rec.name.split('@')[0]
+        for e in rec.getEntries():
+            paths = e.getAllOutputPaths()
+            if paths is not None:
+                for p in paths:
+                    used_paths[p] = (src_name, e)
+    return used_paths
```
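Why bother making jobs and results plain dicts of plain values? The second bullet of the commit message says it outright: simple objects make it easier to test alternatives to pickle. A quick, self-contained check with a made-up job in the shape used above:

```python
# Made-up job dict in the shape this pipeline builds. Because it holds
# only strings, ints, and bools in basic containers, it survives a
# round-trip through pickle and through JSON (or msgpack, etc.) alike.
import json
import pickle

job = {'job_spec': ['pages', 'pages/foo.md'],
       'pass_num': 1,
       'force_bake': True}

assert pickle.loads(pickle.dumps(job)) == job
assert json.loads(json.dumps(job)) == job
```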