Module clush
[hide private]
[frames] | no frames]

Source Code for Module clush

  1  #!/usr/bin/env python 
  2  # 
  3  # Copyright CEA/DAM/DIF (2007, 2008, 2009) 
  4  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  5  # 
  6  # This file is part of the ClusterShell library. 
  7  # 
  8  # This software is governed by the CeCILL-C license under French law and 
  9  # abiding by the rules of distribution of free software.  You can  use, 
 10  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 11  # license as circulated by CEA, CNRS and INRIA at the following URL 
 12  # "http://www.cecill.info". 
 13  # 
 14  # As a counterpart to the access to the source code and  rights to copy, 
 15  # modify and redistribute granted by the license, users are provided only 
 16  # with a limited warranty  and the software's author,  the holder of the 
 17  # economic rights,  and the successive licensors  have only  limited 
 18  # liability. 
 19  # 
 20  # In this respect, the user's attention is drawn to the risks associated 
 21  # with loading,  using,  modifying and/or developing or reproducing the 
 22  # software by the user in light of its specific status of free software, 
 23  # that may mean  that it is complicated to manipulate,  and  that  also 
 24  # therefore means  that it is reserved for developers  and  experienced 
 25  # professionals having in-depth computer knowledge. Users are therefore 
 26  # encouraged to load and test the software's suitability as regards their 
 27  # requirements in conditions enabling the security of their systems and/or 
 28  # data to be ensured and,  more generally, to use and operate it in the 
 29  # same conditions as regards security. 
 30  # 
 31  # The fact that you are presently reading this means that you have had 
 32  # knowledge of the CeCILL-C license and that you accept its terms. 
 33  # 
 34  # $Id: clush.py 164 2009-11-03 13:58:33Z st-cea $ 
 35   
 36  """ 
 37  Utility program to run commands on a cluster using the ClusterShell library. 
 38   
 39  clush is a pdsh-like command which benefits from the ClusterShell library 
 40  and its Ssh worker. It features an integrated output results gathering 
 41  system (dshbak-like), can get node groups by running predefined external 
 42  commands and can redirect lines read on its standard input to the remote 
 43  commands. 
 44   
  45  When no command is specified, clush runs interactively. 
 46   
 47  """ 
 48   
 49  import fcntl 
 50  import optparse 
 51  import os 
 52  import sys 
 53  import signal 
 54  import thread 
 55  import ConfigParser 
 56   
 57  sys.path.insert(0, '../lib') 
 58   
 59  import warnings 
 60  warnings.simplefilter('ignore', DeprecationWarning) 
 61   
 62  from ClusterShell.Event import EventHandler 
 63  from ClusterShell.NodeSet import NodeSet, NodeSetParseError 
 64  from ClusterShell.Task import Task, task_self 
 65  from ClusterShell.Worker.Worker import WorkerSimple 
 66  from ClusterShell import __version__ 
 67   
 68  VERB_QUIET = 0 
 69  VERB_STD = 1 
 70  VERB_VERB = 2 
 71  VERB_DEBUG = 3 
 72   
class UpdatePromptException(Exception):
    """
    Raised by signal_handler() on SIGHUP so that the blocking prompt read in
    ttyloop() is interrupted and the prompt can be refreshed.
    """
75
class StdInputHandler(EventHandler):
    """
    Event handler forwarding local standard input to a master worker.

    Every line read is written to the master worker followed by a newline;
    when the stdin worker closes, EOF is signalled to the master worker.
    """

    def __init__(self, worker):
        # worker receiving what is read on stdin
        self.master_worker = worker

    def ev_read(self, worker):
        line = worker.last_read()
        self.master_worker.write(line + '\n')

    def ev_close(self, worker):
        self.master_worker.set_write_eof()
87
class DirectOutputHandler(EventHandler):
    """
    Event handler printing command output as soon as it is read.

    Output lines go to stdout, optionally prefixed by the node they come
    from ("node: line"). Non-zero exit codes and command timeouts are
    reported on stderr.
    """

    def __init__(self, label=None):
        # when true, prefix each output line with its node name
        self._label = label

    def ev_read(self, worker):
        result = worker.last_read()
        # last_read() may return a (node, buffer) tuple or the buffer alone
        # (presumably for workers not bound to nodes -- compare the "local"
        # fallback in ev_hup). Without a node there is nothing to label with.
        if isinstance(result, tuple):
            ns, buf = result
        else:
            ns, buf = None, result
        if self._label and ns is not None:
            print("%s: %s" % (ns, buf))
        else:
            print("%s" % buf)

    def ev_hup(self, worker):
        if hasattr(worker, "last_retcode"):
            ns, rc = worker.last_retcode()
        else:
            # no per-node return code available
            ns = "local"
            rc = worker.retcode()
        if rc > 0:
            sys.stderr.write("clush: %s: exited with exit code %d\n" % (ns, rc))

    def ev_timeout(self, worker):
        sys.stderr.write("clush: %s: command timeout\n"
                         % NodeSet.fromlist(worker.iter_keys_timeout()))

    def ev_close(self, worker):
        # Notify main thread to update its prompt
        worker.task.set_info("USER_running", False)
        if worker.task.info("USER_handle_SIGHUP"):
            os.kill(os.getpid(), signal.SIGHUP)
123
class GatherOutputHandler(EventHandler):
    """
    Event handler gathering the output of all nodes and displaying it in a
    dshbak-like way once the worker closes.

    Identical outputs are displayed once under the node set that produced
    them. Non-zero exit codes and command timeouts are reported on stderr.
    """

    def __init__(self, runtimer):
        # optional progress timer (or None) finalized before displaying
        self.runtimer = runtimer

    def ev_close(self, worker):
        # Worker is closing -- it's time to gather results...
        if self.runtimer:
            self.runtimer.eh.finalize(worker.task.info("USER_interactive"))

        # Display command output, try to order buffers by rc
        for rc, rc_nodes in worker.iter_retcodes():
            for buf, buf_nodes in worker.iter_buffers(rc_nodes):
                print("-" * 15)
                print(NodeSet.fromlist(buf_nodes))
                print("-" * 15)
                print(buf)

        # Display return code if not ok ( != 0), on stderr like the other
        # diagnostics so that stdout only carries command output
        for rc, rc_nodes in worker.iter_retcodes():
            if rc != 0:
                sys.stderr.write("clush: %s: exited with exit code %s\n"
                                 % (NodeSet.fromlist(rc_nodes), rc))

        # Display nodes that didn't answer within command timeout delay
        if worker.num_timeout() > 0:
            sys.stderr.write("clush: %s: command timeout\n"
                             % NodeSet.fromlist(worker.iter_keys_timeout()))

        # Notify main thread to update its prompt
        worker.task.set_info("USER_running", False)
        if worker.task.info("USER_handle_SIGHUP"):
            os.kill(os.getpid(), signal.SIGHUP)
158
class RunTimer(EventHandler):
    """
    Timer event handler showing progress on stderr as "clush: done/total",
    where done is total minus the number of engine clients still present.
    """

    def __init__(self, task, total):
        self.task = task
        self.total = total
        # last displayed client count; -1 means nothing displayed yet
        self.cnt_last = -1
        # field width, so both counters are padded to the width of total
        self.tslen = len(str(self.total))

    def ev_timer(self, timer):
        self.update()

    def update(self):
        active = len(self.task._engine.clients())
        # only redraw the progress line when the count changed
        if active == self.cnt_last:
            return
        self.cnt_last = active
        done = self.total - active
        sys.stderr.write('clush: %*d/%*d\r'
                         % (self.tslen, done, self.tslen, self.total))

    def finalize(self, cr):
        # cr true: terminate the progress line; false: return to the start
        # of the line so that it can be overwritten
        if cr:
            eol = '\n'
        else:
            eol = '\r'
        sys.stderr.write(('clush: %*d/%*d' + eol)
                         % (self.tslen, self.total, self.tslen, self.total))
