comparison piecrust/pipelines/base.py @ 854:08e02c2a2a1a

core: Keep refactoring, this time to prepare for generator sources. - Make a few APIs simpler. - Content pipelines create their own jobs, so that generator sources can keep aborting in `getContents`, but rely on their pipeline to generate pages for baking.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 04 Jun 2017 23:34:28 -0700
parents f070a4fc033c
children 448710d84121
comparison
equal deleted inserted replaced
853:f070a4fc033c 854:08e02c2a2a1a
1 import os.path 1 import os.path
2 import logging 2 import logging
3 from werkzeug.utils import cached_property
4 from piecrust.configuration import ConfigurationError
3 5
4 6
5 logger = logging.getLogger(__name__) 7 logger = logging.getLogger(__name__)
6 8
7 9
8 class PipelineContext: 10 class PipelineContext:
9 """ The context for running a content pipeline. 11 """ The context for running a content pipeline.
10 """ 12 """
11 def __init__(self, out_dir, record_history, *, 13 def __init__(self, out_dir, *,
12 worker_id=-1, force=None): 14 worker_id=-1, force=None):
13 self.out_dir = out_dir 15 self.out_dir = out_dir
14 self.record_history = record_history
15 self.worker_id = worker_id 16 self.worker_id = worker_id
16 self.force = force 17 self.force = force
17 18
18 @property 19 @property
19 def is_worker(self): 20 def is_worker(self):
28 the main process (and not a worker process). This is the case 29 the main process (and not a worker process). This is the case
29 if there are no worker processes at all. 30 if there are no worker processes at all.
30 """ 31 """
31 return self.worker_id < 0 32 return self.worker_id < 0
32 33
33 @property 34
class PipelineJob:
    """ Base class for a pipeline baking job.
    """
    def __init__(self, pipeline, content_item):
        # The content item this job is about, plus per-job scratch data.
        self.content_item = content_item
        self.data = {}
        # Remember which source and bake record this job belongs to, so
        # the job carries that info without holding the pipeline itself.
        self.source_name = pipeline.source.name
        self.record_name = pipeline.record_name
44
class PipelineJobRunContext:
    """ Context for running pipeline baking jobs.
    """
    def __init__(self, job, pipeline, record_histories):
        self.record_histories = record_histories
        # Keep only the bits of the job/pipeline needed for lazy lookups.
        self._item_spec = job.content_item.spec
        self._rec_name = pipeline.record_name

    @cached_property
    def previous_record(self):
        # Fetched on first access, then memoized by `cached_property`.
        return self.record_histories.getPreviousRecord(self._rec_name)

    @cached_property
    def previous_entry(self):
        # The previous bake's record entry for this job's content item.
        return self.previous_record.getEntry(self._item_spec)
60
61
class PipelineJobResult:
    """ Result of running a pipeline on a content item.
    """
    def __init__(self):
        # Both slots start empty; the pipeline fills them while running.
        self.next_pass_job = None
        self.record_entry = None
68
69
class PipelineMergeRecordContext:
    """ The context for merging a record entry for a second or higher pass
    into the bake record.
    """
    def __init__(self, record, job, pass_num):
        self.pass_num = pass_num
        self.job = job
        self.record = record
78
79
class PipelineDeletionContext:
    """ Context handed to a pipeline's `getDeletions` so it can consult
    the record history when deciding which outputs are stale.
    """
    def __init__(self, record_history):
        self.record_history = record_history
83
84
class PipelineCollapseRecordContext:
    """ Context handed to a pipeline's `collapseRecords` hook.
    """
    def __init__(self, record_history):
        self.record_history = record_history
