comparison piecrust/environment.py @ 411:e7b865f8f335

bake: Enable multiprocess baking.

Baking is now done by running one worker per CPU and sending jobs to them.
This changes several things across the codebase:

* Ability to not cache things related to pages other than the 'main' page
  (i.e. the page at the bottom of the execution stack).
* Decouple the baking process from the bake records, so that only the main
  process keeps track of (and modifies) the bake record.
* Remove the need for 'batch page getters'; pages are now loaded directly
  from the page factories.

Various smaller changes are included here too, among them support for scoped
performance timers that are saved with the bake record and can be printed
out to the console. Yes, I got carried away.

For testing, the in-memory 'mock' file-system doesn't work anymore since
we're now spawning processes, so it is replaced by a 'tmpfs' file-system
that is saved in temporary files on disk and deleted after the tests have run.
author Ludovic Chabant <ludovic@chabant.com>
date Fri, 12 Jun 2015 17:09:19 -0700
parents c2ca72fb7f0b
children a1567766c83c
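The commit message above describes the new model: one worker process per CPU,
jobs sent to them, and only the main process updating the bake record. The
actual worker code lives in other files of this changeset; the following is
only a rough, hypothetical sketch of that pattern using the standard library
(bake_job and the job list are made-up placeholders, not PieCrust's API):

import multiprocessing


def bake_job(job):
    # Pretend to bake one page and return a small, picklable result that
    # the main process can fold into the bake record.
    return {'path': job, 'baked': True}


if __name__ == '__main__':
    jobs = ['pages/foo.md', 'pages/bar.md']   # made-up job payloads
    # One worker per CPU; results come back to the main process, which is
    # the only one that reads or writes the bake record.
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        for result in pool.imap_unordered(bake_job, jobs):
            print(result)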
comparing 410:d1a472464e57 with 411:e7b865f8f335
-import re
 import time
 import json
 import logging
 import hashlib
-import threading
 import contextlib
 import collections
 import repoze.lru


 logger = logging.getLogger(__name__)
-
-
-re_fs_cache_key = re.compile(r'[^\d\w\-\._]+')


 def _make_fs_cache_key(key):
     return hashlib.md5(key.encode('utf8')).hexdigest()

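Note that the regex-based key sanitizer (re_fs_cache_key) is gone; file-system
cache file names now come solely from the md5 digest of the key, as the
surviving helper shows. A tiny self-contained illustration (the key string is
made up):

import hashlib

def _make_fs_cache_key(key):
    return hashlib.md5(key.encode('utf8')).hexdigest()

print(_make_fs_cache_key('pages/foo.md:config'))
# Prints a stable 32-character hex digest, safe to use as a cache file name.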
...
         cache, but items need to be JSON-serializable to do this.
     """
     def __init__(self, size=2048):
         self.cache = repoze.lru.LRUCache(size)
         self.fs_cache = None
+        self._last_access_hit = None
         self._invalidated_fs_items = set()
-        self._lock = threading.RLock()

-    @contextlib.contextmanager
-    def startBatchGet(self):
-        logger.debug("Starting batch cache operation.")
-        with self._lock:
-            yield
-        logger.debug("Ending batch cache operation.")
+    @property
+    def last_access_hit(self):
+        return self._last_access_hit

     def invalidate(self, key):
-        with self._lock:
-            logger.debug("Invalidating cache item '%s'." % key)
-            self.cache.invalidate(key)
-            if self.fs_cache:
-                logger.debug("Invalidating FS cache item '%s'." % key)
-                fs_key = _make_fs_cache_key(key)
-                self._invalidated_fs_items.add(fs_key)
+        logger.debug("Invalidating cache item '%s'." % key)
+        self.cache.invalidate(key)
+        if self.fs_cache:
+            logger.debug("Invalidating FS cache item '%s'." % key)
+            fs_key = _make_fs_cache_key(key)
+            self._invalidated_fs_items.add(fs_key)

-    def get(self, key, item_maker, fs_cache_time=None):
+    def put(self, key, item, save_to_fs=True):
+        self.cache.put(key, item)
+        if self.fs_cache and save_to_fs:
+            fs_key = _make_fs_cache_key(key)
+            item_raw = json.dumps(item)
+            self.fs_cache.write(fs_key, item_raw)
+
+    def get(self, key, item_maker, fs_cache_time=None, save_to_fs=True):
+        self._last_access_hit = True
         item = self.cache.get(key)
         if item is None:
-            logger.debug("Acquiring lock for: %s" % key)
-            with self._lock:
-                item = self.cache.get(key)
-                if item is None:
-                    if (self.fs_cache is not None and
-                            fs_cache_time is not None):
-                        # Try first from the file-system cache.
-                        fs_key = _make_fs_cache_key(key)
-                        if (fs_key not in self._invalidated_fs_items and
-                                self.fs_cache.isValid(fs_key, fs_cache_time)):
-                            logger.debug("'%s' found in file-system cache." %
-                                         key)
-                            item_raw = self.fs_cache.read(fs_key)
-                            item = json.loads(
-                                item_raw,
-                                object_pairs_hook=collections.OrderedDict)
-                            self.cache.put(key, item)
-                            return item
-
-                    # Look into the mem-cache.
-                    logger.debug("'%s' not found in cache, must build." % key)
-                    item = item_maker()
-                    self.cache.put(key, item)
-
-                    # Save to the file-system if needed.
-                    if (self.fs_cache is not None and
-                            fs_cache_time is not None):
-                        item_raw = json.dumps(item)
-                        self.fs_cache.write(fs_key, item_raw)
+            if (self.fs_cache is not None and
+                    fs_cache_time is not None):
+                # Try first from the file-system cache.
+                fs_key = _make_fs_cache_key(key)
+                if (fs_key not in self._invalidated_fs_items and
+                        self.fs_cache.isValid(fs_key, fs_cache_time)):
+                    logger.debug("'%s' found in file-system cache." %
+                                 key)
+                    item_raw = self.fs_cache.read(fs_key)
+                    item = json.loads(
+                        item_raw,
+                        object_pairs_hook=collections.OrderedDict)
+                    self.cache.put(key, item)
+                    return item
+
+            # Look into the mem-cache.
+            logger.debug("'%s' not found in cache, must build." % key)
+            item = item_maker()
+            self.cache.put(key, item)
+            self._last_access_hit = False
+
+            # Save to the file-system if needed.
+            if self.fs_cache is not None and save_to_fs:
+                item_raw = json.dumps(item)
+                self.fs_cache.write(fs_key, item_raw)
+
         return item

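For context, here is a minimal sketch of how the reworked MemCache above might
be exercised when no file-system cache is attached; the key name and the
item-maker callable are made up for illustration:

from piecrust.environment import MemCache

repo = MemCache(size=16)

def make_config():
    # Whatever gets cached must be JSON-serializable if it is ever
    # written to the file-system cache.
    return {'title': 'Hello', 'tags': ['a', 'b']}

cfg = repo.get('page/config', make_config)    # miss: builds and stores the item
assert repo.last_access_hit is False
cfg = repo.get('page/config', make_config)    # hit: served from the LRU cache
assert repo.last_access_hit is True

# put() lets callers insert precomputed items directly, optionally skipping
# the file-system cache even when one is attached.
repo.put('page/other', {'x': 1}, save_to_fs=False)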
 class ExecutionInfo(object):
     def __init__(self, page, render_ctx):
...
         self.render_ctx = render_ctx
         self.was_cache_valid = False
         self.start_time = time.clock()


-class ExecutionInfoStack(threading.local):
+class ExecutionInfoStack(object):
     def __init__(self):
         self._page_stack = []

     @property
     def current_page_info(self):
...
         self.base_asset_url_format = '%uri%'
         self.page_repository = MemCache()
         self.rendered_segments_repository = MemCache()
         self.fs_caches = {
                 'renders': self.rendered_segments_repository}
+        self.fs_cache_only_for_main_page = False
+        self._timers = {}

     def initialize(self, app):
-        self.start_time = time.clock()
+        self.start_time = time.perf_counter()
         self.exec_info_stack.clear()
         self.was_cache_cleaned = False
         self.base_asset_url_format = '%uri%'
         self._onSubCacheDirChanged(app)
+
+    def registerTimer(self, category):
+        self._timers[category] = 0
+
+    @contextlib.contextmanager
+    def timerScope(self, category):
+        start = time.perf_counter()
+        yield
+        self._timers[category] += time.perf_counter() - start
+
+    def stepTimer(self, category, value):
+        self._timers[category] += value

     def _onSubCacheDirChanged(self, app):
         for name, repo in self.fs_caches.items():
             cache = app.cache.getCache(name)
             repo.fs_cache = cache
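Finally, a short sketch of how the new timer helpers might be used; the env
argument stands for an already-initialized instance of the environment class
shown above, and the category name and workload are made up:

import time

def bake_some_pages(env):
    # 'env' is assumed to be an already-initialized environment instance
    # exposing the timer helpers added in this changeset.
    env.registerTimer('render')

    with env.timerScope('render'):       # measure a block of work
        time.sleep(0.01)                 # stand-in for real rendering work

    # Worker processes can also report elapsed seconds back to the main
    # process, which folds them in with stepTimer().
    env.stepTimer('render', 1.25)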