comparison piecrust/workerpool.py @ 852:4850f8c21b6e

core: Start of the big refactor for PieCrust 3.0.

* Everything is a `ContentSource`, including assets directories.
* Most content sources are subclasses of the base file-system source.
* A source is processed by a "pipeline", and there are 2 built-in pipelines,
  one for assets and one for pages. The asset pipeline is vaguely functional,
  but the page pipeline is completely broken right now.
* Rewrite the baking process as just running appropriate pipelines on each
  content item. This should allow for better parallelization.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 17 May 2017 00:11:48 -0700
parents c62d83e17abf
children 08e02c2a2a1a
comparing 851:2c7e57d80bba to 852:4850f8c21b6e
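The commit message describes the refactor only in broad strokes. As a rough mental model of what it implies (the class and method names below are purely illustrative and are not the actual PieCrust 3.0 API), the relationship between sources, pipelines and the baker could be sketched like this:

    # Illustrative sketch only -- not the real PieCrust classes or signatures.
    class ContentSource:
        def getContents(self):
            raise NotImplementedError()   # yields content items (pages, assets, ...)

    class ContentPipeline:
        def run(self, item):
            raise NotImplementedError()   # processes a single content item

    def bake(sources, pipelines, pool):
        # Baking reduces to: for every source, push its items as jobs for the
        # matching pipeline; a worker pool can then process them in parallel.
        for source in sources:
            pipeline = pipelines[source]
            pool.queueJobs((pipeline, item) for item in source.getContents())

This is only meant to make the diff below easier to follow; the actual source and pipeline classes live in other files of this changeset.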
--- a/piecrust/workerpool.py
+++ b/piecrust/workerpool.py
@@ -1,40 +1,64 @@
 import io
 import os
 import sys
 import time
-import zlib
-import queue
+import pickle
 import logging
-import itertools
 import threading
+import traceback
 import multiprocessing
 from piecrust import fastpickle
+from piecrust.environment import ExecutionStats


 logger = logging.getLogger(__name__)


 use_fastqueue = True
+use_fastpickle = False


 class IWorker(object):
+    """ Interface for a pool worker.
+    """
     def initialize(self):
         raise NotImplementedError()

     def process(self, job):
         raise NotImplementedError()

-    def getReport(self, pool_reports):
+    def getStats(self):
         return None

     def shutdown(self):
         pass


+class WorkerExceptionData:
+    def __init__(self, wid):
+        super().__init__()
+        self.wid = wid
+        t, v, tb = sys.exc_info()
+        self.type = t
+        self.value = '\n'.join(_get_errors(v))
+        self.traceback = ''.join(traceback.format_exception(t, v, tb))
+
+    def __str__(self):
+        return str(self.value)
+
+
+def _get_errors(ex):
+    errors = []
+    while ex is not None:
+        msg = str(ex)
+        errors.append(msg)
+        ex = ex.__cause__
+    return errors
+
+
 TASK_JOB = 0
-TASK_BATCH = 1
-TASK_END = 2
+TASK_END = 1


 def worker_func(params):
     if params.is_profiling:
         try:
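For context, the worker contract after this hunk is small: the pool instantiates the worker class with `initargs`, calls `initialize()` once, `process(job)` for every job, `getStats()` when the end task arrives, and `shutdown()` on exit. A minimal, purely hypothetical implementation would look like this:

    # Hypothetical worker, only to illustrate the IWorker interface above.
    class EchoWorker(IWorker):
        def __init__(self, prefix):
            self.prefix = prefix              # filled from WorkerPool(initargs=...)

        def initialize(self):
            pass                              # open caches, load config, etc.

        def process(self, job):
            # The return value travels back to the main process as the result.
            return self.prefix + str(job)

        def getStats(self):
            # Real workers presumably return an ExecutionStats instance here,
            # since the end-task handler merges it into the worker's own stats.
            return None

        def shutdown(self):
            pass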
@@ -50,19 +74,24 @@
     else:
         _real_worker_func(params)


 def _real_worker_func(params):
+    wid = params.wid
+
+    stats = ExecutionStats()
+    stats.registerTimer('WorkerInit')
+    init_start_time = time.perf_counter()
+
     # In a context where `multiprocessing` is using the `spawn` forking model,
     # the new process doesn't inherit anything, so we lost all our logging
     # configuration here. Let's set it up again.
     if (hasattr(multiprocessing, 'get_start_method') and
             multiprocessing.get_start_method() == 'spawn'):
         from piecrust.main import _pre_parse_chef_args
         _pre_parse_chef_args(sys.argv[1:])

-    wid = params.wid
     logger.debug("Worker %d initializing..." % wid)

     # We don't need those.
     params.inqueue._writer.close()
     params.outqueue._reader.close()
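The new lines above introduce the per-worker ExecutionStats object and the timing pattern used throughout _real_worker_func: declare a named timer, record time.perf_counter() before a phase, and feed the start time back afterwards. In isolation (using only the ExecutionStats calls that appear in this diff), the pattern is just:

    import time
    from piecrust.environment import ExecutionStats

    stats = ExecutionStats()
    stats.registerTimer('WorkerInit')           # declare a named timer
    start = time.perf_counter()
    # ... expensive initialization happens here ...
    stats.stepTimerSince('WorkerInit', start)   # add the elapsed time to it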
@@ -76,134 +105,82 @@
         logger.error("Working failed to initialize:")
         logger.exception(ex)
         params.outqueue.put(None)
         return

-    use_threads = False
-    if use_threads:
-        # Create threads to read/write the jobs and results from/to the
-        # main arbitrator process.
-        local_job_queue = queue.Queue()
-        reader_thread = threading.Thread(
-            target=_job_queue_reader,
-            args=(params.inqueue.get, local_job_queue),
-            name="JobQueueReaderThread")
-        reader_thread.start()
-
-        local_result_queue = queue.Queue()
-        writer_thread = threading.Thread(
-            target=_job_results_writer,
-            args=(local_result_queue, params.outqueue.put),
-            name="JobResultWriterThread")
-        writer_thread.start()
-
-        get = local_job_queue.get
-        put = local_result_queue.put_nowait
-    else:
-        get = params.inqueue.get
-        put = params.outqueue.put
+    stats.stepTimerSince('WorkerInit', init_start_time)

     # Start pumping!
     completed = 0
     time_in_get = 0
     time_in_put = 0
+    get = params.inqueue.get
+    put = params.outqueue.put
+
     while True:
         get_start_time = time.perf_counter()
         task = get()
         time_in_get += (time.perf_counter() - get_start_time)

         task_type, task_data = task
+
+        # End task... gather stats to send back to the main process.
         if task_type == TASK_END:
             logger.debug("Worker %d got end task, exiting." % wid)
-            wprep = {
-                'WorkerTaskGet': time_in_get,
-                'WorkerResultPut': time_in_put}
-            try:
-                rep = (task_type, True, wid, (wid, w.getReport(wprep)))
+            stats.registerTimer('WorkerTaskGet', time=time_in_get)
+            stats.registerTimer('WorkerResultPut', time=time_in_put)
+            try:
+                stats.mergeStats(w.getStats())
+                rep = (task_type, task_data, True, wid, (wid, stats))
             except Exception as e:
-                logger.debug("Error getting report: %s" % e)
-                if params.wrap_exception:
-                    e = multiprocessing.ExceptionWithTraceback(
-                        e, e.__traceback__)
-                rep = (task_type, False, wid, (wid, e))
+                logger.debug(
+                    "Error getting report, sending exception to main process:")
+                logger.debug(traceback.format_exc())
+                we = WorkerExceptionData(wid)
+                rep = (task_type, task_data, False, wid, (wid, we))
             put(rep)
             break

-        if task_type == TASK_JOB:
-            task_data = (task_data,)
-
-        for t in task_data:
-            try:
-                res = (TASK_JOB, True, wid, w.process(t))
+        # Job task... just do it.
+        elif task_type == TASK_JOB:
+            try:
+                res = (task_type, task_data, True, wid, w.process(task_data))
             except Exception as e:
-                if params.wrap_exception:
-                    e = multiprocessing.ExceptionWithTraceback(
-                        e, e.__traceback__)
-                res = (TASK_JOB, False, wid, e)
+                logger.debug(
+                    "Error processing job, sending exception to main process:")
+                logger.debug(traceback.format_exc())
+                we = WorkerExceptionData(wid)
+                res = (task_type, task_data, False, wid, we)

             put_start_time = time.perf_counter()
             put(res)
             time_in_put += (time.perf_counter() - put_start_time)

             completed += 1

-    if use_threads:
-        logger.debug("Worker %d waiting for reader/writer threads." % wid)
-        local_result_queue.put_nowait(None)
-        reader_thread.join()
-        writer_thread.join()
+        else:
+            raise Exception("Unknown task type: %s" % task_type)

     w.shutdown()
-
     logger.debug("Worker %d completed %d tasks." % (wid, completed))


-def _job_queue_reader(getter, out_queue):
-    while True:
-        try:
-            task = getter()
-        except (EOFError, OSError):
-            logger.debug("Worker encountered connection problem.")
-            break
-
-        out_queue.put_nowait(task)
-
-        if task[0] == TASK_END:
-            # Done reading jobs from the main process.
-            logger.debug("Got end task, exiting task queue reader thread.")
-            break
-
-
-def _job_results_writer(in_queue, putter):
-    while True:
-        res = in_queue.get()
-        if res is not None:
-            putter(res)
-            in_queue.task_done()
-        else:
-            # Got sentinel. Exit.
-            in_queue.task_done()
-            break
-    logger.debug("Exiting result queue writer thread.")
-
-
-class _WorkerParams(object):
+class _WorkerParams:
     def __init__(self, wid, inqueue, outqueue, worker_class, initargs=(),
-                 wrap_exception=False, is_profiling=False):
+                 is_profiling=False):
         self.wid = wid
         self.inqueue = inqueue
         self.outqueue = outqueue
         self.worker_class = worker_class
         self.initargs = initargs
-        self.wrap_exception = wrap_exception
         self.is_profiling = is_profiling


-class WorkerPool(object):
-    def __init__(self, worker_class, initargs=(),
-                 worker_count=None, batch_size=None,
-                 wrap_exception=False):
+class WorkerPool:
+    def __init__(self, worker_class, initargs=(), *,
+                 callback=None, error_callback=None,
+                 worker_count=None, batch_size=None):
         worker_count = worker_count or os.cpu_count() or 1

         if use_fastqueue:
             self._task_queue = FastQueue()
             self._result_queue = FastQueue()
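To summarize the worker loop above, the protocol between the pool and its workers is now a pair of tuple shapes (field names below are descriptive only):

    # Pool -> worker (task queue):
    #   (TASK_JOB, job)      one job to process
    #   (TASK_END, None)     report stats and exit
    #
    # Worker -> pool (result queue), one reply per task:
    #   (task_type, task_data, success, wid, data)
    # where `data` is the return value of IWorker.process() for a job,
    # a (wid, ExecutionStats) pair for the end task, or a WorkerExceptionData
    # (wrapped in a (wid, ...) pair for the end task) when success is False.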
@@ -213,112 +190,86 @@
             self._task_queue = multiprocessing.SimpleQueue()
             self._result_queue = multiprocessing.SimpleQueue()
             self._quick_put = self._task_queue._writer.send
             self._quick_get = self._result_queue._reader.recv

+        self._callback = callback
+        self._error_callback = error_callback
         self._batch_size = batch_size
-        self._callback = None
-        self._error_callback = None
-        self._listener = None
+        self._jobs_left = 0
+        self._event = threading.Event()

         main_module = sys.modules['__main__']
         is_profiling = os.path.basename(main_module.__file__) in [
             'profile.py', 'cProfile.py']

         self._pool = []
         for i in range(worker_count):
             worker_params = _WorkerParams(
                 i, self._task_queue, self._result_queue,
                 worker_class, initargs,
-                wrap_exception=wrap_exception,
                 is_profiling=is_profiling)
             w = multiprocessing.Process(target=worker_func,
                                         args=(worker_params,))
             w.name = w.name.replace('Process', 'PoolWorker')
             w.daemon = True
             w.start()
             self._pool.append(w)

         self._result_handler = threading.Thread(
             target=WorkerPool._handleResults,
             args=(self,))
         self._result_handler.daemon = True
         self._result_handler.start()

         self._closed = False

-    def setHandler(self, callback=None, error_callback=None):
-        self._callback = callback
-        self._error_callback = error_callback
-
-    def queueJobs(self, jobs, handler=None, chunk_size=None):
+    def queueJobs(self, jobs):
         if self._closed:
             raise Exception("This worker pool has been closed.")
-        if self._listener is not None:
-            raise Exception("A previous job queue has not finished yet.")
-
-        if any([not p.is_alive() for p in self._pool]):
-            raise Exception("Some workers have prematurely exited.")
-
-        if handler is not None:
-            self.setHandler(handler)
-
-        if not hasattr(jobs, '__len__'):
-            jobs = list(jobs)
-        job_count = len(jobs)
-
-        res = AsyncResult(self, job_count)
-        if res._count == 0:
-            res._event.set()
-            return res
-
-        self._listener = res
-
-        if chunk_size is None:
-            chunk_size = self._batch_size
-        if chunk_size is None:
-            chunk_size = max(1, job_count // 50)
-            logger.debug("Using chunk size of %d" % chunk_size)
-
-        if chunk_size is None or chunk_size == 1:
-            for job in jobs:
-                self._quick_put((TASK_JOB, job))
-        else:
-            it = iter(jobs)
-            while True:
-                batch = tuple([i for i in itertools.islice(it, chunk_size)])
-                if not batch:
-                    break
-                self._quick_put((TASK_BATCH, batch))
-
-        return res
+
+        for job in jobs:
+            self._jobs_left += 1
+            self._quick_put((TASK_JOB, job))
+
+        if self._jobs_left > 0:
+            self._event.clear()
+
+    def wait(self, timeout=None):
+        return self._event.wait(timeout)

     def close(self):
-        if self._listener is not None:
+        if self._jobs_left > 0 or not self._event.is_set():
             raise Exception("A previous job queue has not finished yet.")

         logger.debug("Closing worker pool...")
         handler = _ReportHandler(len(self._pool))
         self._callback = handler._handle
+        self._error_callback = handler._handleError
         for w in self._pool:
             self._quick_put((TASK_END, None))
         for w in self._pool:
             w.join()

         logger.debug("Waiting for reports...")
         if not handler.wait(2):
             missing = handler.reports.index(None)
             logger.warning(
                 "Didn't receive all worker reports before timeout. "
                 "Missing report from worker %d." % missing)

         logger.debug("Exiting result handler thread...")
         self._result_queue.put(None)
         self._result_handler.join()
         self._closed = True

         return handler.reports
+
+    def _onTaskDone(self):
+        self._jobs_left -= 1
+        if self._jobs_left == 0:
+            self._event.set()

     @staticmethod
     def _handleResults(pool):
         while True:
             try:
@@ -330,58 +281,40 @@

             if res is None:
                 logger.debug("Result handler exiting.")
                 break

-            task_type, success, wid, data = res
+            task_type, task_data, success, wid, data = res
             try:
-                if success and pool._callback:
-                    pool._callback(data)
-                elif not success:
+                if success:
+                    if pool._callback:
+                        pool._callback(task_data, data)
+                else:
                     if pool._error_callback:
-                        pool._error_callback(data)
+                        pool._error_callback(task_data, data)
                     else:
-                        logger.error("Got error data:")
+                        logger.error(
+                            "Worker %d failed to process a job:" % wid)
                         logger.error(data)
             except Exception as ex:
                 logger.exception(ex)

             if task_type == TASK_JOB:
-                pool._listener._onTaskDone()
+                pool._onTaskDone()


-class AsyncResult(object):
-    def __init__(self, pool, count):
-        self._pool = pool
-        self._count = count
-        self._event = threading.Event()
-
-    def ready(self):
-        return self._event.is_set()
-
-    def wait(self, timeout=None):
-        return self._event.wait(timeout)
-
-    def _onTaskDone(self):
-        self._count -= 1
-        if self._count == 0:
-            self._pool.setHandler(None)
-            self._pool._listener = None
-            self._event.set()
-
-
-class _ReportHandler(object):
+class _ReportHandler:
     def __init__(self, worker_count):
         self.reports = [None] * worker_count
         self._count = worker_count
         self._received = 0
         self._event = threading.Event()

     def wait(self, timeout=None):
         return self._event.wait(timeout)

-    def _handle(self, res):
+    def _handle(self, job, res):
         wid, data = res
         if wid < 0 or wid > self._count:
             logger.error("Ignoring report from unknown worker %d." % wid)
             return

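Putting the new WorkerPool API together: callbacks are now passed to the constructor, receive the original job alongside the result, and per-worker reports come back from close(). A minimal usage sketch, reusing the hypothetical EchoWorker from the earlier sketch:

    def on_done(job, result):
        print('done:', job, result)

    def on_error(job, exc_data):
        print('failed:', job, exc_data)

    pool = WorkerPool(EchoWorker, initargs=('out-',),
                      callback=on_done, error_callback=on_error)
    pool.queueJobs(['a', 'b', 'c'])
    pool.wait()               # blocks until every queued job has been handled
    reports = pool.close()    # sends TASK_END and returns the workers' stats

Note that both callbacks run on the pool's result handler thread, not on the caller's thread.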
@@ -389,17 +322,16 @@
         self.reports[wid] = data

         if self._received == self._count:
             self._event.set()

-    def _handleError(self, res):
-        wid, data = res
-        logger.error("Worker %d failed to send its report." % wid)
-        logger.exception(data)
+    def _handleError(self, job, res):
+        logger.error("Worker %d failed to send its report." % res.wid)
+        logger.error(res)


-class FastQueue(object):
+class FastQueue:
     def __init__(self):
         self._reader, self._writer = multiprocessing.Pipe(duplex=False)
         self._rlock = multiprocessing.Lock()
         self._wlock = multiprocessing.Lock()
         self._initBuffers()
@@ -427,23 +359,41 @@
                 self._rbuf.truncate(bufsize * 2)
                 self._rbuf.seek(0)
                 self._rbuf.write(e.args[0])

         self._rbuf.seek(0)
-        return self._unpickle(self._rbuf, bufsize)
+        return _unpickle(self._rbuf, bufsize)

     def put(self, obj):
         self._wbuf.seek(0)
-        self._pickle(obj, self._wbuf)
+        _pickle(obj, self._wbuf)
        size = self._wbuf.tell()

         self._wbuf.seek(0)
         with self._wlock:
             with self._wbuf.getbuffer() as b:
                 self._writer.send_bytes(b, 0, size)

-    def _pickle(self, obj, buf):
-        fastpickle.pickle_intob(obj, buf)
-
-    def _unpickle(self, buf, bufsize):
-        return fastpickle.unpickle_fromb(buf, bufsize)
-
+
+def _pickle_fast(obj, buf):
+    fastpickle.pickle_intob(obj, buf)
+
+
+def _unpickle_fast(buf, bufsize):
+    return fastpickle.unpickle_fromb(buf, bufsize)
+
+
+def _pickle_default(obj, buf):
+    pickle.dump(obj, buf)
+
+
+def _unpickle_default(buf, bufsize):
+    return pickle.load(buf)
+
+
+if use_fastpickle:
+    _pickle = _pickle_fast
+    _unpickle = _unpickle_fast
+else:
+    _pickle = _pickle_default
+    _unpickle = _unpickle_default
+
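Finally, the module-level _pickle/_unpickle pair is what FastQueue.put() and get() use to serialize objects into their reusable buffers, and which implementation gets bound depends on the use_fastpickle flag at import time. The default stdlib path behaves like this (a standalone illustration, not code from the file):

    import io
    import pickle

    buf = io.BytesIO()
    pickle.dump({'uri': '/foo'}, buf)   # what _pickle_default does
    size = buf.tell()                   # FastQueue.put() sends exactly `size` bytes

    buf.seek(0)
    obj = pickle.load(buf)              # what _unpickle_default does
    assert obj == {'uri': '/foo'}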