Package ClusterShell :: Package Worker :: Module Ssh
[hide private]
[frames | no frames]

Source Code for Module ClusterShell.Worker.Ssh

  1  # 
  2  # Copyright CEA/DAM/DIF (2008, 2009) 
  3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  4  # 
  5  # This file is part of the ClusterShell library. 
  6  # 
  7  # This software is governed by the CeCILL-C license under French law and 
  8  # abiding by the rules of distribution of free software.  You can  use, 
  9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 10  # license as circulated by CEA, CNRS and INRIA at the following URL 
 11  # "http://www.cecill.info". 
 12  # 
 13  # As a counterpart to the access to the source code and  rights to copy, 
 14  # modify and redistribute granted by the license, users are provided only 
 15  # with a limited warranty  and the software's author,  the holder of the 
 16  # economic rights,  and the successive licensors  have only  limited 
 17  # liability. 
 18  # 
 19  # In this respect, the user's attention is drawn to the risks associated 
 20  # with loading,  using,  modifying and/or developing or reproducing the 
 21  # software by the user in light of its specific status of free software, 
 22  # that may mean  that it is complicated to manipulate,  and  that  also 
 23  # therefore means  that it is reserved for developers  and  experienced 
 24  # professionals having in-depth computer knowledge. Users are therefore 
 25  # encouraged to load and test the software's suitability as regards their 
 26  # requirements in conditions enabling the security of their systems and/or 
 27  # data to be ensured and,  more generally, to use and operate it in the 
 28  # same conditions as regards security. 
 29  # 
 30  # The fact that you are presently reading this means that you have had 
 31  # knowledge of the CeCILL-C license and that you accept its terms. 
 32  # 
 33  # $Id: Ssh.py 292 2010-07-15 22:43:46Z st-cea $ 
 34   
 35  """ 
 36  ClusterShell Ssh/Scp support 
 37   
 38  This module implements OpenSSH engine client and task's worker. 
 39  """ 
 40   
 41  import copy 
 42  import os 
 43  import signal 
 44   
 45  from ClusterShell.NodeSet import NodeSet 
 46  from ClusterShell.Worker.EngineClient import EngineClient 
 47  from ClusterShell.Worker.Worker import DistantWorker, WorkerBadArgumentError 
 48   
 49   
