changeset 85:3471ffa059b2

Add a `BakeScheduler` to handle build dependencies. Add unit-tests.
author Ludovic Chabant <ludovic@chabant.com>
date Wed, 03 Sep 2014 17:27:50 -0700
parents b3ce11b2cf36
children 1cd67680c38c
files piecrust/baking/baker.py piecrust/baking/records.py tests/mockutil.py tests/test_baking_baker.py tests/test_data_assetor.py tests/test_processing_base.py
diffstat 6 files changed, 227 insertions(+), 64 deletions(-) [+]
line wrap: on
line diff
--- a/piecrust/baking/baker.py	Wed Sep 03 17:26:38 2014 -0700
+++ b/piecrust/baking/baker.py	Wed Sep 03 17:27:50 2014 -0700
@@ -6,7 +6,6 @@
 import logging
 import threading
 import urllib.request, urllib.error, urllib.parse
-from queue import Queue, Empty
 from piecrust.baking.records import TransitionalBakeRecord, BakeRecordPageEntry
 from piecrust.chefutil import format_timed, log_friendly_exception
 from piecrust.data.filters import (PaginationFilter, HasFilterClause,
@@ -27,7 +26,6 @@
         self.out_dir = out_dir
         self.force = force
         self.record = record
-        self.force = force
         self.copy_assets = copy_assets
         self.site_root = app.config.get('site/root')
         self.pretty_urls = app.config.get('site/pretty_urls')
@@ -136,8 +134,9 @@
         cur_record_entry = BakeRecordPageEntry(page)
         cur_record_entry.taxonomy_name = taxonomy_name
         cur_record_entry.taxonomy_term = taxonomy_term
-        prev_record_entry = self.record.getPreviousEntry(page, taxonomy_name,
-                taxonomy_term)
+        prev_record_entry = self.record.getPreviousEntry(
+                factory.source.name, factory.rel_path,
+                taxonomy_name, taxonomy_term)
 
         logger.debug("Baking '%s'..." % uri)
         while has_more_subs:
@@ -351,6 +350,7 @@
                 reason = "templates modified"
 
         if reason is not None:
+            # We have to bake everything from scratch.
             cache_dir = self.app.cache.getCacheDir('baker')
             if os.path.isdir(cache_dir):
                 logger.debug("Cleaning baker cache: %s" % cache_dir)
@@ -382,7 +382,7 @@
                     continue
 
                 logger.debug("Queuing: %s" % fac.ref_spec)
-                queue.put_nowait(BakeWorkerJob(fac, route))
+                queue.addJob(BakeWorkerJob(fac, route))
 
         self._waitOnWorkerPool(pool, abort)
 
@@ -469,7 +469,7 @@
                             {tax.term_name: term})
                     logger.debug("Queuing: %s [%s, %s]" %
                             (fac.ref_spec, tax_name, term))
