view piecrust/processing/base.py @ 111:208c652551a3
Quick fix to make the server correctly update referenced pages.
Disable the file-system cache for rendered segments when in server mode. We
can bring this optimization back once the server also uses the baking record
and can therefore know each page's dependencies.
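To make the change concrete, here is a minimal sketch of the caching policy being described; it is not PieCrust's actual code, and the `SegmentCache` class, the `is_server` flag, and the `render` callable are illustrative stand-ins:

```python
# Sketch of the policy from the commit message; names are hypothetical.

class SegmentCache:
    """A trivial in-memory stand-in for the file-system cache."""
    def __init__(self):
        self._store = {}

    def has(self, key):
        return key in self._store

    def get(self, key):
        return self._store[key]

    def put(self, key, value):
        self._store[key] = value


def get_rendered_segments(page_path, render, cache, is_server=False):
    # In server mode, bypass the cache entirely: without the baking
    # record we can't know which referenced pages changed, so a stale
    # cache entry could hide edits made to those pages.
    if not is_server and cache.has(page_path):
        return cache.get(page_path)
    segments = render(page_path)
    if not is_server:
        cache.put(page_path, segments)
    return segments
```

In server mode every request re-renders the segments, trading speed for correctness; once dependencies are tracked via the baking record, consulting the cache becomes safe again.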
| author | Ludovic Chabant <ludovic@chabant.com> |
|---|---|
| date | Thu, 16 Oct 2014 17:03:42 -0700 |
| parents | 45828c4167ad |
| children | 6827dcc9d3fb |
```python
import re
import time
import shutil
import os.path
import logging
import threading
from queue import Queue, Empty
from piecrust.chefutil import format_timed
from piecrust.processing.tree import (ProcessingTreeBuilder,
        ProcessingTreeRunner, STATE_DIRTY, print_node)
from piecrust.records import Record


logger = logging.getLogger(__name__)


PRIORITY_FIRST = -1
PRIORITY_NORMAL = 0
PRIORITY_LAST = 1


class Processor(object):
    PROCESSOR_NAME = None

    def __init__(self):
        self.priority = PRIORITY_NORMAL
        self.is_bypassing_structured_processing = False
        self.is_delegating_dependency_check = True

    def initialize(self, app):
        self.app = app

    def onPipelineStart(self, pipeline):
        pass

    def onPipelineEnd(self, pipeline):
        pass

    def matches(self, filename):
        return False

    def getDependencies(self, path):
        return None

    def getOutputFilenames(self, filename):
        return None

    def process(self, path, out_dir):
        pass


class CopyFileProcessor(Processor):
    PROCESSOR_NAME = 'copy'

    def __init__(self):
        super(CopyFileProcessor, self).__init__()
        self.priority = PRIORITY_LAST

    def matches(self, filename):
        return True

    def getOutputFilenames(self, filename):
        return [filename]

    def process(self, path, out_dir):
        out_path = os.path.join(out_dir, os.path.basename(path))
        logger.debug("Copying: %s -> %s" % (path, out_path))
        shutil.copyfile(path, out_path)
        return True


class SimpleFileProcessor(Processor):
    def __init__(self, extensions=None):
        super(SimpleFileProcessor, self).__init__()
        self.extensions = extensions or {}

    def matches(self, filename):
        for ext in self.extensions:
            if filename.endswith('.' + ext):
                return True
        return False

    def getOutputFilenames(self, filename):
        basename, ext = os.path.splitext(filename)
        ext = ext.lstrip('.')
        out_ext = self.extensions[ext]
        return ['%s.%s' % (basename, out_ext)]

    def process(self, path, out_dir):
        _, in_name = os.path.split(path)
        out_name = self.getOutputFilenames(in_name)[0]
        out_path = os.path.join(out_dir, out_name)
        return self._doProcess(path, out_path)

    def _doProcess(self, in_path, out_path):
        raise NotImplementedError()


class ProcessorPipelineRecord(Record):
    VERSION = 1

    def __init__(self):
        super(ProcessorPipelineRecord, self).__init__()

    def addEntry(self, item):
        self.entries.append(item)

    def hasOverrideEntry(self, rel_path):
        return self.findEntry(rel_path) is not None

    def findEntry(self, rel_path):
        rel_path = rel_path.lower()
        for entry in self.entries:
            for out_path in entry.rel_outputs:
                if out_path.lower() == rel_path:
                    return entry
        return None


class ProcessorPipelineRecordEntry(object):
    def __init__(self, base_dir, rel_input, is_processed=False,
            is_overridden=False):
        self.base_dir = base_dir
        self.rel_input = rel_input
        self.rel_outputs = []
        self.is_processed = is_processed
        self.is_overridden = is_overridden

    @property
    def path(self):
        return os.path.join(self.base_dir, self.rel_input)


class ProcessingContext(object):
    def __init__(self, base_dir, job_queue, record=None):
        self.base_dir = base_dir
        self.job_queue = job_queue
        self.record = record


class ProcessorPipeline(object):
    def __init__(self, app, mounts, out_dir, force=False,
            skip_patterns=None, force_patterns=None, num_workers=4):
        self.app = app
        self.mounts = mounts
        tmp_dir = app.cache_dir
        if not tmp_dir:
            import tempfile
            tmp_dir = os.path.join(tempfile.gettempdir(), 'piecrust')
        self.tmp_dir = os.path.join(tmp_dir, 'proc')
        self.out_dir = out_dir
        self.force = force
        self.skip_patterns = skip_patterns or []
        self.force_patterns = force_patterns or []
        self.processors = app.plugin_loader.getProcessors()
        self.num_workers = num_workers

        self.skip_patterns += ['_cache', '_counter',
                'theme_info.yml',
                '.DS_Store', 'Thumbs.db',
                '.git*', '.hg*', '.svn']

        self.skip_patterns = make_re(self.skip_patterns)
        self.force_patterns = make_re(self.force_patterns)

    def filterProcessors(self, authorized_names):
        self.processors = list(filter(
            lambda p: p.PROCESSOR_NAME in authorized_names,
            self.processors))

    def run(self, src_dir_or_file=None):
        record = ProcessorPipelineRecord()

        # Create the workers.
        pool = []
        queue = Queue()
        abort = threading.Event()
        pipeline_lock = threading.Lock()
        for i in range(self.num_workers):
            ctx = ProcessingWorkerContext(self, record, queue, abort,
                    pipeline_lock)
            worker = ProcessingWorker(i, ctx)
            worker.start()
            pool.append(worker)

        # Invoke pre-processors.
        for proc in self.processors:
            proc.onPipelineStart(self)

        if src_dir_or_file is not None:
            # Process only the given path.
            # Find out what mount point this is in.
            for path in self.mounts:
                if src_dir_or_file[:len(path)] == path:
                    base_dir = path
                    break
            else:
                raise Exception("Input path '%s' is not part of any known "
                                "mount point: %s" %
                                (src_dir_or_file, self.mounts))

            ctx = ProcessingContext(base_dir, queue, record)
            logger.debug("Initiating processing pipeline on: %s" %
                    src_dir_or_file)
            if os.path.isdir(src_dir_or_file):
                self.processDirectory(ctx, src_dir_or_file)
            elif os.path.isfile(src_dir_or_file):
                self.processFile(ctx, src_dir_or_file)

        else:
            # Process everything.
            for path in self.mounts:
                ctx = ProcessingContext(path, queue, record)
                logger.debug("Initiating processing pipeline on: %s" % path)
                self.processDirectory(ctx, path)

        # Wait on all workers.
        for w in pool:
            w.join()
        if abort.is_set():
            raise Exception("Worker pool was aborted.")

        # Invoke post-processors.
        for proc in self.processors:
            proc.onPipelineEnd(self)

        return record

    def processDirectory(self, ctx, start_dir):
        for dirpath, dirnames, filenames in os.walk(start_dir):
            rel_dirpath = os.path.relpath(dirpath, start_dir)
            dirnames[:] = [d for d in dirnames
                    if not re_matchany(d, self.skip_patterns, rel_dirpath)]

            for filename in filenames:
                if re_matchany(filename, self.skip_patterns, rel_dirpath):
                    continue
                self.processFile(ctx, os.path.join(dirpath, filename))

    def processFile(self, ctx, path):
        logger.debug("Queuing: %s" % path)
        job = ProcessingWorkerJob(ctx.base_dir, path)
        ctx.job_queue.put_nowait(job)


class ProcessingWorkerContext(object):
    def __init__(self, pipeline, record, work_queue, abort_event,
            pipeline_lock):
        self.pipeline = pipeline
        self.record = record
        self.work_queue = work_queue
        self.abort_event = abort_event
        self.pipeline_lock = pipeline_lock


class ProcessingWorkerJob(object):
    def __init__(self, base_dir, path):
        self.base_dir = base_dir
        self.path = path


class ProcessingWorker(threading.Thread):
    def __init__(self, wid, ctx):
        super(ProcessingWorker, self).__init__()
        self.wid = wid
        self.ctx = ctx

    def run(self):
        while not self.ctx.abort_event.is_set():
            try:
                job = self.ctx.work_queue.get(True, 0.1)
            except Empty:
                logger.debug("[%d] No more work... shutting down." %
                        self.wid)
                break

            try:
                self._unsafeRun(job)
                logger.debug("[%d] Done with file." % self.wid)
                self.ctx.work_queue.task_done()
            except Exception as ex:
                self.ctx.abort_event.set()
                logger.error("[%d] Critical error, aborting." % self.wid)
                logger.exception(ex)
                break

    def _unsafeRun(self, job):
        start_time = time.clock()
        pipeline = self.ctx.pipeline
        record = self.ctx.record

        rel_path = os.path.relpath(job.path, job.base_dir)

        # Figure out if a previously processed file is overriding this one.
        # This can happen if a theme file (processed via a mount point)
        # is overridden in the user's website.
        if record.hasOverrideEntry(rel_path):
            record.addEntry(ProcessorPipelineRecordEntry(
                    job.base_dir, rel_path,
                    is_processed=False, is_overridden=True))
            logger.info(format_timed(start_time,
                    '%s [not baked, overridden]' % rel_path))
            return

        builder = ProcessingTreeBuilder(pipeline.processors)
        tree_root = builder.build(rel_path)
        print_node(tree_root, recursive=True)
        leaves = tree_root.getLeaves()
        fi = ProcessorPipelineRecordEntry(job.base_dir, rel_path)
        fi.rel_outputs = [l.path for l in leaves]
        record.addEntry(fi)

        force = pipeline.force
        if not force:
            force = re_matchany(rel_path, pipeline.force_patterns)

        if force:
            tree_root.setState(STATE_DIRTY, True)

        runner = ProcessingTreeRunner(job.base_dir, pipeline.tmp_dir,
                pipeline.out_dir, self.ctx.pipeline_lock)
        if runner.processSubTree(tree_root):
            fi.is_processed = True
        logger.info(format_timed(start_time,
                "[%d] %s" % (self.wid, rel_path)))


def make_re(patterns):
    re_patterns = []
    for pat in patterns:
        if pat[0] == '/' and pat[-1] == '/' and len(pat) > 2:
            re_patterns.append(pat[1:-1])
        else:
            escaped_pat = (re.escape(pat)
                    .replace(r'\*', r'[^/\\]*')
                    .replace(r'\?', r'[^/\\]'))
            re_patterns.append(escaped_pat)
    return [re.compile(p) for p in re_patterns]


def re_matchany(filename, patterns, dirname=None):
    if dirname and dirname != '.':
        filename = os.path.join(dirname, filename)

    # skip patterns use a forward slash regardless of the platform.
    filename = filename.replace('\\', '/')
    for pattern in patterns:
        if pattern.search(filename):
            return True
    return False
```
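As a usage illustration, a processor built on `SimpleFileProcessor` only needs to supply the extension mapping and override `_doProcess()`; matching and output naming come from the base class. The `txt2html` name and behavior below are invented for the example:

```python
# Hypothetical processor: "compiles" .txt files to .html by wrapping
# each line in <p> tags.
class TextToHtmlProcessor(SimpleFileProcessor):
    PROCESSOR_NAME = 'txt2html'

    def __init__(self):
        # The dict maps input extension to output extension, as expected
        # by SimpleFileProcessor.getOutputFilenames().
        super(TextToHtmlProcessor, self).__init__({'txt': 'html'})

    def _doProcess(self, in_path, out_path):
        with open(in_path, 'r') as fp:
            lines = fp.readlines()
        with open(out_path, 'w') as fp:
            for line in lines:
                fp.write('<p>%s</p>\n' % line.strip())
        return True
```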
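Driving the pipeline might look like the following sketch; it assumes an already-initialized PieCrust `app` object (providing `cache_dir` and `plugin_loader`), and the directory paths are placeholders, not real defaults:

```python
from piecrust.processing.base import ProcessorPipeline

# `app` is assumed to be a live PieCrust application instance.
pipeline = ProcessorPipeline(
        app, mounts=['/path/to/site/assets'],
        out_dir='/path/to/output')
record = pipeline.run()  # spins up workers and walks every mount

for entry in record.entries:
    print(entry.rel_input, '->', entry.rel_outputs)
```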
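Finally, the pattern helpers at the bottom of the file accept either simple globs (where `*` and `?` match within a single path segment) or raw regexes wrapped in slashes. A quick demonstration of their behavior:

```python
from piecrust.processing.base import make_re, re_matchany

patterns = make_re(['.git*', '*.tmp', r'/^draft-\d+/'])

print(re_matchany('.gitignore', patterns))            # True: '.git*'
print(re_matchany('notes.tmp', patterns, 'sub/dir'))  # True: '*.tmp'
print(re_matchany('draft-12.md', patterns))           # True: regex form
print(re_matchany('index.html', patterns))            # False
```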