class Ssh(EngineClient):
    """
    Ssh EngineClient.

    Drives one ``ssh`` child process for a single node and forwards the
    lines it produces, and its termination status, to the owning worker.
    """

    def __init__(self, node, command, worker, stderr, timeout, autoclose=False):
        """
        Initialize Ssh EngineClient instance.

        node      -- target node; a shallow copy is stored as self.key
        command   -- remote command, later given to ssh as one argument
        worker, stderr, timeout, autoclose
                  -- forwarded unchanged to EngineClient.__init__()
        """
        EngineClient.__init__(self, worker, stderr, timeout, autoclose)

        self.key = copy.copy(node)
        self.command = command
        # Child process handle, set by _start()
        self.popen = None

    def _start(self):
        """
        Start worker, initialize buffers, prepare command.

        The ssh argument list is built from the task info keys
        "ssh_path", "ssh_user", "connect_timeout" and "ssh_options",
        then launched through self._exec_nonblock(). The resulting
        stdin/stdout/stderr objects are stored on the client and the
        worker is notified with _on_start().

        Returns self.
        """
        task = self.worker.task

        # Build ssh command
        cmd_l = [task.info("ssh_path") or "ssh", "-a", "-x"]

        user = task.info("ssh_user")
        if user:
            # Option and value are separate list entries: a single
            # "-l user" entry would hand ssh a login name starting with
            # a space (assuming _exec_nonblock() passes this list as the
            # child's argument vector without a shell -- TODO confirm).
            cmd_l.append("-l")
            cmd_l.append("%s" % user)

        connect_timeout = task.info("connect_timeout", 0)
        if connect_timeout > 0:
            cmd_l.append("-oConnectTimeout=%d" % connect_timeout)

        # Disable passphrase/password querying
        cmd_l.append("-oBatchMode=yes")

        # Add custom ssh options; the string may carry several
        # whitespace-separated options, each needing its own entry.
        ssh_options = task.info("ssh_options")
        if ssh_options:
            cmd_l.extend(ssh_options.split())

        cmd_l.append("%s" % self.key)
        cmd_l.append("%s" % self.command)

        if task.info("debug", False):
            task.info("print_debug")(task, "SSH: %s" % ' '.join(cmd_l))

        self.popen = self._exec_nonblock(cmd_l)
        self.file_error = self.popen.stderr
        self.file_reader = self.popen.stdout
        self.file_writer = self.popen.stdin

        self.worker._on_start()

        return self

    def _close(self, force, timeout):
        """
        Close client. Called by engine after the client has been
        unregistered. This method should handle all termination types
        (normal, forced or on timeout).

        force   -- true when the close is forced
        timeout -- true when the close results from a timeout

        The worker receives _on_node_rc() when a non-negative exit
        status was collected, otherwise _on_node_timeout() if timeout is
        set; _check_fini() is called in every case.
        """
        if not force and self._rbuf:
            # We still have some read data available in buffer, but no
            # EOL. Generate a final message before closing.
            self.worker._on_node_msgline(self.key, self._rbuf)

        # -1 means "no exit status to report"
        rc = -1
        if force or timeout:
            prc = self.popen.poll()
            if prc is None:
                # process is still running, kill it
                os.kill(self.popen.pid, signal.SIGKILL)
        else:
            prc = self.popen.wait()
            # A negative status is not reported as a return code
            if prc >= 0:
                rc = prc

        self.popen.stdin.close()
        self.popen.stdout.close()

        if rc >= 0:
            self.worker._on_node_rc(self.key, rc)
        elif timeout:
            self.worker._on_node_timeout(self.key)

        self.worker._check_fini()

    def _handle_read(self):
        """
        Handle a read notification. Called by the engine as the result of
        an event indicating that a read is available.

        Each line returned by self._readlines() is passed to the worker
        through _on_node_msgline(); when the task "debug" info is set it
        is first echoed with the task "print_debug" callable.
        """
        debug = self.worker.task.info("debug", False)
        if debug:
            print_debug = self.worker.task.info("print_debug")

        for msg in self._readlines():
            if debug:
                print_debug(self.worker.task, "%s: %s" % (self.key, msg))
            # handle full msg line
            self.worker._on_node_msgline(self.key, msg)

    def _handle_error(self):
        """
        Handle a read error (stderr) notification.

        Each line returned by self._readerrlines() is passed to the
        worker through _on_node_errline(); when the task "debug" info is
        set it is first echoed with the task "print_debug" callable.
        """
        debug = self.worker.task.info("debug", False)
        if debug:
            print_debug = self.worker.task.info("print_debug")

        for msg in self._readerrlines():
            if debug:
                print_debug(self.worker.task,
                            "%s@STDERR: %s" % (self.key, msg))
            # handle full msg line
            self.worker._on_node_errline(self.key, msg)
165 166
class Scp(Ssh):
    """
    Scp EngineClient.

    Drives one ``scp`` child process copying a local source path to a
    destination path on a single node.
    """

    def __init__(self, node, source, dest, worker, stderr, timeout, preserve):
        """
        Initialize Scp instance.

        node     -- target node (see Ssh.__init__)
        source   -- local path to copy
        dest     -- destination path on the node
        worker, stderr, timeout
                 -- forwarded unchanged to Ssh.__init__()
        preserve -- when true, "-p" is added to the scp command
        """
        Ssh.__init__(self, node, None, worker, stderr, timeout)
        self.source = source
        self.dest = dest
        self.popen = None

        # Directory check
        self.isdir = os.path.isdir(self.source)
        # Note: file sanity checks can be added to Scp._start() as
        # soon as Task._start_thread is able to dispatch exceptions on
        # _start (need trac ticket #21).

        # Preserve modification times and modes?
        self.preserve = preserve

    def _start(self):
        """
        Start client, initialize buffers, prepare command.

        The scp argument list is built from the task info keys
        "scp_path", "connect_timeout", "ssh_options", "scp_options",
        "scp_user" and "ssh_user", then launched through
        self._exec_nonblock(). The resulting stdin/stdout/stderr objects
        are stored on the client.

        Returns self.
        """
        # NOTE(review): unlike Ssh._start(), this method does not call
        # self.worker._on_start() -- confirm whether that is intended.
        task = self.worker.task

        # Build scp command
        cmd_l = [task.info("scp_path") or "scp"]

        if self.isdir:
            cmd_l.append("-r")

        if self.preserve:
            cmd_l.append("-p")

        connect_timeout = task.info("connect_timeout", 0)
        if connect_timeout > 0:
            cmd_l.append("-oConnectTimeout=%d" % connect_timeout)

        # Disable passphrase/password querying
        cmd_l.append("-oBatchMode=yes")

        # Add custom scp options; each string may carry several
        # whitespace-separated options, each needing its own entry.
        for key in ("ssh_options", "scp_options"):
            options = task.info(key)
            if options:
                cmd_l.extend(options.split())

        cmd_l.append(self.source)

        # The remote login name goes into the destination operand only:
        # scp's "-l" option is a bandwidth limit, not a login name.
        user = task.info("scp_user") or task.info("ssh_user")
        if user:
            cmd_l.append("%s@%s:%s" % (user, self.key, self.dest))
        else:
            cmd_l.append("%s:%s" % (self.key, self.dest))

        if task.info("debug", False):
            task.info("print_debug")(task, "SCP: %s" % ' '.join(cmd_l))

        self.popen = self._exec_nonblock(cmd_l)
        self.file_reader = self.popen.stdout
        self.file_error = self.popen.stderr
        self.file_writer = self.popen.stdin

        return self
