1 """SCons.Job
2
3 This module defines the Serial and Parallel classes that execute tasks to
4 complete a build. The Jobs class provides a higher level interface to start,
5 stop, and wait on jobs.
6
7 """
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 __revision__ = "src/engine/SCons/Job.py 5110 2010/07/25 16:14:38 bdeegan"
33
34 import os
35 import signal
36
37 import SCons.Errors
38
39
40
41
42
43
44
45
46
# Worker-thread stack sizes, in kilobytes.  explicit_stack_size stays None
# unless a wrapping interface sets it; when it is None, ThreadPool falls
# back to default_stack_size (see ThreadPool.__init__ / Jobs.__init__).
explicit_stack_size = None
default_stack_size = 256

# Error string attached to a task's BuildError when the build is stopped
# by a signal (SIGINT/SIGTERM/SIGHUP).
interrupt_msg = 'Build interrupted.'
51
52
class InterruptState:
    """Records whether the build has been interrupted by a signal.

    An instance starts out not-interrupted; the signal handler installed
    by Jobs._setup_sig_handler() calls set(), and schedulers poll the
    flag by calling the instance itself (__call__).
    """

    def __init__(self):
        # True once a SIGINT/SIGTERM/SIGHUP handler has fired
        self.interrupted = False

    def set(self):
        """Mark the build as interrupted."""
        self.interrupted = True

    def __call__(self):
        """Return True if the build has been interrupted."""
        return self.interrupted
62
63
65 """An instance of this class initializes N jobs, and provides
66 methods for starting, stopping, and waiting on all N jobs.
67 """
68
70 """
71 create 'num' jobs using the given taskmaster.
72
73 If 'num' is 1 or less, then a serial job will be used,
74 otherwise a parallel job with 'num' worker threads will
75 be used.
76
77 The 'num_jobs' attribute will be set to the actual number of jobs
78 allocated. If more than one job is requested but the Parallel
79 class can't do it, it gets reset to 1. Wrapping interfaces that
80 care should check the value of 'num_jobs' after initialization.
81 """
82
83 self.job = None
84 if num > 1:
85 stack_size = explicit_stack_size
86 if stack_size is None:
87 stack_size = default_stack_size
88
89 try:
90 self.job = Parallel(taskmaster, num, stack_size)
91 self.num_jobs = num
92 except NameError:
93 pass
94 if self.job is None:
95 self.job = Serial(taskmaster)
96 self.num_jobs = 1
97
98 - def run(self, postfunc=lambda: None):
99 """Run the jobs.
100
101 postfunc() will be invoked after the jobs has run. It will be
102 invoked even if the jobs are interrupted by a keyboard
103 interrupt (well, in fact by a signal such as either SIGINT,
104 SIGTERM or SIGHUP). The execution of postfunc() is protected
105 against keyboard interrupts and is guaranteed to run to
106 completion."""
107 self._setup_sig_handler()
108 try:
109 self.job.start()
110 finally:
111 postfunc()
112 self._reset_sig_handler()
113
115 """Returns whether the jobs were interrupted by a signal."""
116 return self.job.interrupted()
117
119 """Setup an interrupt handler so that SCons can shutdown cleanly in
120 various conditions:
121
122 a) SIGINT: Keyboard interrupt
123 b) SIGTERM: kill or system shutdown
124 c) SIGHUP: Controlling shell exiting
125
126 We handle all of these cases by stopping the taskmaster. It
127 turns out that it very difficult to stop the build process
128 by throwing asynchronously an exception such as
129 KeyboardInterrupt. For example, the python Condition
130 variables (threading.Condition) and Queue's do not seem to
131 asynchronous-exception-safe. It would require adding a whole
132 bunch of try/finally block and except KeyboardInterrupt all
133 over the place.
134
135 Note also that we have to be careful to handle the case when
136 SCons forks before executing another process. In that case, we
137 want the child to exit immediately.
138 """
139 def handler(signum, stack, self=self, parentpid=os.getpid()):
140 if os.getpid() == parentpid:
141 self.job.taskmaster.stop()
142 self.job.interrupted.set()
143 else:
144 os._exit(2)
145
146 self.old_sigint = signal.signal(signal.SIGINT, handler)
147 self.old_sigterm = signal.signal(signal.SIGTERM, handler)
148 try:
149 self.old_sighup = signal.signal(signal.SIGHUP, handler)
150 except AttributeError:
151 pass
152
154 """Restore the signal handlers to their previous state (before the
155 call to _setup_sig_handler()."""
156
157 signal.signal(signal.SIGINT, self.old_sigint)
158 signal.signal(signal.SIGTERM, self.old_sigterm)
159 try:
160 signal.signal(signal.SIGHUP, self.old_sighup)
161 except AttributeError:
162 pass
163
165 """This class is used to execute tasks in series, and is more efficient
166 than Parallel, but is only appropriate for non-parallel builds. Only
167 one instance of this class should be in existence at a time.
168
169 This class is not thread safe.
170 """
171
173 """Create a new serial job given a taskmaster.
174
175 The taskmaster's next_task() method should return the next task
176 that needs to be executed, or None if there are no more tasks. The
177 taskmaster's executed() method will be called for each task when it
178 is successfully executed or failed() will be called if it failed to
179 execute (e.g. execute() raised an exception)."""
180
181 self.taskmaster = taskmaster
182 self.interrupted = InterruptState()
183
185 """Start the job. This will begin pulling tasks from the taskmaster
186 and executing them, and return when there are no more tasks. If a task
187 fails to execute (i.e. execute() raises an exception), then the job will
188 stop."""
189
190 while 1:
191 task = self.taskmaster.next_task()
192
193 if task is None:
194 break
195
196 try:
197 task.prepare()
198 if task.needs_execute():
199 task.execute()
200 except:
201 if self.interrupted():
202 try:
203 raise SCons.Errors.BuildError(
204 task.targets[0], errstr=interrupt_msg)
205 except:
206 task.exception_set()
207 else:
208 task.exception_set()
209
210
211
212 task.failed()
213 else:
214 task.executed()
215
216 task.postprocess()
217 self.taskmaster.cleanup()
218
219
220
221
222
# The parallel machinery needs both a thread-safe queue and the threading
# module.  If either import fails, only Serial is defined and
# Jobs.__init__ falls back to it via the NameError it catches.
try:
    import Queue
    import threading
