Package ClusterShell :: Module Task
[hide private]
[frames] | [no frames]

Source Code for Module ClusterShell.Task

   1  # 
   2  # Copyright CEA/DAM/DIF (2007, 2008, 2009, 2010) 
   3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
   4  # 
   5  # This file is part of the ClusterShell library. 
   6  # 
   7  # This software is governed by the CeCILL-C license under French law and 
   8  # abiding by the rules of distribution of free software.  You can  use, 
   9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
  10  # license as circulated by CEA, CNRS and INRIA at the following URL 
  11  # "http://www.cecill.info". 
  12  # 
  13  # As a counterpart to the access to the source code and  rights to copy, 
  14  # modify and redistribute granted by the license, users are provided only 
  15  # with a limited warranty  and the software's author,  the holder of the 
  16  # economic rights,  and the successive licensors  have only  limited 
  17  # liability. 
  18  # 
  19  # In this respect, the user's attention is drawn to the risks associated 
  20  # with loading,  using,  modifying and/or developing or reproducing the 
  21  # software by the user in light of its specific status of free software, 
  22  # that may mean  that it is complicated to manipulate,  and  that  also 
  23  # therefore means  that it is reserved for developers  and  experienced 
  24  # professionals having in-depth computer knowledge. Users are therefore 
  25  # encouraged to load and test the software's suitability as regards their 
  26  # requirements in conditions enabling the security of their systems and/or 
  27  # data to be ensured and,  more generally, to use and operate it in the 
  28  # same conditions as regards security. 
  29  # 
  30  # The fact that you are presently reading this means that you have had 
  31  # knowledge of the CeCILL-C license and that you accept its terms. 
  32  # 
  33  # $Id: Task.py 295 2010-07-21 19:55:15Z st-cea $ 
  34   
  35  """ 
  36  ClusterShell Task module. 
  37   
  38  Simple example of use: 
  39   
  40  >>> from ClusterShell.Task import * 
  41  >>>   
  42  >>> # get task associated with calling thread 
  43  ... task = task_self() 
  44  >>>  
  45  >>> # add a command to execute on distant nodes 
  46  ... task.shell("/bin/uname -r", nodes="tiger[1-30,35]") 
  47  <ClusterShell.Worker.Ssh.WorkerSsh object at 0x7f41da71b890> 
  48  >>>  
  49  >>> # run task in calling thread 
  50  ... task.resume() 
  51  >>>  
  52  >>> # get results 
  53  ... for buf, nodelist in task.iter_buffers(): 
  54  ...     print NodeSet.fromlist(nodelist), buf 
  55  ...  
  56   
  57  """ 
  58   
  59  from itertools import imap 
  60  from operator import itemgetter 
  61  import sys 
  62  import threading 
  63  import traceback 
  64   
  65  from ClusterShell.Engine.Engine import EngineAbortException 
  66  from ClusterShell.Engine.Engine import EngineTimeoutException 
  67  from ClusterShell.Engine.Engine import EngineAlreadyRunningError 
  68  from ClusterShell.Engine.Engine import EngineTimer 
  69  from ClusterShell.Engine.Factory import PreferredEngine 
  70  from ClusterShell.Worker.EngineClient import EnginePort 
  71  from ClusterShell.Worker.Ssh import WorkerSsh 
  72  from ClusterShell.Worker.Popen import WorkerPopen 
  73   
  74  from ClusterShell.Event import EventHandler 
  75  from ClusterShell.MsgTree import MsgTree 
  76  from ClusterShell.NodeSet import NodeSet 
class TaskException(Exception):
    """Base task exception. Root of the ClusterShell Task exception
    hierarchy; catch this to handle any task-related error."""
81
class TaskError(TaskException):
    """Base task error exception. Parent of all recoverable task
    errors (timeout, already-running, disabled msgtree)."""
84
class TimeoutError(TaskError):
    """Raised when the task timed out.
    NOTE(review): shadows the builtin TimeoutError on Python 3.3+;
    harmless on the Python 2 interpreters this file targets."""
87
class AlreadyRunningError(TaskError):
    """Raised when trying to resume an already running task."""
90
class TaskMsgTreeError(TaskError):
    """Raised when trying to access a disabled MsgTree (see the
    "stdout_msgtree"/"stderr_msgtree" task defaults)."""
