1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 """
37 Utility program to run commands on a cluster using the ClusterShell
38 library.
39
40 clush is a pdsh-like command which benefits from the ClusterShell
41 library and its Ssh worker. It features an integrated output results
42 gathering system (dshbak-like), can get node groups by running
43 predefined external commands and can redirect lines read on its
44 standard input to the remote commands.
45
46 When no command is specified, clush runs interactively.
47
48 """
49
50 import errno
51 import exceptions
52 import fcntl
53 import optparse
54 import os
55 import resource
56 import sys
57 import signal
58 import ConfigParser
59
60 from ClusterShell.NodeUtils import GroupResolverConfigError
61 from ClusterShell.NodeUtils import GroupResolverSourceError
62 from ClusterShell.NodeUtils import GroupSourceException
63 from ClusterShell.NodeUtils import GroupSourceNoUpcall
64 try:
65 from ClusterShell.Event import EventHandler
66 from ClusterShell.NodeSet import NodeSet
67 from ClusterShell.NodeSet import NOGROUP_RESOLVER, STD_GROUP_RESOLVER
68 from ClusterShell.NodeSet import NodeSetExternalError, NodeSetParseError
69 from ClusterShell.Task import Task, task_self
70 from ClusterShell.Worker.Worker import WorkerSimple
71 from ClusterShell import __version__
72 except GroupResolverConfigError, e:
73 print >> sys.stderr, \
74 "ERROR: ClusterShell Groups configuration error:\n\t%s" % e
75 sys.exit(1)
76
# Verbosity levels, ordered from least to most talkative.
VERB_QUIET, VERB_STD, VERB_VERB, VERB_DEBUG = range(4)

# Accepted values for the --color option and the "color" config key; the
# first entry is the one used as the built-in default.
WHENCOLOR_CHOICES = ["never", "always", "auto"]
85
87 """
88 Output display class for clush script.
89 """
90 COLOR_STDOUT_FMT = "\033[34m%s\033[0m"
91 COLOR_STDERR_FMT = "\033[31m%s\033[0m"
92 SEP = "-" * 15
93
95 self._color = color
96 self._display = self._print_buffer
97 self.out = sys.stdout
98 self.err = sys.stderr
99 self.label = True
100 self.regroup = False
101 self.groupsource = None
102 if self._color:
103 self.color_stdout_fmt = self.COLOR_STDOUT_FMT
104 self.color_stderr_fmt = self.COLOR_STDERR_FMT
105 else:
106 self.color_stdout_fmt = self.color_stderr_fmt = "%s"
107 self.noprefix = False
108
111
117 line_mode = property(_getlmode, _setlmode)
118
124
126 """Display a line with optional label."""
127 if self.label:
128 prefix = self.color_stdout_fmt % ("%s: " % nodeset)
129 self.out.write("%s%s\n" % (prefix, line))
130 else:
131 self.out.write("%s\n" % line)
132
134 """Display an error line with optional label."""
135 if self.label:
136 prefix = self.color_stderr_fmt % ("%s: " % nodeset)
137 self.err.write("%s%s\n" % (prefix, line))
138 else:
139 self.err.write("%s\n" % line)
140
142 """Generic method for displaying nodeset/content according to current
143 object settings."""
144 return self._display(nodeset, obj)
145
147 """Display a dshbak-like header block and content."""
148 header = self.color_stdout_fmt % ("%s\n%s\n%s\n" % (self.SEP,
149 self._format_header(nodeset),
150 self.SEP))
151 self.out.write("%s%s\n" % (header, content))
152
154 """Display a MsgTree buffer by line with prefixed header."""
155 header = self.color_stdout_fmt % \
156 ("%s: " % self._format_header(nodeset))
157 for line in msg:
158 self.out.write("%s%s\n" % (header, line))
159
161 """Compare 2 nodesets by their length (we want larger nodeset
162 first) and then by first node."""
163 len_cmp = cmp(len(ns2), len(ns1))
164 if not len_cmp:
165 smaller = NodeSet.fromlist([ns1[0], ns2[0]])[0]
166 if smaller == ns1[0]:
167 return -1
168 else:
169 return 1
170 return len_cmp
171
172
173
175 """Convenience function to compare 2 (buf, nodeset) tuples by their
176 nodeset length (we want larger nodeset first) and then by first
177 node."""
178
179 return nodeset_cmp(bn1[1], bn2[1])
180
182 """Exception used by the signal handler"""
183
195
197 """Direct output event handler class."""
198
201
205
209
211 if hasattr(worker, "last_retcode"):
212 ns, rc = worker.last_retcode()
213 else:
214 ns = "local"
215 rc = worker.retcode()
216 if rc > 0:
217 print >> sys.stderr, "clush: %s: exited with exit code %d" \
218 % (ns, rc)
219
223
225
226
227
228 worker.task.set_default("USER_running", False)
229 if worker.task.default("USER_handle_SIGUSR1"):
230 os.kill(os.getpid(), signal.SIGUSR1)
231
233 """Gathered output event handler class."""
234
def __init__(self, display, runtimer, nodes):
    """Prepare a gathered-output handler.

    display is the output object used to print results, runtimer the
    optional progress timer (may be None) and nodes the target nodes.
    """
    self._runtimer = runtimer
    self._display = display

    # Per-node line counters (all starting at zero) plus a global
    # counter; they are only updated when the display is in line mode.
    targets = NodeSet(nodes)
    zeroed = dict.fromkeys(targets, 0)
    self._nodes = targets
    self._nodemap_base = zeroed
    self._nodemap = zeroed.copy()
    self._nodes_cnt = 0
243
245
246 if not self._display.line_mode:
247 return
248 result = worker.last_read()
249
250 node = result[0]
251 self._nodes_cnt += 1
252 self._nodemap[node] += 1
253 self._live_line(worker)
254
263
271
273
274 if self._nodes and self._nodes_cnt % len(self._nodes) == 0 and \
275 max(self._nodemap.itervalues()) == (self._nodes_cnt \
276 / len(self._nodes)):
277 nodesetify = lambda v: (v[0], NodeSet.fromlist(v[1]))
278 for buf, nodeset in sorted(map(nodesetify,
279 worker.iter_buffers()),
280 cmp=bufnodeset_cmp):
281 self._display.print_gather(nodeset, buf)
282
283 worker.flush_buffers()
284 self._nodemap = self._nodemap_base.copy()
285 self._nodes_cnt = 0
286
288
289 if self._runtimer:
290 self._runtimer.eh.finalize(worker.task.default("USER_interactive"))
291
292
293 nodesetify = lambda v: (v[0], NodeSet.fromlist(v[1]))
294 for rc, nodelist in worker.iter_retcodes():
295
296 for buf, nodeset in sorted(map(nodesetify,
297 worker.iter_buffers(nodelist)),
298 cmp=bufnodeset_cmp):
299 self._display.print_gather(nodeset, buf)
300
301
302 for rc, nodelist in worker.iter_retcodes():
303 if rc != 0:
304 ns = NodeSet.fromlist(nodelist)
305 print >> sys.stderr, "clush: %s: exited with exit code %s" \
306 % (ns, rc)
307
308
309 if worker.num_timeout() > 0:
310 print >> sys.stderr, "clush: %s: command timeout" % \
311 NodeSet.fromlist(worker.iter_keys_timeout())
312
313
314 worker.task.set_default("USER_running", False)
315 if worker.task.default("USER_handle_SIGUSR1"):
316 os.kill(os.getpid(), signal.SIGUSR1)
317
320 self.task = task
321 self.total = total
322 self.cnt_last = -1
323 self.tslen = len(str(self.total))
324 self.wholelen = 0
325
328
331
333 if self.wholelen:
334 sys.stderr.write(' ' * self.wholelen + '\r')
335
337 cnt = len(self.task._engine.clients())
338 if cnt != self.cnt_last:
339 self.cnt_last = cnt
340
341 towrite = 'clush: %*d/%*d\r' % (self.tslen, self.total - cnt,
342 self.tslen, self.total)
343 self.wholelen = len(towrite)
344 sys.stderr.write(towrite)
345
347
348 fmt = 'clush: %*d/%*d'
349 if cr:
350 fmt += '\n'
351 else:
352 fmt += '\r'
353 sys.stderr.write(fmt % (self.tslen, self.total, self.tslen, self.total))
354
356 """Exception used by ClushConfig to report an error."""
357
def __init__(self, section, option, msg):
    """Remember which config section/option failed and why (msg)."""
    Exception.__init__(self)
    self.section, self.option, self.msg = section, option, msg
