Package ClusterShell :: Package Worker :: Module Pdsh
[hide private]
[frames | no frames]

Source Code for Module ClusterShell.Worker.Pdsh

  1  # 
  2  # Copyright CEA/DAM/DIF (2007, 2008, 2009) 
  3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  4  # 
  5  # This file is part of the ClusterShell library. 
  6  # 
  7  # This software is governed by the CeCILL-C license under French law and 
  8  # abiding by the rules of distribution of free software.  You can  use, 
  9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 10  # license as circulated by CEA, CNRS and INRIA at the following URL 
 11  # "http://www.cecill.info". 
 12  # 
 13  # As a counterpart to the access to the source code and  rights to copy, 
 14  # modify and redistribute granted by the license, users are provided only 
 15  # with a limited warranty  and the software's author,  the holder of the 
 16  # economic rights,  and the successive licensors  have only  limited 
 17  # liability. 
 18  # 
 19  # In this respect, the user's attention is drawn to the risks associated 
 20  # with loading,  using,  modifying and/or developing or reproducing the 
 21  # software by the user in light of its specific status of free software, 
 22  # that may mean  that it is complicated to manipulate,  and  that  also 
 23  # therefore means  that it is reserved for developers  and  experienced 
 24  # professionals having in-depth computer knowledge. Users are therefore 
 25  # encouraged to load and test the software's suitability as regards their 
 26  # requirements in conditions enabling the security of their systems and/or 
 27  # data to be ensured and,  more generally, to use and operate it in the 
 28  # same conditions as regards security. 
 29  # 
 30  # The fact that you are presently reading this means that you have had 
 31  # knowledge of the CeCILL-C license and that you accept its terms. 
 32  # 
 33  # $Id: Pdsh.py 292 2010-07-15 22:43:46Z st-cea $ 
 34   
 35  """ 
 36  WorkerPdsh 
 37   
 38  ClusterShell worker for executing commands with LLNL pdsh. 
 39  """ 
 40   
 41  import errno 
 42  import os 
 43  import signal 
 44  import sys 
 45   
 46  from ClusterShell.NodeSet import NodeSet 
 47  from ClusterShell.Worker.EngineClient import EngineClient 
 48  from ClusterShell.Worker.EngineClient import EngineClientError 
 49  from ClusterShell.Worker.EngineClient import EngineClientNotSupportedError 
 50  from ClusterShell.Worker.Worker import DistantWorker 
 51  from ClusterShell.Worker.Worker import WorkerError, WorkerBadArgumentError 
 52   
 53   
