comparison piecrust/pipelines/page.py @ 1015:fa489c5e829e

bake: Load pages in parallel again.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 29 Nov 2017 20:37:57 -0800
parents 1857dbd4580f
children 3c6e6e7b9639
comparison
equal deleted inserted replaced
1014:071f30aa04bb 1015:fa489c5e829e
14 14
15 15
16 class PagePipeline(ContentPipeline): 16 class PagePipeline(ContentPipeline):
17 PIPELINE_NAME = 'page' 17 PIPELINE_NAME = 'page'
18 RECORD_ENTRY_CLASS = PagePipelineRecordEntry 18 RECORD_ENTRY_CLASS = PagePipelineRecordEntry
19 PASS_NUM = [0, 1] 19 PASS_NUM = [0, 1, 2]
20 20
21 def __init__(self, source, ppctx): 21 def __init__(self, source, ppctx):
22 super().__init__(source, ppctx) 22 super().__init__(source, ppctx)
23 self._pagebaker = None 23 self._pagebaker = None
24 self._stats = source.app.env.stats 24 self._stats = source.app.env.stats
32 self._pagebaker = PageBaker(self.app, 32 self._pagebaker = PageBaker(self.app,
33 self.ctx.out_dir, 33 self.ctx.out_dir,
34 force=self.ctx.force) 34 force=self.ctx.force)
35 self._pagebaker.startWriterQueue() 35 self._pagebaker.startWriterQueue()
36 36
def createJobs(self, ctx):
    """Create the bake jobs for the current pipeline pass.

    Pass 0 loads pages, pass 1 renders pages whose dependencies are
    settled, and any later pass forces rendering of what's left.
    """
    handlers = {
        0: self._createLoadJobs,
        1: self._createSecondPassJobs,
    }
    make_jobs = handlers.get(ctx.pass_num, self._createThirdPassJobs)
    return make_jobs(ctx)
def _createLoadJobs(self, ctx):
    """Create one load job per content item in the source.

    Loading ensures every page has a valid cache for its configuration
    and contents. Returns ``None`` instead of an empty list when the
    source has no content, so no batch gets scheduled.
    """
    jobs = [create_job(self, item.spec)
            for item in self.source.getAllContents()]
    return jobs if jobs else None
47 cur_entry = record_fac(item.spec) 54 def _createSecondPassJobs(self, ctx):
48 cur_entry.config = page.config.getAll()
49 cur_entry.route_params = item.metadata['route_params']
50 cur_entry.timestamp = page.datetime.timestamp()
51
52 if page.was_modified:
53 cur_entry.flags |= PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED
54 if page.config.get(self._draft_setting):
55 cur_entry.flags |= PagePipelineRecordEntry.FLAG_IS_DRAFT
56
57 yield cur_entry
58
59 def createJobs(self, ctx):
60 if ctx.pass_num == 0:
61 return self._createFirstPassJobs(ctx)
62 return self._createSecondPassJobs(ctx)
63
64 def _createFirstPassJobs(self, ctx):
65 jobs = [] 55 jobs = []
66 56
67 app = self.app 57 app = self.app
68 out_dir = self.ctx.out_dir 58 out_dir = self.ctx.out_dir
69 uri_getter = self.source.route.getUri 59 uri_getter = self.source.route.getUri
123 113
124 if len(jobs) > 0: 114 if len(jobs) > 0:
125 return jobs 115 return jobs
126 return None 116 return None
127 117
128 def _createSecondPassJobs(self, ctx): 118 def _createThirdPassJobs(self, ctx):
129 # Get the list of all sources that had anything baked. 119 # Get the list of all sources that had anything baked.
130 dirty_source_names = set() 120 dirty_source_names = set()
131 all_records = ctx.record_histories.current.records 121 all_records = ctx.record_histories.current.records
132 for rec in all_records: 122 for rec in all_records:
133 rec_dsn = rec.user_data.get('dirty_source_names') 123 rec_dsn = rec.user_data.get('dirty_source_names')
169 if len(jobs) > 0: 159 if len(jobs) > 0:
170 return jobs 160 return jobs
171 return None 161 return None
172 162
def handleJobResult(self, result, ctx):
    """Fold a worker's job result back into the bake record.

    For step 0 (the load step) there is no record entry yet, so a new
    one is created from the loaded page's metadata. For later steps the
    result is merged into the existing entry, and this source is marked
    dirty if any of the page's outputs were actually baked.
    """
    step_num = ctx.step_num

    if step_num == 0:
        # Load step: build the record entry from what the worker loaded.
        # (Fix: removed a leftover debug `print(result)` that spammed
        # stdout on every loaded page.)
        new_entry = self.createRecordEntry(result['item_spec'])
        new_entry.config = result['config']
        new_entry.route_params = result['route_params']
        new_entry.timestamp = result['timestamp']
        ctx.record.addEntry(new_entry)
    else:
        existing = ctx.record_entry
        merge_job_result_into_record_entry(existing, result)

        if existing.was_any_sub_baked:
            ctx.record.user_data['dirty_source_names'].add(self.source.name)