363
365 return "(Config %s.%s): %s" % (self.section, self.option, self.msg)
366
368 """Config class for clush (specialized ConfigParser)"""
369
370 main_defaults = { "fanout" : "64",
371 "connect_timeout" : "30",
372 "command_timeout" : "0",
373 "history_size" : "100",
374 "color" : WHENCOLOR_CHOICES[0],
375 "verbosity" : "%d" % VERB_STD }
376
378 ConfigParser.ConfigParser.__init__(self)
379
380 self.add_section("Main")
381 for key, value in ClushConfig.main_defaults.iteritems():
382 self.set("Main", key, value)
383
384 self.read(['/etc/clustershell/clush.conf',
385 os.path.expanduser('~/.clush.conf')])
386
390
392 """Make open file descriptors soft limit the max."""
393 soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
394 if soft < hard:
395 self.verbose_print(VERB_DEBUG, "Setting max soft limit "
396 "RLIMIT_NOFILE: %d -> %d" % (soft, hard))
397 resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
398 else:
399 self.verbose_print(VERB_DEBUG, "Soft limit RLIMIT_NOFILE already "
400 "set to the max (%d)" % soft)
401
def set_main(self, option, value):
    """Store value, converted to a string, as option of section "Main"."""
    text = str(value)
    self.set("Main", option, text)