93
94 95 -class _TaskMsgTree(object):
96 """ 97 Task special MsgTree wrapper class, for easy disabling of MsgTree 98 buffering. This class checks if task.default(keyword) is set before 99 effective MsgTree attribute lookup, according to following rules: 100 - If set, allow all MsgTree methods, else: 101 - ignore add() calls 102 - disallow MsgTree methods except clear() 103 """
104 - def __init__(self, task, keyword):
105 self._task = task 106 self._keyword = keyword 107 self._tree = MsgTree()
108
109 - def __getattr__(self, name):
110 # check if msgtree is enabled, but always allow MsgTree.clear() 111 if name != 'clear' and not self._task.default(self._keyword): 112 # disable MsgTree.add method 113 if name == 'add': 114 return lambda *args: None 115 # all other MsgTree methods are not allowed 116 raise TaskMsgTreeError("%s not set" % self._keyword) 117 # msgtree enabled: lookup tree attribute name 118 return getattr(self._tree, name)
119
def _task_print_debug(task, s):
    """
    Default task debug printing function. Cannot provide 'print'
    directly as it is not a function (will be in Py3k!).

    :param task: Task instance emitting the message (unused here)
    :param s: debug message string to print on stdout
    """
    print s
127
class Task(object):
    """
    Always bound to a thread, the Task class allows you to execute
    commands in parallel and get their results.

    To create a task in a new thread:
        >>> task = Task()

    To create or get the instance of the task associated with the
    thread object thr (threading.Thread):
        >>> task = Task(thread=thr)

    Add a command to execute locally within task with:
        >>> task.shell("/bin/hostname")

    Add a command to execute to a distant node within task with:
        >>> task.shell("/bin/hostname", nodes="tiger[1-20]")

    Run task in its associated thread (will block only if the calling
    thread is the task associated thread):
        >>> task.resume()
    """
    # Class-wide initial "default" values, copied per-instance at first
    # __init__ call; see set_default() for each key's meaning.
    _std_default = { "stderr"         : False,
                     "stdout_msgtree" : True,
                     "stderr_msgtree" : True,
                     "engine"         : 'auto',
                     "port_qlimit"    : 32 }

    # Class-wide initial "info" values, copied per-instance at first
    # __init__ call; see set_info() for each key's meaning.
    _std_info = { "debug"           : False,
                  "print_debug"     : _task_print_debug,
                  "fanout"          : 64,
                  "connect_timeout" : 10,
                  "command_timeout" : 0 }
    # Registry of live tasks, keyed by their bound thread object.
    _tasks = {}
    # Monotonic task id counter; protected by _task_lock.
    _taskid_max = 0
    _task_lock = threading.Lock()
166 - class _SyncMsgHandler(EventHandler):
167 """Special task control port event handler. 168 When a message is received on the port, call appropriate 169 task method."""
170 - def ev_msg(self, port, msg):
171 """Message received: call appropriate task method.""" 172 # pull out function and its arguments from message 173 func, (args, kwargs) = msg[0], msg[1:] 174 # call task method 175 func(port.task, *args, **kwargs)
176
    class tasksyncmethod(object):
        """Class encapsulating a function that checks if the calling
        task is running or is the current task, and allowing it to be
        used as a decorator making the wrapped task method thread-safe."""

        def __call__(self, f):
            def taskfunc(*args, **kwargs):
                # pull out the class instance
                task, fargs = args[0], args[1:]
                # check if the calling task is the current thread task
                if task._is_task_self():
                    # same thread: direct synchronous call
                    return f(task, *fargs, **kwargs)
                elif task._dispatch_port:
                    # no, safely call the task method by message
                    # through the task special dispatch port
                    # (asynchronous: returns None to the caller)
                    task._dispatch_port.msg_send((f, fargs, kwargs))
                else:
                    # no dispatch port (task killed): drop the call
                    task.info("print_debug")(task, "%s: dropped call: %s" % \
                        (task, fargs))
            # modify the decorator meta-data for pydoc
            # Note: should be later replaced by @wraps (functools)
            # as of Python 2.5
            taskfunc.__name__ = f.__name__
            taskfunc.__doc__ = f.__doc__
            taskfunc.__dict__ = f.__dict__
            taskfunc.__module__ = f.__module__
            return taskfunc
