Module clush
[hide private]
[frames] | no frames]

Source Code for Module clush

   1  #!/usr/bin/env python 
   2  # 
   3  # Copyright CEA/DAM/DIF (2007, 2008, 2009, 2010) 
   4  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
   5  # 
   6  # This file is part of the ClusterShell library. 
   7  # 
   8  # This software is governed by the CeCILL-C license under French law and 
   9  # abiding by the rules of distribution of free software.  You can  use, 
  10  # modify and/ or redistribute the software under the terms of the CeCILL-C 
  11  # license as circulated by CEA, CNRS and INRIA at the following URL 
  12  # "http://www.cecill.info". 
  13  # 
  14  # As a counterpart to the access to the source code and  rights to copy, 
  15  # modify and redistribute granted by the license, users are provided only 
  16  # with a limited warranty  and the software's author,  the holder of the 
  17  # economic rights,  and the successive licensors  have only  limited 
  18  # liability. 
  19  # 
  20  # In this respect, the user's attention is drawn to the risks associated 
  21  # with loading,  using,  modifying and/or developing or reproducing the 
  22  # software by the user in light of its specific status of free software, 
  23  # that may mean  that it is complicated to manipulate,  and  that  also 
  24  # therefore means  that it is reserved for developers  and  experienced 
  25  # professionals having in-depth computer knowledge. Users are therefore 
  26  # encouraged to load and test the software's suitability as regards their 
  27  # requirements in conditions enabling the security of their systems and/or 
  28  # data to be ensured and,  more generally, to use and operate it in the 
  29  # same conditions as regards security. 
  30  # 
  31  # The fact that you are presently reading this means that you have had 
  32  # knowledge of the CeCILL-C license and that you accept its terms. 
  33  # 
  34  # $Id: clush.py 302 2010-07-23 19:59:20Z st-cea $ 
"""
Utility program to run commands on a cluster using the ClusterShell
library.

clush is a pdsh-like command which benefits from the ClusterShell
library and its Ssh worker. It features an integrated output results
gathering system (dshbak-like), can get node groups by running
predefined external commands and can redirect lines read on its
standard input to the remote commands.

When no command is specified, clush runs interactively.

"""
  49   
  50  import errno 
  51  import exceptions 
  52  import fcntl 
  53  import optparse 
  54  import os 
  55  import resource 
  56  import sys 
  57  import signal 
  58  import ConfigParser 
  59   
  60  from ClusterShell.NodeUtils import GroupResolverConfigError 
  61  from ClusterShell.NodeUtils import GroupResolverSourceError 
  62  from ClusterShell.NodeUtils import GroupSourceException 
  63  from ClusterShell.NodeUtils import GroupSourceNoUpcall 
  64  try: 
  65      from ClusterShell.Event import EventHandler 
  66      from ClusterShell.NodeSet import NodeSet 
  67      from ClusterShell.NodeSet import NOGROUP_RESOLVER, STD_GROUP_RESOLVER 
  68      from ClusterShell.NodeSet import NodeSetExternalError, NodeSetParseError 
  69      from ClusterShell.Task import Task, task_self 
  70      from ClusterShell.Worker.Worker import WorkerSimple 
  71      from ClusterShell import __version__ 
  72  except GroupResolverConfigError, e: 
  73      print >> sys.stderr, \ 
  74          "ERROR: ClusterShell Groups configuration error:\n\t%s" % e 
  75      sys.exit(1) 
  76   
# Verbosity levels, from least to most verbose. The configured
# "verbosity" value is compared against them with >= / ==.
VERB_QUIET = 0
VERB_STD = 1
VERB_VERB = 2
VERB_DEBUG = 3

# Start of clubak.py common code

# Accepted values for the --color option and the "color" config key;
# the first entry ("never") is used as the configuration default.
WHENCOLOR_CHOICES = ["never", "always", "auto"]
  85   