404
405 - def getint(self, section, option):
406 try:
407 return ConfigParser.ConfigParser.getint(self, section, option)
408 except (ConfigParser.Error, TypeError, ValueError), e:
409 raise ClushConfigError(section, option, e)
410
412 try:
413 return ConfigParser.ConfigParser.getfloat(self, section, option)
414 except (ConfigParser.Error, TypeError, ValueError), e:
415 raise ClushConfigError(section, option, e)
416
418 try:
419 return self.get(section, option)
420 except ConfigParser.Error, e:
421 pass
422
429
435
437 return self.getint("Main", "fanout")
438
440 return self.getfloat("Main", "connect_timeout")
441
443 return self.getfloat("Main", "command_timeout")
444
447
450
453
454
456 """Signal handler used for main thread notification"""
457 if signum == signal.SIGUSR1:
458 raise UpdatePromptException()
459
461 """Return the history file path"""
462 return os.path.join(os.environ["HOME"], ".clush_history")
463
465 """
466 Configure readline to automatically load and save a history file
467 named .clush_history
468 """
469 import readline
470 readline.parse_and_bind("tab: complete")
471 readline.set_completer_delims("")
472 try:
473 readline.read_history_file(get_history_file())
474 except IOError:
475 pass
476
477 -def ttyloop(task, nodeset, gather, timeout, verbosity, display):
478 """Manage the interactive prompt to run command"""
479 readline_avail = False
480 if task.default("USER_interactive"):
481 assert sys.stdin.isatty()
482 try:
483 import readline
484 readline_setup()
485 readline_avail = True
486 except ImportError:
487 pass
488 if verbosity >= VERB_STD:
489 print "Enter 'quit' to leave this interactive mode"
490
491 rc = 0
492 ns = NodeSet(nodeset)
493 ns_info = True
494 cmd = ""
495 while task.default("USER_running") or cmd.lower() != 'quit':
496 try:
497 if task.default("USER_interactive") and \
498 not task.default("USER_running"):
499 if ns_info:
500 print "Working with nodes: %s" % ns
501 ns_info = False
502 prompt = "clush> "
503 else:
504 prompt = ""
505 cmd = raw_input(prompt)
506 except EOFError:
507 print
508 return
509 except UpdatePromptException:
510 if task.default("USER_interactive"):
511 continue
512 return
513 except KeyboardInterrupt, e:
514 signal.signal(signal.SIGUSR1, signal.SIG_IGN)
515 if gather:
516
517
518 task.suspend()
519
520 print_warn = False
521
522
523 nodesetify = lambda v: (v[0], NodeSet.fromlist(v[1]))
524 for buf, nodeset in sorted(map(nodesetify, task.iter_buffers()),
525 cmp=bufnodeset_cmp):
526 if not print_warn:
527 print_warn = True
528 print >> sys.stderr, "Warning: Caught keyboard interrupt!"
529 display.print_gather(nodeset, buf)
530
531
532 ns_ok = NodeSet()
533 for rc, nodelist in task.iter_retcodes():
534 ns_ok.add(NodeSet.fromlist(nodelist))
535 if rc != 0:
536
537 ns = NodeSet.fromlist(nodelist)
538 print >> sys.stderr, \
539 "clush: %s: exited with exit code %s" % (ns, rc)
540
541 e.uncompleted_nodes = ns - ns_ok
542
543
544 if task.num_timeout() > 0:
545 print >> sys.stderr, "clush: %s: command timeout" % \
546 NodeSet.fromlist(task.iter_keys_timeout())
547 raise e
548
549 if task.default("USER_running"):
550 ns_reg, ns_unreg = NodeSet(), NodeSet()
551 for c in task._engine.clients():
552 if c.registered:
553 ns_reg.add(c.key)
554 else:
555 ns_unreg.add(c.key)
556 if ns_unreg:
557 pending = "\nclush: pending(%d): %s" % (len(ns_unreg), ns_unreg)
558 else:
559 pending = ""
560 print >> sys.stderr, "clush: interrupt (^C to abort task)\n" \
561 "clush: in progress(%d): %s%s" % (len(ns_reg), ns_reg, pending)
562 else:
563 cmdl = cmd.lower()
564 try:
565 ns_info = True
566 if cmdl.startswith('+'):
567 ns.update(cmdl[1:])
568 elif cmdl.startswith('-'):
569 ns.difference_update(cmdl[1:])
570 elif cmdl.startswith('@'):
571 ns = NodeSet(cmdl[1:])
572 elif cmdl == '=':
573 gather = not gather
574 if verbosity >= VERB_STD:
575 if gather:
576 print "Switching to gathered output format"
577 else:
578 print "Switching to standard output format"
579 ns_info = False
580 continue
581 elif not cmdl.startswith('?'):
582 ns_info = False
583 except NodeSetParseError:
584 print >> sys.stderr, "clush: nodeset parse error (ignoring)"
585
586 if ns_info:
587 continue
588
589 if cmdl.startswith('!') and len(cmd.strip()) > 0:
590 run_command(task, cmd[1:], None, gather, timeout, None,
591 verbosity, display)
592 elif cmdl != "quit":
593 if not cmd:
594 continue
595 if readline_avail:
596 readline.write_history_file(get_history_file())
597 run_command(task, cmd, ns, gather, timeout, verbosity, display)
598 return rc
599
601 """Create a ClusterShell stdin-reader worker bound to specified
602 worker."""
603 assert not sys.stdin.isatty()
604
605 fcntl.fcntl(sys.stdin, fcntl.F_SETFL, \
606 fcntl.fcntl(sys.stdin, fcntl.F_GETFL) | os.O_NDELAY)
607
608
609 worker_stdin = WorkerSimple(sys.stdin, None, None, None,
610 handler=StdInputHandler(worker), timeout=-1, autoclose=True)
611
612
613 worker.task.schedule(worker_stdin)
614
def run_command(task, cmd, ns, gather, timeout, verbosity, display):
    """
    Create and run the specified command line, displaying
    results in a dshbak way when gathering is used.

    task is the ClusterShell task to use, cmd the command line and ns
    the target nodeset, or None when the command has no target nodeset
    (the '!' prefix of the interactive prompt). gather selects the
    gathered output handler, timeout is handed to task.shell(),
    verbosity is one of the VERB_* levels and display is the output
    object given to the event handler.
    """
    task.set_default("USER_running", True)

    if gather:
        runtimer = None
        # The progress timer counts completed nodes out of len(ns); it
        # cannot be set up when there is no target nodeset (ns is None).
        if ns is not None and verbosity in (VERB_STD, VERB_VERB):
            runtimer = task.timer(2.0, RunTimer(task, len(ns)),
                                  interval=1./3., autoclose=True)
        handler = GatherOutputHandler(display, runtimer, ns)
    else:
        handler = DirectOutputHandler(display)

    worker = task.shell(cmd, nodes=ns, handler=handler, timeout=timeout)

    # Forward our standard input to the command when requested.
    if task.default("USER_stdin_worker"):
        bind_stdin(worker)

    task.resume()
