Package ClusterShell :: Module Task
[hide private]
[frames] | [no frames]

Source Code for Module ClusterShell.Task

  1  # 
  2  # Copyright CEA/DAM/DIF (2007, 2008, 2009) 
  3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  4  # 
  5  # This file is part of the ClusterShell library. 
  6  # 
  7  # This software is governed by the CeCILL-C license under French law and 
  8  # abiding by the rules of distribution of free software.  You can  use, 
  9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 10  # license as circulated by CEA, CNRS and INRIA at the following URL 
 11  # "http://www.cecill.info". 
 12  # 
 13  # As a counterpart to the access to the source code and  rights to copy, 
 14  # modify and redistribute granted by the license, users are provided only 
 15  # with a limited warranty  and the software's author,  the holder of the 
 16  # economic rights,  and the successive licensors  have only  limited 
 17  # liability. 
 18  # 
 19  # In this respect, the user's attention is drawn to the risks associated 
 20  # with loading,  using,  modifying and/or developing or reproducing the 
 21  # software by the user in light of its specific status of free software, 
 22  # that may mean  that it is complicated to manipulate,  and  that  also 
 23  # therefore means  that it is reserved for developers  and  experienced 
 24  # professionals having in-depth computer knowledge. Users are therefore 
 25  # encouraged to load and test the software's suitability as regards their 
 26  # requirements in conditions enabling the security of their systems and/or 
 27  # data to be ensured and,  more generally, to use and operate it in the 
 28  # same conditions as regards security. 
 29  # 
 30  # The fact that you are presently reading this means that you have had 
 31  # knowledge of the CeCILL-C license and that you accept its terms. 
 32  # 
 33  # $Id: Task.py 163 2009-10-29 23:28:39Z st-cea $ 
 34   
 35  """ 
 36  ClusterShell Task module. 
 37   
 38  Simple example of use: 
 39   
 40      from ClusterShell.Task import * 
 41   
 42      # get task associated with calling thread 
 43      task = task_self() 
 44   
 45      # add a command to execute on distant nodes 
 46      task.shell("/bin/uname -r", nodes="tiger[1-30,35]") 
 47   
 48      # run task in calling thread 
 49      task.resume() 
 50   
 51      # get results 
 52      for buf, nodelist in task.iter_buffers(): 
 53          print NodeSet.fromlist(nodelist), buf 
 54   
 55  """ 
 56   
 57  from Engine.Engine import EngineAbortException 
 58  from Engine.Engine import EngineTimeoutException 
 59  from Engine.Engine import EngineAlreadyRunningError 
 60  from Engine.Engine import EngineTimer 
 61  from Engine.Poll import EnginePoll 
 62  from Worker.Pdsh import WorkerPdsh 
 63  from Worker.Ssh import WorkerSsh 
 64  from Worker.Popen2 import WorkerPopen2 
 65   
 66  from MsgTree import MsgTreeElem 
 67  from NodeSet import NodeSet 
 68   
 69  from sets import Set 
 70  import thread 
 71   
class TaskException(Exception):
    """Base class for all exceptions raised by a Task."""
76
class TimeoutError(TaskException):
    """Raised when a task run exceeds its timeout."""
81
class AlreadyRunningError(TaskException):
    """Raised when resume() is called on a task that is already running."""

    def __str__(self):
        # Fixed message: there is a single "current" task per thread.
        return "current task already running"
88 89
def _task_print_debug(task, s):
    """
    Default task debug printing function. Cannot provide 'print'
    directly as it is not a function (will be in Py3k!).

    task -- the Task instance the debug message belongs to (unused here,
            but part of the "print_debug" callback signature)
    s    -- the debug message string to print
    """
    print s
