Mercurial > piecrust2
comparison piecrust/pipelines/page.py @ 1015:fa489c5e829e
bake: Load pages in parallel again.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Wed, 29 Nov 2017 20:37:57 -0800 |
parents | 1857dbd4580f |
children | 3c6e6e7b9639 |
comparison
equal
deleted
inserted
replaced
1014:071f30aa04bb | 1015:fa489c5e829e |
---|---|
14 | 14 |
15 | 15 |
16 class PagePipeline(ContentPipeline): | 16 class PagePipeline(ContentPipeline): |
17 PIPELINE_NAME = 'page' | 17 PIPELINE_NAME = 'page' |
18 RECORD_ENTRY_CLASS = PagePipelineRecordEntry | 18 RECORD_ENTRY_CLASS = PagePipelineRecordEntry |
19 PASS_NUM = [0, 1] | 19 PASS_NUM = [0, 1, 2] |
20 | 20 |
21 def __init__(self, source, ppctx): | 21 def __init__(self, source, ppctx): |
22 super().__init__(source, ppctx) | 22 super().__init__(source, ppctx) |
23 self._pagebaker = None | 23 self._pagebaker = None |
24 self._stats = source.app.env.stats | 24 self._stats = source.app.env.stats |
32 self._pagebaker = PageBaker(self.app, | 32 self._pagebaker = PageBaker(self.app, |
33 self.ctx.out_dir, | 33 self.ctx.out_dir, |
34 force=self.ctx.force) | 34 force=self.ctx.force) |
35 self._pagebaker.startWriterQueue() | 35 self._pagebaker.startWriterQueue() |
36 | 36 |
37 def loadAllContents(self): | 37 def createJobs(self, ctx): |
38 if ctx.pass_num == 0: | |
39 return self._createLoadJobs(ctx) | |
40 if ctx.pass_num == 1: | |
41 return self._createSecondPassJobs(ctx) | |
42 return self._createThirdPassJobs(ctx) | |
43 | |
44 def _createLoadJobs(self, ctx): | |
38 # Here we load all the pages in the source, making sure they all | 45 # Here we load all the pages in the source, making sure they all |
39 # have a valid cache for their configuration and contents. | 46 # have a valid cache for their configuration and contents. |
40 # We also create the record entries while we're at it. | 47 jobs = [] |
41 source = self.source | 48 for item in self.source.getAllContents(): |
42 page_fac = self.app.getPage | 49 jobs.append(create_job(self, item.spec)) |
43 record_fac = self.createRecordEntry | 50 if len(jobs) > 0: |
44 for item in source.getAllContents(): | 51 return jobs |
45 page = page_fac(source, item) | 52 return None |
46 | 53 |
47 cur_entry = record_fac(item.spec) | 54 def _createSecondPassJobs(self, ctx): |
48 cur_entry.config = page.config.getAll() | |
49 cur_entry.route_params = item.metadata['route_params'] | |
50 cur_entry.timestamp = page.datetime.timestamp() | |
51 | |
52 if page.was_modified: | |
53 cur_entry.flags |= PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED | |
54 if page.config.get(self._draft_setting): | |
55 cur_entry.flags |= PagePipelineRecordEntry.FLAG_IS_DRAFT | |
56 | |
57 yield cur_entry | |
58 | |
59 def createJobs(self, ctx): | |
60 if ctx.pass_num == 0: | |
61 return self._createFirstPassJobs(ctx) | |
62 return self._createSecondPassJobs(ctx) | |
63 | |
64 def _createFirstPassJobs(self, ctx): | |
65 jobs = [] | 55 jobs = [] |
66 | 56 |
67 app = self.app | 57 app = self.app |
68 out_dir = self.ctx.out_dir | 58 out_dir = self.ctx.out_dir |
69 uri_getter = self.source.route.getUri | 59 uri_getter = self.source.route.getUri |
123 | 113 |
124 if len(jobs) > 0: | 114 if len(jobs) > 0: |
125 return jobs | 115 return jobs |
126 return None | 116 return None |
127 | 117 |
128 def _createSecondPassJobs(self, ctx): | 118 def _createThirdPassJobs(self, ctx): |
129 # Get the list of all sources that had anything baked. | 119 # Get the list of all sources that had anything baked. |
130 dirty_source_names = set() | 120 dirty_source_names = set() |
131 all_records = ctx.record_histories.current.records | 121 all_records = ctx.record_histories.current.records |
132 for rec in all_records: | 122 for rec in all_records: |
133 rec_dsn = rec.user_data.get('dirty_source_names') | 123 rec_dsn = rec.user_data.get('dirty_source_names') |
169 if len(jobs) > 0: | 159 if len(jobs) > 0: |
170 return jobs | 160 return jobs |
171 return None | 161 return None |
172 | 162 |
173 def handleJobResult(self, result, ctx): | 163 def handleJobResult(self, result, ctx): |
174 existing = ctx.record_entry | 164 step_num = ctx.step_num |
175 merge_job_result_into_record_entry(existing, result) | 165 |
176 if existing.was_any_sub_baked: | 166 if step_num == 0: |
177 ctx.record.user_data['dirty_source_names'].add(self.source.name) | 167 print(result) |
168 new_entry = self.createRecordEntry(result['item_spec']) | |
169 new_entry.config = result['config'] | |
170 new_entry.route_params = result['route_params'] | |
171 new_entry.timestamp = result['timestamp'] | |
172 ctx.record.addEntry(new_entry) | |
173 else: | |
174 existing = ctx.record_entry | |
175 merge_job_result_into_record_entry(existing, result) | |
176 | |
177 if existing.was_any_sub_baked: | |
178 ctx.record.user_data['dirty_source_names'].add(self.source.name) | |
178 | 179 |
179 def run(self, job, ctx, result): | 180 def run(self, job, ctx, result): |
180 pass_num = job.get('pass_num', 0) | 181 pass_num = job.get('pass_num', 0) |
181 step_num = job.get('step_num', 0) | 182 step_num = job.get('step_num', 0) |
182 if pass_num == 0: | 183 if pass_num == 0: |
183 if step_num == 0: | 184 if step_num == 0: |
185 self._loadPage(job, ctx, result) | |
186 elif step_num == 1: | |
184 self._renderOrPostpone(job, ctx, result) | 187 self._renderOrPostpone(job, ctx, result) |
185 elif step_num == 1: | 188 elif step_num == 2: |
186 self._renderAlways(job, ctx, result) | 189 self._renderAlways(job, ctx, result) |
190 else: | |
191 raise Exception("Unexpected pipeline step: %d" % step_num) | |
187 elif pass_num == 1: | 192 elif pass_num == 1: |
188 self._renderAlways(job, ctx, result) | 193 self._renderAlways(job, ctx, result) |
194 else: | |
195 raise Exception("Unexpected pipeline pass: %d" % pass_num) | |
189 | 196 |
190 def getDeletions(self, ctx): | 197 def getDeletions(self, ctx): |
191 for prev, cur in ctx.record_history.diffs: | 198 for prev, cur in ctx.record_history.diffs: |
192 if prev and not cur: | 199 if prev and not cur: |
193 for sub in prev.subs: | 200 for sub in prev.subs: |
202 def collapseRecords(self, ctx): | 209 def collapseRecords(self, ctx): |
203 pass | 210 pass |
204 | 211 |
205 def shutdown(self): | 212 def shutdown(self): |
206 self._pagebaker.stopWriterQueue() | 213 self._pagebaker.stopWriterQueue() |
214 | |
215 def _loadPage(self, job, ctx, result): | |
216 content_item = content_item_from_job(self, job) | |
217 page = self.app.getPage(self.source, content_item) | |
218 | |
219 trigger_next_job = True | |
220 result['flags'] = PagePipelineRecordEntry.FLAG_NONE | |
221 result['config'] = page.config.getAll() | |
222 result['route_params'] = item.metadata['route_params'] | |
223 result['timestamp'] = page.datetime.timestamp() | |
224 | |
225 if page.was_modified: | |
226 result['flags'] |= PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED | |
227 if page.config.get(self._draft_setting): | |
228 result['flags'] |= PagePipelineRecordEntry.FLAG_IS_DRAFT | |
229 trigger_next_job = False | |
230 | |
231 if trigger_next_job: | |
232 result['next_step_job'] = create_job(self, content_item.spec) | |
207 | 233 |
208 def _renderOrPostpone(self, job, ctx, result): | 234 def _renderOrPostpone(self, job, ctx, result): |
209 # Here our job is to render the page's segments so that they're | 235 # Here our job is to render the page's segments so that they're |
210 # cached in memory and on disk... unless we detect that the page | 236 # cached in memory and on disk... unless we detect that the page |
211 # is using some other sources, in which case we abort and we'll try | 237 # is using some other sources, in which case we abort and we'll try |