641
def run_copy(task, source, dest, ns, timeout, preserve_flag, display):
    """
    Copy the local path `source' to `dest' on the nodes of `ns'.

    The copy is scheduled on `task' with a direct output handler bound
    to `display', then the task is resumed. `timeout' and
    `preserve_flag' are passed through to task.copy(). When `source'
    does not exist, an error is printed and clush exits with status 1.
    """
    task.set_default("USER_running", True)

    # Refuse to go any further without a source to copy.
    source_found = os.path.exists(source)
    if not source_found:
        print >> sys.stderr, "ERROR: file \"%s\" not found" % source
        clush_exit(1)

    copy_handler = DirectOutputHandler(display)
    task.copy(source, dest, ns, handler=copy_handler, timeout=timeout,
              preserve=preserve_flag)

    task.resume()
657
659
660 for stream in [sys.stdout, sys.stderr]:
661 stream.flush()
662
663 os._exit(status)
664
666 """Exceptions hook for clush: this method centralizes exception
667 handling from main thread and from (possible) separate task thread.
668 This hook has to be previously installed on startup by overriding
669 sys.excepthook and task.excepthook."""
670 try:
671 raise type, value
672 except ClushConfigError, e:
673 print >> sys.stderr, "ERROR: %s" % e
674 except NodeSetExternalError, e:
675 print >> sys.stderr, "clush: external error:", e
676 except NodeSetParseError, e:
677 print >> sys.stderr, "clush: parse error:", e
678 except GroupResolverSourceError, e:
679 print >> sys.stderr, "ERROR: unknown group source: \"%s\"" % e
680 except GroupSourceNoUpcall, e:
681 print >> sys.stderr, "ERROR: no %s upcall defined for group " \
682 "source \"%s\"" % (e, e.group_source.name)
683 except GroupSourceException, e:
684 print >> sys.stderr, "ERROR: other group error:", e
685 except IOError:
686
687 pass
688 except KeyboardInterrupt, e:
689 uncomp_nodes = getattr(e, 'uncompleted_nodes', None)
690 if uncomp_nodes:
691 print >> sys.stderr, "Keyboard interrupt (%s did not complete)." \
692 % uncomp_nodes
693 else:
694 print >> sys.stderr, "Keyboard interrupt."
695 clush_exit(128 + signal.SIGINT)
696 except OSError, value:
697 print >> sys.stderr, "ERROR: %s" % value
698 if value.errno == errno.EMFILE:
699 print >> sys.stderr, "ERROR: current `nofile' limits: " \
700 "soft=%d hard=%d" % resource.getrlimit(resource.RLIMIT_NOFILE)
701 except:
702
703 task_self().default_excepthook(type, value, traceback)
704
705 clush_exit(1)
706
def _build_option_parser():
    """Build the optparse parser describing the clush command line."""
    usage = "%prog [options] command"

    parser = optparse.OptionParser(usage, version="%%prog %s" % __version__)
    parser.disable_interspersed_args()

    parser.add_option("--nostdin", action="store_true", dest="nostdin",
                      help="don't watch for possible input from stdin")

    # Node selection
    optgrp = optparse.OptionGroup(parser, "Selecting target nodes")
    optgrp.add_option("-w", action="append", dest="nodes",
                      help="nodes where to run the command")
    optgrp.add_option("-x", action="append", dest="exclude",
                      help="exclude nodes from the node list")
    optgrp.add_option("-a", "--all", action="store_true", dest="nodes_all",
                      help="run command on all nodes")
    optgrp.add_option("-g", "--group", action="append", dest="group",
                      help="run command on a group of nodes")
    optgrp.add_option("-X", action="append", dest="exgroup",
                      help="exclude nodes from this group")
    parser.add_option_group(optgrp)

    # Output behaviour
    optgrp = optparse.OptionGroup(parser, "Output behaviour")
    optgrp.add_option("-q", "--quiet", action="store_true", dest="quiet",
                      help="be quiet, print essential output only")
    optgrp.add_option("-v", "--verbose", action="store_true", dest="verbose",
                      help="be verbose, print informative messages")
    optgrp.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="output more messages for debugging purpose")

    optgrp.add_option("-G", "--groupbase", action="store_true",
                      dest="groupbase", default=False,
                      help="do not display group source prefix")
    optgrp.add_option("-L", action="store_true", dest="line_mode",
                      default=False,
                      help="disable header block and order output by nodes")
    optgrp.add_option("-N", action="store_false", dest="label", default=True,
                      help="disable labeling of command line")
    optgrp.add_option("-S", action="store_true", dest="maxrc",
                      help="return the largest of command return codes")
    optgrp.add_option("-b", "--dshbak", action="store_true", dest="gather",
                      default=False,
                      help="display gathered results in a dshbak-like way")
    optgrp.add_option("-B", action="store_true", dest="gatherall",
                      default=False,
                      help="like -b but including standard error")
    optgrp.add_option("-r", "--regroup", action="store_true", dest="regroup",
                      default=False, help="fold nodeset using node groups")
    optgrp.add_option("-s", "--groupsource", action="store",
                      dest="groupsource",
                      help="optional groups.conf(5) group source to use")
    optgrp.add_option("--color", action="store", dest="whencolor",
                      choices=WHENCOLOR_CHOICES,
                      help="whether to use ANSI colors (never, always or "
                           "auto)")
    # Register the group once only: add_option_group() appends the group
    # on each call, which would list it twice in the --help output.
    parser.add_option_group(optgrp)

    # File copying
    optgrp = optparse.OptionGroup(parser, "File copying")
    optgrp.add_option("-c", "--copy", action="store", dest="source_path",
                      help="copy local file or directory to the nodes")
    optgrp.add_option("--dest", action="store", dest="dest_path",
                      help="destination file or directory on the nodes")
    optgrp.add_option("-p", action="store_true", dest="preserve_flag",
                      help="preserve modification times and modes")
    parser.add_option_group(optgrp)

    # Ssh options
    optgrp = optparse.OptionGroup(parser, "Ssh options")
    optgrp.add_option("-f", "--fanout", action="store", dest="fanout",
                      help="use a specified fanout", type="int")
    optgrp.add_option("-l", "--user", action="store", dest="user",
                      help="execute remote command as user")
    optgrp.add_option("-o", "--options", action="store", dest="options",
                      help="can be used to give ssh options")
    optgrp.add_option("-t", "--connect_timeout", action="store",
                      dest="connect_timeout",
                      help="limit time to connect to a node", type="float")
    optgrp.add_option("-u", "--command_timeout", action="store",
                      dest="command_timeout",
                      help="limit time for command to run on the node",
                      type="float")
    parser.add_option_group(optgrp)

    return parser


