1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 ClusterShell Task module.
37
38 Simple example of use:
39
40 from ClusterShell.Task import *
41
42 # get task associated with calling thread
43 task = task_self()
44
45 # add a command to execute on distant nodes
46 task.shell("/bin/uname -r", nodes="tiger[1-30,35]")
47
48 # run task in calling thread
49 task.resume()
50
51 # get results
52 for buf, nodelist in task.iter_buffers():
53 print NodeSet.fromlist(nodelist), buf
54
55 """
56
57 from Engine.Engine import EngineAbortException
58 from Engine.Engine import EngineTimeoutException
59 from Engine.Engine import EngineAlreadyRunningError
60 from Engine.Engine import EngineTimer
61 from Engine.Poll import EnginePoll
62 from Worker.Pdsh import WorkerPdsh
63 from Worker.Ssh import WorkerSsh
64 from Worker.Popen2 import WorkerPopen2
65
66 from MsgTree import MsgTreeElem
67 from NodeSet import NodeSet
68
69 from sets import Set
70 import thread
71
class TaskException(Exception):
    """
    Base class for all task-related exceptions.
    """
76
class TimeoutError(TaskException):
    """
    Raised when a task run exceeded its allowed time.
    """
81
class AlreadyRunningError(TaskException):
    """
    Raised on an attempt to resume a task that is already running.
    """
    def __str__(self):
        return "current task already running"
88
89
def _task_print_debug(task, s):
    """
    Default task debug printing function. Cannot provide 'print'
    directly as it is not a function (will be in Py3k!).
    """
    # NOTE(review): the original 'def' line is missing from this view;
    # the (task, s) signature is assumed from how debug callbacks are
    # typically invoked -- TODO confirm against Engine call sites.
    print s
96
97
99 """
100 Task to execute. May be bound to a specific thread.
101
102 To create a task in a new thread:
103 task = Task()
104
105 To create or get the instance of the task associated with the thread
106 identifier tid:
107 task = Task(thread_id=tid)
108
109 Add command to execute locally in task with:
110 task.shell("/bin/hostname")
111
112 Add command to execute in a distant node in task with:
113 task.shell("/bin/hostname", nodes="tiger[1-20]")
114
115 Run task in its associated thread (will block only if the calling
116 thread is the associated thread):
117 task.resume()
118 """
119
# Default per-task information/configuration, copied into each new
# Task instance by __init__().
_default_info = { "debug"           : False,
                  "print_debug"     : _task_print_debug,
                  "fanout"          : 32,
                  "connect_timeout" : 10,
                  "command_timeout" : 0 }

# Registry of thread-bound Task instances, keyed by thread identifier
# (used by __new__ to implement the per-thread singleton).
_tasks = {}
126
def __new__(cls, thread_id=None):
    """
    For task bound to a specific thread, this class acts like a
    "thread singleton", so new style class is used and new objects
    are only instantiated if needed.
    """
    if thread_id:
        # thread-bound task: reuse (or create) the instance bound
        # to this thread identifier
        if thread_id not in cls._tasks:
            cls._tasks[thread_id] = object.__new__(cls)
        return cls._tasks[thread_id]

    # no thread id given: always create a fresh instance
    return object.__new__(cls)
139
def __init__(self, thread_id=None):
    """
    Initialize a Task, creating a new thread if needed.
    """
    # __new__ may hand back an already-initialized thread-bound
    # instance; only initialize state the first time through.
    if not getattr(self, "_engine", None):
        # copy default task info so per-task changes stay local
        self._info = self.__class__._default_info.copy()
        # poll()-based engine drives all I/O for this task
        self._engine = EnginePoll(self._info)
        self.timeout = 0
        self.l_run = None

        # root of the message tree (gathered worker output)
        self._msg_root = MsgTreeElem()
        # (worker, key) source -> message tree element
        self._d_source_msg = {}
        # (worker, key) source -> return code
        self._d_source_rc = {}
        # return code -> set of sources
        self._d_rc_sources = {}
        # largest return code seen during last run
        self._max_rc = 0
        # sources that timed out
        self._timeout_sources = Set()

        # when not bound to an existing thread, spawn a dedicated
        # thread; the lock is pre-acquired so the thread blocks in
        # _start_thread until resume() releases it
        if not thread_id:
            self.l_run = thread.allocate_lock()
            self.l_run.acquire()
            tid = thread.start_new_thread(Task._start_thread, (self,))
            self._tasks[tid] = self
170
def _start_thread(self):
    """New Task thread entry point."""
    try:
        # block until resume() releases the run lock, then run the
        # engine; loop forever to serve subsequent resume() calls
        while True:
            self.l_run.acquire()
            self._engine.run(self.timeout)
    except:
        # NOTE(review): bare except that only re-raises -- apparently a
        # placeholder for future per-thread exception handling; it has
        # no behavioral effect today.
        raise
180
def info(self, info_key, def_val=None):
    """
    Return the value of a per-task information item, or def_val
    when info_key is not set for this task.
    """
    return self._info.get(info_key, def_val)
186
def set_info(self, info_key, value):
    """
    Set (or overwrite) a task-specific information item.
    """
    self._info[info_key] = value
192
def shell(self, command, **kwargs):
    """
    Schedule a shell command for local or distant execution.

    Local usage:
        task.shell(command [, key=key] [, handler=handler]
             [, timeout=secs])

    Distant usage:
        task.shell(command, nodes=nodeset [, handler=handler]
             [, timeout=secs])
    """
    evh = kwargs.get("handler", None)
    tmo = kwargs.get("timeout", None)
    autoclose = kwargs.get("autoclose", False)
    nodes = kwargs.get("nodes", None)

    if nodes:
        # distant command: nodes themselves act as keys
        assert kwargs.get("key", None) is None, \
               "'key' argument not supported for distant command"

        worker = WorkerSsh(NodeSet(nodes), handler=evh,
                           timeout=tmo, command=command, autoclose=autoclose)
    else:
        # local command
        worker = WorkerPopen2(command, key=kwargs.get("key", None),
                              handler=evh, timeout=tmo, autoclose=autoclose)

    # add worker clients to the task's engine
    self.schedule(worker)

    return worker
226
def copy(self, source, dest, nodes, **kwargs):
    """
    Copy local file to distant nodes.

    Optional kwargs: handler (event handler), timeout (secs).
    """
    # idiom fix: identity comparison with None ('!= None' compared by
    # equality and could be fooled by a custom __eq__)
    assert nodes is not None, "local copy not supported"

    handler = kwargs.get("handler", None)
    timeo = kwargs.get("timeout", None)

    # file copy is implemented by the Ssh worker (scp-style transfer)
    worker = WorkerSsh(nodes, source=source, dest=dest, handler=handler,
                       timeout=timeo)

    self.schedule(worker)

    return worker
243
def timer(self, fire, handler, interval=-1.0, autoclose=False):
    """
    Create and register a task timer firing after 'fire' seconds,
    optionally repeating every 'interval' seconds.
    """
    assert fire >= 0.0, "timer's relative fire time must be a positive floating number"

    new_timer = EngineTimer(fire, interval, autoclose, handler)
    self._engine.add_timer(new_timer)
    return new_timer
253
def schedule(self, worker):
    """
    Schedule a worker for execution. Only useful for manually
    instantiated workers.
    """
    # bind the worker to this task
    worker._set_task(self)

    # register each of the worker's engine clients with our engine
    for eclient in worker._engine_clients():
        self._engine.add(eclient)
265
267 """
268 Resume task. If task is task_self(), workers are executed in
269 the calling thread so this method will block until workers have
270 finished. This is always the case for a single-threaded
271 application (eg. which doesn't create other Task() instance
272 than task_self()). Otherwise, the current thread doesn't block.
273 In that case, you may then want to call task_wait() to wait for
274 completion.
275 """
276 if self.l_run:
277 self.timeout = timeout
278 self.l_run.release()
279 else:
280 try:
281 self._reset()
282 self._engine.run(timeout)
283 except EngineTimeoutException:
284 raise TimeoutError()
285 except EngineAbortException, e:
286 self._terminate(e.kill)
287 except EngineAlreadyRunningError:
288 raise AlreadyRunningError()
289 except:
290 raise
291
def abort(self, kill=False):
    """
    Abort a task. Aborting a task removes (and stops when needed)
    all workers. If optional parameter kill is True, the task
    object is unbound from the current thread, so calling
    task_self() creates a new Task object.
    """
    # only self-abort is supported for now
    assert task_self() == self, "Inter-task abort not implemented yet"

    # ask the engine to stop all clients
    self._engine.abort(kill)

    # finish the abort (reset state, optionally unbind from thread)
    self._terminate(kill)
309
def _terminate(self, kill):
    """
    Abort completion subroutine.
    """
    self._reset()

    if kill:
        # drop this task's thread binding so a fresh Task is created
        # on the next task_self() call
        del self.__class__._tasks[thread.get_ident()]
318
def join(self):
    """
    Suspend execution of the calling thread until the target task
    terminates, unless it has already terminated.
    """
    self._engine.join()
325
def _reset(self):
    """
    Reset buffers and retcodes management variables.
    """
    # fresh message tree and lookup tables
    self._msg_root = MsgTreeElem()
    self._d_source_msg = {}
    self._d_source_rc = {}
    self._d_rc_sources = {}
    # no retcode nor timeouts recorded yet
    self._max_rc = 0
    self._timeout_sources.clear()
336
338 """
339 Add a worker message associated with a source.
340 """
341
342 e_msg = self._d_source_msg.get(source)
343 if not e_msg:
344
345 e_msg = self._msg_root
346
347
348 self._d_source_msg[source] = e_msg.add_msg(source, msg)
349
def _rc_set(self, source, rc, override=True):
    """
    Add a worker return code associated with a source.
    """
    # idiom fix: has_key() is deprecated, use the 'in' operator
    if not override and source in self._d_source_rc:
        return

    # store rc by source
    self._d_source_rc[source] = rc

    # store source by rc (setdefault replaces the explicit
    # get()/None check of the original)
    self._d_rc_sources.setdefault(rc, Set()).add(source)

    # update max rc seen so far
    if rc > self._max_rc:
        self._max_rc = rc
370
372 """
373 Add a worker timeout associated with a source.
374 """
375
376 self._timeout_sources.add(source)
377
379 """
380 Get a message by its source (worker, key).
381 """
382 e_msg = self._d_source_msg.get(source)
383
384 if e_msg is None:
385 return None
386
387 return e_msg.message()
388
390 """
391 Return an iterator over stored messages for the given key.
392 """
393 for (w, k), e in self._d_source_msg.iteritems():
394 if k == key:
395 yield e.message()
396
398 """
399 Return an iterator over messages and keys list for a specific
400 worker and optional matching keys.
401 """
402 if match_keys:
403 for e in self._msg_root:
404 keys = [t[1] for t in e.sources if t[0] is worker and t[1] in match_keys]
405 if len(keys) > 0:
406 yield e.message(), keys
407 else:
408 for e in self._msg_root:
409 keys = [t[1] for t in e.sources if t[0] is worker]
410 if len(keys) > 0:
411 yield e.message(), keys
412
414 """
415 Return an iterator over key, message for a specific worker.
416 """
417 for (w, k), e in self._d_source_msg.iteritems():
418 if w is worker:
419 yield k, e.message()
420
422 """
423 Get a return code by its source (worker, key).
424 """
425 return self._d_source_rc.get(source, 0)
426
428 """
429 Return an iterator over return codes for the given key.
430 """
431 for (w, k), rc in self._d_source_rc.iteritems():
432 if k == key:
433 yield rc
434
436 """
437 Return an iterator over return codes and keys list for a
438 specific worker and optional matching keys.
439 """
440 if match_keys:
441
442 for rc, src in self._d_rc_sources.iteritems():
443 keys = [t[1] for t in src if t[0] is worker and t[1] in match_keys]
444 if len(keys) > 0:
445 yield rc, keys
446 else:
447 for rc, src in self._d_rc_sources.iteritems():
448 keys = [t[1] for t in src if t[0] is worker]
449 if len(keys) > 0:
450 yield rc, keys
451
453 """
454 Return an iterator over key, rc for a specific worker.
455 """
456 for rc, src in self._d_rc_sources.iteritems():
457 for w, k in src:
458 if w is worker:
459 yield k, rc
460
462 """
463 Return the number of timed out "keys" for a specific worker.
464 """
465 cnt = 0
466 for (w, k) in self._timeout_sources:
467 if w is worker:
468 cnt += 1
469 return cnt
470
472 """
473 Iterate over timed out keys (ie. nodes) for a specific worker.
474 """
475 for (w, k) in self._timeout_sources:
476 if w is worker:
477 yield k
478
def key_buffer(self, key):
    """
    Get buffer for a specific key. When the key is associated
    with multiple workers, the resulting buffer will contain
    all workers content that may overlap.
    """
    return "".join(self._msg_iter_by_key(key))

# alias: for node-keyed (distant) workers
node_buffer = key_buffer
488
def key_retcode(self, key):
    """
    Return return code for a specific key. When the key is
    associated with multiple workers, return the max return
    code from these workers.

    NOTE(review): max() on an empty sequence raises ValueError, so
    this raises for a key with no recorded return code -- confirm
    that is the intended behavior for unknown keys.
    """
    return max(self._rc_iter_by_key(key))

# alias: for node-keyed (distant) workers
node_retcode = key_retcode
498
def max_retcode(self):
    """
    Get max return code encountered during last run.

    How retcodes work:
      If the process exits normally, the return code is its exit
      status. If the process is terminated by a signal, the return
      code is 128 + signal number.
    """
    return self._max_rc
509
def iter_buffers(self, match_keys=None):
    """
    Iterate over buffers, returning (buffer, keys) tuples. For remote
    workers (Ssh), keys are lists of nodes; use NodeSet.fromlist(keys)
    to get a NodeSet instance (more convenient and efficient).

    Optional parameter match_keys adds filtering on these keys.

    Usage example:

        for buffer, nodelist in task.iter_buffers():
            print NodeSet.fromlist(nodelist)
            print buffer
    """
    for elem in self._msg_root:
        if match_keys:
            # keep only keys present in the match list
            keys = [s[1] for s in elem.sources if s[1] in match_keys]
            if keys:
                yield elem.message(), keys
        else:
            yield elem.message(), [s[1] for s in elem.sources]
533
def iter_retcodes(self, match_keys=None):
    """
    Iterate over return codes, returning (rc, keys) tuples.

    Optional parameter match_keys adds filtering on these keys.

    How retcodes work:
      If the process exits normally, the return code is its exit
      status. If the process is terminated by a signal, the return
      code is 128 + signal number.
    """
    if match_keys:
        for rc, src in self._d_rc_sources.items():
            keys = [s[1] for s in src if s[1] in match_keys]
            # bugfix: skip entries with no matching key; the original
            # yielded (rc, []) here, inconsistent with iter_buffers
            if len(keys) > 0:
                yield rc, keys
    else:
        for rc, src in self._d_rc_sources.items():
            yield rc, [s[1] for s in src]
553
def num_timeout(self):
    """
    Return the number of timed out "keys" (ie. nodes).
    """
    return len(self._timeout_sources)
559
def iter_keys_timeout(self):
    """
    Iterate over timed out keys (ie. nodes).
    """
    for w, key in self._timeout_sources:
        yield key
566
def wait(cls, from_thread_id):
    """
    Class method that blocks calling thread until all tasks have
    finished.
    """
    for thread_id, task in Task._tasks.items():
        # never join the caller's own task (would deadlock)
        if thread_id != from_thread_id:
            task.join()
wait = classmethod(wait)
576
577
def task_self():
    """
    Get the Task instance bound to the current thread. This function
    provided as a convenience is available in the top-level
    ClusterShell.Task package namespace.
    """
    return Task(thread_id=thread.get_ident())
585
def task_wait():
    """
    Suspend execution of the calling thread until all tasks terminate,
    unless all tasks have already terminated. This function is provided
    as a convenience and is available in the top-level
    ClusterShell.Task package namespace.
    """
    Task.wait(thread.get_ident())
594