186
class ClushConfigError(Exception):
    """
    Exception raised when a clush configuration option is missing or has an
    invalid value.

    section, option: location of the faulty setting
    msg: underlying error (message or exception object)
    """

    def __init__(self, section, option, msg):
        Exception.__init__(self, section, option, msg)
        self.section = section
        self.option = option
        self.msg = msg

    def __str__(self):
        return "ERROR (Config %s.%s): %s" % (self.section, self.option, self.msg)
197
class ClushConfig(ConfigParser.ConfigParser):
    """
    Config class for clush (specialized ConfigParser).

    Settings are read from /etc/clustershell/clush.conf then ~/.clush.conf
    (values from later files win). Options missing from a section fall back
    to ClushConfig.defaults. A "Main" section always exists.
    """

    defaults = { "fanout" : "64",
                 "connect_timeout" : "30",
                 "command_timeout" : "0",
                 "history_size" : "100",
                 "verbosity" : "%d" % VERB_STD }

    def __init__(self, overrides):
        # NOTE: overrides is not used here; command line overrides are
        # applied afterwards by the caller through set_main().
        ConfigParser.ConfigParser.__init__(self, ClushConfig.defaults)
        self.read(['/etc/clustershell/clush.conf',
                   os.path.expanduser('~/.clush.conf')])
        if not self.has_section("Main"):
            self.add_section("Main")

    def verbose_print(self, level, message):
        """Print message on stdout when the configured verbosity >= level."""
        if self.get_verbosity() >= level:
            print(message)

    def set_main(self, option, value):
        """Set option of the "Main" section (value is stored as a string)."""
        self.set("Main", option, str(value))

    def getint(self, section, option):
        """Return option as an int; raise ClushConfigError on any failure."""
        try:
            return ConfigParser.ConfigParser.getint(self, section, option)
        except (ConfigParser.Error, TypeError, ValueError):
            raise ClushConfigError(section, option, sys.exc_info()[1])

    def getfloat(self, section, option):
        """Return option as a float; raise ClushConfigError on any failure."""
        try:
            return ConfigParser.ConfigParser.getfloat(self, section, option)
        except (ConfigParser.Error, TypeError, ValueError):
            raise ClushConfigError(section, option, sys.exc_info()[1])

    def _get_optional(self, section, option):
        """
        Return the option value, or None when the section or the option is
        not defined. Other configuration errors (e.g. a bad interpolation)
        are reported as ClushConfigError rather than silently ignored.
        """
        try:
            return self.get(section, option)
        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            return None
        except ConfigParser.Error:
            raise ClushConfigError(section, option, sys.exc_info()[1])

    def get_verbosity(self):
        """Return the verbosity level, or 0 (quiet) if it is not a valid int."""
        try:
            return self.getint("Main", "verbosity")
        except ClushConfigError:
            return 0

    def get_fanout(self):
        return self.getint("Main", "fanout")

    def get_connect_timeout(self):
        return self.getfloat("Main", "connect_timeout")

    def get_command_timeout(self):
        return self.getfloat("Main", "command_timeout")

    def get_ssh_user(self):
        return self._get_optional("Main", "ssh_user")

    def get_ssh_path(self):
        return self._get_optional("Main", "ssh_path")

    def get_ssh_options(self):
        return self._get_optional("Main", "ssh_options")

    def get_nodes_all_command(self):
        """Return the external command listing all nodes."""
        section = "External"
        option = "nodes_all"
        try:
            return self.get(section, option)
        except ConfigParser.Error:
            raise ClushConfigError(section, option, sys.exc_info()[1])

    def get_nodes_group_command(self, group):
        """
        Return the external command listing the nodes of group; the value
        may reference the group name through %(group)s interpolation.
        """
        section = "External"
        option = "nodes_group"
        try:
            return self.get(section, option, raw=False, vars={ "group" : group })
        except ConfigParser.Error:
            raise ClushConfigError(section, option, sys.exc_info()[1])
