annotate piecrust/workerpool.py @ 661:2f780b191541

internal: Fix a bug with registering taxonomy terms that are not strings. Some objects, like the blog data provider's taxnonomy entries, can render as strings, but are objects themselves. When registering them as "used terms", we need to use their string representation.
author Ludovic Chabant <ludovic@chabant.com>
date Tue, 01 Mar 2016 22:26:09 -0800
parents 8073ae8cb164
children 61d606fbc313
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
1 import os
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
2 import sys
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
3 import zlib
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
4 import logging
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
5 import itertools
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
6 import threading
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
7 import multiprocessing
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
8 from piecrust.fastpickle import pickle, unpickle
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
9
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
10
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
11 logger = logging.getLogger(__name__)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
12
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
13
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
14 class IWorker(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
15 def initialize(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
16 raise NotImplementedError()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
17
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
18 def process(self, job):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
19 raise NotImplementedError()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
20
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
21 def getReport(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
22 return None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
23
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
24
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
25 TASK_JOB = 0
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
26 TASK_BATCH = 1
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
27 TASK_END = 2
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
28
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
29
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
30 def worker_func(params):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
31 if params.is_profiling:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
32 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
33 import cProfile as profile
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
34 except ImportError:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
35 import profile
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
36
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
37 params.is_profiling = False
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
38 name = params.worker_class.__name__
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
39 profile.runctx('_real_worker_func(params)',
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
40 globals(), locals(),
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
41 filename='%s-%d.prof' % (name, params.wid))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
42 else:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
43 _real_worker_func(params)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
44
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
45
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
46 def _real_worker_func(params):
500
22a230d99621 bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents: 462
diff changeset
47 # In a context where `multiprocessing` is using the `spawn` forking model,
22a230d99621 bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents: 462
diff changeset
48 # the new process doesn't inherit anything, so we lost all our logging
22a230d99621 bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents: 462
diff changeset
49 # configuration here. Let's set it up again.
566
8073ae8cb164 bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents: 500
diff changeset
50 if (hasattr(multiprocessing, 'get_start_method') and
8073ae8cb164 bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents: 500
diff changeset
51 multiprocessing.get_start_method() == 'spawn'):
8073ae8cb164 bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents: 500
diff changeset
52 from piecrust.main import _pre_parse_chef_args
8073ae8cb164 bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents: 500
diff changeset
53 _pre_parse_chef_args(sys.argv[1:])
500
22a230d99621 bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents: 462
diff changeset
54
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
55 wid = params.wid
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
56 logger.debug("Worker %d initializing..." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
57
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
58 params.inqueue._writer.close()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
59 params.outqueue._reader.close()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
60
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
61 w = params.worker_class(*params.initargs)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
62 w.wid = wid
453
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
63 try:
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
64 w.initialize()
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
65 except Exception as ex:
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
66 logger.error("Working failed to initialize:")
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
67 logger.exception(ex)
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
68 params.outqueue.put(None)
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
69 return
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
70
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
71 get = params.inqueue.get
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
72 put = params.outqueue.put
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
73
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
74 completed = 0
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
75 while True:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
76 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
77 task = get()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
78 except (EOFError, OSError):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
79 logger.debug("Worker %d encountered connection problem." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
80 break
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
81
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
82 task_type, task_data = task
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
83 if task_type == TASK_END:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
84 logger.debug("Worker %d got end task, exiting." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
85 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
86 rep = (task_type, True, wid, (wid, w.getReport()))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
87 except Exception as e:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
88 if params.wrap_exception:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
89 e = multiprocessing.ExceptionWithTraceback(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
90 e, e.__traceback__)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
91 rep = (task_type, False, wid, (wid, e))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
92 put(rep)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
93 break
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
94
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
95 if task_type == TASK_JOB:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
96 task_data = (task_data,)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
97
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
98 for t in task_data:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
99 try:
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
100 res = (TASK_JOB, True, wid, w.process(t))
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
101 except Exception as e:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
102 if params.wrap_exception:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
103 e = multiprocessing.ExceptionWithTraceback(
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
104 e, e.__traceback__)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
105 res = (TASK_JOB, False, wid, e)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
106 put(res)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
107
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
108 completed += 1
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
109
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
110 logger.debug("Worker %d completed %d tasks." % (wid, completed))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
111
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
112
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
113 class _WorkerParams(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
114 def __init__(self, wid, inqueue, outqueue, worker_class, initargs=(),
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
115 wrap_exception=False, is_profiling=False):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
116 self.wid = wid
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
117 self.inqueue = inqueue
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
118 self.outqueue = outqueue
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
119 self.worker_class = worker_class
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
120 self.initargs = initargs
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
121 self.wrap_exception = wrap_exception
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
122 self.is_profiling = is_profiling
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
123
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
124
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
125 class WorkerPool(object):
462
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
126 def __init__(self, worker_class, initargs=(),
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
127 worker_count=None, batch_size=None,
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
128 wrap_exception=False):
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
129 worker_count = worker_count or os.cpu_count() or 1
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
130
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
131 use_fastqueue = True
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
132 if use_fastqueue:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
133 self._task_queue = FastQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
134 self._result_queue = FastQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
135 self._quick_put = self._task_queue.put
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
136 self._quick_get = self._result_queue.get
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
137 else:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
138 self._task_queue = multiprocessing.SimpleQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
139 self._result_queue = multiprocessing.SimpleQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
140 self._quick_put = self._task_queue._writer.send
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
141 self._quick_get = self._result_queue._reader.recv
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
142
462
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
143 self._batch_size = batch_size
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
144 self._callback = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
145 self._error_callback = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
146 self._listener = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
147
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
148 main_module = sys.modules['__main__']
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
149 is_profiling = os.path.basename(main_module.__file__) in [
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
150 'profile.py', 'cProfile.py']
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
151
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
152 self._pool = []
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
153 for i in range(worker_count):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
154 worker_params = _WorkerParams(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
155 i, self._task_queue, self._result_queue,
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
156 worker_class, initargs,
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
157 wrap_exception=wrap_exception,
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
158 is_profiling=is_profiling)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
159 w = multiprocessing.Process(target=worker_func,
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
160 args=(worker_params,))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
161 w.name = w.name.replace('Process', 'PoolWorker')
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
162 w.daemon = True
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
163 w.start()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
164 self._pool.append(w)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
165
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
166 self._result_handler = threading.Thread(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
167 target=WorkerPool._handleResults,
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
168 args=(self,))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
169 self._result_handler.daemon = True
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
170 self._result_handler.start()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
171
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
172 self._closed = False
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
173
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
174 def setHandler(self, callback=None, error_callback=None):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
175 self._callback = callback
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
176 self._error_callback = error_callback
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
177
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
178 def queueJobs(self, jobs, handler=None, chunk_size=None):
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
179 if self._closed:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
180 raise Exception("This worker pool has been closed.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
181 if self._listener is not None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
182 raise Exception("A previous job queue has not finished yet.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
183
453
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
184 if any([not p.is_alive() for p in self._pool]):
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
185 raise Exception("Some workers have prematurely exited.")
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
186
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
187 if handler is not None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
188 self.setHandler(handler)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
189
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
190 if not hasattr(jobs, '__len__'):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
191 jobs = list(jobs)
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
192 job_count = len(jobs)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
193
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
194 res = AsyncResult(self, job_count)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
195 if res._count == 0:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
196 res._event.set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
197 return res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
198
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
199 self._listener = res
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
200
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
201 if chunk_size is None:
462
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
202 chunk_size = self._batch_size
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
203 if chunk_size is None:
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
204 chunk_size = max(1, job_count // 50)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
205 logger.debug("Using chunk size of %d" % chunk_size)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
206
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
207 if chunk_size is None or chunk_size == 1:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
208 for job in jobs:
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
209 self._quick_put((TASK_JOB, job))
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
210 else:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
211 it = iter(jobs)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
212 while True:
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
213 batch = tuple([i for i in itertools.islice(it, chunk_size)])
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
214 if not batch:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
215 break
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
216 self._quick_put((TASK_BATCH, batch))
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
217
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
218 return res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
219
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
220 def close(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
221 if self._listener is not None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
222 raise Exception("A previous job queue has not finished yet.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
223
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
224 logger.debug("Closing worker pool...")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
225 handler = _ReportHandler(len(self._pool))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
226 self._callback = handler._handle
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
227 for w in self._pool:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
228 self._quick_put((TASK_END, None))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
229 for w in self._pool:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
230 w.join()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
231
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
232 logger.debug("Waiting for reports...")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
233 if not handler.wait(2):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
234 missing = handler.reports.index(None)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
235 logger.warning(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
236 "Didn't receive all worker reports before timeout. "
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
237 "Missing report from worker %d." % missing)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
238
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
239 logger.debug("Exiting result handler thread...")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
240 self._result_queue.put(None)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
241 self._result_handler.join()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
242 self._closed = True
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
243
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
244 return handler.reports
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
245
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
246 @staticmethod
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
247 def _handleResults(pool):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
248 while True:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
249 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
250 res = pool._quick_get()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
251 except (EOFError, OSError):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
252 logger.debug("Result handler thread encountered connection "
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
253 "problem, exiting.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
254 return
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
255
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
256 if res is None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
257 logger.debug("Result handler exiting.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
258 break
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
259
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
260 task_type, success, wid, data = res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
261 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
262 if success and pool._callback:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
263 pool._callback(data)
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
264 elif not success:
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
265 if pool._error_callback:
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
266 pool._error_callback(data)
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
267 else:
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
268 logger.error(data)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
269 except Exception as ex:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
270 logger.exception(ex)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
271
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
272 if task_type == TASK_JOB:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
273 pool._listener._onTaskDone()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
274
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
275
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
276 class AsyncResult(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
277 def __init__(self, pool, count):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
278 self._pool = pool
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
279 self._count = count
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
280 self._event = threading.Event()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
281
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
282 def ready(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
283 return self._event.is_set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
284
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
285 def wait(self, timeout=None):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
286 return self._event.wait(timeout)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
287
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
288 def _onTaskDone(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
289 self._count -= 1
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
290 if self._count == 0:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
291 self._pool.setHandler(None)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
292 self._pool._listener = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
293 self._event.set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
294
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
295
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
296 class _ReportHandler(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
297 def __init__(self, worker_count):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
298 self.reports = [None] * worker_count
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
299 self._count = worker_count
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
300 self._received = 0
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
301 self._event = threading.Event()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
302
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
303 def wait(self, timeout=None):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
304 return self._event.wait(timeout)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
305
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
306 def _handle(self, res):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
307 wid, data = res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
308 if wid < 0 or wid > self._count:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
309 logger.error("Ignoring report from unknown worker %d." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
310 return
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
311
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
312 self._received += 1
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
313 self.reports[wid] = data
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
314
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
315 if self._received == self._count:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
316 self._event.set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
317
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
318 def _handleError(self, res):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
319 wid, data = res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
320 logger.error("Worker %d failed to send its report." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
321 logger.exception(data)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
322
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
323
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
324 class FastQueue(object):
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
325 def __init__(self, compress=False):
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
326 self._reader, self._writer = multiprocessing.Pipe(duplex=False)
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
327 self._rlock = multiprocessing.Lock()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
328 self._wlock = multiprocessing.Lock()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
329 self._compress = compress
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
330
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
331 def __getstate__(self):
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
332 return (self._reader, self._writer, self._rlock, self._wlock,
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
333 self._compress)
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
334
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
335 def __setstate__(self, state):
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
336 (self._reader, self._writer, self._rlock, self._wlock,
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
337 self._compress) = state
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
338
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
339 def get(self):
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
340 with self._rlock:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
341 raw = self._reader.recv_bytes()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
342 if self._compress:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
343 data = zlib.decompress(raw)
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
344 else:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
345 data = raw
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
346 obj = unpickle(data)
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
347 return obj
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
348
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
349 def put(self, obj):
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
350 data = pickle(obj)
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
351 if self._compress:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
352 raw = zlib.compress(data)
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
353 else:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
354 raw = data
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
355 with self._wlock:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
356 self._writer.send_bytes(raw)
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
357