1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 ClusterShell Ssh/Scp support
37
38 This module implements OpenSSH engine client and task's worker.
39 """
40
41 from EngineClient import EngineClient, EngineClientEOF
42 from Worker import DistantWorker
43
44 from ClusterShell.NodeSet import NodeSet
45
46 import copy
47 import errno
48 import os
49 import signal
50
51
52 -class Ssh(EngineClient):
53 """
54 Ssh EngineClient.
55 """
56
57 - def __init__(self, node, command, worker, timeout, autoclose=False):
58 """
59 Initialize Ssh EngineClient instance.
60 """
61 EngineClient.__init__(self, worker, timeout, autoclose)
62
63 self.key = copy.copy(node)
64 self.command = command
65 self.fid = None
66 self.file_reader = None
67 self.file_writer = None
68
70 """
71 Start worker, initialize buffers, prepare command.
72 """
73 task = self.worker.task
74
75
76 cmd_l = [ task.info("ssh_path") or "ssh", "-a", "-x" ]
77
78 user = task.info("ssh_user")
79 if user:
80 cmd_l.append("-l %s" % user)
81
82 connect_timeout = task.info("connect_timeout", 0)
83 if connect_timeout > 0:
84 cmd_l.append("-oConnectTimeout=%d" % connect_timeout)
85
86
87 cmd_l.append("-oBatchMode=yes")
88
89
90 ssh_options = task.info("ssh_options")
91 if ssh_options:
92 cmd_l.append(ssh_options)
93
94 cmd_l.append("%s" % self.key)
95 cmd_l.append("%s" % self.command)
96
97 if task.info("debug", False):
98 task.info("print_debug")(task, "SSH: %s" % ' '.join(cmd_l))
99
100 self.fid = self._exec_nonblock(cmd_l)
101 self.file_reader = self.fid.fromchild
102 self.file_writer = self.fid.tochild
103
104 self.worker._on_start()
105
106 return self
107
109 """
110 Return the reader file descriptor as an integer.
111 """
112 if self.file_reader:
113 return self.file_reader.fileno()
114 return None
115
117 """
118 Return the writer file descriptor as an integer.
119 """
120 if self.file_writer:
121 return self.file_writer.fileno()
122 return None
123
124 - def _read(self, size=-1):
125 """
126 Read data from process.
127 """
128 result = self.file_reader.read(size)
129 if not len(result):
130 raise EngineClientEOF()
131 self._set_reading()
132 return result
133
134 - def _close(self, force, timeout):
135 """
136 Close client. Called by engine after the client has been
137 unregistered. This method should handle all termination types
138 (normal, forced or on timeout).
139 """
140 rc = -1
141 if force or timeout:
142 status = self.fid.poll()
143 if status == -1:
144
145 os.kill(self.fid.pid, signal.SIGKILL)
146 else:
147 status = self.fid.wait()
148 if os.WIFEXITED(status):
149 rc = os.WEXITSTATUS(status)
150
151 self.fid.tochild.close()
152 self.fid.fromchild.close()
153
154 if rc >= 0:
155 self.worker._on_node_rc(self.key, rc)
156 elif timeout:
157 self.worker._on_node_timeout(self.key)
158
159 self.worker._check_fini()
160
162 """
163 Handle a read notification. Called by the engine as the result of an
164 event indicating that a read is available.
165 """
166 debug = self.worker.task.info("debug", False)
167 if debug:
168 print_debug = self.worker.task.info("print_debug")
169
170 for msg in self._readlines():
171 if debug:
172 print_debug(self.worker.task, "%s: %s" % (self.key, msg))
173
174 self.worker._on_node_msgline(self.key, msg)
175
176
178 """
179 Scp EngineClient.
180 """
181
182 - def __init__(self, node, source, dest, worker, timeout):
183 """
184 Initialize Scp instance.
185 """
186 Ssh.__init__(self, node, None, worker, timeout)
187 self.source = source
188 self.dest = dest
189 self.fid = None
190 self.file_reader = None
191 self.file_writer = None
192
194 """
195 Start worker, initialize buffers, prepare command.
196 """
197 task = self.worker.task
198
199
200 cmd_l = [ task.info("scp_path") or "scp" ]
201
202 user = task.info("scp_user") or task.info("ssh_user")
203 if user:
204 cmd_l.append("-l %s" % user)
205
206 connect_timeout = task.info("connect_timeout", 0)
207 if connect_timeout > 0:
208 cmd_l.append("-oConnectTimeout=%d" % connect_timeout)
209
210
211 cmd_l.append("-oBatchMode=yes")
212
213
214 for key in [ "ssh_options", "scp_options" ]:
215 ssh_options = task.info(key)
216 if ssh_options:
217 cmd_l.append(ssh_options)
218
219 cmd_l.append("'%s'" % self.source)
220
221 user = task.info("ssh_user")
222 if user:
223 cmd_l.append("%s@%s:%s" % (user, self.key, self.dest))
224 else:
225 cmd_l.append("'%s:%s'" % (self.key, self.dest))
226 cmd = ' '.join(cmd_l)
227
228 if task.info("debug", False):
229 task.info("print_debug")(task, "SCP: %s" % cmd)
230
231 self.fid = self._exec_nonblock(cmd)
232 self.file_reader = self.fid.fromchild
233 self.file_writer = self.fid.tochild
234
235 return self
236
237
239 """
240 ClusterShell ssh-based worker Class.
241
242 Remote Shell (ssh) usage example:
243 worker = WorkerSsh(nodeset, handler=MyEventHandler(),
244 timeout=30, command="/bin/hostname")
245 Remote Copy (scp) usage example:
246 worker = WorkerSsh(nodeset, handler=MyEventHandler(),
247 timeout=30, source="/etc/my.conf",
248 dest="/etc/my.conf")
249 ...
250 task.schedule(worker) # schedule worker for execution
251 ...
252 task.resume() # run
253 """
254
256 """
257 Initialize Ssh worker instance.
258 """
259 DistantWorker.__init__(self, handler)
260
261 self.clients = []
262 self.nodes = NodeSet(nodes)
263 self._close_count = 0
264 self._has_timeout = False
265
266 autoclose = kwargs.get('autoclose', False)
267
268
269 if kwargs.has_key('command'):
270
271 for node in self.nodes:
272 self.clients.append(Ssh(node, kwargs['command'], self,
273 timeout,autoclose))
274 elif kwargs.has_key('source'):
275
276 for node in self.nodes:
277 self.clients.append(Scp(node, kwargs['source'], kwargs['dest'],
278 self, timeout))
279 else:
280 raise WorkerBadArgumentException()
281
283 """
284 Access underlying engine clients.
285 """
286 return self.clients
287
291
296
298 if self._close_count >= len(self.clients):
299 if self._has_timeout:
300 self._invoke("ev_timeout")
301 self._invoke("ev_close")
302
304 """
305 Write to worker clients.
306 """
307 for c in self.clients:
308 c._write(buf)
309
311 """
312 Tell worker to close its writer file descriptor once flushed. Do not
313 perform writes after this call.
314 """
315 for c in self.clients:
316 c._set_write_eof()
317