277 278
def signal_handler(signum, frame):
    """
    Signal handler used to notify the main thread.

    SIGHUP is turned into an UpdatePromptException, raised in the main
    thread (where Python runs signal handlers). Any other signal is ignored.
    """
    if signum != signal.SIGHUP:
        return
    raise UpdatePromptException()
283
def get_history_file():
    """Return the path of the interactive mode history file, ~/.clush_history."""
    # expanduser() uses $HOME when it is set and falls back to the password
    # database otherwise, instead of raising KeyError when HOME is unset.
    return os.path.expanduser("~/.clush_history")
287
def readline_setup():
    """
    Configure readline for the interactive prompt.

    Binds tab to completion, uses no completer word delimiters and loads
    the .clush_history file when it can be read. Saving the history is not
    done here.
    """
    import readline
    readline.parse_and_bind("tab: complete")
    readline.set_completer_delims("")
    history_path = get_history_file()
    try:
        readline.read_history_file(history_path)
    except IOError:
        # no history file yet (or unreadable): start with an empty history
        pass
300
def ttyloop(task, nodeset, gather, timeout, label, verbosity):
    """
    Manage the interactive prompt used to run commands.

    Lines are read from the terminal until 'quit' is entered while no
    command is running (or EOF is reached). Input is interpreted as:
      +NODES  add NODES to the working node set
      -NODES  remove NODES from the working node set
      @NODES  replace the working node set by NODES
      =       toggle gathered (dshbak-like) output
      ?       show the working node set
      !CMD    run CMD with no target nodes (presumably locally)
      other   run the line on the working node set
    Node set arguments are lower-cased. While a command is running, any
    input line displays the nodes in progress / pending instead.

    An UpdatePromptException (raised on SIGHUP by signal_handler; the
    output handlers send SIGHUP when a worker closes) redraws the prompt in
    interactive mode and ends the loop otherwise.

    Returns 0 when leaving on 'quit'; returns None on EOF, or on a prompt
    update when not interactive.
    """
    # Must be bound even when not interactive: it is checked before every
    # remote command run below.
    readline_avail = False
    if task.info("USER_interactive"):
        assert sys.stdin.isatty()
        try:
            import readline
            readline_setup()
            readline_avail = True
        except ImportError:
            pass
        if verbosity >= VERB_STD:
            print("Enter 'quit' to leave this interactive mode")

    rc = 0
    ns = NodeSet(nodeset)
    ns_info = True
    cmd = ""
    while task.info("USER_running") or cmd.lower() != 'quit':
        try:
            if task.info("USER_interactive") and not task.info("USER_running"):
                if ns_info:
                    print("Working with nodes: %s" % ns)
                    ns_info = False
                prompt = "clush> "
            else:
                prompt = ""
            if sys.version_info[:3] >= (2, 4, 0):
                cmd = raw_input(prompt)
            else:
                # raw_input defers signals in Python 2.3, (bugs #685846
                # and #706406) and is not interruptible, so we provide
                # a quick workaround here (won't have readline support).
                if prompt:
                    sys.stdout.write("\r%s" % prompt)
                cmd = sys.stdin.readline()[:-1]
        except EOFError:
            print("")
            return
        except UpdatePromptException:
            if task.info("USER_interactive"):
                continue
            return
        if task.info("USER_running"):
            # A command is running: report its progress.
            ns_reg, ns_unreg = NodeSet(), NodeSet()
            for c in task._engine.clients():
                if c.registered:
                    ns_reg.add(c.key)
                else:
                    ns_unreg.add(c.key)
            if ns_unreg:
                pending = "\nclush: pending(%d): %s" % (len(ns_unreg), ns_unreg)
            else:
                pending = ""
            sys.stderr.write("clush: interrupt (^C to abort task)\n"
                             "clush: in progress(%d): %s%s\n"
                             % (len(ns_reg), ns_reg, pending))
        else:
            cmdl = cmd.lower()
            try:
                ns_info = True
                if cmdl.startswith('+'):
                    ns.update(cmdl[1:])
                elif cmdl.startswith('-'):
                    ns.difference_update(cmdl[1:])
                elif cmdl.startswith('@'):
                    ns = NodeSet(cmdl[1:])
                elif cmdl == '=':
                    gather = not gather
                    if verbosity >= VERB_STD:
                        if gather:
                            print("Switching to gathered output format")
                        else:
                            print("Switching to standard output format")
                    ns_info = False
                    continue
                elif not cmdl.startswith('?'): # if ?, just print ns_info
                    ns_info = False
            except NodeSetParseError:
                sys.stderr.write("clush: nodeset parse error (ignoring)\n")

            if ns_info:
                continue

            if cmdl.startswith('!'):
                run_command(task, cmd[1:], None, gather, timeout, None, verbosity)
            elif cmdl != "quit":
                if not cmd:
                    continue
                if readline_avail:
                    readline.write_history_file(get_history_file())
                run_command(task, cmd, ns, gather, timeout, label, verbosity)
    return rc
