Mercurial > piecrust2
comparison piecrust/workerpool.py @ 852:4850f8c21b6e
core: Start of the big refactor for PieCrust 3.0.
* Everything is a `ContentSource`, including assets directories.
* Most content sources are subclasses of the base file-system source.
* A source is processed by a "pipeline", and there are 2 built-in pipelines,
one for assets and one for pages. The asset pipeline is vaguely functional,
but the page pipeline is completely broken right now.
* Rewrite the baking process as just running appropriate pipelines on each
content item. This should allow for better parallelization.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Wed, 17 May 2017 00:11:48 -0700 |
parents | c62d83e17abf |
children | 08e02c2a2a1a |
comparison
equal
deleted
inserted
replaced
851:2c7e57d80bba | 852:4850f8c21b6e |
---|---|
1 import io | 1 import io |
2 import os | 2 import os |
3 import sys | 3 import sys |
4 import time | 4 import time |
5 import zlib | 5 import pickle |
6 import queue | |
7 import logging | 6 import logging |
8 import itertools | |
9 import threading | 7 import threading |
8 import traceback | |
10 import multiprocessing | 9 import multiprocessing |
11 from piecrust import fastpickle | 10 from piecrust import fastpickle |
11 from piecrust.environment import ExecutionStats | |
12 | 12 |
13 | 13 |
14 logger = logging.getLogger(__name__) | 14 logger = logging.getLogger(__name__) |
15 | 15 |
16 use_fastqueue = True | 16 use_fastqueue = True |
17 use_fastpickle = False | |
17 | 18 |
18 | 19 |
19 class IWorker(object): | 20 class IWorker(object): |
21 """ Interface for a pool worker. | |
22 """ | |
20 def initialize(self): | 23 def initialize(self): |
21 raise NotImplementedError() | 24 raise NotImplementedError() |
22 | 25 |
23 def process(self, job): | 26 def process(self, job): |
24 raise NotImplementedError() | 27 raise NotImplementedError() |
25 | 28 |
26 def getReport(self, pool_reports): | 29 def getStats(self): |
27 return None | 30 return None |
28 | 31 |
29 def shutdown(self): | 32 def shutdown(self): |
30 pass | 33 pass |
31 | 34 |
32 | 35 |
36 class WorkerExceptionData: | |
37 def __init__(self, wid): | |
38 super().__init__() | |
39 self.wid = wid | |
40 t, v, tb = sys.exc_info() | |
41 self.type = t | |
42 self.value = '\n'.join(_get_errors(v)) | |
43 self.traceback = ''.join(traceback.format_exception(t, v, tb)) | |
44 | |
45 def __str__(self): | |
46 return str(self.value) | |
47 | |
48 | |
49 def _get_errors(ex): | |
50 errors = [] | |
51 while ex is not None: | |
52 msg = str(ex) | |
53 errors.append(msg) | |
54 ex = ex.__cause__ | |
55 return errors | |
56 | |
57 | |
33 TASK_JOB = 0 | 58 TASK_JOB = 0 |
34 TASK_BATCH = 1 | 59 TASK_END = 1 |
35 TASK_END = 2 | |
36 | 60 |
37 | 61 |
38 def worker_func(params): | 62 def worker_func(params): |
39 if params.is_profiling: | 63 if params.is_profiling: |
40 try: | 64 try: |
50 else: | 74 else: |
51 _real_worker_func(params) | 75 _real_worker_func(params) |
52 | 76 |
53 | 77 |
54 def _real_worker_func(params): | 78 def _real_worker_func(params): |
79 wid = params.wid | |
80 | |
81 stats = ExecutionStats() | |
82 stats.registerTimer('WorkerInit') | |
83 init_start_time = time.perf_counter() | |
84 | |
55 # In a context where `multiprocessing` is using the `spawn` forking model, | 85 # In a context where `multiprocessing` is using the `spawn` forking model, |
56 # the new process doesn't inherit anything, so we lost all our logging | 86 # the new process doesn't inherit anything, so we lost all our logging |
57 # configuration here. Let's set it up again. | 87 # configuration here. Let's set it up again. |
58 if (hasattr(multiprocessing, 'get_start_method') and | 88 if (hasattr(multiprocessing, 'get_start_method') and |
59 multiprocessing.get_start_method() == 'spawn'): | 89 multiprocessing.get_start_method() == 'spawn'): |
60 from piecrust.main import _pre_parse_chef_args | 90 from piecrust.main import _pre_parse_chef_args |
61 _pre_parse_chef_args(sys.argv[1:]) | 91 _pre_parse_chef_args(sys.argv[1:]) |
62 | 92 |
63 wid = params.wid | |
64 logger.debug("Worker %d initializing..." % wid) | 93 logger.debug("Worker %d initializing..." % wid) |
65 | 94 |
66 # We don't need those. | 95 # We don't need those. |
67 params.inqueue._writer.close() | 96 params.inqueue._writer.close() |
68 params.outqueue._reader.close() | 97 params.outqueue._reader.close() |
76 logger.error("Working failed to initialize:") | 105 logger.error("Working failed to initialize:") |
77 logger.exception(ex) | 106 logger.exception(ex) |
78 params.outqueue.put(None) | 107 params.outqueue.put(None) |
79 return | 108 return |
80 | 109 |
81 use_threads = False | 110 stats.stepTimerSince('WorkerInit', init_start_time) |
82 if use_threads: | |
83 # Create threads to read/write the jobs and results from/to the | |
84 # main arbitrator process. | |
85 local_job_queue = queue.Queue() | |
86 reader_thread = threading.Thread( | |
87 target=_job_queue_reader, | |
88 args=(params.inqueue.get, local_job_queue), | |
89 name="JobQueueReaderThread") | |
90 reader_thread.start() | |
91 | |
92 local_result_queue = queue.Queue() | |
93 writer_thread = threading.Thread( | |
94 target=_job_results_writer, | |
95 args=(local_result_queue, params.outqueue.put), | |
96 name="JobResultWriterThread") | |
97 writer_thread.start() | |
98 | |
99 get = local_job_queue.get | |
100 put = local_result_queue.put_nowait | |
101 else: | |
102 get = params.inqueue.get | |
103 put = params.outqueue.put | |
104 | 111 |
105 # Start pumping! | 112 # Start pumping! |
106 completed = 0 | 113 completed = 0 |
107 time_in_get = 0 | 114 time_in_get = 0 |
108 time_in_put = 0 | 115 time_in_put = 0 |
116 get = params.inqueue.get | |
117 put = params.outqueue.put | |
118 | |
109 while True: | 119 while True: |
110 get_start_time = time.perf_counter() | 120 get_start_time = time.perf_counter() |
111 task = get() | 121 task = get() |
112 time_in_get += (time.perf_counter() - get_start_time) | 122 time_in_get += (time.perf_counter() - get_start_time) |
113 | 123 |
114 task_type, task_data = task | 124 task_type, task_data = task |
125 | |
126 # End task... gather stats to send back to the main process. | |
115 if task_type == TASK_END: | 127 if task_type == TASK_END: |
116 logger.debug("Worker %d got end task, exiting." % wid) | 128 logger.debug("Worker %d got end task, exiting." % wid) |
117 wprep = { | 129 stats.registerTimer('WorkerTaskGet', time=time_in_get) |
118 'WorkerTaskGet': time_in_get, | 130 stats.registerTimer('WorkerResultPut', time=time_in_put) |
119 'WorkerResultPut': time_in_put} | 131 try: |
120 try: | 132 stats.mergeStats(w.getStats()) |
121 rep = (task_type, True, wid, (wid, w.getReport(wprep))) | 133 rep = (task_type, task_data, True, wid, (wid, stats)) |
122 except Exception as e: | 134 except Exception as e: |
123 logger.debug("Error getting report: %s" % e) | 135 logger.debug( |
124 if params.wrap_exception: | 136 "Error getting report, sending exception to main process:") |
125 e = multiprocessing.ExceptionWithTraceback( | 137 logger.debug(traceback.format_exc()) |
126 e, e.__traceback__) | 138 we = WorkerExceptionData(wid) |
127 rep = (task_type, False, wid, (wid, e)) | 139 rep = (task_type, task_data, False, wid, (wid, we)) |
128 put(rep) | 140 put(rep) |
129 break | 141 break |
130 | 142 |
131 if task_type == TASK_JOB: | 143 # Job task... just do it. |
132 task_data = (task_data,) | 144 elif task_type == TASK_JOB: |
133 | 145 try: |
134 for t in task_data: | 146 res = (task_type, task_data, True, wid, w.process(task_data)) |
135 try: | |
136 res = (TASK_JOB, True, wid, w.process(t)) | |
137 except Exception as e: | 147 except Exception as e: |
138 if params.wrap_exception: | 148 logger.debug( |
139 e = multiprocessing.ExceptionWithTraceback( | 149 "Error processing job, sending exception to main process:") |
140 e, e.__traceback__) | 150 logger.debug(traceback.format_exc()) |
141 res = (TASK_JOB, False, wid, e) | 151 we = WorkerExceptionData(wid) |
152 res = (task_type, task_data, False, wid, we) | |
142 | 153 |
143 put_start_time = time.perf_counter() | 154 put_start_time = time.perf_counter() |
144 put(res) | 155 put(res) |
145 time_in_put += (time.perf_counter() - put_start_time) | 156 time_in_put += (time.perf_counter() - put_start_time) |
146 | 157 |
147 completed += 1 | 158 completed += 1 |
148 | 159 |
149 if use_threads: | 160 else: |
150 logger.debug("Worker %d waiting for reader/writer threads." % wid) | 161 raise Exception("Unknown task type: %s" % task_type) |
151 local_result_queue.put_nowait(None) | |
152 reader_thread.join() | |
153 writer_thread.join() | |
154 | 162 |
155 w.shutdown() | 163 w.shutdown() |
156 | |
157 logger.debug("Worker %d completed %d tasks." % (wid, completed)) | 164 logger.debug("Worker %d completed %d tasks." % (wid, completed)) |
158 | 165 |
159 | 166 |
160 def _job_queue_reader(getter, out_queue): | 167 class _WorkerParams: |
161 while True: | |
162 try: | |
163 task = getter() | |
164 except (EOFError, OSError): | |
165 logger.debug("Worker encountered connection problem.") | |
166 break | |
167 | |
168 out_queue.put_nowait(task) | |
169 | |
170 if task[0] == TASK_END: | |
171 # Done reading jobs from the main process. | |
172 logger.debug("Got end task, exiting task queue reader thread.") | |
173 break | |
174 | |
175 | |
176 def _job_results_writer(in_queue, putter): | |
177 while True: | |
178 res = in_queue.get() | |
179 if res is not None: | |
180 putter(res) | |
181 in_queue.task_done() | |
182 else: | |
183 # Got sentinel. Exit. | |
184 in_queue.task_done() | |
185 break | |
186 logger.debug("Exiting result queue writer thread.") | |
187 | |
188 | |
189 class _WorkerParams(object): | |
190 def __init__(self, wid, inqueue, outqueue, worker_class, initargs=(), | 168 def __init__(self, wid, inqueue, outqueue, worker_class, initargs=(), |
191 wrap_exception=False, is_profiling=False): | 169 is_profiling=False): |
192 self.wid = wid | 170 self.wid = wid |
193 self.inqueue = inqueue | 171 self.inqueue = inqueue |
194 self.outqueue = outqueue | 172 self.outqueue = outqueue |
195 self.worker_class = worker_class | 173 self.worker_class = worker_class |
196 self.initargs = initargs | 174 self.initargs = initargs |
197 self.wrap_exception = wrap_exception | |
198 self.is_profiling = is_profiling | 175 self.is_profiling = is_profiling |
199 | 176 |
200 | 177 |
201 class WorkerPool(object): | 178 class WorkerPool: |
202 def __init__(self, worker_class, initargs=(), | 179 def __init__(self, worker_class, initargs=(), *, |
203 worker_count=None, batch_size=None, | 180 callback=None, error_callback=None, |
204 wrap_exception=False): | 181 worker_count=None, batch_size=None): |
205 worker_count = worker_count or os.cpu_count() or 1 | 182 worker_count = worker_count or os.cpu_count() or 1 |
206 | 183 |
207 if use_fastqueue: | 184 if use_fastqueue: |
208 self._task_queue = FastQueue() | 185 self._task_queue = FastQueue() |
209 self._result_queue = FastQueue() | 186 self._result_queue = FastQueue() |
213 self._task_queue = multiprocessing.SimpleQueue() | 190 self._task_queue = multiprocessing.SimpleQueue() |
214 self._result_queue = multiprocessing.SimpleQueue() | 191 self._result_queue = multiprocessing.SimpleQueue() |
215 self._quick_put = self._task_queue._writer.send | 192 self._quick_put = self._task_queue._writer.send |
216 self._quick_get = self._result_queue._reader.recv | 193 self._quick_get = self._result_queue._reader.recv |
217 | 194 |
195 self._callback = callback | |
196 self._error_callback = error_callback | |
218 self._batch_size = batch_size | 197 self._batch_size = batch_size |
219 self._callback = None | 198 self._jobs_left = 0 |
220 self._error_callback = None | 199 self._event = threading.Event() |
221 self._listener = None | |
222 | 200 |
223 main_module = sys.modules['__main__'] | 201 main_module = sys.modules['__main__'] |
224 is_profiling = os.path.basename(main_module.__file__) in [ | 202 is_profiling = os.path.basename(main_module.__file__) in [ |
225 'profile.py', 'cProfile.py'] | 203 'profile.py', 'cProfile.py'] |
226 | 204 |
227 self._pool = [] | 205 self._pool = [] |
228 for i in range(worker_count): | 206 for i in range(worker_count): |
229 worker_params = _WorkerParams( | 207 worker_params = _WorkerParams( |
230 i, self._task_queue, self._result_queue, | 208 i, self._task_queue, self._result_queue, |
231 worker_class, initargs, | 209 worker_class, initargs, |
232 wrap_exception=wrap_exception, | 210 is_profiling=is_profiling) |
233 is_profiling=is_profiling) | |
234 w = multiprocessing.Process(target=worker_func, | 211 w = multiprocessing.Process(target=worker_func, |
235 args=(worker_params,)) | 212 args=(worker_params,)) |
236 w.name = w.name.replace('Process', 'PoolWorker') | 213 w.name = w.name.replace('Process', 'PoolWorker') |
237 w.daemon = True | 214 w.daemon = True |
238 w.start() | 215 w.start() |
239 self._pool.append(w) | 216 self._pool.append(w) |
240 | 217 |
241 self._result_handler = threading.Thread( | 218 self._result_handler = threading.Thread( |
242 target=WorkerPool._handleResults, | 219 target=WorkerPool._handleResults, |
243 args=(self,)) | 220 args=(self,)) |
244 self._result_handler.daemon = True | 221 self._result_handler.daemon = True |
245 self._result_handler.start() | 222 self._result_handler.start() |
246 | 223 |
247 self._closed = False | 224 self._closed = False |
248 | 225 |
249 def setHandler(self, callback=None, error_callback=None): | 226 def queueJobs(self, jobs): |
250 self._callback = callback | |
251 self._error_callback = error_callback | |
252 | |
253 def queueJobs(self, jobs, handler=None, chunk_size=None): | |
254 if self._closed: | 227 if self._closed: |
255 raise Exception("This worker pool has been closed.") | 228 raise Exception("This worker pool has been closed.") |
256 if self._listener is not None: | 229 |
257 raise Exception("A previous job queue has not finished yet.") | 230 for job in jobs: |
258 | 231 self._jobs_left += 1 |
259 if any([not p.is_alive() for p in self._pool]): | 232 self._quick_put((TASK_JOB, job)) |
260 raise Exception("Some workers have prematurely exited.") | 233 |
261 | 234 if self._jobs_left > 0: |
262 if handler is not None: | 235 self._event.clear() |
263 self.setHandler(handler) | 236 |
264 | 237 def wait(self, timeout=None): |
265 if not hasattr(jobs, '__len__'): | 238 return self._event.wait(timeout) |
266 jobs = list(jobs) | |
267 job_count = len(jobs) | |
268 | |
269 res = AsyncResult(self, job_count) | |
270 if res._count == 0: | |
271 res._event.set() | |
272 return res | |
273 | |
274 self._listener = res | |
275 | |
276 if chunk_size is None: | |
277 chunk_size = self._batch_size | |
278 if chunk_size is None: | |
279 chunk_size = max(1, job_count // 50) | |
280 logger.debug("Using chunk size of %d" % chunk_size) | |
281 | |
282 if chunk_size is None or chunk_size == 1: | |
283 for job in jobs: | |
284 self._quick_put((TASK_JOB, job)) | |
285 else: | |
286 it = iter(jobs) | |
287 while True: | |
288 batch = tuple([i for i in itertools.islice(it, chunk_size)]) | |
289 if not batch: | |
290 break | |
291 self._quick_put((TASK_BATCH, batch)) | |
292 | |
293 return res | |
294 | 239 |
295 def close(self): | 240 def close(self): |
296 if self._listener is not None: | 241 if self._jobs_left > 0 or not self._event.is_set(): |
297 raise Exception("A previous job queue has not finished yet.") | 242 raise Exception("A previous job queue has not finished yet.") |
298 | 243 |
299 logger.debug("Closing worker pool...") | 244 logger.debug("Closing worker pool...") |
300 handler = _ReportHandler(len(self._pool)) | 245 handler = _ReportHandler(len(self._pool)) |
301 self._callback = handler._handle | 246 self._callback = handler._handle |
247 self._error_callback = handler._handleError | |
302 for w in self._pool: | 248 for w in self._pool: |
303 self._quick_put((TASK_END, None)) | 249 self._quick_put((TASK_END, None)) |
304 for w in self._pool: | 250 for w in self._pool: |
305 w.join() | 251 w.join() |
306 | 252 |
307 logger.debug("Waiting for reports...") | 253 logger.debug("Waiting for reports...") |
308 if not handler.wait(2): | 254 if not handler.wait(2): |
309 missing = handler.reports.index(None) | 255 missing = handler.reports.index(None) |
310 logger.warning( | 256 logger.warning( |
311 "Didn't receive all worker reports before timeout. " | 257 "Didn't receive all worker reports before timeout. " |
312 "Missing report from worker %d." % missing) | 258 "Missing report from worker %d." % missing) |
313 | 259 |
314 logger.debug("Exiting result handler thread...") | 260 logger.debug("Exiting result handler thread...") |
315 self._result_queue.put(None) | 261 self._result_queue.put(None) |
316 self._result_handler.join() | 262 self._result_handler.join() |
317 self._closed = True | 263 self._closed = True |
318 | 264 |
319 return handler.reports | 265 return handler.reports |
266 | |
267 def _onTaskDone(self): | |
268 self._jobs_left -= 1 | |
269 if self._jobs_left == 0: | |
270 self._event.set() | |
320 | 271 |
321 @staticmethod | 272 @staticmethod |
322 def _handleResults(pool): | 273 def _handleResults(pool): |
323 while True: | 274 while True: |
324 try: | 275 try: |
330 | 281 |
331 if res is None: | 282 if res is None: |
332 logger.debug("Result handler exiting.") | 283 logger.debug("Result handler exiting.") |
333 break | 284 break |
334 | 285 |
335 task_type, success, wid, data = res | 286 task_type, task_data, success, wid, data = res |
336 try: | 287 try: |
337 if success and pool._callback: | 288 if success: |
338 pool._callback(data) | 289 if pool._callback: |
339 elif not success: | 290 pool._callback(task_data, data) |
291 else: | |
340 if pool._error_callback: | 292 if pool._error_callback: |
341 pool._error_callback(data) | 293 pool._error_callback(task_data, data) |
342 else: | 294 else: |
343 logger.error("Got error data:") | 295 logger.error( |
296 "Worker %d failed to process a job:" % wid) | |
344 logger.error(data) | 297 logger.error(data) |
345 except Exception as ex: | 298 except Exception as ex: |
346 logger.exception(ex) | 299 logger.exception(ex) |
347 | 300 |
348 if task_type == TASK_JOB: | 301 if task_type == TASK_JOB: |
349 pool._listener._onTaskDone() | 302 pool._onTaskDone() |
350 | 303 |
351 | 304 |
352 class AsyncResult(object): | 305 class _ReportHandler: |
353 def __init__(self, pool, count): | |
354 self._pool = pool | |
355 self._count = count | |
356 self._event = threading.Event() | |
357 | |
358 def ready(self): | |
359 return self._event.is_set() | |
360 | |
361 def wait(self, timeout=None): | |
362 return self._event.wait(timeout) | |
363 | |
364 def _onTaskDone(self): | |
365 self._count -= 1 | |
366 if self._count == 0: | |
367 self._pool.setHandler(None) | |
368 self._pool._listener = None | |
369 self._event.set() | |
370 | |
371 | |
372 class _ReportHandler(object): | |
373 def __init__(self, worker_count): | 306 def __init__(self, worker_count): |
374 self.reports = [None] * worker_count | 307 self.reports = [None] * worker_count |
375 self._count = worker_count | 308 self._count = worker_count |
376 self._received = 0 | 309 self._received = 0 |
377 self._event = threading.Event() | 310 self._event = threading.Event() |
378 | 311 |
379 def wait(self, timeout=None): | 312 def wait(self, timeout=None): |
380 return self._event.wait(timeout) | 313 return self._event.wait(timeout) |
381 | 314 |
382 def _handle(self, res): | 315 def _handle(self, job, res): |
383 wid, data = res | 316 wid, data = res |
384 if wid < 0 or wid > self._count: | 317 if wid < 0 or wid > self._count: |
385 logger.error("Ignoring report from unknown worker %d." % wid) | 318 logger.error("Ignoring report from unknown worker %d." % wid) |
386 return | 319 return |
387 | 320 |
389 self.reports[wid] = data | 322 self.reports[wid] = data |
390 | 323 |
391 if self._received == self._count: | 324 if self._received == self._count: |
392 self._event.set() | 325 self._event.set() |
393 | 326 |
394 def _handleError(self, res): | 327 def _handleError(self, job, res): |
395 wid, data = res | 328 logger.error("Worker %d failed to send its report." % res.wid) |
396 logger.error("Worker %d failed to send its report." % wid) | 329 logger.error(res) |
397 logger.exception(data) | 330 |
398 | 331 |
399 | 332 class FastQueue: |
400 class FastQueue(object): | |
401 def __init__(self): | 333 def __init__(self): |
402 self._reader, self._writer = multiprocessing.Pipe(duplex=False) | 334 self._reader, self._writer = multiprocessing.Pipe(duplex=False) |
403 self._rlock = multiprocessing.Lock() | 335 self._rlock = multiprocessing.Lock() |
404 self._wlock = multiprocessing.Lock() | 336 self._wlock = multiprocessing.Lock() |
405 self._initBuffers() | 337 self._initBuffers() |
427 self._rbuf.truncate(bufsize * 2) | 359 self._rbuf.truncate(bufsize * 2) |
428 self._rbuf.seek(0) | 360 self._rbuf.seek(0) |
429 self._rbuf.write(e.args[0]) | 361 self._rbuf.write(e.args[0]) |
430 | 362 |
431 self._rbuf.seek(0) | 363 self._rbuf.seek(0) |
432 return self._unpickle(self._rbuf, bufsize) | 364 return _unpickle(self._rbuf, bufsize) |
433 | 365 |
434 def put(self, obj): | 366 def put(self, obj): |
435 self._wbuf.seek(0) | 367 self._wbuf.seek(0) |
436 self._pickle(obj, self._wbuf) | 368 _pickle(obj, self._wbuf) |
437 size = self._wbuf.tell() | 369 size = self._wbuf.tell() |
438 | 370 |
439 self._wbuf.seek(0) | 371 self._wbuf.seek(0) |
440 with self._wlock: | 372 with self._wlock: |
441 with self._wbuf.getbuffer() as b: | 373 with self._wbuf.getbuffer() as b: |
442 self._writer.send_bytes(b, 0, size) | 374 self._writer.send_bytes(b, 0, size) |
443 | 375 |
444 def _pickle(self, obj, buf): | 376 |
445 fastpickle.pickle_intob(obj, buf) | 377 def _pickle_fast(obj, buf): |
446 | 378 fastpickle.pickle_intob(obj, buf) |
447 def _unpickle(self, buf, bufsize): | 379 |
448 return fastpickle.unpickle_fromb(buf, bufsize) | 380 |
449 | 381 def _unpickle_fast(buf, bufsize): |
382 return fastpickle.unpickle_fromb(buf, bufsize) | |
383 | |
384 | |
385 def _pickle_default(obj, buf): | |
386 pickle.dump(obj, buf) | |
387 | |
388 | |
389 def _unpickle_default(buf, bufsize): | |
390 return pickle.load(buf) | |
391 | |
392 | |
393 if use_fastpickle: | |
394 _pickle = _pickle_fast | |
395 _unpickle = _unpickle_fast | |
396 else: | |
397 _pickle = _pickle_default | |
398 _unpickle = _unpickle_default | |
399 |