205 - class _SuspendCondition(object):
206 """Special class to manage task suspend condition."""
207 - def __init__(self, lock=threading.RLock(), initial=0):
208 self._cond = threading.Condition(lock) 209 self.suspend_count = initial
210
211 - def atomic_inc(self):
212 """Increase suspend count.""" 213 self._cond.acquire() 214 self.suspend_count += 1 215 self._cond.release()
216
217 - def atomic_dec(self):
218 """Decrease suspend count.""" 219 self._cond.acquire() 220 self.suspend_count -= 1 221 self._cond.release()
222
223 - def wait_check(self, release_lock=None):
224 """Wait for condition if needed.""" 225 self._cond.acquire() 226 try: 227 if self.suspend_count > 0: 228 if release_lock: 229 release_lock.release() 230 self._cond.wait() 231 finally: 232 self._cond.release()
233
234 - def notify_all(self):
235 """Signal all threads waiting for condition.""" 236 self._cond.acquire() 237 try: 238 self.suspend_count = min(self.suspend_count, 0) 239 self._cond.notifyAll() 240 finally: 241 self._cond.release()
242 243
244 - def __new__(cls, thread=None):
245 """ 246 For task bound to a specific thread, this class acts like a 247 "thread singleton", so new style class is used and new object 248 are only instantiated if needed. 249 """ 250 if thread: 251 if thread not in cls._tasks: 252 cls._tasks[thread] = object.__new__(cls) 253 return cls._tasks[thread] 254 255 return object.__new__(cls)
256
    def __init__(self, thread=None):
        """
        Initialize a Task, creating a new thread if needed.

        :param thread: existing thread to bind to; when None a new
            managed thread is spawned running Task._thread_start.
        """
        # __new__ may return an already-initialized singleton; only
        # initialize on the first call (detected via missing _engine)
        if not getattr(self, "_engine", None):
            # first time called
            self._default_lock = threading.Lock()
            self._default = self.__class__._std_default.copy()
            self._info = self.__class__._std_info.copy()

            # use factory class PreferredEngine that gives the proper
            # engine instance
            self._engine = PreferredEngine(self.default("engine"), self._info)
            self.timeout = 0

            # task synchronization objects
            self._run_lock = threading.Lock()       # primitive lock
            self._suspend_lock = threading.RLock()  # reentrant lock
            # both join and suspend conditions share the same underlying lock
            self._suspend_cond = Task._SuspendCondition(self._suspend_lock, 1)
            self._join_cond = threading.Condition(self._suspend_lock)
            self._suspended = False
            self._quit = False

            # STDOUT tree (gathers standard output messages)
            self._msgtree = _TaskMsgTree(self, "stdout_msgtree")

            # STDERR tree
            self._errtree = _TaskMsgTree(self, "stderr_msgtree")

            # dict of sources to return codes
            self._d_source_rc = {}
            # dict of return codes to sources
            self._d_rc_sources = {}
            # keep max rc
            self._max_rc = 0
            # keep timeout'd sources
            self._timeout_sources = set()

            # special engine port for task method dispatching
            self._dispatch_port = EnginePort(self,
                                             handler=Task._SyncMsgHandler(),
                                             autoclose=True)
            self._engine.add(self._dispatch_port)

            # set taskid used as Thread name
            Task._task_lock.acquire()
            Task._taskid_max += 1
            self._taskid = Task._taskid_max
            Task._task_lock.release()

            # create new thread if needed
            self._thread_foreign = bool(thread)
            if self._thread_foreign:
                self.thread = thread
            else:
                self.thread = thread = \
                    threading.Thread(None,
                                     Task._thread_start,
                                     "Task-%d" % self._taskid,
                                     args=(self,))
                Task._tasks[thread] = self
                thread.start()
320
321 - def _is_task_self(self):
322 """Private method used by the library to check if the task is 323 task_self(), but do not create any task_self() instance.""" 324 return self.thread == threading.currentThread()
325
    def default_excepthook(self, type, value, tb):
        """Default excepthook for a newly created Task. When an exception
        is raised and uncaught on the Task thread, excepthook is called,
        which is default_excepthook by default. Once excepthook is
        overriden, you can still call default_excepthook if needed."""
        print >> sys.stderr, 'Exception in thread %s:' % self.thread
        traceback.print_exception(type, value, tb, file=sys.stderr)

    # class-level default; instances override via the excepthook property
    _excepthook = default_excepthook
    def _getexcepthook(self):
        # property getter for excepthook
        return self._excepthook
    def _setexcepthook(self, hook):
        # property setter for excepthook
        self._excepthook = hook
        # If thread has not been created by us, install sys.excepthook which
        # might handle uncaught exception.
        if self._thread_foreign:
            sys.excepthook = self._excepthook

    # When an exception is raised and uncaught on Task's thread,
    # excepthook is called. You may want to override this three
    # arguments method (very similar of what you can do with
    # sys.excepthook).
    excepthook = property(_getexcepthook, _setexcepthook)
    def _thread_start(self):
        """Task-managed thread entry point: loop until _quit is set,
        blocking on the suspend condition between runs; uncaught
        exceptions are routed to excepthook and stop the loop."""
        while not self._quit:
            # block while suspend_count > 0 (task starts suspended)
            self._suspend_cond.wait_check()
            if self._quit:
                break
            try:
                self._resume()
            except:
                self.excepthook(*sys.exc_info())
                self._quit = True

        # final cleanup: unregister the task and clear the engine
        self._terminate(kill=True)
