comparison piecrust/workerpool.py @ 991:1857dbd4580f

bake: Fix bugs introduced by bake optimizations, of course. - Make the execution stats JSON-serializable. - Re-add ability to differentiate between sources used during segment rendering and during layout rendering. Fixes problems with cache invalidation of pages that use other sources. - Make taxonomy-related stuff JSON-serializable.
author Ludovic Chabant <ludovic@chabant.com>
date Mon, 20 Nov 2017 23:06:47 -0800
parents 8adc27285d93
children 09c3d415d9e5
comparison
equal deleted inserted replaced
990:22cf13b86cc3 991:1857dbd4580f
from piecrust.environment import ExecutionStats


logger = logging.getLogger(__name__)

# Transport/serializer backend toggles for the worker pool.
# use_fastqueue selects the custom pipe-based FastQueue below instead of the
# stock multiprocessing queue; the use_* serializer flags pick which
# _pickle/_unpickle pair is bound at the bottom of this module.
# NOTE(review): presumably at most one serializer flag should be True at a
# time — the if/elif chain below takes the first one set; all False falls
# back to stdlib pickle.
use_fastqueue = False
use_fastpickle = False
use_msgpack = False
use_marshall = False
use_json = False
200 logger.debug("Worker %d got end task, exiting." % wid) 200 logger.debug("Worker %d got end task, exiting." % wid)
201 stats.registerTimer('WorkerTaskGet', time=time_in_get) 201 stats.registerTimer('WorkerTaskGet', time=time_in_get)
202 stats.registerTimer('WorkerResultPut', time=time_in_put) 202 stats.registerTimer('WorkerResultPut', time=time_in_put)
203 try: 203 try:
204 stats.mergeStats(w.getStats()) 204 stats.mergeStats(w.getStats())
205 rep = (task_type, wid, [(task_data, (wid, stats), True)]) 205 stats_data = stats.toData()
206 rep = (task_type, wid, [(task_data, (wid, stats_data), True)])
206 except Exception as e: 207 except Exception as e:
207 logger.debug( 208 logger.debug(
208 "Error getting report, sending exception to main process:") 209 "Error getting report, sending exception to main process:")
209 logger.debug(traceback.format_exc()) 210 logger.debug(traceback.format_exc())
210 we = _get_worker_exception_data(wid) 211 we = _get_worker_exception_data(wid)
437 class _ReportHandler: 438 class _ReportHandler:
438 def __init__(self, worker_count): 439 def __init__(self, worker_count):
439 self.reports = [None] * worker_count 440 self.reports = [None] * worker_count
440 self._count = worker_count 441 self._count = worker_count
441 self._received = 0 442 self._received = 0
443 self._lock = threading.Lock()
442 self._event = threading.Event() 444 self._event = threading.Event()
443 445
def wait(self, timeout=None):
    # Block the caller until all worker reports have arrived, or the
    # timeout elapses; mirrors threading.Event.wait's return value.
    signalled = self._event.wait(timeout)
    return signalled
446 448
448 wid, data = res 450 wid, data = res
449 if wid < 0 or wid > self._count: 451 if wid < 0 or wid > self._count:
450 logger.error("Ignoring report from unknown worker %d." % wid) 452 logger.error("Ignoring report from unknown worker %d." % wid)
451 return 453 return
452 454
453 self._received += 1 455 stats = ExecutionStats()
454 self.reports[wid] = data 456 stats.fromData(data)
455 457
456 if self._received == self._count: 458 with self._lock:
457 self._event.set() 459 self.reports[wid] = stats
460 self._received += 1
461 if self._received == self._count:
462 self._event.set()
458 463
def _handleError(self, job, res, _):
    # Invoked when a worker's report job itself failed to deliver;
    # we can only log it, since there is no stats payload to merge.
    message = "Worker %d failed to send its report." % res.wid
    logger.error(message)
    logger.error(res)
462 467
465 def __init__(self): 470 def __init__(self):
466 self._reader, self._writer = multiprocessing.Pipe(duplex=False) 471 self._reader, self._writer = multiprocessing.Pipe(duplex=False)
467 self._rlock = multiprocessing.Lock() 472 self._rlock = multiprocessing.Lock()
468 self._wlock = multiprocessing.Lock() 473 self._wlock = multiprocessing.Lock()
469 self._initBuffers() 474 self._initBuffers()
475 self._initSerializer()
470 476
471 def _initBuffers(self): 477 def _initBuffers(self):
472 self._rbuf = io.BytesIO() 478 self._rbuf = io.BytesIO()
473 self._rbuf.truncate(256) 479 self._rbuf.truncate(256)
474 self._wbuf = io.BytesIO() 480 self._wbuf = io.BytesIO()
475 self._wbuf.truncate(256) 481 self._wbuf.truncate(256)
476 482
def _initSerializer(self):
    # Hook for serializer backends that need per-queue state; the msgpack
    # branch at the bottom of this module overrides it on the class to
    # attach a Packer/Unpacker pair.  The default backends need nothing.
    pass
