comparison piecrust/baking/worker.py @ 411:e7b865f8f335

bake: Enable multiprocess baking. Baking is now done by running one worker per CPU and sending jobs to them. This changes several things across the codebase: * Ability to not cache things related to pages other than the 'main' page (i.e. the page at the bottom of the execution stack). * Decouple the baking process from the bake records, so only the main process keeps track of (and modifies) the bake record. * Remove the need for 'batch page getters' and load pages directly from the page factories. There are also various smaller changes included here, including support for scope performance timers that are saved with the bake record and can be printed out to the console. Yes, I got carried away. For testing, the in-memory 'mock' file-system doesn't work anymore since we're spawning processes, so it is replaced by a 'tmpfs' file-system which is saved in temporary files on disk and deleted after the tests have run.
author Ludovic Chabant <ludovic@chabant.com>
date Fri, 12 Jun 2015 17:09:19 -0700
parents
children 0e9a94b7fdfa
comparison
equal deleted inserted replaced
410:d1a472464e57 411:e7b865f8f335
1 import time
2 import copy
3 import queue
4 import logging
5 from piecrust.app import PieCrust
6 from piecrust.baking.single import PageBaker, BakingError
7 from piecrust.rendering import (
8 QualifiedPage, PageRenderingContext, render_page_segments)
9 from piecrust.sources.base import PageFactory
10
11
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
13
14
def worker_func(wid, ctx):
    """Entry point for one bake worker.

    Builds a `BakeWorker` for the given id and context, then runs it until
    the context's abort event is set.

    :param wid: Integer identifier of the worker (used in log messages and
        timer names).
    :param ctx: The `BakeWorkerContext` holding the shared queues/settings.
    """
    logger.debug("Worker %d booting up..." % wid)
    BakeWorker(wid, ctx).run()
19
20
class BakeWorkerContext(object):
    """Settings and shared objects handed to every bake worker.

    :param root_dir: Root directory of the website; used to create the
        worker-local `PieCrust` app.
    :param out_dir: Directory the baked output is written to.
    :param work_queue: Queue the worker pulls `BakeWorkerJob`s from.
    :param results: Queue the worker pushes job results (and, on exit, its
        timers) onto.
    :param abort_event: Event that stops the worker loop when set.
    :param force: Passed to the `PageBaker`.
    :param debug: Whether the worker app runs in debug mode (also enables
        full exception logging on critical errors).
    """

    def __init__(self, root_dir, out_dir,
                 work_queue, results, abort_event,
                 force=False, debug=False):
        # Locations.
        self.root_dir, self.out_dir = root_dir, out_dir
        # Communication channels shared with the main process.
        self.work_queue, self.results = work_queue, results
        self.abort_event = abort_event
        # Flags.
        self.force, self.debug = force, debug
32
33
# Job type identifiers carried by `BakeWorkerJob.job_type`.
JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = range(3)
35
36
class BakeWorkerJob(object):
    """A unit of work sent to a bake worker.

    `job_type` is one of `JOB_LOAD`, `JOB_RENDER_FIRST` or `JOB_BAKE` and
    selects the handler; `payload` is the object that handler reads.
    """

    def __init__(self, job_type, payload):
        self.job_type, self.payload = job_type, payload