class Display(object):
    """
    Output display class for clush script.

    Standard lines are written to ``out`` (stdout by default) and error
    lines to ``err`` (stderr by default). Gathered output is shown either
    as a dshbak-like block (header framed by SEP lines, then the content)
    or, in line mode, as one output line per content line prefixed by the
    nodeset. Labels may be wrapped in ANSI color escapes.
    """
    COLOR_STDOUT_FMT = "\033[34m%s\033[0m"
    COLOR_STDERR_FMT = "\033[31m%s\033[0m"
    SEP = "-" * 15

    def __init__(self, color=True):
        self._color = color
        # Header-block display by default (see line_mode).
        self._display = self._print_buffer
        self.out, self.err = sys.stdout, sys.stderr
        self.label = True
        self.regroup = False
        self.groupsource = None
        # Without color, labels are written through an identity format.
        stdout_fmt = stderr_fmt = "%s"
        if self._color:
            stdout_fmt = self.COLOR_STDOUT_FMT
            stderr_fmt = self.COLOR_STDERR_FMT
        self.color_stdout_fmt = stdout_fmt
        self.color_stderr_fmt = stderr_fmt
        self.noprefix = False

    def _getlmode(self):
        """Return True when gathered output is displayed line by line."""
        return self._display == self._print_lines

    def _setlmode(self, value):
        """Select line-by-line display (true value) or block display."""
        if not value:
            self._display = self._print_buffer
        else:
            self._display = self._print_lines

    line_mode = property(_getlmode, _setlmode)

    def _format_header(self, nodeset):
        """Format nodeset-based header."""
        if not self.regroup:
            return str(nodeset)
        return nodeset.regroup(self.groupsource, noprefix=self.noprefix)

    def _write_labelled(self, stream, fmt, nodeset, line):
        """Write line to stream, prefixed by a formatted "nodeset: " label
        when labeling is enabled."""
        if not self.label:
            stream.write("%s\n" % line)
            return
        label = fmt % ("%s: " % nodeset)
        stream.write("%s%s\n" % (label, line))

    def print_line(self, nodeset, line):
        """Display a line with optional label."""
        self._write_labelled(self.out, self.color_stdout_fmt, nodeset, line)

    def print_line_error(self, nodeset, line):
        """Display an error line with optional label."""
        self._write_labelled(self.err, self.color_stderr_fmt, nodeset, line)

    def print_gather(self, nodeset, obj):
        """Display nodeset/content using the current display mode
        (header block, or one prefixed line per item in line mode)."""
        return self._display(nodeset, obj)

    def _print_buffer(self, nodeset, content):
        """Display a dshbak-like header block and content."""
        title = self._format_header(nodeset)
        framed = "%s\n%s\n%s\n" % (self.SEP, title, self.SEP)
        header = self.color_stdout_fmt % framed
        self.out.write("%s%s\n" % (header, content))

    def _print_lines(self, nodeset, msg):
        """Display a MsgTree buffer by line with prefixed header."""
        prefix = self.color_stdout_fmt % ("%s: " % self._format_header(nodeset))
        write = self.out.write
        for line in msg:
            write("%s%s\n" % (prefix, line))
159
def nodeset_cmp(ns1, ns2):
    """Compare 2 nodesets for a cmp-style sort: the larger nodeset comes
    first; for nodesets of equal length, the one whose first node sorts
    lowest (as ordered by NodeSet) comes first."""
    by_size = cmp(len(ns2), len(ns1))
    if by_size:
        return by_size
    # Same length: let NodeSet order the two first nodes and see which
    # one comes out first.
    first1 = ns1[0]
    lowest = NodeSet.fromlist([first1, ns2[0]])[0]
    if lowest != first1:
        return 1
    return -1

# End of clubak.py common code
def bufnodeset_cmp(bn1, bn2):
    """Compare 2 (buf, nodeset) tuples for a cmp-style sort.

    Only the nodeset (second item) is considered: larger nodesets come
    first, then ordering is by first node (see nodeset_cmp).
    """
    left_ns, right_ns = bn1[1], bn2[1]
    return nodeset_cmp(left_ns, right_ns)
180
class UpdatePromptException(Exception):
    """Exception used by the signal handler.

    Raised by signal_handler() on SIGUSR1 to interrupt the blocking
    raw_input() call of the interactive loop, so that the prompt can be
    refreshed once a command has completed.
    """
183
class StdInputHandler(EventHandler):
    """Standard input event handler class.

    Forwards every line read from local stdin to a master worker, then
    signals end-of-file to that worker when stdin is closed.
    """

    def __init__(self, worker):
        # Worker receiving the lines read on stdin.
        self.master_worker = worker

    def ev_read(self, worker):
        """Forward the last line read, newline-terminated."""
        line = worker.last_read()
        self.master_worker.write(line + '\n')

    def ev_close(self, worker):
        """stdin is closed: propagate EOF to the master worker."""
        self.master_worker.set_write_eof()
195
class DirectOutputHandler(EventHandler):
    """Direct output event handler class.

    Lines are printed as soon as they are read (no gathering); non-zero
    exit codes and timeouts are reported on stderr.
    """

    def __init__(self, display):
        self._display = display

    def ev_read(self, worker):
        source, line = worker.last_read()
        self._display.print_line(source, line)

    def ev_error(self, worker):
        source, line = worker.last_error()
        self._display.print_line_error(source, line)

    def ev_hup(self, worker):
        # Workers without last_retcode() (presumably the node-less local
        # worker) are reported under the "local" label.
        if not hasattr(worker, "last_retcode"):
            ns, rc = "local", worker.retcode()
        else:
            ns, rc = worker.last_retcode()
        if rc > 0:
            print >> sys.stderr, "clush: %s: exited with exit code %d" \
                % (ns, rc)

    def ev_timeout(self, worker):
        timed_out = NodeSet.fromlist(worker.iter_keys_timeout())
        print >> sys.stderr, "clush: %s: command timeout" % timed_out

    def ev_close(self, worker):
        # Record (in task-specific USER_ variables) that nothing runs
        # anymore and, if requested, notify the main thread with SIGUSR1
        # so that it updates its prompt.
        task = worker.task
        task.set_default("USER_running", False)
        if task.default("USER_handle_SIGUSR1"):
            os.kill(os.getpid(), signal.SIGUSR1)
