1 """SCons.Job
2
3 This module defines the Serial and Parallel classes that execute tasks to
4 complete a build. The Jobs class provides a higher level interface to start,
5 stop, and wait on jobs.
6
7 """
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 __revision__ = "src/engine/SCons/Job.py 2014/09/27 12:51:43 garyo"
33
34 import SCons.compat
35
36 import os
37 import signal
38
39 import SCons.Errors
40
41
42
43
44
45
46
47
48
# Stack size, in kilobytes, requested for the worker threads of a parallel
# build.  'explicit_stack_size' is None when no size has been requested
# explicitly; Jobs then falls back to 'default_stack_size'.
explicit_stack_size = None
default_stack_size = 256

# Error string attached to the BuildError that is raised for a task when the
# build has been interrupted by a signal.
interrupt_msg = 'Build interrupted.'
53
54
class InterruptState(object):
    """Callable flag recording whether the build has been interrupted.

    Calling the instance returns the current state; set() switches it to
    True.  Nothing in this class ever switches it back to False.
    """

    def __init__(self):
        """Start in the 'not interrupted' state."""
        self.interrupted = False

    def set(self):
        """Mark the build as interrupted."""
        self.interrupted = True

    def __call__(self):
        """Return True once set() has been called, False before that."""
        return self.interrupted
64
65
class Jobs(object):
    """An instance of this class initializes N jobs, and provides
    methods for starting, stopping, and waiting on all N jobs.
    """

    # NOTE(review): the 'def' line of this method was missing from the
    # listing; the parameter order (num, taskmaster) is reconstructed and
    # should be confirmed against the callers.
    def __init__(self, num, taskmaster):
        """
        Create 'num' jobs using the given taskmaster.

        If 'num' is 1 or less, then a serial job will be used,
        otherwise a parallel job with 'num' worker threads will
        be used.

        The 'num_jobs' attribute will be set to the actual number of jobs
        allocated.  If more than one job is requested but the Parallel
        class can't do it, it gets reset to 1.  Wrapping interfaces that
        care should check the value of 'num_jobs' after initialization.
        """
        self.job = None
        if num > 1:
            stack_size = explicit_stack_size
            if stack_size is None:
                stack_size = default_stack_size

            try:
                self.job = Parallel(taskmaster, num, stack_size)
                self.num_jobs = num
            except NameError:
                # The parallel machinery is only defined when the threading
                # imports succeeded; without it we fall back to Serial below.
                pass
        if self.job is None:
            self.job = Serial(taskmaster)
            self.num_jobs = 1

    def run(self, postfunc=lambda: None):
        """Run the jobs.

        postfunc() will be invoked after the jobs have run.  It will be
        invoked even if the jobs are interrupted by a keyboard
        interrupt (well, in fact by a signal such as either SIGINT,
        SIGTERM or SIGHUP).  The execution of postfunc() is protected
        against keyboard interrupts and is guaranteed to run to
        completion.

        The previous signal handlers are restored afterwards, including
        when the job or postfunc() raises; the exception still propagates.
        """
        self._setup_sig_handler()
        try:
            self.job.start()
        finally:
            try:
                postfunc()
            finally:
                # Nested so that a failing postfunc() cannot leave our
                # handlers installed in the calling process.
                self._reset_sig_handler()

    # NOTE(review): the 'def' line of this method was missing from the
    # listing; the name 'were_interrupted' is reconstructed -- confirm.
    def were_interrupted(self):
        """Returns whether the jobs were interrupted by a signal."""
        return self.job.interrupted()

    def _setup_sig_handler(self):
        """Setup an interrupt handler so that SCons can shutdown cleanly in
        various conditions:

          a) SIGINT: Keyboard interrupt
          b) SIGTERM: kill or system shutdown
          c) SIGHUP: Controlling shell exiting

        We handle all of these cases by stopping the taskmaster.  It
        turns out that it is very difficult to stop the build process
        by throwing asynchronously an exception such as
        KeyboardInterrupt.  For example, the python Condition
        variables (threading.Condition) and queues do not seem to be
        asynchronous-exception-safe.  It would require adding a whole
        bunch of try/finally blocks and except KeyboardInterrupt all
        over the place.

        Note also that we have to be careful to handle the case when
        SCons forks before executing another process.  In that case, we
        want the child to exit immediately.
        """
        # Captured once, at installation time, so that a forked child
        # (which inherits the handler) can tell it is not the parent.
        parent_pid = os.getpid()

        def handler(signum, stack):
            if os.getpid() == parent_pid:
                self.job.taskmaster.stop()
                self.job.interrupted.set()
            else:
                os._exit(2)

        self.old_sigint = signal.signal(signal.SIGINT, handler)
        self.old_sigterm = signal.signal(signal.SIGTERM, handler)
        try:
            self.old_sighup = signal.signal(signal.SIGHUP, handler)
        except AttributeError:
            # The signal module has no SIGHUP on this platform.
            pass

    def _reset_sig_handler(self):
        """Restore the signal handlers to their previous state (before the
        call to _setup_sig_handler())."""
        signal.signal(signal.SIGINT, self.old_sigint)
        signal.signal(signal.SIGTERM, self.old_sigterm)
        try:
            signal.signal(signal.SIGHUP, self.old_sighup)
        except AttributeError:
            # No SIGHUP on this platform, so nothing was installed for it.
            pass