394
def bind_stdin(worker):
    """
    Forward this process's standard input to the given worker.

    stdin is switched to non-blocking mode and read by a WorkerSimple
    scheduled in the same task as worker; StdInputHandler writes each line
    read to worker and signals EOF to it when stdin closes.
    """
    # Internal invariant: callers only bind stdin when it is not a tty.
    assert not sys.stdin.isatty()
    # Switch stdin to non blocking mode. F_SETFL replaces the whole set of
    # file status flags, so add O_NDELAY to the current ones instead of
    # clearing them.
    fd = sys.stdin.fileno()
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NDELAY)

    # Create a simple worker attached to stdin in autoclose mode
    worker_stdin = WorkerSimple(sys.stdin, None, None, None,
                                handler=StdInputHandler(worker), timeout=-1,
                                autoclose=True)

    # Add stdin worker to the same task as the given worker
    worker.task.schedule(worker_stdin)
406
def run_command(task, cmd, ns, gather, timeout, label, verbosity):
    """
    Create and run the specified command line, displaying
    results in a dshbak way when gathering is used.

    ns is the target NodeSet, or None for a command run without target
    nodes (ttyloop's '!' command). Such a command always uses direct
    output: the gathering progress timer needs a node count (len(ns)),
    and DirectOutputHandler copes with workers that have no per-node
    results.
    When stdin is not a tty, it is forwarded to the command.
    """
    task.set_info("USER_running", True)

    if gather and ns is not None:
        runtimer = None
        if verbosity == VERB_STD or verbosity == VERB_VERB:
            # Create a ClusterShell timer used to display the number of
            # completed commands
            runtimer = task.timer(2.0, RunTimer(task, len(ns)),
                                  interval=1./3., autoclose=True)
        worker = task.shell(cmd, nodes=ns, handler=GatherOutputHandler(runtimer),
                            timeout=timeout)
    else:
        worker = task.shell(cmd, nodes=ns, handler=DirectOutputHandler(label),
                            timeout=timeout)

    if not sys.stdin.isatty():
        bind_stdin(worker)

    task.resume()
427
def run_copy(task, source, dest, ns, timeout):
    """
    Copy source (a local file or directory) to dest on the nodes of ns,
    reporting through DirectOutputHandler, then run the task.
    """
    task.set_info("USER_running", True)
    task.copy(source, dest, ns, handler=DirectOutputHandler(), timeout=timeout)
    task.resume()
437
def clush_exit(n):
    """
    Flush stdout and stderr, then terminate the process with status n.

    os._exit() is used on purpose to skip the interpreter's threads cleanup.
    """
    sys.stdout.flush()
    sys.stderr.flush()
    os._exit(n)
