# HG changeset patch
# User Ludovic Chabant
# Date 1409790470 25200
# Node ID 3471ffa059b2bb94b7b4a21155794c90256121ff
# Parent  b3ce11b2cf360d2b58e288ebc97938496fa43824
Add a `BakeScheduler` to handle build dependencies. Add unit-tests.

diff -r b3ce11b2cf36 -r 3471ffa059b2 piecrust/baking/baker.py
--- a/piecrust/baking/baker.py	Wed Sep 03 17:26:38 2014 -0700
+++ b/piecrust/baking/baker.py	Wed Sep 03 17:27:50 2014 -0700
@@ -6,7 +6,6 @@
 import logging
 import threading
 import urllib.request, urllib.error, urllib.parse
-from queue import Queue, Empty
 from piecrust.baking.records import TransitionalBakeRecord, BakeRecordPageEntry
 from piecrust.chefutil import format_timed, log_friendly_exception
 from piecrust.data.filters import (PaginationFilter, HasFilterClause,
@@ -27,7 +26,6 @@
         self.out_dir = out_dir
         self.force = force
         self.record = record
-        self.force = force
         self.copy_assets = copy_assets
         self.site_root = app.config.get('site/root')
         self.pretty_urls = app.config.get('site/pretty_urls')
@@ -136,8 +134,9 @@
         cur_record_entry = BakeRecordPageEntry(page)
         cur_record_entry.taxonomy_name = taxonomy_name
         cur_record_entry.taxonomy_term = taxonomy_term
-        prev_record_entry = self.record.getPreviousEntry(page, taxonomy_name,
-                taxonomy_term)
+        prev_record_entry = self.record.getPreviousEntry(
+                factory.source.name, factory.rel_path,
+                taxonomy_name, taxonomy_term)
 
         logger.debug("Baking '%s'..." % uri)
         while has_more_subs:
@@ -351,6 +350,7 @@
                 reason = "templates modified"
 
         if reason is not None:
+            # We have to bake everything from scratch.
             cache_dir = self.app.cache.getCacheDir('baker')
             if os.path.isdir(cache_dir):
                 logger.debug("Cleaning baker cache: %s" % cache_dir)
@@ -382,7 +382,7 @@
                     continue
 
                 logger.debug("Queuing: %s" % fac.ref_spec)
-                queue.put_nowait(BakeWorkerJob(fac, route))
+                queue.addJob(BakeWorkerJob(fac, route))
 
         self._waitOnWorkerPool(pool, abort)
 
@@ -469,7 +469,7 @@
                             {tax.term_name: term})
                     logger.debug("Queuing: %s [%s, %s]" %
                             (fac.ref_spec, tax_name, term))
-                    queue.put_nowait(
+                    queue.addJob(
                             BakeWorkerJob(fac, route, tax_name, term))
 
         self._waitOnWorkerPool(pool, abort)
@@ -487,23 +487,24 @@
 
     def _createWorkerPool(self, record, pool_size=4):
         pool = []
-        queue = Queue()
+        queue = BakeScheduler(record)
         abort = threading.Event()
         for i in range(pool_size):
             ctx = BakeWorkerContext(self.app, self.out_dir, self.force,
                     record, queue, abort)
             worker = BakeWorker(i, ctx)
-            worker.start()
             pool.append(worker)
         return pool, queue, abort
 
     def _waitOnWorkerPool(self, pool, abort):
         for w in pool:
+            w.start()
+        for w in pool:
             w.join()
         if abort.is_set():
             excs = [w.abort_exception for w in pool
                     if w.abort_exception is not None]
-            logger.error("%s errors" % len(excs))
+            logger.error("Baking was aborted due to %s error(s):" % len(excs))
             if self.app.debug:
                 for e in excs:
                     logger.exception(e)
@@ -513,6 +514,86 @@
             raise Exception("Baking was aborted due to errors.")
 
 
+class BakeScheduler(object):
+    _EMPTY = object()
+    _WAIT = object()
+
+    def __init__(self, record, jobs=None):
+        self.record = record
+        self.jobs = list(jobs) if jobs is not None else []
+        self._active_jobs = []
+        self._lock = threading.Lock()
+        self._added_event = threading.Event()
+        self._done_event = threading.Event()
+
+    def addJob(self, job):
+        logger.debug("Adding job '%s:%s' to scheduler." % (
+                job.factory.source.name, job.factory.rel_path))
+        with self._lock:
+            self.jobs.append(job)
+        self._added_event.set()
+
+    def onJobFinished(self, job):
+        logger.debug("Removing job '%s:%s' from scheduler." % (
+                job.factory.source.name, job.factory.rel_path))
+        with self._lock:
+            self._active_jobs.remove(job)
+        self._done_event.set()
+
+    def getNextJob(self, timeout=None):
+        self._added_event.clear()
+        self._done_event.clear()
+        job = self._doGetNextJob()
+        while job in (self._EMPTY, self._WAIT):
+            if timeout is None:
+                return None
+            if job == self._EMPTY:
+                logger.debug("Waiting for a new job to be added...")
+                res = self._added_event.wait(timeout)
+            elif job == self._WAIT:
+                logger.debug("Waiting for a job to be finished...")
+                res = self._done_event.wait(timeout)
+            if not res:
+                logger.debug("Timed-out. No job found.")
+                return None
+            job = self._doGetNextJob()
+        return job
+
+    def _doGetNextJob(self):
+        with self._lock:
+            if len(self.jobs) == 0:
+                return self._EMPTY
+
+            job = self.jobs.pop(0)
+            first_job = job
+            while not self._isJobReady(job):
+                logger.debug("Job '%s:%s' isn't ready yet." % (
+                        job.factory.source.name, job.factory.rel_path))
+                self.jobs.append(job)
+                job = self.jobs.pop(0)
+                if job == first_job:
+                    # None of the jobs are ready... we need to wait.
+                    return self._WAIT
+
+            logger.debug("Job '%s:%s' is ready to go, moving to active "
+                    "queue." % (job.factory.source.name, job.factory.rel_path))
+            self._active_jobs.append(job)
+            return job
+
+    def _isJobReady(self, job):
+        e = self.record.getPreviousEntry(job.factory.source.name,
+                job.factory.rel_path)
+        if not e:
+            return True
+        for sn in e.used_source_names:
+            if any(filter(lambda j: j.factory.source.name == sn, self.jobs)):
+                return False
+            if any(filter(lambda j: j.factory.source.name == sn,
+                    self._active_jobs)):
+                return False
+        return True
+
+
 class BakeWorkerContext(object):
     def __init__(self, app, out_dir, force, record, work_queue,
             abort_event):
@@ -547,16 +628,15 @@
 
     def run(self):
         while(not self.ctx.abort_event.is_set()):
-            try:
-                job = self.ctx.work_queue.get(True, 0.1)
-            except Empty:
+            job = self.ctx.work_queue.getNextJob()
+            if job is None:
                 logger.debug("[%d] No more work... shutting down." % self.wid)
                 break
 
             try:
                 self._unsafeRun(job)
                 logger.debug("[%d] Done with page." % self.wid)
-                self.ctx.work_queue.task_done()
+                self.ctx.work_queue.onJobFinished(job)
             except Exception as ex:
                 self.ctx.abort_event.set()
                 self.abort_exception = ex
diff -r b3ce11b2cf36 -r 3471ffa059b2 piecrust/baking/records.py
--- a/piecrust/baking/records.py	Wed Sep 03 17:26:38 2014 -0700
+++ b/piecrust/baking/records.py	Wed Sep 03 17:27:50 2014 -0700
@@ -111,8 +111,9 @@
                 return prev
         return None
 
-    def getPreviousEntry(self, page, taxonomy_name=None, taxonomy_term=None):
-        key = _get_transition_key(page.source.name, page.rel_path,
+    def getPreviousEntry(self, source_name, rel_path, taxonomy_name=None,
+            taxonomy_term=None):
+        key = _get_transition_key(source_name, rel_path,
                 taxonomy_name, taxonomy_term)
         pair = self.transitions.get(key)
         if pair is not None:
diff -r b3ce11b2cf36 -r 3471ffa059b2 tests/mockutil.py
--- a/tests/mockutil.py	Wed Sep 03 17:26:38 2014 -0700
+++ b/tests/mockutil.py	Wed Sep 03 17:27:50 2014 -0700
@@ -22,13 +22,31 @@
     return app
 
 
-def with_mock_fs_app(f):
-    @functools.wraps(f)
-    def wrapper(app, *args, **kwargs):
-        with mock_fs_scope(app):
-            real_app = app.getApp()
-            return f(real_app, *args, **kwargs)
-    return wrapper
+class _MockFsEntry(object):
+    def __init__(self, contents):
+        self.contents = contents
+        self.metadata = {'mtime': time.time()}
+
+
+class _MockFsEntryWriter(object):
+    def __init__(self, entry):
+        self._entry = entry
+        if isinstance(entry.contents, str):
+            self._stream = io.StringIO(entry.contents)
+        elif isinstance(entry.contents, bytes):
+            self._stream = io.BytesIO(entry.contents)
+        else:
+            raise Exception("Unexpected entry contents: %s" % type(entry.contents))
+
+    def __getattr__(self, name):
+        return getattr(self._stream, name)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, exc_tb):
+        self._entry.contents = self._stream.getvalue()
+        self._stream.close()
 
 
 class mock_fs(object):
@@ -46,9 +64,9 @@
             return '/%s' % self._root
         return '/%s/%s' % (self._root, p.lstrip('/'))
 
-    def getApp(self):
+    def getApp(self, cache=True):
         root_dir = self.path('/kitchen')
-        return PieCrust(root_dir, cache=False)
+        return PieCrust(root_dir, cache=cache, debug=True)
 
     def withDir(self, path):
         path = path.replace('\\', '/')
@@ -112,8 +130,8 @@
         return res
 
     def _getStructureRecursive(self, src, target, name):
-        if isinstance(src, tuple):
-            target[name] = src[0]
+        if isinstance(src, _MockFsEntry):
+            target[name] = src.contents
             return
 
         e = {}
@@ -134,7 +152,8 @@
 
     def _createDir(self, path):
         cur = self._fs
-        bits = path.strip('/').split('/')
+        path = path.replace('\\', '/').strip('/')
+        bits = path.split('/')
         for b in bits:
             if b not in cur:
                 cur[b] = {}
@@ -143,14 +162,22 @@
             cur = cur[b]
 
     def _createFile(self, path, contents):
         cur = self._fs
-        bits = path.strip('/').split('/')
+        path = path.replace('\\', '/').lstrip('/')
+        bits = path.split('/')
         for b in bits[:-1]:
             if b not in cur:
                 cur[b] = {}
             cur = cur[b]
-        cur[bits[-1]] = (contents, {'mtime': time.time()})
+        cur[bits[-1]] = _MockFsEntry(contents)
         return self
 
+    def _deleteEntry(self, path):
+        parent = self._getEntry(os.path.dirname(path))
+        assert parent is not None
+        name = os.path.basename(path)
+        assert name in parent
+        del parent[name]
+
 
 class mock_fs_scope(object):
     def __init__(self, fs):
@@ -171,6 +198,8 @@
 
     def _startMock(self):
         self._createMock('__main__.open', open, self._open, create=True)
+        # TODO: WTF, apparently the previous one doesn't really work?
+        self._createMock('piecrust.records.open', open, self._open, create=True)
         self._createMock('codecs.open', codecs.open, self._codecsOpen)
         self._createMock('os.listdir', os.listdir, self._listdir)
         self._createMock('os.makedirs', os.makedirs, self._makedirs)
@@ -179,6 +208,7 @@
         self._createMock('os.path.islink', os.path.islink, self._islink)
         self._createMock('os.path.getmtime', os.path.getmtime, self._getmtime)
         self._createMock('shutil.copyfile', shutil.copyfile, self._copyfile)
+        self._createMock('shutil.rmtree', shutil.rmtree, self._rmtree)
         for p in self._patchers:
             p.start()
 
@@ -190,31 +220,46 @@
             self._originals[name] = orig
         self._patchers.append(mock.patch(name, func, **kwargs))
 
-    def _open(self, path, *args, **kwargs):
+    def _doOpen(self, orig_name, path, mode, *args, **kwargs):
         path = os.path.normpath(path)
         if path.startswith(resources_path):
-            return self._originals['__main__.open'](path, **kwargs)
-        e = self._getFsEntry(path)
+            return self._originals[orig_name](path, mode, *args, **kwargs)
+
+        if 'r' in mode:
+            e = self._getFsEntry(path)
+        elif 'w' in mode:
+            e = self._getFsEntry(path)
+            if e is None:
+                contents = ''
+                if 'b' in mode:
+                    contents = bytes()
+                self._fs._createFile(path, contents)
+                e = self._getFsEntry(path)
+                assert e is not None
+        else:
+            raise OSError("Unsupported open mode: %s" % mode)
+
         if e is None:
             raise OSError("No such file: %s" % path)
-        if not isinstance(e, tuple):
-            raise OSError("'%s' is not a file" % path)
-        return io.StringIO(e[0])
+        if not isinstance(e, _MockFsEntry):
+            raise OSError("'%s' is not a file %s" % (path, e))
+        if 'b' in mode:
+            assert isinstance(e.contents, bytes)
+            return _MockFsEntryWriter(e)
+        assert isinstance(e.contents, str)
+        return _MockFsEntryWriter(e)
 
-    def _codecsOpen(self, path, *args, **kwargs):
+    def _open(self, path, mode, *args, **kwargs):
+        return self._doOpen('__main__.open', path, mode, *args, **kwargs)
+
+    def _codecsOpen(self, path, mode, *args, **kwargs):
+        return self._doOpen('codecs.open', path, mode, *args, **kwargs)
+
+    def _listdir(self, path):
         path = os.path.normpath(path)
         if path.startswith(resources_path):
-            return self._originals['codecs.open'](path, *args, **kwargs)
-        e = self._getFsEntry(path)
-        if e is None:
-            raise OSError("No such file: %s" % path)
-        if not isinstance(e, tuple):
-            raise OSError("'%s' is not a file" % path)
-        return io.StringIO(e[0])
+            return self._originals['os.listdir'](path)
 
-    def _listdir(self, path):
-        if not path.startswith('/' + self.root):
-            return self._originals['os.listdir'](path)
         e = self._getFsEntry(path)
         if e is None:
             raise OSError("No such directory: %s" % path)
@@ -222,47 +267,58 @@
             raise OSError("'%s' is not a directory." % path)
         return list(e.keys())
 
-    def _makedirs(self, path, mode):
-        if not path.startswith('/' + self.root):
+    def _makedirs(self, path, mode=0o777):
+        if not path.replace('\\', '/').startswith('/' + self.root):
             raise Exception("Shouldn't create directory: %s" % path)
         self._fs._createDir(path)
 
     def _isdir(self, path):
-        if not path.startswith('/' + self.root):
+        path = os.path.normpath(path)
+        if path.startswith(resources_path):
             return self._originals['os.path.isdir'](path)
         e = self._getFsEntry(path)
         return e is not None and isinstance(e, dict)
 
     def _isfile(self, path):
-        if not path.startswith('/' + self.root):
+        path = os.path.normpath(path)
+        if path.startswith(resources_path):
             return self._originals['os.path.isfile'](path)
         e = self._getFsEntry(path)
-        return e is not None and isinstance(e, tuple)
+        return e is not None and isinstance(e, _MockFsEntry)
 
     def _islink(self, path):
-        if not path.startswith('/' + self.root):
+        path = os.path.normpath(path)
+        if path.startswith(resources_path):
            return self._originals['os.path.islink'](path)
         return False
 
     def _getmtime(self, path):
-        if not path.startswith('/' + self.root):
+        path = os.path.normpath(path)
+        if path.startswith(resources_path):
             return self._originals['os.path.getmtime'](path)
         e = self._getFsEntry(path)
         if e is None:
             raise OSError("No such file: %s" % path)
-        return e[1]['mtime']
+        return e.metadata['mtime']
 
     def _copyfile(self, src, dst):
-        if not src.startswith('/' + self.root):
-            with open(src, 'r') as fp:
+        src = os.path.normpath(src)
+        if src.startswith(resources_path):
+            with self._originals['__main__.open'](src, 'r') as fp:
                 src_text = fp.read()
         else:
             e = self._getFsEntry(src)
-            src_text = e[0]
-        if not dst.startswith('/' + self.root):
+            src_text = e.contents
+        if not dst.replace('\\', '/').startswith('/' + self.root):
             raise Exception("Shouldn't copy to: %s" % dst)
         self._fs._createFile(dst, src_text)
 
+    def _rmtree(self, path):
+        if not path.replace('\\', '/').startswith('/' + self.root):
+            raise Exception("Shouldn't delete trees from: %s" % path)
+        e = self._fs._getEntry(os.path.dirname(path))
+        del e[os.path.basename(path)]
+
     def _getFsEntry(self, path):
         return self._fs._getEntry(path)
 
diff -r b3ce11b2cf36 -r 3471ffa059b2 tests/test_baking_baker.py
--- a/tests/test_baking_baker.py	Wed Sep 03 17:26:38 2014 -0700
+++ b/tests/test_baking_baker.py	Wed Sep 03 17:27:50 2014 -0700
@@ -1,7 +1,7 @@
 import os.path
 import pytest
-from piecrust.baking.baker import PageBaker
-from .mockutil import get_mock_app
+from piecrust.baking.baker import PageBaker, Baker
+from .mockutil import get_mock_app, mock_fs, mock_fs_scope
 
 
 @pytest.mark.parametrize('uri, page_num, pretty, expected', [
@@ -45,3 +45,29 @@
                 os.path.join('/destination', expected))
         assert expected == path
 
+
+def test_empty_bake():
+    fs = mock_fs()
+    with mock_fs_scope(fs):
+        assert not os.path.isdir(fs.path('kitchen/_counter'))
+        app = fs.getApp()
+        baker = Baker(app)
+        baker.bake()
+        assert os.path.isdir(fs.path('kitchen/_counter'))
+        structure = fs.getStructure('kitchen/_counter')
+        assert list(structure.keys()) == ['index.html']
+
+
+def test_simple_bake():
+    fs = (mock_fs()
+            .withPage('posts/2010-01-01_post1.md', {'layout': 'none', 'format': 'none'}, 'post one')
+            .withPage('pages/_index.md', {'layout': 'none', 'format': 'none'}, "something"))
+    with mock_fs_scope(fs):
+        app = fs.getApp()
+        baker = Baker(app)
+        baker.bake()
+        structure = fs.getStructure('kitchen/_counter')
+        assert structure == {
+                '2010': {'01': {'01': {'post1.html': 'post one'}}},
+                'index.html': 'something'}
+
diff -r b3ce11b2cf36 -r 3471ffa059b2 tests/test_data_assetor.py
--- a/tests/test_data_assetor.py	Wed Sep 03 17:26:38 2014 -0700
+++ b/tests/test_data_assetor.py	Wed Sep 03 17:27:50 2014 -0700
@@ -20,7 +20,7 @@
 def test_assets(fs, expected):
     with mock_fs_scope(fs):
         page = MagicMock()
-        page.app = fs.getApp()
+        page.app = fs.getApp(cache=False)
         page.app.env.base_asset_url_format = '%uri%'
         page.path = fs.path('/kitchen/pages/foo/bar.md')
         assetor = Assetor(page, '/foo/bar')
@@ -36,7 +36,7 @@
         fs = mock_fs().withPage('pages/foo/bar')
         with mock_fs_scope(fs):
             page = MagicMock()
-            page.app = fs.getApp()
+            page.app = fs.getApp(cache=False)
             page.path = fs.path('/kitchen/pages/foo/bar.md')
             assetor = Assetor(page, '/foo/bar')
             assetor['this_doesnt_exist']
@@ -50,7 +50,7 @@
             .withPageAsset('pages/foo/bar', 'one.jpg', 'one picture'))
     with mock_fs_scope(fs):
         page = MagicMock()
-        page.app = fs.getApp()
+        page.app = fs.getApp(cache=False)
         page.path = fs.path('/kitchen/pages/foo/bar.md')
         assetor = Assetor(page, '/foo/bar')
         assetor['one']
diff -r b3ce11b2cf36 -r 3471ffa059b2 tests/test_processing_base.py
--- a/tests/test_processing_base.py	Wed Sep 03 17:26:38 2014 -0700
+++ b/tests/test_processing_base.py	Wed Sep 03 17:27:50 2014 -0700
@@ -5,7 +5,7 @@
 
 
 def _get_pipeline(fs, **kwargs):
-    app = fs.getApp()
+    app = fs.getApp(cache=False)
     mounts = [os.path.join(app.root_dir, 'assets')]
     return ProcessorPipeline(app, mounts, fs.path('counter'),
             num_workers=1, **kwargs)
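
Usage note (not part of the changeset): a minimal sketch of how the new
BakeScheduler defers a job until the sources it depends on have been baked,
using the getNextJob()/onJobFinished() protocol that BakeWorker.run() now
follows. The FakeSource, FakeFactory, FakeJob, FakeEntry and FakeRecord names
are hypothetical stand-ins that only implement the attributes the scheduler
reads (job.factory.source.name, job.factory.rel_path, and the previous
record entry's used_source_names); they are not part of PieCrust.

# Illustrative only; assumes the patched piecrust package is importable.
from piecrust.baking.baker import BakeScheduler


class FakeSource(object):
    def __init__(self, name):
        self.name = name


class FakeFactory(object):
    def __init__(self, source_name, rel_path):
        self.source = FakeSource(source_name)
        self.rel_path = rel_path


class FakeJob(object):
    def __init__(self, source_name, rel_path):
        self.factory = FakeFactory(source_name, rel_path)


class FakeEntry(object):
    def __init__(self, used_source_names):
        self.used_source_names = used_source_names


class FakeRecord(object):
    # Pretend the previous bake shows that 'pages' content used the 'posts'
    # source (e.g. a page listing posts), so pages must wait for posts.
    def getPreviousEntry(self, source_name, rel_path,
                         taxonomy_name=None, taxonomy_term=None):
        if source_name == 'pages':
            return FakeEntry(['posts'])
        return None


sched = BakeScheduler(FakeRecord())
page_job = FakeJob('pages', '_index.md')
post_job = FakeJob('posts', '2010-01-01_post1.md')
sched.addJob(page_job)
sched.addJob(post_job)

first = sched.getNextJob(timeout=0.1)   # post_job comes out first: no deps
sched.onJobFinished(first)              # 'posts' is now baked
second = sched.getNextJob(timeout=0.1)  # page_job is only handed out now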