Package ClusterShell :: Package Worker :: Module Ssh
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Worker.Ssh

  1  # 
  2  # Copyright CEA/DAM/DIF (2008, 2009) 
  3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  4  # 
  5  # This file is part of the ClusterShell library. 
  6  # 
  7  # This software is governed by the CeCILL-C license under French law and 
  8  # abiding by the rules of distribution of free software.  You can  use, 
  9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 10  # license as circulated by CEA, CNRS and INRIA at the following URL 
 11  # "http://www.cecill.info". 
 12  # 
 13  # As a counterpart to the access to the source code and  rights to copy, 
 14  # modify and redistribute granted by the license, users are provided only 
 15  # with a limited warranty  and the software's author,  the holder of the 
 16  # economic rights,  and the successive licensors  have only  limited 
 17  # liability. 
 18  # 
 19  # In this respect, the user's attention is drawn to the risks associated 
 20  # with loading,  using,  modifying and/or developing or reproducing the 
 21  # software by the user in light of its specific status of free software, 
 22  # that may mean  that it is complicated to manipulate,  and  that  also 
 23  # therefore means  that it is reserved for developers  and  experienced 
 24  # professionals having in-depth computer knowledge. Users are therefore 
 25  # encouraged to load and test the software's suitability as regards their 
 26  # requirements in conditions enabling the security of their systems and/or 
 27  # data to be ensured and,  more generally, to use and operate it in the 
 28  # same conditions as regards security. 
 29  # 
 30  # The fact that you are presently reading this means that you have had 
 31  # knowledge of the CeCILL-C license and that you accept its terms. 
 32  # 
 33  # $Id: Ssh.py 130 2009-07-22 15:11:11Z st-cea $ 
 34   
 35  """ 
 36  ClusterShell Ssh/Scp support 
 37   
 38  This module implements OpenSSH engine client and task's worker. 
 39  """ 
 40   
 41  from EngineClient import EngineClient, EngineClientEOF 
 42  from Worker import DistantWorker 
 43   
 44  from ClusterShell.NodeSet import NodeSet 
 45   
 46  import copy 
 47  import errno 
 48  import os 
 49  import signal 
 50   
 51   