231
class GatherOutputHandler(EventHandler):
    """Gathered output event handler class.

    Output buffered by the worker is displayed, grouped by nodeset, when
    the worker closes. In line mode, gathered lines are displayed as
    soon as every remaining node has sent the same number of lines.
    """

    def __init__(self, display, runtimer, nodes):
        self._display = display
        self._runtimer = runtimer
        # Bookkeeping for the -L live gathering enhancement: per-node
        # count of lines received since the last flush, and total count.
        self._nodes = NodeSet(nodes)
        self._nodemap_base = dict.fromkeys(self._nodes, 0)
        self._nodemap = self._nodemap_base.copy()
        self._nodes_cnt = 0

    def _sorted_buffers(self, buffers):
        """Turn (buffer, keys) items into (buffer, NodeSet) pairs sorted
        with bufnodeset_cmp (larger nodesets first)."""
        pairs = [(item[0], NodeSet.fromlist(item[1])) for item in buffers]
        pairs.sort(cmp=bufnodeset_cmp)
        return pairs

    def ev_read(self, worker):
        # Only "per line live gathering" (-bL) works at read time; in
        # the other modes everything is displayed on close.
        if not self._display.line_mode:
            return
        node = worker.last_read()[0]
        # keep counter and nodemap up-to-date
        self._nodes_cnt += 1
        self._nodemap[node] += 1
        self._live_line(worker)

    def ev_error(self, worker):
        ns, buf = worker.last_error()
        timer = self._runtimer
        if timer:
            # clear the progress counter line before writing on stderr
            timer.eh.erase_line()
        self._display.print_line_error(ns, buf)
        if timer:
            # Force redisplay of counter
            timer.eh.set_dirty()

    def ev_hup(self, worker):
        if not self._display.line_mode:
            return
        node = worker.last_node()
        if self._nodemap[node] != 0:
            return
        # forget node that doesn't answer (used for live line gathering)
        self._nodes.remove(node)
        del self._nodemap[node]
        del self._nodemap_base[node]
        self._live_line(worker)

    def _live_line(self, worker):
        # if all nodes replied 1 (or n) time(s), display gathered line(s)
        nnodes = len(self._nodes)
        if not nnodes or self._nodes_cnt % nnodes != 0:
            return
        if max(self._nodemap.itervalues()) != self._nodes_cnt / nnodes:
            return
        for buf, nodeset in self._sorted_buffers(worker.iter_buffers()):
            self._display.print_gather(nodeset, buf)
        # clear worker buffers, reset nodemap and node counter
        worker.flush_buffers()
        self._nodemap = self._nodemap_base.copy()
        self._nodes_cnt = 0

    def ev_close(self, worker):
        # Worker is closing -- it's time to gather results...
        task = worker.task
        if self._runtimer:
            self._runtimer.eh.finalize(task.default("USER_interactive"))

        # Display command output, try to order buffers by rc, then by
        # node/nodeset (see bufnodeset_cmp)
        for rc, nodelist in worker.iter_retcodes():
            for buf, nodeset in self._sorted_buffers(
                    worker.iter_buffers(nodelist)):
                self._display.print_gather(nodeset, buf)

        # Display return code if not ok ( != 0)
        for rc, nodelist in worker.iter_retcodes():
            if rc != 0:
                failed = NodeSet.fromlist(nodelist)
                print >> sys.stderr, "clush: %s: exited with exit code %s" \
                    % (failed, rc)

        # Display nodes that didn't answer within command timeout delay
        if worker.num_timeout() > 0:
            print >> sys.stderr, "clush: %s: command timeout" % \
                NodeSet.fromlist(worker.iter_keys_timeout())

        # Notify main thread to update its prompt
        task.set_default("USER_running", False)
        if task.default("USER_handle_SIGUSR1"):
            os.kill(os.getpid(), signal.SIGUSR1)
317
class RunTimer(EventHandler):
    """Timer event handler displaying a live "clush: done/total" counter
    on stderr while gathered commands run."""

    def __init__(self, task, total):
        self.task = task
        self.total = total
        # last displayed count of engine clients (-1 forces a redraw)
        self.cnt_last = -1
        # field width shared by both numbers of the counter
        self.tslen = len(str(self.total))
        # length of the last counter line written (used to erase it)
        self.wholelen = 0

    def ev_timer(self, timer):
        self.update()

    def set_dirty(self):
        """Force the next update() to redraw the counter."""
        self.cnt_last = -1

    def erase_line(self):
        """Blank out the previously written counter line, if any."""
        if self.wholelen:
            sys.stderr.write(' ' * self.wholelen + '\r')

    def update(self):
        """Redraw the counter when the number of engine clients changed."""
        remaining = len(self.task._engine.clients())
        if remaining == self.cnt_last:
            return
        self.cnt_last = remaining
        # display completed/total clients
        width = self.tslen
        line = 'clush: %*d/%*d\r' % (width, self.total - remaining,
                                     width, self.total)
        self.wholelen = len(line)
        sys.stderr.write(line)

    def finalize(self, cr):
        """Write the final total/total counter, terminated by a newline
        if cr is true, else by a carriage return."""
        if cr:
            eol = '\n'
        else:
            eol = '\r'
        width = self.tslen
        sys.stderr.write(('clush: %*d/%*d' + eol) % (width, self.total,
                                                     width, self.total))
