piecrust2 (Mercurial repository)
comparison: piecrust/processing/base.py @ 3:f485ba500df3
Gigantic change to basically make PieCrust 2 vaguely functional.
- Serving works, with debug window.
- Baking works, multi-threading, with dependency handling.
- Various things not implemented yet.
author: Ludovic Chabant <ludovic@chabant.com>
date: Sun, 10 Aug 2014 23:43:16 -0700
parents:
children: 474c9882decf
comparing: 2:40fa08b261b9 to 3:f485ba500df3
import re
import time
import shutil
import os.path
import logging
import threading
from Queue import Queue, Empty
from piecrust.chefutil import format_timed
from piecrust.processing.tree import (ProcessingTreeBuilder,
        ProcessingTreeRunner, STATE_DIRTY, print_node)
from piecrust.records import Record


logger = logging.getLogger(__name__)


PRIORITY_FIRST = -1
PRIORITY_NORMAL = 0
PRIORITY_LAST = 1


class Processor(object):
    PROCESSOR_NAME = None

    def __init__(self):
        self.priority = PRIORITY_NORMAL
        self.is_bypassing_structured_processing = False
        self.is_delegating_dependency_check = True

    def initialize(self, app):
        self.app = app

    def onPipelineStart(self, pipeline):
        pass

    def onPipelineEnd(self, pipeline):
        pass

    def supportsExtension(self, ext):
        return False

    def getDependencies(self, path):
        return None

    def getOutputFilenames(self, filename):
        return None

    def process(self, path, out_dir):
        pass


class CopyFileProcessor(Processor):
    PROCESSOR_NAME = 'copy'

    def __init__(self):
        super(CopyFileProcessor, self).__init__()
        self.priority = PRIORITY_LAST

    def supportsExtension(self, ext):
        return True

    def getOutputFilenames(self, filename):
        return [filename]

    def process(self, path, out_dir):
        out_path = os.path.join(out_dir, os.path.basename(path))
        logger.debug("Copying: %s -> %s" % (path, out_path))
        shutil.copyfile(path, out_path)
        return True


class SimpleFileProcessor(Processor):
    def __init__(self, extensions=None):
        super(SimpleFileProcessor, self).__init__()
        self.extensions = extensions or {}

    def supportsExtension(self, ext):
        return ext.lstrip('.') in self.extensions

    def getOutputFilenames(self, filename):
        basename, ext = os.path.splitext(filename)
        ext = ext.lstrip('.')
        out_ext = self.extensions[ext]
        return ['%s.%s' % (basename, out_ext)]

    def process(self, path, out_dir):
        _, in_name = os.path.split(path)
        out_name = self.getOutputFilenames(in_name)[0]
        out_path = os.path.join(out_dir, out_name)
        return self._doProcess(path, out_path)

    def _doProcess(self, in_path, out_path):
        raise NotImplementedError()


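# --- Illustrative sketch (editor's addition, not part of this changeset) ---
# A minimal example of how SimpleFileProcessor appears meant to be
# subclassed: the {input ext: output ext} map passed to __init__ drives both
# supportsExtension() and getOutputFilenames(), and a subclass only has to
# implement _doProcess(). The FooProcessor name and the .foo/.bar extensions
# below are hypothetical.
class FooProcessor(SimpleFileProcessor):
    PROCESSOR_NAME = 'foo'

    def __init__(self):
        super(FooProcessor, self).__init__({'foo': 'bar'})

    def _doProcess(self, in_path, out_path):
        # Trivial "compilation": upper-case the input file into the output.
        with open(in_path, 'r') as fp:
            text = fp.read()
        with open(out_path, 'w') as fp:
            fp.write(text.upper())
        return True
# ----------------------------------------------------------------------------

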
class ProcessorPipelineRecord(Record):
    VERSION = 1

    def __init__(self):
        super(ProcessorPipelineRecord, self).__init__()
        self.is_multi_mount = False

    def addEntry(self, item):
        self.entries.append(item)

    def hasOverrideEntry(self, rel_path):
        if not self.is_multi_mount:
            return False
        return self.findEntry(rel_path) is not None

    def findEntry(self, rel_path):
        rel_path = rel_path.lower()
        for entry in self.entries:
            for out_path in entry.rel_outputs:
                if out_path.lower() == rel_path:
                    return entry
        return None


class ProcessorPipelineRecordEntry(object):
    def __init__(self, rel_input, is_processed=False, is_overridden=False):
        self.rel_input = rel_input
        self.rel_outputs = []
        self.is_processed = is_processed
        self.is_overridden = is_overridden


class ProcessingContext(object):
    def __init__(self, base_dir, job_queue, record=None):
        self.base_dir = base_dir
        self.job_queue = job_queue
        self.record = record


class ProcessorPipeline(object):
    def __init__(self, app, out_dir, force=False, mounts=None,
            skip_patterns=None, force_patterns=None, num_workers=4):
        self.app = app
        tmp_dir = app.cache_dir
        if not tmp_dir:
            import tempfile
            tmp_dir = os.path.join(tempfile.gettempdir(), 'piecrust')
        self.tmp_dir = os.path.join(tmp_dir, 'proc')
        self.out_dir = out_dir
        self.force = force
        self.mounts = mounts or {}
        self.skip_patterns = skip_patterns or []
        self.force_patterns = force_patterns or []
        self.processors = app.plugin_loader.getProcessors()
        self.num_workers = num_workers

        if app.theme_dir is not None:
            self.mounts['theme'] = app.theme_dir

        self.skip_patterns += ['_cache', '_content', '_counter',
                'theme_info.yml',
                '.DS_Store', 'Thumbs.db',
                '.git*', '.hg*', '.svn']

        self.skip_patterns = make_re(self.skip_patterns)
        self.force_patterns = make_re(self.force_patterns)

    def run(self, src_dir_or_file=None):
        record = ProcessorPipelineRecord()

        # Create the workers.
        pool = []
        queue = Queue()
        abort = threading.Event()
        pipeline_lock = threading.Lock()
        for i in range(self.num_workers):
            ctx = ProcessingWorkerContext(self, record, queue, abort,
                    pipeline_lock)
            worker = ProcessingWorker(i, ctx)
            worker.start()
            pool.append(worker)

        # Invoke pre-processors.
        for proc in self.processors:
            proc.onPipelineStart(self)

        if src_dir_or_file is not None:
            # Process only the given path.
            # Find out if this source directory is in a mount point.
            base_dir = self.app.root_dir
            for name, path in self.mounts.iteritems():
                if src_dir_or_file[:len(path)] == path:
                    base_dir = path

            ctx = ProcessingContext(base_dir, queue, record)
            logger.debug("Initiating processing pipeline on: %s" % src_dir_or_file)
            if os.path.isdir(src_dir_or_file):
                self.processDirectory(ctx, src_dir_or_file)
            elif os.path.isfile(src_dir_or_file):
                self.processFile(ctx, src_dir_or_file)

        else:
            # Process everything.
            ctx = ProcessingContext(self.app.root_dir, queue, record)
            logger.debug("Initiating processing pipeline on: %s" % self.app.root_dir)
            self.processDirectory(ctx, self.app.root_dir)
            ctx.is_multi_mount = True
            for name, path in self.mounts.iteritems():
                mount_ctx = ProcessingContext(path, queue, record)
                logger.debug("Initiating processing pipeline on: %s" % path)
                self.processDirectory(mount_ctx, path)

        # Wait on all workers.
        for w in pool:
            w.join()
        if abort.is_set():
            raise Exception("Worker pool was aborted.")

        # Invoke post-processors.
        for proc in self.processors:
            proc.onPipelineEnd(self)

        return record

    def processDirectory(self, ctx, start_dir):
        for dirpath, dirnames, filenames in os.walk(start_dir):
            rel_dirpath = os.path.relpath(dirpath, start_dir)
            dirnames[:] = [d for d in dirnames
                    if not re_matchany(os.path.join(rel_dirpath, d),
                        self.skip_patterns)]

            for filename in filenames:
                if re_matchany(os.path.join(rel_dirpath, filename),
                        self.skip_patterns):
                    continue
                self.processFile(ctx, os.path.join(dirpath, filename))

    def processFile(self, ctx, path):
        logger.debug("Queuing: %s" % path)
        job = ProcessingWorkerJob(ctx.base_dir, path)
        ctx.job_queue.put_nowait(job)


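# --- Illustrative sketch (editor's addition, not part of this changeset) ---
# How the pipeline above might be driven. This assumes `app` is an
# initialized PieCrust application object exposing the attributes this file
# relies on (cache_dir, root_dir, theme_dir, plugin_loader); the helper
# function name and output directory are hypothetical.
def _example_run_pipeline(app, out_dir):
    # Build the pipeline and process every mount point on 4 worker threads.
    pipeline = ProcessorPipeline(app, out_dir, num_workers=4)
    record = pipeline.run()
    # Entries that were skipped or overridden (e.g. a theme file shadowed by
    # the site) are recorded but flagged as not processed.
    for e in record.entries:
        if not e.is_processed:
            logger.info("Not processed: %s" % e.rel_input)
    return record
# ----------------------------------------------------------------------------

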
class ProcessingWorkerContext(object):
    def __init__(self, pipeline, record, work_queue, abort_event,
            pipeline_lock):
        self.pipeline = pipeline
        self.record = record
        self.work_queue = work_queue
        self.abort_event = abort_event
        self.pipeline_lock = pipeline_lock


class ProcessingWorkerJob(object):
    def __init__(self, base_dir, path):
        self.base_dir = base_dir
        self.path = path


class ProcessingWorker(threading.Thread):
    def __init__(self, wid, ctx):
        super(ProcessingWorker, self).__init__()
        self.wid = wid
        self.ctx = ctx

    def run(self):
        while(not self.ctx.abort_event.is_set()):
            try:
                job = self.ctx.work_queue.get(True, 0.1)
            except Empty:
                logger.debug("[%d] No more work... shutting down." % self.wid)
                break

            try:
                self._unsafeRun(job)
                logger.debug("[%d] Done with file." % self.wid)
                self.ctx.work_queue.task_done()
            except Exception as ex:
                self.ctx.abort_event.set()
                logger.error("[%d] Critical error, aborting." % self.wid)
                logger.exception(ex)
                break

    def _unsafeRun(self, job):
        start_time = time.clock()
        pipeline = self.ctx.pipeline
        record = self.ctx.record

        rel_path = os.path.relpath(job.path, job.base_dir)

        # Figure out if a previously processed file is overriding this one.
        # This can happen if a theme file (processed via a mount point)
        # is overridden in the user's website.
        if record.hasOverrideEntry(rel_path):
            record.addEntry(ProcessorPipelineRecordEntry(rel_path,
                    is_processed=False, is_overridden=True))
            logger.info(format_timed(start_time,
                    '%s [not baked, overridden]' % rel_path))
            return

        builder = ProcessingTreeBuilder(pipeline.processors)
        tree_root = builder.build(rel_path)
        print_node(tree_root, recursive=True)
        leaves = tree_root.getLeaves()
        fi = ProcessorPipelineRecordEntry(rel_path)
        fi.rel_outputs = [l.path for l in leaves]
        record.addEntry(fi)

        force = pipeline.force
        if not force:
            force = re_matchany(rel_path, pipeline.force_patterns)

        if force:
            tree_root.setState(STATE_DIRTY, True)

        runner = ProcessingTreeRunner(job.base_dir, pipeline.tmp_dir,
                pipeline.out_dir, self.ctx.pipeline_lock)
        if runner.processSubTree(tree_root):
            fi.is_processed = True
        logger.info(format_timed(start_time, "[%d] %s" % (self.wid, rel_path)))


def make_re(patterns):
    re_patterns = []
    for pat in patterns:
        if pat[0] == '/' and pat[-1] == '/' and len(pat) > 2:
            re_patterns.append(pat[1:-1])
        else:
            escaped_pat = (re.escape(pat)
                    .replace(r'\*', r'[^/\\]*')
                    .replace(r'\?', r'[^/\\]'))
            re_patterns.append(escaped_pat)
    return map(lambda p: re.compile(p), re_patterns)


def re_matchany(filename, patterns):
    for pattern in patterns:
        if pattern.match(filename):
            return True
    return False
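

# --- Illustrative sketch (editor's addition, not part of this changeset) ---
# How the two helpers above behave: patterns wrapped in slashes are kept as
# raw regular expressions, everything else is escaped as a glob where '*'
# and '?' never cross a path separator, and matching is anchored at the
# start of the relative path. The sample patterns are hypothetical.
def _example_patterns():
    patterns = make_re(['_cache', '*.tmp', '/foo[0-9]+/'])
    assert re_matchany('_cache', patterns)              # literal glob
    assert re_matchany('style.tmp', patterns)           # '*.tmp' matches the file name
    assert re_matchany('foo123', patterns)              # raw regex between slashes
    assert not re_matchany('sub/style.tmp', patterns)   # anchored; '*' stops at '/'
# ----------------------------------------------------------------------------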