41
42
class BakeWorker(object):
    """Pulls jobs off the shared work queue and dispatches them to handlers.

    Each worker creates its own `PieCrust` app, processes jobs until the
    context's abort event is set, and then sends its performance timers to
    the main process through the results queue.
    """

    def __init__(self, wid, ctx):
        self.wid = wid
        self.ctx = ctx
        # Updated by `run` when a job handler raises; initialised here so
        # the attributes always exist.
        self.abort_exception = None
        self.success = True

    def run(self):
        """Process jobs until `ctx.abort_event` is set.

        Any exception escaping a job handler is treated as critical: the
        abort event is set (which also stops any other worker watching the
        same event), the exception is stored on `abort_exception`, and the
        loop ends. Timers are sent to `ctx.results` in every case.
        """
        logger.debug("Worker %d initializing..." % self.wid)
        work_start_time = time.perf_counter()

        # Create the app local to this worker.
        app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
        app.env.fs_cache_only_for_main_page = True
        app.env.registerTimer("Worker_%d" % self.wid)
        app.env.registerTimer("JobReceive")

        # Create the job handlers, each with its own timer.
        job_handlers = {
            JOB_LOAD: LoadJobHandler(app, self.ctx),
            JOB_RENDER_FIRST: RenderFirstSubJobHandler(app, self.ctx),
            JOB_BAKE: BakeJobHandler(app, self.ctx)}
        for jh in job_handlers.values():
            app.env.registerTimer(type(jh).__name__)

        # Start working!
        while not self.ctx.abort_event.is_set():
            try:
                with app.env.timerScope('JobReceive'):
                    # Short timeout so the abort event is re-checked often.
                    job = self.ctx.work_queue.get(True, 0.01)
            except queue.Empty:
                continue

            try:
                handler = job_handlers[job.job_type]
                with app.env.timerScope(type(handler).__name__):
                    handler.handleJob(job)
            except Exception as ex:
                self.ctx.abort_event.set()
                self.abort_exception = ex
                self.success = False
                logger.debug("[%d] Critical error, aborting." % self.wid)
                # The context exposes the debug flag directly; it has no
                # `app` attribute, so reading `ctx.app.debug` here raised
                # AttributeError and skipped sending the timers below.
                if self.ctx.debug:
                    logger.exception(ex)
                break
            finally:
                # Acknowledge every job received, even failed ones, so the
                # queue's unfinished-task count stays balanced.
                self.ctx.work_queue.task_done()

        # Send our timers to the main process before exiting.
        app.env.stepTimer("Worker_%d" % self.wid,
                          time.perf_counter() - work_start_time)
        self.ctx.results.put_nowait(app.env._timers)
93
94
class JobHandler(object):
    """Base class for the worker's job handlers.

    Holds the worker-local app and the shared worker context; subclasses
    override `handleJob`.
    """

    def __init__(self, app, ctx):
        self.app, self.ctx = app, ctx

    def handleJob(self, job):
        """Handle one `BakeWorkerJob`; subclasses must override this."""
        raise NotImplementedError()
102
103
104 def _get_errors(ex):
105 errors = []
106 while ex is not None:
107 errors.append(str(ex))
108 ex = ex.__cause__
109 return errors
110
111
class PageFactoryInfo(object):
    """Lightweight description of a page factory.

    Keeps only the source name, relative path and metadata, presumably so
    it can be sent to workers instead of the factory itself; `build`
    recreates a `PageFactory` against a given app.
    """

    def __init__(self, fac):
        src = fac.source
        self.source_name = src.name
        self.rel_path = fac.rel_path
        self.metadata = fac.metadata

    def build(self, app):
        """Recreate the `PageFactory` using `app`'s source of the same name."""
        return PageFactory(app.getSource(self.source_name),
                           self.rel_path, self.metadata)
121
122
class LoadJobPayload(object):
    """Payload for a `JOB_LOAD` job: which page factory to load."""

    def __init__(self, fac):
        info = PageFactoryInfo(fac)
        self.factory_info = info
126
127
class LoadJobResult(object):
    """Result of a `JOB_LOAD` job, sent back through the results queue.

    `config` holds the loaded page's configuration on success; `errors`
    holds a list of error messages on failure. Both start as None.
    """

    def __init__(self, source_name, path):
        self.source_name, self.path = source_name, path
        self.config = None
        self.errors = None
134
135
class RenderFirstSubJobPayload(object):
    """Payload for a `JOB_RENDER_FIRST` job: which page factory to render."""

    def __init__(self, fac):
        info = PageFactoryInfo(fac)
        self.factory_info = info
139
140
class RenderFirstSubJobResult(object):
    """Result of rendering the first sub-page's segments.

    `used_assets` and `used_pagination` are filled in on success;
    `pagination_has_more` is only meaningful when pagination was used;
    `errors` holds a list of error messages on failure.
    """

    def __init__(self, path):
        self.path = path
        self.used_assets = self.used_pagination = None
        self.pagination_has_more = False
        self.errors = None
