Mercurial > piecrust2
annotate piecrust/workerpool.py @ 562:fd8bd08d1fa8
docs: Add reference entry about the `site/slugify_mode` setting.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Fri, 14 Aug 2015 22:55:36 -0700 |
parents | 22a230d99621 |
children | 8073ae8cb164 |
rev | line source |
---|---|
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
1 import os |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
2 import sys |
461
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
3 import zlib |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
4 import logging |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
5 import itertools |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
6 import threading |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
7 import multiprocessing |
451
838f3964f400
bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents:
447
diff
changeset
|
8 from piecrust.fastpickle import pickle, unpickle |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
9 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
10 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
11 logger = logging.getLogger(__name__) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
12 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
13 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
14 class IWorker(object): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
15 def initialize(self): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
16 raise NotImplementedError() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
17 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
18 def process(self, job): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
19 raise NotImplementedError() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
20 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
21 def getReport(self): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
22 return None |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
23 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
24 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
25 TASK_JOB = 0 |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
26 TASK_BATCH = 1 |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
27 TASK_END = 2 |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
28 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
29 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
30 def worker_func(params): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
31 if params.is_profiling: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
32 try: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
33 import cProfile as profile |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
34 except ImportError: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
35 import profile |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
36 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
37 params.is_profiling = False |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
38 name = params.worker_class.__name__ |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
39 profile.runctx('_real_worker_func(params)', |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
40 globals(), locals(), |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
41 filename='%s-%d.prof' % (name, params.wid)) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
42 else: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
43 _real_worker_func(params) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
44 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
45 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
46 def _real_worker_func(params): |
500
22a230d99621
bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents:
462
diff
changeset
|
47 # In a context where `multiprocessing` is using the `spawn` forking model, |
22a230d99621
bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents:
462
diff
changeset
|
48 # the new process doesn't inherit anything, so we lost all our logging |
22a230d99621
bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents:
462
diff
changeset
|
49 # configuration here. Let's set it up again. |
22a230d99621
bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents:
462
diff
changeset
|
50 from piecrust.main import _pre_parse_chef_args |
22a230d99621
bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents:
462
diff
changeset
|
51 _pre_parse_chef_args(sys.argv[1:]) |
22a230d99621
bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents:
462
diff
changeset
|
52 |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
53 wid = params.wid |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
54 logger.debug("Worker %d initializing..." % wid) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
55 |
461
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
56 params.inqueue._writer.close() |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
57 params.outqueue._reader.close() |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
58 |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
59 w = params.worker_class(*params.initargs) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
60 w.wid = wid |
453
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
61 try: |
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
62 w.initialize() |
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
63 except Exception as ex: |
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
64 logger.error("Working failed to initialize:") |
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
65 logger.exception(ex) |
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
66 params.outqueue.put(None) |
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
67 return |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
68 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
69 get = params.inqueue.get |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
70 put = params.outqueue.put |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
71 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
72 completed = 0 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
73 while True: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
74 try: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
75 task = get() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
76 except (EOFError, OSError): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
77 logger.debug("Worker %d encountered connection problem." % wid) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
78 break |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
79 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
80 task_type, task_data = task |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
81 if task_type == TASK_END: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
82 logger.debug("Worker %d got end task, exiting." % wid) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
83 try: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
84 rep = (task_type, True, wid, (wid, w.getReport())) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
85 except Exception as e: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
86 if params.wrap_exception: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
87 e = multiprocessing.ExceptionWithTraceback( |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
88 e, e.__traceback__) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
89 rep = (task_type, False, wid, (wid, e)) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
90 put(rep) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
91 break |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
92 |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
93 if task_type == TASK_JOB: |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
94 task_data = (task_data,) |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
95 |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
96 for t in task_data: |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
97 try: |
461
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
98 res = (TASK_JOB, True, wid, w.process(t)) |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
99 except Exception as e: |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
100 if params.wrap_exception: |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
101 e = multiprocessing.ExceptionWithTraceback( |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
102 e, e.__traceback__) |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
103 res = (TASK_JOB, False, wid, e) |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
104 put(res) |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
105 |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
106 completed += 1 |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
107 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
108 logger.debug("Worker %d completed %d tasks." % (wid, completed)) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
109 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
110 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
111 class _WorkerParams(object): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
112 def __init__(self, wid, inqueue, outqueue, worker_class, initargs=(), |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
113 wrap_exception=False, is_profiling=False): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
114 self.wid = wid |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
115 self.inqueue = inqueue |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
116 self.outqueue = outqueue |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
117 self.worker_class = worker_class |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
118 self.initargs = initargs |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
119 self.wrap_exception = wrap_exception |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
120 self.is_profiling = is_profiling |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
121 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
122 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
123 class WorkerPool(object): |
462
04abc97dd3b6
bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents:
461
diff
changeset
|
124 def __init__(self, worker_class, initargs=(), |
04abc97dd3b6
bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents:
461
diff
changeset
|
125 worker_count=None, batch_size=None, |
451
838f3964f400
bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents:
447
diff
changeset
|
126 wrap_exception=False): |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
127 worker_count = worker_count or os.cpu_count() or 1 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
128 |
461
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
129 use_fastqueue = True |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
130 if use_fastqueue: |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
131 self._task_queue = FastQueue() |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
132 self._result_queue = FastQueue() |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
133 self._quick_put = self._task_queue.put |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
134 self._quick_get = self._result_queue.get |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
135 else: |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
136 self._task_queue = multiprocessing.SimpleQueue() |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
137 self._result_queue = multiprocessing.SimpleQueue() |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
138 self._quick_put = self._task_queue._writer.send |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
139 self._quick_get = self._result_queue._reader.recv |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
140 |
462
04abc97dd3b6
bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents:
461
diff
changeset
|
141 self._batch_size = batch_size |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
142 self._callback = None |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
143 self._error_callback = None |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
144 self._listener = None |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
145 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
146 main_module = sys.modules['__main__'] |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
147 is_profiling = os.path.basename(main_module.__file__) in [ |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
148 'profile.py', 'cProfile.py'] |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
149 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
150 self._pool = [] |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
151 for i in range(worker_count): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
152 worker_params = _WorkerParams( |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
153 i, self._task_queue, self._result_queue, |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
154 worker_class, initargs, |
451
838f3964f400
bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents:
447
diff
changeset
|
155 wrap_exception=wrap_exception, |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
156 is_profiling=is_profiling) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
157 w = multiprocessing.Process(target=worker_func, |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
158 args=(worker_params,)) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
159 w.name = w.name.replace('Process', 'PoolWorker') |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
160 w.daemon = True |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
161 w.start() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
162 self._pool.append(w) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
163 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
164 self._result_handler = threading.Thread( |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
165 target=WorkerPool._handleResults, |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
166 args=(self,)) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
167 self._result_handler.daemon = True |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
168 self._result_handler.start() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
169 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
170 self._closed = False |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
171 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
172 def setHandler(self, callback=None, error_callback=None): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
173 self._callback = callback |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
174 self._error_callback = error_callback |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
175 |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
176 def queueJobs(self, jobs, handler=None, chunk_size=None): |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
177 if self._closed: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
178 raise Exception("This worker pool has been closed.") |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
179 if self._listener is not None: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
180 raise Exception("A previous job queue has not finished yet.") |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
181 |
453
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
182 if any([not p.is_alive() for p in self._pool]): |
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
183 raise Exception("Some workers have prematurely exited.") |
8351a77e13f5
bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents:
451
diff
changeset
|
184 |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
185 if handler is not None: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
186 self.setHandler(handler) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
187 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
188 if not hasattr(jobs, '__len__'): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
189 jobs = list(jobs) |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
190 job_count = len(jobs) |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
191 |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
192 res = AsyncResult(self, job_count) |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
193 if res._count == 0: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
194 res._event.set() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
195 return res |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
196 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
197 self._listener = res |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
198 |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
199 if chunk_size is None: |
462
04abc97dd3b6
bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents:
461
diff
changeset
|
200 chunk_size = self._batch_size |
04abc97dd3b6
bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents:
461
diff
changeset
|
201 if chunk_size is None: |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
202 chunk_size = max(1, job_count // 50) |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
203 logger.debug("Using chunk size of %d" % chunk_size) |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
204 |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
205 if chunk_size is None or chunk_size == 1: |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
206 for job in jobs: |
461
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
207 self._quick_put((TASK_JOB, job)) |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
208 else: |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
209 it = iter(jobs) |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
210 while True: |
461
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
211 batch = tuple([i for i in itertools.islice(it, chunk_size)]) |
460
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
212 if not batch: |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
213 break |
55fc8918cb75
bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents:
453
diff
changeset
|
214 self._quick_put((TASK_BATCH, batch)) |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
215 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
216 return res |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
217 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
218 def close(self): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
219 if self._listener is not None: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
220 raise Exception("A previous job queue has not finished yet.") |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
221 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
222 logger.debug("Closing worker pool...") |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
223 handler = _ReportHandler(len(self._pool)) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
224 self._callback = handler._handle |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
225 for w in self._pool: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
226 self._quick_put((TASK_END, None)) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
227 for w in self._pool: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
228 w.join() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
229 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
230 logger.debug("Waiting for reports...") |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
231 if not handler.wait(2): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
232 missing = handler.reports.index(None) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
233 logger.warning( |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
234 "Didn't receive all worker reports before timeout. " |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
235 "Missing report from worker %d." % missing) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
236 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
237 logger.debug("Exiting result handler thread...") |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
238 self._result_queue.put(None) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
239 self._result_handler.join() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
240 self._closed = True |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
241 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
242 return handler.reports |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
243 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
244 @staticmethod |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
245 def _handleResults(pool): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
246 while True: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
247 try: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
248 res = pool._quick_get() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
249 except (EOFError, OSError): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
250 logger.debug("Result handler thread encountered connection " |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
251 "problem, exiting.") |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
252 return |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
253 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
254 if res is None: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
255 logger.debug("Result handler exiting.") |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
256 break |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
257 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
258 task_type, success, wid, data = res |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
259 try: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
260 if success and pool._callback: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
261 pool._callback(data) |
451
838f3964f400
bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents:
447
diff
changeset
|
262 elif not success: |
838f3964f400
bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents:
447
diff
changeset
|
263 if pool._error_callback: |
838f3964f400
bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents:
447
diff
changeset
|
264 pool._error_callback(data) |
838f3964f400
bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents:
447
diff
changeset
|
265 else: |
838f3964f400
bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents:
447
diff
changeset
|
266 logger.error(data) |
447
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
267 except Exception as ex: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
268 logger.exception(ex) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
269 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
270 if task_type == TASK_JOB: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
271 pool._listener._onTaskDone() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
272 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
273 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
274 class AsyncResult(object): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
275 def __init__(self, pool, count): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
276 self._pool = pool |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
277 self._count = count |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
278 self._event = threading.Event() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
279 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
280 def ready(self): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
281 return self._event.is_set() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
282 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
283 def wait(self, timeout=None): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
284 return self._event.wait(timeout) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
285 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
286 def _onTaskDone(self): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
287 self._count -= 1 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
288 if self._count == 0: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
289 self._pool.setHandler(None) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
290 self._pool._listener = None |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
291 self._event.set() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
292 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
293 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
294 class _ReportHandler(object): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
295 def __init__(self, worker_count): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
296 self.reports = [None] * worker_count |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
297 self._count = worker_count |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
298 self._received = 0 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
299 self._event = threading.Event() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
300 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
301 def wait(self, timeout=None): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
302 return self._event.wait(timeout) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
303 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
304 def _handle(self, res): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
305 wid, data = res |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
306 if wid < 0 or wid > self._count: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
307 logger.error("Ignoring report from unknown worker %d." % wid) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
308 return |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
309 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
310 self._received += 1 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
311 self.reports[wid] = data |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
312 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
313 if self._received == self._count: |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
314 self._event.set() |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
315 |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
316 def _handleError(self, res): |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
317 wid, data = res |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
318 logger.error("Worker %d failed to send its report." % wid) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
319 logger.exception(data) |
aefe70229fdd
bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff
changeset
|
320 |
461
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
321 |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
322 class FastQueue(object): |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
323 def __init__(self, compress=False): |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
324 self._reader, self._writer = multiprocessing.Pipe(duplex=False) |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
325 self._rlock = multiprocessing.Lock() |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
326 self._wlock = multiprocessing.Lock() |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
327 self._compress = compress |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
328 |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
329 def __getstate__(self): |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
330 return (self._reader, self._writer, self._rlock, self._wlock, |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
331 self._compress) |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
332 |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
333 def __setstate__(self, state): |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
334 (self._reader, self._writer, self._rlock, self._wlock, |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
335 self._compress) = state |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
336 |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
337 def get(self): |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
338 with self._rlock: |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
339 raw = self._reader.recv_bytes() |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
340 if self._compress: |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
341 data = zlib.decompress(raw) |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
342 else: |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
343 data = raw |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
344 obj = unpickle(data) |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
345 return obj |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
346 |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
347 def put(self, obj): |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
348 data = pickle(obj) |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
349 if self._compress: |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
350 raw = zlib.compress(data) |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
351 else: |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
352 raw = data |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
353 with self._wlock: |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
354 self._writer.send_bytes(raw) |
b015e38d4ee1
internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents:
460
diff
changeset
|
355 |