Mercurial > piecrust2
comparison piecrust/baking/worker.py @ 447:aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
The `workerpool` package now defines a mostly generic worker pool. It's similar
to the pool that ships with Python, but it covers a simpler use-case (only one
way to queue jobs) and lets workers send a final "report" to the master
process, which we use here to collect timing information.
The rest of the changes basically remove a whole bunch of duplicated code
that's not needed anymore.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sun, 05 Jul 2015 00:09:41 -0700 |
parents | dc8518c51cbe |
children | 838f3964f400 |
comparison
equal
deleted
inserted
replaced
446:4cdf6c2157a0 | 447:aefe70229fdd |
---|---|
1 import time | 1 import time |
2 import queue | |
3 import logging | 2 import logging |
4 from piecrust.app import PieCrust | 3 from piecrust.app import PieCrust |
5 from piecrust.baking.single import PageBaker, BakingError | 4 from piecrust.baking.single import PageBaker, BakingError |
6 from piecrust.rendering import ( | 5 from piecrust.rendering import ( |
7 QualifiedPage, PageRenderingContext, render_page_segments) | 6 QualifiedPage, PageRenderingContext, render_page_segments) |
8 from piecrust.routing import create_route_metadata | 7 from piecrust.routing import create_route_metadata |
9 from piecrust.sources.base import PageFactory | 8 from piecrust.sources.base import PageFactory |
9 from piecrust.workerpool import IWorker | |
10 | 10 |
11 | 11 |
12 logger = logging.getLogger(__name__) | 12 logger = logging.getLogger(__name__) |
13 | |
14 | |
15 def worker_func(wid, ctx): | |
16 if ctx.is_profiling: | |
17 try: | |
18 import cProfile as profile | |
19 except ImportError: | |
20 import profile | |
21 | |
22 ctx.is_profiling = False | |
23 profile.runctx('_real_worker_func(wid, ctx)', | |
24 globals(), locals(), | |
25 filename='BakeWorker-%d.prof' % wid) | |
26 else: | |
27 _real_worker_func(wid, ctx) | |
28 | |
29 | |
30 def _real_worker_func(wid, ctx): | |
31 logger.debug("Worker %d booting up..." % wid) | |
32 w = BakeWorker(wid, ctx) | |
33 w.run() | |
34 | 13 |
35 | 14 |
36 class BakeWorkerContext(object): | 15 class BakeWorkerContext(object): |
37 def __init__(self, root_dir, sub_cache_dir, out_dir, | 16 def __init__(self, root_dir, sub_cache_dir, out_dir, |
38 work_queue, results, abort_event, | 17 force=False, debug=False): |
39 force=False, debug=False, is_profiling=False): | |
40 self.root_dir = root_dir | 18 self.root_dir = root_dir |
41 self.sub_cache_dir = sub_cache_dir | 19 self.sub_cache_dir = sub_cache_dir |
42 self.out_dir = out_dir | 20 self.out_dir = out_dir |
43 self.work_queue = work_queue | |
44 self.results = results | |
45 self.abort_event = abort_event | |
46 self.force = force | 21 self.force = force |
47 self.debug = debug | 22 self.debug = debug |
48 self.is_profiling = is_profiling | 23 |
49 | 24 |
50 | 25 class BakeWorker(IWorker): |
51 JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = range(0, 3) | 26 def __init__(self, ctx): |
52 | |
53 | |
54 class BakeWorkerJob(object): | |
55 def __init__(self, job_type, payload): | |
56 self.job_type = job_type | |
57 self.payload = payload | |
58 | |
59 | |
60 class BakeWorker(object): | |
61 def __init__(self, wid, ctx): | |
62 self.wid = wid | |
63 self.ctx = ctx | 27 self.ctx = ctx |
64 | 28 self.work_start_time = time.perf_counter() |
65 def run(self): | 29 |
66 logger.debug("Working %d initializing..." % self.wid) | 30 def initialize(self): |
67 work_start_time = time.perf_counter() | |
68 | |
69 # Create the app local to this worker. | 31 # Create the app local to this worker. |
70 app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug) | 32 app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug) |
71 app._useSubCacheDir(self.ctx.sub_cache_dir) | 33 app._useSubCacheDir(self.ctx.sub_cache_dir) |
72 app.env.fs_cache_only_for_main_page = True | 34 app.env.fs_cache_only_for_main_page = True |
73 app.env.registerTimer("BakeWorker_%d_Total" % self.wid) | 35 app.env.registerTimer("BakeWorker_%d_Total" % self.wid) |
74 app.env.registerTimer("BakeWorkerInit") | 36 app.env.registerTimer("BakeWorkerInit") |
75 app.env.registerTimer("JobReceive") | 37 app.env.registerTimer("JobReceive") |
38 self.app = app | |
76 | 39 |
77 # Create the job handlers. | 40 # Create the job handlers. |
78 job_handlers = { | 41 job_handlers = { |
79 JOB_LOAD: LoadJobHandler(app, self.ctx), | 42 JOB_LOAD: LoadJobHandler(app, self.ctx), |
80 JOB_RENDER_FIRST: RenderFirstSubJobHandler(app, self.ctx), | 43 JOB_RENDER_FIRST: RenderFirstSubJobHandler(app, self.ctx), |
81 JOB_BAKE: BakeJobHandler(app, self.ctx)} | 44 JOB_BAKE: BakeJobHandler(app, self.ctx)} |
82 for jt, jh in job_handlers.items(): | 45 for jt, jh in job_handlers.items(): |
83 app.env.registerTimer(type(jh).__name__) | 46 app.env.registerTimer(type(jh).__name__) |
84 | 47 self.job_handlers = job_handlers |
85 app.env.stepTimerSince("BakeWorkerInit", work_start_time) | 48 |
86 | 49 app.env.stepTimerSince("BakeWorkerInit", self.work_start_time) |
87 # Start working! | 50 |
88 aborted_with_exception = None | 51 def process(self, job): |
89 while not self.ctx.abort_event.is_set(): | 52 handler = self.job_handlers[job.job_type] |
90 try: | 53 with self.app.env.timerScope(type(handler).__name__): |
91 with app.env.timerScope('JobReceive'): | 54 return handler.handleJob(job) |
92 job = self.ctx.work_queue.get(True, 0.01) | 55 |
93 except queue.Empty: | 56 def getReport(self): |
94 continue | 57 self.app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid, |
95 | 58 self.work_start_time) |
96 try: | 59 return { |
97 handler = job_handlers[job.job_type] | 60 'type': 'timers', |
98 with app.env.timerScope(type(handler).__name__): | 61 'data': self.app.env._timers} |
99 handler.handleJob(job) | 62 |
100 except Exception as ex: | 63 |
101 self.ctx.abort_event.set() | 64 JOB_LOAD, JOB_RENDER_FIRST, JOB_BAKE = range(0, 3) |
102 aborted_with_exception = ex | 65 |
103 logger.debug("[%d] Critical error, aborting." % self.wid) | 66 |
104 if self.ctx.debug: | 67 class BakeWorkerJob(object): |
105 logger.exception(ex) | 68 def __init__(self, job_type, payload): |
106 break | 69 self.job_type = job_type |
107 finally: | 70 self.payload = payload |
108 self.ctx.work_queue.task_done() | |
109 | |
110 if aborted_with_exception is not None: | |
111 msgs = _get_errors(aborted_with_exception) | |
112 self.ctx.results.put_nowait({'type': 'error', 'messages': msgs}) | |
113 | |
114 # Send our timers to the main process before exiting. | |
115 app.env.stepTimerSince("BakeWorker_%d_Total" % self.wid, | |
116 work_start_time) | |
117 self.ctx.results.put_nowait({ | |
118 'type': 'timers', 'data': app.env._timers}) | |
119 | 71 |
120 | 72 |
121 class JobHandler(object): | 73 class JobHandler(object): |
122 def __init__(self, app, ctx): | 74 def __init__(self, app, ctx): |
123 self.app = app | 75 self.app = app |
201 except Exception as ex: | 153 except Exception as ex: |
202 logger.debug("Got loading error. Sending it to master.") | 154 logger.debug("Got loading error. Sending it to master.") |
203 result.errors = _get_errors(ex) | 155 result.errors = _get_errors(ex) |
204 if self.ctx.debug: | 156 if self.ctx.debug: |
205 logger.exception(ex) | 157 logger.exception(ex) |
206 | 158 return result |
207 self.ctx.results.put_nowait(result) | |
208 | 159 |
209 | 160 |
210 class RenderFirstSubJobHandler(JobHandler): | 161 class RenderFirstSubJobHandler(JobHandler): |
211 def handleJob(self, job): | 162 def handleJob(self, job): |
212 # Render the segments for the first sub-page of this page. | 163 # Render the segments for the first sub-page of this page. |
229 except Exception as ex: | 180 except Exception as ex: |
230 logger.debug("Got rendering error. Sending it to master.") | 181 logger.debug("Got rendering error. Sending it to master.") |
231 result.errors = _get_errors(ex) | 182 result.errors = _get_errors(ex) |
232 if self.ctx.debug: | 183 if self.ctx.debug: |
233 logger.exception(ex) | 184 logger.exception(ex) |
234 | 185 return result |
235 self.ctx.results.put_nowait(result) | |
236 | 186 |
237 | 187 |
238 class BakeJobHandler(JobHandler): | 188 class BakeJobHandler(JobHandler): |
239 def __init__(self, app, ctx): | 189 def __init__(self, app, ctx): |
240 super(BakeJobHandler, self).__init__(app, ctx) | 190 super(BakeJobHandler, self).__init__(app, ctx) |
270 logger.debug("Got baking error. Sending it to master.") | 220 logger.debug("Got baking error. Sending it to master.") |
271 result.errors = _get_errors(ex) | 221 result.errors = _get_errors(ex) |
272 if self.ctx.debug: | 222 if self.ctx.debug: |
273 logger.exception(ex) | 223 logger.exception(ex) |
274 | 224 |
275 self.ctx.results.put_nowait(result) | 225 return result |
276 | 226 |