diff piecrust/processing/base.py @ 3:f485ba500df3
Gigantic change to basically make PieCrust 2 vaguely functional.
- Serving works, with debug window.
- Baking works, multi-threaded, with dependency handling.
- Various things not implemented yet.
author      Ludovic Chabant <ludovic@chabant.com>
date        Sun, 10 Aug 2014 23:43:16 -0700
parents
children    474c9882decf
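The file added by this changeset defines the asset-processing layer used by baking: a Processor base class, a CopyFileProcessor fallback, a SimpleFileProcessor helper for one-to-one file conversions, and a multi-threaded ProcessorPipeline that feeds files to a pool of worker threads. As a rough, illustrative sketch (not part of this changeset; the processor name and extension mapping are made up), a concrete processor built on SimpleFileProcessor could look like this:

    # Hypothetical example, for illustration only; it is not part of the
    # changeset below. It relies on the SimpleFileProcessor API defined
    # in piecrust/processing/base.py.
    from piecrust.processing.base import SimpleFileProcessor


    class UpperCaseProcessor(SimpleFileProcessor):
        PROCESSOR_NAME = 'uppercase'

        def __init__(self):
            # Map input extensions to output extensions (no leading dots):
            # here, 'foo.txt' would be processed into 'foo.up'.
            super(UpperCaseProcessor, self).__init__({'txt': 'up'})

        def _doProcess(self, in_path, out_path):
            # SimpleFileProcessor.process() computes out_path from the
            # extension map and delegates the actual work to this method.
            with open(in_path, 'rb') as fp:
                text = fp.read()
            with open(out_path, 'wb') as fp:
                fp.write(text.upper())
            return True

Such a processor would be exposed through the application's plugin loader, since the pipeline collects its processors from app.plugin_loader.getProcessors().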
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/piecrust/processing/base.py	Sun Aug 10 23:43:16 2014 -0700
@@ -0,0 +1,336 @@
+import re
+import time
+import shutil
+import os.path
+import logging
+import threading
+from Queue import Queue, Empty
+from piecrust.chefutil import format_timed
+from piecrust.processing.tree import (ProcessingTreeBuilder,
+        ProcessingTreeRunner, STATE_DIRTY, print_node)
+from piecrust.records import Record
+
+
+logger = logging.getLogger(__name__)
+
+
+PRIORITY_FIRST = -1
+PRIORITY_NORMAL = 0
+PRIORITY_LAST = 1
+
+
+class Processor(object):
+    PROCESSOR_NAME = None
+
+    def __init__(self):
+        self.priority = PRIORITY_NORMAL
+        self.is_bypassing_structured_processing = False
+        self.is_delegating_dependency_check = True
+
+    def initialize(self, app):
+        self.app = app
+
+    def onPipelineStart(self, pipeline):
+        pass
+
+    def onPipelineEnd(self, pipeline):
+        pass
+
+    def supportsExtension(self, ext):
+        return False
+
+    def getDependencies(self, path):
+        return None
+
+    def getOutputFilenames(self, filename):
+        return None
+
+    def process(self, path, out_dir):
+        pass
+
+
+class CopyFileProcessor(Processor):
+    PROCESSOR_NAME = 'copy'
+
+    def __init__(self):
+        super(CopyFileProcessor, self).__init__()
+        self.priority = PRIORITY_LAST
+
+    def supportsExtension(self, ext):
+        return True
+
+    def getOutputFilenames(self, filename):
+        return [filename]
+
+    def process(self, path, out_dir):
+        out_path = os.path.join(out_dir, os.path.basename(path))
+        logger.debug("Copying: %s -> %s" % (path, out_path))
+        shutil.copyfile(path, out_path)
+        return True
+
+
+class SimpleFileProcessor(Processor):
+    def __init__(self, extensions=None):
+        super(SimpleFileProcessor, self).__init__()
+        self.extensions = extensions or {}
+
+    def supportsExtension(self, ext):
+        return ext.lstrip('.') in self.extensions
+
+    def getOutputFilenames(self, filename):
+        basename, ext = os.path.splitext(filename)
+        ext = ext.lstrip('.')
+        out_ext = self.extensions[ext]
+        return ['%s.%s' % (basename, out_ext)]
+
+    def process(self, path, out_dir):
+        _, in_name = os.path.split(path)
+        out_name = self.getOutputFilenames(in_name)[0]
+        out_path = os.path.join(out_dir, out_name)
+        return self._doProcess(path, out_path)
+
+    def _doProcess(self, in_path, out_path):
+        raise NotImplementedError()
+
+
+class ProcessorPipelineRecord(Record):
+    VERSION = 1
+
+    def __init__(self):
+        super(ProcessorPipelineRecord, self).__init__()
+        self.is_multi_mount = False
+
+    def addEntry(self, item):
+        self.entries.append(item)
+
+    def hasOverrideEntry(self, rel_path):
+        if not self.is_multi_mount:
+            return False
+        return self.findEntry(rel_path) is not None
+
+    def findEntry(self, rel_path):
+        rel_path = rel_path.lower()
+        for entry in self.entries:
+            for out_path in entry.rel_outputs:
+                if out_path.lower() == rel_path:
+                    return entry
+        return None
+
+
+class ProcessorPipelineRecordEntry(object):
+    def __init__(self, rel_input, is_processed=False, is_overridden=False):
+        self.rel_input = rel_input
+        self.rel_outputs = []
+        self.is_processed = is_processed
+        self.is_overridden = is_overridden
+
+
+class ProcessingContext(object):
+    def __init__(self, base_dir, job_queue, record=None):
+        self.base_dir = base_dir
+        self.job_queue = job_queue
+        self.record = record
+
+
+class ProcessorPipeline(object):
+    def __init__(self, app, out_dir, force=False, mounts=None,
+            skip_patterns=None, force_patterns=None, num_workers=4):
+        self.app = app
+        tmp_dir = app.cache_dir
+        if not tmp_dir:
+            import tempfile
+            tmp_dir = os.path.join(tempfile.gettempdir(), 'piecrust')
+        self.tmp_dir = os.path.join(tmp_dir, 'proc')
+        self.out_dir = out_dir
+        self.force = force
+        self.mounts = mounts or {}
+        self.skip_patterns = skip_patterns or []
+        self.force_patterns = force_patterns or []
+        self.processors = app.plugin_loader.getProcessors()
+        self.num_workers = num_workers
+
+        if app.theme_dir is not None:
+            self.mounts['theme'] = app.theme_dir
+
+        self.skip_patterns += ['_cache', '_content', '_counter',
+                'theme_info.yml',
+                '.DS_Store', 'Thumbs.db',
+                '.git*', '.hg*', '.svn']
+
+        self.skip_patterns = make_re(self.skip_patterns)
+        self.force_patterns = make_re(self.force_patterns)
+
+    def run(self, src_dir_or_file=None):
+        record = ProcessorPipelineRecord()
+
+        # Create the workers.
+        pool = []
+        queue = Queue()
+        abort = threading.Event()
+        pipeline_lock = threading.Lock()
+        for i in range(self.num_workers):
+            ctx = ProcessingWorkerContext(self, record, queue, abort,
+                    pipeline_lock)
+            worker = ProcessingWorker(i, ctx)
+            worker.start()
+            pool.append(worker)
+
+        # Invoke pre-processors.
+        for proc in self.processors:
+            proc.onPipelineStart(self)
+
+        if src_dir_or_file is not None:
+            # Process only the given path.
+            # Find out if this source directory is in a mount point.
+            base_dir = self.app.root_dir
+            for name, path in self.mounts.iteritems():
+                if src_dir_or_file[:len(path)] == path:
+                    base_dir = path
+
+            ctx = ProcessingContext(base_dir, queue, record)
+            logger.debug("Initiating processing pipeline on: %s" % src_dir_or_file)
+            if os.path.isdir(src_dir_or_file):
+                self.processDirectory(ctx, src_dir_or_file)
+            elif os.path.isfile(src_dir_or_file):
+                self.processFile(ctx, src_dir_or_file)
+
+        else:
+            # Process everything.
+            ctx = ProcessingContext(self.app.root_dir, queue, record)
+            logger.debug("Initiating processing pipeline on: %s" % self.app.root_dir)
+            self.processDirectory(ctx, self.app.root_dir)
+            ctx.is_multi_mount = True
+            for name, path in self.mounts.iteritems():
+                mount_ctx = ProcessingContext(path, queue, record)
+                logger.debug("Initiating processing pipeline on: %s" % path)
+                self.processDirectory(mount_ctx, path)
+
+        # Wait on all workers.
+        for w in pool:
+            w.join()
+        if abort.is_set():
+            raise Exception("Worker pool was aborted.")
+
+        # Invoke post-processors.
+        for proc in self.processors:
+            proc.onPipelineEnd(self)
+
+        return record
+
+    def processDirectory(self, ctx, start_dir):
+        for dirpath, dirnames, filenames in os.walk(start_dir):
+            rel_dirpath = os.path.relpath(dirpath, start_dir)
+            dirnames[:] = [d for d in dirnames
+                    if not re_matchany(os.path.join(rel_dirpath, d),
+                            self.skip_patterns)]
+
+            for filename in filenames:
+                if re_matchany(os.path.join(rel_dirpath, filename),
+                        self.skip_patterns):
+                    continue
+                self.processFile(ctx, os.path.join(dirpath, filename))
+
+    def processFile(self, ctx, path):
+        logger.debug("Queuing: %s" % path)
+        job = ProcessingWorkerJob(ctx.base_dir, path)
+        ctx.job_queue.put_nowait(job)
+
+
+class ProcessingWorkerContext(object):
+    def __init__(self, pipeline, record, work_queue, abort_event,
+            pipeline_lock):
+        self.pipeline = pipeline
+        self.record = record
+        self.work_queue = work_queue
+        self.abort_event = abort_event
+        self.pipeline_lock = pipeline_lock
+
+
+class ProcessingWorkerJob(object):
+    def __init__(self, base_dir, path):
+        self.base_dir = base_dir
+        self.path = path
+
+
+class ProcessingWorker(threading.Thread):
+    def __init__(self, wid, ctx):
+        super(ProcessingWorker, self).__init__()
+        self.wid = wid
+        self.ctx = ctx
+
+    def run(self):
+        while(not self.ctx.abort_event.is_set()):
+            try:
+                job = self.ctx.work_queue.get(True, 0.1)
+            except Empty:
+                logger.debug("[%d] No more work... shutting down." % self.wid)
+                break
+
+            try:
+                self._unsafeRun(job)
+                logger.debug("[%d] Done with file." % self.wid)
+                self.ctx.work_queue.task_done()
+            except Exception as ex:
+                self.ctx.abort_event.set()
+                logger.error("[%d] Critical error, aborting." % self.wid)
+                logger.exception(ex)
+                break
+
+    def _unsafeRun(self, job):
+        start_time = time.clock()
+        pipeline = self.ctx.pipeline
+        record = self.ctx.record
+
+        rel_path = os.path.relpath(job.path, job.base_dir)
+
+        # Figure out if a previously processed file is overriding this one.
+        # This can happen if a theme file (processed via a mount point)
+        # is overridden in the user's website.
+        if record.hasOverrideEntry(rel_path):
+            record.addEntry(ProcessorPipelineRecordEntry(rel_path,
+                    is_processed=False, is_overridden=True))
+            logger.info(format_timed(start_time,
+                    '%s [not baked, overridden]' % rel_path))
+            return
+
+        builder = ProcessingTreeBuilder(pipeline.processors)
+        tree_root = builder.build(rel_path)
+        print_node(tree_root, recursive=True)
+        leaves = tree_root.getLeaves()
+        fi = ProcessorPipelineRecordEntry(rel_path)
+        fi.rel_outputs = [l.path for l in leaves]
+        record.addEntry(fi)
+
+        force = pipeline.force
+        if not force:
+            force = re_matchany(rel_path, pipeline.force_patterns)
+
+        if force:
+            tree_root.setState(STATE_DIRTY, True)
+
+        runner = ProcessingTreeRunner(job.base_dir, pipeline.tmp_dir,
+                pipeline.out_dir, self.ctx.pipeline_lock)
+        if runner.processSubTree(tree_root):
+            fi.is_processed = True
+        logger.info(format_timed(start_time, "[%d] %s" % (self.wid, rel_path)))
+
+
+def make_re(patterns):
+    re_patterns = []
+    for pat in patterns:
+        if pat[0] == '/' and pat[-1] == '/' and len(pat) > 2:
+            re_patterns.append(pat[1:-1])
+        else:
+            escaped_pat = (re.escape(pat)
+                    .replace(r'\*', r'[^/\\]*')
+                    .replace(r'\?', r'[^/\\]'))
+            re_patterns.append(escaped_pat)
+    return map(lambda p: re.compile(p), re_patterns)
+
+
+def re_matchany(filename, patterns):
+    for pattern in patterns:
+        if pattern.match(filename):
+            return True
+    return False
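For reference, here is a minimal sketch of driving the pipeline defined above. The app object is assumed to be a PieCrust application exposing plugin_loader, cache_dir, theme_dir and root_dir as ProcessorPipeline uses them; the output directory and skip pattern are just example values, not prescribed by this changeset.

    # Illustrative sketch only, under the assumptions stated above.
    from piecrust.processing.base import ProcessorPipeline

    pipeline = ProcessorPipeline(app, out_dir='_counter', force=False,
                                 skip_patterns=['*.psd'], num_workers=4)

    # Process the whole site (root dir plus any mount points such as the
    # theme); passing a file or directory to run() limits processing to
    # that path instead.
    record = pipeline.run()
    for entry in record.entries:
        print entry.rel_input, '->', entry.rel_outputs, entry.is_processed

Each worker thread pulls jobs from the shared queue, builds a processing tree for the file, and records inputs, outputs and the processed/overridden flags in the ProcessorPipelineRecord returned by run().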