comparison piecrust/pipelines/asset.py @ 853:f070a4fc033c

core: Continue PieCrust3 refactor, simplify pages.

The asset pipeline is still the only functional pipeline at this point.

* No more `QualifiedPage`, and several other pieces of code deleted.
* Data providers are simpler and more focused. For instance, the page iterator doesn't try to support other types of items.
* Route parameters are now proper, known source metadata, which removes the confusion between the two.
* Make the baker and pipeline manage records and record histories more correctly.
* Add support for record collapsing and deleting stale outputs in the asset pipeline.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 21 May 2017 00:06:59 -0700
parents 4850f8c21b6e
children 08e02c2a2a1a
comparison
equal deleted inserted replaced
852:4850f8c21b6e 853:f070a4fc033c
15 logger = logging.getLogger(__name__) 15 logger = logging.getLogger(__name__)
16 16
17 17
18 class AssetPipeline(ContentPipeline): 18 class AssetPipeline(ContentPipeline):
19 PIPELINE_NAME = 'asset' 19 PIPELINE_NAME = 'asset'
20 RECORD_CLASS = AssetPipelineRecordEntry 20 RECORD_ENTRY_CLASS = AssetPipelineRecordEntry
21 21
22 def __init__(self, source): 22 def __init__(self, source):
23 if not isinstance(source, FSContentSourceBase): 23 if not isinstance(source, FSContentSourceBase):
24 raise Exception( 24 raise Exception(
25 "The asset pipeline only support file-system sources.") 25 "The asset pipeline only support file-system sources.")
66 # See if we need to ignore this item. 66 # See if we need to ignore this item.
67 rel_path = os.path.relpath(content_item.spec, self._base_dir) 67 rel_path = os.path.relpath(content_item.spec, self._base_dir)
68 if re_matchany(rel_path, self.ignore_patterns): 68 if re_matchany(rel_path, self.ignore_patterns):
69 return 69 return
70 70
71 record = result.record 71 record_entry = result.record_entry
72 stats = self.app.env.stats 72 stats = self.app.env.stats
73 73
74 # Build the processing tree for this job. 74 # Build the processing tree for this job.
75 with stats.timerScope('BuildProcessingTree'): 75 with stats.timerScope('BuildProcessingTree'):
76 builder = ProcessingTreeBuilder(self._processors) 76 builder = ProcessingTreeBuilder(self._processors)
77 tree_root = builder.build(rel_path) 77 tree_root = builder.build(rel_path)
78 record.flags |= AssetPipelineRecordEntry.FLAG_PREPARED 78 record_entry.flags |= AssetPipelineRecordEntry.FLAG_PREPARED
79 79
80 # Prepare and run the tree. 80 # Prepare and run the tree.
81 print_node(tree_root, recursive=True) 81 print_node(tree_root, recursive=True)
82 leaves = tree_root.getLeaves() 82 leaves = tree_root.getLeaves()
83 record.rel_outputs = [l.path for l in leaves] 83 record_entry.out_paths = [os.path.join(ctx.out_dir, l.path)
84 record.proc_tree = get_node_name_tree(tree_root) 84 for l in leaves]
85 record_entry.proc_tree = get_node_name_tree(tree_root)
85 if tree_root.getProcessor().is_bypassing_structured_processing: 86 if tree_root.getProcessor().is_bypassing_structured_processing:
86 record.flags |= ( 87 record_entry.flags |= (
87 AssetPipelineRecordEntry.FLAG_BYPASSED_STRUCTURED_PROCESSING) 88 AssetPipelineRecordEntry.FLAG_BYPASSED_STRUCTURED_PROCESSING)
88 89
89 if ctx.force: 90 if ctx.force:
90 tree_root.setState(STATE_DIRTY, True) 91 tree_root.setState(STATE_DIRTY, True)
91 92
92 with stats.timerScope('RunProcessingTree'): 93 with stats.timerScope('RunProcessingTree'):
93 runner = ProcessingTreeRunner( 94 runner = ProcessingTreeRunner(
94 self._base_dir, self.tmp_dir, ctx.out_dir) 95 self._base_dir, self.tmp_dir, ctx.out_dir)
95 if runner.processSubTree(tree_root): 96 if runner.processSubTree(tree_root):
96 record.flags |= ( 97 record_entry.flags |= (
97 AssetPipelineRecordEntry.FLAG_PROCESSED) 98 AssetPipelineRecordEntry.FLAG_PROCESSED)
98 99
99 def shutdown(self, ctx): 100 def getDeletions(self, ctx):
100 # Invoke post-processors. 101 for prev, cur in ctx.record_history.diffs:
101 proc_ctx = ProcessorContext(self, ctx)
102 for proc in self._processors:
103 proc.onPipelineEnd(proc_ctx)
104
105 def collapseRecords(self, record_history):
106 for prev, cur in record_history.diffs():
107 if prev and cur and not cur.was_processed:
108 # This asset wasn't processed, so the information from
109 # last time is still valid.
110 cur.flags = (
111 prev.flags &
112 (~AssetPipelineRecordEntry.FLAG_PROCESSED |
113 AssetPipelineRecordEntry.FLAG_COLLAPSED_FROM_LAST_RUN))
114 cur.out_paths = list(prev.out_paths)
115 cur.errors = list(prev.errors)
116
117 def getDeletions(self, record_history):
118 for prev, cur in record_history.diffs():
119 if prev and not cur: 102 if prev and not cur:
120 for p in prev.out_paths: 103 for p in prev.out_paths:
121 yield (p, 'previous asset was removed') 104 yield (p, 'previous asset was removed')
122 elif prev and cur and cur.was_processed_successfully: 105 elif prev and cur and cur.was_processed_successfully:
123 diff = set(prev.out_paths) - set(cur.out_paths) 106 diff = set(prev.out_paths) - set(cur.out_paths)
124 for p in diff: 107 for p in diff:
125 yield (p, 'asset changed outputs') 108 yield (p, 'asset changed outputs')
109
110 def collapseRecords(self, ctx):
111 for prev, cur in ctx.record_history.diffs:
112 if prev and cur and not cur.was_processed:
113 # This asset wasn't processed, so the information from
114 # last time is still valid.
115 cur.flags = (
116 (prev.flags & ~AssetPipelineRecordEntry.FLAG_PROCESSED) |
117 AssetPipelineRecordEntry.FLAG_COLLAPSED_FROM_LAST_RUN)
118 cur.out_paths = list(prev.out_paths)
119 cur.errors = list(prev.errors)
120
121 def shutdown(self, ctx):
122 # Invoke post-processors.
123 proc_ctx = ProcessorContext(self, ctx)
124 for proc in self._processors:
125 proc.onPipelineEnd(proc_ctx)
126 126
127 127
128 split_processor_names_re = re.compile(r'[ ,]+') 128 split_processor_names_re = re.compile(r'[ ,]+')
129 129
130 130