comparison piecrust/processing/worker.py @ 447:aefe70229fdd

bake: Commonize worker pool code between html and asset baking. The `workerpool` package now defines a generic-ish worker pool. It's similar to the Python framework pool but with a simpler use-case (only one way to queue jobs) and support for workers to send a final "report" to the master process, which we use to get timing information here. The rest of the changes basically remove a whole bunch of duplicated code that's not needed anymore.
author Ludovic Chabant <ludovic@chabant.com>
date Sun, 05 Jul 2015 00:09:41 -0700
parents 171dde4f61dc
children 3ceeca7bb71c
comparison
equal deleted inserted replaced
446:4cdf6c2157a0 447:aefe70229fdd
1 import re
1 import os.path 2 import os.path
2 import re
3 import time 3 import time
4 import queue
5 import logging 4 import logging
6 from piecrust.app import PieCrust 5 from piecrust.app import PieCrust
7 from piecrust.processing.base import PipelineContext 6 from piecrust.processing.base import PipelineContext
8 from piecrust.processing.records import ( 7 from piecrust.processing.records import (
9 FLAG_NONE, FLAG_PREPARED, FLAG_PROCESSED, 8 FLAG_NONE, FLAG_PREPARED, FLAG_PROCESSED,
11 from piecrust.processing.tree import ( 10 from piecrust.processing.tree import (
12 ProcessingTreeBuilder, ProcessingTreeRunner, 11 ProcessingTreeBuilder, ProcessingTreeRunner,
13 ProcessingTreeError, ProcessorError, 12 ProcessingTreeError, ProcessorError,
14 get_node_name_tree, print_node, 13 get_node_name_tree, print_node,
15 STATE_DIRTY) 14 STATE_DIRTY)
15 from piecrust.workerpool import IWorker
16 16
17 17
18 logger = logging.getLogger(__name__) 18 logger = logging.getLogger(__name__)
19 19
20 20
21 split_processor_names_re = re.compile(r'[ ,]+') 21 split_processor_names_re = re.compile(r'[ ,]+')
22 re_ansicolors = re.compile('\033\\[\d+m') 22 re_ansicolors = re.compile('\033\\[\d+m')
23 23
24 24
def worker_func(wid, ctx):
    """Process entry point for a pipeline worker.

    When profiling is requested on the context, the real worker loop is
    run under the profiler and the stats are dumped to a per-worker file;
    otherwise the worker loop is entered directly.
    """
    if not ctx.is_profiling:
        _real_worker_func(wid, ctx)
        return

    # Prefer the C implementation of the profiler when available.
    try:
        import cProfile as profile
    except ImportError:
        import profile

    # Clear the flag so the profiled invocation below takes the
    # non-profiling branch.
    ctx.is_profiling = False
    profile.runctx('_real_worker_func(wid, ctx)',
                   globals(), locals(),
                   filename='PipelineWorker-%d.prof' % wid)
38
39
def _real_worker_func(wid, ctx):
    # Runs inside the child process: build the worker and enter its loop.
    logger.debug("Worker %d booting up..." % wid)
    ProcessingWorker(wid, ctx).run()
44
45
46 class ProcessingWorkerContext(object): 25 class ProcessingWorkerContext(object):
47 def __init__(self, root_dir, out_dir, tmp_dir, 26 def __init__(self, root_dir, out_dir, tmp_dir,
48 work_queue, results, abort_event,
49 force=False, debug=False): 27 force=False, debug=False):
50 self.root_dir = root_dir 28 self.root_dir = root_dir
51 self.out_dir = out_dir 29 self.out_dir = out_dir
52 self.tmp_dir = tmp_dir 30 self.tmp_dir = tmp_dir
53 self.work_queue = work_queue
54 self.results = results
55 self.abort_event = abort_event
56 self.force = force 31 self.force = force
57 self.debug = debug 32 self.debug = debug
58 self.is_profiling = False 33 self.is_profiling = False
59 self.enabled_processors = None 34 self.enabled_processors = None
60 self.additional_processors = None 35 self.additional_processors = None
75 self.proc_tree = None 50 self.proc_tree = None
76 self.rel_outputs = None 51 self.rel_outputs = None
77 self.errors = None 52 self.errors = None
78 53
79 54
80 class ProcessingWorker(object): 55 class ProcessingWorker(IWorker):
81 def __init__(self, wid, ctx): 56 def __init__(self, ctx):
82 self.wid = wid
83 self.ctx = ctx 57 self.ctx = ctx
58 self.work_start_time = time.perf_counter()
84 59
85 def run(self): 60 def initialize(self):
86 logger.debug("Worker %d initializing..." % self.wid)
87 work_start_time = time.perf_counter()
88
89 # Create the app local to this worker. 61 # Create the app local to this worker.
90 app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug) 62 app = PieCrust(self.ctx.root_dir, debug=self.ctx.debug)
91 app.env.fs_cache_only_for_main_page = True
92 app.env.registerTimer("PipelineWorker_%d_Total" % self.wid) 63 app.env.registerTimer("PipelineWorker_%d_Total" % self.wid)
93 app.env.registerTimer("PipelineWorkerInit") 64 app.env.registerTimer("PipelineWorkerInit")
94 app.env.registerTimer("JobReceive") 65 app.env.registerTimer("JobReceive")
95 app.env.registerTimer('BuildProcessingTree') 66 app.env.registerTimer('BuildProcessingTree')
96 app.env.registerTimer('RunProcessingTree') 67 app.env.registerTimer('RunProcessingTree')
68 self.app = app
97 69
98 processors = app.plugin_loader.getProcessors() 70 processors = app.plugin_loader.getProcessors()
99 if self.ctx.enabled_processors: 71 if self.ctx.enabled_processors:
100 logger.debug("Filtering processors to: %s" % 72 logger.debug("Filtering processors to: %s" %
101 self.ctx.enabled_processors) 73 self.ctx.enabled_processors)
106 len(self.ctx.additional_processors)) 78 len(self.ctx.additional_processors))
107 for proc in self.ctx.additional_processors: 79 for proc in self.ctx.additional_processors:
108 app.env.registerTimer(proc.__class__.__name__) 80 app.env.registerTimer(proc.__class__.__name__)
109 proc.initialize(app) 81 proc.initialize(app)
110 processors.append(proc) 82 processors.append(proc)
83 self.processors = processors
111 84
112 # Invoke pre-processors. 85 # Invoke pre-processors.
113 pipeline_ctx = PipelineContext(self.wid, app, self.ctx.out_dir, 86 pipeline_ctx = PipelineContext(self.wid, self.app, self.ctx.out_dir,
114 self.ctx.tmp_dir, self.ctx.force) 87 self.ctx.tmp_dir, self.ctx.force)
115 for proc in processors: 88 for proc in processors:
116 proc.onPipelineStart(pipeline_ctx) 89 proc.onPipelineStart(pipeline_ctx)
117 90
118 # Sort our processors again in case the pre-process step involved 91 # Sort our processors again in case the pre-process step involved
119 # patching the processors with some new ones. 92 # patching the processors with some new ones.
120 processors.sort(key=lambda p: p.priority) 93 processors.sort(key=lambda p: p.priority)
121 94
122 app.env.stepTimerSince("PipelineWorkerInit", work_start_time) 95 app.env.stepTimerSince("PipelineWorkerInit", self.work_start_time)
123 96
124 aborted_with_exception = None 97 def process(self, job):
125 while not self.ctx.abort_event.is_set():
126 try:
127 with app.env.timerScope('JobReceive'):
128 job = self.ctx.work_queue.get(True, 0.01)
129 except queue.Empty:
130 continue
131
132 try:
133 result = self._unsafeRun(app, processors, job)
134 self.ctx.results.put_nowait(result)
135 except Exception as ex:
136 self.ctx.abort_event.set()
137 aborted_with_exception = ex
138 logger.error("[%d] Critical error, aborting." % self.wid)
139 if self.ctx.debug:
140 logger.exception(ex)
141 break
142 finally:
143 self.ctx.work_queue.task_done()
144
145 if aborted_with_exception is not None:
146 msgs = _get_errors(aborted_with_exception)
147 self.ctx.results.put_nowait({'type': 'error', 'messages': msgs})
148
149 # Invoke post-processors.
150 for proc in processors:
151 proc.onPipelineEnd(pipeline_ctx)
152
153 app.env.stepTimerSince("PipelineWorker_%d_Total" % self.wid,
154 work_start_time)
155 self.ctx.results.put_nowait({
156 'type': 'timers', 'data': app.env._timers})
157
158 def _unsafeRun(self, app, processors, job):
159 result = ProcessingWorkerResult(job.path) 98 result = ProcessingWorkerResult(job.path)
160 99
161 processors = get_filtered_processors( 100 processors = get_filtered_processors(
162 processors, job.mount_info['processors']) 101 self.processors, job.mount_info['processors'])
163 102
164 # Build the processing tree for this job. 103 # Build the processing tree for this job.
165 rel_path = os.path.relpath(job.path, job.base_dir) 104 rel_path = os.path.relpath(job.path, job.base_dir)
166 try: 105 try:
167 with app.env.timerScope('BuildProcessingTree'): 106 with self.app.env.timerScope('BuildProcessingTree'):
168 builder = ProcessingTreeBuilder(processors) 107 builder = ProcessingTreeBuilder(processors)
169 tree_root = builder.build(rel_path) 108 tree_root = builder.build(rel_path)
170 result.flags |= FLAG_PREPARED 109 result.flags |= FLAG_PREPARED
171 except ProcessingTreeError as ex: 110 except ProcessingTreeError as ex:
172 result.errors = _get_errors(ex) 111 result.errors = _get_errors(ex)
182 121
183 if job.force: 122 if job.force:
184 tree_root.setState(STATE_DIRTY, True) 123 tree_root.setState(STATE_DIRTY, True)
185 124
186 try: 125 try:
187 with app.env.timerScope('RunProcessingTree'): 126 with self.app.env.timerScope('RunProcessingTree'):
188 runner = ProcessingTreeRunner( 127 runner = ProcessingTreeRunner(
189 job.base_dir, self.ctx.tmp_dir, self.ctx.out_dir) 128 job.base_dir, self.ctx.tmp_dir, self.ctx.out_dir)
190 if runner.processSubTree(tree_root): 129 if runner.processSubTree(tree_root):
191 result.flags |= FLAG_PROCESSED 130 result.flags |= FLAG_PROCESSED
192 except ProcessingTreeError as ex: 131 except ProcessingTreeError as ex:
194 ex = ex.__cause__ 133 ex = ex.__cause__
195 # Need to strip out colored errors from external processes. 134 # Need to strip out colored errors from external processes.
196 result.errors = _get_errors(ex, strip_colors=True) 135 result.errors = _get_errors(ex, strip_colors=True)
197 136
198 return result 137 return result
138
def getReport(self):
    """Tear down the pipeline and return timing data to the master.

    Gives every processor its onPipelineEnd() callback, closes out this
    worker's total timer, and returns the accumulated timer data as the
    worker's final report.
    """
    # Let each processor run its post-processing / cleanup hook.
    pipeline_ctx = PipelineContext(self.wid, self.app, self.ctx.out_dir,
                                   self.ctx.tmp_dir, self.ctx.force)
    for proc in self.processors:
        proc.onPipelineEnd(pipeline_ctx)

    self.app.env.stepTimerSince("PipelineWorker_%d_Total" % self.wid,
                                self.work_start_time)
    return {'type': 'timers', 'data': self.app.env._timers}
199 151
200 152
201 def get_filtered_processors(processors, authorized_names): 153 def get_filtered_processors(processors, authorized_names):
202 if not authorized_names or authorized_names == 'all': 154 if not authorized_names or authorized_names == 'all':
203 return processors 155 return processors