Mercurial > piecrust2
comparison piecrust/workerpool.py @ 991:1857dbd4580f
bake: Fix bugs introduced by bake optimizations, of course.
- Make the execution stats JSON-serializable.
- Re-add ability to differentiate between sources used during segment rendering
and during layout rendering. Fixes problems with cache invalidation of
pages that use other sources.
- Make taxonomy-related stuff JSON-serializable.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Mon, 20 Nov 2017 23:06:47 -0800 |
parents | 8adc27285d93 |
children | 09c3d415d9e5 |
comparison
equal
deleted
inserted
replaced
990:22cf13b86cc3 | 991:1857dbd4580f |
---|---|
9 from piecrust.environment import ExecutionStats | 9 from piecrust.environment import ExecutionStats |
10 | 10 |
11 | 11 |
12 logger = logging.getLogger(__name__) | 12 logger = logging.getLogger(__name__) |
13 | 13 |
14 use_fastqueue = True | 14 use_fastqueue = False |
15 use_fastpickle = False | 15 use_fastpickle = False |
16 use_msgpack = False | 16 use_msgpack = False |
17 use_marshall = False | 17 use_marshall = False |
18 use_json = False | 18 use_json = False |
19 | 19 |
200 logger.debug("Worker %d got end task, exiting." % wid) | 200 logger.debug("Worker %d got end task, exiting." % wid) |
201 stats.registerTimer('WorkerTaskGet', time=time_in_get) | 201 stats.registerTimer('WorkerTaskGet', time=time_in_get) |
202 stats.registerTimer('WorkerResultPut', time=time_in_put) | 202 stats.registerTimer('WorkerResultPut', time=time_in_put) |
203 try: | 203 try: |
204 stats.mergeStats(w.getStats()) | 204 stats.mergeStats(w.getStats()) |
205 rep = (task_type, wid, [(task_data, (wid, stats), True)]) | 205 stats_data = stats.toData() |
206 rep = (task_type, wid, [(task_data, (wid, stats_data), True)]) | |
206 except Exception as e: | 207 except Exception as e: |
207 logger.debug( | 208 logger.debug( |
208 "Error getting report, sending exception to main process:") | 209 "Error getting report, sending exception to main process:") |
209 logger.debug(traceback.format_exc()) | 210 logger.debug(traceback.format_exc()) |
210 we = _get_worker_exception_data(wid) | 211 we = _get_worker_exception_data(wid) |
437 class _ReportHandler: | 438 class _ReportHandler: |
438 def __init__(self, worker_count): | 439 def __init__(self, worker_count): |
439 self.reports = [None] * worker_count | 440 self.reports = [None] * worker_count |
440 self._count = worker_count | 441 self._count = worker_count |
441 self._received = 0 | 442 self._received = 0 |
443 self._lock = threading.Lock() | |
442 self._event = threading.Event() | 444 self._event = threading.Event() |
443 | 445 |
444 def wait(self, timeout=None): | 446 def wait(self, timeout=None): |
445 return self._event.wait(timeout) | 447 return self._event.wait(timeout) |
446 | 448 |
448 wid, data = res | 450 wid, data = res |
449 if wid < 0 or wid > self._count: | 451 if wid < 0 or wid > self._count: |
450 logger.error("Ignoring report from unknown worker %d." % wid) | 452 logger.error("Ignoring report from unknown worker %d." % wid) |
451 return | 453 return |
452 | 454 |
453 self._received += 1 | 455 stats = ExecutionStats() |
454 self.reports[wid] = data | 456 stats.fromData(data) |
455 | 457 |
456 if self._received == self._count: | 458 with self._lock: |
457 self._event.set() | 459 self.reports[wid] = stats |
460 self._received += 1 | |
461 if self._received == self._count: | |
462 self._event.set() | |
458 | 463 |
459 def _handleError(self, job, res, _): | 464 def _handleError(self, job, res, _): |
460 logger.error("Worker %d failed to send its report." % res.wid) | 465 logger.error("Worker %d failed to send its report." % res.wid) |
461 logger.error(res) | 466 logger.error(res) |
462 | 467 |
465 def __init__(self): | 470 def __init__(self): |
466 self._reader, self._writer = multiprocessing.Pipe(duplex=False) | 471 self._reader, self._writer = multiprocessing.Pipe(duplex=False) |
467 self._rlock = multiprocessing.Lock() | 472 self._rlock = multiprocessing.Lock() |
468 self._wlock = multiprocessing.Lock() | 473 self._wlock = multiprocessing.Lock() |
469 self._initBuffers() | 474 self._initBuffers() |
475 self._initSerializer() | |
470 | 476 |
471 def _initBuffers(self): | 477 def _initBuffers(self): |
472 self._rbuf = io.BytesIO() | 478 self._rbuf = io.BytesIO() |
473 self._rbuf.truncate(256) | 479 self._rbuf.truncate(256) |
474 self._wbuf = io.BytesIO() | 480 self._wbuf = io.BytesIO() |
475 self._wbuf.truncate(256) | 481 self._wbuf.truncate(256) |
476 | 482 |
483 def _initSerializer(self): | |
484 pass | |
485 | |
477 def __getstate__(self): | 486 def __getstate__(self): |
478 return (self._reader, self._writer, self._rlock, self._wlock) | 487 return (self._reader, self._writer, self._rlock, self._wlock) |
479 | 488 |
480 def __setstate__(self, state): | 489 def __setstate__(self, state): |
481 (self._reader, self._writer, self._rlock, self._wlock) = state | 490 (self._reader, self._writer, self._rlock, self._wlock) = state |
482 self._initBuffers() | 491 self._initBuffers() |
483 | 492 |
484 def get(self): | 493 def get(self): |
485 with self._rlock: | 494 with self._rlock: |
495 self._rbuf.seek(0) | |
486 try: | 496 try: |
487 with self._rbuf.getbuffer() as b: | 497 with self._rbuf.getbuffer() as b: |
488 bufsize = self._reader.recv_bytes_into(b) | 498 bufsize = self._reader.recv_bytes_into(b) |
489 except multiprocessing.BufferTooShort as e: | 499 except multiprocessing.BufferTooShort as e: |
490 bufsize = len(e.args[0]) | 500 bufsize = len(e.args[0]) |
491 self._rbuf.truncate(bufsize * 2) | 501 self._rbuf.truncate(bufsize * 2) |
492 self._rbuf.seek(0) | 502 self._rbuf.seek(0) |
493 self._rbuf.write(e.args[0]) | 503 self._rbuf.write(e.args[0]) |
494 | 504 |
495 self._rbuf.seek(0) | 505 self._rbuf.seek(0) |
496 return _unpickle(self._rbuf, bufsize) | 506 return _unpickle(self, self._rbuf, bufsize) |
497 | 507 |
498 def put(self, obj): | 508 def put(self, obj): |
499 self._wbuf.seek(0) | 509 self._wbuf.seek(0) |
500 _pickle(obj, self._wbuf) | 510 _pickle(self, obj, self._wbuf) |
501 size = self._wbuf.tell() | 511 size = self._wbuf.tell() |
502 | 512 |
503 self._wbuf.seek(0) | 513 self._wbuf.seek(0) |
504 with self._wlock: | 514 with self._wlock: |
505 with self._wbuf.getbuffer() as b: | 515 with self._wbuf.getbuffer() as b: |
506 self._writer.send_bytes(b, 0, size) | 516 self._writer.send_bytes(b, 0, size) |
507 | 517 |
508 | 518 |
519 class _BufferWrapper: | |
520 def __init__(self, buf, read_size=0): | |
521 self._buf = buf | |
522 self._read_size = read_size | |
523 | |
524 def write(self, data): | |
525 self._buf.write(data.encode('utf8')) | |
526 | |
527 def read(self): | |
528 return self._buf.read(self._read_size).decode('utf8') | |
529 | |
530 | |
509 if use_fastpickle: | 531 if use_fastpickle: |
510 from piecrust import fastpickle | 532 from piecrust import fastpickle |
511 | 533 |
512 def _pickle_fast(obj, buf): | 534 def _pickle_fast(queue, obj, buf): |
513 fastpickle.pickle_intob(obj, buf) | 535 fastpickle.pickle_intob(obj, buf) |
514 | 536 |
515 def _unpickle_fast(buf, bufsize): | 537 def _unpickle_fast(queue, buf, bufsize): |
516 return fastpickle.unpickle_fromb(buf, bufsize) | 538 return fastpickle.unpickle_fromb(buf, bufsize) |
517 | 539 |
518 _pickle = _pickle_fast | 540 _pickle = _pickle_fast |
519 _unpickle = _unpickle_fast | 541 _unpickle = _unpickle_fast |
520 | 542 |
521 elif use_msgpack: | 543 elif use_msgpack: |
522 import msgpack | 544 import msgpack |
523 | 545 |
524 def _pickle_msgpack(obj, buf): | 546 def _pickle_msgpack(queue, obj, buf): |
525 msgpack.pack(obj, buf) | 547 buf.write(queue._packer.pack(obj)) |
526 | 548 |
527 def _unpickle_msgpack(buf, bufsize): | 549 def _unpickle_msgpack(queue, buf, bufsize): |
528 return msgpack.unpack(buf) | 550 queue._unpacker.feed(buf.getbuffer()) |
551 for o in queue._unpacker: | |
552 return o | |
553 # return msgpack.unpack(buf) | |
554 | |
555 def _init_msgpack(queue): | |
556 queue._packer = msgpack.Packer() | |
557 queue._unpacker = msgpack.Unpacker() | |
529 | 558 |
530 _pickle = _pickle_msgpack | 559 _pickle = _pickle_msgpack |
531 _unpickle = _unpickle_msgpack | 560 _unpickle = _unpickle_msgpack |
561 FastQueue._initSerializer = _init_msgpack | |
532 | 562 |
533 elif use_marshall: | 563 elif use_marshall: |
534 import marshal | 564 import marshal |
535 | 565 |
536 def _pickle_marshal(obj, buf): | 566 def _pickle_marshal(queue, obj, buf): |
537 marshal.dump(obj, buf) | 567 marshal.dump(obj, buf) |
538 | 568 |
539 def _unpickle_marshal(buf, bufsize): | 569 def _unpickle_marshal(queue, buf, bufsize): |
540 return marshal.load(buf) | 570 return marshal.load(buf) |
541 | 571 |
542 _pickle = _pickle_marshal | 572 _pickle = _pickle_marshal |
543 _unpickle = _unpickle_marshal | 573 _unpickle = _unpickle_marshal |
544 | 574 |
545 elif use_json: | 575 elif use_json: |
546 import json | 576 import json |
547 | 577 |
548 class _BufferWrapper: | 578 def _pickle_json(queue, obj, buf): |
549 def __init__(self, buf): | |
550 self._buf = buf | |
551 | |
552 def write(self, data): | |
553 self._buf.write(data.encode('utf8')) | |
554 | |
555 def read(self): | |
556 return self._buf.read().decode('utf8') | |
557 | |
558 def _pickle_json(obj, buf): | |
559 buf = _BufferWrapper(buf) | 579 buf = _BufferWrapper(buf) |
560 json.dump(obj, buf, indent=None, separators=(',', ':')) | 580 json.dump(obj, buf, indent=None, separators=(',', ':')) |
561 | 581 |
562 def _unpickle_json(buf, bufsize): | 582 def _unpickle_json(queue, buf, bufsize): |
563 buf = _BufferWrapper(buf) | 583 buf = _BufferWrapper(buf, bufsize) |
564 return json.load(buf) | 584 return json.load(buf) |
565 | 585 |
566 _pickle = _pickle_json | 586 _pickle = _pickle_json |
567 _unpickle = _unpickle_json | 587 _unpickle = _unpickle_json |
568 | 588 |
569 else: | 589 else: |
570 import pickle | 590 import pickle |
571 | 591 |
572 def _pickle_default(obj, buf): | 592 def _pickle_default(queue, obj, buf): |
573 pickle.dump(obj, buf, pickle.HIGHEST_PROTOCOL) | 593 pickle.dump(obj, buf, pickle.HIGHEST_PROTOCOL) |
574 | 594 |
575 def _unpickle_default(buf, bufsize): | 595 def _unpickle_default(queue, buf, bufsize): |
576 return pickle.load(buf) | 596 return pickle.load(buf) |
577 | 597 |
578 _pickle = _pickle_default | 598 _pickle = _pickle_default |
579 _unpickle = _unpickle_default | 599 _unpickle = _unpickle_default |