comparison piecrust/baking/worker.py @ 447:aefe70229fdd

bake: Commonize worker pool code between html and asset baking. The `workerpool` package now defines a generic-ish worker pool. It's similar to the Python framework pool but with a simpler use-case (only one way to queue jobs) and support for workers to send a final "report" to the master process, which we use to get timing information here. The rest of the changes basically remove a whole bunch of duplicated code that's not needed anymore.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 05 Jul 2015 00:09:41 -0700
parents dc8518c51cbe
children 838f3964f400
comparison
equal deleted inserted replaced
446:4cdf6c2157a0 447:aefe70229fdd
1 import time 1 import time
2 import queue
3 import logging 2 import logging
4 from piecrust.app import PieCrust 3 from piecrust.app import PieCrust
5 from piecrust.baking.single import PageBaker, BakingError 4 from piecrust.baking.single import PageBaker, BakingError
6 from piecrust.rendering import ( 5 from piecrust.rendering import (
7 QualifiedPage, PageRenderingContext, render_page_segments) 6 QualifiedPage, PageRenderingContext, render_page_segments)
8 from piecrust.routing import create_route_metadata 7 from piecrust.routing import create_route_metadata
9 from piecrust.sources.base import PageFactory 8 from piecrust.sources.base import PageFactory
9 from piecrust.workerpool import IWorker
10 10
11 11
12 logger = logging.getLogger(__name__) 12 logger = logging.getLogger(__name__)
13
14
15 def worker_func(wid, ctx):
16 if ctx.is_profiling:
17 try:
18 import cProfile as profile
19 except ImportError:
20 import profile
21
22 ctx.is_profiling = False
23 profile.runctx('_real_worker_func(wid, ctx)',
24 globals(), locals(),
25 filename='BakeWorker-%d.prof' % wid)
26 else:
27 _real_worker_func(wid, ctx)
28
29
30 def _real_worker_func(wid, ctx):
31 logger.debug("Worker %d booting up..." % wid)
32 w = BakeWorker(wid, ctx)
33 w.run()
34 13
35 14
36 class BakeWorkerContext(object): 15 class BakeWorkerContext(object):
37 def __init__(self, root_dir, sub_cache_dir, out_dir, 16 def __init__(self, root_dir, sub_cache_dir, out_dir,
38 work_queue, results, abort_event, 17 force=False, debug=False):
39 force=False, debug=False, is_profiling=False):
40 self.root_dir = root_dir 18 self.root_dir = root_dir
41 self.sub_cache_dir = sub_cache_dir 19 self.sub_cache_dir = sub_cache_dir
42 self.out_dir = out_dir 20 self.out_dir = out_dir
43 self.work_queue = work_queue
44 self.results = results
45 self.abort_event = abort_event
46 self.force = force 21 self.force = force
47 self.debug = debug 22 self.debug = debug
48 self.is_profiling = is_profiling 23
49 24
50 25 class BakeWorker(IWorker):
51 JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = range(0, 3) 26 def __init__(self, ctx):
52
53
54 class BakeWorkerJob(object):
55 def __init__(self, job_type, payload):
56 self.job_type = job_type
57 self.payload = payload
58
59
60 class BakeWorker(object):
61 def __init__(self, wid, ctx):
62 self.wid = wid
63 self.ctx = ctx 27 self.ctx = ctx
64 28 self.work_start_time = time.perf_counter()
65 def run(self): 29
66 logger.debug("Working %d initializing..." % self.wid) 30 def initialize(self):
67 work_start_time = time.perf_counter()
68
69 # Create the app local to this worker. 31 # Create the app local to this worker.
70 app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug) 32 app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
71 app._useSubCacheDir(self.ctx.sub_cache_dir) 33 app._useSubCacheDir(self.ctx.sub_cache_dir)
72 app.env.fs_cache_only_for_main_page = True 34 app.env.fs_cache_only_for_main_page = True
73 app.env.registerTimer("BakeWorker_%d_Total" % self.wid) 35 app.env.registerTimer("BakeWorker_%d_Total" % self.wid)
74 app.env.registerTimer("BakeWorkerInit") 36 app.env.registerTimer("BakeWorkerInit")
75 app.env.registerTimer("JobReceive") 37 app.env.registerTimer("JobReceive")
38 self.app = app
76 39
77 # Create the job handlers. 40 # Create the job handlers.
78 job_handlers = { 41 job_handlers = {
79 JOB_LOAD: LoadJobHandler(app, self.ctx), 42 JOB_LOAD: LoadJobHandler(app, self.ctx),
80 JOB_RENDER_FIRST: RenderFirstSubJobHandler(app, self.ctx), 43 JOB_RENDER_FIRST: RenderFirstSubJobHandler(app, self.ctx),
81 JOB_BAKE: BakeJobHandler(app, self.ctx)} 44 JOB_BAKE: BakeJobHandler(app, self.ctx)}
82 for jt, jh in job_handlers.items(): 45 for jt, jh in job_handlers.items():
83 app.env.registerTimer(type(jh).__name__) 46 app.env.registerTimer(type(jh).__name__)
84 47 self.job_handlers = job_handlers
85 app.env.stepTimerSince("BakeWorkerInit", work_start_time) 48
86 49 app.env.stepTimerSince("BakeWorkerInit", self.work_start_time)
87 # Start working! 50
88 aborted_with_exception = None 51 def process(self, job):
89 while not self.ctx.abort_event.is_set(): 52 handler = self.job_handlers[job.job_type]
90 try: 53 with self.app.env.timerScope(type(handler).__name__):
91 with app.env.timerScope('JobReceive'): 54 return handler.handleJob(job)
92 job = self.ctx.work_queue.get(True, 0.01) 55
93 except queue.Empty: 56 def getReport(self):
94 continue 57 self.app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid,
95 58 self.work_start_time)
96 try: 59 return {
97 handler = job_handlers[job.job_type] 60 'type': 'timers',
98 with app.env.timerScope(type(handler).__name__): 61 'data': self.app.env._timers}
99 handler.handleJob(job) 62
100 except Exception as ex: 63
101 self.ctx.abort_event.set() 64 JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = range(0, 3)
102 aborted_with_exception = ex 65
103 logger.debug("[%d] Critical error, aborting." % self.wid) 66
104 if self.ctx.debug: 67 class BakeWorkerJob(object):
105 logger.exception(ex) 68 def __init__(self, job_type, payload):
106 break 69 self.job_type = job_type
107 finally: 70 self.payload = payload
108 self.ctx.work_queue.task_done()
109
110 if aborted_with_exception is not None:
111 msgs = _get_errors(aborted_with_exception)
112 self.ctx.results.put_nowait({'type': 'error', 'messages': msgs})
113
114 # Send our timers to the main process before exiting.
115 app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid,
116 work_start_time)
117 self.ctx.results.put_nowait({
118 'type': 'timers', 'data': app.env._timers})
119 71
120 72
121 class JobHandler(object): 73 class JobHandler(object):
122 def __init__(self, app, ctx): 74 def __init__(self, app, ctx):
123 self.app = app 75 self.app = app
201 except Exception as ex: 153 except Exception as ex:
202 logger.debug("Got loading error. Sending it to master.") 154 logger.debug("Got loading error. Sending it to master.")
203 result.errors = _get_errors(ex) 155 result.errors = _get_errors(ex)
204 if self.ctx.debug: 156 if self.ctx.debug:
205 logger.exception(ex) 157 logger.exception(ex)
206 158 return result
207 self.ctx.results.put_nowait(result)
208 159
209 160
210 class RenderFirstSubJobHandler(JobHandler): 161 class RenderFirstSubJobHandler(JobHandler):
211 def handleJob(self, job): 162 def handleJob(self, job):
212 # Render the segments for the first sub-page of this page. 163 # Render the segments for the first sub-page of this page.
229 except Exception as ex: 180 except Exception as ex:
230 logger.debug("Got rendering error. Sending it to master.") 181 logger.debug("Got rendering error. Sending it to master.")
231 result.errors = _get_errors(ex) 182 result.errors = _get_errors(ex)
232 if self.ctx.debug: 183 if self.ctx.debug:
233 logger.exception(ex) 184 logger.exception(ex)
234 185 return result
235 self.ctx.results.put_nowait(result)
236 186
237 187
238 class BakeJobHandler(JobHandler): 188 class BakeJobHandler(JobHandler):
239 def __init__(self, app, ctx): 189 def __init__(self, app, ctx):
240 super(BakeJobHandler, self).__init__(app, ctx) 190 super(BakeJobHandler, self).__init__(app, ctx)
270 logger.debug("Got baking error. Sending it to master.") 220 logger.debug("Got baking error. Sending it to master.")
271 result.errors = _get_errors(ex) 221 result.errors = _get_errors(ex)
272 if self.ctx.debug: 222 if self.ctx.debug:
273 logger.exception(ex) 223 logger.exception(ex)
274 224
275 self.ctx.results.put_nowait(result) 225 return result
276 226