class Ssh(EngineClient):
    """
    Ssh EngineClient.

    Drives one ssh process for a single node: builds the command line,
    spawns it in non-blocking mode and relays output lines and the final
    status to the owning worker.
    """

    def __init__(self, node, command, worker, timeout, autoclose=False):
        """
        Initialize Ssh EngineClient instance.

        node is the ssh target (a copy is kept as self.key) and command is
        the remote command line. worker, timeout and autoclose are handed
        unchanged to EngineClient.__init__().
        """
        EngineClient.__init__(self, worker, timeout, autoclose)

        self.key = copy.copy(node)
        self.command = command
        # Process handle and its pipes; filled in by _start().
        self.fid = None
        self.file_reader = None
        self.file_writer = None

    def _start(self):
        """
        Start worker, initialize buffers, prepare command.

        The ssh argument list is built from the task info keys "ssh_path",
        "ssh_user", "connect_timeout" and "ssh_options", then spawned with
        _exec_nonblock(). The worker is notified through _on_start().
        Returns self.
        """
        task = self.worker.task

        # Build ssh command
        cmd_l = [task.info("ssh_path") or "ssh", "-a", "-x"]

        user = task.info("ssh_user")
        if user:
            # cmd_l is handed over as an argument list, so the option and
            # its value must be two separate items. A single "-l user" item
            # reaches ssh as one argv entry (login name with a leading
            # space) when the list is executed without a shell.
            cmd_l.append("-l")
            cmd_l.append(user)

        connect_timeout = task.info("connect_timeout", 0)
        if connect_timeout > 0:
            cmd_l.append("-oConnectTimeout=%d" % connect_timeout)

        # Disable passphrase/password querying
        cmd_l.append("-oBatchMode=yes")

        # Add custom ssh options (appended as one single item)
        ssh_options = task.info("ssh_options")
        if ssh_options:
            cmd_l.append(ssh_options)

        cmd_l.append("%s" % self.key)
        cmd_l.append("%s" % self.command)

        if task.info("debug", False):
            task.info("print_debug")(task, "SSH: %s" % ' '.join(cmd_l))

        self.fid = self._exec_nonblock(cmd_l)
        self.file_reader = self.fid.fromchild
        self.file_writer = self.fid.tochild

        self.worker._on_start()

        return self

    def reader_fileno(self):
        """
        Return the reader file descriptor as an integer, or None when no
        reader is set.
        """
        if self.file_reader:
            return self.file_reader.fileno()
        return None

    def writer_fileno(self):
        """
        Return the writer file descriptor as an integer, or None when no
        writer is set.
        """
        if self.file_writer:
            return self.file_writer.fileno()
        return None

    def _read(self, size=-1):
        """
        Read data from process.

        Raise EngineClientEOF when the read returns no data; otherwise call
        _set_reading() and return what was read.
        """
        result = self.file_reader.read(size)
        if not result:
            raise EngineClientEOF()
        self._set_reading()
        return result

    def _close(self, force, timeout):
        """
        Close client. Called by engine after the client has been
        unregistered. This method should handle all termination types
        (normal, forced or on timeout).
        """
        # rc stays negative unless a regular exit status was collected.
        rc = -1
        if force or timeout:
            status = self.fid.poll()
            if status == -1:
                # process is still running, kill it
                os.kill(self.fid.pid, signal.SIGKILL)
        else:
            status = self.fid.wait()
            if os.WIFEXITED(status):
                rc = os.WEXITSTATUS(status)

        self.fid.tochild.close()
        self.fid.fromchild.close()

        # Report either the return code or the timeout, never both.
        if rc >= 0:
            self.worker._on_node_rc(self.key, rc)
        elif timeout:
            self.worker._on_node_timeout(self.key)

        self.worker._check_fini()

    def _handle_read(self):
        """
        Handle a read notification. Called by the engine as the result of an
        event indicating that a read is available.
        """
        debug = self.worker.task.info("debug", False)
        if debug:
            # Only looked up (and only used below) when debugging is on.
            print_debug = self.worker.task.info("print_debug")

        for msg in self._readlines():
            if debug:
                print_debug(self.worker.task, "%s: %s" % (self.key, msg))
            # handle full msg line
            self.worker._on_node_msgline(self.key, msg)
175 176
class Scp(Ssh):
    """
    Scp EngineClient.

    Copies a local source to a destination path on a single node by
    running one scp process.
    """

    def __init__(self, node, source, dest, worker, timeout):
        """
        Initialize Scp instance.

        node is the copy target, source the local path and dest the path
        on the node. There is no remote command, so None is passed as the
        command to Ssh.__init__(), which also resets the process handles.
        """
        Ssh.__init__(self, node, None, worker, timeout)
        self.source = source
        self.dest = dest

    def _start(self):
        """
        Start worker, initialize buffers, prepare command.

        The scp command line is built from the task info keys "scp_path",
        "scp_user" (falling back to "ssh_user"), "connect_timeout",
        "ssh_options" and "scp_options", then spawned with
        _exec_nonblock(). Returns self.
        """
        task = self.worker.task

        # Build scp command
        cmd_l = [task.info("scp_path") or "scp"]

        connect_timeout = task.info("connect_timeout", 0)
        if connect_timeout > 0:
            cmd_l.append("-oConnectTimeout=%d" % connect_timeout)

        # Disable passphrase/password querying
        cmd_l.append("-oBatchMode=yes")

        # Add custom scp options
        for key in ["ssh_options", "scp_options"]:
            options = task.info(key)
            if options:
                cmd_l.append(options)

        # The pieces are joined into one command string below, so both
        # paths are wrapped in single quotes.
        cmd_l.append("'%s'" % self.source)

        # scp has no login-name option (its -l is the bandwidth limit), so
        # the remote user can only be given as part of the target.
        user = task.info("scp_user") or task.info("ssh_user")
        if user:
            cmd_l.append("'%s@%s:%s'" % (user, self.key, self.dest))
        else:
            cmd_l.append("'%s:%s'" % (self.key, self.dest))
        cmd = ' '.join(cmd_l)

        if task.info("debug", False):
            task.info("print_debug")(task, "SCP: %s" % cmd)

        self.fid = self._exec_nonblock(cmd)
        self.file_reader = self.fid.fromchild
        self.file_writer = self.fid.tochild

        # NOTE(review): unlike Ssh._start(), the worker's _on_start() is not
        # called here -- confirm whether that is intended.
        return self