def clush_main(args):
    """Main clush script function.

    args is the full argument vector, program name first (sys.argv).
    The function parses the options, loads the clush configuration,
    computes the target nodeset, configures the ClusterShell task and
    the display, then runs the copy, the command or the interactive
    loop. It does not return: the process ends through clush_exit().
    """
    sys.excepthook = clush_excepthook

    # Default values
    nodeset_base, nodeset_exclude = NodeSet(), NodeSet()

    #
    # Argument management
    #
    parser = _build_option_parser()
    # Parse the argument vector we were given (minus the program name)
    # instead of silently reading sys.argv again.
    (options, args) = parser.parse_args(args[1:])

    #
    # Load config file and apply command line overrides
    #
    config = ClushConfig()

    if options.quiet:
        config.set_main("verbosity", VERB_QUIET)
    if options.verbose:
        config.set_main("verbosity", VERB_VERB)
    if options.debug:
        config.set_main("verbosity", VERB_DEBUG)
    if options.fanout:
        config.set_main("fanout", options.fanout)
    if options.user:
        config.set_main("ssh_user", options.user)
    if options.options:
        config.set_main("ssh_options", options.options)
    # Test against None so that an explicit 0 given on the command line
    # still overrides a value coming from the configuration file.
    if options.connect_timeout is not None:
        config.set_main("connect_timeout", options.connect_timeout)
    if options.command_timeout is not None:
        config.set_main("command_timeout", options.command_timeout)

    #
    # Compute the nodeset
    #
    if options.nodes:
        nodeset_base = NodeSet.fromlist(options.nodes)
    if options.exclude:
        nodeset_exclude = NodeSet.fromlist(options.exclude)

    if options.groupsource:
        # Be sure -a/-g/-X use the requested group source.
        STD_GROUP_RESOLVER.default_sourcename = options.groupsource

    task = task_self()
    task.set_info("debug", config.get_verbosity() > 1)
    if config.get_verbosity() > 1:
        STD_GROUP_RESOLVER.set_verbosity(1)
    if options.nodes_all:
        all_nodeset = NodeSet.fromall()
        config.verbose_print(VERB_DEBUG,
                             "Adding nodes from option -a: %s" % all_nodeset)
        nodeset_base.add(all_nodeset)

    if options.group:
        grp_nodeset = NodeSet.fromlist(options.group,
                                       resolver=NOGROUP_RESOLVER)
        for grp in grp_nodeset:
            addingrp = NodeSet("@" + grp)
            config.verbose_print(VERB_DEBUG,
                "Adding nodes from option -g %s: %s" % (grp, addingrp))
            nodeset_base.update(addingrp)

    if options.exgroup:
        grp_nodeset = NodeSet.fromlist(options.exgroup,
                                       resolver=NOGROUP_RESOLVER)
        for grp in grp_nodeset:
            removingrp = NodeSet("@" + grp)
            config.verbose_print(VERB_DEBUG,
                "Excluding nodes from option -X %s: %s" % (grp, removingrp))
            nodeset_exclude.update(removingrp)

    # Do we have an exclude list? (either from -x or -X)
    nodeset_base.difference_update(nodeset_exclude)
    if len(nodeset_base) < 1:
        parser.error('No node to run on.')

    config.verbose_print(VERB_DEBUG, "Final NodeSet: %s" % nodeset_base)

    # Make the soft open files limit the max.
    config.max_fdlimit()

    #
    # Task management
    #
    interactive = not len(args) and not options.source_path
    if options.nostdin and interactive:
        parser.error("illegal option `--nostdin' in interactive mode")

    user_interaction = not options.nostdin and sys.stdin.isatty() and \
        sys.stdout.isatty()
    config.verbose_print(VERB_DEBUG, "User interaction: %s" % user_interaction)
    if user_interaction:
        # Use a dedicated Task for the cluster commands and get notified
        # of their completion through SIGUSR1 (see signal_handler).
        # NOTE(review): Task() presumably runs in its own thread, leaving
        # this one free for blocking reads on the terminal -- confirm.
        task = Task()
        signal.signal(signal.SIGUSR1, signal_handler)
        task.set_default("USER_handle_SIGUSR1", True)
    else:
        # Keep using the task bound to the current thread.
        task.set_default("USER_handle_SIGUSR1", False)

    task.excepthook = sys.excepthook
    task.set_default("USER_stdin_worker", not (sys.stdin.isatty() or
                                               options.nostdin))
    config.verbose_print(VERB_DEBUG, "Create STDIN worker: %s" % \
                         task.default("USER_stdin_worker"))

    task.set_info("debug", config.get_verbosity() >= VERB_DEBUG)
    task.set_info("fanout", config.get_fanout())

    ssh_user = config.get_ssh_user()
    if ssh_user:
        task.set_info("ssh_user", ssh_user)
    ssh_path = config.get_ssh_path()
    if ssh_path:
        task.set_info("ssh_path", ssh_path)
    ssh_options = config.get_ssh_options()
    if ssh_options:
        task.set_info("ssh_options", ssh_options)

    # Set detected timeout values
    connect_timeout = config.get_connect_timeout()
    task.set_info("connect_timeout", connect_timeout)
    command_timeout = config.get_command_timeout()
    task.set_info("command_timeout", command_timeout)

    gather = options.gatherall or options.gather
    # With -B, standard error is not handled separately.
    task.set_default("stderr", not options.gatherall)

    # Only keep a standard output message tree when gathering.
    task.set_default("stdout_msgtree", gather)
    # Never keep a standard error message tree.
    task.set_default("stderr_msgtree", False)

    # A command timeout of 0 (or less) is passed on as -1 to the workers.
    if command_timeout > 0:
        timeout = command_timeout
    else:
        timeout = -1

    # Configure task custom status
    task.set_default("USER_interactive", interactive)
    task.set_default("USER_running", False)

    if options.source_path:
        if not options.dest_path:
            options.dest_path = os.path.dirname(options.source_path)
        op = "copy source=%s dest=%s" % (options.source_path,
                                         options.dest_path)
    else:
        op = "command=\"%s\"" % ' '.join(args)

    # Print a summary of what is about to run.
    config.verbose_print(VERB_VERB,
        "clush: nodeset=%s fanout=%d [timeout conn=%.1f " \
        "cmd=%.1f] %s" % (nodeset_base, config.get_fanout(),
                          task.info("connect_timeout"),
                          task.info("command_timeout"), op))

    # Should we use ANSI colors for nodes?
    if not options.whencolor:
        options.whencolor = config.get_color()
    if options.whencolor == "auto":
        color = sys.stdout.isatty() and (options.gatherall or \
                                         sys.stderr.isatty())
    else:
        color = options.whencolor == "always"

    # Create and configure the display object.
    display = Display(color)
    display.line_mode = options.line_mode
    display.label = options.label
    display.regroup = options.regroup
    display.groupsource = options.groupsource
    display.noprefix = options.groupbase

    if not task.default("USER_interactive"):
        if options.source_path:
            if not args:
                run_copy(task, options.source_path, options.dest_path,
                         nodeset_base, 0, options.preserve_flag, display)
            else:
                parser.error("please use `--dest' to specify a different " \
                             "destination")
        else:
            run_command(task, ' '.join(args), nodeset_base, gather, timeout,
                        config.get_verbosity(), display)

    if user_interaction:
        ttyloop(task, nodeset_base, gather, timeout, config.get_verbosity(),
                display)
    elif task.default("USER_interactive"):
        print >> sys.stderr, "ERROR: interactive mode requires a tty"
        clush_exit(1)

    rc = 0
    if options.maxrc:
        # Exit with the largest command return code instead of 0, or
        # with 255 when at least one command timed out.
        rc = task.max_retcode()
        if task.num_timeout() > 0:
            rc = 255
    clush_exit(rc)
998
# Script entry point: hand the whole argument vector to clush_main().
if __name__ == "__main__":
    clush_main(sys.argv)
1001