Mercurial > piecrust2
changeset 989:8adc27285d93
bake: Big pass on bake performance.
- Reduce the amount of data passed between processes.
- Make inter-process data simple objects to make it easier to test with
alternatives to pickle.
- Require sources to be able to find a content item from an item spec (path).
- Make Hoedown the default Markdown formatter.
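
To illustrate the first three points, here is a rough sketch of what the new inter-process payloads look like, using the `create_job` and `content_item_from_job` helpers added in piecrust/pipelines/base.py below (the source name and item spec are made-up examples, and `pipeline` stands for an existing pipeline instance):

    from piecrust.pipelines.base import create_job, content_item_from_job

    # Jobs are plain dicts/tuples now, so they can be serialized by
    # pickle or by any of the alternative backends in fastpickle.
    job = create_job(pipeline, 'pages/about.md', pass_num=1, force_bake=True)
    # -> {'job_spec': ('pages', 'pages/about.md'),
    #     'pass_num': 1, 'force_bake': True}

    # Workers re-resolve the content item lazily, from the spec alone.
    item = content_item_from_job(pipeline, job)   # calls findContentFromSpec()
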
--- a/piecrust/admin/views/dashboard.py  Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/admin/views/dashboard.py  Sun Nov 19 14:29:17 2017 -0800
@@ -88,8 +88,9 @@
     if source is None:
         return None
 
+    # TODO: this assumes FS sources, but this comes from the disk anyway.
     full_path = os.path.join(pcapp.root_dir, path)
-    content_item = source.findContentFromPath(full_path)
+    content_item = source.findContentFromSpec(full_path)
     if content_item is None:
         return None
--- a/piecrust/app.py  Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/app.py  Sun Nov 19 14:29:17 2017 -0800
@@ -48,6 +48,7 @@
         stats.registerTimer("PageRenderSegments")
         stats.registerTimer("PageRenderLayout")
         stats.registerTimer("PageSerialize")
+        stats.registerTimer("MergedMapping_get")
         stats.registerCounter('PageLoads')
         stats.registerCounter('PageRenderSegments')
         stats.registerCounter('PageRenderLayout')
--- a/piecrust/baking/baker.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/baking/baker.py Sun Nov 19 14:29:17 2017 -0800 @@ -6,7 +6,8 @@ format_timed_scope, format_timed) from piecrust.environment import ExecutionStats from piecrust.pipelines.base import ( - PipelineJobCreateContext, PipelineMergeRecordContext, PipelineManager, + PipelineJobCreateContext, PipelineJobResultHandleContext, + PipelineJobValidateContext, PipelineManager, get_pipeline_name_for_source) from piecrust.pipelines.records import ( MultiRecordHistory, MultiRecord, RecordEntry, @@ -42,6 +43,8 @@ def bake(self): start_time = time.perf_counter() + + # Setup baker. logger.debug(" Bake Output: %s" % self.out_dir) logger.debug(" Root URL: %s" % self.app.config.get('site/root')) @@ -50,7 +53,9 @@ self.app.config.set('site/asset_url_format', '%page_uri%/%filename%') stats = self.app.env.stats - stats.registerTimer('WorkerTaskPut') + stats.registerTimer('LoadSourceContents', raise_if_registered=False) + stats.registerTimer('MasterTaskPut_1', raise_if_registered=False) + stats.registerTimer('MasterTaskPut_2+', raise_if_registered=False) # Make sure the output directory exists. if not os.path.isdir(self.out_dir): @@ -90,6 +95,12 @@ pool_userdata = _PoolUserData(self, ppmngr) pool = self._createWorkerPool(records_path, pool_userdata) + # Done with all the setup, let's start the actual work. + logger.info(format_timed(start_time, "setup baker")) + + # Load all sources. + self._loadSources(ppmngr) + # Bake the realms. self._bakeRealms(pool, ppmngr, record_histories) @@ -149,8 +160,9 @@ self.force = True current_records.incremental_count = 0 previous_records = MultiRecord() - logger.info(format_timed( - start_time, "cleaned cache (reason: %s)" % reason)) + logger.debug(format_timed( + start_time, "cleaned cache (reason: %s)" % reason, + colored=False)) return False else: current_records.incremental_count += 1 @@ -167,7 +179,8 @@ # Also, create and initialize each pipeline for each source. has_any_pp = False ppmngr = PipelineManager( - self.app, self.out_dir, record_histories) + self.app, self.out_dir, + record_histories=record_histories) ok_pp = self.allowed_pipelines nok_pp = self.forbidden_pipelines ok_src = self.allowed_sources @@ -192,13 +205,28 @@ "out. There's nothing to do.") return ppmngr + def _loadSources(self, ppmngr): + start_time = time.perf_counter() + + for ppinfo in ppmngr.getPipelineInfos(): + rec = ppinfo.record_history.current + rec_entries = ppinfo.pipeline.loadAllContents() + if rec_entries is not None: + for e in rec_entries: + rec.addEntry(e) + + stats = self.app.env.stats + stats.stepTimer('LoadSourceContents', + time.perf_counter() - start_time) + logger.info(format_timed(start_time, "loaded site content")) + def _bakeRealms(self, pool, ppmngr, record_histories): # Bake the realms -- user first, theme second, so that a user item # can override a theme item. # Do this for as many times as we have pipeline passes left to do. 
realm_list = [REALM_USER, REALM_THEME] pp_by_pass_and_realm = _get_pipeline_infos_by_pass_and_realm( - ppmngr.getPipelines()) + ppmngr.getPipelineInfos()) for pp_pass_num in sorted(pp_by_pass_and_realm.keys()): logger.debug("Pipelines pass %d" % pp_pass_num) @@ -206,10 +234,11 @@ for realm in realm_list: pplist = pp_by_realm.get(realm) if pplist is not None: - self._bakeRealm( - pool, record_histories, pp_pass_num, realm, pplist) + self._bakeRealm(pool, ppmngr, record_histories, + pp_pass_num, realm, pplist) - def _bakeRealm(self, pool, record_histories, pp_pass_num, realm, pplist): + def _bakeRealm(self, pool, ppmngr, record_histories, + pp_pass_num, realm, pplist): # Start with the first step, where we iterate on the content sources' # items and run jobs on those. pool.userdata.cur_step = 0 @@ -218,15 +247,16 @@ start_time = time.perf_counter() job_count = 0 + stats = self.app.env.stats realm_name = REALM_NAMES[realm].lower() - stats = self.app.env.stats for ppinfo in pplist: src = ppinfo.source pp = ppinfo.pipeline + jcctx = PipelineJobCreateContext(pp_pass_num, pp.record_name, + record_histories) next_step_jobs[src.name] = [] - jcctx = PipelineJobCreateContext(pp_pass_num, record_histories) jobs = pp.createJobs(jcctx) if jobs is not None: new_job_count = len(jobs) @@ -240,7 +270,7 @@ "(%s, step 0)." % (new_job_count, src.name, pp.PIPELINE_NAME, realm_name)) - stats.stepTimer('WorkerTaskPut', time.perf_counter() - start_time) + stats.stepTimer('MasterTaskPut_1', time.perf_counter() - start_time) if job_count == 0: logger.debug("No jobs queued! Bailing out of this bake pass.") @@ -270,11 +300,18 @@ logger.debug( "Queuing jobs for source '%s' (%s, step %d)." % (sn, realm_name, pool.userdata.cur_step)) + + pp = ppmngr.getPipeline(sn) + valctx = PipelineJobValidateContext( + pp_pass_num, pool.userdata.cur_step, + pp.record_name, record_histories) + pp.validateNextStepJobs(jobs, valctx) + job_count += len(jobs) pool.userdata.next_step_jobs[sn] = [] pool.queueJobs(jobs) - stats.stepTimer('WorkerTaskPut', time.perf_counter() - start_time) + stats.stepTimer('MasterTaskPut_2+', time.perf_counter() - start_time) if job_count == 0: break @@ -293,6 +330,12 @@ for e in errors: logger.error(" " + e) + def _logWorkerException(self, item_spec, exc_data): + logger.error("Errors found in %s:" % item_spec) + logger.error(exc_data['value']) + if self.app.debug: + logger.error(exc_data['traceback']) + def _createWorkerPool(self, previous_records_path, pool_userdata): from piecrust.workerpool import WorkerPool from piecrust.baking.worker import BakeWorkerContext, BakeWorker @@ -319,47 +362,45 @@ def _handleWorkerResult(self, job, res, userdata): cur_step = userdata.cur_step - record = userdata.records.getRecord(job.record_name) + source_name, item_spec = job['job_spec'] + + # See if there's a next step to take. + npj = res.get('next_step_job') + if npj is not None: + npj['step_num'] = cur_step + 1 + userdata.next_step_jobs[source_name].append(npj) - if cur_step == 0: - record.addEntry(res.record_entry) - else: - ppinfo = userdata.ppmngr.getPipeline(job.source_name) - ppmrctx = PipelineMergeRecordContext(record, job, cur_step) - ppinfo.pipeline.mergeRecordEntry(res.record_entry, ppmrctx) + # Make the pipeline do custom handling to update the record entry. 
+ ppinfo = userdata.ppmngr.getPipelineInfo(source_name) + pipeline = ppinfo.pipeline + record = ppinfo.current_record + ppmrctx = PipelineJobResultHandleContext(record, job, cur_step) + pipeline.handleJobResult(res, ppmrctx) - npj = res.next_step_job - if npj is not None: - npj.step_num = cur_step + 1 - userdata.next_step_jobs[job.source_name].append(npj) - - if not res.record_entry.success: + # Set the overall success flags if there was an error. + record_entry = ppmrctx.record_entry + if not record_entry.success: record.success = False userdata.records.success = False - self._logErrors(job.content_item.spec, res.record_entry.errors) + self._logErrors(job['item_spec'], record_entry.errors) def _handleWorkerError(self, job, exc_data, userdata): - cur_step = userdata.cur_step - record = userdata.records.getRecord(job.record_name) - - record_entry_spec = job.content_item.metadata.get( - 'record_entry_spec', job.content_item.spec) - - if cur_step == 0: - ppinfo = userdata.ppmngr.getPipeline(job.source_name) - entry_class = ppinfo.pipeline.RECORD_ENTRY_CLASS or RecordEntry - e = entry_class() - e.item_spec = record_entry_spec - e.errors.append(str(exc_data)) - record.addEntry(e) - else: - e = record.getEntry(record_entry_spec) - e.errors.append(str(exc_data)) - + # Set the overall success flag. + source_name, item_spec = job['job_spec'] + ppinfo = userdata.ppmngr.getPipelineInfo(source_name) + pipeline = ppinfo.pipeline + record = ppinfo.current_record record.success = False userdata.records.success = False - self._logErrors(job.content_item.spec, e.errors) + # Add those errors to the record, if possible. + record_entry_spec = job.get('record_entry_spec', item_spec) + e = record.getEntry(record_entry_spec) + if e: + e.errors.append(exc_data['value']) + self._logWorkerException(item_spec, exc_data) + + # Log debug stuff. if self.app.debug: logger.error(exc_data.traceback)
--- a/piecrust/baking/worker.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/baking/worker.py Sun Nov 19 14:29:17 2017 -0800 @@ -1,7 +1,7 @@ import time import logging from piecrust.pipelines.base import ( - PipelineManager, PipelineJobRunContext, PipelineJobResult, + PipelineManager, PipelineJobRunContext, get_pipeline_name_for_source) from piecrust.pipelines.records import ( MultiRecordHistory, MultiRecord, load_records) @@ -27,7 +27,8 @@ def __init__(self, ctx): self.ctx = ctx self.app = None - self.record_histories = None + self.stats = None + self.previous_records = None self._work_start_time = time.perf_counter() self._sources = {} self._ppctx = None @@ -44,22 +45,20 @@ stats = app.env.stats stats.registerTimer("BakeWorker_%d_Total" % self.wid) stats.registerTimer("BakeWorkerInit") - self.timerScope = stats.timerScope self.app = app + self.stats = stats # Load previous record if self.ctx.previous_records_path: previous_records = load_records(self.ctx.previous_records_path) else: previous_records = MultiRecord() - current_records = MultiRecord() - self.record_histories = MultiRecordHistory( - previous_records, current_records) + self.previous_records = previous_records # Create the pipelines. self.ppmngr = PipelineManager( - app, self.ctx.out_dir, self.record_histories, + app, self.ctx.out_dir, worker_id=self.wid, force=self.ctx.force) ok_pp = self.ctx.allowed_pipelines nok_pp = self.ctx.forbidden_pipelines @@ -78,24 +77,22 @@ stats.stepTimerSince("BakeWorkerInit", self._work_start_time) def process(self, job): - item = job.content_item - logger.debug("Received job: %s@%s" % (job.source_name, item.spec)) - - ppinfo = self.ppmngr.getPipeline(job.source_name) - pp = ppinfo.pipeline + source_name, item_spec = job['job_spec'] + logger.debug("Received job: %s@%s" % (source_name, item_spec)) - with self.timerScope("PipelineJobs_%s" % pp.PIPELINE_NAME): - runctx = PipelineJobRunContext(job, pp.record_name, - self.record_histories) + # Run the job! + job_start = time.perf_counter() + pp = self.ppmngr.getPipeline(source_name) + runctx = PipelineJobRunContext(job, pp.record_name, + self.previous_records) + ppres = { + 'item_spec': item_spec + } + pp.run(job, runctx, ppres) - ppres = PipelineJobResult() - # For subsequent pass jobs, there will be a record entry given. - # For first pass jobs, there's none so we get the pipeline to - # create it. - ppres.record_entry = job.data.get('record_entry') - if ppres.record_entry is None: - ppres.record_entry = pp.createRecordEntry(job, runctx) - pp.run(job, runctx, ppres) + # Log time spent in this pipeline. + self.stats.stepTimerSince("PipelineJobs_%s" % pp.PIPELINE_NAME, + job_start) return ppres
--- a/piecrust/commands/builtin/baking.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/commands/builtin/baking.py Sun Nov 19 14:29:17 2017 -0800 @@ -48,33 +48,60 @@ '--show-stats', help="Show detailed information about the bake.", action='store_true') + parser.add_argument( + '--profile', + help="Run the bake several times, for profiling.", + type=int, default=-1) def run(self, ctx): from piecrust.chefutil import format_timed + from piecrust.environment import ExecutionStats out_dir = (ctx.args.output or os.path.join(ctx.app.root_dir, '_counter')) + success = True + avg_stats = ExecutionStats() + avg_stats.registerTimer('Total') start_time = time.perf_counter() - try: - records = self._doBake(ctx, out_dir) - # Show merged stats. - if ctx.args.show_stats: - logger.info("-------------------") - logger.info("Timing information:") - _show_stats(records.stats) + num_iter = 1 + if ctx.args.profile > 0: + num_iter = ctx.args.profile + + for i in range(num_iter): + iter_start_time = time.perf_counter() + if num_iter > 1: + import gc + gc.collect() + logger.info("---- %d/%d ----" % (i + 1, num_iter)) - # All done. - logger.info('-------------------------') - logger.info(format_timed(start_time, 'done baking')) - return 0 if records.success else 1 - except Exception as ex: - if ctx.app.debug: - logger.exception(ex) - else: - logger.error(str(ex)) - return 1 + try: + records = self._doBake(ctx, out_dir) + except Exception as ex: + if ctx.app.debug: + logger.exception(ex) + else: + logger.error(str(ex)) + return 1 + + success = success and records.success + avg_stats.mergeStats(records.stats) + avg_stats.stepTimerSince('Total', iter_start_time) + + # Show merged stats. + if ctx.args.show_stats: + if num_iter > 1: + _average_stats(avg_stats, num_iter) + + logger.info("-------------------") + logger.info("Timing information:") + _show_stats(avg_stats) + + # All done. + logger.info('-------------------------') + logger.info(format_timed(start_time, 'done baking')) + return 0 if success else 1 def _doBake(self, ctx, out_dir): from piecrust.baking.baker import Baker @@ -251,6 +278,13 @@ logger.info("Record: %s" % rec.name) logger.info("Status: %s" % ('SUCCESS' if rec.success else 'FAILURE')) + logger.info("User Data:") + if not rec.user_data: + logger.info(" <empty>") + else: + for k, v in rec.user_data.items(): + logger.info(" %s: %s" % (k, v)) + for e in entries_to_show: _print_record_entry(e) logger.info("") @@ -270,6 +304,13 @@ logger.info(" - %s" % v) +def _average_stats(stats, cnt): + for name in stats.timers: + stats.timers[name] /= cnt + for name in stats.counters: + stats.counters[name] /= cnt + + def _show_stats(stats, *, full=False): indent = ' '
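
For reference, the new `--profile` argument simply repeats the bake the requested number of times and averages the collected timers and counters; a typical invocation (assuming the usual `chef` entry point) would be something like:

    chef bake --profile 5 --show-stats
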
--- a/piecrust/data/assetor.py  Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/data/assetor.py  Sun Nov 19 14:29:17 2017 -0800
@@ -115,5 +115,5 @@
         stack = app.env.render_ctx_stack
         cur_ctx = stack.current_ctx
         if cur_ctx is not None:
-            cur_ctx.current_pass_info.used_assets = True
+            cur_ctx.render_info['used_assets'] = True
--- a/piecrust/data/base.py  Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/data/base.py  Sun Nov 19 14:29:17 2017 -0800
@@ -5,9 +5,10 @@
     """ Provides a dictionary-like object that's really the aggregation of
         multiple dictionary-like objects.
     """
-    def __init__(self, dicts, path=''):
+    def __init__(self, dicts, path='', *, stats=None):
         self._dicts = dicts
         self._path = path
+        self._stats = stats
 
     def __getattr__(self, name):
         try:
--- a/piecrust/data/builder.py  Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/data/builder.py  Sun Nov 19 14:29:17 2017 -0800
@@ -68,7 +68,8 @@
     # Put the site data first so that `MergedMapping` doesn't load stuff
    # for nothing just to find a value that was in the YAML config all
    # along.
-    data = MergedMapping([site_data, data, providers_data])
+    data = MergedMapping([site_data, data, providers_data],
+                         stats=app.env.stats)
 
     # Do this at the end because we want all the data to be ready to be
     # displayed in the debugger window.
--- a/piecrust/data/linker.py  Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/data/linker.py  Sun Nov 19 14:29:17 2017 -0800
@@ -83,15 +83,17 @@
 
     def forpath(self, path):
         # TODO: generalize this for sources that aren't file-system based.
-        item = self._source.findContent({'slug': path})
+        item = self._source.findContentFromSpec({'slug': path})
         return Linker(self._source, item)
 
     def childrenof(self, path):
         # TODO: generalize this for sources that aren't file-system based.
         src = self._source
         app = src.app
-        group = src.findGroup(path)
+        group = src.findContentFromSpec(path)
         if group is not None:
+            if not group.is_group:
+                raise Exception("'%s' is not a folder/group." % path)
             for i in src.getContents(group):
                 if not i.is_group:
                     ipage = app.getPage(src, i)
--- a/piecrust/fastpickle.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/fastpickle.py Sun Nov 19 14:29:17 2017 -0800 @@ -1,54 +1,85 @@ +import io import sys -import json import codecs import datetime import collections -def pickle(obj): - data = _pickle_object(obj) - data = json.dumps(data, indent=None, separators=(',', ':')) - return data.encode('utf8') +use_msgpack = False +use_marshall = False -def pickle_obj(obj): - if obj is not None: - return _pickle_object(obj) - return None +if use_msgpack: + import msgpack + + def _dumps_msgpack(obj, buf): + msgpack.pack(obj, buf) + + def _loads_msgpack(buf, bufsize): + return msgpack.unpack(buf) + + _dumps = _dumps_msgpack + _loads = _loads_msgpack + +elif use_marshall: + import marshal + + def _dumps_marshal(obj, buf): + marshal.dump(obj, buf) + + def _loads_marshal(buf, bufsize): + return marshal.load(buf) + + _dumps = _dumps_marshal + _loads = _loads_marshal + +else: + import json + + class _BufferWrapper: + def __init__(self, buf): + self._buf = buf + + def write(self, data): + self._buf.write(data.encode('utf8')) + + def read(self): + return self._buf.read().decode('utf8') + + def _dumps_json(obj, buf): + buf = _BufferWrapper(buf) + json.dump(obj, buf, indent=None, separators=(',', ':')) + + def _loads_json(buf, bufsize): + buf = _BufferWrapper(buf) + return json.load(buf) + + _dumps = _dumps_json + _loads = _loads_json + + +def pickle(obj): + with io.BytesIO() as buf: + pickle_intob(obj, buf) + return buf.getvalue() def pickle_intob(obj, buf): data = _pickle_object(obj) - buf = _WriteWrapper(buf) - json.dump(data, buf, indent=None, separators=(',', ':')) + _dumps(data, buf) def unpickle(data): - data = json.loads(data.decode('utf8')) + with io.BytesIO(data) as buf: + data = _loads(buf, len(data)) return _unpickle_object(data) -def unpickle_obj(data): - if data is not None: - return _unpickle_object(data) - return None - - def unpickle_fromb(buf, bufsize): - with buf.getbuffer() as innerbuf: - data = codecs.decode(innerbuf[:bufsize], 'utf8') - data = json.loads(data) + data = _loads(buf, bufsize) return _unpickle_object(data) -class _WriteWrapper(object): - def __init__(self, buf): - self._buf = buf - - def write(self, data): - self._buf.write(data.encode('utf8')) - - _PICKLING = 0 _UNPICKLING = 1
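
A minimal sketch of the resulting serialization API, assuming the default JSON backend (the `use_msgpack` / `use_marshall` module flags above switch to the other backends; the job payload is a made-up example):

    from piecrust.fastpickle import pickle, unpickle

    job = {'job_spec': ['pages', 'pages/about.md'], 'step_num': 1}

    data = pickle(job)          # bytes, JSON-encoded by default
    assert unpickle(data) == job
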
--- a/piecrust/formatting/hoedownformatter.py  Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/formatting/hoedownformatter.py  Sun Nov 19 14:29:17 2017 -0800
@@ -6,7 +6,7 @@
 
 
 class HoedownFormatter(Formatter):
-    FORMAT_NAMES = ['hoedown']
+    FORMAT_NAMES = ['markdown', 'mdown', 'md']
     OUTPUT_FORMAT = 'html'
 
     def __init__(self):
--- a/piecrust/formatting/markdownformatter.py  Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/formatting/markdownformatter.py  Sun Nov 19 14:29:17 2017 -0800
@@ -2,7 +2,7 @@
 
 
 class MarkdownFormatter(Formatter):
-    FORMAT_NAMES = ['markdown', 'mdown', 'md']
+    FORMAT_NAMES = ['pymarkdown']
     OUTPUT_FORMAT = 'html'
 
     def __init__(self):
--- a/piecrust/main.py  Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/main.py  Sun Nov 19 14:29:17 2017 -0800
@@ -19,6 +19,8 @@
 logger = logging.getLogger(__name__)
 
+_chef_start_time = time.perf_counter()
+
 
 class ColoredFormatter(logging.Formatter):
     COLORS = {
@@ -246,7 +248,6 @@
 
 def _run_chef(pre_args, argv):
     # Setup the app.
-    start_time = time.perf_counter()
     root = None
     if pre_args.root:
         root = os.path.expanduser(pre_args.root)
@@ -306,7 +307,7 @@
 
     # Parse the command line.
     result = parser.parse_args(argv)
-    logger.debug(format_timed(start_time, 'initialized PieCrust',
+    logger.debug(format_timed(_chef_start_time, 'initialized PieCrust',
                               colored=False))
 
     # Print the help if no command was specified.
@@ -314,6 +315,10 @@
         parser.print_help()
         return 0
 
+    # Add some timing information.
+    app.env.stats.registerTimer('ChefStartup')
+    app.env.stats.stepTimerSince('ChefStartup', _chef_start_time)
+
     # Run the command!
     ctx = CommandContext(appfactory, app, parser, result)
     exit_code = result.func(ctx)
--- a/piecrust/pipelines/_pagebaker.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/pipelines/_pagebaker.py Sun Nov 19 14:29:17 2017 -0800 @@ -1,10 +1,12 @@ import os.path +import copy import queue import shutil import logging import threading import urllib.parse -from piecrust.pipelines._pagerecords import SubPagePipelineRecordEntry +from piecrust.pipelines._pagerecords import ( + SubPageFlags, create_subpage_job_result) from piecrust.rendering import RenderingContext, render_page from piecrust.sources.base import AbortedSourceUseError from piecrust.uriutil import split_uri @@ -67,13 +69,16 @@ with open(out_path, 'w', encoding='utf8') as fp: fp.write(content) - def bake(self, page, prev_entry, cur_entry): + def bake(self, page, prev_entry, force=False): cur_sub = 1 has_more_subs = True app = self.app out_dir = self.out_dir + force_bake = self.force or force pretty_urls = page.config.get('pretty_urls', self.pretty_urls) + rendered_subs = [] + # Start baking the sub-pages. while has_more_subs: sub_uri = page.getUri(sub_num=cur_sub) @@ -82,7 +87,8 @@ out_path = get_output_path(app, out_dir, sub_uri, pretty_urls) # Create the sub-entry for the bake record. - cur_sub_entry = SubPagePipelineRecordEntry(sub_uri, out_path) + cur_sub_entry = create_subpage_job_result(sub_uri, out_path) + rendered_subs.append(cur_sub_entry) # Find a corresponding sub-entry in the previous bake record. prev_sub_entry = None @@ -93,15 +99,15 @@ pass # Figure out if we need to bake this page. - bake_status = _get_bake_status(page, out_path, self.force, + bake_status = _get_bake_status(page, out_path, force_bake, prev_sub_entry, cur_sub_entry) # If this page didn't bake because it's already up-to-date. # Keep trying for as many subs as we know this page has. if bake_status == STATUS_CLEAN: - cur_sub_entry.render_info = prev_sub_entry.copyRenderInfo() - cur_sub_entry.flags = SubPagePipelineRecordEntry.FLAG_NONE - cur_entry.subs.append(cur_sub_entry) + cur_sub_entry['render_info'] = copy.deepcopy( + prev_sub_entry['render_info']) + cur_sub_entry['flags'] = SubPageFlags.FLAG_NONE if prev_entry.num_subs >= cur_sub + 1: cur_sub += 1 @@ -118,8 +124,8 @@ if bake_status == STATUS_INVALIDATE_AND_BAKE: cache_key = sub_uri self._rsr.invalidate(cache_key) - cur_sub_entry.flags |= \ - SubPagePipelineRecordEntry.FLAG_FORMATTING_INVALIDATED + cur_sub_entry['flags'] |= \ + SubPageFlags.FLAG_RENDER_CACHE_INVALIDATED logger.debug(" p%d -> %s" % (cur_sub, out_path)) rp = self._bakeSingle(page, cur_sub, out_path) @@ -131,13 +137,12 @@ (page.content_spec, sub_uri)) from ex # Record what we did. - cur_sub_entry.flags |= SubPagePipelineRecordEntry.FLAG_BAKED - cur_sub_entry.render_info = rp.copyRenderInfo() - cur_entry.subs.append(cur_sub_entry) + cur_sub_entry['flags'] |= SubPageFlags.FLAG_BAKED + cur_sub_entry['render_info'] = copy.deepcopy(rp.render_info) # Copy page assets. if (cur_sub == 1 and - cur_sub_entry.anyPass(lambda p: p.used_assets)): + cur_sub_entry['render_info']['used_assets']): if pretty_urls: out_assets_dir = os.path.dirname(out_path) else: @@ -159,10 +164,12 @@ # Figure out if we have more work. has_more_subs = False - if cur_sub_entry.anyPass(lambda p: p.pagination_has_more): + if cur_sub_entry['render_info']['pagination_has_more']: cur_sub += 1 has_more_subs = True + return rendered_subs + def _bakeSingle(self, page, sub_num, out_path): ctx = RenderingContext(page, sub_num=sub_num) page.source.prepareRenderContext(ctx) @@ -207,9 +214,11 @@ # Easy test. 
if force: - cur_sub_entry.flags |= \ - SubPagePipelineRecordEntry.FLAG_FORCED_BY_GENERAL_FORCE - return STATUS_BAKE + cur_sub_entry['flags'] |= \ + SubPageFlags.FLAG_FORCED_BY_GENERAL_FORCE + # We need to invalidate any cache we have on this page because + # it's being forced, so something important has changed somehow. + return STATUS_INVALIDATE_AND_BAKE # Check for up-to-date outputs. in_path_time = page.content_mtime @@ -217,8 +226,8 @@ out_path_time = os.path.getmtime(out_path) except OSError: # File doesn't exist, we'll need to bake. - cur_sub_entry.flags |= \ - SubPagePipelineRecordEntry.FLAG_FORCED_BY_NO_PREVIOUS + cur_sub_entry['flags'] |= \ + SubPageFlags.FLAG_FORCED_BY_NO_PREVIOUS return STATUS_BAKE if out_path_time <= in_path_time: @@ -229,15 +238,16 @@ def _compute_force_flags(prev_sub_entry, cur_sub_entry): - if prev_sub_entry and prev_sub_entry.errors: + if prev_sub_entry and len(prev_sub_entry['errors']) > 0: # Previous bake failed. We'll have to bake it again. - cur_sub_entry.flags |= \ - SubPagePipelineRecordEntry.FLAG_FORCED_BY_PREVIOUS_ERRORS + cur_sub_entry['flags'] |= \ + SubPageFlags.FLAG_FORCED_BY_PREVIOUS_ERRORS return STATUS_BAKE if not prev_sub_entry: - cur_sub_entry.flags |= \ - SubPagePipelineRecordEntry.FLAG_FORCED_BY_NO_PREVIOUS + # No previous record, so most probably was never baked. Bake it. + cur_sub_entry['flags'] |= \ + SubPageFlags.FLAG_FORCED_BY_NO_PREVIOUS return STATUS_BAKE return STATUS_CLEAN
--- a/piecrust/pipelines/_pagerecords.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/pipelines/_pagerecords.py Sun Nov 19 14:29:17 2017 -0800 @@ -2,42 +2,37 @@ from piecrust.pipelines.records import RecordEntry, get_flag_descriptions -class SubPagePipelineRecordEntry: +class SubPageFlags: FLAG_NONE = 0 FLAG_BAKED = 2**0 FLAG_FORCED_BY_SOURCE = 2**1 FLAG_FORCED_BY_NO_PREVIOUS = 2**2 FLAG_FORCED_BY_PREVIOUS_ERRORS = 2**3 FLAG_FORCED_BY_GENERAL_FORCE = 2**4 - FLAG_FORMATTING_INVALIDATED = 2**5 + FLAG_RENDER_CACHE_INVALIDATED = 2**5 + - def __init__(self, out_uri, out_path): - self.out_uri = out_uri - self.out_path = out_path - self.flags = self.FLAG_NONE - self.errors = [] - self.render_info = [None, None] # Same length as RENDER_PASSES - - @property - def was_clean(self): - return (self.flags & self.FLAG_BAKED) == 0 and len(self.errors) == 0 +def create_subpage_job_result(out_uri, out_path): + return { + 'out_uri': out_uri, + 'out_path': out_path, + 'flags': SubPageFlags.FLAG_NONE, + 'errors': [], + 'render_info': None + } - @property - def was_baked(self): - return (self.flags & self.FLAG_BAKED) != 0 - @property - def was_baked_successfully(self): - return self.was_baked and len(self.errors) == 0 +def was_subpage_clean(sub): + return ((sub['flags'] & SubPageFlags.FLAG_BAKED) == 0 and + len(sub['errors']) == 0) + - def anyPass(self, func): - for pinfo in self.render_info: - if pinfo and func(pinfo): - return True - return False +def was_subpage_baked(sub): + return (sub['flags'] & SubPageFlags.FLAG_BAKED) != 0 - def copyRenderInfo(self): - return copy.deepcopy(self.render_info) + +def was_subpage_baked_successfully(sub): + return was_subpage_baked(sub) and len(sub['errors']) == 0 class PagePipelineRecordEntry(RecordEntry): @@ -47,11 +42,13 @@ FLAG_OVERRIDEN = 2**2 FLAG_COLLAPSED_FROM_LAST_RUN = 2**3 FLAG_IS_DRAFT = 2**4 + FLAG_ABORTED_FOR_SOURCE_USE = 2**5 def __init__(self): super().__init__() self.flags = self.FLAG_NONE self.config = None + self.route_params = None self.timestamp = None self.subs = [] @@ -70,7 +67,7 @@ @property def was_any_sub_baked(self): for o in self.subs: - if o.was_baked: + if was_subpage_baked(o): return True return False @@ -79,7 +76,7 @@ if len(self.errors) > 0: return True for o in self.subs: - if len(o.errors) > 0: + if len(o['errors']) > 0: return True return False @@ -89,55 +86,64 @@ def getAllErrors(self): yield from self.errors for o in self.subs: - yield from o.errors + yield from o['errors'] def getAllUsedSourceNames(self): res = set() for o in self.subs: - for pinfo in o.render_info: - if pinfo: - res |= pinfo.used_source_names + pinfo = o.get('render_info') + if pinfo: + res |= pinfo['used_source_names'] return res def getAllOutputPaths(self): for o in self.subs: - yield o.out_path + yield o['out_path'] def describe(self): d = super().describe() d['Flags'] = get_flag_descriptions(self.flags, flag_descriptions) for i, sub in enumerate(self.subs): d['Sub%02d' % i] = { - 'URI': sub.out_uri, - 'Path': sub.out_path, + 'URI': sub['out_uri'], + 'Path': sub['out_path'], 'Flags': get_flag_descriptions( - sub.flags, sub_flag_descriptions), - 'RenderInfo': [ - _describe_render_info(sub.render_info[0]), - _describe_render_info(sub.render_info[1]) - ] + sub['flags'], sub_flag_descriptions), + 'RenderInfo': _describe_render_info(sub['render_info']) } return d +def add_page_job_result(result): + result.update({ + 'flags': PagePipelineRecordEntry.FLAG_NONE, + 'errors': [], + 'subs': [] + }) + + +def merge_job_result_into_record_entry(record_entry, result): + 
record_entry.flags |= result['flags'] + record_entry.errors += result['errors'] + record_entry.subs += result['subs'] + + flag_descriptions = { PagePipelineRecordEntry.FLAG_NEW: 'new', PagePipelineRecordEntry.FLAG_SOURCE_MODIFIED: 'touched', PagePipelineRecordEntry.FLAG_OVERRIDEN: 'overriden', PagePipelineRecordEntry.FLAG_COLLAPSED_FROM_LAST_RUN: 'from last run', - PagePipelineRecordEntry.FLAG_IS_DRAFT: 'draft'} + PagePipelineRecordEntry.FLAG_IS_DRAFT: 'draft', + PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE: 'aborted for source use'} sub_flag_descriptions = { - SubPagePipelineRecordEntry.FLAG_BAKED: 'baked', - SubPagePipelineRecordEntry.FLAG_FORCED_BY_SOURCE: 'forced by source', - SubPagePipelineRecordEntry.FLAG_FORCED_BY_NO_PREVIOUS: 'forced b/c new', - SubPagePipelineRecordEntry.FLAG_FORCED_BY_PREVIOUS_ERRORS: - 'forced by errors', - SubPagePipelineRecordEntry.FLAG_FORCED_BY_GENERAL_FORCE: - 'manually forced', - SubPagePipelineRecordEntry.FLAG_FORMATTING_INVALIDATED: - 'formatting invalidated' + SubPageFlags.FLAG_BAKED: 'baked', + SubPageFlags.FLAG_FORCED_BY_SOURCE: 'forced by source', + SubPageFlags.FLAG_FORCED_BY_NO_PREVIOUS: 'forced b/c new', + SubPageFlags.FLAG_FORCED_BY_PREVIOUS_ERRORS: 'forced by errors', + SubPageFlags.FLAG_FORCED_BY_GENERAL_FORCE: 'manually forced', + SubPageFlags.FLAG_RENDER_CACHE_INVALIDATED: 'cache invalidated' } @@ -145,8 +151,8 @@ if ri is None: return '<null>' return { - 'UsedPagination': ri.used_pagination, - 'PaginationHasMore': ri.pagination_has_more, - 'UsedAssets': ri.used_assets, - 'UsedSourceNames': ri.used_source_names + 'UsedPagination': ri['used_pagination'], + 'PaginationHasMore': ri['pagination_has_more'], + 'UsedAssets': ri['used_assets'], + 'UsedSourceNames': ri['used_source_names'] }
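
A short sketch of how the new dict-based sub-page results flow back into the record, using only the helpers defined above (the URI, output path, and item spec are hypothetical):

    from piecrust.pipelines._pagerecords import (
        PagePipelineRecordEntry, SubPageFlags,
        add_page_job_result, create_subpage_job_result,
        merge_job_result_into_record_entry)

    # Worker side: results are plain dicts.
    result = {}
    add_page_job_result(result)
    sub = create_subpage_job_result('/blog/2/', '_counter/blog/2/index.html')
    sub['flags'] |= SubPageFlags.FLAG_BAKED
    result['subs'].append(sub)

    # Master side: fold the dict back into the real record entry.
    entry = PagePipelineRecordEntry()
    entry.item_spec = 'posts/2017-11-19_perf.md'
    merge_job_result_into_record_entry(entry, result)
    assert entry.was_any_sub_baked
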
--- a/piecrust/pipelines/_procrecords.py  Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/pipelines/_procrecords.py  Sun Nov 19 14:29:17 2017 -0800
@@ -43,6 +43,22 @@
         return self.out_paths
 
 
+def add_asset_job_result(result):
+    result.update({
+        'item_spec': None,
+        'flags': AssetPipelineRecordEntry.FLAG_NONE,
+        'proc_tree': None,
+        'out_paths': [],
+    })
+
+
+def merge_job_result_into_record_entry(record_entry, result):
+    record_entry.item_spec = result['item_spec']
+    record_entry.flags |= result['flags']
+    record_entry.proc_tree = result['proc_tree']
+    record_entry.out_paths = result['out_paths']
+
+
 flag_descriptions = {
     AssetPipelineRecordEntry.FLAG_PREPARED: 'prepared',
     AssetPipelineRecordEntry.FLAG_PROCESSED: 'processed',
--- a/piecrust/pipelines/asset.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/pipelines/asset.py Sun Nov 19 14:29:17 2017 -0800 @@ -2,7 +2,9 @@ import os.path import re import logging -from piecrust.pipelines._procrecords import AssetPipelineRecordEntry +from piecrust.pipelines._procrecords import ( + AssetPipelineRecordEntry, + add_asset_job_result, merge_job_result_into_record_entry) from piecrust.pipelines._proctree import ( ProcessingTreeBuilder, ProcessingTreeRunner, get_node_name_tree, print_node, @@ -64,29 +66,32 @@ stats.registerTimer('RunProcessingTree', raise_if_registered=False) def run(self, job, ctx, result): + # Create the result stuff. + item_spec = job['job_spec'][1] + add_asset_job_result(result) + result['item_spec'] = item_spec + # See if we need to ignore this item. - rel_path = os.path.relpath(job.content_item.spec, self._base_dir) + rel_path = os.path.relpath(item_spec, self._base_dir) if re_matchany(rel_path, self._ignore_patterns): return - record_entry = result.record_entry + # Build the processing tree for this job. stats = self.app.env.stats - out_dir = self.ctx.out_dir - - # Build the processing tree for this job. with stats.timerScope('BuildProcessingTree'): builder = ProcessingTreeBuilder(self._processors) tree_root = builder.build(rel_path) - record_entry.flags |= AssetPipelineRecordEntry.FLAG_PREPARED + result['flags'] |= AssetPipelineRecordEntry.FLAG_PREPARED # Prepare and run the tree. + out_dir = self.ctx.out_dir print_node(tree_root, recursive=True) leaves = tree_root.getLeaves() - record_entry.out_paths = [os.path.join(out_dir, l.path) - for l in leaves] - record_entry.proc_tree = get_node_name_tree(tree_root) + result['out_paths'] = [os.path.join(out_dir, l.path) + for l in leaves] + result['proc_tree'] = get_node_name_tree(tree_root) if tree_root.getProcessor().is_bypassing_structured_processing: - record_entry.flags |= ( + result['flags'] |= ( AssetPipelineRecordEntry.FLAG_BYPASSED_STRUCTURED_PROCESSING) if self.ctx.force: @@ -96,9 +101,14 @@ runner = ProcessingTreeRunner( self._base_dir, self.tmp_dir, out_dir) if runner.processSubTree(tree_root): - record_entry.flags |= ( + result['flags'] |= ( AssetPipelineRecordEntry.FLAG_PROCESSED) + def handleJobResult(self, result, ctx): + entry = self.createRecordEntry(result['item_spec']) + merge_job_result_into_record_entry(entry, result) + ctx.record.addEntry(entry) + def getDeletions(self, ctx): for prev, cur in ctx.record_history.diffs: if prev and not cur:
--- a/piecrust/pipelines/base.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/pipelines/base.py Sun Nov 19 14:29:17 2017 -0800 @@ -2,6 +2,7 @@ import logging from werkzeug.utils import cached_property from piecrust.configuration import ConfigurationError +from piecrust.sources.base import ContentItem logger = logging.getLogger(__name__) @@ -32,69 +33,87 @@ return self.worker_id < 0 -class PipelineJob: - """ Base class for a pipline baking job. - """ - def __init__(self, pipeline, content_item): - self.source_name = pipeline.source.name - self.record_name = pipeline.record_name - self.content_item = content_item - self.step_num = 0 - self.data = {} +class _PipelineMasterProcessJobContextBase: + def __init__(self, record_name, record_histories): + self.record_name = record_name + self.record_histories = record_histories + + @property + def previous_record(self): + return self.record_histories.getPreviousRecord(self.record_name) + + @property + def current_record(self): + return self.record_histories.getCurrentRecord(self.record_name) -class PipelineJobCreateContext: - """ Context for create pipeline baking jobs. +class PipelineJobCreateContext(_PipelineMasterProcessJobContextBase): + """ Context for creating pipeline baking jobs. + + This is run on the master process, so it can access both the + previous and current records. """ - def __init__(self, step_num, record_histories): + def __init__(self, pass_num, record_name, record_histories): + super().__init__(record_name, record_histories) + self.pass_num = pass_num + + +class PipelineJobValidateContext(_PipelineMasterProcessJobContextBase): + """ Context for validating jobs on subsequent step runs (i.e. validating + the list of jobs to run starting with the second step). + + This is run on the master process, so it can access both the + previous and current records. + """ + def __init__(self, pass_num, step_num, record_name, record_histories): + super().__init__(record_name, record_histories) + self.pass_num = pass_num self.step_num = step_num - self.record_histories = record_histories class PipelineJobRunContext: """ Context for running pipeline baking jobs. + + This is run on the worker processes, so it can only access the + previous records. """ - def __init__(self, job, record_name, record_histories): + def __init__(self, job, record_name, previous_records): self.job = job self.record_name = record_name - self.record_histories = record_histories + self.previous_records = previous_records - @property - def content_item(self): - return self.job.content_item + @cached_property + def record_entry_spec(self): + return self.job.get('record_entry_spec', + self.job['job_spec'][1]) @cached_property def previous_record(self): - return self.record_histories.getPreviousRecord(self.record_name) - - @cached_property - def record_entry_spec(self): - content_item = self.content_item - return content_item.metadata.get('record_entry_spec', - content_item.spec) + return self.previous_records.getRecord(self.record_name) @cached_property def previous_entry(self): return self.previous_record.getEntry(self.record_entry_spec) -class PipelineJobResult: - """ Result of running a pipeline on a content item. - """ - def __init__(self): - self.record_entry = None - self.next_step_job = None +class PipelineJobResultHandleContext: + """ The context for handling the result from a job that ran in a + worker process. - -class PipelineMergeRecordContext: - """ The context for merging a record entry for a second or higher pass - into the bake record. 
+ This is run on the master process, so it can access the current + record. """ def __init__(self, record, job, step_num): self.record = record self.job = job self.step_num = step_num + @cached_property + def record_entry(self): + record_entry_spec = self.job.get('record_entry_spec', + self.job['job_spec'][1]) + return self.record.getEntry(record_entry_spec) + class PipelinePostJobRunContext: def __init__(self, record_history): @@ -137,23 +156,26 @@ def initialize(self): pass + def loadAllContents(self): + return None + def createJobs(self, ctx): return [ - self.createJob(item) + create_job(self, item.spec) for item in self.source.getAllContents()] - def createJob(self, content_item): - return PipelineJob(self, content_item) - - def createRecordEntry(self, job, ctx): + def createRecordEntry(self, item_spec): entry_class = self.RECORD_ENTRY_CLASS record_entry = entry_class() - record_entry.item_spec = ctx.record_entry_spec + record_entry.item_spec = item_spec return record_entry - def mergeRecordEntry(self, record_entry, ctx): + def handleJobResult(self, result, ctx): raise NotImplementedError() + def validateNextStepJobs(self, jobs, ctx): + pass + def run(self, job, ctx, result): raise NotImplementedError() @@ -170,6 +192,18 @@ pass +def create_job(pipeline, item_spec, **kwargs): + job = { + 'job_spec': (pipeline.source.name, item_spec) + } + job.update(kwargs) + return job + + +def content_item_from_job(pipeline, job): + return pipeline.source.findContentFromSpec(job['job_spec'][1]) + + def get_record_name_for_source(source): ppname = get_pipeline_name_for_source(source) return '%s@%s' % (source.name, ppname) @@ -186,8 +220,8 @@ class PipelineManager: - def __init__(self, app, out_dir, record_histories, *, - worker_id=-1, force=False): + def __init__(self, app, out_dir, *, + record_histories=None, worker_id=-1, force=False): self.app = app self.record_histories = record_histories self.out_dir = out_dir @@ -201,9 +235,12 @@ self._pipelines = {} def getPipeline(self, source_name): + return self.getPipelineInfo(source_name).pipeline + + def getPipelineInfo(self, source_name): return self._pipelines[source_name] - def getPipelines(self): + def getPipelineInfos(self): return self._pipelines.values() def createPipeline(self, source): @@ -217,22 +254,24 @@ pp = self._pipeline_classes[pname](source, ppctx) pp.initialize() - record_history = self.record_histories.getHistory(pp.record_name) + record_history = None + if self.record_histories: + record_history = self.record_histories.getHistory(pp.record_name) info = _PipelineInfo(pp, record_history) self._pipelines[source.name] = info return info def postJobRun(self): - for ppinfo in self.getPipelines(): + for ppinfo in self.getPipelineInfos(): ppinfo.record_history.build() - for ppinfo in self.getPipelines(): + for ppinfo in self.getPipelineInfos(): ctx = PipelinePostJobRunContext(ppinfo.record_history) ppinfo.pipeline.postJobRun(ctx) def deleteStaleOutputs(self): - for ppinfo in self.getPipelines(): + for ppinfo in self.getPipelineInfos(): ctx = PipelineDeletionContext(ppinfo.record_history) to_delete = ppinfo.pipeline.getDeletions(ctx) current_record = ppinfo.record_history.current @@ -247,12 +286,12 @@ logger.info('[delete] %s' % path) def collapseRecords(self): - for ppinfo in self.getPipelines(): + for ppinfo in self.getPipelineInfos(): ctx = PipelineCollapseRecordContext(ppinfo.record_history) ppinfo.pipeline.collapseRecords(ctx) def shutdownPipelines(self): - for ppinfo in self.getPipelines(): + for ppinfo in self.getPipelineInfos(): 
ppinfo.pipeline.shutdown() self._pipelines = {} @@ -269,6 +308,12 @@ return self.pipeline.source @property + def current_record(self): + if self.record_history is not None: + return self.record_history.current + raise Exception("The current record is not available.") + + @property def pipeline_name(self): return self.pipeline.PIPELINE_NAME
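
A hypothetical pipeline showing where the new hooks run (only the hook and helper names come from pipelines/base.py; the pipeline itself is made up):

    from piecrust.pipelines.base import (
        ContentPipeline, create_job, content_item_from_job)

    class ThumbnailPipeline(ContentPipeline):
        PIPELINE_NAME = 'thumbnail'

        def createJobs(self, ctx):
            # Master process: previous and current records are reachable
            # through the PipelineJobCreateContext.
            return [create_job(self, item.spec)
                    for item in self.source.getAllContents()]

        def run(self, job, ctx, result):
            # Worker process: only the previous records are available, and
            # `result` is a plain dict sent back to the master.
            item = content_item_from_job(self, job)
            result['item_spec'] = item.spec

        def handleJobResult(self, result, ctx):
            # Master process again: turn the worker's dict into a record entry.
            entry = self.createRecordEntry(result['item_spec'])
            ctx.record.addEntry(entry)
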
--- a/piecrust/pipelines/page.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/pipelines/page.py Sun Nov 19 14:29:17 2017 -0800 @@ -1,7 +1,11 @@ +import time import logging -from piecrust.pipelines.base import ContentPipeline +from piecrust.pipelines.base import ( + ContentPipeline, create_job, content_item_from_job) from piecrust.pipelines._pagebaker import PageBaker, get_output_path -from piecrust.pipelines._pagerecords import PagePipelineRecordEntry +from piecrust.pipelines._pagerecords import ( + PagePipelineRecordEntry, + add_page_job_result, merge_job_result_into_record_entry) from piecrust.sources.base import AbortedSourceUseError @@ -11,6 +15,7 @@ class PagePipeline(ContentPipeline): PIPELINE_NAME = 'page' RECORD_ENTRY_CLASS = PagePipelineRecordEntry + PASS_NUM = [0, 1] def __init__(self, source, ppctx): super().__init__(source, ppctx) @@ -19,7 +24,7 @@ self._draft_setting = self.app.config['baker/no_bake_setting'] def initialize(self): - stats = self.app.env.stats + stats = self._stats stats.registerCounter('SourceUseAbortions', raise_if_registered=False) stats.registerManifest('SourceUseAbortions', raise_if_registered=False) @@ -28,29 +33,69 @@ force=self.ctx.force) self._pagebaker.startWriterQueue() + def loadAllContents(self): + # Here we load all the pages in the source, making sure they all + # have a valid cache for their configuration and contents. + # We also create the record entries while we're at it. + source = self.source + page_fac = self.app.getPage + record_fac = self.createRecordEntry + for item in source.getAllContents(): + page = page_fac(source, item) + + cur_entry = record_fac(item.spec) + cur_entry.config = page.config.getAll() + cur_entry.route_params = item.metadata['route_params'] + cur_entry.timestamp = page.datetime.timestamp() + + if page.config.get(self._draft_setting): + cur_entry.flags |= PagePipelineRecordEntry.FLAG_IS_DRAFT + + yield cur_entry + def createJobs(self, ctx): - used_paths = {} - for rec in ctx.record_histories.current.records: - src_name = rec.name.split('@')[0] - for e in rec.getEntries(): - paths = e.getAllOutputPaths() - if paths is not None: - for p in paths: - used_paths[p] = (src_name, e) + if ctx.pass_num == 0: + return self._createFirstPassJobs(ctx) + return self._createSecondPassJobs(ctx) + def _createFirstPassJobs(self, ctx): jobs = [] + app = self.app - route = self.source.route out_dir = self.ctx.out_dir + uri_getter = self.source.route.getUri pretty_urls = app.config.get('site/pretty_urls') - record = ctx.record_histories.current.getRecord(self.record_name) + + used_paths = _get_used_paths_from_records( + ctx.record_histories.current.records) + history = ctx.record_histories.getHistory(ctx.record_name).copy() + history.build() + + record = ctx.current_record + record.user_data['dirty_source_names'] = set() + + for prev, cur in history.diffs: + # Ignore pages that disappeared since last bake. + if cur is None: + continue - for item in self.source.getAllContents(): - route_params = item.metadata['route_params'] - uri = route.getUri(route_params) + # Skip draft pages. + if cur.flags & PagePipelineRecordEntry.FLAG_IS_DRAFT: + continue + + # Skip pages that are known to use other sources... we'll + # schedule them in the second pass. + if prev and prev.getAllUsedSourceNames(): + continue + + # Check if this item has been overriden by a previous pipeline + # run... for instance, we could be the pipeline for a "theme pages" + # source, and some of our pages have been overriden by a user + # page that writes out to the same URL. 
+ uri = uri_getter(cur.route_params) path = get_output_path(app, out_dir, uri, pretty_urls) + override = used_paths.get(path) - if override is not None: override_source_name, override_entry = override override_source = app.getSource(override_source_name) @@ -59,49 +104,82 @@ logger.error( "Page '%s' would get baked to '%s' " "but is overriden by '%s'." % - (item.spec, path, override_entry.item_spec)) + (enrty.item_spec, path, override_entry.item_spec)) else: logger.debug( "Page '%s' would get baked to '%s' " "but is overriden by '%s'." % - (item.spec, path, override_entry.item_spec)) + (cur.item_spec, path, override_entry.item_spec)) - entry = PagePipelineRecordEntry() - entry.item_spec = item.spec - entry.flags |= PagePipelineRecordEntry.FLAG_OVERRIDEN - record.addEntry(entry) - + cur.flags |= PagePipelineRecordEntry.FLAG_OVERRIDEN continue - jobs.append(self.createJob(item)) + # Nope, all good, let's create a job for this item. + jobs.append(create_job(self, cur.item_spec)) if len(jobs) > 0: return jobs return None - def mergeRecordEntry(self, record_entry, ctx): - existing = ctx.record.getEntry(record_entry.item_spec) - existing.flags |= record_entry.flags - existing.errors += record_entry.errors - existing.subs += record_entry.subs + def _createSecondPassJobs(self, ctx): + # Get the list of all sources that had anything baked. + dirty_source_names = set() + all_records = ctx.record_histories.current.records + for rec in all_records: + rec_dsn = rec.user_data.get('dirty_source_names') + if rec_dsn: + dirty_source_names |= rec_dsn + + # Now look at the stuff we bake for our own source on the first pass. + # For anything that wasn't baked (i.e. it was considered 'up to date') + # we look at the records from last time, and if they say that some + # page was using a source that is "dirty", then we force bake it. + # + # The common example for this is a blog index page which hasn't been + # touched, but needs to be re-baked because someone added or edited + # a post. 
+ jobs = [] + pass_num = ctx.pass_num + history = ctx.record_histories.getHistory(ctx.record_name).copy() + history.build() + for prev, cur in history.diffs: + if cur and cur.was_any_sub_baked: + continue + if prev and any(map( + lambda usn: usn in dirty_source_names, + prev.getAllUsedSourceNames())): + jobs.append(create_job(self, prev.item_spec, + pass_num=pass_num, + force_bake=True)) + if len(jobs) > 0: + return jobs + return None + + def handleJobResult(self, result, ctx): + existing = ctx.record_entry + merge_job_result_into_record_entry(existing, result) + if existing.was_any_sub_baked: + ctx.record.user_data['dirty_source_names'].add(self.source.name) def run(self, job, ctx, result): - step_num = job.step_num - if step_num == 0: - self._loadPage(job.content_item, ctx, result) - elif step_num == 1: - self._renderOrPostpone(job.content_item, ctx, result) - elif step_num == 2: - self._renderAlways(job.content_item, ctx, result) + pass_num = job.get('pass_num', 0) + step_num = job.get('step_num', 0) + if pass_num == 0: + if step_num == 0: + self._renderOrPostpone(job, ctx, result) + elif step_num == 1: + self._renderAlways(job, ctx, result) + elif pass_num == 1: + self._renderAlways(job, ctx, result) def getDeletions(self, ctx): for prev, cur in ctx.record_history.diffs: if prev and not cur: for sub in prev.subs: - yield (sub.out_path, 'previous source file was removed') + yield (sub['out_path'], 'previous source file was removed') elif prev and cur: - prev_out_paths = [o.out_path for o in prev.subs] - cur_out_paths = [o.out_path for o in cur.subs] + prev_out_paths = [o['out_path'] for o in prev.subs] + cur_out_paths = [o['out_path'] for o in cur.subs] diff = set(prev_out_paths) - set(cur_out_paths) for p in diff: yield (p, 'source file changed outputs') @@ -112,43 +190,54 @@ def shutdown(self): self._pagebaker.stopWriterQueue() - def _loadPage(self, content_item, ctx, result): - logger.debug("Loading page: %s" % content_item.spec) - page = self.app.getPage(self.source, content_item) - record_entry = result.record_entry - record_entry.config = page.config.getAll() - record_entry.timestamp = page.datetime.timestamp() - - if not page.config.get(self._draft_setting): - result.next_step_job = self.createJob(content_item) - else: - record_entry.flags |= PagePipelineRecordEntry.FLAG_IS_DRAFT - - def _renderOrPostpone(self, content_item, ctx, result): + def _renderOrPostpone(self, job, ctx, result): # Here our job is to render the page's segments so that they're # cached in memory and on disk... unless we detect that the page # is using some other sources, in which case we abort and we'll try # again on the second pass. 
+ content_item = content_item_from_job(self, job) logger.debug("Conditional render for: %s" % content_item.spec) page = self.app.getPage(self.source, content_item) + if page.config.get(self._draft_setting): + return + prev_entry = ctx.previous_entry - cur_entry = result.record_entry - self.app.env.abort_source_use = True + + env = self.app.env + env.abort_source_use = True + add_page_job_result(result) try: - self._pagebaker.bake(page, prev_entry, cur_entry) + rdr_subs = self._pagebaker.bake(page, prev_entry) + result['subs'] = rdr_subs except AbortedSourceUseError: logger.debug("Page was aborted for using source: %s" % content_item.spec) - self.app.env.stats.stepCounter("SourceUseAbortions") - self.app.env.stats.addManifestEntry("SourceUseAbortions", - content_item.spec) - result.next_step_job = self.createJob(content_item) + result['flags'] |= \ + PagePipelineRecordEntry.FLAG_ABORTED_FOR_SOURCE_USE + env.stats.stepCounter("SourceUseAbortions") + env.stats.addManifestEntry("SourceUseAbortions", content_item.spec) + result['next_step_job'] = create_job(self, content_item.spec) finally: - self.app.env.abort_source_use = False + env.abort_source_use = False - def _renderAlways(self, content_item, ctx, result): + def _renderAlways(self, job, ctx, result): + content_item = content_item_from_job(self, job) logger.debug("Full render for: %s" % content_item.spec) page = self.app.getPage(self.source, content_item) prev_entry = ctx.previous_entry - cur_entry = result.record_entry - self._pagebaker.bake(page, prev_entry, cur_entry) + rdr_subs = self._pagebaker.bake(page, prev_entry, + force=job.get('force_bake')) + + add_page_job_result(result) + result['subs'] = rdr_subs + +def _get_used_paths_from_records(records): + used_paths = {} + for rec in records: + src_name = rec.name.split('@')[0] + for e in rec.getEntries(): + paths = e.getAllOutputPaths() + if paths is not None: + for p in paths: + used_paths[p] = (src_name, e) + return used_paths
--- a/piecrust/pipelines/records.py  Fri Nov 03 23:14:56 2017 -0700
+++ b/piecrust/pipelines/records.py  Sun Nov 19 14:29:17 2017 -0800
@@ -37,6 +37,7 @@
     def __init__(self, name):
         self.name = name
         self.deleted_out_paths = []
+        self.user_data = {}
         self.success = True
         self._entries = {}
 
@@ -61,7 +62,7 @@
     """ A container that includes multiple `Record` instances -- one for
         each content source that was baked.
     """
-    RECORD_VERSION = 12
+    RECORD_VERSION = 13
 
     def __init__(self):
         self.records = []
@@ -208,6 +209,9 @@
                     "A current record entry already exists for '%s' "
                     "(%s)" % (key, diff[1].item_spec))
 
+    def copy(self):
+        return RecordHistory(self._previous, self._current)
+
 
 class MultiRecordHistory:
     """ Tracks the differences between an 'old' and a 'new' record
--- a/piecrust/rendering.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/rendering.py Sun Nov 19 14:29:17 2017 -0800 @@ -1,6 +1,5 @@ import re import os.path -import copy import logging from piecrust.data.builder import ( DataBuildingContext, build_page_data, add_layout_data) @@ -24,15 +23,14 @@ class RenderedSegments(object): - def __init__(self, segments, render_pass_info): + def __init__(self, segments, used_templating=False): self.segments = segments - self.render_pass_info = render_pass_info + self.used_templating = used_templating class RenderedLayout(object): - def __init__(self, content, render_pass_info): + def __init__(self, content): self.content = content - self.render_pass_info = render_pass_info class RenderedPage(object): @@ -41,37 +39,24 @@ self.sub_num = sub_num self.data = None self.content = None - self.render_info = [None, None] + self.render_info = {} @property def app(self): return self.page.app - def copyRenderInfo(self): - return copy.deepcopy(self.render_info) - -PASS_NONE = -1 -PASS_FORMATTING = 0 -PASS_RENDERING = 1 - - -RENDER_PASSES = [PASS_FORMATTING, PASS_RENDERING] - - -class RenderPassInfo(object): - def __init__(self): - self.used_source_names = set() - self.used_pagination = False - self.pagination_has_more = False - self.used_assets = False - self._custom_info = {} - - def setCustomInfo(self, key, info): - self._custom_info[key] = info - - def getCustomInfo(self, key, default=None): - return self._custom_info.get(key, default) +def create_render_info(): + """ Creates a bag of rendering properties. It's a dictionary because + it will be passed between workers during the bake process, and + saved to records. + """ + return { + 'used_source_names': set(), + 'used_pagination': False, + 'pagination_has_more': False, + 'used_assets': False, + } class RenderingContext(object): @@ -81,43 +66,25 @@ self.force_render = force_render self.pagination_source = None self.pagination_filter = None + self.render_info = create_render_info() self.custom_data = {} - self.render_passes = [None, None] # Same length as RENDER_PASSES - self._current_pass = PASS_NONE @property def app(self): return self.page.app - @property - def current_pass_info(self): - if self._current_pass != PASS_NONE: - return self.render_passes[self._current_pass] - return None - - def setCurrentPass(self, rdr_pass): - if rdr_pass != PASS_NONE: - self.render_passes[rdr_pass] = RenderPassInfo() - self._current_pass = rdr_pass - def setPagination(self, paginator): - self._raiseIfNoCurrentPass() - pass_info = self.current_pass_info - if pass_info.used_pagination: + ri = self.render_info + if ri.get('used_pagination'): raise Exception("Pagination has already been used.") assert paginator.is_loaded - pass_info.used_pagination = True - pass_info.pagination_has_more = paginator.has_more + ri['used_pagination'] = True + ri['pagination_has_more'] = paginator.has_more self.addUsedSource(paginator._source) def addUsedSource(self, source): - self._raiseIfNoCurrentPass() - pass_info = self.current_pass_info - pass_info.used_source_names.add(source.name) - - def _raiseIfNoCurrentPass(self): - if self._current_pass == PASS_NONE: - raise Exception("No rendering pass is currently active.") + ri = self.render_info + ri['used_source_names'].add(source.name) class RenderingContextStack(object): @@ -173,7 +140,6 @@ page_data = _build_render_data(ctx) # Render content segments. 
- ctx.setCurrentPass(PASS_FORMATTING) repo = env.rendered_segments_repository save_to_fs = True if env.fs_cache_only_for_main_page and not stack.is_main_ctx: @@ -191,7 +157,6 @@ repo.put(page_uri, render_result, save_to_fs) # Render layout. - ctx.setCurrentPass(PASS_RENDERING) layout_name = page.config.get('layout') if layout_name is None: layout_name = page.source.config.get( @@ -206,13 +171,12 @@ layout_name, page, page_data) else: layout_result = RenderedLayout( - render_result.segments['content'], None) + render_result.segments['content']) rp = RenderedPage(page, ctx.sub_num) rp.data = page_data rp.content = layout_result.content - rp.render_info[PASS_FORMATTING] = render_result.render_pass_info - rp.render_info[PASS_RENDERING] = layout_result.render_pass_info + rp.render_info = ctx.render_info return rp except AbortedSourceUseError: @@ -225,7 +189,6 @@ ctx.page.content_spec) from ex finally: - ctx.setCurrentPass(PASS_NONE) stack.popCtx() @@ -248,7 +211,6 @@ page_uri = page.getUri(ctx.sub_num) try: - ctx.setCurrentPass(PASS_FORMATTING) repo = env.rendered_segments_repository save_to_fs = True @@ -267,7 +229,6 @@ if repo: repo.put(page_uri, render_result, save_to_fs) finally: - ctx.setCurrentPass(PASS_NONE) stack.popCtx() return render_result @@ -297,13 +258,16 @@ engine = get_template_engine(app, engine_name) + used_templating = False formatted_segments = {} for seg_name, seg in page.segments.items(): try: with app.env.stats.timerScope( engine.__class__.__name__ + '_segment'): - seg_text = engine.renderSegment( + seg_text, was_rendered = engine.renderSegment( page.content_spec, seg, page_data) + if was_rendered: + used_templating = True except TemplatingError as err: err.lineno += seg.line raise err @@ -319,8 +283,7 @@ content_abstract = seg_text[:offset] formatted_segments['content.abstract'] = content_abstract - pass_info = ctx.render_passes[PASS_FORMATTING] - res = RenderedSegments(formatted_segments, pass_info) + res = RenderedSegments(formatted_segments, used_templating) app.env.stats.stepCounter('PageRenderSegments') @@ -355,8 +318,7 @@ msg += "Looked for: %s" % ', '.join(full_names) raise Exception(msg) from ex - pass_info = cur_ctx.render_passes[PASS_RENDERING] - res = RenderedLayout(output, pass_info) + res = RenderedLayout(output) app.env.stats.stepCounter('PageRenderLayout')
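
With the rendering passes gone, a page render now carries one flat bag of properties; a small sketch (the 'posts' source name is just an example):

    from piecrust.rendering import create_render_info

    ri = create_render_info()
    ri['used_pagination'] = True
    ri['pagination_has_more'] = False
    ri['used_source_names'].add('posts')

    # This dict ends up on RenderedPage.render_info and, through the
    # sub-page job results, in the bake records.
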
--- a/piecrust/serving/util.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/serving/util.py Sun Nov 19 14:29:17 2017 -0800 @@ -87,7 +87,7 @@ def _get_requested_page_for_route(app, route, route_params): source = app.getSource(route.source_name) - item = source.findContent(route_params) + item = source.findContentFromRoute(route_params) if item is not None: return app.getPage(source, item) return None
--- a/piecrust/sources/autoconfig.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/sources/autoconfig.py Sun Nov 19 14:29:17 2017 -0800 @@ -92,7 +92,7 @@ return {self.setting_name: values} - def findContent(self, route_params): + def findContentFromRoute(self, route_params): # Pages from this source are effectively flattened, so we need to # find pages using a brute-force kinda way. route_slug = route_params.get('slug', '') @@ -131,7 +131,7 @@ self.setting_name = config.get('setting_name', 'order') self.default_value = config.get('default_value', 0) - def findContent(self, route_params): + def findContentFromRoute(self, route_params): uri_path = route_params.get('slug', '') if uri_path == '': uri_path = '_index'
--- a/piecrust/sources/base.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/sources/base.py Sun Nov 19 14:29:17 2017 -0800 @@ -106,9 +106,6 @@ if self._page_cache is not None: return self._page_cache - if self.app.env.abort_source_use: - raise AbortedSourceUseError() - getter = self.app.getPage self._page_cache = [getter(self, i) for i in self.getAllContents()] return self._page_cache @@ -143,13 +140,13 @@ raise NotImplementedError( "'%s' doesn't implement 'getRelatedContents'." % self.__class__) - def findGroup(self, rel_spec): + def findContentFromSpec(self, spec): raise NotImplementedError( - "'%s' doesn't implement 'findGroup'." % self.__class__) + "'%s' doesn't implement 'findContentFromSpec'." % self.__class__) - def findContent(self, route_params): + def findContentFromRoute(self, route_params): raise NotImplementedError( - "'%s' doesn't implement 'findContent'." % self.__class__) + "'%s' doesn't implement 'findContentFromRoute'." % self.__class__) def getSupportedRouteParameters(self): raise NotImplementedError(
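The base source now separates the two lookups: findContentFromSpec() resolves an item spec (typically a path-like string), while findContentFromRoute() resolves matched route parameters. A self-contained sketch of a source honoring that split; the class, its storage layout and the Item tuple are hypothetical, not PieCrust types:

    import os.path
    from collections import namedtuple

    # Minimal stand-in for piecrust.sources.base.ContentItem.
    Item = namedtuple('Item', 'spec metadata')

    class ToySource:
        # Hypothetical source backed by a directory of .md files.
        def __init__(self, root):
            self.root = root

        def findContentFromSpec(self, spec):
            # The spec is already a concrete item spec -- here, a file path.
            if os.path.isfile(spec):
                return Item(spec, {'route_params': {'slug': self._slug(spec)}})
            return None

        def findContentFromRoute(self, route_params):
            # Route parameters come from URL matching, e.g. {'slug': 'foo/bar'}.
            path = os.path.join(self.root, route_params['slug'] + '.md')
            if os.path.isfile(path):
                return Item(path, {'route_params': dict(route_params)})
            return None

        def _slug(self, path):
            rel = os.path.relpath(path, self.root)
            return os.path.splitext(rel)[0].replace(os.sep, '/')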
--- a/piecrust/sources/blogarchives.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/sources/blogarchives.py Sun Nov 19 14:29:17 2017 -0800 @@ -7,9 +7,12 @@ PageIterator, HardCodedFilterIterator, DateSortIterator) from piecrust.page import Page from piecrust.pipelines._pagebaker import PageBaker -from piecrust.pipelines._pagerecords import PagePipelineRecordEntry +from piecrust.pipelines._pagerecords import ( + PagePipelineRecordEntry, + add_page_job_result, merge_job_result_into_record_entry) from piecrust.pipelines.base import ( - ContentPipeline, get_record_name_for_source) + ContentPipeline, + create_job, get_record_name_for_source, content_item_from_job) from piecrust.routing import RouteParameter from piecrust.sources.base import ContentItem from piecrust.sources.generator import GeneratorSourceBase @@ -38,14 +41,11 @@ def getSupportedRouteParameters(self): return [RouteParameter('year', RouteParameter.TYPE_INT4)] - def findContent(self, route_params): + def findContentFromRoute(self, route_params): year = route_params['year'] - spec = '_index' - metadata = { - 'record_entry_spec': '_index[%04d]' % year, - 'route_params': {'year': year} - } - return ContentItem(spec, metadata) + return ContentItem( + '_index', + {'route_params': {'year': year}}) def prepareRenderContext(self, ctx): ctx.pagination_source = self.inner_source @@ -178,24 +178,41 @@ (len(self._dirty_years), len(self._all_years))) jobs = [] + rec_fac = self.createRecordEntry + current_record = ctx.current_record + for y in self._dirty_years: - item = ContentItem( - '_index', - { - 'record_entry_spec': '_index[%04d]' % y, - 'route_params': {'year': y} - }) - jobs.append(self.createJob(item)) + record_entry_spec = '_index[%04d]' % y + + jobs.append(create_job(self, '_index', + year=y, + record_entry_spec=record_entry_spec)) + + entry = rec_fac(record_entry_spec) + current_record.addEntry(entry) + if len(jobs) > 0: return jobs return None def run(self, job, ctx, result): - page = Page(self.source, job.content_item) + year = job['year'] + content_item = ContentItem('_index', + {'year': year, + 'route_params': {'year': year}}) + page = Page(self.source, content_item) + prev_entry = ctx.previous_entry - cur_entry = result.record_entry - cur_entry.year = job.content_item.metadata['route_params']['year'] - self._pagebaker.bake(page, prev_entry, cur_entry) + rdr_subs = self._pagebaker.bake(page, prev_entry) + + add_page_job_result(result) + result['subs'] = rdr_subs + result['year'] = page.source_metadata['year'] + + def handleJobResult(self, result, ctx): + existing = ctx.record_entry + merge_job_result_into_record_entry(existing, result) + existing.year = result['year'] def postJobRun(self, ctx): # Create bake entries for the years that were *not* dirty.
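The archive pipeline now follows the lighter job protocol: the main process builds small job dicts with create_job() and pre-registers the record entries, workers return plain result dicts, and handleJobResult() merges those back into the entries. A rough, self-contained outline of that round trip; the dict key names below are illustrative, only the overall flow mirrors the diff:

    def queue_archive_jobs(dirty_years, record_entries):
        # Main process: jobs are small plain dicts, and the matching
        # record entries are created up front.
        jobs = []
        for y in dirty_years:
            spec = '_index[%04d]' % y
            jobs.append({'item_spec': '_index', 'year': y,
                         'record_entry_spec': spec})
            record_entries[spec] = {}
        return jobs

    def run_archive_job(job):
        # Worker process: rebuild what's needed from the job dict and send
        # back another plain dict -- no record entry objects cross processes.
        return {'record_entry_spec': job['record_entry_spec'],
                'year': job['year'],
                'subs': []}

    def handle_archive_result(result, record_entries):
        # Main process: merge the worker's result into the waiting entry.
        entry = record_entries[result['record_entry_spec']]
        entry['year'] = result['year']
        entry['subs'] = result['subs']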
--- a/piecrust/sources/default.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/sources/default.py Sun Nov 19 14:29:17 2017 -0800 @@ -27,13 +27,10 @@ self.default_auto_format = app.config.get('site/default_auto_format') self.supported_extensions = list(self.auto_formats) - def _createItemMetadata(self, path): - return self._doCreateItemMetadata(path) - def _finalizeContent(self, parent_group, items, groups): SimpleAssetsSubDirMixin._removeAssetGroups(self, groups) - def _doCreateItemMetadata(self, path): + def _createItemMetadata(self, path): slug = self._makeSlug(path) metadata = { 'route_params': { @@ -69,7 +66,7 @@ return [ RouteParameter('slug', RouteParameter.TYPE_PATH)] - def findContent(self, route_params): + def findContentFromRoute(self, route_params): uri_path = route_params.get('slug', '') if not uri_path: uri_path = '_index' @@ -84,14 +81,10 @@ paths_to_check = [path] for path in paths_to_check: if os.path.isfile(path): - metadata = self._doCreateItemMetadata(path) + metadata = self._createItemMetadata(path) return ContentItem(path, metadata) return None - def findContentFromPath(self, path): - metadata = self._doCreateItemMetadata(path) - return ContentItem(path, metadata) - def setupPrepareParser(self, parser, app): parser.add_argument('slug', help='The slug for the new page.') @@ -104,7 +97,7 @@ if ext == '': path = '%s.%s' % (path, self.default_auto_format) - metadata = self._doCreateItemMetadata(path) + metadata = self._createItemMetadata(path) config = metadata.setdefault('config', {}) config.update({'title': uri_to_title( os.path.basename(metadata['slug']))})
--- a/piecrust/sources/fs.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/sources/fs.py Sun Nov 19 14:29:17 2017 -0800 @@ -129,11 +129,13 @@ def _finalizeContent(self, parent_group, items, groups): pass - def findGroup(self, rel_spec): - path = os.path.join(self.fs_endpoint_path, rel_spec) - if os.path.isdir(path): - metadata = self._createGroupMetadata(path) - return ContentGroup(path, metadata) + def findContentFromSpec(self, spec): + if os.path.isdir(spec): + metadata = self._createGroupMetadata(spec) + return ContentGroup(spec, metadata) + elif os.path.isfile(spec): + metadata = self._createItemMetadata(spec) + return ContentItem(spec, metadata) return None def getRelatedContents(self, item, relationship):
--- a/piecrust/sources/interfaces.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/sources/interfaces.py Sun Nov 19 14:29:17 2017 -0800 @@ -28,6 +28,3 @@ """ def getInteractiveFields(self): raise NotImplementedError() - - def findContentFromPath(self, path): - raise NotImplementedError()
--- a/piecrust/sources/list.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/sources/list.py Sun Nov 19 14:29:17 2017 -0800 @@ -21,7 +21,7 @@ def getRelatedContents(self, item, relationship): return self.inner_source.getRelatedContents(item, relationship) - def findContent(self, route_params): + def findContentFromRoute(self, route_params): # Can't find items... we could find stuff that's not in our list? raise NotImplementedError( "The list source doesn't support finding items.")
--- a/piecrust/sources/posts.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/sources/posts.py Sun Nov 19 14:29:17 2017 -0800 @@ -50,10 +50,11 @@ return FSContentSource.getRelatedContents(self, item, relationship) - def findGroup(self, spec): - return None + def findContentFromSpec(self, spec): + metadata = self._parseMetadataFromPath(spec) + return ContentItem(spec, metadata) - def findContent(self, route_params): + def findContentFromRoute(self, route_params): year = route_params.get('year') month = route_params.get('month') day = route_params.get('day') @@ -111,10 +112,6 @@ metadata = self._parseMetadataFromPath(path) return ContentItem(path, metadata) - def findContentFromPath(self, path): - metadata = self._parseMetadataFromPath(path) - return ContentItem(path, metadata) - def _parseMetadataFromPath(self, path): regex_repl = { 'year': '(?P<year>\d{4})',
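Because a flat post's route metadata is encoded entirely in its file name, findContentFromSpec() can simply reuse _parseMetadataFromPath(). For illustration, a self-contained parser for the flat 'YYYY-MM-DD_slug' layout used in the new test further below (a sketch only, not PieCrust's actual implementation):

    import os.path
    import re

    _POST_RE = re.compile(
        r'(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})_(?P<slug>.+)')

    def parse_post_metadata(path):
        # 'posts/2017-01-01_first.html' -> year/month/day/slug route params.
        name, _ = os.path.splitext(os.path.basename(path))
        m = _POST_RE.match(name)
        if m is None:
            return None
        return {'route_params': {
            'year': int(m.group('year')),
            'month': int(m.group('month')),
            'day': int(m.group('day')),
            'slug': m.group('slug')}}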
--- a/piecrust/sources/taxonomy.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/sources/taxonomy.py Sun Nov 19 14:29:17 2017 -0800 @@ -7,9 +7,12 @@ PaginationFilter, SettingFilterClause) from piecrust.page import Page from piecrust.pipelines._pagebaker import PageBaker -from piecrust.pipelines._pagerecords import PagePipelineRecordEntry +from piecrust.pipelines._pagerecords import ( + PagePipelineRecordEntry, + add_page_job_result, merge_job_result_into_record_entry) from piecrust.pipelines.base import ( - ContentPipeline, get_record_name_for_source) + ContentPipeline, get_record_name_for_source, + create_job, content_item_from_job) from piecrust.pipelines.records import RecordHistory from piecrust.routing import RouteParameter from piecrust.sources.base import ContentItem @@ -83,11 +86,10 @@ return [RouteParameter(name, param_type, variadic=self.taxonomy.is_multiple)] - def findContent(self, route_params): + def findContentFromRoute(self, route_params): slugified_term = route_params[self.taxonomy.term_name] spec = '_index' metadata = {'term': slugified_term, - 'record_entry_spec': '_index[%s]' % slugified_term, 'route_params': { self.taxonomy.term_name: slugified_term} } @@ -174,15 +176,14 @@ # We need to register this use of a taxonomy term. rcs = self.app.env.render_ctx_stack - cpi = rcs.current_ctx.current_pass_info - if cpi: - utt = cpi.getCustomInfo('used_taxonomy_terms') - if utt is None: - utt = set() - utt.add(slugified_values) - cpi.setCustomInfo('used_taxonomy_terms', utt) - else: - utt.add(slugified_values) + ri = rcs.current_ctx.render_info + utt = ri.get('used_taxonomy_terms') + if utt is None: + utt = set() + utt.add(slugified_values) + ri['used_taxonomy_terms'] = utt + else: + utt.add(slugified_values) # Put the slugified values in the route metadata so they're used to # generate the URL. 
@@ -287,29 +288,46 @@ (len(self._analyzer.dirty_slugified_terms), self.taxonomy.name)) jobs = [] + rec_fac = self.createRecordEntry + current_record = ctx.current_record + for slugified_term in self._analyzer.dirty_slugified_terms: - item = ContentItem( - '_index', - {'term': slugified_term, - 'record_entry_spec': '_index[%s]' % slugified_term, - 'route_params': { - self.taxonomy.term_name: slugified_term} - }) - jobs.append(self.createJob(item)) + item_spec = '_index' + record_entry_spec = '_index[%s]' % slugified_term + + jobs.append(create_job(self, item_spec, + term=slugified_term, + record_entry_spec=record_entry_spec)) + + entry = rec_fac(record_entry_spec) + current_record.addEntry(entry) + if len(jobs) > 0: return jobs return None def run(self, job, ctx, result): - content_item = job.content_item - logger.debug("Rendering '%s' page: %s" % - (self.taxonomy.name, content_item.metadata['term'])) + term = job['term'] + content_item = ContentItem('_index', + {'term': term, + 'route_params': { + self.taxonomy.term_name: term} + }) + page = Page(self.source, content_item) - page = Page(self.source, job.content_item) + logger.debug("Rendering '%s' page: %s" % + (self.taxonomy.name, page.source_metadata['term'])) prev_entry = ctx.previous_entry - cur_entry = result.record_entry - cur_entry.term = content_item.metadata['term'] - self._pagebaker.bake(page, prev_entry, cur_entry) + rdr_subs = self._pagebaker.bake(page, prev_entry) + + add_page_job_result(result) + result['subs'] = rdr_subs + result['term'] = page.source_metadata['term'] + + def handleJobResult(self, result, ctx): + existing = ctx.record_entry + merge_job_result_into_record_entry(existing, result) + existing.term = result['term'] def postJobRun(self, ctx): # We create bake entries for all the terms that were *not* dirty. @@ -381,9 +399,7 @@ # Re-bake all taxonomy terms that include new or changed pages, by # marking them as 'dirty'. - previous_records = self.record_histories.previous - prev_rec = previous_records.getRecord(record_name) - history = RecordHistory(prev_rec, cur_rec) + history = self.record_histories.getHistory(record_name).copy() history.build() for prev_entry, cur_entry in history.diffs: entries = [cur_entry] @@ -462,11 +478,10 @@ def _get_all_entry_taxonomy_terms(entry): res = set() for o in entry.subs: - for pinfo in o.render_info: - if pinfo: - terms = pinfo.getCustomInfo('used_taxonomy_terms') - if terms: - res |= set(terms) + pinfo = o['render_info'] + terms = pinfo.get('used_taxonomy_terms') + if terms: + res |= set(terms) return res
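With render_info now a plain dict on every baked sub-page, collecting the taxonomy terms a page used becomes a simple union over its subs, as in _get_all_entry_taxonomy_terms above. A standalone sketch of the same idea:

    def get_all_entry_taxonomy_terms(entry_subs):
        # Each sub carries its render_info as a plain dict (see rendering.py).
        res = set()
        for sub in entry_subs:
            terms = sub['render_info'].get('used_taxonomy_terms')
            if terms:
                res |= set(terms)
        return res

    # e.g. get_all_entry_taxonomy_terms(
    #          [{'render_info': {'used_taxonomy_terms': {('python',)}}}])
    # -> {('python',)}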
--- a/piecrust/templating/jinja/extensions.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/templating/jinja/extensions.py Sun Nov 19 14:29:17 2017 -0800 @@ -146,19 +146,19 @@ key = self.environment.piecrust_cache_prefix + name rcs = self.environment.app.env.render_ctx_stack - rdr_pass = rcs.current_ctx.current_pass_info + ri = rcs.current_ctx.render_info # try to load the block from the cache # if there is no fragment in the cache, render it and store # it in the cache. pair = self.environment.piecrust_cache.get(key) if pair is not None: - rdr_pass.used_source_names.update(pair[1]) + ri['used_source_names'].update(pair[1]) return pair[0] - prev_used = rdr_pass.used_source_names.copy() + prev_used = ri['used_source_names'].copy() rv = caller() - after_used = rdr_pass.used_source_names.copy() + after_used = ri['used_source_names'].copy() used_delta = after_used.difference(prev_used) self.environment.piecrust_cache[key] = (rv, used_delta) return rv
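The pccache extension keeps source tracking intact by diffing used_source_names around the cached block: on a miss it records which sources the fragment touched, and on a hit it replays that delta into the current render_info. A self-contained sketch of the technique (cache and render_info are plain dicts here, render is any zero-argument callable):

    def render_cached_fragment(cache, key, render_info, render):
        pair = cache.get(key)
        if pair is not None:
            # Cache hit: replay the sources this fragment used originally.
            render_info['used_source_names'].update(pair[1])
            return pair[0]
        before = render_info['used_source_names'].copy()
        text = render()  # rendering may add names to used_source_names
        after = render_info['used_source_names'].copy()
        cache[key] = (text, after - before)
        return text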
--- a/piecrust/templating/jinjaengine.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/templating/jinjaengine.py Sun Nov 19 14:29:17 2017 -0800 @@ -19,7 +19,7 @@ def renderSegment(self, path, segment, data): if not _string_needs_render(segment.content): - return segment.content + return segment.content, False self._ensureLoaded() @@ -34,7 +34,7 @@ raise TemplateNotFoundError() try: - return tpl.render(data) + return tpl.render(data), True except self._jinja_syntax_error as tse: raise self._getTemplatingError(tse) except AbortedSourceUseError:
--- a/piecrust/templating/pystacheengine.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/templating/pystacheengine.py Sun Nov 19 14:29:17 2017 -0800 @@ -19,7 +19,7 @@ def renderSegment(self, path, segment, data): self._ensureLoaded() try: - return self.renderer.render(segment.content, data) + return self.renderer.render(segment.content, data), True except self._not_found_error as ex: raise TemplateNotFoundError() from ex except self._pystache_error as ex:
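renderSegment() now returns a (text, was_rendered) pair so the caller can flag used_templating only when templating actually ran. Two toy engines following that contract (illustrative only, not real PieCrust engines):

    from collections import namedtuple

    Segment = namedtuple('Segment', 'content line')  # stand-in for a page segment

    class PassthroughEngine:
        # No templating happened: report was_rendered=False.
        def renderSegment(self, path, segment, data):
            return segment.content, False

    class FormatStringEngine:
        # Hypothetical engine built on str.format(); like the pystache
        # engine above, it always reports True.
        def renderSegment(self, path, segment, data):
            return segment.content.format(**data), True

    # FormatStringEngine().renderSegment(
    #     'page.md', Segment('Hello {name}!', 1), {'name': 'world'})
    # -> ('Hello world!', True)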
--- a/piecrust/themes/base.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/themes/base.py Sun Nov 19 14:29:17 2017 -0800 @@ -26,6 +26,8 @@ config_path = os.path.join(self.root_dir, CONFIG_PATH) with open(config_path, 'r', encoding='utf8') as fp: config = yaml.load(fp.read()) + if not config: + return None site_config = config.get('site', {}) theme = site_config.get('theme', None) if theme is None:
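The new guard matters because YAML parses an empty (or comment-only) theme_config.yml to None rather than an empty dict, so the old code would have crashed on config.get(). A quick check of that behavior (safe_load is used here only for brevity; the code above uses yaml.load on the file contents):

    import yaml
    assert yaml.safe_load('') is None
    assert yaml.safe_load('# nothing but a comment\n') is None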
--- a/piecrust/workerpool.py Fri Nov 03 23:14:56 2017 -0700 +++ b/piecrust/workerpool.py Sun Nov 19 14:29:17 2017 -0800 @@ -11,11 +11,11 @@ logger = logging.getLogger(__name__) -use_fastqueue = False - +use_fastqueue = True use_fastpickle = False use_msgpack = False use_marshall = False +use_json = False class IWorker(object): @@ -34,17 +34,14 @@ pass -class WorkerExceptionData: - def __init__(self, wid): - super().__init__() - self.wid = wid - t, v, tb = sys.exc_info() - self.type = t - self.value = '\n'.join(_get_errors(v)) - self.traceback = ''.join(traceback.format_exception(t, v, tb)) - - def __str__(self): - return str(self.value) +def _get_worker_exception_data(wid): + t, v, tb = sys.exc_info() + return { + 'wid': wid, + 'type': str(t), + 'value': '\n'.join(_get_errors(v)), + 'traceback': ''.join(traceback.format_exception(t, v, tb)) + } def _get_errors(ex): @@ -57,7 +54,8 @@ TASK_JOB = 0 -TASK_END = 1 +TASK_JOB_BATCH = 1 +TASK_END = 2 _TASK_ABORT_WORKER = 10 _CRITICAL_WORKER_ERROR = 11 @@ -169,22 +167,33 @@ task_type, task_data = task - # Job task... just do it. - if task_type == TASK_JOB: - try: - res = (task_type, task_data, True, wid, w.process(task_data)) - except Exception as e: - logger.debug( - "Error processing job, sending exception to main process:") - logger.debug(traceback.format_exc()) - we = WorkerExceptionData(wid) - res = (task_type, task_data, False, wid, we) + # Job task(s)... just do it. + if task_type == TASK_JOB or task_type == TASK_JOB_BATCH: + + task_data_list = task_data + if task_type == TASK_JOB: + task_data_list = [task_data] + + result_list = [] + for td in task_data_list: + try: + res = w.process(td) + result_list.append((td, res, True)) + except Exception as e: + logger.debug( + "Error processing job, sending exception to main process:") + logger.debug(traceback.format_exc()) + we = _get_worker_exception_data(wid) + res = (td, we, False) + result_list.append((td, res, False)) + + res = (task_type, wid, result_list) put_start_time = time.perf_counter() put(res) time_in_put += (time.perf_counter() - put_start_time) - completed += 1 + completed += len(task_data_list) # End task... gather stats to send back to the main process. 
elif task_type == TASK_END: @@ -193,13 +202,13 @@ stats.registerTimer('WorkerResultPut', time=time_in_put) try: stats.mergeStats(w.getStats()) - rep = (task_type, task_data, True, wid, (wid, stats)) + rep = (task_type, wid, [(task_data, (wid, stats), True)]) except Exception as e: logger.debug( "Error getting report, sending exception to main process:") logger.debug(traceback.format_exc()) - we = WorkerExceptionData(wid) - rep = (task_type, task_data, False, wid, (wid, we)) + we = _get_worker_exception_data(wid) + rep = (task_type, wid, [(task_data, (wid, we), False)]) put(rep) break @@ -302,8 +311,17 @@ self._jobs_left += new_job_count self._event.clear() - for job in jobs: - self._quick_put((TASK_JOB, job)) + bs = self._batch_size + if not bs: + for job in jobs: + self._quick_put((TASK_JOB, job)) + else: + cur_offset = 0 + while cur_offset < new_job_count: + next_batch_idx = min(cur_offset + bs, new_job_count) + job_batch = jobs[cur_offset:next_batch_idx] + self._quick_put((TASK_JOB_BATCH, job_batch)) + cur_offset = next_batch_idx else: with self._lock_jobs_left: done = (self._jobs_left == 0) @@ -388,27 +406,29 @@ logger.debug("Result handler exiting.") return - task_type, task_data, success, wid, data = res - try: - if success: - if pool._callback: - pool._callback(task_data, data, userdata) - else: - if task_type == _CRITICAL_WORKER_ERROR: - logger.error(data) - do_continue = pool._onResultHandlerCriticalError(wid) - if not do_continue: - logger.debug("Aborting result handling thread.") - return + task_type, wid, res_data_list = res + for res_data in res_data_list: + try: + task_data, data, success = res_data + if success: + if pool._callback: + pool._callback(task_data, data, userdata) else: - if pool._error_callback: - pool._error_callback(task_data, data, userdata) + if task_type == _CRITICAL_WORKER_ERROR: + logger.error(data) + do_continue = pool._onResultHandlerCriticalError(wid) + if not do_continue: + logger.debug("Aborting result handling thread.") + return else: - logger.error( - "Worker %d failed to process a job:" % wid) - logger.error(data) - except Exception as ex: - logger.exception(ex) + if pool._error_callback: + pool._error_callback(task_data, data, userdata) + else: + logger.error( + "Worker %d failed to process a job:" % wid) + logger.error(data) + except Exception as ex: + logger.exception(ex) if task_type == TASK_JOB: pool._onTaskDone() @@ -522,6 +542,30 @@ _pickle = _pickle_marshal _unpickle = _unpickle_marshal +elif use_json: + import json + + class _BufferWrapper: + def __init__(self, buf): + self._buf = buf + + def write(self, data): + self._buf.write(data.encode('utf8')) + + def read(self): + return self._buf.read().decode('utf8') + + def _pickle_json(obj, buf): + buf = _BufferWrapper(buf) + json.dump(obj, buf, indent=None, separators=(',', ':')) + + def _unpickle_json(buf, bufsize): + buf = _BufferWrapper(buf) + return json.load(buf) + + _pickle = _pickle_json + _unpickle = _unpickle_json + else: import pickle @@ -533,4 +577,3 @@ _pickle = _pickle_default _unpickle = _unpickle_default -
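Two things change shape in the pool above: jobs can be grouped into TASK_JOB_BATCH tasks, and every message back to the main process now carries a list of (task_data, data, success) triples rather than a single result. A compact, self-contained sketch of both (the constant names are reused from the diff, everything else is simplified):

    TASK_JOB = 0
    TASK_JOB_BATCH = 1

    def split_into_batches(jobs, batch_size):
        # Mirrors the slicing in _submit: no batch size means one task per job.
        if not batch_size:
            return [(TASK_JOB, j) for j in jobs]
        return [(TASK_JOB_BATCH, jobs[i:i + batch_size])
                for i in range(0, len(jobs), batch_size)]

    def handle_worker_message(msg, on_result, on_error):
        # Workers reply with (task_type, wid, [(task_data, data, success), ...]).
        task_type, wid, results = msg
        for task_data, data, success in results:
            if success:
                on_result(task_data, data)
            else:
                on_error(task_data, data)

    # split_into_batches(list(range(5)), 2)
    # -> [(1, [0, 1]), (1, [2, 3]), (1, [4])]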
--- a/tests/mockutil.py Fri Nov 03 23:14:56 2017 -0700 +++ b/tests/mockutil.py Sun Nov 19 14:29:17 2017 -0800 @@ -13,7 +13,7 @@ src = app.getSource('pages') assert src is not None - item = src.findContent({'slug': slug}) + item = src.findContentFromRoute({'slug': slug}) assert item is not None return item
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/test_baking_baker.py Sun Nov 19 14:29:17 2017 -0800 @@ -0,0 +1,25 @@ +import time +from .mockutil import get_mock_app, mock_fs, mock_fs_scope + + +def test_bake_and_add_post(): + fs = (mock_fs() + .withConfig() + .withPage('pages/_index.html', {'layout': 'none', 'format': 'none'}, + "{% for p in pagination.posts -%}\n" + "{{p.title}}\n" + "{% endfor %}") + .withPage('posts/2017-01-01_first.html', {'title': "First"}, + "something")) + with mock_fs_scope(fs): + fs.runChef('bake') + structure = fs.getStructure('kitchen/_counter') + assert structure['index.html'] == 'First\n' + + time.sleep(1) + fs.withPage('posts/2017-01-02_second.html', {'title': "Second"}, + "something else") + fs.runChef('bake') + structure = fs.getStructure('kitchen/_counter') + assert structure['index.html'] == 'Second\nFirst\n' +
--- a/tests/test_fastpickle.py Fri Nov 03 23:14:56 2017 -0700 +++ b/tests/test_fastpickle.py Sun Nov 19 14:29:17 2017 -0800 @@ -1,6 +1,7 @@ +import io import datetime import pytest -from piecrust.fastpickle import pickle, unpickle, pickle_obj, unpickle_obj +from piecrust.fastpickle import pickle, unpickle, pickle_intob, unpickle_fromb class Foo(object): @@ -36,6 +37,13 @@ actual = unpickle(data) assert actual == expected + with io.BytesIO() as buf: + pickle_intob(obj, buf) + size = buf.tell() + buf.seek(0) + actual = unpickle_fromb(buf, size) + assert actual == expected + def test_objects(): f = Foo('foo') @@ -54,11 +62,10 @@ def test_reentrance(): a = {'test_ints': 42, 'test_set': set([1, 2])} - data = pickle_obj(a) - b = unpickle_obj(data) + data = pickle(a) + b = unpickle(data) assert a == b - other_b = unpickle_obj(data) + other_b = unpickle(data) assert a == other_b - c = unpickle_obj(data) + c = unpickle(data) assert a == c -
--- a/tests/test_sources_autoconfig.py Fri Nov 03 23:14:56 2017 -0700 +++ b/tests/test_sources_autoconfig.py Sun Nov 19 14:29:17 2017 -0800 @@ -182,7 +182,7 @@ app = fs.getApp() s = app.getSource('test') route_metadata = {'slug': route_path} - item = s.findContent(route_metadata) + item = s.findContentFromRoute(route_metadata) if item is None: assert expected_path is None and expected_metadata is None else:
--- a/tests/test_sources_base.py Fri Nov 03 23:14:56 2017 -0700 +++ b/tests/test_sources_base.py Sun Nov 19 14:29:17 2017 -0800 @@ -66,7 +66,7 @@ with mock_fs_scope(fs): app = fs.getApp() s = app.getSource('test') - item = s.findContent({'slug': ref_path}) + item = s.findContentFromRoute({'slug': ref_path}) assert item is not None assert os.path.relpath(item.spec, app.root_dir) == \ slashfix(expected_path)