485
477 def __getstate__(self): 486 def __getstate__(self):
478 return (self._reader, self._writer, self._rlock, self._wlock) 487 return (self._reader, self._writer, self._rlock, self._wlock)
479 488
480 def __setstate__(self, state): 489 def __setstate__(self, state):
481 (self._reader, self._writer, self._rlock, self._wlock) = state 490 (self._reader, self._writer, self._rlock, self._wlock) = state
482 self._initBuffers() 491 self._initBuffers()
483 492
def get(self):
    # Receive one serialized message from the pipe and decode it.
    with self._rlock:
        self._rbuf.seek(0)
        try:
            # Fast path: receive directly into the pre-sized buffer,
            # avoiding an intermediate bytes allocation.
            with self._rbuf.getbuffer() as b:
                bufsize = self._reader.recv_bytes_into(b)
        except multiprocessing.BufferTooShort as e:
            # Message didn't fit: the full payload comes back in
            # e.args[0].  Grow the buffer (x2 to amortize future large
            # messages) and copy the payload in.
            bufsize = len(e.args[0])
            self._rbuf.truncate(bufsize * 2)
            self._rbuf.seek(0)
            self._rbuf.write(e.args[0])

    # Rewind and decode; bufsize bounds the read since the buffer may
    # contain stale bytes from a previous, larger message.
    self._rbuf.seek(0)
    return _unpickle(self, self._rbuf, bufsize)
497 507
def put(self, obj):
    # Serialize into the reusable write buffer, then send exactly the
    # bytes written (the buffer may be larger than the message).
    self._wbuf.seek(0)
    _pickle(self, obj, self._wbuf)
    size = self._wbuf.tell()

    self._wbuf.seek(0)
    with self._wlock:
        # Zero-copy send of the first `size` bytes of the buffer.
        with self._wbuf.getbuffer() as b:
            self._writer.send_bytes(b, 0, size)
507 517
508 518
519 class _BufferWrapper:
520 def __init__(self, buf, read_size=0):
521 self._buf = buf
522 self._read_size = read_size
523
524 def write(self, data):
525 self._buf.write(data.encode('utf8'))
526
527 def read(self):
528 return self._buf.read(self._read_size).decode('utf8')
529
530
# Bind the module-level _pickle/_unpickle pair according to the flags at
# the top of the file.  All backends share one signature:
#   _pickle(queue, obj, buf)        -> serialize obj into buf
#   _unpickle(queue, buf, bufsize)  -> deserialize one object from buf,
#                                      reading at most bufsize bytes
# The `queue` argument gives backends access to per-queue state (only the
# msgpack backend uses it).
if use_fastpickle:
    from piecrust import fastpickle

    def _pickle_fast(queue, obj, buf):
        fastpickle.pickle_intob(obj, buf)

    def _unpickle_fast(queue, buf, bufsize):
        return fastpickle.unpickle_fromb(buf, bufsize)

    _pickle = _pickle_fast
    _unpickle = _unpickle_fast

elif use_msgpack:
    import msgpack

    def _pickle_msgpack(queue, obj, buf):
        buf.write(queue._packer.pack(obj))

    def _unpickle_msgpack(queue, buf, bufsize):
        # Stream-decode: feed the raw bytes and return the first decoded
        # object.  NOTE(review): the whole underlying buffer is fed, not
        # just the first `bufsize` bytes — confirm that trailing stale
        # bytes from a previous larger message cannot confuse the
        # unpacker on a later call.
        queue._unpacker.feed(buf.getbuffer())
        for o in queue._unpacker:
            return o

    def _init_msgpack(queue):
        # Per-queue packer/unpacker, installed via the FastQueue
        # _initSerializer hook.
        queue._packer = msgpack.Packer()
        queue._unpacker = msgpack.Unpacker()

    _pickle = _pickle_msgpack
    _unpickle = _unpickle_msgpack
    FastQueue._initSerializer = _init_msgpack

elif use_marshall:
    import marshal

    def _pickle_marshal(queue, obj, buf):
        marshal.dump(obj, buf)

    def _unpickle_marshal(queue, buf, bufsize):
        return marshal.load(buf)

    _pickle = _pickle_marshal
    _unpickle = _unpickle_marshal

elif use_json:
    import json

    def _pickle_json(queue, obj, buf):
        # json needs a text-mode file object; wrap the binary buffer.
        buf = _BufferWrapper(buf)
        json.dump(obj, buf, indent=None, separators=(',', ':'))

    def _unpickle_json(queue, buf, bufsize):
        # bufsize bounds the read so stale bytes past the message are
        # not handed to the JSON parser.
        buf = _BufferWrapper(buf, bufsize)
        return json.load(buf)

    _pickle = _pickle_json
    _unpickle = _unpickle_json

else:
    # Default backend: stdlib pickle at its highest protocol.
    import pickle

    def _pickle_default(queue, obj, buf):
        pickle.dump(obj, buf, pickle.HIGHEST_PROTOCOL)

    def _unpickle_default(queue, buf, bufsize):
        return pickle.load(buf)

    _pickle = _pickle_default
    _unpickle = _unpickle_default