Mercurial > piecrust2
comparison piecrust/pipelines/base.py @ 854:08e02c2a2a1a
core: Keep refactoring, this time to prepare for generator sources.
- Make a few APIs simpler.
- Content pipelines create their own jobs, so that generator sources can
keep aborting in `getContents`, but rely on their pipeline to generate
pages for baking.
author | Ludovic Chabant <ludovic@chabant.com> |
---|---|
date | Sun, 04 Jun 2017 23:34:28 -0700 |
parents | f070a4fc033c |
children | 448710d84121 |
comparison
equal
deleted
inserted
replaced
853:f070a4fc033c | 854:08e02c2a2a1a |
---|---|
1 import os.path | 1 import os.path |
2 import logging | 2 import logging |
3 from werkzeug.utils import cached_property | |
4 from piecrust.configuration import ConfigurationError | |
3 | 5 |
4 | 6 |
5 logger = logging.getLogger(__name__) | 7 logger = logging.getLogger(__name__) |
6 | 8 |
7 | 9 |
8 class PipelineContext: | 10 class PipelineContext: |
9 """ The context for running a content pipeline. | 11 """ The context for running a content pipeline. |
10 """ | 12 """ |
11 def __init__(self, out_dir, record_history, *, | 13 def __init__(self, out_dir, *, |
12 worker_id=-1, force=None): | 14 worker_id=-1, force=None): |
13 self.out_dir = out_dir | 15 self.out_dir = out_dir |
14 self.record_history = record_history | |
15 self.worker_id = worker_id | 16 self.worker_id = worker_id |
16 self.force = force | 17 self.force = force |
17 | 18 |
18 @property | 19 @property |
19 def is_worker(self): | 20 def is_worker(self): |
28 the main process (and not a worker process). This is the case | 29 the main process (and not a worker process). This is the case |
29 if there are no worker processes at all. | 30 if there are no worker processes at all. |
30 """ | 31 """ |
31 return self.worker_id < 0 | 32 return self.worker_id < 0 |
32 | 33 |
33 @property | 34 |
34 def current_record(self): | 35 class PipelineJob: |
35 return self.record_history.current | 36 """ Base class for a pipeline baking job. |
36 | 37 """ |
37 | 38 def __init__(self, pipeline, content_item): |
38 class PipelineResult: | 39 self.source_name = pipeline.source.name |
40 self.record_name = pipeline.record_name | |
41 self.content_item = content_item | |
42 self.data = {} | |
43 | |
44 | |
45 class PipelineJobRunContext: | |
46 """ Context for running pipeline baking jobs. | |
47 """ | |
48 def __init__(self, job, pipeline, record_histories): | |
49 self.record_histories = record_histories | |
50 self._job_item_spec = job.content_item.spec | |
51 self._record_name = pipeline.record_name | |
52 | |
53 @cached_property | |
54 def previous_record(self): | |
55 return self.record_histories.getPreviousRecord(self._record_name) | |
56 | |
57 @cached_property | |
58 def previous_entry(self): | |
59 return self.previous_record.getEntry(self._job_item_spec) | |
60 | |
61 | |
62 class PipelineJobResult: | |
39 """ Result of running a pipeline on a content item. | 63 """ Result of running a pipeline on a content item. |
40 """ | 64 """ |
41 def __init__(self): | 65 def __init__(self): |
42 self.pipeline_name = None | |
43 self.record_entry = None | 66 self.record_entry = None |
67 self.next_pass_job = None | |
68 | |
69 | |
70 class PipelineMergeRecordContext: | |
71 """ The context for merging a record entry for a second or higher pass | |
72 into the bake record. | |
73 """ | |
74 def __init__(self, record, job, pass_num): | |
75 self.record = record | |
76 self.job = job | |
77 self.pass_num = pass_num | |
78 | |
79 | |
80 class PipelineDeletionContext: | |
81 def __init__(self, record_history): | |
82 self.record_history = record_history | |
83 | |
84 | |
85 class PipelineCollapseRecordContext: | |
86 def __init__(self, record_history): | |
87 self.record_history = record_history | |
44 | 88 |
45 | 89 |
46 class ContentPipeline: | 90 class ContentPipeline: |
47 """ A pipeline that processes content from a `ContentSource`. | 91 """ A pipeline that processes content from a `ContentSource`. |
48 """ | 92 """ |
49 PIPELINE_NAME = None | 93 PIPELINE_NAME = None |
50 PIPELINE_PASSES = 1 | |
51 RECORD_ENTRY_CLASS = None | 94 RECORD_ENTRY_CLASS = None |
52 | 95 PASS_NUM = 0 |
53 def __init__(self, source): | 96 |
97 def __init__(self, source, ctx): | |
54 self.source = source | 98 self.source = source |
99 self.ctx = ctx | |
100 self.record_name = '%s@%s' % (source.name, self.PIPELINE_NAME) | |
55 | 101 |
56 app = source.app | 102 app = source.app |
57 tmp_dir = app.cache_dir | 103 tmp_dir = app.cache_dir |
58 if not tmp_dir: | 104 if not tmp_dir: |
59 import tempfile | 105 import tempfile |
62 | 108 |
63 @property | 109 @property |
64 def app(self): | 110 def app(self): |
65 return self.source.app | 111 return self.source.app |
66 | 112 |
67 def initialize(self, ctx): | 113 def initialize(self): |
68 pass | 114 pass |
69 | 115 |
70 def run(self, content_item, ctx, result): | 116 def createJobs(self): |
117 return [ | |
118 self.createJob(item) | |
119 for item in self.source.getAllContents()] | |
120 | |
121 def createJob(self, content_item): | |
122 return PipelineJob(self, content_item) | |
123 | |
124 def createRecordEntry(self, job): | |
125 entry_class = self.RECORD_ENTRY_CLASS | |
126 record_entry = entry_class() | |
127 record_entry.item_spec = job.content_item.spec | |
128 return record_entry | |
129 | |
130 def mergeRecordEntry(self, record_entry, ctx): | |
71 raise NotImplementedError() | 131 raise NotImplementedError() |
72 | 132 |
133 def run(self, job, ctx, result): | |
134 raise NotImplementedError() | |
135 | |
73 def getDeletions(self, ctx): | 136 def getDeletions(self, ctx): |
74 pass | 137 pass |
75 | 138 |
76 def collapseRecords(self, ctx): | 139 def collapseRecords(self, ctx): |
77 pass | 140 pass |
78 | 141 |
79 def shutdown(self, ctx): | 142 def shutdown(self): |
80 pass | 143 pass |
144 | |
145 | |
146 def get_pipeline_name_for_source(source): | |
147 pname = source.config['pipeline'] | |
148 if not pname: | |
149 pname = source.DEFAULT_PIPELINE_NAME | |
150 if not pname: | |
151 raise ConfigurationError( | |
152 "Source '%s' doesn't specify any pipeline." % source.name) | |
153 return pname | |
154 | |
155 | |
156 class PipelineManager: | |
157 def __init__(self, app, out_dir, record_histories, *, | |
158 worker_id=-1, force=False): | |
159 self.app = app | |
160 self.record_histories = record_histories | |
161 self.out_dir = out_dir | |
162 self.worker_id = worker_id | |
163 self.force = force | |
164 | |
165 self._pipeline_classes = {} | |
166 for pclass in app.plugin_loader.getPipelines(): | |
167 self._pipeline_classes[pclass.PIPELINE_NAME] = pclass | |
168 | |
169 self._pipelines = {} | |
170 | |
171 def getPipeline(self, source_name): | |
172 return self._pipelines[source_name] | |
173 | |
174 def getPipelines(self): | |
175 return self._pipelines.values() | |
176 | |
177 def createPipeline(self, source): | |
178 if source.name in self._pipelines: | |
179 raise ValueError("Pipeline for source '%s' was already created." % | |
180 source.name) | |
181 | |
182 pname = get_pipeline_name_for_source(source) | |
183 ppctx = PipelineContext(self.out_dir, | |
184 worker_id=self.worker_id, force=self.force) | |
185 pp = self._pipeline_classes[pname](source, ppctx) | |
186 pp.initialize() | |
187 | |
188 record_history = self.record_histories.getHistory(pp.record_name) | |
189 | |
190 info = _PipelineInfo(pp, record_history) | |
191 self._pipelines[source.name] = info | |
192 return info | |
193 | |
194 def buildHistoryDiffs(self): | |
195 for ppinfo in self.getPipelines(): | |
196 ppinfo.record_history.build() | |
197 | |
198 def deleteStaleOutputs(self): | |
199 for ppinfo in self.getPipelines(): | |
200 ctx = PipelineDeletionContext(ppinfo.record_history) | |
201 to_delete = ppinfo.pipeline.getDeletions(ctx) | |
202 current_record = ppinfo.record_history.current | |
203 if to_delete is not None: | |
204 for path, reason in to_delete: | |
205 logger.debug("Removing '%s': %s" % (path, reason)) | |
206 current_record.deleted_out_paths.append(path) | |
207 try: | |
208 os.remove(path) | |
209 except FileNotFoundError: | |
210 pass | |
211 logger.info('[delete] %s' % path) | |
212 | |
213 def collapseRecords(self): | |
214 for ppinfo in self.getPipelines(): | |
215 ctx = PipelineCollapseRecordContext(ppinfo.record_history) | |
216 ppinfo.pipeline.collapseRecords(ctx) | |
217 | |
218 def shutdownPipelines(self): | |
219 for ppinfo in self.getPipelines(): | |
220 ppinfo.pipeline.shutdown() | |
221 | |
222 self._pipelines = {} | |
223 | |
224 | |
225 class _PipelineInfo: | |
226 def __init__(self, pipeline, record_history): | |
227 self.pipeline = pipeline | |
228 self.record_history = record_history | |
229 self.userdata = None | |
230 | |
231 @property | |
232 def source(self): | |
233 return self.pipeline.source | |
234 | |
235 @property | |
236 def pipeline_name(self): | |
237 return self.pipeline.PIPELINE_NAME | |
238 |