165
class Serial(object):
    """This class is used to execute tasks in series, and is more efficient
    than Parallel, but is only appropriate for non-parallel builds.  Only
    one instance of this class should be in existence at a time.

    This class is not thread safe.
    """

    def __init__(self, taskmaster):
        """Create a new serial job given a taskmaster.

        The taskmaster's next_task() method should return the next task
        that needs to be executed, or None if there are no more tasks.  The
        taskmaster's executed() method will be called for each task when it
        is successfully executed or failed() will be called if it failed to
        execute (e.g. execute() raised an exception)."""
        self.taskmaster = taskmaster
        self.interrupted = InterruptState()

    def start(self):
        """Start the job.  This will begin pulling tasks from the taskmaster
        and executing them, and return when there are no more tasks.  If a
        task fails to execute (i.e. execute() raises an exception), then the
        job will stop."""
        while True:
            task = self.taskmaster.next_task()

            if task is None:
                break

            try:
                task.prepare()
                if task.needs_execute():
                    task.execute()
            except:
                # Deliberately catches everything: whatever went wrong is
                # handed to the task rather than propagated.
                if self.interrupted():
                    # Raise and immediately catch a BuildError so that it,
                    # not the original error, is the exception in flight
                    # when exception_set() is called.
                    try:
                        raise SCons.Errors.BuildError(
                            task.targets[0], errstr=interrupt_msg)
                    except:
                        task.exception_set()
                else:
                    task.exception_set()

                task.failed()
            else:
                task.executed()

            task.postprocess()
        self.taskmaster.cleanup()
220
221
222
223
224
225 try:
226 import queue
227 import threading
228 except ImportError:
229 pass
230 else:
class Worker(threading.Thread):
    """A worker thread waits on a task to be posted to its request queue,
    dequeues the task, executes it, and posts a tuple including the task
    and a boolean indicating whether the task executed successfully. """

    def __init__(self, requestQueue, resultsQueue, interrupted):
        """Store the queues and the interrupt flag, then start the thread.

        requestQueue  -- queue the tasks (or a None sentinel) are read from
        resultsQueue  -- queue the (task, ok) tuples are written to
        interrupted   -- callable returning true once the build has been
                         interrupted
        """
        threading.Thread.__init__(self)
        # Daemon thread: a worker that is still blocked on its queue does
        # not keep the interpreter alive at exit.
        self.daemon = True
        self.requestQueue = requestQueue
        self.resultsQueue = resultsQueue
        self.interrupted = interrupted
        self.start()

    def run(self):
        """Execute queued tasks until a None sentinel is received."""
        while True:
            task = self.requestQueue.get()

            if task is None:
                # None is the shutdown sentinel: stop this thread.
                break

            try:
                if self.interrupted():
                    # Do not start new work after an interrupt; report the
                    # task as failed with the interrupt message instead.
                    raise SCons.Errors.BuildError(
                        task.targets[0], errstr=interrupt_msg)
                task.execute()
            except:
                # Deliberately catches everything so that one failing task
                # never kills the worker thread.
                task.exception_set()
                ok = False
            else:
                ok = True

            self.resultsQueue.put((task, ok))