354
class ClushConfigError(Exception):
    """Exception used by ClushConfig to report an error.

    Records the config section and option involved plus a message (or
    underlying exception); str() renders "(Config section.option): msg".
    """

    def __init__(self, section, option, msg):
        Exception.__init__(self)
        self.section, self.option, self.msg = section, option, msg

    def __str__(self):
        location = "%s.%s" % (self.section, self.option)
        return "(Config %s): %s" % (location, self.msg)
366
367 -class ClushConfig(ConfigParser.ConfigParser):
368 """Config class for clush (specialized ConfigParser)""" 369 370 main_defaults = { "fanout" : "64", 371 "connect_timeout" : "30", 372 "command_timeout" : "0", 373 "history_size" : "100", 374 "color" : WHENCOLOR_CHOICES[0], 375 "verbosity" : "%d" % VERB_STD } 376
377 - def __init__(self):
378 ConfigParser.ConfigParser.__init__(self) 379 # create Main section with default values 380 self.add_section("Main") 381 for key, value in ClushConfig.main_defaults.iteritems(): 382 self.set("Main", key, value) 383 # config files override defaults values 384 self.read(['/etc/clustershell/clush.conf', 385 os.path.expanduser('~/.clush.conf')])
386
387 - def verbose_print(self, level, message):
388 if self.get_verbosity() >= level: 389 print message
390
391 - def max_fdlimit(self):
392 """Make open file descriptors soft limit the max.""" 393 soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) 394 if soft < hard: 395 self.verbose_print(VERB_DEBUG, "Setting max soft limit " 396 "RLIMIT_NOFILE: %d -> %d" % (soft, hard)) 397 resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard)) 398 else: 399 self.verbose_print(VERB_DEBUG, "Soft limit RLIMIT_NOFILE already " 400 "set to the max (%d)" % soft)
401
402 - def set_main(self, option, value):
403 self.set("Main", option, str(value))
404
405 - def getint(self, section, option):
406 try: 407 return ConfigParser.ConfigParser.getint(self, section, option) 408 except (ConfigParser.Error, TypeError, ValueError), e: 409 raise ClushConfigError(section, option, e)
410
411 - def getfloat(self, section, option):
412 try: 413 return ConfigParser.ConfigParser.getfloat(self, section, option) 414 except (ConfigParser.Error, TypeError, ValueError), e: 415 raise ClushConfigError(section, option, e)
416
417 - def _get_optional(self, section, option):
418 try: 419 return self.get(section, option) 420 except ConfigParser.Error, e: 421 pass
422
423 - def get_color(self):
424 whencolor = self._get_optional("Main", "color") 425 if whencolor not in WHENCOLOR_CHOICES: 426 raise ClushConfigError("Main", "color", "choose from %s" % \ 427 WHENCOLOR_CHOICES) 428 return whencolor
429
430 - def get_verbosity(self):
431 try: 432 return self.getint("Main", "verbosity") 433 except ClushConfigError: 434 return 0
435
436 - def get_fanout(self):
437 return self.getint("Main", "fanout")
438
439 - def get_connect_timeout(self):
440 return self.getfloat("Main", "connect_timeout")
441
442 - def get_command_timeout(self):
443 return self.getfloat("Main", "command_timeout")
444
445 - def get_ssh_user(self):
446 return self._get_optional("Main", "ssh_user")
447
448 - def get_ssh_path(self):
449 return self._get_optional("Main", "ssh_path")
450
451 - def get_ssh_options(self):
452 return self._get_optional("Main", "ssh_options")
453 454
def signal_handler(signum, frame):
    """Signal handler used for main thread notification.

    SIGUSR1 (sent by the output handlers when a worker closes) raises
    UpdatePromptException in the main thread; other signals are ignored.
    """
    if signum != signal.SIGUSR1:
        return
    raise UpdatePromptException()
459
def get_history_file():
    """Return the path of the interactive mode history file.

    The file is ``.clush_history`` in the user's home directory. The home
    directory is resolved with os.path.expanduser(), as for
    ~/.clush.conf: $HOME is used when set, and when it is not, the
    lookup falls back to the password database instead of raising
    KeyError.
    """
    return os.path.expanduser("~/.clush_history")
463
def readline_setup():
    """
    Configure readline for the interactive prompt (tab completion with
    no word delimiters) and load the .clush_history file if it can be
    read. The history file is not saved here.
    """
    import readline
    for binding in ("tab: complete",):
        readline.parse_and_bind(binding)
    readline.set_completer_delims("")
    try:
        readline.read_history_file(get_history_file())
    except IOError:
        # no history yet (or unreadable): start with an empty one
        pass