148
149
class BakeJobPayload(object):
    """Payload for a `JOB_BAKE` job: everything needed to bake one page.

    `tax_info` is optional and, when given, is stored as `taxonomy_info`.
    """

    def __init__(self, fac, route_metadata, previous_entry,
                 first_render_info, dirty_source_names, tax_info=None):
        self.factory_info = PageFactoryInfo(fac)
        # Routing information.
        self.route_metadata = route_metadata
        self.taxonomy_info = tax_info
        # State carried over from the bake record / earlier passes.
        self.previous_entry = previous_entry
        self.first_render_info = first_render_info
        self.dirty_source_names = dirty_source_names
159
160
class BakeJobResult(object):
    """Result of a `JOB_BAKE` job.

    `bake_info` receives what `PageBaker.bake` returns on success; `errors`
    holds a list of error messages on failure.
    """

    def __init__(self, path, tax_info=None):
        self.path, self.taxonomy_info = path, tax_info
        self.bake_info = self.errors = None
167
168
class LoadJobHandler(JobHandler):
    """Handles `JOB_LOAD` jobs by loading a page and reporting its config."""

    def handleJob(self, job):
        # Just make sure the page has been loaded (and so cached).
        factory = job.payload.factory_info.build(self.app)
        logger.debug("Loading page: %s" % factory.ref_spec)
        outcome = LoadJobResult(factory.source.name, factory.path)
        try:
            loaded = factory.buildPage()
            loaded._load()
            outcome.config = loaded.config.get()
        except Exception as ex:
            # Report the failure to the main process instead of aborting.
            outcome.errors = _get_errors(ex)

        self.ctx.results.put_nowait(outcome)
183
184
class RenderFirstSubJobHandler(JobHandler):
    """Handles `JOB_RENDER_FIRST` jobs: renders the first sub-page's segments
    and reports asset/pagination usage back through the results queue."""

    def handleJob(self, job):
        factory = job.payload.factory_info.build(self.app)

        # The baker checks routes upstream, so one must exist here.
        route = self.app.getRoute(factory.source.name, factory.metadata,
                                  skip_taxonomies=True)
        assert route is not None

        page = factory.buildPage()
        # The rendering context gets its own copy of the metadata.
        qualified = QualifiedPage(page, route,
                                  copy.deepcopy(factory.metadata))
        render_ctx = PageRenderingContext(qualified)

        outcome = RenderFirstSubJobResult(factory.path)
        logger.debug("Preparing page: %s" % factory.ref_spec)
        try:
            render_page_segments(render_ctx)
            outcome.used_assets = render_ctx.used_assets
            has_pagination = render_ctx.used_pagination is not None
            outcome.used_pagination = has_pagination
            if has_pagination:
                outcome.pagination_has_more = \
                    render_ctx.used_pagination.has_more
        except Exception as ex:
            logger.debug("Got rendering error. Sending it to master.")
            outcome.errors = _get_errors(ex)

        self.ctx.results.put_nowait(outcome)
213
214
class BakeJobHandler(JobHandler):
    """Handles `JOB_BAKE` jobs: bakes a page (and its sub-pages) to the
    output directory with a `PageBaker` and reports the outcome.

    Only `BakingError` is reported as a per-page error; any other exception
    propagates to the worker loop.
    """

    def __init__(self, app, ctx):
        super(BakeJobHandler, self).__init__(app, ctx)
        self.page_baker = PageBaker(app, ctx.out_dir, ctx.force)

    def handleJob(self, job):
        payload = job.payload
        factory = payload.factory_info.build(self.app)
        route_metadata = payload.route_metadata
        tax_info = payload.taxonomy_info

        # Taxonomy pages use the taxonomy route; everything else uses the
        # source's regular route.
        if tax_info is None:
            route = self.app.getRoute(factory.source.name, route_metadata,
                                      skip_taxonomies=True)
        else:
            route = self.app.getTaxonomyRoute(tax_info.taxonomy_name,
                                              tax_info.source_name)
        assert route is not None

        outcome = BakeJobResult(factory.path, tax_info)
        logger.debug("Baking page: %s" % factory.ref_spec)
        try:
            outcome.bake_info = self.page_baker.bake(
                factory, route, route_metadata,
                payload.previous_entry, payload.first_render_info,
                payload.dirty_source_names, tax_info)
        except BakingError as ex:
            logger.debug("Got baking error. Sending it to master.")
            outcome.errors = _get_errors(ex)

        self.ctx.results.put_nowait(outcome)
250