Mercurial > piecrust2
comparison piecrust/baking/worker.py @ 411:e7b865f8f335
bake: Enable multiprocess baking.
Baking is now done by running a worker per CPU, and sending jobs to them.
This changes several things across the codebase:
* Ability to not cache things related to pages other than the 'main' page
(i.e. the page at the bottom of the execution stack).
* Decouple the baking process from the bake records, so only the main process
keeps track of (and modifies) the bake record.
* Remove the need for 'batch page getters', and instead load pages directly
from the page factories.
Various smaller changes are also included here, such as support for
scope performance timers that are saved with the bake record and can be
printed out to the console. Yes, I got carried away.
For testing, the in-memory 'mock' file-system doesn't work anymore, since
we're spawning processes, so this is replaced by a 'tmpfs' file-system which
is saved in temporary files on disk and deleted after tests have run.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Fri, 12 Jun 2015 17:09:19 -0700 |
parents | |
children | 0e9a94b7fdfa |
comparison
equal
deleted
inserted
replaced
410:d1a472464e57 | 411:e7b865f8f335 |
---|---|
1 import time | |
2 import copy | |
3 import queue | |
4 import logging | |
5 from piecrust.app import PieCrust | |
6 from piecrust.baking.single import PageBaker, BakingError | |
7 from piecrust.rendering import ( | |
8 QualifiedPage, PageRenderingContext, render_page_segments) | |
9 from piecrust.sources.base import PageFactory | |
10 | |
11 | |
12 logger = logging.getLogger(__name__) | |
13 | |
14 | |
def worker_func(wid, ctx):
    """Build a `BakeWorker` for the given worker ID and context, and run it.

    Args:
        wid: Integer ID of the worker (formatted with `%d` in log messages).
        ctx: The context object handed to the `BakeWorker`.
    """
    logger.debug("Worker %d booting up..." % wid)
    BakeWorker(wid, ctx).run()
19 | |
20 | |
class BakeWorkerContext(object):
    """Plain container for the settings and shared objects given to a worker.

    Every constructor argument is stored as-is on an attribute of the same
    name.
    """

    def __init__(self, root_dir, out_dir,
                 work_queue, results, abort_event,
                 force=False, debug=False):
        # Directories.
        self.root_dir, self.out_dir = root_dir, out_dir
        # Objects used to exchange jobs, results and the abort signal.
        self.work_queue, self.results = work_queue, results
        self.abort_event = abort_event
        # Flags.
        self.force, self.debug = force, debug
32 | |
33 | |
# Job type identifiers, used as the `job_type` of a `BakeWorkerJob`.
JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = 0, 1, 2
35 | |
36 | |
class BakeWorkerJob(object):
    """A unit of work: a job type paired with its payload."""

    def __init__(self, job_type, payload):
        self.job_type, self.payload = job_type, payload