365
366 - def _run(self, timeout):
367 """Run task (always called from its self thread).""" 368 # check if task is already running 369 if self._run_lock.locked(): 370 raise AlreadyRunningError("task is already running") 371 # use with statement later 372 try: 373 self._run_lock.acquire() 374 self._engine.run(timeout) 375 finally: 376 self._run_lock.release()
377
    def default(self, default_key, def_val=None):
        """
        Return per-task value for key from the "default" dictionary.
        See set_default() for a list of reserved task default_keys.

        :param default_key: key to look up
        :param def_val: value returned when the key is absent
        """
        # lock-protected read so it is safe from any thread
        self._default_lock.acquire()
        try:
            return self._default.get(default_key, def_val)
        finally:
            self._default_lock.release()
388
    def set_default(self, default_key, value):
        """
        Set task value for specified key in the dictionary "default".
        Users may store their own task-specific key, value pairs
        using this method and retrieve them with default().

        Task default_keys are:
          - "stderr": Boolean value indicating whether to enable
            stdout/stderr separation when using task.shell(), if not
            specified explicitly (default: False).
          - "stdout_msgtree": Whether to enable standard output MsgTree
            for automatic internal gathering of result messages
            (default: True).
          - "stderr_msgtree": Same for stderr (default: True).
          - "engine": Used to specify an underlying Engine explicitly
            (default: "auto").
          - "port_qlimit": Size of port messages queue (default: 32).

        Threading considerations
        ========================
        Unlike set_info(), when called from the task's thread or
        not, set_default() immediately updates the underlying
        dictionary in a thread-safe manner. This method doesn't
        wake up the engine when called.
        """
        # lock-protected write so it is safe from any thread
        self._default_lock.acquire()
        try:
            self._default[default_key] = value
        finally:
            self._default_lock.release()
419
420 - def info(self, info_key, def_val=None):
421 """ 422 Return per-task information. See set_info() for a list of 423 reserved task info_keys. 424 """ 425 return self._info.get(info_key, def_val)
    @tasksyncmethod()
    def set_info(self, info_key, value):
        """
        Set task value for a specific key information. Key, value
        pairs can be passed to the engine and/or workers.
        Users may store their own task-specific info key, value pairs
        using this method and retrieve them with info().

        Task info_keys are:
          - "debug": Boolean value indicating whether to enable library
            debugging messages (default: False).
          - "print_debug": Debug messages processing function. This
            function takes 2 arguments: the task instance and the
            message string (default: an internal function doing standard
            print).
          - "fanout": Max number of registered clients in Engine at a
            time (default: 64).
          - "connect_timeout": Time in seconds to wait for connecting to
            remote host before aborting (default: 10).
          - "command_timeout": Time in seconds to wait for a command to
            complete before aborting (default: 0, which means
            unlimited).

        Threading considerations
        ========================
        Unlike set_default(), the underlying info dictionary is only
        modified from the task's thread. So calling set_info() from
        another thread leads to queueing the request for late apply
        (at run time) using the task dispatch port. When received,
        the request wakes up the engine when the task is running and
        the info dictionary is then updated.
        """
        # the @tasksyncmethod decorator guarantees this runs on the
        # task's own thread, so no locking is needed here
        self._info[info_key] = value
460
    def shell(self, command, **kwargs):
        """
        Schedule a shell command for local or distant execution.
        Returns the created worker (WorkerPopen or WorkerSsh).

        Local usage::
            task.shell(command [, key=key] [, handler=handler]
                  [, timeout=secs] [, autoclose=enable_autoclose]
                  [, stderr=enable_stderr])

        Distant usage::
            task.shell(command, nodes=nodeset [, handler=handler]
                  [, timeout=secs], [, autoclose=enable_autoclose]
                  [, stderr=enable_stderr])
        """

        handler = kwargs.get("handler", None)
        timeo = kwargs.get("timeout", None)
        ac = kwargs.get("autoclose", False)
        # fall back to the task-wide "stderr" default
        stderr = kwargs.get("stderr", self.default("stderr"))

        if kwargs.get("nodes", None):
            assert kwargs.get("key", None) is None, \
                   "'key' argument not supported for distant command"

            # create ssh-based worker
            worker = WorkerSsh(NodeSet(kwargs["nodes"]), command=command,
                               handler=handler, stderr=stderr, timeout=timeo,
                               autoclose=ac)
        else:
            # create (local) worker
            worker = WorkerPopen(command, key=kwargs.get("key", None),
                                 handler=handler, stderr=stderr,
                                 timeout=timeo, autoclose=ac)

        # schedule worker for execution in this task
        self.schedule(worker)

        return worker
