annotate piecrust/workerpool.py @ 739:c6035785dbfc

tests: the `PageBaker` now needs to be shutdown. Otherwise it keeps the process alive because of its writer thread.
author Ludovic Chabant <ludovic@chabant.com>
date Thu, 02 Jun 2016 13:01:08 -0700
parents c62d83e17abf
children 4850f8c21b6e
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
1 import io
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
2 import os
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
3 import sys
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
4 import time
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
5 import zlib
693
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
6 import queue
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
7 import logging
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
8 import itertools
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
9 import threading
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
10 import multiprocessing
697
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
11 from piecrust import fastpickle
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
12
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
13
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
14 logger = logging.getLogger(__name__)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
15
697
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
16 use_fastqueue = True
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
17
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
18
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
19 class IWorker(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
20 def initialize(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
21 raise NotImplementedError()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
22
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
23 def process(self, job):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
24 raise NotImplementedError()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
25
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
26 def getReport(self, pool_reports):
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
27 return None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
28
702
c62d83e17abf bake: Some more optimizations.
Ludovic Chabant <ludovic@chabant.com>
parents: 697
diff changeset
29 def shutdown(self):
c62d83e17abf bake: Some more optimizations.
Ludovic Chabant <ludovic@chabant.com>
parents: 697
diff changeset
30 pass
c62d83e17abf bake: Some more optimizations.
Ludovic Chabant <ludovic@chabant.com>
parents: 697
diff changeset
31
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
32
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
33 TASK_JOB = 0
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
34 TASK_BATCH = 1
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
35 TASK_END = 2
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
36
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
37
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
38 def worker_func(params):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
39 if params.is_profiling:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
40 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
41 import cProfile as profile
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
42 except ImportError:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
43 import profile
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
44
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
45 params.is_profiling = False
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
46 name = params.worker_class.__name__
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
47 profile.runctx('_real_worker_func(params)',
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
48 globals(), locals(),
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
49 filename='%s-%d.prof' % (name, params.wid))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
50 else:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
51 _real_worker_func(params)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
52
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
53
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
54 def _real_worker_func(params):
500
22a230d99621 bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents: 462
diff changeset
55 # In a context where `multiprocessing` is using the `spawn` forking model,
22a230d99621 bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents: 462
diff changeset
56 # the new process doesn't inherit anything, so we lost all our logging
22a230d99621 bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents: 462
diff changeset
57 # configuration here. Let's set it up again.
566
8073ae8cb164 bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents: 500
diff changeset
58 if (hasattr(multiprocessing, 'get_start_method') and
8073ae8cb164 bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents: 500
diff changeset
59 multiprocessing.get_start_method() == 'spawn'):
8073ae8cb164 bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents: 500
diff changeset
60 from piecrust.main import _pre_parse_chef_args
8073ae8cb164 bake: Don't re-setup logging for workers unless we're sure we need it.
Ludovic Chabant <ludovic@chabant.com>
parents: 500
diff changeset
61 _pre_parse_chef_args(sys.argv[1:])
500
22a230d99621 bake: Fix logging configuration for multi-processing on Windows.
Ludovic Chabant <ludovic@chabant.com>
parents: 462
diff changeset
62
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
63 wid = params.wid
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
64 logger.debug("Worker %d initializing..." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
65
693
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
66 # We don't need those.
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
67 params.inqueue._writer.close()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
68 params.outqueue._reader.close()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
69
693
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
70 # Initialize the underlying worker class.
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
71 w = params.worker_class(*params.initargs)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
72 w.wid = wid
453
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
73 try:
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
74 w.initialize()
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
75 except Exception as ex:
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
76 logger.error("Working failed to initialize:")
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
77 logger.exception(ex)
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
78 params.outqueue.put(None)
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
79 return
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
80
697
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
81 use_threads = False
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
82 if use_threads:
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
83 # Create threads to read/write the jobs and results from/to the
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
84 # main arbitrator process.
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
85 local_job_queue = queue.Queue()
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
86 reader_thread = threading.Thread(
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
87 target=_job_queue_reader,
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
88 args=(params.inqueue.get, local_job_queue),
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
89 name="JobQueueReaderThread")
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
90 reader_thread.start()
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
91
697
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
92 local_result_queue = queue.Queue()
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
93 writer_thread = threading.Thread(
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
94 target=_job_results_writer,
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
95 args=(local_result_queue, params.outqueue.put),
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
96 name="JobResultWriterThread")
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
97 writer_thread.start()
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
98
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
99 get = local_job_queue.get
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
100 put = local_result_queue.put_nowait
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
101 else:
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
102 get = params.inqueue.get
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
103 put = params.outqueue.put
693
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
104
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
105 # Start pumping!
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
106 completed = 0
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
107 time_in_get = 0
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
108 time_in_put = 0
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
109 while True:
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
110 get_start_time = time.perf_counter()
697
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
111 task = get()
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
112 time_in_get += (time.perf_counter() - get_start_time)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
113
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
114 task_type, task_data = task
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
115 if task_type == TASK_END:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
116 logger.debug("Worker %d got end task, exiting." % wid)
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
117 wprep = {
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
118 'WorkerTaskGet': time_in_get,
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
119 'WorkerResultPut': time_in_put}
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
120 try:
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
121 rep = (task_type, True, wid, (wid, w.getReport(wprep)))
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
122 except Exception as e:
687
61d606fbc313 bake: Change `show-timers` to `show-stats`, add stats.
Ludovic Chabant <ludovic@chabant.com>
parents: 566
diff changeset
123 logger.debug("Error getting report: %s" % e)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
124 if params.wrap_exception:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
125 e = multiprocessing.ExceptionWithTraceback(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
126 e, e.__traceback__)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
127 rep = (task_type, False, wid, (wid, e))
697
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
128 put(rep)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
129 break
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
130
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
131 if task_type == TASK_JOB:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
132 task_data = (task_data,)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
133
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
134 for t in task_data:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
135 try:
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
136 res = (TASK_JOB, True, wid, w.process(t))
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
137 except Exception as e:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
138 if params.wrap_exception:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
139 e = multiprocessing.ExceptionWithTraceback(
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
140 e, e.__traceback__)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
141 res = (TASK_JOB, False, wid, e)
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
142
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
143 put_start_time = time.perf_counter()
697
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
144 put(res)
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
145 time_in_put += (time.perf_counter() - put_start_time)
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
146
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
147 completed += 1
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
148
697
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
149 if use_threads:
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
150 logger.debug("Worker %d waiting for reader/writer threads." % wid)
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
151 local_result_queue.put_nowait(None)
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
152 reader_thread.join()
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
153 writer_thread.join()
693
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
154
702
c62d83e17abf bake: Some more optimizations.
Ludovic Chabant <ludovic@chabant.com>
parents: 697
diff changeset
155 w.shutdown()
c62d83e17abf bake: Some more optimizations.
Ludovic Chabant <ludovic@chabant.com>
parents: 697
diff changeset
156
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
157 logger.debug("Worker %d completed %d tasks." % (wid, completed))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
158
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
159
693
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
160 def _job_queue_reader(getter, out_queue):
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
161 while True:
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
162 try:
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
163 task = getter()
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
164 except (EOFError, OSError):
697
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
165 logger.debug("Worker encountered connection problem.")
693
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
166 break
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
167
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
168 out_queue.put_nowait(task)
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
169
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
170 if task[0] == TASK_END:
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
171 # Done reading jobs from the main process.
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
172 logger.debug("Got end task, exiting task queue reader thread.")
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
173 break
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
174
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
175
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
176 def _job_results_writer(in_queue, putter):
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
177 while True:
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
178 res = in_queue.get()
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
179 if res is not None:
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
180 putter(res)
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
181 in_queue.task_done()
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
182 else:
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
183 # Got sentinel. Exit.
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
184 in_queue.task_done()
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
185 break
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
186 logger.debug("Exiting result queue writer thread.")
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
187
d2a87365b85b bake: Use threads to read/write from/to the main arbitrator process.
Ludovic Chabant <ludovic@chabant.com>
parents: 691
diff changeset
188
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
189 class _WorkerParams(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
190 def __init__(self, wid, inqueue, outqueue, worker_class, initargs=(),
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
191 wrap_exception=False, is_profiling=False):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
192 self.wid = wid
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
193 self.inqueue = inqueue
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
194 self.outqueue = outqueue
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
195 self.worker_class = worker_class
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
196 self.initargs = initargs
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
197 self.wrap_exception = wrap_exception
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
198 self.is_profiling = is_profiling
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
199
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
200
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
201 class WorkerPool(object):
462
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
202 def __init__(self, worker_class, initargs=(),
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
203 worker_count=None, batch_size=None,
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
204 wrap_exception=False):
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
205 worker_count = worker_count or os.cpu_count() or 1
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
206
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
207 if use_fastqueue:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
208 self._task_queue = FastQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
209 self._result_queue = FastQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
210 self._quick_put = self._task_queue.put
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
211 self._quick_get = self._result_queue.get
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
212 else:
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
213 self._task_queue = multiprocessing.SimpleQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
214 self._result_queue = multiprocessing.SimpleQueue()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
215 self._quick_put = self._task_queue._writer.send
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
216 self._quick_get = self._result_queue._reader.recv
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
217
462
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
218 self._batch_size = batch_size
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
219 self._callback = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
220 self._error_callback = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
221 self._listener = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
222
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
223 main_module = sys.modules['__main__']
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
224 is_profiling = os.path.basename(main_module.__file__) in [
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
225 'profile.py', 'cProfile.py']
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
226
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
227 self._pool = []
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
228 for i in range(worker_count):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
229 worker_params = _WorkerParams(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
230 i, self._task_queue, self._result_queue,
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
231 worker_class, initargs,
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
232 wrap_exception=wrap_exception,
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
233 is_profiling=is_profiling)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
234 w = multiprocessing.Process(target=worker_func,
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
235 args=(worker_params,))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
236 w.name = w.name.replace('Process', 'PoolWorker')
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
237 w.daemon = True
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
238 w.start()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
239 self._pool.append(w)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
240
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
241 self._result_handler = threading.Thread(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
242 target=WorkerPool._handleResults,
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
243 args=(self,))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
244 self._result_handler.daemon = True
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
245 self._result_handler.start()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
246
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
247 self._closed = False
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
248
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
249 def setHandler(self, callback=None, error_callback=None):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
250 self._callback = callback
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
251 self._error_callback = error_callback
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
252
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
253 def queueJobs(self, jobs, handler=None, chunk_size=None):
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
254 if self._closed:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
255 raise Exception("This worker pool has been closed.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
256 if self._listener is not None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
257 raise Exception("A previous job queue has not finished yet.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
258
453
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
259 if any([not p.is_alive() for p in self._pool]):
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
260 raise Exception("Some workers have prematurely exited.")
8351a77e13f5 bake: Don't pass the previous record entries to the workers.
Ludovic Chabant <ludovic@chabant.com>
parents: 451
diff changeset
261
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
262 if handler is not None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
263 self.setHandler(handler)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
264
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
265 if not hasattr(jobs, '__len__'):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
266 jobs = list(jobs)
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
267 job_count = len(jobs)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
268
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
269 res = AsyncResult(self, job_count)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
270 if res._count == 0:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
271 res._event.set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
272 return res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
273
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
274 self._listener = res
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
275
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
276 if chunk_size is None:
462
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
277 chunk_size = self._batch_size
04abc97dd3b6 bake: Add CLI argument to specify job batch size.
Ludovic Chabant <ludovic@chabant.com>
parents: 461
diff changeset
278 if chunk_size is None:
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
279 chunk_size = max(1, job_count // 50)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
280 logger.debug("Using chunk size of %d" % chunk_size)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
281
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
282 if chunk_size is None or chunk_size == 1:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
283 for job in jobs:
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
284 self._quick_put((TASK_JOB, job))
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
285 else:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
286 it = iter(jobs)
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
287 while True:
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
288 batch = tuple([i for i in itertools.islice(it, chunk_size)])
460
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
289 if not batch:
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
290 break
55fc8918cb75 bake: Use batched jobs in the worker pool.
Ludovic Chabant <ludovic@chabant.com>
parents: 453
diff changeset
291 self._quick_put((TASK_BATCH, batch))
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
292
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
293 return res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
294
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
295 def close(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
296 if self._listener is not None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
297 raise Exception("A previous job queue has not finished yet.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
298
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
299 logger.debug("Closing worker pool...")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
300 handler = _ReportHandler(len(self._pool))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
301 self._callback = handler._handle
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
302 for w in self._pool:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
303 self._quick_put((TASK_END, None))
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
304 for w in self._pool:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
305 w.join()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
306
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
307 logger.debug("Waiting for reports...")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
308 if not handler.wait(2):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
309 missing = handler.reports.index(None)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
310 logger.warning(
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
311 "Didn't receive all worker reports before timeout. "
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
312 "Missing report from worker %d." % missing)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
313
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
314 logger.debug("Exiting result handler thread...")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
315 self._result_queue.put(None)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
316 self._result_handler.join()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
317 self._closed = True
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
318
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
319 return handler.reports
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
320
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
321 @staticmethod
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
322 def _handleResults(pool):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
323 while True:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
324 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
325 res = pool._quick_get()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
326 except (EOFError, OSError):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
327 logger.debug("Result handler thread encountered connection "
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
328 "problem, exiting.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
329 return
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
330
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
331 if res is None:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
332 logger.debug("Result handler exiting.")
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
333 break
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
334
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
335 task_type, success, wid, data = res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
336 try:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
337 if success and pool._callback:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
338 pool._callback(data)
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
339 elif not success:
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
340 if pool._error_callback:
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
341 pool._error_callback(data)
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
342 else:
687
61d606fbc313 bake: Change `show-timers` to `show-stats`, add stats.
Ludovic Chabant <ludovic@chabant.com>
parents: 566
diff changeset
343 logger.error("Got error data:")
451
838f3964f400 bake: Optimize the bake by not using custom classes for passing info.
Ludovic Chabant <ludovic@chabant.com>
parents: 447
diff changeset
344 logger.error(data)
447
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
345 except Exception as ex:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
346 logger.exception(ex)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
347
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
348 if task_type == TASK_JOB:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
349 pool._listener._onTaskDone()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
350
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
351
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
352 class AsyncResult(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
353 def __init__(self, pool, count):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
354 self._pool = pool
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
355 self._count = count
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
356 self._event = threading.Event()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
357
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
358 def ready(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
359 return self._event.is_set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
360
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
361 def wait(self, timeout=None):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
362 return self._event.wait(timeout)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
363
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
364 def _onTaskDone(self):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
365 self._count -= 1
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
366 if self._count == 0:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
367 self._pool.setHandler(None)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
368 self._pool._listener = None
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
369 self._event.set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
370
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
371
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
372 class _ReportHandler(object):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
373 def __init__(self, worker_count):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
374 self.reports = [None] * worker_count
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
375 self._count = worker_count
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
376 self._received = 0
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
377 self._event = threading.Event()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
378
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
379 def wait(self, timeout=None):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
380 return self._event.wait(timeout)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
381
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
382 def _handle(self, res):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
383 wid, data = res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
384 if wid < 0 or wid > self._count:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
385 logger.error("Ignoring report from unknown worker %d." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
386 return
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
387
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
388 self._received += 1
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
389 self.reports[wid] = data
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
390
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
391 if self._received == self._count:
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
392 self._event.set()
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
393
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
394 def _handleError(self, res):
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
395 wid, data = res
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
396 logger.error("Worker %d failed to send its report." % wid)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
397 logger.exception(data)
aefe70229fdd bake: Commonize worker pool code between html and asset baking.
Ludovic Chabant <ludovic@chabant.com>
parents:
diff changeset
398
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
399
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
400 class FastQueue(object):
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
401 def __init__(self):
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
402 self._reader, self._writer = multiprocessing.Pipe(duplex=False)
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
403 self._rlock = multiprocessing.Lock()
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
404 self._wlock = multiprocessing.Lock()
697
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
405 self._initBuffers()
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
406
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
407 def _initBuffers(self):
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
408 self._rbuf = io.BytesIO()
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
409 self._rbuf.truncate(256)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
410 self._wbuf = io.BytesIO()
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
411 self._wbuf.truncate(256)
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
412
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
413 def __getstate__(self):
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
414 return (self._reader, self._writer, self._rlock, self._wlock)
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
415
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
416 def __setstate__(self, state):
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
417 (self._reader, self._writer, self._rlock, self._wlock) = state
697
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
418 self._initBuffers()
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
419
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
420 def get(self):
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
421 with self._rlock:
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
422 try:
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
423 with self._rbuf.getbuffer() as b:
697
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
424 bufsize = self._reader.recv_bytes_into(b)
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
425 except multiprocessing.BufferTooShort as e:
697
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
426 bufsize = len(e.args[0])
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
427 self._rbuf.truncate(bufsize * 2)
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
428 self._rbuf.seek(0)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
429 self._rbuf.write(e.args[0])
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
430
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
431 self._rbuf.seek(0)
697
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
432 return self._unpickle(self._rbuf, bufsize)
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
433
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
434 def put(self, obj):
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
435 self._wbuf.seek(0)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
436 self._pickle(obj, self._wbuf)
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
437 size = self._wbuf.tell()
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
438
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
439 self._wbuf.seek(0)
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
440 with self._wlock:
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
441 with self._wbuf.getbuffer() as b:
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
442 self._writer.send_bytes(b, 0, size)
461
b015e38d4ee1 internal: Handle data serialization more under the hood.
Ludovic Chabant <ludovic@chabant.com>
parents: 460
diff changeset
443
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
444 def _pickle(self, obj, buf):
697
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
445 fastpickle.pickle_intob(obj, buf)
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
446
697
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
447 def _unpickle(self, buf, bufsize):
9e5393fcfab2 bake: Re-enable faster serialization between processes.
Ludovic Chabant <ludovic@chabant.com>
parents: 693
diff changeset
448 return fastpickle.unpickle_fromb(buf, bufsize)
691
9ae9390192da bake: Use standard pickle and queue for now to fix some small issues.
Ludovic Chabant <ludovic@chabant.com>
parents: 687
diff changeset
449