comparison piecrust/processing/base.py @ 3:f485ba500df3

Gigantic change to basically make PieCrust 2 vaguely functional.

- Serving works, with debug window.
- Baking works, multi-threaded, with dependency handling.
- Various things not implemented yet.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 10 Aug 2014 23:43:16 -0700
parents 2:40fa08b261b9
children 474c9882decf
import re
import time
import shutil
import os.path
import logging
import threading
from Queue import Queue, Empty
from piecrust.chefutil import format_timed
from piecrust.processing.tree import (ProcessingTreeBuilder,
        ProcessingTreeRunner, STATE_DIRTY, print_node)
from piecrust.records import Record


logger = logging.getLogger(__name__)


PRIORITY_FIRST = -1
PRIORITY_NORMAL = 0
PRIORITY_LAST = 1


class Processor(object):
    PROCESSOR_NAME = None

    def __init__(self):
        self.priority = PRIORITY_NORMAL
        self.is_bypassing_structured_processing = False
        self.is_delegating_dependency_check = True

    def initialize(self, app):
        self.app = app

    def onPipelineStart(self, pipeline):
        pass

    def onPipelineEnd(self, pipeline):
        pass

    def supportsExtension(self, ext):
        return False

    def getDependencies(self, path):
        return None

    def getOutputFilenames(self, filename):
        return None

    def process(self, path, out_dir):
        pass


class CopyFileProcessor(Processor):
    PROCESSOR_NAME = 'copy'

    def __init__(self):
        super(CopyFileProcessor, self).__init__()
        self.priority = PRIORITY_LAST

    def supportsExtension(self, ext):
        return True

    def getOutputFilenames(self, filename):
        return [filename]

    def process(self, path, out_dir):
        out_path = os.path.join(out_dir, os.path.basename(path))
        logger.debug("Copying: %s -> %s" % (path, out_path))
        shutil.copyfile(path, out_path)
        return True


class SimpleFileProcessor(Processor):
    def __init__(self, extensions=None):
        super(SimpleFileProcessor, self).__init__()
        self.extensions = extensions or {}

    def supportsExtension(self, ext):
        return ext.lstrip('.') in self.extensions

    def getOutputFilenames(self, filename):
        basename, ext = os.path.splitext(filename)
        ext = ext.lstrip('.')
        out_ext = self.extensions[ext]
        return ['%s.%s' % (basename, out_ext)]

    def process(self, path, out_dir):
        _, in_name = os.path.split(path)
        out_name = self.getOutputFilenames(in_name)[0]
        out_path = os.path.join(out_dir, out_name)
        return self._doProcess(path, out_path)

    def _doProcess(self, in_path, out_path):
        raise NotImplementedError()

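# For illustration only (an assumption, not part of this changeset): a
# concrete processor that turns hypothetical `.foo` files into `.bar` files
# would subclass SimpleFileProcessor roughly like this, mapping input to
# output extensions in the constructor and doing the actual conversion in
# `_doProcess`:
#
#     class FooProcessor(SimpleFileProcessor):
#         PROCESSOR_NAME = 'foo'
#
#         def __init__(self):
#             super(FooProcessor, self).__init__({'foo': 'bar'})
#
#         def _doProcess(self, in_path, out_path):
#             with open(in_path, 'r') as fp:
#                 text = fp.read()
#             with open(out_path, 'w') as fp:
#                 fp.write(text)  # a real processor would convert `text` here
#             return True
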
class ProcessorPipelineRecord(Record):
    VERSION = 1

    def __init__(self):
        super(ProcessorPipelineRecord, self).__init__()
        self.is_multi_mount = False

    def addEntry(self, item):
        self.entries.append(item)

    def hasOverrideEntry(self, rel_path):
        if not self.is_multi_mount:
            return False
        return self.findEntry(rel_path) is not None

    def findEntry(self, rel_path):
        rel_path = rel_path.lower()
        for entry in self.entries:
            for out_path in entry.rel_outputs:
                if out_path.lower() == rel_path:
                    return entry
        return None


class ProcessorPipelineRecordEntry(object):
    def __init__(self, rel_input, is_processed=False, is_overridden=False):
        self.rel_input = rel_input
        self.rel_outputs = []
        self.is_processed = is_processed
        self.is_overridden = is_overridden


class ProcessingContext(object):
    def __init__(self, base_dir, job_queue, record=None):
        self.base_dir = base_dir
        self.job_queue = job_queue
        self.record = record


class ProcessorPipeline(object):
    def __init__(self, app, out_dir, force=False, mounts=None,
            skip_patterns=None, force_patterns=None, num_workers=4):
        self.app = app
        tmp_dir = app.cache_dir
        if not tmp_dir:
            import tempfile
            tmp_dir = os.path.join(tempfile.gettempdir(), 'piecrust')
        self.tmp_dir = os.path.join(tmp_dir, 'proc')
        self.out_dir = out_dir
        self.force = force
        self.mounts = mounts or {}
        self.skip_patterns = skip_patterns or []
        self.force_patterns = force_patterns or []
        self.processors = app.plugin_loader.getProcessors()
        self.num_workers = num_workers

        if app.theme_dir is not None:
            self.mounts['theme'] = app.theme_dir

        self.skip_patterns += ['_cache', '_content', '_counter',
                'theme_info.yml',
                '.DS_Store', 'Thumbs.db',
                '.git*', '.hg*', '.svn']

        self.skip_patterns = make_re(self.skip_patterns)
        self.force_patterns = make_re(self.force_patterns)

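    # Illustrative usage (an assumption, not shown in this changeset): a chef
    # command such as `bake` would typically build the pipeline from the app
    # and the bake output directory, then run it over the whole site:
    #
    #     pipeline = ProcessorPipeline(app, out_dir, force=force)
    #     record = pipeline.run()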
    def run(self, src_dir_or_file=None):
        record = ProcessorPipelineRecord()

        # Create the workers.
        pool = []
        queue = Queue()
        abort = threading.Event()
        pipeline_lock = threading.Lock()
        for i in range(self.num_workers):
            ctx = ProcessingWorkerContext(self, record, queue, abort,
                    pipeline_lock)
            worker = ProcessingWorker(i, ctx)
            worker.start()
            pool.append(worker)

        # Invoke pre-processors.
        for proc in self.processors:
            proc.onPipelineStart(self)

        if src_dir_or_file is not None:
            # Process only the given path.
            # Find out if this source directory is in a mount point.
            base_dir = self.app.root_dir
            for name, path in self.mounts.iteritems():
                if src_dir_or_file[:len(path)] == path:
                    base_dir = path

            ctx = ProcessingContext(base_dir, queue, record)
            logger.debug("Initiating processing pipeline on: %s" % src_dir_or_file)
            if os.path.isdir(src_dir_or_file):
                self.processDirectory(ctx, src_dir_or_file)
            elif os.path.isfile(src_dir_or_file):
                self.processFile(ctx, src_dir_or_file)

        else:
            # Process everything.
            ctx = ProcessingContext(self.app.root_dir, queue, record)
            logger.debug("Initiating processing pipeline on: %s" % self.app.root_dir)
            self.processDirectory(ctx, self.app.root_dir)
            record.is_multi_mount = True
            for name, path in self.mounts.iteritems():
                mount_ctx = ProcessingContext(path, queue, record)
                logger.debug("Initiating processing pipeline on: %s" % path)
                self.processDirectory(mount_ctx, path)

        # Wait on all workers.
        for w in pool:
            w.join()
        if abort.is_set():
            raise Exception("Worker pool was aborted.")

        # Invoke post-processors.
        for proc in self.processors:
            proc.onPipelineEnd(self)

        return record

    def processDirectory(self, ctx, start_dir):
        for dirpath, dirnames, filenames in os.walk(start_dir):
            rel_dirpath = os.path.relpath(dirpath, start_dir)
            dirnames[:] = [d for d in dirnames
                    if not re_matchany(os.path.join(rel_dirpath, d),
                            self.skip_patterns)]

            for filename in filenames:
                if re_matchany(os.path.join(rel_dirpath, filename),
                        self.skip_patterns):
                    continue
                self.processFile(ctx, os.path.join(dirpath, filename))

    def processFile(self, ctx, path):
        logger.debug("Queuing: %s" % path)
        job = ProcessingWorkerJob(ctx.base_dir, path)
        ctx.job_queue.put_nowait(job)


class ProcessingWorkerContext(object):
    def __init__(self, pipeline, record, work_queue, abort_event,
            pipeline_lock):
        self.pipeline = pipeline
        self.record = record
        self.work_queue = work_queue
        self.abort_event = abort_event
        self.pipeline_lock = pipeline_lock


class ProcessingWorkerJob(object):
    def __init__(self, base_dir, path):
        self.base_dir = base_dir
        self.path = path

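# Each worker thread pulls jobs off the shared queue and processes them,
# stopping when the queue has stayed empty for 0.1 seconds or when the shared
# abort event has been set by a failing worker.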
class ProcessingWorker(threading.Thread):
    def __init__(self, wid, ctx):
        super(ProcessingWorker, self).__init__()
        self.wid = wid
        self.ctx = ctx

    def run(self):
        while not self.ctx.abort_event.is_set():
            try:
                job = self.ctx.work_queue.get(True, 0.1)
            except Empty:
                logger.debug("[%d] No more work... shutting down." % self.wid)
                break

            try:
                self._unsafeRun(job)
                logger.debug("[%d] Done with file." % self.wid)
                self.ctx.work_queue.task_done()
            except Exception as ex:
                self.ctx.abort_event.set()
                logger.error("[%d] Critical error, aborting." % self.wid)
                logger.exception(ex)
                break

    def _unsafeRun(self, job):
        start_time = time.clock()
        pipeline = self.ctx.pipeline
        record = self.ctx.record

        rel_path = os.path.relpath(job.path, job.base_dir)

        # Figure out if a previously processed file is overriding this one.
        # This can happen if a theme file (processed via a mount point)
        # is overridden in the user's website.
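        # (Concretely: in a multi-mount run, hasOverrideEntry returns True
        # when an earlier job already claimed this relative path as one of
        # its outputs; the file is then recorded as overridden and skipped.)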
        if record.hasOverrideEntry(rel_path):
            record.addEntry(ProcessorPipelineRecordEntry(rel_path,
                    is_processed=False, is_overridden=True))
            logger.info(format_timed(start_time,
                    '%s [not baked, overridden]' % rel_path))
            return

        builder = ProcessingTreeBuilder(pipeline.processors)
        tree_root = builder.build(rel_path)
        print_node(tree_root, recursive=True)
        leaves = tree_root.getLeaves()
        fi = ProcessorPipelineRecordEntry(rel_path)
        fi.rel_outputs = [l.path for l in leaves]
        record.addEntry(fi)

        force = pipeline.force
        if not force:
            force = re_matchany(rel_path, pipeline.force_patterns)

        if force:
            tree_root.setState(STATE_DIRTY, True)

        runner = ProcessingTreeRunner(job.base_dir, pipeline.tmp_dir,
                pipeline.out_dir, self.ctx.pipeline_lock)
        if runner.processSubTree(tree_root):
            fi.is_processed = True
            logger.info(format_timed(start_time, "[%d] %s" % (self.wid, rel_path)))

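# Pattern helpers: skip/force patterns wrapped in slashes (e.g. `/foo\d+/`)
# are used as raw regular expressions; anything else is treated as a simple
# glob where `*` and `?` only match within a single path component, so
# `.git*` becomes the regex `\.git[^/\\]*`. Matching is anchored at the start
# of the relative path (re.match).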
def make_re(patterns):
    re_patterns = []
    for pat in patterns:
        if pat[0] == '/' and pat[-1] == '/' and len(pat) > 2:
            re_patterns.append(pat[1:-1])
        else:
            escaped_pat = (re.escape(pat)
                    .replace(r'\*', r'[^/\\]*')
                    .replace(r'\?', r'[^/\\]'))
            re_patterns.append(escaped_pat)
    return map(lambda p: re.compile(p), re_patterns)


def re_matchany(filename, patterns):
    for pattern in patterns:
        if pattern.match(filename):
            return True
    return False