476
477 -def ttyloop(task, nodeset, gather, timeout, verbosity, display):
478 """Manage the interactive prompt to run command""" 479 readline_avail = False 480 if task.default("USER_interactive"): 481 assert sys.stdin.isatty() 482 try: 483 import readline 484 readline_setup() 485 readline_avail = True 486 except ImportError: 487 pass 488 if verbosity >= VERB_STD: 489 print "Enter 'quit' to leave this interactive mode" 490 491 rc = 0 492 ns = NodeSet(nodeset) 493 ns_info = True 494 cmd = "" 495 while task.default("USER_running") or cmd.lower() != 'quit': 496 try: 497 if task.default("USER_interactive") and \ 498 not task.default("USER_running"): 499 if ns_info: 500 print "Working with nodes: %s" % ns 501 ns_info = False 502 prompt = "clush> " 503 else: 504 prompt = "" 505 cmd = raw_input(prompt) 506 except EOFError: 507 print 508 return 509 except UpdatePromptException: 510 if task.default("USER_interactive"): 511 continue 512 return 513 except KeyboardInterrupt, e: 514 signal.signal(signal.SIGUSR1, signal.SIG_IGN) 515 if gather: 516 # Suspend task, so we can safely access its data from 517 # the main thread 518 task.suspend() 519 520 print_warn = False 521 522 # Display command output, but cannot order buffers by rc 523 nodesetify = lambda v: (v[0], NodeSet.fromlist(v[1])) 524 for buf, nodeset in sorted(map(nodesetify, task.iter_buffers()), 525 cmp=bufnodeset_cmp): 526 if not print_warn: 527 print_warn = True 528 print >> sys.stderr, "Warning: Caught keyboard interrupt!" 
529 display.print_gather(nodeset, buf) 530 531 # Return code handling 532 ns_ok = NodeSet() 533 for rc, nodelist in task.iter_retcodes(): 534 ns_ok.add(NodeSet.fromlist(nodelist)) 535 if rc != 0: 536 # Display return code if not ok ( != 0) 537 ns = NodeSet.fromlist(nodelist) 538 print >> sys.stderr, \ 539 "clush: %s: exited with exit code %s" % (ns, rc) 540 # Add uncompleted nodeset to exception object 541 e.uncompleted_nodes = ns - ns_ok 542 543 # Display nodes that didn't answer within command timeout delay 544 if task.num_timeout() > 0: 545 print >> sys.stderr, "clush: %s: command timeout" % \ 546 NodeSet.fromlist(task.iter_keys_timeout()) 547 raise e 548 549 if task.default("USER_running"): 550 ns_reg, ns_unreg = NodeSet(), NodeSet() 551 for c in task._engine.clients(): 552 if c.registered: 553 ns_reg.add(c.key) 554 else: 555 ns_unreg.add(c.key) 556 if ns_unreg: 557 pending = "\nclush: pending(%d): %s" % (len(ns_unreg), ns_unreg) 558 else: 559 pending = "" 560 print >> sys.stderr, "clush: interrupt (^C to abort task)\n" \ 561 "clush: in progress(%d): %s%s" % (len(ns_reg), ns_reg, pending) 562 else: 563 cmdl = cmd.lower() 564 try: 565 ns_info = True 566 if cmdl.startswith('+'): 567 ns.update(cmdl[1:]) 568 elif cmdl.startswith('-'): 569 ns.difference_update(cmdl[1:]) 570 elif cmdl.startswith('@'): 571 ns = NodeSet(cmdl[1:]) 572 elif cmdl == '=': 573 gather = not gather 574 if verbosity >= VERB_STD: 575 if gather: 576 print "Switching to gathered output format" 577 else: 578 print "Switching to standard output format" 579 ns_info = False 580 continue 581 elif not cmdl.startswith('?'): # if ?, just print ns_info 582 ns_info = False 583 except NodeSetParseError: 584 print >> sys.stderr, "clush: nodeset parse error (ignoring)" 585 586 if ns_info: 587 continue 588 589 if cmdl.startswith('!') and len(cmd.strip()) > 0: 590 run_command(task, cmd[1:], None, gather, timeout, None, 591 verbosity, display) 592 elif cmdl != "quit": 593 if not cmd: 594 continue 595 if 
readline_avail: 596 readline.write_history_file(get_history_file()) 597 run_command(task, cmd, ns, gather, timeout, verbosity, display) 598 return rc
599
def bind_stdin(worker):
    """Create a ClusterShell stdin-reader worker bound to specified
    worker.

    stdin must not be a tty. It is switched to non-blocking mode and read
    by a WorkerSimple whose StdInputHandler forwards each line to
    `worker'. The reader is scheduled in the same task as `worker'.
    """
    assert not sys.stdin.isatty()
    # Switch stdin to non blocking mode
    flags = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
    fcntl.fcntl(sys.stdin, fcntl.F_SETFL, flags | os.O_NDELAY)

    # Create a simple worker attached to stdin in autoclose mode
    forwarder = StdInputHandler(worker)
    reader = WorkerSimple(sys.stdin, None, None, None, handler=forwarder,
                          timeout=-1, autoclose=True)

    # Add stdin worker to the same task than given worker
    worker.task.schedule(reader)
614
def run_command(task, cmd, ns, gather, timeout, verbosity, display):
    """
    Create and run the specified command line, displaying
    results in a dshbak way when gathering is used.

    With gather, output goes through a GatherOutputHandler. At standard
    or verbose verbosity, a RunTimer progress counter is also scheduled.
    Without gather, lines are printed as they arrive (DirectOutputHandler).
    Local stdin is bound to the worker when the task's USER_stdin_worker
    default is set; the task is then resumed.
    """
    task.set_default("USER_running", True)

    if not gather:
        handler = DirectOutputHandler(display)
    else:
        runtimer = None
        if verbosity in (VERB_STD, VERB_VERB):
            # Create a ClusterShell timer used to display in live the
            # number of completed commands
            runtimer = task.timer(2.0, RunTimer(task, len(ns)),
                                  interval=1./3., autoclose=True)
        handler = GatherOutputHandler(display, runtimer, ns)
    worker = task.shell(cmd, nodes=ns, handler=handler, timeout=timeout)

    if task.default("USER_stdin_worker"):
        bind_stdin(worker)

    task.resume()
