annotate piecrust/workerpool.py @ 691:9ae9390192da

bake: Use standard pickle and queue for now to fix some small issues. * JSON leads to some problems with integers as keys. * Add some stats to the baking process.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 21 Mar 2016 22:28:57 -0700
parents 61d606fbc313
children d2a87365b85b
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
1 import io
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
2 import os
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
3 import sys
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
4 import time
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
5 import zlib
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
6 import pickle
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
7 import logging
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
8 import itertools
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
9 import threading
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
10 import multiprocessing
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
11
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
12
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
13 logger = logging.getLogger(__name__)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
14
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
15
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
16 class IWorker(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
17 def initialize(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
18 raise NotImplementedError()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
19
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
20 def process(self, job):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
21 raise NotImplementedError()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
22
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
23 def getReport(self, pool_reports):
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
24 return None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
25
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
26
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
27 TASK_JOB = 0
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
28 TASK_BATCH = 1
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
29 TASK_END = 2
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
30
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
31
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
32 def worker_func(params):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
33 if params.is_profiling:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
34 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
35 import cProfile as profile
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
36 except ImportError:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
37 import profile
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
38
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
39 params.is_profiling = False
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
40 name = params.worker_class.__name__
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
41 profile.runctx('_real_worker_func(params)',
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
42 globals(), locals(),
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
43 filename='%s-%d.prof' % (name, params.wid))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
44 else:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
45 _real_worker_func(params)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
46
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
47
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
48 def _real_worker_func(params):
500
22a230d99621 bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents: 462
diff changeset
49 # In a context where `multiprocessing` is using the `spawn` forking model,
22a230d99621 bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents: 462
diff changeset
50 # the new process doesn't inherit anything, so we lost all our logging
22a230d99621 bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents: 462
diff changeset
51 # configuration here. Let's set it up again.
566
8073ae8cb164 bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents: 500
diff changeset
52 if (hasattr(multiprocessing, 'get_start_method') and
8073ae8cb164 bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents: 500
diff changeset
53 multiprocessing.get_start_method() == 'spawn'):
8073ae8cb164 bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents: 500
diff changeset
54 from piecrust.main import _pre_parse_chef_args
8073ae8cb164 bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents: 500
diff changeset
55 _pre_parse_chef_args(sys.argv[1:])
500
22a230d99621 bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents: 462
diff changeset
56
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
57 wid = params.wid
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
58 logger.debug("Worker %d initializing..." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
59
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
60 params.inqueue._writer.close()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
61 params.outqueue._reader.close()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
62
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
63 w = params.worker_class(*params.initargs)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
64 w.wid = wid
453
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
65 try:
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
66 w.initialize()
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
67 except Exception as ex:
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
68 logger.error("Working failed to initialize:")
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
69 logger.exception(ex)
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
70 params.outqueue.put(None)
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
71 return
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
72
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
73 get = params.inqueue.get
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
74 put = params.outqueue.put
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
75
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
76 completed = 0
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
77 time_in_get = 0
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
78 time_in_put = 0
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
79 while True:
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
80 get_start_time = time.perf_counter()
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
81 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
82 task = get()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
83 except (EOFError, OSError):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
84 logger.debug("Worker %d encountered connection problem." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
85 break
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
86 time_in_get += (time.perf_counter() - get_start_time)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
87
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
88 task_type, task_data = task
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
89 if task_type == TASK_END:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
90 logger.debug("Worker %d got end task, exiting." % wid)
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
91 wprep = {
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
92 'WorkerTaskGet': time_in_get,
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
93 'WorkerResultPut': time_in_put}
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
94 try:
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
95 rep = (task_type, True, wid, (wid, w.getReport(wprep)))
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
96 except Exception as e:
687
61d606fbc313 bake: Change `show-timers` to `show-stats`, add stats.
Ludovic Chabant <ludovic@chabant.com>
parents: 566
diff changeset
97 logger.debug("Error getting report: %s" % e)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
98 if params.wrap_exception:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
99 e = multiprocessing.ExceptionWithTraceback(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
100 e, e.__traceback__)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
101 rep = (task_type, False, wid, (wid, e))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
102 put(rep)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
103 break
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
104
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
105 if task_type == TASK_JOB:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
106 task_data = (task_data,)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
107
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
108 for t in task_data:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
109 try:
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
110 res = (TASK_JOB, True, wid, w.process(t))
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
111 except Exception as e:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
112 if params.wrap_exception:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
113 e = multiprocessing.ExceptionWithTraceback(
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
114 e, e.__traceback__)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
115 res = (TASK_JOB, False, wid, e)
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
116
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
117 put_start_time = time.perf_counter()
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
118 put(res)
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
119 time_in_put += (time.perf_counter() - put_start_time)
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
120
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
121 completed += 1
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
122
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
123 logger.debug("Worker %d completed %d tasks." % (wid, completed))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
124
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
125
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
126 class _WorkerParams(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
127 def __init__(self, wid, inqueue, outqueue, worker_class, initargs=(),
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
128 wrap_exception=False, is_profiling=False):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
129 self.wid = wid
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
130 self.inqueue = inqueue
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
131 self.outqueue = outqueue
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
132 self.worker_class = worker_class
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
133 self.initargs = initargs
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
134 self.wrap_exception = wrap_exception
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
135 self.is_profiling = is_profiling
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
136
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
137
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
138 class WorkerPool(object):
462
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
139 def __init__(self, worker_class, initargs=(),
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
140 worker_count=None, batch_size=None,
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
141 wrap_exception=False):
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
142 worker_count = worker_count or os.cpu_count() or 1
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
143
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
144 use_fastqueue = False
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
145 if use_fastqueue:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
146 self._task_queue = FastQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
147 self._result_queue = FastQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
148 self._quick_put = self._task_queue.put
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
149 self._quick_get = self._result_queue.get
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
150 else:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
151 self._task_queue = multiprocessing.SimpleQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
152 self._result_queue = multiprocessing.SimpleQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
153 self._quick_put = self._task_queue._writer.send
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
154 self._quick_get = self._result_queue._reader.recv
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
155
462
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
156 self._batch_size = batch_size
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
157 self._callback = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
158 self._error_callback = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
159 self._listener = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
160
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
161 main_module = sys.modules['__main__']
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
162 is_profiling = os.path.basename(main_module.__file__) in [
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
163 'profile.py', 'cProfile.py']
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
164
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
165 self._pool = []
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
166 for i in range(worker_count):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
167 worker_params = _WorkerParams(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
168 i, self._task_queue, self._result_queue,
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
169 worker_class, initargs,
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
170 wrap_exception=wrap_exception,
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
171 is_profiling=is_profiling)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
172 w = multiprocessing.Process(target=worker_func,
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
173 args=(worker_params,))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
174 w.name = w.name.replace('Process', 'PoolWorker')
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
175 w.daemon = True
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
176 w.start()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
177 self._pool.append(w)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
178
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
179 self._result_handler = threading.Thread(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
180 target=WorkerPool._handleResults,
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
181 args=(self,))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
182 self._result_handler.daemon = True
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
183 self._result_handler.start()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
184
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
185 self._closed = False
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
186
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
187 def setHandler(self, callback=None, error_callback=None):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
188 self._callback = callback
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
189 self._error_callback = error_callback
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
190
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
191 def queueJobs(self, jobs, handler=None, chunk_size=None):
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
192 if self._closed:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
193 raise Exception("This worker pool has been closed.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
194 if self._listener is not None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
195 raise Exception("A previous job queue has not finished yet.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
196
453
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
197 if any([not p.is_alive() for p in self._pool]):
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
198 raise Exception("Some workers have prematurely exited.")
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
199
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
200 if handler is not None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
201 self.setHandler(handler)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
202
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
203 if not hasattr(jobs, '__len__'):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
204 jobs = list(jobs)
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
205 job_count = len(jobs)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
206
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
207 res = AsyncResult(self, job_count)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
208 if res._count == 0:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
209 res._event.set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
210 return res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
211
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
212 self._listener = res
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
213
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
214 if chunk_size is None:
462
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
215 chunk_size = self._batch_size
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
216 if chunk_size is None:
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
217 chunk_size = max(1, job_count // 50)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
218 logger.debug("Using chunk size of %d" % chunk_size)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
219
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
220 if chunk_size is None or chunk_size == 1:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
221 for job in jobs:
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
222 self._quick_put((TASK_JOB, job))
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
223 else:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
224 it = iter(jobs)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
225 while True:
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
226 batch = tuple([i for i in itertools.islice(it, chunk_size)])
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
227 if not batch:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
228 break
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
229 self._quick_put((TASK_BATCH, batch))
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
230
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
231 return res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
232
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
233 def close(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
234 if self._listener is not None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
235 raise Exception("A previous job queue has not finished yet.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
236
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
237 logger.debug("Closing worker pool...")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
238 handler = _ReportHandler(len(self._pool))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
239 self._callback = handler._handle
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
240 for w in self._pool:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
241 self._quick_put((TASK_END, None))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
242 for w in self._pool:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
243 w.join()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
244
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
245 logger.debug("Waiting for reports...")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
246 if not handler.wait(2):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
247 missing = handler.reports.index(None)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
248 logger.warning(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
249 "Didn't receive all worker reports before timeout. "
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
250 "Missing report from worker %d." % missing)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
251
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
252 logger.debug("Exiting result handler thread...")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
253 self._result_queue.put(None)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
254 self._result_handler.join()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
255 self._closed = True
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
256
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
257 return handler.reports
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
258
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
259 @staticmethod
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
260 def _handleResults(pool):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
261 while True:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
262 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
263 res = pool._quick_get()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
264 except (EOFError, OSError):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
265 logger.debug("Result handler thread encountered connection "
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
266 "problem, exiting.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
267 return
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
268
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
269 if res is None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
270 logger.debug("Result handler exiting.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
271 break
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
272
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
273 task_type, success, wid, data = res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
274 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
275 if success and pool._callback:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
276 pool._callback(data)
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
277 elif not success:
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
278 if pool._error_callback:
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
279 pool._error_callback(data)
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
280 else:
687
61d606fbc313 bake: Change `show-timers` to `show-stats`, add stats.
Ludovic Chabant <ludovic@chabant.com>
parents: 566
diff changeset
281 logger.error("Got error data:")
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
282 logger.error(data)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
283 except Exception as ex:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
284 logger.exception(ex)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
285
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
286 if task_type == TASK_JOB:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
287 pool._listener._onTaskDone()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
288
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
289
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
290 class AsyncResult(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
291 def __init__(self, pool, count):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
292 self._pool = pool
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
293 self._count = count
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
294 self._event = threading.Event()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
295
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
296 def ready(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
297 return self._event.is_set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
298
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
299 def wait(self, timeout=None):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
300 return self._event.wait(timeout)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
301
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
302 def _onTaskDone(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
303 self._count -= 1
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
304 if self._count == 0:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
305 self._pool.setHandler(None)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
306 self._pool._listener = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
307 self._event.set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
308
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
309
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
310 class _ReportHandler(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
311 def __init__(self, worker_count):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
312 self.reports = [None] * worker_count
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
313 self._count = worker_count
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
314 self._received = 0
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
315 self._event = threading.Event()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
316
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
317 def wait(self, timeout=None):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
318 return self._event.wait(timeout)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
319
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
320 def _handle(self, res):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
321 wid, data = res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
322 if wid < 0 or wid > self._count:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
323 logger.error("Ignoring report from unknown worker %d." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
324 return
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
325
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
326 self._received += 1
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
327 self.reports[wid] = data
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
328
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
329 if self._received == self._count:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
330 self._event.set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
331
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
332 def _handleError(self, res):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
333 wid, data = res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
334 logger.error("Worker %d failed to send its report." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
335 logger.exception(data)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
336
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
337
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
338 class FastQueue(object):
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
339 def __init__(self):
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
340 self._reader, self._writer = multiprocessing.Pipe(duplex=False)
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
341 self._rlock = multiprocessing.Lock()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
342 self._wlock = multiprocessing.Lock()
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
343 self._rbuf = io.BytesIO()
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
344 self._rbuf.truncate(256)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
345 self._wbuf = io.BytesIO()
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
346 self._wbuf.truncate(256)
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
347
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
348 def __getstate__(self):
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
349 return (self._reader, self._writer, self._rlock, self._wlock)
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
350
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
351 def __setstate__(self, state):
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
352 (self._reader, self._writer, self._rlock, self._wlock) = state
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
353
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
354 def get(self):
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
355 with self._rlock:
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
356 try:
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
357 with self._rbuf.getbuffer() as b:
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
358 self._reader.recv_bytes_into(b)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
359 except multiprocessing.BufferTooShort as e:
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
360 self._rbuf.truncate(len(e.args[0]) * 2)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
361 self._rbuf.seek(0)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
362 self._rbuf.write(e.args[0])
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
363
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
364 self._rbuf.seek(0)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
365 return self._unpickle(self._rbuf)
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
366
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
367 def put(self, obj):
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
368 self._wbuf.seek(0)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
369 self._pickle(obj, self._wbuf)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
370 size = self._wbuf.tell()
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
371
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
372 self._wbuf.seek(0)
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
373 with self._wlock:
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
374 with self._wbuf.getbuffer() as b:
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
375 self._writer.send_bytes(b, 0, size)
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
376
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
377 def _pickle(self, obj, buf):
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
378 pickle.dump(obj, buf, pickle.HIGHEST_PROTOCOL)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
379
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
380 def _unpickle(self, buf):
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
381 return pickle.load(buf)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
382