-                    queue.put_nowait(
+                    queue.addJob(
                             BakeWorkerJob(fac, route, tax_name, term))
 
         self._waitOnWorkerPool(pool, abort)
@@ -487,23 +487,24 @@
 
     def _createWorkerPool(self, record, pool_size=4):
         pool = []
-        queue = Queue()
+        queue = BakeScheduler(record)
         abort = threading.Event()
         for i in range(pool_size):
             ctx = BakeWorkerContext(self.app, self.out_dir, self.force,
                     record, queue, abort)
             worker = BakeWorker(i, ctx)
-            worker.start()
             pool.append(worker)
         return pool, queue, abort
 
     def _waitOnWorkerPool(self, pool, abort):
         for w in pool:
+            w.start()
+        for w in pool:
             w.join()
         if abort.is_set():
             excs = [w.abort_exception for w in pool
                     if w.abort_exception is not None]
-            logger.error("%s errors" % len(excs))
+            logger.error("Baking was aborted due to %s error(s):" % len(excs))
             if self.app.debug:
                 for e in excs:
                     logger.exception(e)
@@ -513,6 +514,86 @@
             raise Exception("Baking was aborted due to errors.")
 
 
+class BakeScheduler(object):
+    _EMPTY = object()
+    _WAIT = object()
+
+    def __init__(self, record, jobs=None):
+        self.record = record
+        self.jobs = list(jobs) if jobs is not None else []
+        self._active_jobs = []
+        self._lock = threading.Lock()
+        self._added_event = threading.Event()
+        self._done_event = threading.Event()
+
+    def addJob(self, job):
+        logger.debug("Adding job '%s:%s' to scheduler." % (
+            job.factory.source.name, job.factory.rel_path))
+        with self._lock:
+            self.jobs.append(job)
+        self._added_event.set()
+
+    def onJobFinished(self, job):
+        logger.debug("Removing job '%s:%s' from scheduler." % (
+            job.factory.source.name, job.factory.rel_path))
+        with self._lock:
+            self._active_jobs.remove(job)
+        self._done_event.set()
+
+    def getNextJob(self, timeout=None):
+        self._added_event.clear()
+        self._done_event.clear()
+        job = self._doGetNextJob()
+        while job in (self._EMPTY, self._WAIT):
+            if timeout is None:
+                return None
+            if job == self._EMPTY:
+                logger.debug("Waiting for a new job to be added...")
+                res = self._added_event.wait(timeout)
+            elif job == self._WAIT:
+                logger.debug("Waiting for a job to be finished...")
+                res = self._done_event.wait(timeout)
+            if not res:
+                logger.debug("Timed-out. No job found.")
+                return None
+            job = self._doGetNextJob()
+        return job
+
+    def _doGetNextJob(self):
+        with self._lock:
+            if len(self.jobs) == 0:
+                return self._EMPTY
+
+            job = self.jobs.pop(0)
+            first_job = job
+            while not self._isJobReady(job):
+                logger.debug("Job '%s:%s' isn't ready yet." % (
+                        job.factory.source.name, job.factory.rel_path))
+                self.jobs.append(job)
+                job = self.jobs.pop(0)
+                if job == first_job:
+                    # None of the jobs are ready; wait. NOTE(review): 'job' (== first_job) was popped above but never re-appended, so it is silently dropped from self.jobs here -- re-append it before returning _WAIT.
+                    return self._WAIT
+
+            logger.debug("Job '%s:%s' is ready to go, moving to active "
+                    "queue." % (job.factory.source.name, job.factory.rel_path))
+            self._active_jobs.append(job)
+            return job
+
+    def _isJobReady(self, job):
+        e = self.record.getPreviousEntry(job.factory.source.name,
+                job.factory.rel_path)
+        if not e:
+            return True
+        for sn in e.used_source_names:
+            if any(filter(lambda j: j.factory.source.name == sn, self.jobs)):
+                return False
+            if any(filter(lambda j: j.factory.source.name == sn,
+                    self._active_jobs)):
+                return False
+        return True
+
+
 class BakeWorkerContext(object):
     def __init__(self, app, out_dir, force, record, work_queue,
             abort_event):
@@ -547,16 +628,15 @@
 
     def run(self):
         while(not self.ctx.abort_event.is_set()):
-            try:
-                job = self.ctx.work_queue.get(True, 0.1)
-            except Empty:
+            job = self.ctx.work_queue.getNextJob()
+            if job is None:
                 logger.debug("[%d] No more work... shutting down." % self.wid)
                 break
 
             try:
                 self._unsafeRun(job)
                 logger.debug("[%d] Done with page." % self.wid)
-                self.ctx.work_queue.task_done()
+                self.ctx.work_queue.onJobFinished(job)
             except Exception as ex:
                 self.ctx.abort_event.set()
                 self.abort_exception = ex
--- a/piecrust/baking/records.py	Wed Sep 03 17:26:38 2014 -0700
+++ b/piecrust/baking/records.py	Wed Sep 03 17:27:50 2014 -0700
@@ -111,8 +111,9 @@
                 return prev
         return None
 
-    def getPreviousEntry(self, page, taxonomy_name=None, taxonomy_term=None):
-        key = _get_transition_key(page.source.name, page.rel_path,
+    def getPreviousEntry(self, source_name, rel_path, taxonomy_name=None,
+            taxonomy_term=None):
+        key = _get_transition_key(source_name, rel_path,
                 taxonomy_name, taxonomy_term)
         pair = self.transitions.get(key)
         if pair is not None:
--- a/tests/mockutil.py	Wed Sep 03 17:26:38 2014 -0700
+++ b/tests/mockutil.py	Wed Sep 03 17:27:50 2014 -0700
@@ -22,13 +22,31 @@
     return app
 
 
-def with_mock_fs_app(f):
-    @functools.wraps(f)
-    def wrapper(app, *args, **kwargs):
-        with mock_fs_scope(app):
-            real_app = app.getApp()
-            return f(real_app, *args, **kwargs)
-    return wrapper
+class _MockFsEntry(object):
+    def __init__(self, contents):
+        self.contents = contents
+        self.metadata = {'mtime': time.time()}
+
+
+class _MockFsEntryWriter(object):
+    def __init__(self, entry):
+        self._entry = entry
+        if isinstance(entry.contents, str):
+            self._stream = io.StringIO(entry.contents)
+        elif isinstance(entry.contents, bytes):
+            self._stream = io.BytesIO(entry.contents)
+        else:
+            raise Exception("Unexpected entry contents: %s" % type(entry.contents))
+
+    def __getattr__(self, name):
+        return getattr(self._stream, name)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, exc_tb):
+        self._entry.contents = self._stream.getvalue()
+        self._stream.close()
 
 
 class mock_fs(object):
@@ -46,9 +64,9 @@
             return '/%s' % self._root
         return '/%s/%s' % (self._root, p.lstrip('/'))
 
-    def getApp(self):
+    def getApp(self, cache=True):
         root_dir = self.path('/kitchen')
-        return PieCrust(root_dir, cache=False)
+        return PieCrust(root_dir, cache=cache, debug=True)
 
     def withDir(self, path):
         path = path.replace('\\', '/')
@@ -112,8 +130,8 @@
         return res
 
     def _getStructureRecursive(self, src, target, name):
-        if isinstance(src, tuple):
-            target[name] = src[0]
+        if isinstance(src, _MockFsEntry):
+            target[name] = src.contents
             return
 
         e = {}
@@ -134,7 +152,8 @@
 
     def _createDir(self, path):
         cur = self._fs
-        bits = path.strip('/').split('/')
+        path = path.replace('\\', '/').strip('/')
+        bits = path.split('/')
         for b in bits:
             if b not in cur:
                 cur[b] = {}
@@ -143,14 +162,22 @@
 
     def _createFile(self, path, contents):
         cur = self._fs
-        bits = path.strip('/').split('/')
+        path = path.replace('\\', '/').lstrip('/')
+        bits = path.split('/')
         for b in bits[:-1]:
             if b not in cur:
                 cur[b] = {}
             cur = cur[b]
-        cur[bits[-1]] = (contents, {'mtime': time.time()})
+        cur[bits[-1]] = _MockFsEntry(contents)
         return self
 
+    def _deleteEntry(self, path):
+        parent = self._getEntry(os.path.dirname(path))
+        assert parent is not None
+        name = os.path.basename(path)
+        assert name in parent
+        del parent[name]
+
 
 class mock_fs_scope(object):
     def __init__(self, fs):
@@ -171,6 +198,8 @@
 
     def _startMock(self):
         self._createMock('__main__.open', open, self._open, create=True)
+        # TODO: the '__main__.open' patch above apparently doesn't take effect for piecrust.records; mock 'piecrust.records.open' separately for now and investigate why.
+        self._createMock('piecrust.records.open', open, self._open, create=True)
         self._createMock('codecs.open', codecs.open, self._codecsOpen)
         self._createMock('os.listdir', os.listdir, self._listdir)
         self._createMock('os.makedirs', os.makedirs, self._makedirs)
@@ -179,6 +208,7 @@
         self._createMock('os.path.islink', os.path.islink, self._islink)
         self._createMock('os.path.getmtime', os.path.getmtime, self._getmtime)
         self._createMock('shutil.copyfile', shutil.copyfile, self._copyfile)
+        self._createMock('shutil.rmtree', shutil.rmtree, self._rmtree)
         for p in self._patchers:
             p.start()
 
@@ -190,31 +220,46 @@
         self._originals[name] = orig
         self._patchers.append(mock.patch(name, func, **kwargs))
 
-    def _open(self, path, *args, **kwargs):
+    def _doOpen(self, orig_name, path, mode, *args, **kwargs):
         path = os.path.normpath(path)
         if path.startswith(resources_path):
-            return self._originals['__main__.open'](path, **kwargs)
-        e = self._getFsEntry(path)
+            return self._originals[orig_name](path, mode, *args, **kwargs)
+
+        if 'r' in mode:
+            e = self._getFsEntry(path)
+        elif 'w' in mode:
+            e = self._getFsEntry(path)
+            if e is None:
+                contents = ''
+                if 'b' in mode:
+                    contents = bytes()
+                self._fs._createFile(path, contents)
+                e = self._getFsEntry(path)
+                assert e is not None
+        else:
+            raise OSError("Unsupported open mode: %s" % mode)
+
         if e is None:
             raise OSError("No such file: %s" % path)
-        if not isinstance(e, tuple):
-            raise OSError("'%s' is not a file" % path)
-        return io.StringIO(e[0])
+        if not isinstance(e, _MockFsEntry):
+            raise OSError("'%s' is not a file %s" % (path, e))
+        if 'b' in mode:
+            assert isinstance(e.contents, bytes)
+            return _MockFsEntryWriter(e)
+        assert isinstance(e.contents, str)
+        return _MockFsEntryWriter(e)
 
-    def _codecsOpen(self, path, *args, **kwargs):
+    def _open(self, path, mode, *args, **kwargs):
+        return self._doOpen('__main__.open', path, mode, *args, **kwargs)
+
+    def _codecsOpen(self, path, mode, *args, **kwargs):
+        return self._doOpen('codecs.open', path, mode, *args, **kwargs)
+
+    def _listdir(self, path):
         path = os.path.normpath(path)
         if path.startswith(resources_path):
-            return self._originals['codecs.open'](path, *args, **kwargs)
-        e = self._getFsEntry(path)
-        if e is None:
-            raise OSError("No such file: %s" % path)
-        if not isinstance(e, tuple):
-            raise OSError("'%s' is not a file" % path)
-        return io.StringIO(e[0])
+            return self._originals['os.listdir'](path)
 
-    def _listdir(self, path):
-        if not path.startswith('/' + self.root):
-            return self._originals['os.listdir'](path)
         e = self._getFsEntry(path)
         if e is None:
             raise OSError("No such directory: %s" % path)
@@ -222,47 +267,58 @@
             raise OSError("'%s' is not a directory." % path)
         return list(e.keys())
 
-    def _makedirs(self, path, mode):
-        if not path.startswith('/' + self.root):
+    def _makedirs(self, path, mode=0o777):
+        if not path.replace('\\', '/').startswith('/' + self.root):
             raise Exception("Shouldn't create directory: %s" % path)
         self._fs._createDir(path)
 
     def _isdir(self, path):
-        if not path.startswith('/' + self.root):
+        path = os.path.normpath(path)
+        if path.startswith(resources_path):
             return self._originals['os.path.isdir'](path)
         e = self._getFsEntry(path)
         return e is not None and isinstance(e, dict)
 
     def _isfile(self, path):
-        if not path.startswith('/' + self.root):
+        path = os.path.normpath(path)
+        if path.startswith(resources_path):
             return self._originals['os.path.isfile'](path)
         e = self._getFsEntry(path)
-        return e is not None and isinstance(e, tuple)
+        return e is not None and isinstance(e, _MockFsEntry)
 
     def _islink(self, path):
-        if not path.startswith('/' + self.root):
+        path = os.path.normpath(path)
+        if path.startswith(resources_path):
             return self._originals['os.path.islink'](path)
         return False
 
     def _getmtime(self, path):
-        if not path.startswith('/' + self.root):
+        path = os.path.normpath(path)
+        if path.startswith(resources_path):
             return self._originals['os.path.getmtime'](path)
         e = self._getFsEntry(path)
         if e is None:
             raise OSError("No such file: %s" % path)
-        return e[1]['mtime']
+        return e.metadata['mtime']
 
     def _copyfile(self, src, dst):
-        if not src.startswith('/' + self.root):
-            with open(src, 'r') as fp:
+        src = os.path.normpath(src)
+        if src.startswith(resources_path):
+            with self._originals['__main__.open'](src, 'r') as fp:
                 src_text = fp.read()
         else:
             e = self._getFsEntry(src)
-            src_text = e[0]
-        if not dst.startswith('/' + self.root):
+            src_text = e.contents
+        if not dst.replace('\\', '/').startswith('/' + self.root):
             raise Exception("Shouldn't copy to: %s" % dst)
         self._fs._createFile(dst, src_text)
 
+    def _rmtree(self, path):
+        if not path.replace('\\', '/').startswith('/' + self.root):
+            raise Exception("Shouldn't delete trees from: %s" % path)
+        e = self._fs._getEntry(os.path.dirname(path))
+        del e[os.path.basename(path)]
+
     def _getFsEntry(self, path):
         return self._fs._getEntry(path)
 
--- a/tests/test_baking_baker.py	Wed Sep 03 17:26:38 2014 -0700
+++ b/tests/test_baking_baker.py	Wed Sep 03 17:27:50 2014 -0700
@@ -1,7 +1,7 @@
 import os.path
 import pytest
-from piecrust.baking.baker import PageBaker
-from .mockutil import get_mock_app
+from piecrust.baking.baker import PageBaker, Baker
+from .mockutil import get_mock_app, mock_fs, mock_fs_scope
 
 
 @pytest.mark.parametrize('uri, page_num, pretty, expected', [
@@ -45,3 +45,29 @@
             os.path.join('/destination', expected))
     assert expected == path
 
+
+def test_empty_bake():
+    fs = mock_fs()
+    with mock_fs_scope(fs):
+        assert not os.path.isdir(fs.path('kitchen/_counter'))
+        app = fs.getApp()
+        baker = Baker(app)
+        baker.bake()
+        assert os.path.isdir(fs.path('kitchen/_counter'))
+        structure = fs.getStructure('kitchen/_counter')
+        assert list(structure.keys()) == ['index.html']
+
+
+def test_simple_bake():
+    fs = (mock_fs()
+            .withPage('posts/2010-01-01_post1.md', {'layout': 'none', 'format': 'none'}, 'post one')
+            .withPage('pages/_index.md', {'layout': 'none', 'format': 'none'}, "something"))
+    with mock_fs_scope(fs):
+        app = fs.getApp()
+        baker = Baker(app)
+        baker.bake()
+        structure = fs.getStructure('kitchen/_counter')
+        assert structure == {
+                '2010': {'01': {'01': {'post1.html': 'post one'}}},
+                'index.html': 'something'}
+
--- a/tests/test_data_assetor.py	Wed Sep 03 17:26:38 2014 -0700
+++ b/tests/test_data_assetor.py	Wed Sep 03 17:27:50 2014 -0700
@@ -20,7 +20,7 @@
 def test_assets(fs, expected):
     with mock_fs_scope(fs):
         page = MagicMock()
-        page.app = fs.getApp()
+        page.app = fs.getApp(cache=False)
         page.app.env.base_asset_url_format = '%uri%'
         page.path = fs.path('/kitchen/pages/foo/bar.md')
         assetor = Assetor(page, '/foo/bar')
@@ -36,7 +36,7 @@
         fs = mock_fs().withPage('pages/foo/bar')
         with mock_fs_scope(fs):
             page = MagicMock()
-            page.app = fs.getApp()
+            page.app = fs.getApp(cache=False)
             page.path = fs.path('/kitchen/pages/foo/bar.md')
             assetor = Assetor(page, '/foo/bar')
             assetor['this_doesnt_exist']
@@ -50,7 +50,7 @@
                 .withPageAsset('pages/foo/bar', 'one.jpg', 'one picture'))
         with mock_fs_scope(fs):
             page = MagicMock()
-            page.app = fs.getApp()
+            page.app = fs.getApp(cache=False)
             page.path = fs.path('/kitchen/pages/foo/bar.md')
             assetor = Assetor(page, '/foo/bar')
             assetor['one']
--- a/tests/test_processing_base.py	Wed Sep 03 17:26:38 2014 -0700
+++ b/tests/test_processing_base.py	Wed Sep 03 17:27:50 2014 -0700
@@ -5,7 +5,7 @@
 
 
 def _get_pipeline(fs, **kwargs):
-    app = fs.getApp()
+    app = fs.getApp(cache=False)
     mounts = [os.path.join(app.root_dir, 'assets')]
     return ProcessorPipeline(app, mounts, fs.path('counter'),
             num_workers=1, **kwargs)