641
def run_copy(task, source, dest, ns, timeout, preserve_flag, display):
    """
    run copy command

    Copy local `source' to `dest' on the nodes of `ns', passing
    preserve_flag as task.copy()'s `preserve' argument and reporting
    through a DirectOutputHandler. Exit clush with status 1 if `source'
    does not exist.
    """
    task.set_default("USER_running", True)

    # Source check
    if not os.path.exists(source):
        print >> sys.stderr, "ERROR: file \"%s\" not found" % source
        clush_exit(1)

    handler = DirectOutputHandler(display)
    task.copy(source, dest, ns, handler=handler, timeout=timeout,
              preserve=preserve_flag)

    task.resume()
657
def clush_exit(status):
    """Flush stdout and stderr, then terminate the process with status.

    os._exit() is used to avoid threads cleanup: a separate task thread
    may still exist when clush runs in user interaction mode.
    """
    # Flush stdio buffers
    sys.stdout.flush()
    sys.stderr.flush()
    # Use os._exit to avoid threads cleanup
    os._exit(status)
664
def clush_excepthook(type, value, traceback):
    """Exceptions hook for clush: this method centralizes exception
    handling from main thread and from (possible) separate task thread.
    This hook has to be previously installed on startup by overriding
    sys.excepthook and task.excepthook.

    Known exception types get a short message on stderr; any other
    exception is passed to the task's default_excepthook().
    KeyboardInterrupt exits with status 128 + SIGINT; every other case
    exits with status 1.
    """
    try:
        # Re-raise so that the except clauses below dispatch on the
        # exception type.
        raise type, value
    except ClushConfigError, e:
        print >> sys.stderr, "ERROR: %s" % e
    except NodeSetExternalError, e:
        print >> sys.stderr, "clush: external error:", e
    except NodeSetParseError, e:
        print >> sys.stderr, "clush: parse error:", e
    except GroupResolverSourceError, e:
        print >> sys.stderr, "ERROR: unknown group source: \"%s\"" % e
    except GroupSourceNoUpcall, e:
        print >> sys.stderr, "ERROR: no %s upcall defined for group " \
            "source \"%s\"" % (e, e.group_source.name)
    except GroupSourceException, e:
        print >> sys.stderr, "ERROR: other group error:", e
    except IOError:
        # Ignore broken pipe
        pass
    except KeyboardInterrupt, e:
        # ttyloop() may have attached the nodes that did not complete.
        uncomp_nodes = getattr(e, 'uncompleted_nodes', None)
        if uncomp_nodes:
            print >> sys.stderr, "Keyboard interrupt (%s did not complete)." \
                % uncomp_nodes
        else:
            print >> sys.stderr, "Keyboard interrupt."
        clush_exit(128 + signal.SIGINT)
    except OSError, value:
        print >> sys.stderr, "ERROR: %s" % value
        if value.errno == errno.EMFILE:
            # Too many open files: also show the current RLIMIT_NOFILE
            # soft and hard limits.
            print >> sys.stderr, "ERROR: current `nofile' limits: " \
                "soft=%d hard=%d" % resource.getrlimit(resource.RLIMIT_NOFILE)
    except:
        # Not handled
        task_self().default_excepthook(type, value, traceback)
    # Exit with error code 1 (generic failure)
    clush_exit(1)
