Package ClusterShell :: Package Worker :: Module Pdsh
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Worker.Pdsh

  1  # 
  2  # Copyright CEA/DAM/DIF (2007, 2008, 2009) 
  3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  4  # 
  5  # This file is part of the ClusterShell library. 
  6  # 
  7  # This software is governed by the CeCILL-C license under French law and 
  8  # abiding by the rules of distribution of free software.  You can  use, 
  9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 10  # license as circulated by CEA, CNRS and INRIA at the following URL 
 11  # "http://www.cecill.info". 
 12  # 
 13  # As a counterpart to the access to the source code and  rights to copy, 
 14  # modify and redistribute granted by the license, users are provided only 
 15  # with a limited warranty  and the software's author,  the holder of the 
 16  # economic rights,  and the successive licensors  have only  limited 
 17  # liability. 
 18  # 
 19  # In this respect, the user's attention is drawn to the risks associated 
 20  # with loading,  using,  modifying and/or developing or reproducing the 
 21  # software by the user in light of its specific status of free software, 
 22  # that may mean  that it is complicated to manipulate,  and  that  also 
 23  # therefore means  that it is reserved for developers  and  experienced 
 24  # professionals having in-depth computer knowledge. Users are therefore 
 25  # encouraged to load and test the software's suitability as regards their 
 26  # requirements in conditions enabling the security of their systems and/or 
 27  # data to be ensured and,  more generally, to use and operate it in the 
 28  # same conditions as regards security. 
 29  # 
 30  # The fact that you are presently reading this means that you have had 
 31  # knowledge of the CeCILL-C license and that you accept its terms. 
 32  # 
 33  # $Id: Pdsh.py 126 2009-07-13 21:35:07Z st-cea $ 
 34   
 35  """ 
 36  WorkerPdsh 
 37   
 38  ClusterShell worker for executing commands with LLNL pdsh. 
 39  """ 
 40   
 41  from ClusterShell.NodeSet import NodeSet 
 42   
 43  from EngineClient import * 
 44  from Worker import DistantWorker, WorkerError 
 45   
 46  import errno 
 47  import fcntl 
 48  import os 
 49  import popen2 
 50  import signal 
 51   
 52   
