comparison piecrust/serving/procloop.py @ 854:08e02c2a2a1a

core: Keep refactoring, this time to prepare for generator sources.

- Make a few APIs simpler.
- Content pipelines create their own jobs, so that generator sources can keep aborting in `getContents`, but rely on their pipeline to generate pages for baking.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 04 Jun 2017 23:34:28 -0700
parents c2ea75e37540
children 448710d84121
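
For context, the core of the change is that the asset pipeline now creates and runs its own jobs for each source item, instead of the processing loop walking asset directories itself. Roughly, the per-source flow in the new `_runPipeline()` looks like the following sketch (condensed from the hunks below; `src`, `ppinfo` and `record_histories` are set up by the surrounding code, and record bookkeeping and error handling are left out):

    pp = ppinfo.pipeline
    jobctx = PipelineJobCreateContext(src)
    for item in src.getAllContents():
        # The pipeline, not the source, decides what a job looks like.
        job = pp.createJob(item, jobctx)
        ppres = PipelineJobResult()
        ppres.record_entry = pp.createRecordEntry(job)
        runctx = PipelineJobRunContext(ppinfo.pipeline_ctx, job, record_histories)
        pp.run(item, runctx, ppres)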
comparing 853:f070a4fc033c with 854:08e02c2a2a1a
@@ -5,12 +5,15 @@
 import queue
 import logging
 import itertools
 import threading
 from piecrust import CONFIG_PATH, THEME_CONFIG_PATH
-from piecrust.app import PieCrust
-from piecrust.processing.pipeline import ProcessorPipeline
+from piecrust.pipelines.base import (
+    PipelineJobCreateContext, PipelineJobRunContext, PipelineJobResult,
+    PipelineManager)
+from piecrust.pipelines.records import (
+    MultiRecord, MultiRecordHistory)
 
 
 logger = logging.getLogger(__name__)
 
 # This flag is for cancelling all long running requests like SSEs.
@@ -72,150 +75,199 @@
         logger.debug("Closing pipeline status SSE.")
         self._proc_loop.removeObserver(self)
         self._running = 2
 
 
+class _AssetProcessingInfo:
+    def __init__(self, source):
+        self.source = source
+        self.paths = set()
+        self.last_bake_time = time.time()
+
+
 class ProcessingLoop(threading.Thread):
     def __init__(self, appfactory, out_dir):
-        super(ProcessingLoop, self).__init__(
-                name='pipeline-reloader', daemon=True)
+        super().__init__(name='pipeline-reloader', daemon=True)
         self.appfactory = appfactory
         self.out_dir = out_dir
         self.last_status_id = 0
         self.interval = 1
-        self.app = None
-        self._roots = []
-        self._monitor_assets_root = False
-        self._paths = set()
-        self._record = None
-        self._last_bake = 0
+        self._app = None
+        self._proc_infos = None
+        self._last_records = None
         self._last_config_mtime = 0
         self._obs = []
         self._obs_lock = threading.Lock()
        config_name = (
             THEME_CONFIG_PATH if appfactory.theme_site else CONFIG_PATH)
         self._config_path = os.path.join(appfactory.root_dir, config_name)
 
     def addObserver(self, obs):
         with self._obs_lock:
             self._obs.append(obs)
 
     def removeObserver(self, obs):
         with self._obs_lock:
             self._obs.remove(obs)
 
     def run(self):
-        self._initPipeline()
+        self._init()
 
-        self._last_bake = time.time()
         self._last_config_mtime = os.path.getmtime(self._config_path)
-        self._record = self.pipeline.run()
 
         while True:
             cur_config_time = os.path.getmtime(self._config_path)
             if self._last_config_mtime < cur_config_time:
                 logger.info("Site configuration changed, reloading pipeline.")
                 self._last_config_mtime = cur_config_time
-                self._initPipeline()
-                for root in self._roots:
-                    self._runPipeline(root)
-                continue
+                self._init()
+                self._runPipelines()
+                continue
 
-            if self._monitor_assets_root:
-                assets_dir = os.path.join(self.app.root_dir, 'assets')
-                if os.path.isdir(assets_dir):
-                    logger.info("Assets directory was created, reloading "
-                                "pipeline.")
-                    self._initPipeline()
-                    self._runPipeline(assets_dir)
-                    continue
-
-            for root in self._roots:
-                # For each mount root we try to find the first new or
+            for procinfo in self._proc_infos:
+                # For each assets folder we try to find the first new or
                 # modified file. If any, we just run the pipeline on
-                # that mount.
+                # that source.
                 found_new_or_modified = False
-                for dirpath, dirnames, filenames in os.walk(root):
-                    for filename in filenames:
-                        path = os.path.join(dirpath, filename)
-                        if path not in self._paths:
-                            logger.debug("Found new asset: %s" % path)
-                            self._paths.add(path)
-                            found_new_or_modified = True
-                            break
-                        if os.path.getmtime(path) > self._last_bake:
-                            logger.debug("Found modified asset: %s" % path)
-                            found_new_or_modified = True
-                            break
-
-                    if found_new_or_modified:
-                        break
-
+                for item in procinfo.source.getAllContents():
+                    path = item.spec
+                    if path not in procinfo.paths:
+                        logger.debug("Found new asset: %s" % path)
+                        procinfo.paths.add(path)
+                        found_new_or_modified = True
+                        break
+                    if os.path.getmtime(path) > procinfo.last_bake_time:
+                        logger.debug("Found modified asset: %s" % path)
+                        found_new_or_modified = True
+                        break
                 if found_new_or_modified:
-                    self._runPipeline(root)
+                    self._runPipeline(procinfo)
 
             time.sleep(self.interval)
 
-    def _initPipeline(self):
-        # Create the app and pipeline.
-        self.app = self.appfactory.create()
-        self.pipeline = ProcessorPipeline(self.app, self.out_dir)
-
-        # Get the list of assets directories.
-        self._roots = list(self.pipeline.mounts.keys())
-
-        # The 'assets' folder may not be in the mounts list if it doesn't
-        # exist yet, but we want to monitor for when the user creates it.
-        default_root = os.path.join(self.app.root_dir, 'assets')
-        self._monitor_assets_root = (default_root not in self._roots)
-
-        # Build the list of initial asset files.
-        self._paths = set()
-        for root in self._roots:
-            for dirpath, dirnames, filenames in os.walk(root):
-                self._paths |= set([os.path.join(dirpath, f)
-                                    for f in filenames])
+    def _init(self):
+        self._app = self.appfactory.create()
+        self._last_records = MultiRecord()
+
+        self._proc_infos = []
+        for src in self._app.sources:
+            if src.config['pipeline'] != 'asset':
+                continue
+
+            procinfo = _AssetProcessingInfo(src)
+            self._proc_infos.append(procinfo)
+
+            # Build the list of initial asset files.
+            for item in src.getAllContents():
+                procinfo.paths.add(item.spec)
 
-    def _runPipeline(self, root):
-        self._last_bake = time.time()
-        try:
-            self._record = self.pipeline.run(
-                    root,
-                    previous_record=self._record,
-                    save_record=False)
-
-            status_id = self.last_status_id + 1
-            self.last_status_id += 1
-
-            if self._record.success:
-                changed = filter(
-                    lambda i: not i.was_collapsed_from_last_run,
-                    self._record.entries)
-                changed = itertools.chain.from_iterable(
-                    map(lambda i: i.rel_outputs, changed))
-                changed = list(changed)
-                item = {
-                    'id': status_id,
-                    'type': 'pipeline_success',
-                    'assets': changed}
-
-                self._notifyObservers(item)
-            else:
-                item = {
-                    'id': status_id,
-                    'type': 'pipeline_error',
-                    'assets': []}
-                for entry in self._record.entries:
-                    if entry.errors:
-                        asset_item = {
-                            'path': entry.path,
-                            'errors': list(entry.errors)}
-                        item['assets'].append(asset_item)
-
-                self._notifyObservers(item)
-        except Exception as ex:
-            logger.exception(ex)
+    def _runPipelines(self):
+        record_histories = MultiRecordHistory(MultiRecord(), self._records)
+        self._ppmngr = PipelineManager(
+            self._app, self.out_dir, record_histories)
+
+        # Create the pipelines, but also remember some stuff for what
+        # we want to do.
+        for src in self._app.sources:
+            if src.config['pipeline'] != 'asset':
+                continue
+
+            ppinfo = self._ppmngr.createPipeline(src)
+            api = _AssetProcessingInfo()
+            ppinfo.userdata = api
+
+        current_records = MultiRecord()
+        record_histories = MultiRecordHistory(
+            self._records, current_records)
+
+        for ppinfo, procinfo in self._pipelines:
+            self._runPipeline(ppinfo, procinfo, record_histories)
+
+        status_id = self.last_status_id + 1
+        self.last_status_id += 1
+
+        if self._records.success:
+            changed = filter(
+                lambda i: not i.was_collapsed_from_last_run,
+                self._record.entries)
+            changed = itertools.chain.from_iterable(
+                map(lambda i: i.rel_outputs, changed))
+            changed = list(changed)
+            item = {
+                'id': status_id,
+                'type': 'pipeline_success',
+                'assets': changed}
+
+            self._notifyObservers(item)
+        else:
+            item = {
+                'id': status_id,
+                'type': 'pipeline_error',
+                'assets': []}
+            for entry in self._record.entries:
+                if entry.errors:
+                    asset_item = {
+                        'path': entry.path,
+                        'errors': list(entry.errors)}
+                    item['assets'].append(asset_item)
+
+            self._notifyObservers(item)
+
+    def _runPipeline(self, procinfo):
+        procinfo.last_bake_time = time.time()
+
+        src = procinfo.source
+
+        current_records = MultiRecord()
+        record_histories = MultiRecordHistory(
+            self._last_records, current_records)
+        ppmngr = PipelineManager(
+            self._app, self.out_dir, record_histories)
+        ppinfo = ppmngr.createPipeline(src)
+
+        logger.debug("Running pipeline '%s' on: %s" %
+                     (ppinfo.pipeline_name, src.name))
+
+        # Process all items in the source.
+        pp = ppinfo.pipeline
+        cr = ppinfo.record_history.current
+        jobctx = PipelineJobCreateContext(src)
+        for item in src.getAllContents():
+            job = pp.createJob(item, jobctx)
+
+            ppres = PipelineJobResult()
+            ppres.record_entry = pp.createRecordEntry(job)
+
+            runctx = PipelineJobRunContext(
+                ppinfo.pipeline_ctx, job, record_histories)
+            try:
+                pp.run(item, runctx, ppres)
+            except Exception as e:
+                ppres.record_entry.errors.append(str(e))
+
+            if ppres.next_pass_job is not None:
+                logger.error("The processing loop for the server "
+                             "doesn't support multi-pass pipelines.")
+
+            cr.addEntry(ppres.record_entry)
+            if not ppres.record_entry.success:
+                cr.success = False
+                current_records.success = False
+                logger.error("Errors found in %s:" % item.spec)
+                for e in ppres.record_entry.errors:
+                    logger.error(" " + e)
+
+        # Do all the final stuff.
+        ppmngr.buildHistoryDiffs()
+        ppmngr.deleteStaleOutputs()
+        ppmngr.collapseRecords()
+        ppmngr.shutdownPipelines()
+
+        # Swap the old record with the next record.
+        pr = ppinfo.record_history.previous
+        self._last_records.records.remove(pr)
+        self._last_records.records.append(cr)
 
     def _notifyObservers(self, item):
         with self._obs_lock:
             observers = list(self._obs)
         for obs in observers: