1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 ClusterShell Ssh/Scp support
37
38 This module implements OpenSSH engine client and task's worker.
39 """
40
41 import copy
42 import os
43 import signal
44
45 from ClusterShell.NodeSet import NodeSet
46 from ClusterShell.Worker.EngineClient import EngineClient
47 from ClusterShell.Worker.Worker import DistantWorker, WorkerBadArgumentError
48
49
50 -class Ssh(EngineClient):
51 """
52 Ssh EngineClient.
53 """
54
55 - def __init__(self, node, command, worker, stderr, timeout, autoclose=False):
56 """
57 Initialize Ssh EngineClient instance.
58 """
59 EngineClient.__init__(self, worker, stderr, timeout, autoclose)
60
61 self.key = copy.copy(node)
62 self.command = command
63 self.popen = None
64
66 """
67 Start worker, initialize buffers, prepare command.
68 """
69 task = self.worker.task
70
71
72 cmd_l = [ task.info("ssh_path") or "ssh", "-a", "-x" ]
73
74 user = task.info("ssh_user")
75 if user:
76 cmd_l.append("-l %s" % user)
77
78 connect_timeout = task.info("connect_timeout", 0)
79 if connect_timeout > 0:
80 cmd_l.append("-oConnectTimeout=%d" % connect_timeout)
81
82
83 cmd_l.append("-oBatchMode=yes")
84
85
86 ssh_options = task.info("ssh_options")
87 if ssh_options:
88 cmd_l.append(ssh_options)
89
90 cmd_l.append("%s" % self.key)
91 cmd_l.append("%s" % self.command)
92
93 if task.info("debug", False):
94 task.info("print_debug")(task, "SSH: %s" % ' '.join(cmd_l))
95
96 self.popen = self._exec_nonblock(cmd_l)
97 self.file_error = self.popen.stderr
98 self.file_reader = self.popen.stdout
99 self.file_writer = self.popen.stdin
100
101 self.worker._on_start()
102
103 return self
104
105 - def _close(self, force, timeout):
106 """
107 Close client. Called by engine after the client has been
108 unregistered. This method should handle all termination types
109 (normal, forced or on timeout).
110 """
111 if not force and self._rbuf:
112
113
114 self.worker._on_node_msgline(self.key, self._rbuf)
115
116 rc = -1
117 if force or timeout:
118 prc = self.popen.poll()
119 if prc is None:
120
121 os.kill(self.popen.pid, signal.SIGKILL)
122 else:
123 prc = self.popen.wait()
124 if prc >= 0:
125 rc = prc
126
127 self.popen.stdin.close()
128 self.popen.stdout.close()
129
130 if rc >= 0:
131 self.worker._on_node_rc(self.key, rc)
132 elif timeout:
133 self.worker._on_node_timeout(self.key)
134
135 self.worker._check_fini()
136
138 """
139 Handle a read notification. Called by the engine as the result of an
140 event indicating that a read is available.
141 """
142 debug = self.worker.task.info("debug", False)
143 if debug:
144 print_debug = self.worker.task.info("print_debug")
145
146 for msg in self._readlines():
147 if debug:
148 print_debug(self.worker.task, "%s: %s" % (self.key, msg))
149
150 self.worker._on_node_msgline(self.key, msg)
151
153 """
154 Handle a read error (stderr) notification.
155 """
156 debug = self.worker.task.info("debug", False)
157 if debug:
158 print_debug = self.worker.task.info("print_debug")
159
160 for msg in self._readerrlines():
161 if debug:
162 print_debug(self.worker.task, "%s@STDERR: %s" % (self.key, msg))
163
164 self.worker._on_node_errline(self.key, msg)
165
166
168 """
169 Scp EngineClient.
170 """
171
172 - def __init__(self, node, source, dest, worker, stderr, timeout, preserve):
173 """
174 Initialize Scp instance.
175 """
176 Ssh.__init__(self, node, None, worker, stderr, timeout)
177 self.source = source
178 self.dest = dest
179 self.popen = None
180
181
182 self.isdir = os.path.isdir(self.source)
183
184
185
186
187
188 self.preserve = preserve
189
191 """
192 Start client, initialize buffers, prepare command.
193 """
194 task = self.worker.task
195
196
197 cmd_l = [ task.info("scp_path") or "scp" ]
198
199 if self.isdir:
200 cmd_l.append("-r")
201
202 if self.preserve:
203 cmd_l.append("-p")
204
205 user = task.info("scp_user") or task.info("ssh_user")
206 if user:
207 cmd_l.append("-l %s" % user)
208
209 connect_timeout = task.info("connect_timeout", 0)
210 if connect_timeout > 0:
211 cmd_l.append("-oConnectTimeout=%d" % connect_timeout)
212
213
214 cmd_l.append("-oBatchMode=yes")
215
216
217 for key in [ "ssh_options", "scp_options" ]:
218 ssh_options = task.info(key)
219 if ssh_options:
220 cmd_l.append(ssh_options)
221
222 cmd_l.append(self.source)
223
224 user = task.info("ssh_user")
225 if user:
226 cmd_l.append("%s@%s:%s" % (user, self.key, self.dest))
227 else:
228 cmd_l.append("%s:%s" % (self.key, self.dest))
229
230 if task.info("debug", False):
231 task.info("print_debug")(task, "SCP: %s" % ' '.join(cmd_l))
232
233 self.popen = self._exec_nonblock(cmd_l)
234 self.file_reader = self.popen.stdout
235 self.file_error = self.popen.stderr
236 self.file_writer = self.popen.stdin
237
238 return self
239
240
242 """
243 ClusterShell ssh-based worker Class.
244
245 Remote Shell (ssh) usage example:
246 >>> worker = WorkerSsh(nodeset, handler=MyEventHandler(),
247 ... timeout=30, command="/bin/hostname")
248 >>> task.schedule(worker) # schedule worker for execution
249 >>> task.resume() # run
250
251 Remote Copy (scp) usage example:
252 >>> worker = WorkerSsh(nodeset, handler=MyEventHandler(),
253 ... timeout=30, source="/etc/my.conf",
254 ... dest="/etc/my.conf")
255 >>> task.schedule(worker) # schedule worker for execution
256 >>> task.resume() # run
257 """
258
260 """
261 Initialize Ssh worker instance.
262 """
263 DistantWorker.__init__(self, handler)
264
265 self.clients = []
266 self.nodes = NodeSet(nodes)
267 self.command = kwargs.get('command')
268 self.source = kwargs.get('source')
269 self.dest = kwargs.get('dest')
270 autoclose = kwargs.get('autoclose', False)
271 stderr = kwargs.get('stderr', False)
272 self._close_count = 0
273 self._has_timeout = False
274
275
276 if self.command is not None:
277
278 for node in self.nodes:
279 self.clients.append(Ssh(node, self.command, self, stderr,
280 timeout, autoclose))
281 elif self.source:
282
283 for node in self.nodes:
284 self.clients.append(Scp(node, self.source, self.dest,
285 self, stderr, timeout, kwargs.get('preserve', False)))
286 else:
287 raise WorkerBadArgumentError()
288
290 """
291 Access underlying engine clients.
292 """
293 return self.clients
294
298
303
305 if self._close_count >= len(self.clients):
306 if self._has_timeout:
307 self._invoke("ev_timeout")
308 self._invoke("ev_close")
309
311 """
312 Write to worker clients.
313 """
314 for c in self.clients:
315 c._write(buf)
316
318 """
319 Tell worker to close its writer file descriptor once flushed. Do not
320 perform writes after this call.
321 """
322 for c in self.clients:
323 c._set_write_eof()
324