499
500 - def copy(self, source, dest, nodes, **kwargs):
501 """ 502 Copy local file to distant nodes. 503 """ 504 assert nodes != None, "local copy not supported" 505 506 handler = kwargs.get("handler", None) 507 timeo = kwargs.get("timeout", None) 508 preserve = kwargs.get("preserve", None) 509 stderr = kwargs.get("stderr", self.default("stderr")) 510 511 # create a new copy worker 512 worker = WorkerSsh(nodes, source=source, dest=dest, handler=handler, 513 stderr=stderr, timeout=timeo, preserve=preserve) 514 515 self.schedule(worker) 516 return worker
    @tasksyncmethod()
    def _add_port(self, port):
        """Add an EnginePort instance to Engine (private method);
        runs on the task's own thread via @tasksyncmethod."""
        self._engine.add(port)
    @tasksyncmethod()
    def _remove_port(self, port):
        """Remove a port from Engine (private method);
        runs on the task's own thread via @tasksyncmethod."""
        self._engine.remove(port)
527
    def port(self, handler=None, autoclose=False):
        """
        Create a new task port. A task port is an abstraction object to
        deliver messages reliably between tasks.

        Basic rules:
          - A task can send messages to another task port (thread safe).
          - A task can receive messages from an acquired port either by
            setting up a notification mechanism or using a polling
            mechanism that may block the task waiting for a message
            sent on the port.
          - A port can be acquired by one task only.

        If handler is set to a valid EventHandler object, the port is
        a send-once port, ie. a message sent to this port generates an
        ev_msg event notification issued the port's task. If handler
        is not set, the task can only receive messages on the port by
        calling port.msg_recv().
        """
        port = EnginePort(self, handler, autoclose)
        # registration is dispatched to the task's thread if needed
        self._add_port(port)
        return port
    @tasksyncmethod()
    def timer(self, fire, handler, interval=-1.0, autoclose=False):
        """
        Create task's timer.

        :param fire: relative fire time in seconds (must be >= 0)
        :param handler: EventHandler receiving timer events
        :param interval: repeat interval in seconds (< 0: one-shot)
        :param autoclose: close timer automatically on task end
        :returns: the created EngineTimer
        """
        assert fire >= 0.0, \
            "timer's relative fire time must be a positive floating number"

        timer = EngineTimer(fire, interval, autoclose, handler)
        self._engine.add_timer(timer)
        return timer
    @tasksyncmethod()
    def schedule(self, worker):
        """
        Schedule a worker for execution. Only useful for manually
        instantiated workers.
        """
        # a killed task is removed from the registry; refuse to use it
        assert self in Task._tasks.values(), "deleted task"

        # bind worker to task self
        worker._set_task(self)

        # add worker clients to engine
        for client in worker._engine_clients():
            self._engine.add(client)
577
    def _resume_thread(self):
        """Resume called from another thread: wake the task thread
        blocked in _suspend_cond.wait_check()."""
        self._suspend_cond.notify_all()
581
    def _resume(self):
        """Run the engine and translate engine exceptions into task
        exceptions (must run on the task's own thread)."""
        assert self.thread == threading.currentThread()
        try:
            try:
                self._reset()
                self._run(self.timeout)
            except EngineTimeoutException:
                raise TimeoutError()
            except EngineAbortException, e:
                self._terminate(e.kill)
            except EngineAlreadyRunningError:
                raise AlreadyRunningError("task engine is already running")
        finally:
            # task becomes joinable
            self._join_cond.acquire()
            self._suspend_cond.suspend_count += 1
            self._join_cond.notifyAll()
            self._join_cond.release()