98 -class Task(object):
99 """ 100 Task to execute. May be bound to a specific thread. 101 102 To create a task in a new thread: 103 task = Task() 104 105 To create or get the instance of the task associated with the thread 106 identifier tid: 107 task = Task(thread_id=tid) 108 109 Add command to execute locally in task with: 110 task.shell("/bin/hostname") 111 112 Add command to execute in a distant node in task with: 113 task.shell("/bin/hostname", nodes="tiger[1-20]") 114 115 Run task in its associated thread (will block only if the calling 116 thread is the associated thread: 117 task.resume() 118 """ 119 120 _default_info = { "debug" : False, 121 "print_debug" : _task_print_debug, 122 "fanout" : 32, 123 "connect_timeout" : 10, 124 "command_timeout" : 0 } 125 _tasks = {} 126
127 - def __new__(cls, thread_id=None):
128 """ 129 For task bound to a specific thread, this class acts like a 130 "thread singleton", so new style class is used and new object 131 are only instantiated if needed. 132 """ 133 if thread_id: # a thread identifier is a nonzero integer 134 if thread_id not in cls._tasks: 135 cls._tasks[thread_id] = object.__new__(cls) 136 return cls._tasks[thread_id] 137 138 return object.__new__(cls)
139
140 - def __init__(self, thread_id=None):
141 """ 142 Initialize a Task, creating a new thread if needed. 143 """ 144 if not getattr(self, "_engine", None): 145 # first time called 146 self._info = self.__class__._default_info.copy() 147 self._engine = EnginePoll(self._info) 148 self.timeout = 0 149 self.l_run = None 150 151 # root of msg tree 152 self._msg_root = MsgTreeElem() 153 # dict of sources to msg tree elements 154 self._d_source_msg = {} 155 # dict of sources to return codes 156 self._d_source_rc = {} 157 # dict of return codes to sources 158 self._d_rc_sources = {} 159 # keep max rc 160 self._max_rc = 0 161 # keep timeout'd sources 162 self._timeout_sources = Set() 163 164 # create new thread if needed 165 if not thread_id: 166 self.l_run = thread.allocate_lock() 167 self.l_run.acquire() 168 tid = thread.start_new_thread(Task._start_thread, (self,)) 169 self._tasks[tid] = self
170
171 - def _start_thread(self):
172 """New Task thread entry point.""" 173 try: 174 while True: 175 self.l_run.acquire() 176 self._engine.run(self.timeout) 177 except: 178 # TODO: dispatch exceptions 179 raise
180
181 - def info(self, info_key, def_val=None):
182 """ 183 Return per-task information. 184 """ 185 return self._info.get(info_key, def_val)
186
187 - def set_info(self, info_key, value):
188 """ 189 Set task-specific information state. 190 """ 191 self._info[info_key] = value
192
193 - def shell(self, command, **kwargs):
194 """ 195 Schedule a shell command for local or distant execution. 196 197 Local usage: 198 task.shell(command [, key=key] [, handler=handler] 199 [, timeout=secs]) 200 201 Distant usage: 202 task.shell(command, nodes=nodeset [, handler=handler] 203 [, timeout=secs]) 204 """ 205 206 handler = kwargs.get("handler", None) 207 timeo = kwargs.get("timeout", None) 208 ac = kwargs.get("autoclose", False) 209 210 if kwargs.get("nodes", None): 211 assert kwargs.get("key", None) is None, \ 212 "'key' argument not supported for distant command" 213 214 # create ssh-based worker 215 worker = WorkerSsh(NodeSet(kwargs["nodes"]), handler=handler, 216 timeout=timeo, command=command, autoclose=ac) 217 else: 218 # create popen2-based (local) worker 219 worker = WorkerPopen2(command, key=kwargs.get("key", None), 220 handler=handler, timeout=timeo, autoclose=ac) 221 222 # schedule worker for execution in this task 223 self.schedule(worker) 224 225 return worker
226
227 - def copy(self, source, dest, nodes, **kwargs):
228 """ 229 Copy local file to distant nodes. 230 """ 231 assert nodes != None, "local copy not supported" 232 233 handler = kwargs.get("handler", None) 234 timeo = kwargs.get("timeout", None) 235 236 # create a new Pdcp worker (supported by WorkerPdsh) 237 worker = WorkerSsh(nodes, source=source, dest=dest, handler=handler, 238 timeout=timeo) 239 240 self.schedule(worker) 241 242 return worker
243
244 - def timer(self, fire, handler, interval=-1.0, autoclose=False):
245 """ 246 Create task's timer. 247 """ 248 assert fire >= 0.0, "timer's relative fire time must be a positive floating number" 249 250 timer = EngineTimer(fire, interval, autoclose, handler) 251 self._engine.add_timer(timer) 252 return timer
253
254 - def schedule(self, worker):
255 """ 256 Schedule a worker for execution. Only useful for manually 257 instantiated workers. 258 """ 259 # bind worker to task self 260 worker._set_task(self) 261 262 # add worker clients to engine 263 for client in worker._engine_clients(): 264 self._engine.add(client)
265
266 - def resume(self, timeout=0):
267 """ 268 Resume task. If task is task_self(), workers are executed in 269 the calling thread so this method will block until workers have 270 finished. This is always the case for a single-threaded 271 application (eg. which doesn't create other Task() instance 272 than task_self()). Otherwise, the current thread doesn't block. 273 In that case, you may then want to call task_wait() to wait for 274 completion. 275 """ 276 if self.l_run: 277 self.timeout = timeout 278 self.l_run.release() 279 else: 280 try: 281 self._reset() 282 self._engine.run(timeout) 283 except EngineTimeoutException: 284 raise TimeoutError() 285 except EngineAbortException, e: 286 self._terminate(e.kill) 287 except EngineAlreadyRunningError: 288 raise AlreadyRunningError() 289 except: 290 raise
291
292 - def abort(self, kill=False):
293 """ 294 Abort a task. Aborting a task removes (and stops when needed) 295 all workers. If optional parameter kill is True, the task 296 object is unbound from the current thread, so calling 297 task_self() creates a new Task object. 298 """ 299 # Aborting a task from another thread (ie. not the thread 300 # bound to task) will be supported through the inter-task msg 301 # API (trac #21). 302 assert task_self() == self, "Inter-task abort not implemented yet" 303 304 # Raise an EngineAbortException when task is running. 305 self._engine.abort(kill) 306 307 # Called directly when task is not running. 308 self._terminate(kill)
309
310 - def _terminate(self, kill):
311 """ 312 Abort completion subroutine. 313 """ 314 self._reset() 315 316 if kill: 317 del self.__class__._tasks[thread.get_ident()]
318
319 - def join(self):
320 """ 321 Suspend execution of the calling thread until the target task 322 terminates, unless the target task has already terminated. 323 """ 324 self._engine.join()
325
326 - def _reset(self):
327 """ 328 Reset buffers and retcodes management variables. 329 """ 330 self._msg_root = MsgTreeElem() 331 self._d_source_msg = {} 332 self._d_source_rc = {} 333 self._d_rc_sources = {} 334 self._max_rc = 0 335 self._timeout_sources.clear()
336
337 - def _msg_add(self, source, msg):
338 """ 339 Add a worker message associated with a source. 340 """ 341 # try first to get current element in msgs tree 342 e_msg = self._d_source_msg.get(source) 343 if not e_msg: 344 # key not found (first msg from it) 345 e_msg = self._msg_root 346 347 # add child msg and update dict 348 self._d_source_msg[source] = e_msg.add_msg(source, msg)
349
350 - def _rc_set(self, source, rc, override=True):
351 """ 352 Add a worker return code associated with a source. 353 """ 354 if not override and self._d_source_rc.has_key(source): 355 return 356 357 # store rc by source 358 self._d_source_rc[source] = rc 359 360 # store source by rc 361 e = self._d_rc_sources.get(rc) 362 if e is None: 363 self._d_rc_sources[rc] = Set([source]) 364 else: 365 self._d_rc_sources[rc].add(source) 366 367 # update max rc 368 if rc > self._max_rc: 369 self._max_rc = rc
370
371 - def _timeout_add(self, source):
372 """ 373 Add a worker timeout associated with a source. 374 """ 375 # store source in timeout set 376 self._timeout_sources.add(source)
377
378 - def _msg_by_source(self, source):
379 """ 380 Get a message by its source (worker, key). 381 """ 382 e_msg = self._d_source_msg.get(source) 383 384 if e_msg is None: 385 return None 386 387 return e_msg.message()
388
389 - def _msg_iter_by_key(self, key):
390 """ 391 Return an iterator over stored messages for the given key. 392 """ 393 for (w, k), e in self._d_source_msg.iteritems(): 394 if k == key: 395 yield e.message()
396
397 - def _msg_iter_by_worker(self, worker, match_keys=None):
398 """ 399 Return an iterator over messages and keys list for a specific 400 worker and optional matching keys. 401 """ 402 if match_keys: 403 for e in self._msg_root: 404 keys = [t[1] for t in e.sources if t[0] is worker and t[1] in match_keys] 405 if len(keys) > 0: 406 yield e.message(), keys 407 else: 408 for e in self._msg_root: 409 keys = [t[1] for t in e.sources if t[0] is worker] 410 if len(keys) > 0: 411 yield e.message(), keys
412
413 - def _kmsg_iter_by_worker(self, worker):
414 """ 415 Return an iterator over key, message for a specific worker. 416 """ 417 for (w, k), e in self._d_source_msg.iteritems(): 418 if w is worker: 419 yield k, e.message()
420
421 - def _rc_by_source(self, source):
422 """ 423 Get a return code by its source (worker, key). 424 """ 425 return self._d_source_rc.get(source, 0)
426
427 - def _rc_iter_by_key(self, key):
428 """ 429 Return an iterator over return codes for the given key. 430 """ 431 for (w, k), rc in self._d_source_rc.iteritems(): 432 if k == key: 433 yield rc
434
435 - def _rc_iter_by_worker(self, worker, match_keys=None):
436 """ 437 Return an iterator over return codes and keys list for a 438 specific worker and optional matching keys. 439 """ 440 if match_keys: 441 # Use the items iterator for the underlying dict. 442 for rc, src in self._d_rc_sources.iteritems(): 443 keys = [t[1] for t in src if t[0] is worker and t[1] in match_keys] 444 if len(keys) > 0: 445 yield rc, keys 446 else: 447 for rc, src in self._d_rc_sources.iteritems(): 448 keys = [t[1] for t in src if t[0] is worker] 449 if len(keys) > 0: 450 yield rc, keys
451
452 - def _krc_iter_by_worker(self, worker):
453 """ 454 Return an iterator over key, rc for a specific worker. 455 """ 456 for rc, src in self._d_rc_sources.iteritems(): 457 for w, k in src: 458 if w is worker: 459 yield k, rc
460
461 - def _num_timeout_by_worker(self, worker):
462 """ 463 Return the number of timed out "keys" for a specific worker. 464 """ 465 cnt = 0 466 for (w, k) in self._timeout_sources: 467 if w is worker: 468 cnt += 1 469 return cnt
470
471 - def _iter_keys_timeout_by_worker(self, worker):
472 """ 473 Iterate over timed out keys (ie. nodes) for a specific worker. 474 """ 475 for (w, k) in self._timeout_sources: 476 if w is worker: 477 yield k
478
479 - def key_buffer(self, key):
480 """ 481 Get buffer for a specific key. When the key is associated 482 to multiple workers, the resulting buffer will contain 483 all workers content that may overlap. 484 """ 485 return "".join(self._msg_iter_by_key(key))
486 487 node_buffer = key_buffer 488
489 - def key_retcode(self, key):
490 """ 491 Return return code for a specific key. When the key is 492 associated to multiple workers, return the max return 493 code from these workers. 494 """ 495 return max(self._rc_iter_by_key(key))
496 497 node_retcode = key_retcode 498
499 - def max_retcode(self):
500 """ 501 Get max return code encountered during last run. 502 503 How retcodes work: 504 If the process exits normally, the return code is its exit 505 status. If the process is terminated by a signal, the return 506 code is 128 + signal number. 507 """ 508 return self._max_rc
509
510 - def iter_buffers(self, match_keys=None):
511 """ 512 Iterate over buffers, returns a tuple (buffer, keys). For remote 513 workers (Ssh), keys are list of nodes. In that case, you should use 514 NodeSet.fromlist(keys) to get a NodeSet instance (which is more 515 convenient and efficient): 516 517 Optional parameter match_keys add filtering on these keys. 518 519 Usage example: 520 521 for buffer, nodelist in task.iter_buffers(): 522 print NodeSet.fromlist(nodelist) 523 print buffer 524 """ 525 if match_keys: 526 for e in self._msg_root: 527 keys = [t[1] for t in e.sources if t[1] in match_keys] 528 if keys: 529 yield e.message(), keys 530 else: 531 for e in self._msg_root: 532 yield e.message(), [t[1] for t in e.sources]
533
534 - def iter_retcodes(self, match_keys=None):
535 """ 536 Iterate over return codes, returns a tuple (rc, keys). 537 538 Optional parameter match_keys add filtering on these keys. 539 540 How retcodes work: 541 If the process exits normally, the return code is its exit 542 status. If the process is terminated by a signal, the return 543 code is 128 + signal number. 544 """ 545 if match_keys: 546 # Use the items iterator for the underlying dict. 547 for rc, src in self._d_rc_sources.iteritems(): 548 keys = [t[1] for t in src if t[1] in match_keys] 549 yield rc, keys 550 else: 551 for rc, src in self._d_rc_sources.iteritems(): 552 yield rc, [t[1] for t in src]
553
554 - def num_timeout(self):
555 """ 556 Return the number of timed out "keys" (ie. nodes). 557 """ 558 return len(self._timeout_sources)
559
560 - def iter_keys_timeout(self):
561 """ 562 Iterate over timed out keys (ie. nodes). 563 """ 564 for (w, k) in self._timeout_sources: 565 yield k
566
567 - def wait(cls, from_thread_id):
568 """ 569 Class method that blocks calling thread until all tasks have 570 finished. 571 """ 572 for thread_id, task in Task._tasks.iteritems(): 573 if thread_id != from_thread_id: 574 task.join()
575 wait = classmethod(wait)
576 577
def task_self():
    """
    Return the Task instance bound to the calling thread.

    This function, provided as a convenience, is available in the
    top-level ClusterShell.Task package namespace.
    """
    calling_thread = thread.get_ident()
    return Task(thread_id=calling_thread)
585
def task_wait():
    """
    Suspend execution of the calling thread until all tasks terminate,
    unless all tasks have already terminated.

    This function, provided as a convenience, is available in the
    top-level ClusterShell.Task package namespace.
    """
    current = thread.get_ident()
    Task.wait(current)
594