class WorkerPdsh(EngineClient, DistantWorker):
    """
    ClusterShell pdsh-based worker Class.

    Remote Shell (pdsh) usage example:
        worker = WorkerPdsh(nodeset, handler=MyEventHandler(),
                            timeout=30, command="/bin/hostname")
    Remote Copy (pdcp) usage example:
        worker = WorkerPdsh(nodeset, handler=MyEventHandler(),
                            timeout=30, source="/etc/my.conf",
                            dest="/etc/my.conf")
        ...
        task.schedule(worker)   # schedule worker for execution
        ...
        task.resume()           # run

    Known Limitations:
        * write() is not supported by WorkerPdsh
        * return codes == 0 are not guaranteed when a timeout is used (rc > 0
          are fine)
    """

    def __init__(self, nodes, handler, timeout, **kwargs):
        """
        Initialize Pdsh worker instance.

        The worker mode is selected from the keyword arguments:
          * command=...          -> 'pdsh' mode (remote command execution)
          * source=..., dest=... -> 'pdcp' mode (remote copy)
        'command' takes precedence when both are given.  An optional
        'autoclose' keyword (default False) is forwarded to
        EngineClient.__init__().

        Raises WorkerBadArgumentException when neither 'command' nor
        'source' is given, and KeyError when 'source' is given without
        'dest'.
        """
        DistantWorker.__init__(self, handler)
        EngineClient.__init__(self, self, timeout,
                              kwargs.get('autoclose', False))

        self.nodes = NodeSet(nodes)
        self.closed_nodes = NodeSet()

        if 'command' in kwargs:
            # PDSH
            self.command = kwargs['command']
            self.source = None
            self.dest = None
            self.mode = 'pdsh'
        elif 'source' in kwargs:
            # PDCP
            self.command = None
            self.source = kwargs['source']
            self.dest = kwargs['dest']
            self.mode = 'pdcp'
        else:
            # NOTE(review): WorkerBadArgumentException is not imported
            # explicitly by this module; presumably it is provided by
            # "from EngineClient import *" -- confirm.
            raise WorkerBadArgumentException()

        self.fid = None
        # 'buf' is not used by this class; it is kept only for backward
        # compatibility. The read buffer actually used is '_buf'.
        self.buf = ""
        self._buf = ""

    def _engine_clients(self):
        """
        Return the list of engine clients of this worker (itself only).
        """
        return [self]

    def _start(self):
        """
        Start worker, initialize buffers, prepare command.

        Builds the pdsh (or pdcp) shell command line from the task info
        keys "pdsh_path"/"pdcp_path", "fanout", "connect_timeout" and
        "command_timeout", executes it through _exec_nonblock() and
        returns self.
        """
        # Initialize worker read buffer
        self._buf = ""

        # NOTE(review): command, source, dest and the node set are only
        # wrapped in single quotes below; a value that itself contains a
        # single quote is not escaped.

        if self.command is not None:
            # Build pdsh command
            cmd_l = [self.task.info("pdsh_path") or "pdsh", "-b"]

            fanout = self.task.info("fanout", 0)
            if fanout > 0:
                cmd_l.append("-f %d" % fanout)

            # Pdsh flag '-t' does not really work well. Better to use
            # PDSH_SSH_ARGS_APPEND variable to transmit ssh ConnectTimeout
            # flag.
            connect_timeout = self.task.info("connect_timeout", 0)
            if connect_timeout > 0:
                cmd_l.insert(0,
                             "PDSH_SSH_ARGS_APPEND=\"-o ConnectTimeout=%d\"" %
                             connect_timeout)

            command_timeout = self.task.info("command_timeout", 0)
            if command_timeout > 0:
                cmd_l.append("-u %d" % command_timeout)

            cmd_l.append("-w '%s'" % self.nodes)
            cmd_l.append("'%s'" % self.command)

            cmd = ' '.join(cmd_l)

            if self.task.info("debug", False):
                self.task.info("print_debug")(self.task, "PDSH: %s" % cmd)
        else:
            # Build pdcp command
            cmd_l = [self.task.info("pdcp_path") or "pdcp", "-b"]

            fanout = self.task.info("fanout", 0)
            if fanout > 0:
                cmd_l.append("-f %d" % fanout)

            connect_timeout = self.task.info("connect_timeout", 0)
            if connect_timeout > 0:
                cmd_l.append("-t %d" % connect_timeout)

            cmd_l.append("-w '%s'" % self.nodes)

            cmd_l.append("'%s'" % self.source)
            cmd_l.append("'%s'" % self.dest)
            cmd = ' '.join(cmd_l)

            if self.task.info("debug", False):
                self.task.info("print_debug")(self.task, "PDCP: %s" % cmd)

        self.fid = self._exec_nonblock(cmd)

        self._on_start()

        return self

    def reader_fileno(self):
        """
        Return the reader file descriptor as an integer.
        """
        return self.fid.fromchild.fileno()

    def writer_fileno(self):
        """
        Return the writer file descriptor as an integer.
        """
        return self.fid.tochild.fileno()

    def _read(self, size=-1):
        """
        Read data from process.

        Returns the string read from the child's output; the worker is
        flagged as reading (_set_reading()) only when some data was
        actually returned.
        """
        result = self.fid.fromchild.read(size)
        # Test the length of the data read: comparing the string itself
        # to 0 is always true in Python 2, even for an empty (EOF) read.
        if len(result) > 0:
            self._set_reading()
        return result

    def write(self, buf):
        """
        Write data to process. Not supported with Pdsh worker.

        Always raises EngineClientNotSupportedError.
        """
        raise EngineClientNotSupportedError(
            "writing is not supported by pdsh worker")

    def _close(self, force, timeout):
        """
        Close worker. Called by engine after worker has been
        unregistered. This method should handle all termination types
        (normal, forced or on timeout).

        On forced close or timeout, a still running process is killed.
        Otherwise the process is waited for and WorkerError is raised
        when it exited with a non-zero return code.  Nodes that have not
        reported a return code yet get a timeout event (on timeout) or a
        return code of 0 (otherwise); "ev_close" is invoked last.
        """
        if force or timeout:
            status = self.fid.poll()
            if status == -1:
                # process is still running, kill it
                os.kill(self.fid.pid, signal.SIGKILL)
            if timeout:
                self._invoke("ev_timeout")
        else:
            status = self.fid.wait()
            if os.WIFEXITED(status):
                rc = os.WEXITSTATUS(status)
                if rc != 0:
                    raise WorkerError("Cannot run pdsh (error %d)" % rc)

        # close
        self.fid.tochild.close()
        self.fid.fromchild.close()

        if timeout:
            for node in (self.nodes - self.closed_nodes):
                self._on_node_timeout(node)
        else:
            for node in (self.nodes - self.closed_nodes):
                self._on_node_rc(node, 0)

        self._invoke("ev_close")

    def _handle_read(self):
        """
        Engine is telling us a read is available.

        Reads a chunk, splits it into lines and dispatches each complete
        line: pdsh/pdcp status lines update node return codes, any other
        line is forwarded as a "nodename: msg" message line.  A trailing
        incomplete line is kept in the read buffer for the next call.

        Raises EngineClientError when a status line cannot be processed.
        """
        debug = self.task.info("debug", False)
        if debug:
            print_debug = self.task.info("print_debug")

        # read a chunk
        readbuf = self._read()
        assert len(readbuf) > 0, "_handle_read() called with no data to read"

        buf = self._buf + readbuf
        lines = buf.splitlines(True)
        self._buf = ""
        for line in lines:
            if debug:
                print_debug(self.task, "LINE: %s" % line[:-1])
            if line.endswith('\n'):
                if line.startswith("pdsh@") or line.startswith("pdcp@") or \
                        line.startswith("sending "):
                    try:
                        # pdsh@cors113: cors115: ssh exited with exit code 1
                        #       0          1      2     3     4    5    6  7
                        # corsUNKN: ssh: corsUNKN: Name or service not known
                        #     0      1       2       3  4     5     6    7
                        # pdsh@fortoy0: fortoy101: command timeout
                        #       0           1         2       3
                        # sending SIGTERM to ssh fortoy112 pid 32014
                        #    0       1    2   3      4      5    6
                        # pdcp@cors113: corsUNKN: ssh exited with exit code 255
                        #       0           1      2     3     4    5    6   7
                        # pdcp@cors113: cors115: fatal: /var/cache/shine/conf/testfs.xmf: No such file or directory
                        #       0          1       2      3...

                        words = line.split()
                        # Set return code for nodename of worker
                        if self.mode == 'pdsh':
                            if len(words) == 4 and words[2] == "command" and \
                                    words[3] == "timeout":
                                pass
                            elif len(words) == 8 and \
                                    words[3] == "exited" and \
                                    words[7].isdigit():
                                # words[1] is "nodename:", strip the colon
                                self._on_node_rc(words[1][:-1], int(words[7]))
                        elif self.mode == 'pdcp':
                            self._on_node_rc(words[1][:-1], errno.ENOENT)

                    except Exception:
                        # 'sys' is not imported at module level: import it
                        # here so that reporting the error cannot itself
                        # fail with a NameError.
                        import sys
                        sys.stderr.write("%s\n" % (sys.exc_info()[1],))
                        raise EngineClientError()
                else:
                    #
                    # split pdsh reply "nodename: msg"
                    # NOTE(review): a complete line without ": " makes this
                    # unpacking raise ValueError -- confirm pdsh never
                    # emits such lines.
                    nodename, msg = line.split(': ', 1)
                    if msg.endswith('\n'):
                        if msg.endswith('\r\n'):
                            msgline = msg[:-2]  # trim CRLF
                        else:
                            msgline = msg[:-1]  # trim LF
                        self._on_node_msgline(nodename, msgline)
            else:
                # keep partial line in buffer
                self._buf = line

    def _on_node_rc(self, node, rc):
        """
        Return code received from a node, update last* stuffs.

        Also records the node in closed_nodes so that _close() does not
        report it a second time.
        """
        DistantWorker._on_node_rc(self, node, rc)
        self.closed_nodes.add(node)
296