239 240
class WorkerSsh(DistantWorker):
    """
    Worker running one ssh (remote command) or scp (remote copy) engine
    client per node.

    Running a remote command:
        >>> worker = WorkerSsh(nodeset, handler=MyEventHandler(),
        ...                    timeout=30, command="/bin/hostname")
        >>> task.schedule(worker)      # schedule worker for execution
        >>> task.resume()              # run

    Copying a file to the nodes:
        >>> worker = WorkerSsh(nodeset, handler=MyEventHandler(),
        ...                    timeout=30, source="/etc/my.conf",
        ...                    dest="/etc/my.conf")
        >>> task.schedule(worker)      # schedule worker for execution
        >>> task.resume()              # run
    """

    def __init__(self, nodes, handler, timeout, **kwargs):
        """
        Set up the worker and create its engine clients.

        nodes   -- target nodes, converted with NodeSet()
        handler -- passed to DistantWorker.__init__()
        timeout -- passed to every engine client

        Recognized keyword arguments: command, source, dest, autoclose
        (default False), stderr (default False) and preserve (default
        False, copy only).

        Raises WorkerBadArgumentError when command is None and source is
        empty or missing.
        """
        DistantWorker.__init__(self, handler)

        # Bookkeeping read by _check_fini()
        self._has_timeout = False
        self._close_count = 0

        self.clients = []
        self.nodes = NodeSet(nodes)
        self.command = kwargs.get('command')
        self.source = kwargs.get('source')
        self.dest = kwargs.get('dest')
        use_stderr = kwargs.get('stderr', False)
        auto_close = kwargs.get('autoclose', False)

        # Prepare underlying engine clients (ssh/scp processes)
        if self.command is not None:
            # remote command: one Ssh client per node
            self.clients.extend(
                Ssh(target, self.command, self, use_stderr, timeout,
                    auto_close)
                for target in self.nodes)
        elif self.source:
            # remote copy: one Scp client per node
            keep_attrs = kwargs.get('preserve', False)
            self.clients.extend(
                Scp(target, self.source, self.dest, self, use_stderr,
                    timeout, keep_attrs)
                for target in self.nodes)
        else:
            raise WorkerBadArgumentError()

    def _engine_clients(self):
        """
        Return the list of underlying engine clients.
        """
        return self.clients

    def _on_node_rc(self, node, rc):
        """
        Forward the node return code to DistantWorker and count the
        client as closed.
        """
        DistantWorker._on_node_rc(self, node, rc)
        self._close_count = self._close_count + 1

    def _on_node_timeout(self, node):
        """
        Forward the node timeout to DistantWorker, remember that a
        timeout happened and count the client as closed.
        """
        DistantWorker._on_node_timeout(self, node)
        self._has_timeout = True
        self._close_count = self._close_count + 1

    def _check_fini(self):
        """
        Invoke the final events once every client has been counted as
        closed: "ev_timeout" first if any node timed out, then
        "ev_close".
        """
        if self._close_count < len(self.clients):
            # some clients have not reported yet
            return

        if self._has_timeout:
            self._invoke("ev_timeout")
        self._invoke("ev_close")

    def write(self, buf):
        """
        Write buf to every worker client.
        """
        for client in self.clients:
            client._write(buf)

    def set_write_eof(self):
        """
        Tell worker to close its writer file descriptor once flushed. Do
        not perform writes after this call.
        """
        for client in self.clients:
            client._set_write_eof()
324