comparison piecrust/baking/worker.py @ 989:8adc27285d93

bake: Big pass on bake performance.

- Reduce the amount of data passed between processes.
- Make inter-process data simple objects to make it easier to test with alternatives to pickle.
- Make sources have the basic requirement to be able to find a content item from an item spec (path).
- Make Hoedown the default Markdown formatter.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 19 Nov 2017 14:29:17 -0800
parents bf65a1a6992a
children 2e5c5d33d62c
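The second bullet is the idea driving most of the diff below: if the payloads exchanged between the main process and the bake workers are built only from Python builtins (tuples, dicts, strings), any serializer can handle them, not just pickle. A minimal sketch of that property; the source name and item spec are made-up values, not taken from a real bake:

    import json
    import pickle

    # A bake job shaped like the ones in the new process() below:
    # nothing but builtins, so any serializer can move it between
    # processes.
    job = {'job_spec': ('pages', 'pages/2017/my-post.md')}

    # Round-trips through pickle unchanged...
    assert pickle.loads(pickle.dumps(job)) == job

    # ...and through serializers that know nothing about PieCrust
    # classes (json demotes the tuple to a list, but the data survives).
    assert json.loads(json.dumps(job)) == {
        'job_spec': ['pages', 'pages/2017/my-post.md']}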
--- a/piecrust/baking/worker.py (988:f83ae0a5d793)
+++ b/piecrust/baking/worker.py (989:8adc27285d93)
@@ -1,9 +1,9 @@
 import time
 import logging
 from piecrust.pipelines.base import (
-    PipelineManager, PipelineJobRunContext, PipelineJobResult,
+    PipelineManager, PipelineJobRunContext,
     get_pipeline_name_for_source)
 from piecrust.pipelines.records import (
     MultiRecordHistory, MultiRecord, load_records)
 from piecrust.workerpool import IWorker
 
@@ -25,11 +25,12 @@
 
 class BakeWorker(IWorker):
     def __init__(self, ctx):
         self.ctx = ctx
         self.app = None
-        self.record_histories = None
+        self.stats = None
+        self.previous_records = None
         self._work_start_time = time.perf_counter()
         self._sources = {}
         self._ppctx = None
 
     def initialize(self):
@@ -42,26 +43,24 @@
         app.env.fs_cache_only_for_main_page = True
 
         stats = app.env.stats
         stats.registerTimer("BakeWorker_%d_Total" % self.wid)
         stats.registerTimer("BakeWorkerInit")
-        self.timerScope = stats.timerScope
 
         self.app = app
+        self.stats = stats
 
         # Load previous record
         if self.ctx.previous_records_path:
             previous_records = load_records(self.ctx.previous_records_path)
         else:
             previous_records = MultiRecord()
-        current_records = MultiRecord()
-        self.record_histories = MultiRecordHistory(
-            previous_records, current_records)
+        self.previous_records = previous_records
 
         # Create the pipelines.
         self.ppmngr = PipelineManager(
-            app, self.ctx.out_dir, self.record_histories,
+            app, self.ctx.out_dir,
             worker_id=self.wid, force=self.ctx.force)
         ok_pp = self.ctx.allowed_pipelines
         nok_pp = self.ctx.forbidden_pipelines
         for src in app.sources:
             pname = get_pipeline_name_for_source(src)
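The hunk above removes the per-worker MultiRecordHistory: workers now keep only the read-only previous records they need for incremental-bake decisions, while the current records are assembled in the main process from the plain result dicts the workers send back. A hypothetical sketch of that main-process side, reusing only the MultiRecord and MultiRecordHistory constructors visible in this file; build_history and the loop body are illustrative, not PieCrust's actual code:

    from piecrust.pipelines.records import MultiRecord, MultiRecordHistory

    def build_history(previous_records, worker_results):
        # Workers ship back plain dicts like {'item_spec': ...}; the
        # record bookkeeping stays out of the worker processes.
        current_records = MultiRecord()
        for ppres in worker_results:
            pass  # convert each result dict into record entries here
        return MultiRecordHistory(previous_records, current_records)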
@@ -76,28 +75,26 @@
                 raise_if_registered=False)
 
         stats.stepTimerSince("BakeWorkerInit", self._work_start_time)
 
     def process(self, job):
-        item = job.content_item
-        logger.debug("Received job: %s@%s" % (job.source_name, item.spec))
+        source_name, item_spec = job['job_spec']
+        logger.debug("Received job: %s@%s" % (source_name, item_spec))
 
-        ppinfo = self.ppmngr.getPipeline(job.source_name)
-        pp = ppinfo.pipeline
+        # Run the job!
+        job_start = time.perf_counter()
+        pp = self.ppmngr.getPipeline(source_name)
+        runctx = PipelineJobRunContext(job, pp.record_name,
+                                       self.previous_records)
+        ppres = {
+            'item_spec': item_spec
+        }
+        pp.run(job, runctx, ppres)
 
-        with self.timerScope("PipelineJobs_%s" % pp.PIPELINE_NAME):
-            runctx = PipelineJobRunContext(job, pp.record_name,
-                                           self.record_histories)
-
-            ppres = PipelineJobResult()
-            # For subsequent pass jobs, there will be a record entry given.
-            # For first pass jobs, there's none so we get the pipeline to
-            # create it.
-            ppres.record_entry = job.data.get('record_entry')
-            if ppres.record_entry is None:
-                ppres.record_entry = pp.createRecordEntry(job, runctx)
-            pp.run(job, runctx, ppres)
+        # Log time spent in this pipeline.
+        self.stats.stepTimerSince("PipelineJobs_%s" % pp.PIPELINE_NAME,
+                                  job_start)
 
         return ppres
 
     def getStats(self):
         stats = self.app.env.stats
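With process() now receiving only (source_name, item_spec) instead of a full content item, each source must be able to rebuild a content item from its spec on the worker side, which is the third bullet of the commit message. A toy illustration of that requirement; the class and method names here are hypothetical, not PieCrust's actual source API:

    class DummySource:
        def __init__(self, items):
            # items: mapping of item spec (path-like string) to the
            # full content item it identifies.
            self._items = items

        def findContentFromSpec(self, item_spec):
            # Rehydrate the content item from the bare spec string
            # that was passed across the process boundary.
            return self._items.get(item_spec)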