except ImportError:
    pass
else:
    class Worker(threading.Thread):
        """A worker thread waits on a task to be posted to its request queue,
        dequeues the task, executes it, and posts a tuple including the task
        and a boolean indicating whether the task executed successfully. """

        def __init__(self, requestQueue, resultsQueue, interrupted):
            threading.Thread.__init__(self)
            # Daemonize so a hung worker cannot keep the process alive.
            self.setDaemon(1)
            self.requestQueue = requestQueue
            self.resultsQueue = resultsQueue
            self.interrupted = interrupted
            self.start()

        def run(self):
            while 1:
                task = self.requestQueue.get()

                if task is None:
                    # A None task is the sentinel posted by
                    # ThreadPool.cleanup(): no more work, so terminate.
                    break

                try:
                    if self.interrupted():
                        raise SCons.Errors.BuildError(
                            task.targets[0], errstr=interrupt_msg)
                    task.execute()
                except:
                    # Deliberately broad: the exception is stored on the
                    # task for the main thread to report.
                    task.exception_set()
                    ok = False
                else:
                    ok = True

                self.resultsQueue.put((task, ok))
266 """This class is responsible for spawning and managing worker threads."""
267
268 - def __init__(self, num, stack_size, interrupted):
269 """Create the request and reply queues, and 'num' worker threads.
270
271 One must specify the stack size of the worker threads. The
272 stack size is specified in kilobytes.
273 """
274 self.requestQueue = Queue.Queue(0)
275 self.resultsQueue = Queue.Queue(0)
276
277 try:
278 prev_size = threading.stack_size(stack_size*1024)
279 except AttributeError, e:
280
281
282 if not explicit_stack_size is None:
283 msg = "Setting stack size is unsupported by this version of Python:\n " + \
284 e.args[0]
285 SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
286 except ValueError, e:
287 msg = "Setting stack size failed:\n " + str(e)
288 SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
289
290
291 self.workers = []
292 for _ in range(num):
293 worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
294 self.workers.append(worker)
295
296
297
298 if 'prev_size' in locals().keys():
299 threading.stack_size(prev_size)
300
301 - def put(self, task):
302 """Put task into request queue."""
303 self.requestQueue.put(task)
304
306 """Remove and return a result tuple from the results queue."""
307 return self.resultsQueue.get()
308
310 self.resultsQueue.put((task, False))
311
313 """
314 Shuts down the thread pool, giving each worker thread a
315 chance to shut down gracefully.
316 """
317
318
319
320
321 for _ in self.workers:
322 self.requestQueue.put(None)
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337 for worker in self.workers:
338 worker.join(1.0)
339 self.workers = []
340
342 """This class is used to execute tasks in parallel, and is somewhat
343 less efficient than Serial, but is appropriate for parallel builds.
344
345 This class is thread safe.
346 """
347
348 - def __init__(self, taskmaster, num, stack_size):
349 """Create a new parallel job given a taskmaster.
350
351 The taskmaster's next_task() method should return the next
352 task that needs to be executed, or None if there are no more
353 tasks. The taskmaster's executed() method will be called
354 for each task when it is successfully executed or failed()
355 will be called if the task failed to execute (i.e. execute()
356 raised an exception).
357
358 Note: calls to taskmaster are serialized, but calls to
359 execute() on distinct tasks are not serialized, because
360 that is the whole point of parallel jobs: they can execute
361 multiple tasks simultaneously. """
362
363 self.taskmaster = taskmaster
364 self.interrupted = InterruptState()
365 self.tp = ThreadPool(num, stack_size, self.interrupted)
366
367 self.maxjobs = num
368
370 """Start the job. This will begin pulling tasks from the
371 taskmaster and executing them, and return when there are no
372 more tasks. If a task fails to execute (i.e. execute() raises
373 an exception), then the job will stop."""
374
375 jobs = 0
376
377 while 1:
378
379
380 while jobs < self.maxjobs:
381 task = self.taskmaster.next_task()
382 if task is None:
383 break
384
385 try:
386
387 task.prepare()
388 except:
389 task.exception_set()
390 task.failed()
391 task.postprocess()
392 else:
393 if task.needs_execute():
394
395 self.tp.put(task)
396 jobs = jobs + 1
397 else:
398 task.executed()
399 task.postprocess()
400
401 if not task and not jobs: break
402
403
404
405 while 1:
406 task, ok = self.tp.get()
407 jobs = jobs - 1
408
409 if ok:
410 task.executed()
411 else:
412 if self.interrupted():
413 try:
414 raise SCons.Errors.BuildError(
415 task.targets[0], errstr=interrupt_msg)
416 except:
417 task.exception_set()
418
419
420
421 task.failed()
422
423 task.postprocess()
424
425 if self.tp.resultsQueue.empty():
426 break
427
428 self.tp.cleanup()
429 self.taskmaster.cleanup()
430
431
432
433
434
435
436