comparison piecrust/baking/worker.py @ 852:4850f8c21b6e

core: Start of the big refactor for PieCrust 3.0.

* Everything is a `ContentSource`, including assets directories.
* Most content sources are subclasses of the base file-system source.
* A source is processed by a "pipeline", and there are 2 built-in pipelines, one for assets and one for pages. The asset pipeline is vaguely functional, but the page pipeline is completely broken right now.
* Rewrite the baking process as just running appropriate pipelines on each content item. This should allow for better parallelization.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 17 May 2017 00:11:48 -0700
parents c3cb2f9df882
children f070a4fc033c
comparison
equal deleted inserted replaced
851:2c7e57d80bba 852:4850f8c21b6e
1 import time 1 import time
2 import logging 2 import logging
3 from piecrust.app import PieCrust, apply_variant_and_values 3 from piecrust.pipelines.base import PipelineContext, PipelineResult
4 from piecrust.baking.records import BakeRecord, _get_transition_key 4 from piecrust.pipelines.records import (
5 from piecrust.baking.single import PageBaker, BakingError 5 MultiRecordHistory, MultiRecord, Record, load_records)
6 from piecrust.environment import AbortedSourceUseError 6 from piecrust.sources.base import ContentItem
7 from piecrust.rendering import (
8 QualifiedPage, PageRenderingContext, render_page_segments)
9 from piecrust.routing import create_route_metadata
10 from piecrust.sources.base import PageFactory
11 from piecrust.workerpool import IWorker 7 from piecrust.workerpool import IWorker
12 8
13 9
14 logger = logging.getLogger(__name__) 10 logger = logging.getLogger(__name__)
15 11
16 12
17 class BakeWorkerContext(object): 13 class BakeWorkerContext(object):
18 def __init__(self, appfactory, out_dir, *, 14 def __init__(self, appfactory, out_dir, *,
19 force=False, previous_record_path=None): 15 force=False, previous_records_path=None,
16 allowed_pipelines=None):
20 self.appfactory = appfactory 17 self.appfactory = appfactory
21 self.out_dir = out_dir 18 self.out_dir = out_dir
22 self.force = force 19 self.force = force
23 self.previous_record_path = previous_record_path 20 self.previous_records_path = previous_records_path
24 self.app = None 21 self.allowed_pipelines = allowed_pipelines
25 self.previous_record = None
26 self.previous_record_index = None
27 22
28 23
29 class BakeWorker(IWorker): 24 class BakeWorker(IWorker):
30 def __init__(self, ctx): 25 def __init__(self, ctx):
31 self.ctx = ctx 26 self.ctx = ctx
32 self.work_start_time = time.perf_counter() 27 self.app = None
28 self.record_history = None
29 self._work_start_time = time.perf_counter()
30 self._sources = {}
31 self._ppctx = None
33 32
34 def initialize(self): 33 def initialize(self):
35 # Create the app local to this worker. 34 # Create the app local to this worker.
36 app = self.ctx.appfactory.create() 35 app = self.ctx.appfactory.create()
37 app.config.set('baker/is_baking', True) 36 app.config.set('baker/is_baking', True)
38 app.config.set('baker/worker_id', self.wid) 37 app.config.set('baker/worker_id', self.wid)
39 app.env.base_asset_url_format = '%uri%' 38 app.config.set('site/base_asset_url_format', '%uri')
39
40 app.env.fs_cache_only_for_main_page = True 40 app.env.fs_cache_only_for_main_page = True
41 app.env.registerTimer("BakeWorker_%d_Total" % self.wid) 41
42 app.env.registerTimer("BakeWorkerInit") 42 stats = app.env.stats
43 app.env.registerTimer("JobReceive") 43 stats.registerTimer("BakeWorker_%d_Total" % self.wid)
44 app.env.registerCounter("SourceUseAbortions") 44 stats.registerTimer("BakeWorkerInit")
45 app.env.registerManifest("LoadJobs") 45 stats.registerTimer("JobReceive")
46 app.env.registerManifest("RenderJobs") 46 stats.registerTimer('LoadJob', raise_if_registered=False)
47 app.env.registerManifest("BakeJobs") 47 stats.registerTimer('RenderFirstSubJob',
48 self.ctx.app = app 48 raise_if_registered=False)
49 stats.registerTimer('BakeJob', raise_if_registered=False)
50
51 stats.registerCounter("SourceUseAbortions")
52
53 stats.registerManifest("LoadJobs")
54 stats.registerManifest("RenderJobs")
55 stats.registerManifest("BakeJobs")
56
57 self.app = app
49 58
50 # Load previous record 59 # Load previous record
51 if self.ctx.previous_record_path: 60 if self.ctx.previous_records_path:
52 self.ctx.previous_record = BakeRecord.load( 61 previous_records = load_records(self.ctx.previous_records_path)
53 self.ctx.previous_record_path) 62 else:
54 self.ctx.previous_record_index = {} 63 previous_records = MultiRecord()
55 for e in self.ctx.previous_record.entries: 64 current_records = MultiRecord()
56 key = _get_transition_key(e.path, e.extra_key) 65 self.record_history = MultiRecordHistory(
57 self.ctx.previous_record_index[key] = e 66 previous_records, current_records)
58 67
59 # Create the job handlers. 68 # Cache sources and create pipelines.
60 job_handlers = { 69 ppclasses = {}
61 JOB_LOAD: LoadJobHandler(self.ctx), 70 for ppclass in app.plugin_loader.getPipelines():
62 JOB_RENDER_FIRST: RenderFirstSubJobHandler(self.ctx), 71 ppclasses[ppclass.PIPELINE_NAME] = ppclass
63 JOB_BAKE: BakeJobHandler(self.ctx)}
64 for jt, jh in job_handlers.items():
65 app.env.registerTimer(type(jh).__name__)
66 self.job_handlers = job_handlers
67 72
68 app.env.stepTimerSince("BakeWorkerInit", self.work_start_time) 73 self._ppctx = PipelineContext(self.ctx.out_dir, self.record_history,
74 worker_id=self.wid,
75 force=self.ctx.force)
76 for src in app.sources:
77 ppname = src.config['pipeline']
78 if (self.ctx.allowed_pipelines is not None and
79 ppname not in self.ctx.allowed_pipelines):
80 continue
81
82 pp = ppclasses[ppname](src)
83 pp.initialize(self._ppctx)
84 self._sources[src.name] = (src, pp)
85
86 stats.stepTimerSince("BakeWorkerInit", self._work_start_time)
69 87
70 def process(self, job): 88 def process(self, job):
71 handler = self.job_handlers[job['type']] 89 logger.debug("Received job: %s@%s" % (job.source_name, job.item_spec))
72 with self.ctx.app.env.timerScope(type(handler).__name__): 90 src, pp = self._sources[job.source_name]
73 return handler.handleJob(job['job']) 91 item = ContentItem(job.item_spec, job.item_metadata)
74 92
75 def getReport(self, pool_reports): 93 record_class = pp.RECORD_CLASS or Record
76 self.ctx.app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid, 94 ppres = PipelineResult(record_class())
77 self.work_start_time) 95 ppres.record.item_spec = job.item_spec
78 data = self.ctx.app.env.getStats() 96 pp.run(item, self._ppctx, ppres)
79 data.timers.update(pool_reports) 97 return ppres
80 return { 98
81 'type': 'stats', 99 def getStats(self):
82 'data': data} 100 stats = self.app.env.stats
101 stats.stepTimerSince("BakeWorker_%d_Total" % self.wid,
102 self._work_start_time)
103 return stats
83 104
84 def shutdown(self): 105 def shutdown(self):
85 for jh in self.job_handlers.values(): 106 for src, pp in self._sources.values():
86 jh.shutdown() 107 pp.shutdown(self._ppctx)
87 108
88 109
89 JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = range(0, 3) 110 class BakeJob:
111 def __init__(self, source_name, item_spec, item_metadata):
112 self.source_name = source_name
113 self.item_spec = item_spec
114 self.item_metadata = item_metadata
90 115
91 116
92 class JobHandler(object): 117 class JobHandler:
93 def __init__(self, ctx): 118 def __init__(self, ctx):
94 self.ctx = ctx 119 self.ctx = ctx
95 120
96 @property 121 @property
97 def app(self): 122 def app(self):
109 while ex is not None: 134 while ex is not None:
110 errors.append(str(ex)) 135 errors.append(str(ex))
111 ex = ex.__cause__ 136 ex = ex.__cause__
112 return errors 137 return errors
113 138
114
115 def save_factory(fac):
116 return {
117 'source_name': fac.source.name,
118 'rel_path': fac.rel_path,
119 'metadata': fac.metadata}
120
121
122 def load_factory(app, info):
123 source = app.getSource(info['source_name'])
124 return PageFactory(source, info['rel_path'], info['metadata'])
125
126
127 class LoadJobHandler(JobHandler):
128 def handleJob(self, job):
129 # Just make sure the page has been cached.
130 fac = load_factory(self.app, job)
131 logger.debug("Loading page: %s" % fac.ref_spec)
132 self.app.env.addManifestEntry('LoadJobs', fac.ref_spec)
133 result = {
134 'source_name': fac.source.name,
135 'path': fac.path,
136 'config': None,
137 'timestamp': None,
138 'errors': None}
139 try:
140 page = fac.buildPage()
141 page._load()
142 result['config'] = page.config.getAll()
143 result['timestamp'] = page.datetime.timestamp()
144 except Exception as ex:
145 logger.debug("Got loading error. Sending it to master.")
146 result['errors'] = _get_errors(ex)
147 if self.ctx.app.debug:
148 logger.exception(ex)
149 return result
150
151
152 class RenderFirstSubJobHandler(JobHandler):
153 def handleJob(self, job):
154 # Render the segments for the first sub-page of this page.
155 fac = load_factory(self.app, job['factory_info'])
156 self.app.env.addManifestEntry('RenderJobs', fac.ref_spec)
157
158 route_index = job['route_index']
159 route = self.app.routes[route_index]
160
161 page = fac.buildPage()
162 route_metadata = create_route_metadata(page)
163 qp = QualifiedPage(page, route, route_metadata)
164 ctx = PageRenderingContext(qp)
165 self.app.env.abort_source_use = True
166
167 result = {
168 'path': fac.path,
169 'aborted': False,
170 'errors': None}
171 logger.debug("Preparing page: %s" % fac.ref_spec)
172 try:
173 render_page_segments(ctx)
174 except AbortedSourceUseError:
175 logger.debug("Page %s was aborted." % fac.ref_spec)
176 self.app.env.stepCounter("SourceUseAbortions")
177 result['aborted'] = True
178 except Exception as ex:
179 logger.debug("Got rendering error. Sending it to master.")
180 result['errors'] = _get_errors(ex)
181 if self.ctx.app.debug:
182 logger.exception(ex)
183 finally:
184 self.app.env.abort_source_use = False
185 return result
186
187
188 class BakeJobHandler(JobHandler):
189 def __init__(self, ctx):
190 super(BakeJobHandler, self).__init__(ctx)
191 self.page_baker = PageBaker(ctx.app, ctx.out_dir, ctx.force)
192
193 def shutdown(self):
194 self.page_baker.shutdown()
195
196 def handleJob(self, job):
197 # Actually bake the page and all its sub-pages to the output folder.
198 fac = load_factory(self.app, job['factory_info'])
199 self.app.env.addManifestEntry('BakeJobs', fac.ref_spec)
200
201 route_index = job['route_index']
202 route_metadata = job['route_metadata']
203 route = self.app.routes[route_index]
204
205 gen_name = job['generator_name']
206 gen_key = job['generator_record_key']
207 dirty_source_names = job['dirty_source_names']
208
209 page = fac.buildPage()
210 qp = QualifiedPage(page, route, route_metadata)
211
212 result = {
213 'path': fac.path,
214 'generator_name': gen_name,
215 'generator_record_key': gen_key,
216 'sub_entries': None,
217 'errors': None}
218
219 if job.get('needs_config', False):
220 result['config'] = page.config.getAll()
221
222 previous_entry = None
223 if self.ctx.previous_record_index is not None:
224 key = _get_transition_key(fac.path, gen_key)
225 previous_entry = self.ctx.previous_record_index.get(key)
226
227 logger.debug("Baking page: %s" % fac.ref_spec)
228 logger.debug("With route metadata: %s" % route_metadata)
229 try:
230 sub_entries = self.page_baker.bake(
231 qp, previous_entry, dirty_source_names, gen_name)
232 result['sub_entries'] = sub_entries
233
234 except Exception as ex:
235 logger.debug("Got baking error. Sending it to master.")
236 result['errors'] = _get_errors(ex)
237 if self.ctx.app.debug:
238 logger.exception(ex)
239
240 return result
241