annotate piecrust/workerpool.py @ 693:d2a87365b85b

bake: Use threads to read/write from/to the main arbitrator process. Since the GIL is released most of the time during blocking I/O operations, this should let the worker threads do more during that time.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 23 Mar 2016 01:53:57 -0700
parents 9ae9390192da
children 9e5393fcfab2
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
1 import io
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
2 import os
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
3 import sys
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
4 import time
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
5 import zlib
693
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
6 import queue
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
7 import pickle
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
8 import logging
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
9 import itertools
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
10 import threading
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
11 import multiprocessing
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
12
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
13
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
14 logger = logging.getLogger(__name__)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
15
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
16
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
17 class IWorker(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
18 def initialize(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
19 raise NotImplementedError()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
20
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
21 def process(self, job):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
22 raise NotImplementedError()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
23
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
24 def getReport(self, pool_reports):
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
25 return None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
26
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
27
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
28 TASK_JOB = 0
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
29 TASK_BATCH = 1
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
30 TASK_END = 2
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
31
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
32
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
33 def worker_func(params):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
34 if params.is_profiling:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
35 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
36 import cProfile as profile
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
37 except ImportError:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
38 import profile
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
39
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
40 params.is_profiling = False
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
41 name = params.worker_class.__name__
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
42 profile.runctx('_real_worker_func(params)',
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
43 globals(), locals(),
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
44 filename='%s-%d.prof' % (name, params.wid))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
45 else:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
46 _real_worker_func(params)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
47
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
48
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
49 def _real_worker_func(params):
500
22a230d99621 bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents: 462
diff changeset
50 # In a context where `multiprocessing` is using the `spawn` forking model,
22a230d99621 bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents: 462
diff changeset
51 # the new process doesn't inherit anything, so we lost all our logging
22a230d99621 bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents: 462
diff changeset
52 # configuration here. Let's set it up again.
566
8073ae8cb164 bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents: 500
diff changeset
53 if (hasattr(multiprocessing, 'get_start_method') and
8073ae8cb164 bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents: 500
diff changeset
54 multiprocessing.get_start_method() == 'spawn'):
8073ae8cb164 bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents: 500
diff changeset
55 from piecrust.main import _pre_parse_chef_args
8073ae8cb164 bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents: 500
diff changeset
56 _pre_parse_chef_args(sys.argv[1:])
500
22a230d99621 bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents: 462
diff changeset
57
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
58 wid = params.wid
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
59 logger.debug("Worker %d initializing..." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
60
693
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
61 # We don't need those.
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
62 params.inqueue._writer.close()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
63 params.outqueue._reader.close()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
64
693
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
65 # Initialize the underlying worker class.
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
66 w = params.worker_class(*params.initargs)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
67 w.wid = wid
453
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
68 try:
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
69 w.initialize()
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
70 except Exception as ex:
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
71 logger.error("Working failed to initialize:")
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
72 logger.exception(ex)
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
73 params.outqueue.put(None)
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
74 return
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
75
693
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
76 # Create threads to read/write the jobs and results from/to the
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
77 # main arbitrator process.
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
78 local_job_queue = queue.Queue()
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
79 reader_thread = threading.Thread(
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
80 target=_job_queue_reader,
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
81 args=(params.inqueue.get, local_job_queue),
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
82 name="JobQueueReaderThread")
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
83 reader_thread.start()
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
84
693
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
85 local_result_queue = queue.Queue()
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
86 writer_thread = threading.Thread(
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
87 target=_job_results_writer,
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
88 args=(local_result_queue, params.outqueue.put),
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
89 name="JobResultWriterThread")
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
90 writer_thread.start()
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
91
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
92 # Start pumping!
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
93 completed = 0
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
94 time_in_get = 0
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
95 time_in_put = 0
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
96 while True:
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
97 get_start_time = time.perf_counter()
693
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
98 task = local_job_queue.get()
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
99 local_job_queue.task_done()
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
100 time_in_get += (time.perf_counter() - get_start_time)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
101
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
102 task_type, task_data = task
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
103 if task_type == TASK_END:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
104 logger.debug("Worker %d got end task, exiting." % wid)
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
105 wprep = {
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
106 'WorkerTaskGet': time_in_get,
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
107 'WorkerResultPut': time_in_put}
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
108 try:
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
109 rep = (task_type, True, wid, (wid, w.getReport(wprep)))
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
110 except Exception as e:
687
61d606fbc313 bake: Change `show-timers` to `show-stats`, add stats.
Ludovic Chabant <ludovic@chabant.com>
parents: 566
diff changeset
111 logger.debug("Error getting report: %s" % e)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
112 if params.wrap_exception:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
113 e = multiprocessing.ExceptionWithTraceback(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
114 e, e.__traceback__)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
115 rep = (task_type, False, wid, (wid, e))
693
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
116 local_result_queue.put_nowait(rep)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
117 break
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
118
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
119 if task_type == TASK_JOB:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
120 task_data = (task_data,)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
121
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
122 for t in task_data:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
123 try:
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
124 res = (TASK_JOB, True, wid, w.process(t))
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
125 except Exception as e:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
126 if params.wrap_exception:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
127 e = multiprocessing.ExceptionWithTraceback(
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
128 e, e.__traceback__)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
129 res = (TASK_JOB, False, wid, e)
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
130
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
131 put_start_time = time.perf_counter()
693
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
132 local_result_queue.put_nowait(res)
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
133 time_in_put += (time.perf_counter() - put_start_time)
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
134
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
135 completed += 1
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
136
693
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
137 logger.debug("Worker %d waiting for reader/writer threads." % wid)
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
138 local_result_queue.put_nowait(None)
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
139 reader_thread.join()
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
140 writer_thread.join()
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
141
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
142 logger.debug("Worker %d completed %d tasks." % (wid, completed))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
143
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
144
693
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
145 def _job_queue_reader(getter, out_queue):
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
146 while True:
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
147 try:
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
148 task = getter()
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
149 except (EOFError, OSError):
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
150 logger.debug("Worker %d encountered connection problem." % wid)
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
151 break
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
152
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
153 out_queue.put_nowait(task)
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
154
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
155 if task[0] == TASK_END:
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
156 # Done reading jobs from the main process.
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
157 logger.debug("Got end task, exiting task queue reader thread.")
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
158 break
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
159
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
160
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
161 def _job_results_writer(in_queue, putter):
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
162 while True:
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
163 res = in_queue.get()
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
164 if res is not None:
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
165 putter(res)
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
166 in_queue.task_done()
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
167 else:
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
168 # Got sentinel. Exit.
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
169 in_queue.task_done()
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
170 break
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
171 logger.debug("Exiting result queue writer thread.")
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
172
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
173
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
174 class _WorkerParams(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
175 def __init__(self, wid, inqueue, outqueue, worker_class, initargs=(),
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
176 wrap_exception=False, is_profiling=False):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
177 self.wid = wid
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
178 self.inqueue = inqueue
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
179 self.outqueue = outqueue
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
180 self.worker_class = worker_class
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
181 self.initargs = initargs
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
182 self.wrap_exception = wrap_exception
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
183 self.is_profiling = is_profiling
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
184
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
185
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
186 class WorkerPool(object):
462
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
187 def __init__(self, worker_class, initargs=(),
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
188 worker_count=None, batch_size=None,
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
189 wrap_exception=False):
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
190 worker_count = worker_count or os.cpu_count() or 1
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
191
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
192 use_fastqueue = False
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
193 if use_fastqueue:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
194 self._task_queue = FastQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
195 self._result_queue = FastQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
196 self._quick_put = self._task_queue.put
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
197 self._quick_get = self._result_queue.get
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
198 else:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
199 self._task_queue = multiprocessing.SimpleQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
200 self._result_queue = multiprocessing.SimpleQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
201 self._quick_put = self._task_queue._writer.send
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
202 self._quick_get = self._result_queue._reader.recv
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
203
462
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
204 self._batch_size = batch_size
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
205 self._callback = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
206 self._error_callback = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
207 self._listener = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
208
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
209 main_module = sys.modules['__main__']
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
210 is_profiling = os.path.basename(main_module.__file__) in [
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
211 'profile.py', 'cProfile.py']
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
212
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
213 self._pool = []
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
214 for i in range(worker_count):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
215 worker_params = _WorkerParams(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
216 i, self._task_queue, self._result_queue,
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
217 worker_class, initargs,
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
218 wrap_exception=wrap_exception,
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
219 is_profiling=is_profiling)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
220 w = multiprocessing.Process(target=worker_func,
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
221 args=(worker_params,))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
222 w.name = w.name.replace('Process', 'PoolWorker')
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
223 w.daemon = True
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
224 w.start()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
225 self._pool.append(w)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
226
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
227 self._result_handler = threading.Thread(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
228 target=WorkerPool._handleResults,
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
229 args=(self,))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
230 self._result_handler.daemon = True
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
231 self._result_handler.start()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
232
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
233 self._closed = False
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
234
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
235 def setHandler(self, callback=None, error_callback=None):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
236 self._callback = callback
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
237 self._error_callback = error_callback
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
238
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
239 def queueJobs(self, jobs, handler=None, chunk_size=None):
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
240 if self._closed:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
241 raise Exception("This worker pool has been closed.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
242 if self._listener is not None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
243 raise Exception("A previous job queue has not finished yet.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
244
453
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
245 if any([not p.is_alive() for p in self._pool]):
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
246 raise Exception("Some workers have prematurely exited.")
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
247
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
248 if handler is not None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
249 self.setHandler(handler)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
250
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
251 if not hasattr(jobs, '__len__'):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
252 jobs = list(jobs)
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
253 job_count = len(jobs)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
254
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
255 res = AsyncResult(self, job_count)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
256 if res._count == 0:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
257 res._event.set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
258 return res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
259
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
260 self._listener = res
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
261
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
262 if chunk_size is None:
462
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
263 chunk_size = self._batch_size
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
264 if chunk_size is None:
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
265 chunk_size = max(1, job_count // 50)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
266 logger.debug("Using chunk size of %d" % chunk_size)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
267
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
268 if chunk_size is None or chunk_size == 1:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
269 for job in jobs:
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
270 self._quick_put((TASK_JOB, job))
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
271 else:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
272 it = iter(jobs)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
273 while True:
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
274 batch = tuple([i for i in itertools.islice(it, chunk_size)])
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
275 if not batch:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
276 break
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
277 self._quick_put((TASK_BATCH, batch))
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
278
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
279 return res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
280
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
281 def close(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
282 if self._listener is not None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
283 raise Exception("A previous job queue has not finished yet.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
284
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
285 logger.debug("Closing worker pool...")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
286 handler = _ReportHandler(len(self._pool))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
287 self._callback = handler._handle
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
288 for w in self._pool:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
289 self._quick_put((TASK_END, None))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
290 for w in self._pool:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
291 w.join()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
292
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
293 logger.debug("Waiting for reports...")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
294 if not handler.wait(2):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
295 missing = handler.reports.index(None)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
296 logger.warning(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
297 "Didn't receive all worker reports before timeout. "
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
298 "Missing report from worker %d." % missing)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
299
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
300 logger.debug("Exiting result handler thread...")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
301 self._result_queue.put(None)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
302 self._result_handler.join()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
303 self._closed = True
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
304
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
305 return handler.reports
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
306
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
307 @staticmethod
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
308 def _handleResults(pool):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
309 while True:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
310 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
311 res = pool._quick_get()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
312 except (EOFError, OSError):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
313 logger.debug("Result handler thread encountered connection "
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
314 "problem, exiting.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
315 return
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
316
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
317 if res is None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
318 logger.debug("Result handler exiting.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
319 break
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
320
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
321 task_type, success, wid, data = res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
322 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
323 if success and pool._callback:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
324 pool._callback(data)
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
325 elif not success:
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
326 if pool._error_callback:
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
327 pool._error_callback(data)
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
328 else:
687
61d606fbc313 bake: Change `show-timers` to `show-stats`, add stats.
Ludovic Chabant <ludovic@chabant.com>
parents: 566
diff changeset
329 logger.error("Got error data:")
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
330 logger.error(data)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
331 except Exception as ex:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
332 logger.exception(ex)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
333
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
334 if task_type == TASK_JOB:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
335 pool._listener._onTaskDone()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
336
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
337
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
338 class AsyncResult(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
339 def __init__(self, pool, count):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
340 self._pool = pool
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
341 self._count = count
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
342 self._event = threading.Event()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
343
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
344 def ready(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
345 return self._event.is_set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
346
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
347 def wait(self, timeout=None):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
348 return self._event.wait(timeout)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
349
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
350 def _onTaskDone(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
351 self._count -= 1
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
352 if self._count == 0:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
353 self._pool.setHandler(None)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
354 self._pool._listener = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
355 self._event.set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
356
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
357
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
358 class _ReportHandler(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
359 def __init__(self, worker_count):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
360 self.reports = [None] * worker_count
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
361 self._count = worker_count
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
362 self._received = 0
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
363 self._event = threading.Event()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
364
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
365 def wait(self, timeout=None):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
366 return self._event.wait(timeout)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
367
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
368 def _handle(self, res):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
369 wid, data = res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
370 if wid < 0 or wid > self._count:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
371 logger.error("Ignoring report from unknown worker %d." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
372 return
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
373
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
374 self._received += 1
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
375 self.reports[wid] = data
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
376
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
377 if self._received == self._count:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
378 self._event.set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
379
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
380 def _handleError(self, res):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
381 wid, data = res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
382 logger.error("Worker %d failed to send its report." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
383 logger.exception(data)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
384
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
385
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
386 class FastQueue(object):
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
387 def __init__(self):
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
388 self._reader, self._writer = multiprocessing.Pipe(duplex=False)
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
389 self._rlock = multiprocessing.Lock()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
390 self._wlock = multiprocessing.Lock()
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
391 self._rbuf = io.BytesIO()
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
392 self._rbuf.truncate(256)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
393 self._wbuf = io.BytesIO()
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
394 self._wbuf.truncate(256)
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
395
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
396 def __getstate__(self):
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
397 return (self._reader, self._writer, self._rlock, self._wlock)
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
398
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
399 def __setstate__(self, state):
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
400 (self._reader, self._writer, self._rlock, self._wlock) = state
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
401
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
402 def get(self):
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
403 with self._rlock:
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
404 try:
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
405 with self._rbuf.getbuffer() as b:
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
406 self._reader.recv_bytes_into(b)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
407 except multiprocessing.BufferTooShort as e:
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
408 self._rbuf.truncate(len(e.args[0]) * 2)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
409 self._rbuf.seek(0)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
410 self._rbuf.write(e.args[0])
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
411
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
412 self._rbuf.seek(0)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
413 return self._unpickle(self._rbuf)
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
414
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
415 def put(self, obj):
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
416 self._wbuf.seek(0)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
417 self._pickle(obj, self._wbuf)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
418 size = self._wbuf.tell()
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
419
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
420 self._wbuf.seek(0)
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
421 with self._wlock:
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
422 with self._wbuf.getbuffer() as b:
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
423 self._writer.send_bytes(b, 0, size)
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
424
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
425 def _pickle(self, obj, buf):
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
426 pickle.dump(obj, buf, pickle.HIGHEST_PROTOCOL)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
427
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
428 def _unpickle(self, buf):
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
429 return pickle.load(buf)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
430