706
def clush_main(args):
    """Main clush script function

    args is the whole command line as in sys.argv (program name first);
    options and the command are parsed from args[1:]. The function parses
    options, loads the configuration, and computes the target nodeset.
    It then runs the requested copy or command and/or the interactive
    loop, and finally exits through clush_exit().
    """
    sys.excepthook = clush_excepthook

    # Default values
    nodeset_base, nodeset_exclude = NodeSet(), NodeSet()

    #
    # Argument management
    #
    usage = "%prog [options] command"

    parser = optparse.OptionParser(usage, version="%%prog %s" % __version__)
    parser.disable_interspersed_args()

    parser.add_option("--nostdin", action="store_true", dest="nostdin",
                      help="don't watch for possible input from stdin")

    # Node selections
    optgrp = optparse.OptionGroup(parser, "Selecting target nodes")
    optgrp.add_option("-w", action="append", dest="nodes",
                      help="nodes where to run the command")
    optgrp.add_option("-x", action="append", dest="exclude",
                      help="exclude nodes from the node list")
    optgrp.add_option("-a", "--all", action="store_true", dest="nodes_all",
                      help="run command on all nodes")
    optgrp.add_option("-g", "--group", action="append", dest="group",
                      help="run command on a group of nodes")
    optgrp.add_option("-X", action="append", dest="exgroup",
                      help="exclude nodes from this group")
    parser.add_option_group(optgrp)

    # Output behaviour
    optgrp = optparse.OptionGroup(parser, "Output behaviour")
    optgrp.add_option("-q", "--quiet", action="store_true", dest="quiet",
                      help="be quiet, print essential output only")
    optgrp.add_option("-v", "--verbose", action="store_true", dest="verbose",
                      help="be verbose, print informative messages")
    optgrp.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="output more messages for debugging purpose")

    optgrp.add_option("-G", "--groupbase", action="store_true",
                      dest="groupbase", default=False, help="do not display " \
                      "group source prefix")
    optgrp.add_option("-L", action="store_true", dest="line_mode",
                      default=False, help="disable header block and order " \
                      "output by nodes")
    optgrp.add_option("-N", action="store_false", dest="label", default=True,
                      help="disable labeling of command line")
    optgrp.add_option("-S", action="store_true", dest="maxrc",
                      help="return the largest of command return codes")
    optgrp.add_option("-b", "--dshbak", action="store_true", dest="gather",
                      default=False, help="display gathered results in a " \
                      "dshbak-like way")
    optgrp.add_option("-B", action="store_true", dest="gatherall",
                      default=False, help="like -b but including standard " \
                      "error")
    optgrp.add_option("-r", "--regroup", action="store_true", dest="regroup",
                      default=False, help="fold nodeset using node groups")
    optgrp.add_option("-s", "--groupsource", action="store",
                      dest="groupsource", help="optional groups.conf(5) " \
                      "group source to use")
    optgrp.add_option("--color", action="store", dest="whencolor",
                      choices=WHENCOLOR_CHOICES,
                      help="whether to use ANSI colors (never, always or auto)")
    # Register this group only once, after all of its options: optparse
    # appends the group on every add_option_group() call, so a second
    # registration would list it twice in --help.
    parser.add_option_group(optgrp)

    # Copy
    optgrp = optparse.OptionGroup(parser, "File copying")
    optgrp.add_option("-c", "--copy", action="store", dest="source_path",
                      help="copy local file or directory to the nodes")
    optgrp.add_option("--dest", action="store", dest="dest_path",
                      help="destination file or directory on the nodes")
    optgrp.add_option("-p", action="store_true", dest="preserve_flag",
                      help="preserve modification times and modes")
    parser.add_option_group(optgrp)

    # Ssh options
    optgrp = optparse.OptionGroup(parser, "Ssh options")
    optgrp.add_option("-f", "--fanout", action="store", dest="fanout",
                      help="use a specified fanout", type="int")
    optgrp.add_option("-l", "--user", action="store", dest="user",
                      help="execute remote command as user")
    optgrp.add_option("-o", "--options", action="store", dest="options",
                      help="can be used to give ssh options")
    optgrp.add_option("-t", "--connect_timeout", action="store",
                      dest="connect_timeout", help="limit time to connect to " \
                      "a node" ,type="float")
    optgrp.add_option("-u", "--command_timeout", action="store",
                      dest="command_timeout",
                      help="limit time for command to run on the node",
                      type="float")
    parser.add_option_group(optgrp)

    # Parse the command line received as argument (program name
    # excluded) instead of implicitly re-reading sys.argv.
    (options, args) = parser.parse_args(args[1:])

    #
    # Load config file
    #
    config = ClushConfig()

    # Apply command line overrides
    if options.quiet:
        config.set_main("verbosity", VERB_QUIET)
    if options.verbose:
        config.set_main("verbosity", VERB_VERB)
    if options.debug:
        config.set_main("verbosity", VERB_DEBUG)
    if options.fanout:
        config.set_main("fanout", options.fanout)
    if options.user:
        config.set_main("ssh_user", options.user)
    if options.options:
        config.set_main("ssh_options", options.options)
    if options.connect_timeout:
        config.set_main("connect_timeout", options.connect_timeout)
    if options.command_timeout:
        config.set_main("command_timeout", options.command_timeout)

    #
    # Compute the nodeset
    #
    if options.nodes:
        nodeset_base = NodeSet.fromlist(options.nodes)
    if options.exclude:
        nodeset_exclude = NodeSet.fromlist(options.exclude)

    if options.groupsource:
        # Be sure -a/-g used with -s resolve groups from this source.
        STD_GROUP_RESOLVER.default_sourcename = options.groupsource

    # Do we have nodes group?
    task = task_self()
    task.set_info("debug", config.get_verbosity() > 1)
    if config.get_verbosity() > 1:
        STD_GROUP_RESOLVER.set_verbosity(1)
    if options.nodes_all:
        all_nodeset = NodeSet.fromall()
        config.verbose_print(VERB_DEBUG, \
            "Adding nodes from option -a: %s" % all_nodeset)
        nodeset_base.add(all_nodeset)

    if options.group:
        grp_nodeset = NodeSet.fromlist(options.group,
                                       resolver=NOGROUP_RESOLVER)
        for grp in grp_nodeset:
            addingrp = NodeSet("@" + grp)
            config.verbose_print(VERB_DEBUG, \
                "Adding nodes from option -g %s: %s" % (grp, addingrp))
            nodeset_base.update(addingrp)

    if options.exgroup:
        grp_nodeset = NodeSet.fromlist(options.exgroup,
                                       resolver=NOGROUP_RESOLVER)
        for grp in grp_nodeset:
            removingrp = NodeSet("@" + grp)
            config.verbose_print(VERB_DEBUG, \
                "Excluding nodes from option -X %s: %s" % (grp, removingrp))
            nodeset_exclude.update(removingrp)

    # Do we have an exclude list? (-x ...)
    nodeset_base.difference_update(nodeset_exclude)
    if len(nodeset_base) < 1:
        parser.error('No node to run on.')

    config.verbose_print(VERB_DEBUG, "Final NodeSet: %s" % nodeset_base)

    # Make soft fd limit the max.
    config.max_fdlimit()

    #
    # Task management
    #
    interactive = not len(args) and not options.source_path
    if options.nostdin and interactive:
        parser.error("illegal option `--nostdin' in interactive mode")

    user_interaction = not options.nostdin and sys.stdin.isatty() and \
                       sys.stdout.isatty()
    config.verbose_print(VERB_DEBUG, "User interaction: %s" % user_interaction)
    if user_interaction:
        # Standard input is a terminal and we want to perform some user
        # interactions in the main thread (using blocking calls), so
        # we run cluster commands in a new ClusterShell Task (a new
        # thread is created).
        task = Task()
        signal.signal(signal.SIGUSR1, signal_handler)
        task.set_default("USER_handle_SIGUSR1", True)
    else:
        # Perform everything in main thread.
        task.set_default("USER_handle_SIGUSR1", False)

    task.excepthook = sys.excepthook
    task.set_default("USER_stdin_worker", not (sys.stdin.isatty() or
                                               options.nostdin))
    config.verbose_print(VERB_DEBUG, "Create STDIN worker: %s" % \
                         task.default("USER_stdin_worker"))

    task.set_info("debug", config.get_verbosity() >= VERB_DEBUG)
    task.set_info("fanout", config.get_fanout())

    ssh_user = config.get_ssh_user()
    if ssh_user:
        task.set_info("ssh_user", ssh_user)
    ssh_path = config.get_ssh_path()
    if ssh_path:
        task.set_info("ssh_path", ssh_path)
    ssh_options = config.get_ssh_options()
    if ssh_options:
        task.set_info("ssh_options", ssh_options)

    # Set detailed timeout values
    connect_timeout = config.get_connect_timeout()
    task.set_info("connect_timeout", connect_timeout)
    command_timeout = config.get_command_timeout()
    task.set_info("command_timeout", command_timeout)

    gather = options.gatherall or options.gather
    # Enable stdout/stderr separation
    task.set_default("stderr", not options.gatherall)

    # Disable MsgTree buffering if not gathering outputs
    task.set_default("stdout_msgtree", gather)
    # Always disable stderr MsgTree buffering
    task.set_default("stderr_msgtree", False)

    # Set timeout at worker level when command_timeout is defined.
    if command_timeout > 0:
        timeout = command_timeout
    else:
        timeout = -1

    # Configure task custom status
    task.set_default("USER_interactive", interactive)
    task.set_default("USER_running", False)

    if options.source_path:
        if not options.dest_path:
            options.dest_path = os.path.dirname(options.source_path)
        op = "copy source=%s dest=%s" % (options.source_path, options.dest_path)
    else:
        op = "command=\"%s\"" % ' '.join(args)

    # print debug values (fanout value is taken from the config object and
    # not from the task itself as set_info() is an asynchronous call).
    config.verbose_print(VERB_VERB, "clush: nodeset=%s fanout=%d [timeout conn=%.1f " \
        "cmd=%.1f] %s" % (nodeset_base, config.get_fanout(),
                          task.info("connect_timeout"),
                          task.info("command_timeout"), op))

    # Should we use ANSI colors for nodes?
    if not options.whencolor:
        options.whencolor = config.get_color()
    if options.whencolor == "auto":
        color = sys.stdout.isatty() and (options.gatherall or \
                                         sys.stderr.isatty())
    else:
        color = options.whencolor == "always"

    # Create and configure display object.
    display = Display(color)
    display.line_mode = options.line_mode
    display.label = options.label
    display.regroup = options.regroup
    display.groupsource = options.groupsource
    display.noprefix = options.groupbase

    if not task.default("USER_interactive"):
        if options.source_path:
            if not args:
                run_copy(task, options.source_path, options.dest_path,
                         nodeset_base, 0, options.preserve_flag, display)
            else:
                parser.error("please use `--dest' to specify a different " \
                             "destination")
        else:
            run_command(task, ' '.join(args), nodeset_base, gather, timeout,
                        config.get_verbosity(), display)

    if user_interaction:
        ttyloop(task, nodeset_base, gather, timeout, config.get_verbosity(),
                display)
    elif task.default("USER_interactive"):
        print >> sys.stderr, "ERROR: interactive mode requires a tty"
        clush_exit(1)

    rc = 0
    if options.maxrc:
        # Instead of clush return code, return commands retcode
        rc = task.max_retcode()
        if rc is None:
            # max_retcode() may return None when no return code is
            # available (e.g. no command was run); os._exit() in
            # clush_exit() needs an integer status.
            rc = 0
        if task.num_timeout() > 0:
            rc = 255
    clush_exit(rc)
if __name__ == '__main__':
    # Run as a script: hand the whole command line to clush_main().
    clush_main(sys.argv)