266
class ThreadPool(object):
    """This class is responsible for spawning and managing worker threads."""

    def __init__(self, num, stack_size, interrupted):
        """Create the request and reply queues, and 'num' worker threads.

        One must specify the stack size of the worker threads.  The
        stack size is specified in kilobytes.

        'interrupted' is handed unchanged to every worker.
        """
        self.requestQueue = queue.Queue(0)
        self.resultsQueue = queue.Queue(0)

        # Stays None when the stack size could not be changed, in which
        # case there is nothing to restore afterwards.
        prev_size = None
        try:
            prev_size = threading.stack_size(stack_size*1024)
        except AttributeError as e:
            # Only warn if the stack size has been explicitly requested.
            if explicit_stack_size is not None:
                msg = "Setting stack size is unsupported by this version of Python:\n " + \
                    e.args[0]
                SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
        except ValueError as e:
            msg = "Setting stack size failed:\n " + str(e)
            SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)

        # Create the worker threads; each one starts itself.
        self.workers = []
        try:
            for _ in range(num):
                worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
                self.workers.append(worker)
        finally:
            # The stack size setting is process-wide, so put the previous
            # value back even if creating a worker failed.
            if prev_size is not None:
                threading.stack_size(prev_size)

    def put(self, task):
        """Put task into request queue."""
        self.requestQueue.put(task)

    def get(self):
        """Remove and return a result tuple from the results queue."""
        return self.resultsQueue.get()

    # NOTE(review): the 'def' line of this method was missing from the
    # listing; the name 'preparation_failed' is reconstructed -- confirm.
    def preparation_failed(self, task):
        """Post 'task' to the results queue as failed, without running it."""
        self.resultsQueue.put((task, False))

    def cleanup(self):
        """
        Shuts down the thread pool, giving each worker thread a
        chance to shut down gracefully.
        """
        # One None sentinel per worker: each worker exits its loop when it
        # dequeues one.
        for _ in self.workers:
            self.requestQueue.put(None)

        # Wait for the workers to finish, but at most one second each, so
        # that a worker stuck in a task cannot block the shutdown.
        for worker in self.workers:
            worker.join(1.0)
        self.workers = []
340
class Parallel(object):
    """This class is used to execute tasks in parallel, and is somewhat
    less efficient than Serial, but is appropriate for parallel builds.

    This class is thread safe.
    """

    def __init__(self, taskmaster, num, stack_size):
        """Create a new parallel job given a taskmaster.

        The taskmaster's next_task() method should return the next
        task that needs to be executed, or None if there are no more
        tasks.  The taskmaster's executed() method will be called
        for each task when it is successfully executed or failed()
        will be called if the task failed to execute (i.e. execute()
        raised an exception).

        Note: calls to taskmaster are serialized, but calls to
        execute() on distinct tasks are not serialized, because
        that is the whole point of parallel jobs: they can execute
        multiple tasks simultaneously. """
        self.taskmaster = taskmaster
        self.interrupted = InterruptState()
        self.tp = ThreadPool(num, stack_size, self.interrupted)

        self.maxjobs = num

    def start(self):
        """Start the job.  This will begin pulling tasks from the
        taskmaster and executing them, and return when there are no
        more tasks.  If a task fails to execute (i.e. execute() raises
        an exception), then the job will stop."""
        # Number of tasks handed to the pool whose result has not been
        # collected yet.
        jobs = 0

        while True:
            # Start up as many tasks as we are allowed to run at once.
            while jobs < self.maxjobs:
                task = self.taskmaster.next_task()
                if task is None:
                    break

                try:
                    # Preparation happens here, in the calling thread.
                    task.prepare()
                except:
                    task.exception_set()
                    task.failed()
                    task.postprocess()
                else:
                    if task.needs_execute():
                        # Hand the task to a worker thread.
                        self.tp.put(task)
                        jobs += 1
                    else:
                        task.executed()
                        task.postprocess()

            # No task left to dispatch and nothing in flight: we are done.
            if not task and not jobs:
                break

            # Block for at least one result, then drain every result that
            # is already available before dispatching again.
            while True:
                task, ok = self.tp.get()
                jobs -= 1

                if ok:
                    task.executed()
                else:
                    if self.interrupted():
                        # Raise and immediately catch a BuildError so that
                        # it is the exception in flight when
                        # exception_set() is called.
                        try:
                            raise SCons.Errors.BuildError(
                                task.targets[0], errstr=interrupt_msg)
                        except:
                            task.exception_set()

                    task.failed()

                task.postprocess()

                if self.tp.resultsQueue.empty():
                    break

        self.tp.cleanup()
        self.taskmaster.cleanup()
430
431
432
433
434
435
436