600
    def resume(self, timeout=0):
        """
        Resume task. If task is task_self(), workers are executed in
        the calling thread so this method will block until workers have
        finished. This is always the case for a single-threaded
        application (eg. which doesn't create other Task() instance
        than task_self()). Otherwise, the current thread doesn't block.
        In that case, you may then want to call task_wait() to wait for
        completion.

        :param timeout: run timeout in seconds (0 = unlimited)
        """
        self.timeout = timeout

        # lower suspend count *before* waking/running so wait_check()
        # lets the run proceed
        self._suspend_cond.atomic_dec()

        if self._is_task_self():
            self._resume()
        else:
            self._resume_thread()
    @tasksyncmethod()
    def _suspend_wait(self):
        """Mark the task suspended and block on the suspend condition,
        handing over _run_lock to the suspending thread."""
        assert task_self() == self
        # atomically set suspend state
        self._suspend_lock.acquire()
        self._suspended = True
        self._suspend_lock.release()

        # wait for special suspend condition, while releasing l_run
        self._suspend_cond.wait_check(self._run_lock)

        # waking up, atomically unset suspend state
        self._suspend_lock.acquire()
        self._suspended = False
        self._suspend_lock.release()
635
    def suspend(self):
        """
        Suspend task execution. This method may be called from another
        task (thread-safe). The function returns False if the task
        cannot be suspended (eg. it's not running), or returns True if
        the task has been successfully suspended.
        To resume a suspended task, use task.resume().
        """
        # first of all, increase suspend count
        self._suspend_cond.atomic_inc()

        # call synchronized suspend method (dispatched to task thread)
        self._suspend_wait()

        # wait for stopped task
        self._run_lock.acquire()    # run_lock ownership transfer

        # get result: are we really suspended or just stopped?
        result = True
        self._suspend_lock.acquire()
        if not self._suspended:
            # not acknowledging suspend state, task is stopped
            result = False
            self._run_lock.release()
        self._suspend_lock.release()
        return result
    @tasksyncmethod()
    def _abort(self, kill=False):
        """Synchronized abort (runs on the task's own thread)."""
        assert task_self() == self
        # raise an EngineAbortException when task is running
        self._engine.abort(kill)
668
    def abort(self, kill=False):
        """
        Abort a task. Aborting a task removes (and stops when needed)
        all workers. If optional parameter kill is True, the task
        object is unbound from the current thread, so calling
        task_self() creates a new Task object.
        """
        # non-blocking acquire: succeeds only when the task is NOT running
        if self._run_lock.acquire(0):
            self._quit = True
            self._run_lock.release()
            if self._is_task_self():
                self._terminate(kill)
            else:
                # abort on stopped/suspended task
                self.resume()
        else:
            # self._run_lock is locked (task is running), call
            # synchronized method instead
            self._abort(kill)
687
    def _terminate(self, kill):
        """
        Abort completion subroutine: clears the engine and result
        buffers; when kill is True, also unbinds the task from the
        current thread.
        """
        if kill:
            # invalidate dispatch port
            self._dispatch_port = None
        # clear engine
        self._engine.clear()

        # clear result objects
        self._reset()

        # destroy task if needed
        if kill:
            Task._task_lock.acquire()
            try:
                del Task._tasks[threading.currentThread()]
            finally:
                Task._task_lock.release()
708
    def join(self):
        """
        Suspend execution of the calling thread until the target task
        terminates, unless the target task has already terminated.
        """
        self._join_cond.acquire()
        try:
            # suspend_count > 0 means the task is not running
            if self._suspend_cond.suspend_count > 0:
                if not self._suspended:
                    # ignore stopped task (already terminated)
                    return
            self._join_cond.wait()
        finally:
            self._join_cond.release()
723
    def running(self):
        """
        Return True if the task is running.
        """
        # delegate to the engine's running flag
        return self._engine.running
729
    def _reset(self):
        """
        Reset buffers and retcodes management variables
        (called before each run and on termination).
        """
        self._msgtree.clear()
        self._errtree.clear()
        self._d_source_rc = {}
        self._d_rc_sources = {}
        self._max_rc = 0
        self._timeout_sources.clear()
740
    def _msg_add(self, source, msg):
        """
        Add a worker message associated with a source
        (a (worker, key) tuple).
        """
        self._msgtree.add(source, msg)
746
    def _errmsg_add(self, source, msg):
        """
        Add a worker error message associated with a source
        (a (worker, key) tuple).
        """
        self._errtree.add(source, msg)
752
753 - def _rc_set(self, source, rc, override=True):
754 """ 755 Add a worker return code associated with a source. 756 """ 757 if not override and self._d_source_rc.has_key(source): 758 return 759 760 # store rc by source 761 self._d_source_rc[source] = rc 762 763 # store source by rc 764 e = self._d_rc_sources.get(rc) 765 if e is None: 766 self._d_rc_sources[rc] = set([source]) 767 else: 768 self._d_rc_sources[rc].add(source) 769 770 # update max rc 771 if rc > self._max_rc: 772 self._max_rc = rc
773
    def _timeout_add(self, source):
        """
        Add a worker timeout associated with a source
        (a (worker, key) tuple).
        """
        # store source in timeout set
        self._timeout_sources.add(source)