178 179
def run(self, job, ctx, result):
    """Execute one bake job, routed by its pass and step numbers."""
    pass_num = job.get('pass_num', 0)
    step_num = job.get('step_num', 0)

    if pass_num == 0:
        # First pass runs in steps: load, render-or-postpone, then
        # forced render for pages that were postponed.
        steps = {
            0: self._loadPage,
            1: self._renderOrPostpone,
            2: self._renderAlways,
        }
        handler = steps.get(step_num)
        if handler is None:
            raise Exception("Unexpected pipeline step: %d" % step_num)
        handler(job, ctx, result)
    elif pass_num == 1:
        self._renderAlways(job, ctx, result)
    else:
        # NOTE(review): PASS_NUM declares a pass 2, but a job carrying
        # pass_num == 2 would land here -- confirm third-pass jobs are
        # created with pass_num 1.
        raise Exception("Unexpected pipeline pass: %d" % pass_num)
189 196
190 def getDeletions(self, ctx): 197 def getDeletions(self, ctx):
191 for prev, cur in ctx.record_history.diffs: 198 for prev, cur in ctx.record_history.diffs:
192 if prev and not cur: 199 if prev and not cur:
193 for sub in prev.subs: 200 for sub in prev.subs:
def collapseRecords(self, ctx):
    """Intentionally a no-op: this pipeline doesn't collapse records."""
    return None
204 211
def shutdown(self):
    """Tear the pipeline down by stopping the baker's writer queue."""
    baker = self._pagebaker
    baker.stopWriterQueue()
214
def _loadPage(self, job, ctx, result):
    """First-pass job: load one page and report its metadata.

    Fills `result` with the page's flags, config, route parameters and
    timestamp, and queues the next (render) step unless the page is a
    draft.
    """
    content_item = content_item_from_job(self, job)
    page = self.app.getPage(self.source, content_item)

    trigger_next_job = True
    result['flags'] = PagePipelineRecordEntry.FLAG_NONE
    result['config'] = page.config.getAll()
    # BUG FIX: the original referenced an undefined name `item` here (a
    # leftover loop variable from the removed loadAllContents); the
    # loaded item is `content_item`, which would have raised NameError.
    result['route_params'] = content_item.metadata['route_params']
    result['timestamp'] = page.datetime.timestamp()

    if page.was_modified:
        result['flags'] |= PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED
    if page.config.get(self._draft_setting):
        result['flags'] |= PagePipelineRecordEntry.FLAG_IS_DRAFT
        # Drafts get a record entry but are never rendered.
        trigger_next_job = False

    if trigger_next_job:
        result['next_step_job'] = create_job(self, content_item.spec)
207 233
208 def _renderOrPostpone(self, job, ctx, result): 234 def _renderOrPostpone(self, job, ctx, result):
209 # Here our job is to render the page's segments so that they're 235 # Here our job is to render the page's segments so that they're
210 # cached in memory and on disk... unless we detect that the page 236 # cached in memory and on disk... unless we detect that the page
211 # is using some other sources, in which case we abort and we'll try 237 # is using some other sources, in which case we abort and we'll try