Mercurial > piecrust2
comparison piecrust/workerpool.py @ 691:9ae9390192da
bake: Use standard pickle and queue for now to fix some small issues.
* JSON leads to some problems with integers as keys.
* Add some stats to the baking process.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Mon, 21 Mar 2016 22:28:57 -0700 |
parents | 61d606fbc313 |
children | d2a87365b85b |
comparison
equal
deleted
inserted
replaced
690:f7207f4dab82 | 691:9ae9390192da |
---|---|
1 import io | |
1 import os | 2 import os |
2 import sys | 3 import sys |
4 import time | |
3 import zlib | 5 import zlib |
6 import pickle | |
4 import logging | 7 import logging |
5 import itertools | 8 import itertools |
6 import threading | 9 import threading |
7 import multiprocessing | 10 import multiprocessing |
8 from piecrust.fastpickle import pickle, unpickle | |
9 | 11 |
10 | 12 |
11 logger = logging.getLogger(__name__) | 13 logger = logging.getLogger(__name__) |
12 | 14 |
13 | 15 |
16 raise NotImplementedError() | 18 raise NotImplementedError() |
17 | 19 |
18 def process(self, job): | 20 def process(self, job): |
19 raise NotImplementedError() | 21 raise NotImplementedError() |
20 | 22 |
21 def getReport(self): | 23 def getReport(self, pool_reports): |
22 return None | 24 return None |
23 | 25 |
24 | 26 |
25 TASK_JOB = 0 | 27 TASK_JOB = 0 |
26 TASK_BATCH = 1 | 28 TASK_BATCH = 1 |
70 | 72 |
71 get = params.inqueue.get | 73 get = params.inqueue.get |
72 put = params.outqueue.put | 74 put = params.outqueue.put |
73 | 75 |
74 completed = 0 | 76 completed = 0 |
77 time_in_get = 0 | |
78 time_in_put = 0 | |
75 while True: | 79 while True: |
80 get_start_time = time.perf_counter() | |
76 try: | 81 try: |
77 task = get() | 82 task = get() |
78 except (EOFError, OSError): | 83 except (EOFError, OSError): |
79 logger.debug("Worker %d encountered connection problem." % wid) | 84 logger.debug("Worker %d encountered connection problem." % wid) |
80 break | 85 break |
86 time_in_get += (time.perf_counter() - get_start_time) | |
81 | 87 |
82 task_type, task_data = task | 88 task_type, task_data = task |
83 if task_type == TASK_END: | 89 if task_type == TASK_END: |
84 logger.debug("Worker %d got end task, exiting." % wid) | 90 logger.debug("Worker %d got end task, exiting." % wid) |
85 try: | 91 wprep = { |
86 rep = (task_type, True, wid, (wid, w.getReport())) | 92 'WorkerTaskGet': time_in_get, |
93 'WorkerResultPut': time_in_put} | |
94 try: | |
95 rep = (task_type, True, wid, (wid, w.getReport(wprep))) | |
87 except Exception as e: | 96 except Exception as e: |
88 logger.debug("Error getting report: %s" % e) | 97 logger.debug("Error getting report: %s" % e) |
89 if params.wrap_exception: | 98 if params.wrap_exception: |
90 e = multiprocessing.ExceptionWithTraceback( | 99 e = multiprocessing.ExceptionWithTraceback( |
91 e, e.__traceback__) | 100 e, e.__traceback__) |
102 except Exception as e: | 111 except Exception as e: |
103 if params.wrap_exception: | 112 if params.wrap_exception: |
104 e = multiprocessing.ExceptionWithTraceback( | 113 e = multiprocessing.ExceptionWithTraceback( |
105 e, e.__traceback__) | 114 e, e.__traceback__) |
106 res = (TASK_JOB, False, wid, e) | 115 res = (TASK_JOB, False, wid, e) |
116 | |
117 put_start_time = time.perf_counter() | |
107 put(res) | 118 put(res) |
119 time_in_put += (time.perf_counter() - put_start_time) | |
108 | 120 |
109 completed += 1 | 121 completed += 1 |
110 | 122 |
111 logger.debug("Worker %d completed %d tasks." % (wid, completed)) | 123 logger.debug("Worker %d completed %d tasks." % (wid, completed)) |
112 | 124 |
127 def __init__(self, worker_class, initargs=(), | 139 def __init__(self, worker_class, initargs=(), |
128 worker_count=None, batch_size=None, | 140 worker_count=None, batch_size=None, |
129 wrap_exception=False): | 141 wrap_exception=False): |
130 worker_count = worker_count or os.cpu_count() or 1 | 142 worker_count = worker_count or os.cpu_count() or 1 |
131 | 143 |
132 use_fastqueue = True | 144 use_fastqueue = False |
133 if use_fastqueue: | 145 if use_fastqueue: |
134 self._task_queue = FastQueue() | 146 self._task_queue = FastQueue() |
135 self._result_queue = FastQueue() | 147 self._result_queue = FastQueue() |
136 self._quick_put = self._task_queue.put | 148 self._quick_put = self._task_queue.put |
137 self._quick_get = self._result_queue.get | 149 self._quick_get = self._result_queue.get |
322 logger.error("Worker %d failed to send its report." % wid) | 334 logger.error("Worker %d failed to send its report." % wid) |
323 logger.exception(data) | 335 logger.exception(data) |
324 | 336 |
325 | 337 |
class FastQueue(object):
    """A lightweight multiprocessing queue built directly on a `Pipe`.

    Objects are serialized with the standard `pickle` module into a
    reusable per-process `BytesIO` buffer and sent as raw bytes over a
    simplex pipe. Reader/writer locks make concurrent `get`/`put` calls
    from multiple *processes* safe.

    NOTE(review): `_wbuf` is mutated before `_wlock` is taken, so calling
    `put()` from several *threads* of the same process looks unsafe —
    presumably each process uses the queue from a single thread; confirm
    against callers.
    """

    # Initial capacity of the reusable pickle buffers, in bytes.
    _INITIAL_BUF_SIZE = 256

    def __init__(self):
        self._reader, self._writer = multiprocessing.Pipe(duplex=False)
        self._rlock = multiprocessing.Lock()
        self._wlock = multiprocessing.Lock()
        # Pre-size the buffers with real zero bytes. (`BytesIO.truncate`
        # can only shrink a stream, never extend it, so truncating a
        # fresh, empty BytesIO to 256 would be a no-op and the first
        # receive would always take the `BufferTooShort` slow path.)
        self._rbuf = io.BytesIO(bytes(self._INITIAL_BUF_SIZE))
        self._wbuf = io.BytesIO(bytes(self._INITIAL_BUF_SIZE))

    def __getstate__(self):
        # The buffers are deliberately not pickled: each process that
        # receives this queue rebuilds its own in `__setstate__`... no,
        # they are rebuilt lazily -- see below. Only the shared pipe ends
        # and locks travel across the process boundary.
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        (self._reader, self._writer, self._rlock, self._wlock) = state
        # Fresh, process-local scratch buffers for the child process.
        self._rbuf = io.BytesIO(bytes(self._INITIAL_BUF_SIZE))
        self._wbuf = io.BytesIO(bytes(self._INITIAL_BUF_SIZE))

    def get(self):
        """Receive and unpickle the next object from the pipe (blocking)."""
        with self._rlock:
            try:
                # Zero-copy receive straight into the reusable buffer.
                with self._rbuf.getbuffer() as b:
                    self._reader.recv_bytes_into(b)
            except multiprocessing.BufferTooShort as e:
                # The message didn't fit; `recv_bytes_into` hands us the
                # complete payload in e.args[0]. Store it and grow the
                # buffer to twice the message size so the next receive
                # of a similar message fits on the fast path.
                data = e.args[0]
                self._rbuf.seek(0)
                self._rbuf.write(data)
                self._rbuf.write(bytes(len(data)))

            self._rbuf.seek(0)
            # Stale bytes past the end of the pickle stream are harmless:
            # `pickle.load` stops at the STOP opcode.
            return self._unpickle(self._rbuf)

    def put(self, obj):
        """Pickle `obj` into the scratch buffer and send it over the pipe."""
        self._wbuf.seek(0)
        self._pickle(obj, self._wbuf)
        size = self._wbuf.tell()

        self._wbuf.seek(0)
        with self._wlock:
            # Send only the `size` bytes just written, not the whole
            # (possibly larger, stale-tailed) buffer.
            with self._wbuf.getbuffer() as b:
                self._writer.send_bytes(b, 0, size)

    def _pickle(self, obj, buf):
        # Hook point so subclasses can swap the serialization scheme.
        pickle.dump(obj, buf, pickle.HIGHEST_PROTOCOL)

    def _unpickle(self, buf):
        # Hook point mirroring `_pickle`.
        return pickle.load(buf)
382 |