780
781 - def _msg_by_source(self, source):
782 """ 783 Get a message by its source (worker, key). 784 """ 785 s = self._msgtree.get(source) 786 if s is None: 787 return None 788 return str(s)
789
790 - def _errmsg_by_source(self, source):
791 """ 792 Get an error message by its source (worker, key). 793 """ 794 s = self._errtree.get(source) 795 if s is None: 796 return None 797 return str(s)
798
799 - def _call_tree_matcher(self, tree_match_func, match_keys=None, worker=None):
800 """Call identified tree matcher (items, walk) method with options.""" 801 # filter by worker and optionally by matching keys 802 if worker and not match_keys: 803 match = lambda k: k[0] is worker 804 elif worker and match_keys: 805 match = lambda k: k[0] is worker and k[1] in match_keys 806 elif match_keys: 807 match = lambda k: k[1] in match_keys 808 else: 809 match = None 810 # Call tree matcher function (items or walk) 811 return tree_match_func(match, itemgetter(1))
812
    def _rc_by_source(self, source):
        """
        Get a return code by its source (worker, key).
        Raises KeyError if the source has no recorded rc.
        """
        return self._d_source_rc[source]
818
819 - def _rc_iter_by_key(self, key):
820 """ 821 Return an iterator over return codes for the given key. 822 """ 823 for (w, k), rc in self._d_source_rc.iteritems(): 824 if k == key: 825 yield rc
826
827 - def _rc_iter_by_worker(self, worker, match_keys=None):
828 """ 829 Return an iterator over return codes and keys list for a 830 specific worker and optional matching keys. 831 """ 832 if match_keys: 833 # Use the items iterator for the underlying dict. 834 for rc, src in self._d_rc_sources.iteritems(): 835 keys = [t[1] for t in src if t[0] is worker and \ 836 t[1] in match_keys] 837 if len(keys) > 0: 838 yield rc, keys 839 else: 840 for rc, src in self._d_rc_sources.iteritems(): 841 keys = [t[1] for t in src if t[0] is worker] 842 if len(keys) > 0: 843 yield rc, keys
844
845 - def _krc_iter_by_worker(self, worker):
846 """ 847 Return an iterator over key, rc for a specific worker. 848 """ 849 for rc, src in self._d_rc_sources.iteritems(): 850 for w, k in src: 851 if w is worker: 852 yield k, rc
853
854 - def _num_timeout_by_worker(self, worker):
855 """ 856 Return the number of timed out "keys" for a specific worker. 857 """ 858 cnt = 0 859 for (w, k) in self._timeout_sources: 860 if w is worker: 861 cnt += 1 862 return cnt
863
864 - def _iter_keys_timeout_by_worker(self, worker):
865 """ 866 Iterate over timed out keys (ie. nodes) for a specific worker. 867 """ 868 for (w, k) in self._timeout_sources: 869 if w is worker: 870 yield k
871
    def _flush_buffers_by_worker(self, worker):
        """
        Remove any messages from specified worker.
        """
        # NOTE(review): uses `==` to match the worker whereas sibling
        # methods use `is` -- confirm whether identity is intended
        self._msgtree.remove(lambda k: k[0] == worker)
877
    def _flush_errors_by_worker(self, worker):
        """
        Remove any error messages from specified worker.
        """
        # NOTE(review): uses `==` to match the worker whereas sibling
        # methods use `is` -- confirm whether identity is intended
        self._errtree.remove(lambda k: k[0] == worker)
883
884 - def key_buffer(self, key):
885 """ 886 Get buffer for a specific key. When the key is associated 887 to multiple workers, the resulting buffer will contain 888 all workers content that may overlap. This method returns an 889 empty buffer if key is not found in any workers. 890 """ 891 select_key = lambda k: k[1] == key 892 return "".join(imap(str, self._msgtree.messages(select_key)))
893 894 node_buffer = key_buffer 895
896 - def key_error(self, key):
897 """ 898 Get error buffer for a specific key. When the key is associated 899 to multiple workers, the resulting buffer will contain all 900 workers content that may overlap. This method returns an empty 901 error buffer if key is not found in any workers. 902 """ 903 select_key = lambda k: k[1] == key 904 return "".join(imap(str, self._errtree.messages(select_key)))
905 906 node_error = key_error 907
908 - def key_retcode(self, key):
909 """ 910 Return return code for a specific key. When the key is 911 associated to multiple workers, return the max return 912 code from these workers. Raises a KeyError if key is not found 913 in any finished workers. 914 """ 915 codes = list(self._rc_iter_by_key(key)) 916 if not codes: 917 raise KeyError(key) 918 return max(codes)
919 920 node_retcode = key_retcode 921
    def max_retcode(self):
        """
        Get max return code encountered during last run.

        How retcodes work
        =================
        If the process exits normally, the return code is its exit
        status. If the process is terminated by a signal, the return
        code is 128 + signal number.
        """
        # maintained incrementally by _rc_set()
        return self._max_rc