236 237
class WorkerSsh(DistantWorker):
    """
    ClusterShell ssh-based worker Class.

    Remote Shell (ssh) usage example:
        worker = WorkerSsh(nodeset, handler=MyEventHandler(),
                        timeout=30, command="/bin/hostname")
    Remote Copy (scp) usage example:
        worker = WorkerSsh(nodeset, handler=MyEventHandler(),
                        timeout=30, source="/etc/my.conf",
                        dest="/etc/my.conf")
    ...
    task.schedule(worker)   # schedule worker for execution
    ...
    task.resume()           # run
    """

    def __init__(self, nodes, handler, timeout, **kwargs):
        """
        Initialize Ssh worker instance.

        One engine client is created per node of nodes: an Ssh client when
        the 'command' keyword is given, otherwise an Scp client when
        'source' (together with 'dest') is given. The optional 'autoclose'
        keyword (default False) is forwarded to Ssh clients only.

        Raises WorkerBadArgumentException when neither 'command' nor
        'source' is provided.
        """
        DistantWorker.__init__(self, handler)

        self.clients = []
        self.nodes = NodeSet(nodes)
        # Number of clients that reported a return code or a timeout.
        self._close_count = 0
        self._has_timeout = False

        autoclose = kwargs.get('autoclose', False)

        # Prepare underlying engine clients (ssh/scp processes)
        if 'command' in kwargs:
            # secure remote shell
            for node in self.nodes:
                self.clients.append(Ssh(node, kwargs['command'], self,
                                        timeout, autoclose))
        elif 'source' in kwargs:
            # secure copy
            for node in self.nodes:
                self.clients.append(Scp(node, kwargs['source'],
                                        kwargs['dest'], self, timeout))
        else:
            # The exception class is not imported at module level; import
            # it here so this path raises it instead of a NameError.
            from Worker import WorkerBadArgumentException
            raise WorkerBadArgumentException()

    def _engine_clients(self):
        """
        Access underlying engine clients.
        """
        return self.clients

    def _on_node_rc(self, node, rc):
        """
        Record a node return code and count the client as closed.
        """
        DistantWorker._on_node_rc(self, node, rc)
        self._close_count += 1

    def _on_node_timeout(self, node):
        """
        Record a node timeout and count the client as closed.
        """
        DistantWorker._on_node_timeout(self, node)
        self._close_count += 1
        self._has_timeout = True

    def _check_fini(self):
        """
        Invoke "ev_timeout" (if any client timed out) and then "ev_close"
        once every client has been counted as closed.
        """
        if self._close_count >= len(self.clients):
            if self._has_timeout:
                self._invoke("ev_timeout")
            self._invoke("ev_close")

    def write(self, buf):
        """
        Write to worker clients.
        """
        for client in self.clients:
            client._write(buf)

    def set_write_eof(self):
        """
        Tell worker to close its writer file descriptor once flushed. Do not
        perform writes after this call.
        """
        for client in self.clients:
            client._set_write_eof()
317