comparison piecrust/workerpool.py @ 691:9ae9390192da

bake: Use standard pickle and queue for now to fix some small issues.

* JSON leads to some problems with integers as keys.
* Add some stats to the baking process.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 21 Mar 2016 22:28:57 -0700
parents 61d606fbc313
children d2a87365b85b
comparison
equal deleted inserted replaced
690:f7207f4dab82 691:9ae9390192da
1 import io
1 import os 2 import os
2 import sys 3 import sys
4 import time
3 import zlib 5 import zlib
6 import pickle
4 import logging 7 import logging
5 import itertools 8 import itertools
6 import threading 9 import threading
7 import multiprocessing 10 import multiprocessing
8 from piecrust.fastpickle import pickle, unpickle
9 11
10 12
11 logger = logging.getLogger(__name__) 13 logger = logging.getLogger(__name__)
12 14
13 15
16 raise NotImplementedError() 18 raise NotImplementedError()
17 19
18 def process(self, job): 20 def process(self, job):
19 raise NotImplementedError() 21 raise NotImplementedError()
20 22
21 def getReport(self): 23 def getReport(self, pool_reports):
22 return None 24 return None
23 25
24 26
25 TASK_JOB = 0 27 TASK_JOB = 0
26 TASK_BATCH = 1 28 TASK_BATCH = 1
70 72
71 get = params.inqueue.get 73 get = params.inqueue.get
72 put = params.outqueue.put 74 put = params.outqueue.put
73 75
74 completed = 0 76 completed = 0
77 time_in_get = 0
78 time_in_put = 0
75 while True: 79 while True:
80 get_start_time = time.perf_counter()
76 try: 81 try:
77 task = get() 82 task = get()
78 except (EOFError, OSError): 83 except (EOFError, OSError):
79 logger.debug("Worker %d encountered connection problem." % wid) 84 logger.debug("Worker %d encountered connection problem." % wid)
80 break 85 break
86 time_in_get += (time.perf_counter() - get_start_time)
81 87
82 task_type, task_data = task 88 task_type, task_data = task
83 if task_type == TASK_END: 89 if task_type == TASK_END:
84 logger.debug("Worker %d got end task, exiting." % wid) 90 logger.debug("Worker %d got end task, exiting." % wid)
85 try: 91 wprep = {
86 rep = (task_type, True, wid, (wid, w.getReport())) 92 'WorkerTaskGet': time_in_get,
93 'WorkerResultPut': time_in_put}
94 try:
95 rep = (task_type, True, wid, (wid, w.getReport(wprep)))
87 except Exception as e: 96 except Exception as e:
88 logger.debug("Error getting report: %s" % e) 97 logger.debug("Error getting report: %s" % e)
89 if params.wrap_exception: 98 if params.wrap_exception:
90 e = multiprocessing.ExceptionWithTraceback( 99 e = multiprocessing.ExceptionWithTraceback(
91 e, e.__traceback__) 100 e, e.__traceback__)
102 except Exception as e: 111 except Exception as e:
103 if params.wrap_exception: 112 if params.wrap_exception:
104 e = multiprocessing.ExceptionWithTraceback( 113 e = multiprocessing.ExceptionWithTraceback(
105 e, e.__traceback__) 114 e, e.__traceback__)
106 res = (TASK_JOB, False, wid, e) 115 res = (TASK_JOB, False, wid, e)
116
117 put_start_time = time.perf_counter()
107 put(res) 118 put(res)
119 time_in_put += (time.perf_counter() - put_start_time)
108 120
109 completed += 1 121 completed += 1
110 122
111 logger.debug("Worker %d completed %d tasks." % (wid, completed)) 123 logger.debug("Worker %d completed %d tasks." % (wid, completed))
112 124
127 def __init__(self, worker_class, initargs=(), 139 def __init__(self, worker_class, initargs=(),
128 worker_count=None, batch_size=None, 140 worker_count=None, batch_size=None,
129 wrap_exception=False): 141 wrap_exception=False):
130 worker_count = worker_count or os.cpu_count() or 1 142 worker_count = worker_count or os.cpu_count() or 1
131 143
132 use_fastqueue = True 144 use_fastqueue = False
133 if use_fastqueue: 145 if use_fastqueue:
134 self._task_queue = FastQueue() 146 self._task_queue = FastQueue()
135 self._result_queue = FastQueue() 147 self._result_queue = FastQueue()
136 self._quick_put = self._task_queue.put 148 self._quick_put = self._task_queue.put
137 self._quick_get = self._result_queue.get 149 self._quick_get = self._result_queue.get
322 logger.error("Worker %d failed to send its report." % wid) 334 logger.error("Worker %d failed to send its report." % wid)
323 logger.exception(data) 335 logger.exception(data)
324 336
325 337
326 class FastQueue(object): 338 class FastQueue(object):
327 def __init__(self, compress=False): 339 def __init__(self):
328 self._reader, self._writer = multiprocessing.Pipe(duplex=False) 340 self._reader, self._writer = multiprocessing.Pipe(duplex=False)
329 self._rlock = multiprocessing.Lock() 341 self._rlock = multiprocessing.Lock()
330 self._wlock = multiprocessing.Lock() 342 self._wlock = multiprocessing.Lock()
331 self._compress = compress 343 self._rbuf = io.BytesIO()
344 self._rbuf.truncate(256)
345 self._wbuf = io.BytesIO()
346 self._wbuf.truncate(256)
332 347
333 def __getstate__(self): 348 def __getstate__(self):
334 return (self._reader, self._writer, self._rlock, self._wlock, 349 return (self._reader, self._writer, self._rlock, self._wlock)
335 self._compress)
336 350
337 def __setstate__(self, state): 351 def __setstate__(self, state):
338 (self._reader, self._writer, self._rlock, self._wlock, 352 (self._reader, self._writer, self._rlock, self._wlock) = state
339 self._compress) = state
340 353
341 def get(self): 354 def get(self):
342 with self._rlock: 355 with self._rlock:
343 raw = self._reader.recv_bytes() 356 try:
344 if self._compress: 357 with self._rbuf.getbuffer() as b:
345 data = zlib.decompress(raw) 358 self._reader.recv_bytes_into(b)
346 else: 359 except multiprocessing.BufferTooShort as e:
347 data = raw 360 self._rbuf.truncate(len(e.args[0]) * 2)
348 obj = unpickle(data) 361 self._rbuf.seek(0)
349 return obj 362 self._rbuf.write(e.args[0])
363
364 self._rbuf.seek(0)
365 return self._unpickle(self._rbuf)
350 366
351 def put(self, obj): 367 def put(self, obj):
352 data = pickle(obj) 368 self._wbuf.seek(0)
353 if self._compress: 369 self._pickle(obj, self._wbuf)
354 raw = zlib.compress(data) 370 size = self._wbuf.tell()
355 else: 371
356 raw = data 372 self._wbuf.seek(0)
357 with self._wlock: 373 with self._wlock:
358 self._writer.send_bytes(raw) 374 with self._wbuf.getbuffer() as b:
359 375 self._writer.send_bytes(b, 0, size)
376
377 def _pickle(self, obj, buf):
378 pickle.dump(obj, buf, pickle.HIGHEST_PROTOCOL)
379
380 def _unpickle(self, buf):
381 return pickle.load(buf)
382