41 | |
42 | |
class BakeWorker(object):
    """Pulls jobs from the context's work queue and dispatches them.

    Args:
        wid: Integer ID of this worker (used in log messages and timer
            names).
        ctx: Context object; `run` reads its `root_dir`, `debug`,
            `work_queue`, `results` and `abort_event` attributes.
    """

    def __init__(self, wid, ctx):
        self.wid = wid
        self.ctx = ctx

    def run(self):
        """Process jobs until the abort event is set or a handler fails.

        When a handler raises, the abort event is set, the exception is
        stored on `self.abort_exception`, `self.success` is set to False,
        and the loop stops. In all cases the worker's timers are put on the
        results queue before returning.
        """
        logger.debug("Working %d initializing..." % self.wid)
        work_start_time = time.perf_counter()

        # Create the app local to this worker.
        app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
        app.env.fs_cache_only_for_main_page = True
        app.env.registerTimer("Worker_%d" % self.wid)
        app.env.registerTimer("JobReceive")

        # Create the job handlers.
        job_handlers = {
            JOB_LOAD: LoadJobHandler(app, self.ctx),
            JOB_RENDER_FIRST: RenderFirstSubJobHandler(app, self.ctx),
            JOB_BAKE: BakeJobHandler(app, self.ctx)}
        # One timer per handler, named after the handler's class.
        for jh in job_handlers.values():
            app.env.registerTimer(type(jh).__name__)

        # Start working!
        while not self.ctx.abort_event.is_set():
            try:
                with app.env.timerScope('JobReceive'):
                    # Short timeout so the abort event is re-checked often.
                    job = self.ctx.work_queue.get(True, 0.01)
            except queue.Empty:
                continue

            try:
                handler = job_handlers[job.job_type]
                with app.env.timerScope(type(handler).__name__):
                    handler.handleJob(job)
            except Exception as ex:
                self.ctx.abort_event.set()
                self.abort_exception = ex
                self.success = False
                logger.debug("[%d] Critical error, aborting." % self.wid)
                # The context has no `app` attribute; the debug flag lives
                # directly on it. Reading `ctx.app.debug` here would raise
                # AttributeError and skip the timer hand-off below.
                if self.ctx.debug:
                    logger.exception(ex)
                break
            finally:
                # Every job obtained from the queue is marked done, whether
                # its handler succeeded or not.
                self.ctx.work_queue.task_done()

        # Send our timers to the main process before exiting.
        app.env.stepTimer("Worker_%d" % self.wid,
                          time.perf_counter() - work_start_time)
        self.ctx.results.put_nowait(app.env._timers)
93 | |
94 | |
class JobHandler(object):
    """Base class for job handlers; keeps the app and the worker context.

    Subclasses override `handleJob`.
    """

    def __init__(self, app, ctx):
        self.app, self.ctx = app, ctx

    def handleJob(self, job):
        """Handle one job. Always raises here; subclasses must override.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()
102 | |
103 | |
104 def _get_errors(ex): | |
105 errors = [] | |
106 while ex is not None: | |
107 errors.append(str(ex)) | |
108 ex = ex.__cause__ | |
109 return errors | |
110 | |
111 | |
class PageFactoryInfo(object):
    """Holds what is needed to re-create a `PageFactory` from an app.

    Stores the factory's source name, relative path and metadata instead of
    the factory itself.
    """

    def __init__(self, fac):
        self.source_name = fac.source.name
        self.rel_path, self.metadata = fac.rel_path, fac.metadata

    def build(self, app):
        """Create a `PageFactory` using the source looked up on `app`."""
        return PageFactory(app.getSource(self.source_name),
                           self.rel_path, self.metadata)
121 | |
122 | |
class LoadJobPayload(object):
    """Payload of a load job: the info needed to rebuild the page factory."""

    def __init__(self, fac):
        info = PageFactoryInfo(fac)
        self.factory_info = info
126 | |
127 | |
class LoadJobResult(object):
    """Result of a load job.

    `config` and `errors` start as None and are filled in by the handler.
    """

    def __init__(self, source_name, path):
        self.source_name, self.path = source_name, path
        self.config = self.errors = None
134 | |
135 | |
class RenderFirstSubJobPayload(object):
    """Payload of a 'render first sub-page' job: the page factory info."""

    def __init__(self, fac):
        info = PageFactoryInfo(fac)
        self.factory_info = info
139 | |
140 | |
class RenderFirstSubJobResult(object):
    """Result of a 'render first sub-page' job.

    All fields other than `path` start at their "nothing known yet" value
    (None, or False for `pagination_has_more`).
    """

    def __init__(self, path):
        self.path = path
        self.used_assets = self.used_pagination = self.errors = None
        self.pagination_has_more = False
148 | |
149 | |
class BakeJobPayload(object):
    """Payload of a bake job.

    Stores the page factory info plus the arguments given to the
    constructor; `tax_info` is stored as `taxonomy_info`.
    """

    def __init__(self, fac, route_metadata, previous_entry,
                 first_render_info, dirty_source_names, tax_info=None):
        self.factory_info = PageFactoryInfo(fac)
        self.route_metadata, self.previous_entry = (
            route_metadata, previous_entry)
        self.dirty_source_names = dirty_source_names
        self.first_render_info = first_render_info
        self.taxonomy_info = tax_info
159 | |
160 | |
class BakeJobResult(object):
    """Result of a bake job.

    `bake_info` and `errors` start as None and are filled in by the handler.
    """

    def __init__(self, path, tax_info=None):
        self.path, self.taxonomy_info = path, tax_info
        self.bake_info = self.errors = None
167 | |
168 | |
class LoadJobHandler(JobHandler):
    """Handles load jobs: loads a page and reports its config or errors."""

    def handleJob(self, job):
        """Load the job's page and put a `LoadJobResult` on the results queue.

        Any exception raised while building or loading the page is turned
        into a list of error messages on the result instead of propagating.
        """
        # Just make sure the page has been cached.
        factory = job.payload.factory_info.build(self.app)
        logger.debug("Loading page: %s" % factory.ref_spec)
        outcome = LoadJobResult(factory.source.name, factory.path)
        try:
            loaded_page = factory.buildPage()
            loaded_page._load()
            outcome.config = loaded_page.config.get()
        except Exception as err:
            outcome.errors = _get_errors(err)

        self.ctx.results.put_nowait(outcome)
183 | |
184 | |
class RenderFirstSubJobHandler(JobHandler):
    """Handles jobs that render the segments of a page's first sub-page."""

    def handleJob(self, job):
        """Render the page segments and put the result on the results queue.

        Rendering errors are captured as a list of messages on the result
        rather than raised.
        """
        # Render the segments for the first sub-page of this page.
        factory = job.payload.factory_info.build(self.app)

        # These things should be OK as they're checked upstream by the baker.
        route = self.app.getRoute(factory.source.name, factory.metadata,
                                  skip_taxonomies=True)
        assert route is not None

        # The route metadata is a deep copy of the factory's metadata.
        qualified_page = QualifiedPage(
            factory.buildPage(), route, copy.deepcopy(factory.metadata))
        render_ctx = PageRenderingContext(qualified_page)

        outcome = RenderFirstSubJobResult(factory.path)
        logger.debug("Preparing page: %s" % factory.ref_spec)
        try:
            render_page_segments(render_ctx)
            outcome.used_assets = render_ctx.used_assets
            has_pagination = render_ctx.used_pagination is not None
            outcome.used_pagination = has_pagination
            if has_pagination:
                outcome.pagination_has_more = (
                    render_ctx.used_pagination.has_more)
        except Exception as err:
            logger.debug("Got rendering error. Sending it to master.")
            outcome.errors = _get_errors(err)

        self.ctx.results.put_nowait(outcome)