444
def _clush_build_parser():
    """Return the optparse parser describing the clush command line."""
    usage = "%prog [options] command"

    parser = optparse.OptionParser(usage, version="%%prog %s" % __version__)
    parser.disable_interspersed_args()

    # Node selections
    optgrp = optparse.OptionGroup(parser, "Selecting target nodes")
    optgrp.add_option("-w", action="store", dest="nodes",
                      help="nodes where to run the command")
    optgrp.add_option("-x", action="store", dest="exclude",
                      help="exclude nodes from the node list")
    optgrp.add_option("-a", "--all", action="store_true", dest="nodes_all",
                      help="run command on all nodes")
    optgrp.add_option("-g", "--group", action="store", dest="group",
                      help="run command on a group of nodes")
    parser.add_option_group(optgrp)

    # Output behaviour
    optgrp = optparse.OptionGroup(parser, "Output behaviour")
    optgrp.add_option("-q", "--quiet", action="store_true", dest="quiet",
                      help="be quiet, print essential output only")
    optgrp.add_option("-v", "--verbose", action="store_true", dest="verbose",
                      help="be verbose, print informative messages")
    optgrp.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="output more messages for debugging purpose")

    optgrp.add_option("-N", action="store_false", dest="label", default=True,
                      help="disable labeling of command line")
    optgrp.add_option("-S", action="store_true", dest="maxrc",
                      help="return the largest of command return codes")
    optgrp.add_option("-b", "--dshbak", action="store_true", dest="gather",
                      help="display results in a dshbak-like way")
    parser.add_option_group(optgrp)

    # Copy
    optgrp = optparse.OptionGroup(parser, "File copying")
    optgrp.add_option("-c", "--copy", action="store", dest="source_path",
                      help="copy local file or directory to the nodes")
    optgrp.add_option("--dest", action="store", dest="dest_path",
                      help="destination file or directory on the nodes")
    parser.add_option_group(optgrp)

    # Ssh options
    optgrp = optparse.OptionGroup(parser, "Ssh options")
    optgrp.add_option("-f", "--fanout", action="store", dest="fanout",
                      help="use a specified fanout", type="int")
    optgrp.add_option("-l", "--user", action="store", dest="user",
                      help="execute remote command as user")
    optgrp.add_option("-o", "--options", action="store", dest="options",
                      help="can be used to give ssh options")
    optgrp.add_option("-t", "--connect_timeout", action="store", dest="connect_timeout",
                      help="limit time to connect to a node" ,type="float")
    optgrp.add_option("-u", "--command_timeout", action="store", dest="command_timeout",
                      help="limit time for command to run on the node", type="float")
    parser.add_option_group(optgrp)

    return parser


def _clush_apply_overrides(config, options):
    """Apply command line options on top of the configuration file values."""
    if options.quiet:
        config.set_main("verbosity", VERB_QUIET)
    if options.verbose:
        config.set_main("verbosity", VERB_VERB)
    if options.debug:
        config.set_main("verbosity", VERB_DEBUG)
    if options.fanout:
        config.set_main("fanout", options.fanout)
    if options.user:
        config.set_main("ssh_user", options.user)
    if options.options:
        config.set_main("ssh_options", options.options)
    if options.connect_timeout:
        config.set_main("connect_timeout", options.connect_timeout)
    # A command timeout of 0 means "no timeout" (see clush_main), so "-u 0"
    # must be able to override a non-zero value from a config file.
    if options.command_timeout is not None:
        config.set_main("command_timeout", options.command_timeout)