class WorkerPdsh(EngineClient, DistantWorker):
    """
    ClusterShell pdsh-based worker Class.

    Remote Shell (pdsh) usage example:
       >>> worker = WorkerPdsh(nodeset, handler=MyEventHandler(),
       ...                     timeout=30, command="/bin/hostname")
       >>> task.schedule(worker)      # schedule worker for execution
       >>> task.resume()              # run

    Remote Copy (pdcp) usage example:
       >>> worker = WorkerPdsh(nodeset, handler=MyEventHandler(),
       ...                     timeout=30, source="/etc/my.conf",
       ...                     dest="/etc/my.conf")
       >>> task.schedule(worker)      # schedule worker for execution
       >>> task.resume()              # run

    Known limitations:
      - write() is not supported by WorkerPdsh
      - return codes == 0 are not guaranteed when a timeout is used (rc > 0
        are fine)
    """

    def __init__(self, nodes, handler, timeout, **kwargs):
        """
        Initialize Pdsh worker instance.

        Arguments:
          nodes    target nodes, converted with NodeSet(nodes)
          handler  event handler, forwarded to DistantWorker.__init__
          timeout  forwarded to EngineClient.__init__

        Recognized keyword arguments:
          command    command to run; when not None the worker runs in
                     'pdsh' mode and source/dest are ignored
          source     path to copy; used when command is None ('pdcp' mode)
          dest       destination path of the copy
          preserve   pdcp mode only: pass "-p" to pdcp (default False)
          autoclose  forwarded to EngineClient.__init__ (default False)
          stderr     forwarded to EngineClient.__init__ (default False)

        Raises WorkerBadArgumentError when neither a command nor a source
        is provided.
        """
        DistantWorker.__init__(self, handler)

        self.nodes = NodeSet(nodes)
        # Nodes for which a return code has already been recorded.
        self.closed_nodes = NodeSet()

        self.command = kwargs.get('command')
        self.source = kwargs.get('source')
        self.dest = kwargs.get('dest')

        autoclose = kwargs.get('autoclose', False)
        stderr = kwargs.get('stderr', False)

        # This worker is its own engine client: it passes itself as the
        # worker argument.
        EngineClient.__init__(self, self, stderr, timeout, autoclose)

        if self.command is not None:
            # PDSH
            self.source = None
            self.dest = None
            self.mode = 'pdsh'
        elif self.source:
            # PDCP
            self.command = None
            self.mode = 'pdcp'
            self.isdir = os.path.isdir(self.source)
            # Preserve modification times and modes?
            self.preserve = kwargs.get('preserve', False)
        else:
            raise WorkerBadArgumentError("missing command or source in "
                                         "WorkerPdsh constructor")
        self.popen = None
        self._buf = ""

    def _engine_clients(self):
        """
        Return the list of engine clients of this worker (itself only).
        """
        return [self]

    def _start(self):
        """
        Start worker, initialize buffers, prepare command.

        Builds the pdsh (command mode) or pdcp (copy mode) argument list
        from the task info settings, spawns it with _exec_nonblock() and
        exposes the process pipes as file_error/file_reader/file_writer.
        Returns self.
        """
        # Initialize worker read buffer
        self._buf = ""

        # Extra environment handed to _exec_nonblock().
        pdsh_env = {}

        if self.command is not None:
            # Build pdsh command
            debug_label = "PDSH"
            executable = self.task.info("pdsh_path") or "pdsh"
            cmd_l = [executable, "-b"]

            fanout = self.task.info("fanout", 0)
            if fanout > 0:
                cmd_l.append("-f %d" % fanout)

            # Pdsh flag '-t' does not really work well. Better to use the
            # PDSH_SSH_ARGS_APPEND variable to transmit the ssh
            # ConnectTimeout flag.
            connect_timeout = self.task.info("connect_timeout", 0)
            if connect_timeout > 0:
                pdsh_env['PDSH_SSH_ARGS_APPEND'] = \
                    "-o ConnectTimeout=%d" % connect_timeout

            command_timeout = self.task.info("command_timeout", 0)
            if command_timeout > 0:
                cmd_l.append("-u %d" % command_timeout)

            cmd_l.append("-w %s" % self.nodes)
            cmd_l.append("%s" % self.command)
        else:
            # Build pdcp command
            debug_label = "PDCP"
            executable = self.task.info("pdcp_path") or "pdcp"
            cmd_l = [executable, "-b"]

            fanout = self.task.info("fanout", 0)
            if fanout > 0:
                cmd_l.append("-f %d" % fanout)

            connect_timeout = self.task.info("connect_timeout", 0)
            if connect_timeout > 0:
                cmd_l.append("-t %d" % connect_timeout)

            cmd_l.append("-w %s" % self.nodes)

            if self.isdir:
                cmd_l.append("-r")

            if self.preserve:
                cmd_l.append("-p")

            cmd_l.append(self.source)
            cmd_l.append(self.dest)

        if self.task.info("debug", False):
            self.task.info("print_debug")(
                self.task, "%s: %s" % (debug_label, ' '.join(cmd_l)))

        self.popen = self._exec_nonblock(cmd_l, env=pdsh_env)
        self.file_error = self.popen.stderr
        self.file_reader = self.popen.stdout
        self.file_writer = self.popen.stdin

        self._on_start()

        return self

    def write(self, buf):
        """
        Write data to process. Not supported with Pdsh worker.

        Always raises EngineClientNotSupportedError.
        """
        raise EngineClientNotSupportedError("writing is not "
                                            "supported by pdsh worker")

    def _close(self, force, timeout):
        """
        Close worker. Called by engine after worker has been
        unregistered. This method should handle all termination types
        (normal, forced or on timeout).

        On a forced close or a timeout, a still-running process is killed
        with SIGKILL, and "ev_timeout" is invoked on timeout. On a normal
        close the process is waited for, and WorkerError is raised when it
        exited with a strictly positive status. The process pipes are
        closed in every case, including when WorkerError is raised.

        Nodes without a recorded return code are then reported as timed
        out (timeout) or as having returned 0 (otherwise), and "ev_close"
        is invoked.
        """
        try:
            if force or timeout:
                if self.popen.poll() is None:
                    # process is still running, kill it
                    os.kill(self.popen.pid, signal.SIGKILL)
                if timeout:
                    self._invoke("ev_timeout")
            else:
                prc = self.popen.wait()
                # Negative statuses (process ended by a signal) are
                # deliberately not treated as a pdsh failure here.
                if prc > 0:
                    raise WorkerError("Cannot run pdsh (error %d)" % prc)
        finally:
            # Release every pipe, even when reporting a failure above.
            self.popen.stdin.close()
            self.popen.stdout.close()
            # stderr is only set when the process has a separate error
            # pipe.
            if self.popen.stderr is not None:
                self.popen.stderr.close()

        # The set difference is a new NodeSet, so _on_node_rc() may update
        # closed_nodes while we iterate.
        if timeout:
            for node in (self.nodes - self.closed_nodes):
                self._on_node_timeout(node)
        else:
            for node in (self.nodes - self.closed_nodes):
                self._on_node_rc(node, 0)

        self._invoke("ev_close")

    def _parse_line(self, line, stderr):
        """
        Parse Pdsh line syntax.

        Lines starting with "pdsh@", "pdcp@" or "sending " are pdsh/pdcp
        status messages and may set a node return code. Any other line
        must look like "nodename: msg" and is dispatched as a node message
        line, or as a node error line when stderr is true.

        Raises EngineClientError when a status message cannot be
        processed.
        """
        if line.startswith("pdsh@") or \
           line.startswith("pdcp@") or \
           line.startswith("sending "):
            try:
                # pdsh@cors113: cors115: ssh exited with exit code 1
                #       0          1      2     3     4    5    6  7
                # corsUNKN: ssh: corsUNKN: Name or service not known
                #     0      1       2       3  4     5     6    7
                # pdsh@fortoy0: fortoy101: command timeout
                #     0             1         2       3
                # sending SIGTERM to ssh fortoy112 pid 32014
                #     0      1     2  3      4      5    6
                # pdcp@cors113: corsUNKN: ssh exited with exit code 255
                #     0           1        2    3     4    5    6    7
                # pdcp@cors113: cors115: fatal: /var/cache/shine/...
                #     0           1        2         3...

                words = line.split()
                # Set return code for nodename of worker
                if self.mode == 'pdsh':
                    if len(words) == 4 and words[2] == "command" and \
                       words[3] == "timeout":
                        # command timeout: no return code to record
                        pass
                    elif len(words) == 8 and words[3] == "exited" and \
                         words[7].isdigit():
                        # words[1] is "nodename:", strip the colon
                        self._on_node_rc(words[1][:-1], int(words[7]))
                elif self.mode == 'pdcp':
                    # "sending SIGTERM to ..." lines carry no node name in
                    # words[1]; only pdsh@/pdcp@ lines identify a node.
                    if not line.startswith("sending "):
                        self._on_node_rc(words[1][:-1], errno.ENOENT)

            except Exception:
                # The exception is fetched with sys.exc_info() so that
                # this code parses on both Python 2 and Python 3.
                sys.stderr.write("%s\n" % (sys.exc_info()[1],))
                raise EngineClientError()
        else:
            # split pdsh reply "nodename: msg"
            # NOTE(review): a line without ": " raises ValueError here;
            # confirm whether _readlines() can ever deliver such a line.
            nodename, msg = line.split(': ', 1)
            if stderr:
                self._on_node_errline(nodename, msg)
            else:
                self._on_node_msgline(nodename, msg)

    def _handle_read(self):
        """
        Engine is telling us a read is available.

        Every line returned by _readlines() is parsed as standard output,
        and echoed through the task "print_debug" callable when the task
        "debug" info is set.
        """
        debug = self.task.info("debug", False)
        if debug:
            print_debug = self.task.info("print_debug")

        for msg in self._readlines():
            if debug:
                print_debug(self.task, "PDSH: %s" % msg)
            self._parse_line(msg, False)

    def _handle_error(self):
        """
        Engine is telling us an error read is available.

        Every line returned by _readerrlines() is parsed as standard
        error, and echoed through the task "print_debug" callable when the
        task "debug" info is set.
        """
        debug = self.task.info("debug", False)
        if debug:
            print_debug = self.task.info("print_debug")

        for msg in self._readerrlines():
            if debug:
                print_debug(self.task, "PDSH@STDERR: %s" % msg)
            self._parse_line(msg, True)

    def _on_node_rc(self, node, rc):
        """
        Return code received from a node, update last* stuffs.

        The node is also added to closed_nodes so that _close() does not
        report it a second time.
        """
        DistantWorker._on_node_rc(self, node, rc)
        self.closed_nodes.add(node)