Mercurial > piecrust2
annotate piecrust/workerpool.py @ 584:9ccc933ac2c7
internal: Refactor the app configuration class.
* Moved to its own module.
* More extensible validation.
* Allow easier setup of defaults so `showconfig` shows more useful stuff.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Fri, 01 Jan 2016 23:18:26 -0800 |
parents | 8073ae8cb164 |
children | 61d606fbc313 |
rev | line source |
---|---|
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
1 import os |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
2 import sys |
461
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
3 import zlib |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
4 import logging |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
5 import itertools |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
6 import threading |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
7 import multiprocessing |
451
838f3964f400
bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents:
447
diff
changeset
|
8 from piecrust.fastpickle import pickle, unpickle |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
9 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
10 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
11 logger = logging.getLogger(__name__) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
12 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
13 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
14 class IWorker(object): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
15 def initialize(self): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
16 raise NotImplementedError() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
17 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
18 def process(self, job): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
19 raise NotImplementedError() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
20 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
21 def getReport(self): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
22 return None |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
23 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
24 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
25 TASK_JOB = 0 |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
26 TASK_BATCH = 1 |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
27 TASK_END = 2 |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
28 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
29 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
30 def worker_func(params): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
31 if params.is_profiling: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
32 try: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
33 import cProfile as profile |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
34 except ImportError: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
35 import profile |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
36 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
37 params.is_profiling = False |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
38 name = params.worker_class.__name__ |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
39 profile.runctx('_real_worker_func(params)', |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
40 globals(), locals(), |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
41 filename='%s-%d.prof' % (name, params.wid)) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
42 else: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
43 _real_worker_func(params) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
44 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
45 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
46 def _real_worker_func(params): |
500
22a230d99621
bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents:
462
diff
changeset
|
47 # In a context where `multiprocessing` is using the `spawn` forking model, |
22a230d99621
bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents:
462
diff
changeset
|
48 # the new process doesn't inherit anything, so we lost all our logging |
22a230d99621
bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents:
462
diff
changeset
|
49 # configuration here. Let's set it up again. |
566
8073ae8cb164
bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents:
500
diff
changeset
|
50 if (hasattr(multiprocessing, 'get_start_method') and |
8073ae8cb164
bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents:
500
diff
changeset
|
51 multiprocessing.get_start_method() == 'spawn'): |
8073ae8cb164
bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents:
500
diff
changeset
|
52 from piecrust.main import _pre_parse_chef_args |
8073ae8cb164
bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents:
500
diff
changeset
|
53 _pre_parse_chef_args(sys.argv[1:]) |
500
22a230d99621
bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents:
462
diff
changeset
|
54 |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
55 wid = params.wid |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
56 logger.debug("Worker %d initializing..." % wid) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
57 |
461
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
58 params.inqueue._writer.close() |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
59 params.outqueue._reader.close() |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
60 |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
61 w = params.worker_class(*params.initargs) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
62 w.wid = wid |
453
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
63 try: |
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
64 w.initialize() |
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
65 except Exception as ex: |
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
66 logger.error("Working failed to initialize:") |
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
67 logger.exception(ex) |
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
68 params.outqueue.put(None) |
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
69 return |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
70 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
71 get = params.inqueue.get |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
72 put = params.outqueue.put |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
73 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
74 completed = 0 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
75 while True: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
76 try: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
77 task = get() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
78 except (EOFError, OSError): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
79 logger.debug("Worker %d encountered connection problem." % wid) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
80 break |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
81 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
82 task_type, task_data = task |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
83 if task_type == TASK_END: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
84 logger.debug("Worker %d got end task, exiting." % wid) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
85 try: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
86 rep = (task_type, True, wid, (wid, w.getReport())) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
87 except Exception as e: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
88 if params.wrap_exception: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
89 e = multiprocessing.ExceptionWithTraceback( |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
90 e, e.__traceback__) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
91 rep = (task_type, False, wid, (wid, e)) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
92 put(rep) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
93 break |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
94 |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
95 if task_type == TASK_JOB: |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
96 task_data = (task_data,) |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
97 |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
98 for t in task_data: |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
99 try: |
461
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
100 res = (TASK_JOB, True, wid, w.process(t)) |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
101 except Exception as e: |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
102 if params.wrap_exception: |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
103 e = multiprocessing.ExceptionWithTraceback( |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
104 e, e.__traceback__) |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
105 res = (TASK_JOB, False, wid, e) |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
106 put(res) |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
107 |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
108 completed += 1 |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
109 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
110 logger.debug("Worker %d completed %d tasks." % (wid, completed)) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
111 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
112 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
113 class _WorkerParams(object): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
114 def __init__(self, wid, inqueue, outqueue, worker_class, initargs=(), |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
115 wrap_exception=False, is_profiling=False): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
116 self.wid = wid |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
117 self.inqueue = inqueue |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
118 self.outqueue = outqueue |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
119 self.worker_class = worker_class |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
120 self.initargs = initargs |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
121 self.wrap_exception = wrap_exception |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
122 self.is_profiling = is_profiling |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
123 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
124 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
125 class WorkerPool(object): |
462
04abc97dd3b6
bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents:
461
diff
changeset
|
126 def __init__(self, worker_class, initargs=(), |
04abc97dd3b6
bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents:
461
diff
changeset
|
127 worker_count=None, batch_size=None, |
451
838f3964f400
bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents:
447
diff
changeset
|
128 wrap_exception=False): |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
129 worker_count = worker_count or os.cpu_count() or 1 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
130 |
461
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
131 use_fastqueue = True |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
132 if use_fastqueue: |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
133 self._task_queue = FastQueue() |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
134 self._result_queue = FastQueue() |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
135 self._quick_put = self._task_queue.put |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
136 self._quick_get = self._result_queue.get |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
137 else: |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
138 self._task_queue = multiprocessing.SimpleQueue() |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
139 self._result_queue = multiprocessing.SimpleQueue() |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
140 self._quick_put = self._task_queue._writer.send |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
141 self._quick_get = self._result_queue._reader.recv |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
142 |
462
04abc97dd3b6
bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents:
461
diff
changeset
|
143 self._batch_size = batch_size |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
144 self._callback = None |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
145 self._error_callback = None |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
146 self._listener = None |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
147 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
148 main_module = sys.modules['__main__'] |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
149 is_profiling = os.path.basename(main_module.__file__) in [ |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
150 'profile.py', 'cProfile.py'] |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
151 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
152 self._pool = [] |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
153 for i in range(worker_count): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
154 worker_params = _WorkerParams( |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
155 i, self._task_queue, self._result_queue, |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
156 worker_class, initargs, |
451
838f3964f400
bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents:
447
diff
changeset
|
157 wrap_exception=wrap_exception, |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
158 is_profiling=is_profiling) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
159 w = multiprocessing.Process(target=worker_func, |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
160 args=(worker_params,)) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
161 w.name = w.name.replace('Process', 'PoolWorker') |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
162 w.daemon = True |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
163 w.start() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
164 self._pool.append(w) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
165 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
166 self._result_handler = threading.Thread( |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
167 target=WorkerPool._handleResults, |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
168 args=(self,)) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
169 self._result_handler.daemon = True |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
170 self._result_handler.start() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
171 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
172 self._closed = False |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
173 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
174 def setHandler(self, callback=None, error_callback=None): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
175 self._callback = callback |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
176 self._error_callback = error_callback |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
177 |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
178 def queueJobs(self, jobs, handler=None, chunk_size=None): |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
179 if self._closed: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
180 raise Exception("This worker pool has been closed.") |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
181 if self._listener is not None: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
182 raise Exception("A previous job queue has not finished yet.") |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
183 |
453
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
184 if any([not p.is_alive() for p in self._pool]): |
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
185 raise Exception("Some workers have prematurely exited.") |
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
186 |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
187 if handler is not None: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
188 self.setHandler(handler) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
189 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
190 if not hasattr(jobs, '__len__'): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
191 jobs = list(jobs) |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
192 job_count = len(jobs) |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
193 |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
194 res = AsyncResult(self, job_count) |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
195 if res._count == 0: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
196 res._event.set() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
197 return res |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
198 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
199 self._listener = res |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
200 |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
201 if chunk_size is None: |
462
04abc97dd3b6
bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents:
461
diff
changeset
|
202 chunk_size = self._batch_size |
04abc97dd3b6
bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents:
461
diff
changeset
|
203 if chunk_size is None: |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
204 chunk_size = max(1, job_count // 50) |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
205 logger.debug("Using chunk size of %d" % chunk_size) |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
206 |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
207 if chunk_size is None or chunk_size == 1: |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
208 for job in jobs: |
461
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
209 self._quick_put((TASK_JOB, job)) |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
210 else: |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
211 it = iter(jobs) |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
212 while True: |
461
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
213 batch = tuple([i for i in itertools.islice(it, chunk_size)]) |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
214 if not batch: |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
215 break |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
216 self._quick_put((TASK_BATCH, batch)) |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
217 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
218 return res |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
219 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
220 def close(self): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
221 if self._listener is not None: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
222 raise Exception("A previous job queue has not finished yet.") |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
223 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
224 logger.debug("Closing worker pool...") |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
225 handler = _ReportHandler(len(self._pool)) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
226 self._callback = handler._handle |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
227 for w in self._pool: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
228 self._quick_put((TASK_END, None)) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
229 for w in self._pool: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
230 w.join() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
231 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
232 logger.debug("Waiting for reports...") |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
233 if not handler.wait(2): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
234 missing = handler.reports.index(None) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
235 logger.warning( |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
236 "Didn't receive all worker reports before timeout. " |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
237 "Missing report from worker %d." % missing) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
238 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
239 logger.debug("Exiting result handler thread...") |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
240 self._result_queue.put(None) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
241 self._result_handler.join() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
242 self._closed = True |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
243 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
244 return handler.reports |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
245 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
246 @staticmethod |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
247 def _handleResults(pool): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
248 while True: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
249 try: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
250 res = pool._quick_get() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
251 except (EOFError, OSError): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
252 logger.debug("Result handler thread encountered connection " |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
253 "problem, exiting.") |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
254 return |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
255 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
256 if res is None: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
257 logger.debug("Result handler exiting.") |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
258 break |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
259 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
260 task_type, success, wid, data = res |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
261 try: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
262 if success and pool._callback: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
263 pool._callback(data) |
451
838f3964f400
bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents:
447
diff
changeset
|
264 elif not success: |
838f3964f400
bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents:
447
diff
changeset
|
265 if pool._error_callback: |
838f3964f400
bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents:
447
diff
changeset
|
266 pool._error_callback(data) |
838f3964f400
bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents:
447
diff
changeset
|
267 else: |
838f3964f400
bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents:
447
diff
changeset
|
268 logger.error(data) |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
269 except Exception as ex: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
270 logger.exception(ex) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
271 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
272 if task_type == TASK_JOB: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
273 pool._listener._onTaskDone() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
274 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
275 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
276 class AsyncResult(object): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
277 def __init__(self, pool, count): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
278 self._pool = pool |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
279 self._count = count |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
280 self._event = threading.Event() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
281 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
282 def ready(self): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
283 return self._event.is_set() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
284 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
285 def wait(self, timeout=None): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
286 return self._event.wait(timeout) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
287 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
288 def _onTaskDone(self): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
289 self._count -= 1 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
290 if self._count == 0: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
291 self._pool.setHandler(None) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
292 self._pool._listener = None |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
293 self._event.set() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
294 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
295 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
296 class _ReportHandler(object): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
297 def __init__(self, worker_count): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
298 self.reports = [None] * worker_count |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
299 self._count = worker_count |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
300 self._received = 0 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
301 self._event = threading.Event() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
302 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
303 def wait(self, timeout=None): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
304 return self._event.wait(timeout) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
305 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
306 def _handle(self, res): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
307 wid, data = res |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
308 if wid < 0 or wid > self._count: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
309 logger.error("Ignoring report from unknown worker %d." % wid) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
310 return |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
311 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
312 self._received += 1 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
313 self.reports[wid] = data |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
314 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
315 if self._received == self._count: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
316 self._event.set() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
317 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
318 def _handleError(self, res): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
319 wid, data = res |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
320 logger.error("Worker %d failed to send its report." % wid) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
321 logger.exception(data) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
322 |
461
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
323 |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
324 class FastQueue(object): |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
325 def __init__(self, compress=False): |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
326 self._reader, self._writer = multiprocessing.Pipe(duplex=False) |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
327 self._rlock = multiprocessing.Lock() |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
328 self._wlock = multiprocessing.Lock() |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
329 self._compress = compress |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
330 |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
331 def __getstate__(self): |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
332 return (self._reader, self._writer, self._rlock, self._wlock, |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
333 self._compress) |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
334 |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
335 def __setstate__(self, state): |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
336 (self._reader, self._writer, self._rlock, self._wlock, |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
337 self._compress) = state |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
338 |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
339 def get(self): |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
340 with self._rlock: |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
341 raw = self._reader.recv_bytes() |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
342 if self._compress: |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
343 data = zlib.decompress(raw) |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
344 else: |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
345 data = raw |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
346 obj = unpickle(data) |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
347 return obj |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
348 |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
349 def put(self, obj): |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
350 data = pickle(obj) |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
351 if self._compress: |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
352 raw = zlib.compress(data) |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
353 else: |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
354 raw = data |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
355 with self._wlock: |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
356 self._writer.send_bytes(raw) |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
357 |