213 | |
214 | |
class BakeJobHandler(JobHandler):
    """Handles bake jobs by delegating to a `PageBaker`."""

    def __init__(self, app, ctx):
        super().__init__(app, ctx)
        self.page_baker = PageBaker(app, ctx.out_dir, ctx.force)

    def handleJob(self, job):
        """Bake the job's page and put a `BakeJobResult` on the results queue.

        Only `BakingError` is captured on the result as a list of messages;
        any other exception propagates to the caller.
        """
        # Actually bake the page and all its sub-pages to the output folder.
        payload = job.payload
        factory = payload.factory_info.build(self.app)

        route_metadata = payload.route_metadata
        tax_info = payload.taxonomy_info
        # Taxonomy pages use their taxonomy route; others use the source's.
        if tax_info is None:
            route = self.app.getRoute(factory.source.name, route_metadata,
                                      skip_taxonomies=True)
        else:
            route = self.app.getTaxonomyRoute(tax_info.taxonomy_name,
                                              tax_info.source_name)
        assert route is not None

        outcome = BakeJobResult(factory.path, tax_info)
        previous_entry = payload.previous_entry
        first_render_info = payload.first_render_info
        dirty_source_names = payload.dirty_source_names
        logger.debug("Baking page: %s" % factory.ref_spec)
        try:
            outcome.bake_info = self.page_baker.bake(
                factory, route, route_metadata,
                previous_entry, first_render_info,
                dirty_source_names, tax_info)
        except BakingError as err:
            logger.debug("Got baking error. Sending it to master.")
            outcome.errors = _get_errors(err)

        self.ctx.results.put_nowait(outcome)
250 |