933
    def iter_buffers(self, match_keys=None):
        """
        Iterate over buffers, returns a tuple (buffer, keys). For remote
        workers (Ssh), keys are list of nodes. In that case, you should use
        NodeSet.fromlist(keys) to get a NodeSet instance (which is more
        convenient and efficient):

        Optional parameter match_keys add filtering on these keys.

        Usage example:

        >>> for buffer, nodelist in task.iter_buffers():
        ...     print NodeSet.fromlist(nodelist)
        ...     print buffer
        """
        # delegate to the shared matcher helper over the stdout tree
        return self._call_tree_matcher(self._msgtree.walk, match_keys)
950
    def iter_errors(self, match_keys=None):
        """
        Iterate over error buffers, returns a tuple (buffer, keys).

        See iter_buffers().
        """
        # delegate to the shared matcher helper over the stderr tree
        return self._call_tree_matcher(self._errtree.walk, match_keys)
958
959 - def iter_retcodes(self, match_keys=None):
960 """ 961 Iterate over return codes, returns a tuple (rc, keys). 962 963 Optional parameter match_keys add filtering on these keys. 964 965 How retcodes work 966 ================= 967 If the process exits normally, the return code is its exit 968 status. If the process is terminated by a signal, the return 969 code is 128 + signal number. 970 """ 971 if match_keys: 972 # Use the items iterator for the underlying dict. 973 for rc, src in self._d_rc_sources.iteritems(): 974 keys = [t[1] for t in src if t[1] in match_keys] 975 yield rc, keys 976 else: 977 for rc, src in self._d_rc_sources.iteritems(): 978 yield rc, [t[1] for t in src]
979
    def num_timeout(self):
        """
        Return the number of timed out "keys" (ie. nodes).
        """
        return len(self._timeout_sources)
985
986 - def iter_keys_timeout(self):
987 """ 988 Iterate over timed out keys (ie. nodes). 989 """ 990 for (w, k) in self._timeout_sources: 991 yield k
992
    def flush_buffers(self):
        """
        Flush all task messages (from all task workers).
        """
        self._msgtree.clear()
998
    def flush_errors(self):
        """
        Flush all task error messages (from all task workers).
        """
        self._errtree.clear()
    @classmethod
    def wait(cls, from_thread):
        """
        Class method that blocks calling thread until all tasks have
        finished (from a ClusterShell point of view, for instance,
        their task.resume() return). It doesn't necessarly mean that
        associated threads have finished.
        """
        # snapshot the registry under lock, then join without the lock
        # held to avoid blocking task creation/destruction
        Task._task_lock.acquire()
        try:
            tasks = Task._tasks.copy()
        finally:
            Task._task_lock.release()
        for thread, task in tasks.iteritems():
            if thread != from_thread:
                task.join()
1021
def task_self():
    """
    Get the Task instance bound to the current thread. This function
    provided as a convenience is available in the top-level
    ClusterShell.Task package namespace.
    """
    # Task acts as a per-thread singleton when given a thread
    current_thread = threading.currentThread()
    return Task(thread=current_thread)
1030
def task_wait():
    """
    Suspend execution of the calling thread until all tasks terminate,
    unless all tasks have already terminated. This function is provided
    as a convenience and is available in the top-level
    ClusterShell.Task package namespace.
    """
    Task.wait(threading.currentThread())
1039
def task_terminate():
    """
    Destroy the Task instance bound to the current thread. A next call
    to task_self() will create a new Task object. This function provided
    as a convenience is available in the top-level ClusterShell.Task
    package namespace.
    """
    # kill=True unbinds the task from the current thread
    task_self().abort(kill=True)
1048
def task_cleanup():
    """
    Cleanup routine to destroy all created tasks. This function
    provided as a convenience is available in the top-level
    ClusterShell.Task package namespace.
    """
    # snapshot the registry under lock, then abort without the lock
    # held (abort may itself take the registry lock)
    Task._task_lock.acquire()
    try:
        tasks = Task._tasks.copy()
    finally:
        Task._task_lock.release()
    for task in tasks.itervalues():
        task.abort(kill=True)
1062