1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 """
37 Utility program to run commands on a cluster using the ClusterShell library.
38
39 clush is a pdsh-like command which benefits from the ClusterShell library
40 and its Ssh worker. It features an integrated output results gathering
41 system (dshbak-like), can get node groups by running predefined external
42 commands and can redirect lines read on its standard input to the remote
43 commands.
44
45 When no command are specified, clush runs interactively.
46
47 """
48
49 import fcntl
50 import optparse
51 import os
52 import sys
53 import signal
54 import thread
55 import ConfigParser
56
57 sys.path.insert(0, '../lib')
58
59 import warnings
60 warnings.simplefilter('ignore', DeprecationWarning)
61
62 from ClusterShell.Event import EventHandler
63 from ClusterShell.NodeSet import NodeSet, NodeSetParseError
64 from ClusterShell.Task import Task, task_self
65 from ClusterShell.Worker.Worker import WorkerSimple
66 from ClusterShell import __version__
67
68 VERB_QUIET = 0
69 VERB_STD = 1
70 VERB_VERB = 2
71 VERB_DEBUG = 3
72
74 """Exception used by the signal handler"""
75
87
89 """Direct output event handler class."""
90
93
95 t = worker.last_read()
96 if type(t) is tuple:
97 ns, buf = worker.last_read()
98 else:
99 buf = worker.last_read()
100 if self._label:
101 print "%s: %s" % (ns, buf)
102 else:
103 print "%s" % buf
104
106 if hasattr(worker, "last_retcode"):
107 ns, rc = worker.last_retcode()
108 else:
109 ns = "local"
110 rc = worker.retcode()
111 if rc > 0:
112 print >>sys.stderr, "clush: %s: exited with exit code %d" % (ns, rc)
113
117
119
120 worker.task.set_info("USER_running", False)
121 if worker.task.info("USER_handle_SIGHUP"):
122 os.kill(os.getpid(), signal.SIGHUP)
123
125 """Gathered output event handler class."""
126
128 self.runtimer = runtimer
129
158
161 self.task = task
162 self.total = total
163 self.cnt_last = -1
164 self.tslen = len(str(self.total))
165
168
170 clients = self.task._engine.clients()
171 cnt = len(clients)
172 if cnt != self.cnt_last:
173 self.cnt_last = cnt
174
175 sys.stderr.write('clush: %*d/%*d\r' % (self.tslen, self.total - cnt,
176 self.tslen, self.total))
177
179
180 fmt = 'clush: %*d/%*d'
181 if cr:
182 fmt += '\n'
183 else:
184 fmt += '\r'
185 sys.stderr.write(fmt % (self.tslen, self.total, self.tslen, self.total))
186
188 """Exception used by the signal handler"""
189
190 - def __init__(self, section, option, msg):
191 self.section = section
192 self.option = option
193 self.msg = msg
194
196 return "ERROR (Config %s.%s): %s" % (self.section, self.option, self.msg)
197
199 """Config class for clush (specialized ConfigParser)"""
200
201 defaults = { "fanout" : "64",
202 "connect_timeout" : "30",
203 "command_timeout" : "0",
204 "history_size" : "100",
205 "verbosity" : "%d" % VERB_STD }
206
208 ConfigParser.ConfigParser.__init__(self, ClushConfig.defaults)
209 self.read(['/etc/clustershell/clush.conf', os.path.expanduser('~/.clush.conf')])
210 if not self.has_section("Main"):
211 self.add_section("Main")
212
216
217 - def set_main(self, option, value):
218 self.set("Main", option, str(value))
219
220 - def getint(self, section, option):
221 try:
222 return ConfigParser.ConfigParser.getint(self, section, option)
223 except (ConfigParser.Error, TypeError, ValueError), e:
224 raise ClushConfigError(section, option, e)
225
227 try:
228 return ConfigParser.ConfigParser.getfloat(self, section, option)
229 except (ConfigParser.Error, TypeError, ValueError), e:
230 raise ClushConfigError(section, option, e)
231
233 try:
234 return self.get(section, option)
235 except ConfigParser.Error, e:
236 pass
237
243
245 return self.getint("Main", "fanout")
246
248 return self.getfloat("Main", "connect_timeout")
249
251 return self.getfloat("Main", "command_timeout")
252
255
258
261
263 section = "External"
264 option = "nodes_all"
265 try:
266 return self.get(section, option)
267 except ConfigParser.Error, e:
268 raise ClushConfigError(section, option, e)
269
271 section = "External"
272 option = "nodes_group"
273 try:
274 return self.get(section, option, 0, { "group" : group })
275 except ConfigParser.Error, e:
276 raise ClushConfigError(section, option, e)
277
278
280 """Signal handler used for main thread notification"""
281 if signum == signal.SIGHUP:
282 raise UpdatePromptException()
283
285 """Turn the history file path"""
286 return os.path.join(os.environ["HOME"], ".clush_history")
287
289 """
290 Configure readline to automatically load and save a history file
291 named .clush_history
292 """
293 import readline
294 readline.parse_and_bind("tab: complete")
295 readline.set_completer_delims("")
296 try:
297 readline.read_history_file(get_history_file())
298 except IOError:
299 pass
300
301 -def ttyloop(task, nodeset, gather, timeout, label, verbosity):
302 """Manage the interactive prompt to run command"""
303 has_readline = False
304 if task.info("USER_interactive"):
305 assert sys.stdin.isatty()
306 readline_avail = False
307 try:
308 import readline
309 readline_setup()
310 readline_avail = True
311 except ImportError:
312 pass
313 if verbosity >= VERB_STD:
314 print "Enter 'quit' to leave this interactive mode"
315
316 rc = 0
317 ns = NodeSet(nodeset)
318 ns_info = True
319 cmd = ""
320 while task.info("USER_running") or cmd.lower() != 'quit':
321 try:
322 if task.info("USER_interactive") and not task.info("USER_running"):
323 if ns_info:
324 print "Working with nodes: %s" % ns
325 ns_info = False
326 prompt = "clush> "
327 else:
328 prompt = ""
329 if sys.version_info[:3] >= (2,4,0):
330 cmd = raw_input(prompt)
331 else:
332
333
334
335 if prompt:
336 sys.stdout.write("\r%s" % prompt)
337 cmd = sys.stdin.readline()[:-1]
338 except EOFError:
339 print
340 return
341 except UpdatePromptException:
342 if task.info("USER_interactive"):
343 continue
344 return
345 if task.info("USER_running"):
346 ns_reg, ns_unreg = NodeSet(), NodeSet()
347 for c in task._engine.clients():
348 if c.registered:
349 ns_reg.add(c.key)
350 else:
351 ns_unreg.add(c.key)
352 if ns_unreg:
353 pending = "\nclush: pending(%d): %s" % (len(ns_unreg), ns_unreg)
354 else:
355 pending = ""
356 print >>sys.stderr, "clush: interrupt (^C to abort task)\n" \
357 "clush: in progress(%d): %s%s" % (len(ns_reg), ns_reg, pending)
358 else:
359 cmdl = cmd.lower()
360 try:
361 ns_info = True
362 if cmdl.startswith('+'):
363 ns.update(cmdl[1:])
364 elif cmdl.startswith('-'):
365 ns.difference_update(cmdl[1:])
366 elif cmdl.startswith('@'):
367 ns = NodeSet(cmdl[1:])
368 elif cmdl == '=':
369 gather = not gather
370 if verbosity >= VERB_STD:
371 if gather:
372 print "Switching to gathered output format"
373 else:
374 print "Switching to standard output format"
375 ns_info = False
376 continue
377 elif not cmdl.startswith('?'):
378 ns_info = False
379 except NodeSetParseError:
380 print >>sys.stderr, "clush: nodeset parse error (ignoring)"
381
382 if ns_info:
383 continue
384
385 if cmdl.startswith('!'):
386 run_command(task, cmd[1:], None, gather, timeout, None, verbosity)
387 elif cmdl != "quit":
388 if not cmd:
389 continue
390 if readline_avail:
391 readline.write_history_file(get_history_file())
392 run_command(task, cmd, ns, gather, timeout, label, verbosity)
393 return rc
394
396 assert not sys.stdin.isatty()
397
398 fcntl.fcntl(sys.stdin, fcntl.F_SETFL, os.O_NDELAY)
399
400
401 worker_stdin = WorkerSimple(sys.stdin, None, None, None,
402 handler=StdInputHandler(worker), timeout=-1, autoclose=True)
403
404
405 worker.task.schedule(worker_stdin)
406
407 -def run_command(task, cmd, ns, gather, timeout, label, verbosity):
408 """
409 Create and run the specified command line, displaying
410 results in a dshbak way when gathering is used.
411 """
412 task.set_info("USER_running", True)
413
414 if gather:
415 runtimer = None
416 if verbosity == VERB_STD or verbosity == VERB_VERB:
417
418 runtimer = task.timer(2.0, RunTimer(task, len(ns)), interval=1./3., autoclose=True)
419 worker = task.shell(cmd, nodes=ns, handler=GatherOutputHandler(runtimer), timeout=timeout)
420 else:
421 worker = task.shell(cmd, nodes=ns, handler=DirectOutputHandler(label), timeout=timeout)
422
423 if not sys.stdin.isatty():
424 bind_stdin(worker)
425
426 task.resume()
427
428 -def run_copy(task, source, dest, ns, timeout):
429 """
430 run copy command
431 """
432 task.set_info("USER_running", True)
433
434 worker = task.copy(source, dest, ns, handler=DirectOutputHandler(), timeout=timeout)
435
436 task.resume()
437
439
440 for f in [sys.stdout, sys.stderr]:
441 f.flush()
442
443 os._exit(n)
444
445 -def clush_main(args):
446 """Main clush script function"""
447
448
449 nodeset_base, nodeset_exclude = NodeSet(), NodeSet()
450
451
452
453
454 usage = "%prog [options] command"
455
456 parser = optparse.OptionParser(usage, version="%%prog %s" % __version__)
457 parser.disable_interspersed_args()
458
459
460 optgrp = optparse.OptionGroup(parser, "Selecting target nodes")
461 optgrp.add_option("-w", action="store", dest="nodes",
462 help="nodes where to run the command")
463 optgrp.add_option("-x", action="store", dest="exclude",
464 help="exclude nodes from the node list")
465 optgrp.add_option("-a", "--all", action="store_true", dest="nodes_all",
466 help="run command on all nodes")
467 optgrp.add_option("-g", "--group", action="store", dest="group",
468 help="run command on a group of nodes")
469 parser.add_option_group(optgrp)
470
471
472 optgrp = optparse.OptionGroup(parser, "Output behaviour")
473 optgrp.add_option("-q", "--quiet", action="store_true", dest="quiet",
474 help="be quiet, print essential output only")
475 optgrp.add_option("-v", "--verbose", action="store_true", dest="verbose",
476 help="be verbose, print informative messages")
477 optgrp.add_option("-d", "--debug", action="store_true", dest="debug",
478 help="output more messages for debugging purpose")
479
480 optgrp.add_option("-N", action="store_false", dest="label", default=True,
481 help="disable labeling of command line")
482 optgrp.add_option("-S", action="store_true", dest="maxrc",
483 help="return the largest of command return codes")
484 optgrp.add_option("-b", "--dshbak", action="store_true", dest="gather",
485 help="display results in a dshbak-like way")
486 parser.add_option_group(optgrp)
487
488
489 optgrp = optparse.OptionGroup(parser, "File copying")
490 optgrp.add_option("-c", "--copy", action="store", dest="source_path",
491 help="copy local file or directory to the nodes")
492 optgrp.add_option("--dest", action="store", dest="dest_path",
493 help="destination file or directory on the nodes")
494 parser.add_option_group(optgrp)
495
496
497 optgrp = optparse.OptionGroup(parser, "Ssh options")
498 optgrp.add_option("-f", "--fanout", action="store", dest="fanout",
499 help="use a specified fanout", type="int")
500 optgrp.add_option("-l", "--user", action="store", dest="user",
501 help="execute remote command as user")
502 optgrp.add_option("-o", "--options", action="store", dest="options",
503 help="can be used to give ssh options")
504 optgrp.add_option("-t", "--connect_timeout", action="store", dest="connect_timeout",
505 help="limit time to connect to a node" ,type="float")
506 optgrp.add_option("-u", "--command_timeout", action="store", dest="command_timeout",
507 help="limit time for command to run on the node", type="float")
508 parser.add_option_group(optgrp)
509
510 (options, args) = parser.parse_args()
511
512
513
514
515 config = ClushConfig(options)
516
517
518 if options.quiet:
519 config.set_main("verbosity", VERB_QUIET)
520 if options.verbose:
521 config.set_main("verbosity", VERB_VERB)
522 if options.debug:
523 config.set_main("verbosity", VERB_DEBUG)
524 if options.fanout:
525 config.set_main("fanout", options.fanout)
526 if options.user:
527 config.set_main("ssh_user", options.user)
528 if options.options:
529 config.set_main("ssh_options", options.options)
530 if options.connect_timeout:
531 config.set_main("connect_timeout", options.connect_timeout)
532 if options.command_timeout:
533 config.set_main("command_timeout", options.command_timeout)
534
535
536
537
538 nodeset_base = NodeSet(options.nodes)
539 nodeset_exclude = NodeSet(options.exclude)
540
541
542 task = task_self()
543 task.set_info("debug", config.get_verbosity() > 1)
544 if options.nodes_all:
545 command = config.get_nodes_all_command()
546 task.shell(command, key="all")
547 if options.group:
548 command = config.get_nodes_group_command(options.group)
549 task.shell(command, key="group")
550
551
552 task.resume()
553
554 for buf, keys in task.iter_buffers():
555 for line in buf.splitlines():
556 config.verbose_print(VERB_DEBUG, "Nodes from option %s: %s" % (','.join(keys), buf))
557 nodeset_base.add(line)
558
559
560 nodeset_base.difference_update(nodeset_exclude)
561 if len(nodeset_base) < 1:
562 parser.error('No node to run on.')
563
564 config.verbose_print(VERB_DEBUG, "Final NodeSet is %s" % nodeset_base)
565
566
567
568
569 stdin_isatty = sys.stdin.isatty()
570 if stdin_isatty:
571
572
573
574
575 task = Task()
576 signal.signal(signal.SIGHUP, signal_handler)
577 task.set_info("USER_handle_SIGHUP", True)
578 else:
579
580 task.set_info("USER_handle_SIGHUP", False)
581
582 task.set_info("debug", config.get_verbosity() >= VERB_DEBUG)
583 task.set_info("fanout", config.get_fanout())
584
585 ssh_user = config.get_ssh_user()
586 if ssh_user:
587 task.set_info("ssh_user", ssh_user)
588 ssh_path = config.get_ssh_path()
589 if ssh_path:
590 task.set_info("ssh_path", ssh_path)
591 ssh_options = config.get_ssh_options()
592 if ssh_options:
593 task.set_info("ssh_options", ssh_options)
594
595
596 connect_timeout = config.get_connect_timeout()
597 task.set_info("connect_timeout", connect_timeout)
598 command_timeout = config.get_command_timeout()
599 task.set_info("command_timeout", command_timeout)
600
601
602 if command_timeout > 0:
603 timeout = command_timeout
604 else:
605 timeout = -1
606
607
608 task.set_info("USER_interactive", len(args) == 0 and not options.source_path)
609 task.set_info("USER_running", False)
610
611 if options.source_path and not options.dest_path:
612 options.dest_path = options.source_path
613
614 if options.source_path:
615 if not options.dest_path:
616 options.dest_path = options.source_path
617 op = "copy source=%s dest=%s" % (options.source_path, options.dest_path)
618 else:
619 op = "command=\"%s\"" % ' '.join(args)
620
621 config.verbose_print(VERB_VERB, "clush: nodeset=%s fanout=%d [timeout conn=%.1f " \
622 "cmd=%.1f] %s" % (nodeset_base, task.info("fanout"),
623 task.info("connect_timeout"),
624 task.info("command_timeout"), op))
625
626 if not task.info("USER_interactive"):
627 if options.source_path:
628 if not options.dest_path:
629 options.dest_path = options.source_path
630 run_copy(task, options.source_path, options.dest_path, nodeset_base, 0)
631 else:
632 run_command(task, ' '.join(args), nodeset_base, options.gather,
633 timeout, options.label, config.get_verbosity())
634
635 if stdin_isatty:
636 ttyloop(task, nodeset_base, options.gather, timeout, options.label,
637 config.get_verbosity())
638 elif task.info("USER_interactive"):
639 print >>sys.stderr, "ERROR: interactive mode requires a tty"
640 clush_exit(1)
641
642
643 if options.maxrc:
644 clush_exit(task.max_retcode())
645
646 else:
647 clush_exit(0)
648
649 if __name__ == '__main__':
650 try:
651 clush_main(sys.argv)
652 except KeyboardInterrupt:
653 print "Keyboard interrupt."
654 clush_exit(128 + signal.SIGINT)
655 except ClushConfigError, e:
656 print >>sys.stderr, e
657 sys.exit(1)
658