annotate piecrust/workerpool.py @ 462:04abc97dd3b6

bake: Add CLI argument to specify job batch size.
author Ludovic Chabant <ludovic@chabant.com>
date Sat, 11 Jul 2015 18:49:50 -0700
parents b015e38d4ee1
children 22a230d99621
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
1 import os
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
2 import sys
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
3 import zlib
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
4 import logging
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
5 import itertools
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
6 import threading
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
7 import multiprocessing
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
8 from piecrust.fastpickle import pickle, unpickle
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
9
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
10
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
11 logger = logging.getLogger(__name__)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
12
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
13
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
14 class IWorker(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
15 def initialize(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
16 raise NotImplementedError()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
17
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
18 def process(self, job):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
19 raise NotImplementedError()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
20
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
21 def getReport(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
22 return None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
23
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
24
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
25 TASK_JOB = 0
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
26 TASK_BATCH = 1
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
27 TASK_END = 2
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
28
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
29
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
30 def worker_func(params):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
31 if params.is_profiling:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
32 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
33 import cProfile as profile
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
34 except ImportError:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
35 import profile
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
36
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
37 params.is_profiling = False
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
38 name = params.worker_class.__name__
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
39 profile.runctx('_real_worker_func(params)',
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
40 globals(), locals(),
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
41 filename='%s-%d.prof' % (name, params.wid))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
42 else:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
43 _real_worker_func(params)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
44
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
45
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
46 def _real_worker_func(params):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
47 wid = params.wid
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
48 logger.debug("Worker %d initializing..." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
49
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
50 params.inqueue._writer.close()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
51 params.outqueue._reader.close()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
52
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
53 w = params.worker_class(*params.initargs)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
54 w.wid = wid
453
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
55 try:
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
56 w.initialize()
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
57 except Exception as ex:
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
58 logger.error("Working failed to initialize:")
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
59 logger.exception(ex)
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
60 params.outqueue.put(None)
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
61 return
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
62
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
63 get = params.inqueue.get
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
64 put = params.outqueue.put
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
65
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
66 completed = 0
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
67 while True:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
68 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
69 task = get()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
70 except (EOFError, OSError):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
71 logger.debug("Worker %d encountered connection problem." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
72 break
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
73
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
74 task_type, task_data = task
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
75 if task_type == TASK_END:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
76 logger.debug("Worker %d got end task, exiting." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
77 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
78 rep = (task_type, True, wid, (wid, w.getReport()))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
79 except Exception as e:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
80 if params.wrap_exception:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
81 e = multiprocessing.ExceptionWithTraceback(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
82 e, e.__traceback__)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
83 rep = (task_type, False, wid, (wid, e))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
84 put(rep)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
85 break
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
86
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
87 if task_type == TASK_JOB:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
88 task_data = (task_data,)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
89
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
90 for t in task_data:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
91 try:
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
92 res = (TASK_JOB, True, wid, w.process(t))
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
93 except Exception as e:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
94 if params.wrap_exception:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
95 e = multiprocessing.ExceptionWithTraceback(
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
96 e, e.__traceback__)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
97 res = (TASK_JOB, False, wid, e)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
98 put(res)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
99
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
100 completed += 1
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
101
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
102 logger.debug("Worker %d completed %d tasks." % (wid, completed))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
103
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
104
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
105 class _WorkerParams(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
106 def __init__(self, wid, inqueue, outqueue, worker_class, initargs=(),
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
107 wrap_exception=False, is_profiling=False):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
108 self.wid = wid
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
109 self.inqueue = inqueue
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
110 self.outqueue = outqueue
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
111 self.worker_class = worker_class
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
112 self.initargs = initargs
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
113 self.wrap_exception = wrap_exception
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
114 self.is_profiling = is_profiling
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
115
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
116
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
117 class WorkerPool(object):
462
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
118 def __init__(self, worker_class, initargs=(),
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
119 worker_count=None, batch_size=None,
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
120 wrap_exception=False):
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
121 worker_count = worker_count or os.cpu_count() or 1
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
122
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
123 use_fastqueue = True
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
124 if use_fastqueue:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
125 self._task_queue = FastQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
126 self._result_queue = FastQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
127 self._quick_put = self._task_queue.put
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
128 self._quick_get = self._result_queue.get
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
129 else:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
130 self._task_queue = multiprocessing.SimpleQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
131 self._result_queue = multiprocessing.SimpleQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
132 self._quick_put = self._task_queue._writer.send
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
133 self._quick_get = self._result_queue._reader.recv
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
134
462
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
135 self._batch_size = batch_size
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
136 self._callback = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
137 self._error_callback = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
138 self._listener = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
139
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
140 main_module = sys.modules['__main__']
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
141 is_profiling = os.path.basename(main_module.__file__) in [
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
142 'profile.py', 'cProfile.py']
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
143
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
144 self._pool = []
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
145 for i in range(worker_count):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
146 worker_params = _WorkerParams(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
147 i, self._task_queue, self._result_queue,
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
148 worker_class, initargs,
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
149 wrap_exception=wrap_exception,
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
150 is_profiling=is_profiling)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
151 w = multiprocessing.Process(target=worker_func,
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
152 args=(worker_params,))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
153 w.name = w.name.replace('Process', 'PoolWorker')
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
154 w.daemon = True
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
155 w.start()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
156 self._pool.append(w)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
157
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
158 self._result_handler = threading.Thread(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
159 target=WorkerPool._handleResults,
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
160 args=(self,))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
161 self._result_handler.daemon = True
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
162 self._result_handler.start()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
163
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
164 self._closed = False
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
165
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
166 def setHandler(self, callback=None, error_callback=None):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
167 self._callback = callback
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
168 self._error_callback = error_callback
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
169
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
170 def queueJobs(self, jobs, handler=None, chunk_size=None):
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
171 if self._closed:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
172 raise Exception("This worker pool has been closed.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
173 if self._listener is not None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
174 raise Exception("A previous job queue has not finished yet.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
175
453
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
176 if any([not p.is_alive() for p in self._pool]):
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
177 raise Exception("Some workers have prematurely exited.")
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
178
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
179 if handler is not None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
180 self.setHandler(handler)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
181
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
182 if not hasattr(jobs, '__len__'):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
183 jobs = list(jobs)
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
184 job_count = len(jobs)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
185
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
186 res = AsyncResult(self, job_count)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
187 if res._count == 0:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
188 res._event.set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
189 return res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
190
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
191 self._listener = res
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
192
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
193 if chunk_size is None:
462
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
194 chunk_size = self._batch_size
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
195 if chunk_size is None:
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
196 chunk_size = max(1, job_count // 50)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
197 logger.debug("Using chunk size of %d" % chunk_size)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
198
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
199 if chunk_size is None or chunk_size == 1:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
200 for job in jobs:
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
201 self._quick_put((TASK_JOB, job))
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
202 else:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
203 it = iter(jobs)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
204 while True:
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
205 batch = tuple([i for i in itertools.islice(it, chunk_size)])
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
206 if not batch:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
207 break
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
208 self._quick_put((TASK_BATCH, batch))
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
209
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
210 return res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
211
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
212 def close(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
213 if self._listener is not None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
214 raise Exception("A previous job queue has not finished yet.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
215
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
216 logger.debug("Closing worker pool...")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
217 handler = _ReportHandler(len(self._pool))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
218 self._callback = handler._handle
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
219 for w in self._pool:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
220 self._quick_put((TASK_END, None))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
221 for w in self._pool:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
222 w.join()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
223
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
224 logger.debug("Waiting for reports...")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
225 if not handler.wait(2):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
226 missing = handler.reports.index(None)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
227 logger.warning(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
228 "Didn't receive all worker reports before timeout. "
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
229 "Missing report from worker %d." % missing)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
230
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
231 logger.debug("Exiting result handler thread...")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
232 self._result_queue.put(None)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
233 self._result_handler.join()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
234 self._closed = True
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
235
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
236 return handler.reports
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
237
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
238 @staticmethod
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
239 def _handleResults(pool):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
240 while True:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
241 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
242 res = pool._quick_get()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
243 except (EOFError, OSError):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
244 logger.debug("Result handler thread encountered connection "
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
245 "problem, exiting.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
246 return
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
247
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
248 if res is None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
249 logger.debug("Result handler exiting.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
250 break
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
251
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
252 task_type, success, wid, data = res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
253 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
254 if success and pool._callback:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
255 pool._callback(data)
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
256 elif not success:
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
257 if pool._error_callback:
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
258 pool._error_callback(data)
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
259 else:
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
260 logger.error(data)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
261 except Exception as ex:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
262 logger.exception(ex)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
263
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
264 if task_type == TASK_JOB:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
265 pool._listener._onTaskDone()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
266
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
267
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
268 class AsyncResult(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
269 def __init__(self, pool, count):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
270 self._pool = pool
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
271 self._count = count
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
272 self._event = threading.Event()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
273
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
274 def ready(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
275 return self._event.is_set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
276
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
277 def wait(self, timeout=None):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
278 return self._event.wait(timeout)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
279
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
280 def _onTaskDone(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
281 self._count -= 1
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
282 if self._count == 0:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
283 self._pool.setHandler(None)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
284 self._pool._listener = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
285 self._event.set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
286
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
287
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
288 class _ReportHandler(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
289 def __init__(self, worker_count):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
290 self.reports = [None] * worker_count
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
291 self._count = worker_count
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
292 self._received = 0
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
293 self._event = threading.Event()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
294
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
295 def wait(self, timeout=None):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
296 return self._event.wait(timeout)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
297
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
298 def _handle(self, res):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
299 wid, data = res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
300 if wid < 0 or wid > self._count:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
301 logger.error("Ignoring report from unknown worker %d." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
302 return
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
303
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
304 self._received += 1
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
305 self.reports[wid] = data
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
306
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
307 if self._received == self._count:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
308 self._event.set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
309
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
310 def _handleError(self, res):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
311 wid, data = res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
312 logger.error("Worker %d failed to send its report." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
313 logger.exception(data)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
314
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
315
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
316 class FastQueue(object):
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
317 def __init__(self, compress=False):
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
318 self._reader, self._writer = multiprocessing.Pipe(duplex=False)
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
319 self._rlock = multiprocessing.Lock()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
320 self._wlock = multiprocessing.Lock()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
321 self._compress = compress
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
322
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
323 def __getstate__(self):
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
324 return (self._reader, self._writer, self._rlock, self._wlock,
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
325 self._compress)
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
326
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
327 def __setstate__(self, state):
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
328 (self._reader, self._writer, self._rlock, self._wlock,
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
329 self._compress) = state
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
330
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
331 def get(self):
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
332 with self._rlock:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
333 raw = self._reader.recv_bytes()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
334 if self._compress:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
335 data = zlib.decompress(raw)
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
336 else:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
337 data = raw
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
338 obj = unpickle(data)
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
339 return obj
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
340
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
341 def put(self, obj):
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
342 data = pickle(obj)
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
343 if self._compress:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
344 raw = zlib.compress(data)
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
345 else:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
346 raw = data
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
347 with self._wlock:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
348 self._writer.send_bytes(raw)
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
349