piecrust2: comparison of piecrust/baking/worker.py @ 989:8adc27285d93
bake: Big pass on bake performance.
- Reduce the amount of data passed between processes.
- Make inter-process data simple objects to make it easier to test with
alternatives to pickle.
- Make sources have the basic requirement to be able to find a content item
from an item spec (path).
- Make Hoedown the default Markdown formatter.
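
The second bullet above is visible in the new process() below: jobs and results become plain dicts of strings and tuples, with the job's source name and item spec packed into a 'job_spec' pair. A minimal sketch of why that helps, using the standard-library json module as one stand-in for "alternatives to pickle" (the source name and spec values are made up for illustration):

    import json
    import pickle

    # New-style bake job: a plain dict, as consumed by process() below.
    job = {'job_spec': ('pages', 'pages/foo.md')}

    # Plain dicts round-trip through pickle exactly as before...
    assert pickle.loads(pickle.dumps(job)) == job

    # ...but can also be handed to serializers that cannot encode
    # arbitrary class instances, such as json. (json decodes the tuple
    # back as a list, so a real swap would need to normalize that.)
    decoded = json.loads(json.dumps(job))
    assert decoded['job_spec'] == ['pages', 'pages/foo.md']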
author   | Ludovic Chabant <ludovic@chabant.com>
date     | Sun, 19 Nov 2017 14:29:17 -0800
parents  | bf65a1a6992a
children | 2e5c5d33d62c
--- piecrust/baking/worker.py  (988:f83ae0a5d793)
+++ piecrust/baking/worker.py  (989:8adc27285d93)
@@ -1,9 +1,9 @@
 import time
 import logging
 from piecrust.pipelines.base import (
-    PipelineManager, PipelineJobRunContext, PipelineJobResult,
+    PipelineManager, PipelineJobRunContext,
     get_pipeline_name_for_source)
 from piecrust.pipelines.records import (
     MultiRecordHistory, MultiRecord, load_records)
 from piecrust.workerpool import IWorker
 
@@ -25,11 +25,12 @@
 
 class BakeWorker(IWorker):
     def __init__(self, ctx):
         self.ctx = ctx
         self.app = None
-        self.record_histories = None
+        self.stats = None
+        self.previous_records = None
         self._work_start_time = time.perf_counter()
         self._sources = {}
         self._ppctx = None
 
     def initialize(self):
@@ -42,26 +43,24 @@
         app.env.fs_cache_only_for_main_page = True
 
         stats = app.env.stats
         stats.registerTimer("BakeWorker_%d_Total" % self.wid)
         stats.registerTimer("BakeWorkerInit")
-        self.timerScope = stats.timerScope
 
         self.app = app
+        self.stats = stats
 
         # Load previous record
         if self.ctx.previous_records_path:
             previous_records = load_records(self.ctx.previous_records_path)
         else:
             previous_records = MultiRecord()
-        current_records = MultiRecord()
-        self.record_histories = MultiRecordHistory(
-            previous_records, current_records)
+        self.previous_records = previous_records
 
         # Create the pipelines.
         self.ppmngr = PipelineManager(
-            app, self.ctx.out_dir, self.record_histories,
+            app, self.ctx.out_dir,
             worker_id=self.wid, force=self.ctx.force)
         ok_pp = self.ctx.allowed_pipelines
         nok_pp = self.ctx.forbidden_pipelines
         for src in app.sources:
             pname = get_pipeline_name_for_source(src)
@@ -76,28 +75,26 @@
                 raise_if_registered=False)
 
         stats.stepTimerSince("BakeWorkerInit", self._work_start_time)
 
     def process(self, job):
-        item = job.content_item
-        logger.debug("Received job: %s@%s" % (job.source_name, item.spec))
+        source_name, item_spec = job['job_spec']
+        logger.debug("Received job: %s@%s" % (source_name, item_spec))
 
-        ppinfo = self.ppmngr.getPipeline(job.source_name)
-        pp = ppinfo.pipeline
+        # Run the job!
+        job_start = time.perf_counter()
+        pp = self.ppmngr.getPipeline(source_name)
+        runctx = PipelineJobRunContext(job, pp.record_name,
+                                       self.previous_records)
+        ppres = {
+            'item_spec': item_spec
+        }
+        pp.run(job, runctx, ppres)
 
-        with self.timerScope("PipelineJobs_%s" % pp.PIPELINE_NAME):
-            runctx = PipelineJobRunContext(job, pp.record_name,
-                                           self.record_histories)
-
-            ppres = PipelineJobResult()
-            # For subsequent pass jobs, there will be a record entry given.
-            # For first pass jobs, there's none so we get the pipeline to
-            # create it.
-            ppres.record_entry = job.data.get('record_entry')
-            if ppres.record_entry is None:
-                ppres.record_entry = pp.createRecordEntry(job, runctx)
-            pp.run(job, runctx, ppres)
+        # Log time spent in this pipeline.
+        self.stats.stepTimerSince("PipelineJobs_%s" % pp.PIPELINE_NAME,
+                                  job_start)
 
         return ppres
 
     def getStats(self):
         stats = self.app.env.stats
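
Taken together, process() now receives only (source_name, item_spec) strings and returns a plain dict, so everything crossing the process boundary stays cheap to serialize; turning the spec back into an actual content item is deferred to the source, per the third bullet of the commit message. A sketch of the round-trip from the caller's side, assuming an already-initialized worker (run_one_job and the example values are hypothetical, not part of this changeset):

    # Hypothetical driver showing the shape of the data flowing in and
    # out of BakeWorker.process(); only the 'job_spec' / 'item_spec'
    # keys come from the diff above.
    def run_one_job(worker, source_name, item_spec):
        job = {'job_spec': (source_name, item_spec)}
        ppres = worker.process(job)
        # process() seeds the result with the item spec, presumably so
        # the parent process can tie a returned result back to its job.
        assert ppres['item_spec'] == item_spec
        return ppres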