def clush_main(args):
    """
    Main clush script function.

    args is the full argument vector, as in sys.argv: args[0] is the
    program name and the remaining items are parsed as clush options
    followed by the command. Does not return normally: the process ends
    through clush_exit() (or optparse's error exit).
    """
    parser = _clush_build_parser()
    (options, cmd_args) = parser.parse_args(args[1:])

    #
    # Load config file and apply command line overrides
    #
    config = ClushConfig(options)
    _clush_apply_overrides(config, options)

    #
    # Compute the nodeset
    #
    nodeset_base = NodeSet(options.nodes)
    nodeset_exclude = NodeSet(options.exclude)

    # Do we have nodes group?
    task = task_self()
    task.set_info("debug", config.get_verbosity() >= VERB_DEBUG)
    if options.nodes_all:
        command = config.get_nodes_all_command()
        task.shell(command, key="all")
    if options.group:
        command = config.get_nodes_group_command(options.group)
        task.shell(command, key="group")

    # Run needed external commands
    task.resume()

    for buf, keys in task.iter_buffers():
        for line in buf.splitlines():
            config.verbose_print(VERB_DEBUG, "Nodes from option %s: %s"
                                 % (','.join(keys), line))
            nodeset_base.add(line)

    # Do we have an exclude list? (-x ...)
    nodeset_base.difference_update(nodeset_exclude)
    if len(nodeset_base) < 1:
        parser.error('No node to run on.')

    config.verbose_print(VERB_DEBUG, "Final NodeSet is %s" % nodeset_base)

    #
    # Task management
    #
    stdin_isatty = sys.stdin.isatty()
    if stdin_isatty:
        # Standard input is a terminal and we want to perform some user
        # interactions in the main thread (using blocking calls), so
        # we run cluster commands in a new ClusterShell Task (a new
        # thread is created).
        task = Task()
        signal.signal(signal.SIGHUP, signal_handler)
        task.set_info("USER_handle_SIGHUP", True)
    else:
        # Perform everything in main thread.
        task.set_info("USER_handle_SIGHUP", False)

    task.set_info("debug", config.get_verbosity() >= VERB_DEBUG)
    task.set_info("fanout", config.get_fanout())

    ssh_user = config.get_ssh_user()
    if ssh_user:
        task.set_info("ssh_user", ssh_user)
    ssh_path = config.get_ssh_path()
    if ssh_path:
        task.set_info("ssh_path", ssh_path)
    ssh_options = config.get_ssh_options()
    if ssh_options:
        task.set_info("ssh_options", ssh_options)

    # Set detailed timeout values
    connect_timeout = config.get_connect_timeout()
    task.set_info("connect_timeout", connect_timeout)
    command_timeout = config.get_command_timeout()
    task.set_info("command_timeout", command_timeout)

    # Set timeout at worker level when command_timeout is defined.
    if command_timeout > 0:
        timeout = command_timeout
    else:
        timeout = -1

    # Configure custom task related status
    task.set_info("USER_interactive", len(cmd_args) == 0 and not options.source_path)
    task.set_info("USER_running", False)

    if options.source_path:
        # Copy to the same path on the nodes unless --dest is given
        if not options.dest_path:
            options.dest_path = options.source_path
        op = "copy source=%s dest=%s" % (options.source_path, options.dest_path)
    else:
        op = "command=\"%s\"" % ' '.join(cmd_args)

    config.verbose_print(VERB_VERB, "clush: nodeset=%s fanout=%d [timeout conn=%.1f "
                         "cmd=%.1f] %s" % (nodeset_base, task.info("fanout"),
                                           task.info("connect_timeout"),
                                           task.info("command_timeout"), op))

    if not task.info("USER_interactive"):
        if options.source_path:
            run_copy(task, options.source_path, options.dest_path, nodeset_base, 0)
        else:
            run_command(task, ' '.join(cmd_args), nodeset_base, options.gather,
                        timeout, options.label, config.get_verbosity())

    if stdin_isatty:
        ttyloop(task, nodeset_base, options.gather, timeout, options.label,
                config.get_verbosity())
    elif task.info("USER_interactive"):
        sys.stderr.write("ERROR: interactive mode requires a tty\n")
        clush_exit(1)

    if options.maxrc:
        # return the command retcode
        clush_exit(task.max_retcode())
    else:
        # return clush retcode
        clush_exit(0)
if __name__ == '__main__':
    try:
        clush_main(sys.argv)
    except KeyboardInterrupt:
        # diagnostic goes to stderr, like clush's other messages
        sys.stderr.write("Keyboard interrupt.\n")
        clush_exit(128 + signal.SIGINT)
    except ClushConfigError:
        sys.stderr.write("%s\n" % sys.exc_info()[1])
        # Use the same exit path as the rest of clush: flush stdio and skip
        # threads cleanup (a ClusterShell Task thread may already exist).
        clush_exit(1)