44 88
45 89
46 class ContentPipeline: 90 class ContentPipeline:
47 """ A pipeline that processes content from a `ContentSource`. 91 """ A pipeline that processes content from a `ContentSource`.
48 """ 92 """
49 PIPELINE_NAME = None 93 PIPELINE_NAME = None
50 PIPELINE_PASSES = 1
51 RECORD_ENTRY_CLASS = None 94 RECORD_ENTRY_CLASS = None
52 95 PASS_NUM = 0
53 def __init__(self, source): 96
97 def __init__(self, source, ctx):
54 self.source = source 98 self.source = source
99 self.ctx = ctx
100 self.record_name = '%s@%s' % (source.name, self.PIPELINE_NAME)
55 101
56 app = source.app 102 app = source.app
57 tmp_dir = app.cache_dir 103 tmp_dir = app.cache_dir
58 if not tmp_dir: 104 if not tmp_dir:
59 import tempfile 105 import tempfile
62 108
63 @property 109 @property
64 def app(self): 110 def app(self):
65 return self.source.app 111 return self.source.app
66 112
67 def initialize(self, ctx): 113 def initialize(self):
68 pass 114 pass
69 115
70 def run(self, content_item, ctx, result): 116 def createJobs(self):
117 return [
118 self.createJob(item)
119 for item in self.source.getAllContents()]
120
121 def createJob(self, content_item):
122 return PipelineJob(self, content_item)
123
124 def createRecordEntry(self, job):
125 entry_class = self.RECORD_ENTRY_CLASS
126 record_entry = entry_class()
127 record_entry.item_spec = job.content_item.spec
128 return record_entry
129
130 def mergeRecordEntry(self, record_entry, ctx):
71 raise NotImplementedError() 131 raise NotImplementedError()
72 132
133 def run(self, job, ctx, result):
134 raise NotImplementedError()
135
73 def getDeletions(self, ctx): 136 def getDeletions(self, ctx):
74 pass 137 pass
75 138
76 def collapseRecords(self, ctx): 139 def collapseRecords(self, ctx):
77 pass 140 pass
78 141
79 def shutdown(self, ctx): 142 def shutdown(self):
80 pass 143 pass
144
145
def get_pipeline_name_for_source(source):
    """ Return the name of the pipeline to use for `source`.

    Uses the source's `pipeline` config entry, falling back to the
    source's `DEFAULT_PIPELINE_NAME`. Raises `ConfigurationError` when
    neither specifies a pipeline.
    """
    name = source.config['pipeline'] or source.DEFAULT_PIPELINE_NAME
    if not name:
        raise ConfigurationError(
            "Source '%s' doesn't specify any pipeline." % source.name)
    return name
154
155
class PipelineManager:
    """ Creates and tracks the content pipelines for a bake, one per source.
    """
    def __init__(self, app, out_dir, record_histories, *,
                 worker_id=-1, force=False):
        self.app = app
        self.record_histories = record_histories
        self.out_dir = out_dir
        self.worker_id = worker_id
        self.force = force

        # Index the available pipeline classes by their declared name.
        self._pipeline_classes = {
            klass.PIPELINE_NAME: klass
            for klass in app.plugin_loader.getPipelines()}

        # Maps source names to their `_PipelineInfo`.
        self._pipelines = {}

    def getPipeline(self, source_name):
        """ Return the pipeline info registered for the named source. """
        return self._pipelines[source_name]

    def getPipelines(self):
        """ All pipeline infos created so far. """
        return self._pipelines.values()

    def createPipeline(self, source):
        """ Build, initialize and register the pipeline for `source`.

        Raises `ValueError` if one was already created for that source.
        """
        if source.name in self._pipelines:
            raise ValueError("Pipeline for source '%s' was already created." %
                             source.name)

        pname = get_pipeline_name_for_source(source)
        pctx = PipelineContext(self.out_dir,
                               worker_id=self.worker_id, force=self.force)
        pipeline = self._pipeline_classes[pname](source, pctx)
        pipeline.initialize()

        history = self.record_histories.getHistory(pipeline.record_name)
        info = _PipelineInfo(pipeline, history)
        self._pipelines[source.name] = info
        return info

    def buildHistoryDiffs(self):
        """ Build the record history diff for every pipeline. """
        for info in self.getPipelines():
            info.record_history.build()

    def deleteStaleOutputs(self):
        """ Ask each pipeline for stale outputs, remove them from disk and
        note the deletions in the current bake record.
        """
        for info in self.getPipelines():
            ctx = PipelineDeletionContext(info.record_history)
            to_delete = info.pipeline.getDeletions(ctx)
            record = info.record_history.current
            if to_delete is None:
                continue
            for path, reason in to_delete:
                logger.debug("Removing '%s': %s" % (path, reason))
                record.deleted_out_paths.append(path)
                try:
                    os.remove(path)
                except FileNotFoundError:
                    # Already gone -- nothing to remove.
                    pass
                logger.info('[delete] %s' % path)

    def collapseRecords(self):
        """ Let each pipeline collapse its record history. """
        for info in self.getPipelines():
            ctx = PipelineCollapseRecordContext(info.record_history)
            info.pipeline.collapseRecords(ctx)

    def shutdownPipelines(self):
        """ Shut down every pipeline and forget them all. """
        for info in self.getPipelines():
            info.pipeline.shutdown()
        self._pipelines = {}
223
224
225 class _PipelineInfo:
226 def __init__(self, pipeline, record_history):
227 self.pipeline = pipeline
228 self.record_history = record_history
229 self.userdata = None
230
231 @property
232 def source(self):
233 return self.pipeline.source
234
235 @property
236 def pipeline_name(self):
237 return self.pipeline.PIPELINE_NAME
238