"""
ClusterShell Task module.

Simple example of use:

>>> from ClusterShell.Task import *
>>>
>>> # get task associated with calling thread
... task = task_self()
>>>
>>> # add a command to execute on distant nodes
... task.shell("/bin/uname -r", nodes="tiger[1-30,35]")
<ClusterShell.Worker.Ssh.WorkerSsh object at 0x7f41da71b890>
>>>
>>> # run task in calling thread
... task.resume()
>>>
>>> # get results
... for buf, nodelist in task.iter_buffers():
...     print NodeSet.fromlist(nodelist), buf
...

"""

from itertools import imap
from operator import itemgetter
import sys
import threading
import traceback

from ClusterShell.Engine.Engine import EngineAbortException
from ClusterShell.Engine.Engine import EngineTimeoutException
from ClusterShell.Engine.Engine import EngineAlreadyRunningError
from ClusterShell.Engine.Engine import EngineTimer
from ClusterShell.Engine.Factory import PreferredEngine
from ClusterShell.Worker.EngineClient import EnginePort
from ClusterShell.Worker.Ssh import WorkerSsh
from ClusterShell.Worker.Popen import WorkerPopen

from ClusterShell.Event import EventHandler
from ClusterShell.MsgTree import MsgTree
from ClusterShell.NodeSet import NodeSet


class TaskException(Exception):
    """Base task exception."""

class TaskError(TaskException):
    """Base task error exception."""

class TimeoutError(TaskError):
    """Raised when the task timed out."""

class AlreadyRunningError(TaskError):
    """Raised when trying to resume an already running task."""

class TaskMsgTreeError(TaskError):
    """Raised when trying to access disabled MsgTree."""


class _TaskMsgTree(object):
    """
    Task special MsgTree wrapper class, for easy disabling of MsgTree
    buffering. This class checks if task.default(keyword) is set before
    effective MsgTree attribute lookup, according to the following rules:
        - if set, allow all MsgTree methods, else:
        - ignore add() calls
        - disallow MsgTree methods except clear()
    """
    def __init__(self, task, keyword):
        self._task = task
        self._keyword = keyword
        self._tree = MsgTree()

    def __getattr__(self, name):
        # check whether the associated msgtree default is enabled before
        # delegating to the underlying MsgTree instance
        if name != 'clear' and not self._task.default(self._keyword):
            # buffering disabled: silently ignore add() calls...
            if name == 'add':
                return lambda *args: None
            # ...and refuse any other MsgTree access
            raise TaskMsgTreeError("%s not set" % self._keyword)
        return getattr(self._tree, name)


def _task_print_debug(task, s):
    """
    Default task debug printing function. Cannot provide 'print'
    directly as it is not a function (it will be in Py3k!).
    """
    print s


class Task(object):
    """
    Always bound to a thread, the Task class allows you to execute
    commands in parallel and get their results.

    To create a task in a new thread:
        >>> task = Task()

    To create or get the instance of the task associated with the
    thread object thr (threading.Thread):
        >>> task = Task(thread=thr)

    Add a command to execute locally within the task with:
        >>> task.shell("/bin/hostname")

    Add a command to execute on distant nodes within the task with:
        >>> task.shell("/bin/hostname", nodes="tiger[1-20]")

    Run the task in its associated thread (will block only if the
    calling thread is the task's associated thread):
        >>> task.resume()
    """
    _std_default = { "stderr"         : False,
                     "stdout_msgtree" : True,
                     "stderr_msgtree" : True,
                     "engine"         : 'auto',
                     "port_qlimit"    : 32 }

    _std_info = { "debug"           : False,
                  "print_debug"     : _task_print_debug,
                  "fanout"          : 64,
                  "connect_timeout" : 10,
                  "command_timeout" : 0 }
    _tasks = {}
    _taskid_max = 0
    _task_lock = threading.Lock()

    class _SyncMsgHandler(EventHandler):
        """Special task control port event handler.
        When a message is received on the port, call the appropriate
        task method."""
        def ev_msg(self, port, msg):
            """Message received: call the appropriate task method."""
            # unpack the method and its arguments from the message
            func, (args, kwargs) = msg[0], msg[1:]
            # call the task method in the task's own thread
            func(port.task, *args, **kwargs)

    class tasksyncmethod(object):
        """Class encapsulating a function that checks if the calling
        task is running or is the current task, and allowing it to be
        used as a decorator making the wrapped task method thread-safe."""

        def __call__(self, f):
            def taskfunc(*args, **kwargs):
                # pull out the task instance
                task, fargs = args[0], args[1:]
                # check if the calling thread is the task's own thread
                if task._is_task_self():
                    return f(task, *fargs, **kwargs)
                elif task._dispatch_port:
                    # no: safely call the task method by message
                    # through the task special dispatch port
                    task._dispatch_port.msg_send((f, fargs, kwargs))
                else:
                    task.info("print_debug")(task, "%s: dropped call: %s" % \
                                                   (task, fargs))
            # preserve wrapped function metadata
            taskfunc.__name__ = f.__name__
            taskfunc.__doc__ = f.__doc__
            taskfunc.__dict__ = f.__dict__
            taskfunc.__module__ = f.__module__
            return taskfunc
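
    # Dispatch sketch: a @tasksyncmethod()-decorated method called from a
    # foreign thread is not run directly; the call is packed as a message
    # and sent over the task's dispatch port, to be replayed later in the
    # task's own thread (a minimal illustration, assuming a second task
    # bound to its own thread):
    #
    #   >>> task = Task()                        # task in its own thread
    #   >>> task.set_info("debug", True)         # queued on _dispatch_port
    #   >>> task_self().set_info("debug", True)  # same thread: applied now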

    class _SuspendCondition(object):
        """Special class to manage task suspend condition."""
        def __init__(self, lock=threading.RLock(), initial=0):
            self._cond = threading.Condition(lock)
            self.suspend_count = initial

        def atomic_inc(self):
            """Increase suspend count."""
            self._cond.acquire()
            self.suspend_count += 1
            self._cond.release()

        def atomic_dec(self):
            """Decrease suspend count."""
            self._cond.acquire()
            self.suspend_count -= 1
            self._cond.release()

        def wait_check(self, release_lock=None):
            """Wait for condition if needed."""
            self._cond.acquire()
            try:
                if self.suspend_count > 0:
                    if release_lock:
                        release_lock.release()
                    self._cond.wait()
            finally:
                self._cond.release()

        def notify_all(self):
            """Signal all threads waiting for condition."""
            self._cond.acquire()
            try:
                self.suspend_count = min(self.suspend_count, 0)
                self._cond.notifyAll()
            finally:
                self._cond.release()

    def __new__(cls, thread=None):
        """
        For a task bound to a specific thread, this class acts like a
        "thread singleton", so a new-style class is used and new objects
        are only instantiated if needed.
        """
        if thread:
            if thread not in cls._tasks:
                cls._tasks[thread] = object.__new__(cls)
            return cls._tasks[thread]

        return object.__new__(cls)

    def __init__(self, thread=None):
        """
        Initialize a Task, creating a new thread if needed.
        """
        if not getattr(self, "_engine", None):
            # first time this instance is initialized
            self._default_lock = threading.Lock()
            self._default = self.__class__._std_default.copy()
            self._info = self.__class__._std_info.copy()

            # use factory class PreferredEngine that gives the proper
            # engine instance
            self._engine = PreferredEngine(self.default("engine"), self._info)
            self.timeout = 0

            # task synchronization objects
            self._run_lock = threading.Lock()
            self._suspend_lock = threading.RLock()
            # both suspend and join conditions share the same lock
            self._suspend_cond = Task._SuspendCondition(self._suspend_lock, 1)
            self._join_cond = threading.Condition(self._suspend_lock)
            self._suspended = False
            self._quit = False

            # standard output MsgTree
            self._msgtree = _TaskMsgTree(self, "stdout_msgtree")
            # standard error MsgTree
            self._errtree = _TaskMsgTree(self, "stderr_msgtree")
            # dict of sources to return codes
            self._d_source_rc = {}
            # dict of return codes to sources
            self._d_rc_sources = {}
            # keep max rc
            self._max_rc = 0
            # keep timed out sources
            self._timeout_sources = set()

            # create internal task dispatch port
            self._dispatch_port = EnginePort(self,
                                             handler=Task._SyncMsgHandler(),
                                             autoclose=True)
            self._engine.add(self._dispatch_port)

            # set taskid used as Thread name
            Task._task_lock.acquire()
            Task._taskid_max += 1
            self._taskid = Task._taskid_max
            Task._task_lock.release()

            # create new thread if needed
            self._thread_foreign = bool(thread)
            if self._thread_foreign:
                self.thread = thread
            else:
                self.thread = thread = \
                    threading.Thread(None,
                                     Task._thread_start,
                                     "Task-%d" % self._taskid,
                                     args=(self,))
                Task._tasks[thread] = self
                thread.start()

    def _is_task_self(self):
        """Private method used by the library to check if the task is
        task_self(), but do not create any task_self() instance."""
        return self.thread == threading.currentThread()

    def default_excepthook(self, type, value, tb):
        """Default excepthook for a new Task. When an exception is
        raised and uncaught on a Task thread, excepthook is called,
        which is default_excepthook by default. Once excepthook is
        overridden, you can still call default_excepthook if needed."""
        print >> sys.stderr, 'Exception in thread %s:' % self.thread
        traceback.print_exception(type, value, tb, file=sys.stderr)

    _excepthook = default_excepthook

    def _getexcepthook(self):
        return self._excepthook

    def _setexcepthook(self, hook):
        self._excepthook = hook
        # if the thread was not created by us, also install sys.excepthook
        if self._thread_foreign:
            sys.excepthook = self._excepthook

    # uncaught exception hook, see default_excepthook()
    excepthook = property(_getexcepthook, _setexcepthook)

    def _thread_start(self):
        """Task-managed thread entry point"""
        while not self._quit:
            self._suspend_cond.wait_check()
            if self._quit:
                break
            try:
                self._resume()
            except:
                self.excepthook(*sys.exc_info())
                self._quit = True

        self._terminate(kill=True)

    def _run(self, timeout):
        """Run task (always called from its self thread)."""
        # check if task is already running
        if self._run_lock.locked():
            raise AlreadyRunningError("task is already running")

        try:
            self._run_lock.acquire()
            self._engine.run(timeout)
        finally:
            self._run_lock.release()

    def default(self, default_key, def_val=None):
        """
        Return per-task value for key from the "default" dictionary.
        See set_default() for a list of reserved task default_keys.
        """
        self._default_lock.acquire()
        try:
            return self._default.get(default_key, def_val)
        finally:
            self._default_lock.release()

    def set_default(self, default_key, value):
        """
        Set task value for the specified key in the dictionary "default".
        Users may store their own task-specific key, value pairs
        using this method and retrieve them with default().

        Task default_keys are:
            - "stderr": Boolean value indicating whether to enable
              stdout/stderr separation when using task.shell(), if not
              specified explicitly (default: False).
            - "stdout_msgtree": Whether to enable standard output MsgTree
              for automatic internal gathering of result messages
              (default: True).
            - "stderr_msgtree": Same for stderr (default: True).
            - "engine": Used to specify an underlying Engine explicitly
              (default: "auto").
            - "port_qlimit": Size of port messages queue (default: 32).

        Threading considerations
        ========================
          Unlike set_info(), whether it is called from the task's thread
          or not, set_default() immediately updates the underlying
          dictionary in a thread-safe manner. This method doesn't wake
          up the engine when called.
        """
        self._default_lock.acquire()
        try:
            self._default[default_key] = value
        finally:
            self._default_lock.release()
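
    # Usage sketch for set_default(): enable stdout/stderr separation for
    # subsequent shell() calls, then read error buffers separately. The
    # node set "node[1-2]" is only an illustrative placeholder.
    #
    #   >>> task = task_self()
    #   >>> task.set_default("stderr", True)
    #   >>> task.shell("ls /nonexistent", nodes="node[1-2]")
    #   >>> task.resume()
    #   >>> for buf, nodes in task.iter_errors():
    #   ...     print NodeSet.fromlist(nodes), buf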

    def info(self, info_key, def_val=None):
        """
        Return per-task information. See set_info() for a list of
        reserved task info_keys.
        """
        return self._info.get(info_key, def_val)

    @tasksyncmethod()
    def set_info(self, info_key, value):
        """
        Set task value for a specific information key. Key, value
        pairs can be passed to the engine and/or workers.
        Users may store their own task-specific info key, value pairs
        using this method and retrieve them with info().

        Task info_keys are:
            - "debug": Boolean value indicating whether to enable library
              debugging messages (default: False).
            - "print_debug": Debug messages processing function. This
              function takes 2 arguments: the task instance and the
              message string (default: an internal function doing standard
              print).
            - "fanout": Max number of registered clients in Engine at a
              time (default: 64).
            - "connect_timeout": Time in seconds to wait for connecting to
              remote host before aborting (default: 10).
            - "command_timeout": Time in seconds to wait for a command to
              complete before aborting (default: 0, which means
              unlimited).

        Threading considerations
        ========================
          Unlike set_default(), the underlying info dictionary is only
          modified from the task's thread. So calling set_info() from
          another thread queues the request, to be applied later (at run
          time) using the task dispatch port. When received, the request
          wakes up the engine when the task is running and the info
          dictionary is then updated.
        """
        self._info[info_key] = value
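
    # Usage sketch for set_info(): raise the connection fanout and enable
    # library debug messages before running commands (values and node set
    # are arbitrary examples).
    #
    #   >>> task = task_self()
    #   >>> task.set_info("fanout", 128)
    #   >>> task.set_info("debug", True)
    #   >>> task.shell("uptime", nodes="node[1-8]")
    #   >>> task.resume()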

    def shell(self, command, **kwargs):
        """
        Schedule a shell command for local or distant execution.

        Local usage::
            task.shell(command [, key=key] [, handler=handler]
                  [, timeout=secs] [, autoclose=enable_autoclose]
                  [, stderr=enable_stderr])

        Distant usage::
            task.shell(command, nodes=nodeset [, handler=handler]
                  [, timeout=secs] [, autoclose=enable_autoclose]
                  [, stderr=enable_stderr])
        """

        handler = kwargs.get("handler", None)
        timeo = kwargs.get("timeout", None)
        ac = kwargs.get("autoclose", False)
        stderr = kwargs.get("stderr", self.default("stderr"))

        if kwargs.get("nodes", None):
            assert kwargs.get("key", None) is None, \
                    "'key' argument not supported for distant command"

            # distant command: create a ssh-based worker
            worker = WorkerSsh(NodeSet(kwargs["nodes"]), command=command,
                               handler=handler, stderr=stderr, timeout=timeo,
                               autoclose=ac)
        else:
            # local command: create a (popen-based) worker
            worker = WorkerPopen(command, key=kwargs.get("key", None),
                                 handler=handler, stderr=stderr,
                                 timeout=timeo, autoclose=ac)

        # schedule the worker for execution in this task
        self.schedule(worker)

        return worker
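
    # Usage sketch for a local shell() command: run it with an explicit
    # key, then fetch its output and return code by that key (the key
    # name "local" is arbitrary).
    #
    #   >>> task = task_self()
    #   >>> task.shell("/bin/hostname", key="local")
    #   >>> task.resume()
    #   >>> print task.key_buffer("local"), task.key_retcode("local")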

    def copy(self, source, dest, nodes, **kwargs):
        """
        Copy local file to distant nodes.
        """
        assert nodes is not None, "local copy not supported"

        handler = kwargs.get("handler", None)
        timeo = kwargs.get("timeout", None)
        preserve = kwargs.get("preserve", None)
        stderr = kwargs.get("stderr", self.default("stderr"))

        # create a new copy worker
        worker = WorkerSsh(nodes, source=source, dest=dest, handler=handler,
                           stderr=stderr, timeout=timeo, preserve=preserve)

        self.schedule(worker)
        return worker
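
    # Usage sketch for copy(): push a local file to a set of nodes,
    # preserving file modification times (paths and node set are
    # placeholders).
    #
    #   >>> task = task_self()
    #   >>> task.copy("/etc/motd", "/tmp/motd", nodes="node[1-4]",
    #   ...           preserve=True)
    #   >>> task.resume()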

    @tasksyncmethod()
    def _add_port(self, port):
        """Add an EnginePort instance to Engine (private method)."""
        self._engine.add(port)

    @tasksyncmethod()
    def _remove_port(self, port):
        """Remove a port from Engine (private method)."""
        self._engine.remove(port)

    def port(self, handler=None, autoclose=False):
        """
        Create a new task port. A task port is an abstraction object to
        deliver messages reliably between tasks.

        Basic rules:
            - A task can send messages to another task's port (thread-safe).
            - A task can receive messages from an acquired port either by
              setting up a notification mechanism or using a polling
              mechanism that may block the task waiting for a message
              sent on the port.
            - A port can be acquired by one task only.

        If handler is set to a valid EventHandler object, the port is
        a send-once port, ie. a message sent to this port generates an
        ev_msg event notification issued by the port's task. If handler
        is not set, the task can only receive messages on the port by
        calling port.msg_recv().
        """
        port = EnginePort(self, handler, autoclose)
        self._add_port(port)
        return port
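
    # Usage sketch for port(): a port created without a handler can be
    # polled with msg_recv(), while any other task may deliver messages
    # to it with msg_send() (a rough sketch, assuming a second task runs
    # and resumes this one).
    #
    #   >>> port = task_self().port()
    #   >>> # ... from another task/thread:
    #   >>> port.msg_send("hello")
    #   >>> # ... back in the task owning the port:
    #   >>> msg = port.msg_recv()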

    @tasksyncmethod()
    def timer(self, fire, handler, interval=-1.0, autoclose=False):
        """
        Create task's timer.
        """
        assert fire >= 0.0, \
            "timer's relative fire time must be a positive floating number"

        timer = EngineTimer(fire, interval, autoclose, handler)
        self._engine.add_timer(timer)
        return timer
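
    # Usage sketch for timer(): fire once after 10 seconds, then repeat
    # every 60 seconds; ev_timer() is assumed here to be the EventHandler
    # callback invoked when the timer fires.
    #
    #   >>> class Ticker(EventHandler):
    #   ...     def ev_timer(self, timer):
    #   ...         print "tick"
    #   >>> task_self().timer(10.0, handler=Ticker(), interval=60.0)
    #   >>> task_self().resume()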

    @tasksyncmethod()
    def schedule(self, worker):
        """
        Schedule a worker for execution. Only useful for manually
        instantiated workers.
        """
        assert self in Task._tasks.values(), "deleted task"

        # bind the worker to this task
        worker._set_task(self)

        # add worker clients to engine
        for client in worker._engine_clients():
            self._engine.add(client)

    def _resume_thread(self):
        """Resume called from another thread."""
        self._suspend_cond.notify_all()

    def _resume(self):
        """Resume current task - called from the task's own thread."""
        assert self._is_task_self()
        try:
            try:
                self._reset()
                self._run(self.timeout)
            except EngineTimeoutException:
                raise TimeoutError()
            except EngineAbortException, exc:
                self._terminate(exc.kill)
            except EngineAlreadyRunningError:
                raise AlreadyRunningError("task engine is already running")
        finally:
            # task becomes joinable
            self._join_cond.acquire()
            self._suspend_cond.atomic_inc()
            self._join_cond.notifyAll()
            self._join_cond.release()

    def resume(self, timeout=0):
        """
        Resume task. If task is task_self(), workers are executed in
        the calling thread, so this method will block until workers have
        finished. This is always the case for a single-threaded
        application (eg. one that doesn't create any Task() instance
        other than task_self()). Otherwise, the current thread doesn't
        block. In that case, you may then want to call task_wait() to
        wait for completion.
        """
        self.timeout = timeout

        self._suspend_cond.atomic_dec()

        if self._is_task_self():
            self._resume()
        else:
            self._resume_thread()
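
    # Usage sketch for resume() on a task other than task_self(): when
    # the task owns its own thread, resume() returns immediately and
    # task_wait() can be used to block until completion (node set is a
    # placeholder).
    #
    #   >>> task = Task()                  # task bound to a new thread
    #   >>> task.shell("date", nodes="node[1-4]")
    #   >>> task.resume()                  # does not block here
    #   >>> task_wait()                    # blocks until the task finishes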

    @tasksyncmethod()
    def _suspend_wait(self):
        """Suspend request received (private method)."""
        assert task_self() == self
        # atomically set suspend state
        self._suspend_lock.acquire()
        self._suspended = True
        self._suspend_lock.release()

        # wait for special suspend condition, releasing the run lock
        # while waiting
        self._suspend_cond.wait_check(self._run_lock)

        # waking up, atomically unset suspend state
        self._suspend_lock.acquire()
        self._suspended = False
        self._suspend_lock.release()

    def suspend(self):
        """
        Suspend task execution. This method may be called from another
        task (thread-safe). The function returns False if the task
        cannot be suspended (eg. it's not running), or returns True if
        the task has been successfully suspended.
        To resume a suspended task, use task.resume().
        """
        # first, increase suspend count
        self._suspend_cond.atomic_inc()

        # call synchronized suspend method
        self._suspend_wait()

        # wait for the task to stop running
        self._run_lock.acquire()

        # did we really suspend a running task?
        result = True
        self._suspend_lock.acquire()
        if not self._suspended:
            # not suspended: release the run lock taken above
            result = False
            self._run_lock.release()
        self._suspend_lock.release()
        return result

    @tasksyncmethod()
    def _abort(self, kill=False):
        """Abort request received (private method)."""
        assert task_self() == self
        self._engine.abort(kill)

    def abort(self, kill=False):
        """
        Abort a task. Aborting a task removes (and stops when needed)
        all workers. If optional parameter kill is True, the task
        object is unbound from the current thread, so calling
        task_self() creates a new Task object.
        """
        if self._run_lock.acquire(0):
            # task is not running
            self._quit = True
            self._run_lock.release()
            if self._is_task_self():
                self._terminate(kill)
            else:
                # abort on stopped/suspended task
                self.resume()
        else:
            # task is running: use the dispatch port to abort it
            self._abort(kill)
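
    # Usage sketch for abort(): kill the current task and unbind it from
    # the calling thread; a later task_self() call then returns a fresh
    # Task instance.
    #
    #   >>> task_self().abort(kill=True)
    #   >>> task = task_self()             # new Task object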

    def _terminate(self, kill):
        """
        Abort completion subroutine.
        """
        if kill:
            # invalidate dispatch port
            self._dispatch_port = None
        # clear engine
        self._engine.clear()

        # clear result buffers and return codes
        self._reset()

        # destroy task if needed
        if kill:
            Task._task_lock.acquire()
            try:
                del Task._tasks[threading.currentThread()]
            finally:
                Task._task_lock.release()

    def join(self):
        """
        Suspend execution of the calling thread until the target task
        terminates, unless the target task has already terminated.
        """
        self._join_cond.acquire()
        try:
            if self._suspend_cond.suspend_count > 0:
                if not self._suspended:
                    # task is not running: nothing to wait for
                    return
            self._join_cond.wait()
        finally:
            self._join_cond.release()

    def running(self):
        """
        Return True if the task is running.
        """
        return self._engine.running

    def _reset(self):
        """
        Reset buffers and retcodes management variables.
        """
        self._msgtree.clear()
        self._errtree.clear()
        self._d_source_rc = {}
        self._d_rc_sources = {}
        self._max_rc = 0
        self._timeout_sources.clear()

    def _msg_add(self, source, msg):
        """
        Add a worker message associated with a source.
        """
        self._msgtree.add(source, msg)

    def _errmsg_add(self, source, msg):
        """
        Add a worker error message associated with a source.
        """
        self._errtree.add(source, msg)

    def _rc_set(self, source, rc, override=True):
        """
        Add a worker return code associated with a source.
        """
        if not override and self._d_source_rc.has_key(source):
            return

        self._d_source_rc[source] = rc

        e = self._d_rc_sources.get(rc)
        if e is None:
            self._d_rc_sources[rc] = set([source])
        else:
            self._d_rc_sources[rc].add(source)

        if rc > self._max_rc:
            self._max_rc = rc

    def _timeout_add(self, source):
        """
        Add a worker timeout associated with a source.
        """
        self._timeout_sources.add(source)

    def _msg_by_source(self, source):
        """
        Get a message by its source (worker, key).
        """
        s = self._msgtree.get(source)
        if s is None:
            return None
        return str(s)

    def _errmsg_by_source(self, source):
        """
        Get an error message by its source (worker, key).
        """
        s = self._errtree.get(source)
        if s is None:
            return None
        return str(s)

    def _call_tree_matcher(self, tree_match_func, match_keys=None,
                           worker=None):
        """Call identified tree matcher (items, walk) method with options."""
        # build a matcher from optional worker and matching keys
        if worker and not match_keys:
            match = lambda k: k[0] is worker
        elif worker and match_keys:
            match = lambda k: k[0] is worker and k[1] in match_keys
        elif match_keys:
            match = lambda k: k[1] in match_keys
        else:
            match = None
        # call the tree matcher function (items or walk)
        return tree_match_func(match, itemgetter(1))

    def _rc_by_source(self, source):
        """
        Get a return code by its source (worker, key).
        """
        return self._d_source_rc[source]

    def _rc_iter_by_key(self, key):
        """
        Return an iterator over return codes for the given key.
        """
        for (w, k), rc in self._d_source_rc.iteritems():
            if k == key:
                yield rc

    def _rc_iter_by_worker(self, worker, match_keys=None):
        """
        Return an iterator over return codes and keys list for a
        specific worker and optional matching keys.
        """
        if match_keys:
            for rc, src in self._d_rc_sources.iteritems():
                keys = [t[1] for t in src if t[0] is worker and \
                                             t[1] in match_keys]
                if len(keys) > 0:
                    yield rc, keys
        else:
            for rc, src in self._d_rc_sources.iteritems():
                keys = [t[1] for t in src if t[0] is worker]
                if len(keys) > 0:
                    yield rc, keys

    def _krc_iter_by_worker(self, worker):
        """
        Return an iterator over key, rc for a specific worker.
        """
        for rc, src in self._d_rc_sources.iteritems():
            for w, k in src:
                if w is worker:
                    yield k, rc

    def _num_timeout_by_worker(self, worker):
        """
        Return the number of timed out "keys" for a specific worker.
        """
        cnt = 0
        for (w, k) in self._timeout_sources:
            if w is worker:
                cnt += 1
        return cnt

    def _iter_keys_timeout_by_worker(self, worker):
        """
        Iterate over timed out keys (ie. nodes) for a specific worker.
        """
        for (w, k) in self._timeout_sources:
            if w is worker:
                yield k

    def _flush_buffers_by_worker(self, worker):
        """
        Remove any messages from specified worker.
        """
        self._msgtree.remove(lambda k: k[0] == worker)

    def _flush_errors_by_worker(self, worker):
        """
        Remove any error messages from specified worker.
        """
        self._errtree.remove(lambda k: k[0] == worker)

    def key_buffer(self, key):
        """
        Get buffer for a specific key. When the key is associated with
        multiple workers, the resulting buffer will contain all workers'
        content that may overlap. This method returns an empty buffer
        if key is not found in any workers.
        """
        select_key = lambda k: k[1] == key
        return "".join(imap(str, self._msgtree.messages(select_key)))

    node_buffer = key_buffer

    def key_error(self, key):
        """
        Get error buffer for a specific key. When the key is associated
        with multiple workers, the resulting buffer will contain all
        workers' content that may overlap. This method returns an empty
        error buffer if key is not found in any workers.
        """
        select_key = lambda k: k[1] == key
        return "".join(imap(str, self._errtree.messages(select_key)))

    node_error = key_error

    def key_retcode(self, key):
        """
        Return the return code for a specific key. When the key is
        associated with multiple workers, return the max return code
        from these workers. Raises a KeyError if key is not found in
        any finished workers.
        """
        codes = list(self._rc_iter_by_key(key))
        if not codes:
            raise KeyError(key)
        return max(codes)

    node_retcode = key_retcode

    def max_retcode(self):
        """
        Get max return code encountered during last run.

        How retcodes work
        =================
          If the process exits normally, the return code is its exit
          status. If the process is terminated by a signal, the return
          code is 128 + signal number.
        """
        return self._max_rc
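
    # Worked example of the retcode convention above: an exit status is
    # reported as-is (here /bin/false exits with 1), while a process
    # terminated by SIGKILL (signal 9) would be reported as 128 + 9 = 137.
    # The key name "rc-demo" is arbitrary.
    #
    #   >>> task = task_self()
    #   >>> task.shell("/bin/false", key="rc-demo")
    #   >>> task.resume()
    #   >>> task.max_retcode()
    #   1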

    def iter_buffers(self, match_keys=None):
        """
        Iterate over buffers, returning tuples (buffer, keys). For remote
        workers (Ssh), keys are lists of nodes. In that case, you should
        use NodeSet.fromlist(keys) to get a NodeSet instance (which is
        more convenient and efficient).

        Optional parameter match_keys adds filtering on these keys.

        Usage example:

        >>> for buffer, nodelist in task.iter_buffers():
        ...     print NodeSet.fromlist(nodelist)
        ...     print buffer
        """
        return self._call_tree_matcher(self._msgtree.walk, match_keys)

    def iter_errors(self, match_keys=None):
        """
        Iterate over error buffers, returning tuples (buffer, keys).

        See iter_buffers().
        """
        return self._call_tree_matcher(self._errtree.walk, match_keys)

    def iter_retcodes(self, match_keys=None):
        """
        Iterate over return codes, returning tuples (rc, keys).

        Optional parameter match_keys adds filtering on these keys.

        How retcodes work
        =================
          If the process exits normally, the return code is its exit
          status. If the process is terminated by a signal, the return
          code is 128 + signal number.
        """
        if match_keys:
            for rc, src in self._d_rc_sources.iteritems():
                keys = [t[1] for t in src if t[1] in match_keys]
                yield rc, keys
        else:
            for rc, src in self._d_rc_sources.iteritems():
                yield rc, [t[1] for t in src]
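
    # Usage sketch for iter_retcodes(): group nodes by exit status after
    # a run (node set is a placeholder).
    #
    #   >>> task = task_self()
    #   >>> task.shell("test -d /var/tmp", nodes="node[1-16]")
    #   >>> task.resume()
    #   >>> for rc, keys in task.iter_retcodes():
    #   ...     print rc, NodeSet.fromlist(keys)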

    def num_timeout(self):
        """
        Return the number of timed out "keys" (ie. nodes).
        """
        return len(self._timeout_sources)

    def iter_keys_timeout(self):
        """
        Iterate over timed out keys (ie. nodes).
        """
        for (w, k) in self._timeout_sources:
            yield k
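
    # Usage sketch for timeout inspection: after a run with a command
    # timeout, list the keys (nodes) that did not complete in time
    # (command, node set and timeout value are placeholders).
    #
    #   >>> task = task_self()
    #   >>> task.shell("sleep 30", nodes="node[1-4]", timeout=5)
    #   >>> task.resume()
    #   >>> print task.num_timeout()
    #   >>> for key in task.iter_keys_timeout():
    #   ...     print key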

    def flush_buffers(self):
        """
        Flush all task messages (from all task workers).
        """
        self._msgtree.clear()

    def flush_errors(self):
        """
        Flush all task error messages (from all task workers).
        """
        self._errtree.clear()

    @classmethod
    def wait(cls, from_thread):
        """
        Class method that blocks the calling thread until all tasks
        have finished (from a ClusterShell point of view, for instance
        when their task.resume() returns). It doesn't necessarily mean
        that associated threads have finished.
        """
        Task._task_lock.acquire()
        try:
            tasks = Task._tasks.copy()
        finally:
            Task._task_lock.release()
        for thread, task in tasks.iteritems():
            if thread != from_thread:
                task.join()


def task_self():
    """
    Get the Task instance bound to the current thread. This function,
    provided as a convenience, is available in the top-level
    ClusterShell.Task package namespace.
    """
    return Task(thread=threading.currentThread())


def task_wait():
    """
    Suspend execution of the calling thread until all tasks terminate,
    unless all tasks have already terminated. This function is provided
    as a convenience and is available in the top-level
    ClusterShell.Task package namespace.
    """
    Task.wait(threading.currentThread())


def task_terminate():
    """
    Destroy the Task instance bound to the current thread. A next call
    to task_self() will create a new Task object. This function,
    provided as a convenience, is available in the top-level
    ClusterShell.Task package namespace.
    """
    task_self().abort(kill=True)


def task_cleanup():
    """
    Cleanup routine to destroy all created tasks. This function,
    provided as a convenience, is available in the top-level
    ClusterShell.Task package namespace.
    """
    Task._task_lock.acquire()
    try:
        tasks = Task._tasks.copy()
    finally:
        Task._task_lock.release()
    for task in tasks.itervalues():
        task.abort(kill=True)