1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 WorkerPdsh
37
38 ClusterShell worker for executing commands with LLNL pdsh.
39 """
40
41 import errno
42 import os
43 import signal
44 import sys
45
46 from ClusterShell.NodeSet import NodeSet
47 from ClusterShell.Worker.EngineClient import EngineClient
48 from ClusterShell.Worker.EngineClient import EngineClientError
49 from ClusterShell.Worker.EngineClient import EngineClientNotSupportedError
50 from ClusterShell.Worker.Worker import DistantWorker
51 from ClusterShell.Worker.Worker import WorkerError, WorkerBadArgumentError
52
53
class WorkerPdsh(EngineClient, DistantWorker):
    """
    ClusterShell pdsh-based worker Class.

    Remote Shell (pdsh) usage example:
        >>> worker = WorkerPdsh(nodeset, handler=MyEventHandler(),
        ...                     timeout=30, command="/bin/hostname")
        >>> task.schedule(worker)      # schedule worker for execution
        >>> task.resume()              # run

    Remote Copy (pdcp) usage example:
        >>> worker = WorkerPdsh(nodeset, handler=MyEventHandler(),
        ...                     timeout=30, source="/etc/my.conf",
        ...                     dest="/etc/my.conf")
        >>> task.schedule(worker)      # schedule worker for execution
        >>> task.resume()              # run

    Known limitations:
        - write() is not supported by WorkerPdsh
        - return codes == 0 are not guaranteed when a timeout is used
          (rc > 0 are fine)
    """

    def __init__(self, nodes, handler, timeout, **kwargs):
        """
        Initialize Pdsh worker instance.

        :param nodes: target nodes (converted with NodeSet)
        :param handler: event handler, passed to DistantWorker
        :param timeout: worker timeout, passed to EngineClient
        :param kwargs: either ``command`` (pdsh mode) or ``source`` and
            ``dest`` (pdcp mode); optional ``preserve`` (pdcp only),
            ``autoclose`` and ``stderr`` (both default to False)
        :raises WorkerBadArgumentError: if neither command nor source is
            given, or if source is given without dest
        """
        DistantWorker.__init__(self, handler)

        self.nodes = NodeSet(nodes)
        # nodes for which a return code has already been reported
        self.closed_nodes = NodeSet()

        self.command = kwargs.get('command')
        self.source = kwargs.get('source')
        self.dest = kwargs.get('dest')

        autoclose = kwargs.get('autoclose', False)
        stderr = kwargs.get('stderr', False)

        EngineClient.__init__(self, self, stderr, timeout, autoclose)

        if self.command is not None:
            # remote command execution: copy arguments are irrelevant
            self.source = None
            self.dest = None
            self.mode = 'pdsh'
        elif self.source:
            # remote file copy; pdcp needs a destination, fail early here
            # rather than later with None in the argument vector
            if not self.dest:
                raise WorkerBadArgumentError("missing dest in "
                                             "WorkerPdsh constructor")
            self.command = None
            self.mode = 'pdcp'
            # directories are copied recursively (pdcp -r)
            self.isdir = os.path.isdir(self.source)
            # preserve modification times and modes (pdcp -p)
            self.preserve = kwargs.get('preserve', False)
        else:
            raise WorkerBadArgumentError("missing command or source in "
                                         "WorkerPdsh constructor")
        self.popen = None
        self._buf = ""

    def _start(self):
        """
        Start worker, initialize buffers, prepare command.

        Builds the pdsh (or pdcp) argument vector from the task info
        settings, spawns it and wires its pipes to this engine client.
        Each option and its value are passed as separate arguments.

        :returns: self
        """
        self._buf = ""

        # extra environment variables for the spawned process
        pdsh_env = {}

        if self.command is not None:
            # PDSH
            executable = self.task.info("pdsh_path") or "pdsh"
            cmd_l = [executable, "-b"]

            fanout = self.task.info("fanout", 0)
            if fanout > 0:
                cmd_l.extend(["-f", "%d" % fanout])

            # the connect timeout is forwarded to ssh through the
            # environment (presumably only effective with ssh rcmd)
            connect_timeout = self.task.info("connect_timeout", 0)
            if connect_timeout > 0:
                pdsh_env['PDSH_SSH_ARGS_APPEND'] = ("-o ConnectTimeout=%d"
                                                    % connect_timeout)

            command_timeout = self.task.info("command_timeout", 0)
            if command_timeout > 0:
                cmd_l.extend(["-u", "%d" % command_timeout])

            cmd_l.extend(["-w", "%s" % self.nodes])
            cmd_l.append("%s" % self.command)

            if self.task.info("debug", False):
                self.task.info("print_debug")(self.task, "PDSH: %s" %
                                              ' '.join(cmd_l))
        else:
            # PDCP
            executable = self.task.info("pdcp_path") or "pdcp"
            cmd_l = [executable, "-b"]

            fanout = self.task.info("fanout", 0)
            if fanout > 0:
                cmd_l.extend(["-f", "%d" % fanout])

            connect_timeout = self.task.info("connect_timeout", 0)
            if connect_timeout > 0:
                cmd_l.extend(["-t", "%d" % connect_timeout])

            cmd_l.extend(["-w", "%s" % self.nodes])

            if self.isdir:
                cmd_l.append("-r")

            if self.preserve:
                cmd_l.append("-p")

            cmd_l.append(self.source)
            cmd_l.append(self.dest)

            if self.task.info("debug", False):
                self.task.info("print_debug")(self.task, "PDCP: %s" %
                                              ' '.join(cmd_l))

        self.popen = self._exec_nonblock(cmd_l, env=pdsh_env)
        self.file_error = self.popen.stderr
        self.file_reader = self.popen.stdout
        self.file_writer = self.popen.stdin

        self._on_start()

        return self

    def write(self, buf):
        """
        Write data to process. Not supported with Pdsh worker.

        :raises EngineClientNotSupportedError: always
        """
        raise EngineClientNotSupportedError("writing is not "
                                            "supported by pdsh worker")

    def _close(self, force, timeout):
        """
        Close worker. Called by engine after worker has been
        unregistered. This method should handle all termination types
        (normal, forced or on timeout).

        Nodes that have not reported a return code yet are reported as
        timed out (on timeout) or with return code 0 (otherwise), then
        ev_close is invoked.

        :param force: true when the worker is being aborted
        :param timeout: true when the worker timed out
        :raises WorkerError: on normal termination, if pdsh/pdcp exited
            with a positive return code
        """
        try:
            if force or timeout:
                if self.popen.poll() is None:
                    # still running: kill it, then reap it so it does not
                    # linger as a zombie process
                    os.kill(self.popen.pid, signal.SIGKILL)
                    self.popen.wait()
                if timeout:
                    self._invoke("ev_timeout")
            else:
                prc = self.popen.wait()
                # a negative code means the process was killed by a signal
                if prc > 0:
                    raise WorkerError("Cannot run pdsh (error %d)" % prc)
        finally:
            # release the pipes even when WorkerError is raised above
            self._pdsh_close_pipes()

        # computed once: _on_node_rc() updates closed_nodes while looping
        pending = self.nodes - self.closed_nodes
        if timeout:
            for node in pending:
                self._on_node_timeout(node)
        else:
            for node in pending:
                self._on_node_rc(node, 0)

        self._invoke("ev_close")

    def _pdsh_close_pipes(self):
        """Close the stdin/stdout/stderr pipes of the subprocess, if any."""
        for pipe in (self.popen.stdin, self.popen.stdout, self.popen.stderr):
            if pipe is not None:
                pipe.close()

    def _parse_line(self, line, stderr):
        """
        Parse Pdsh line syntax.

        Lines emitted by pdsh/pdcp itself (starting with "pdsh@", "pdcp@"
        or "sending ") carry per-node status. Any other line is node
        output of the form "nodename: message".

        :param line: one line read from the subprocess
        :param stderr: True if the line was read from stderr
        :raises EngineClientError: if a status line cannot be parsed
        """
        if line.startswith("pdsh@") or \
                line.startswith("pdcp@") or \
                line.startswith("sending "):
            try:
                words = line.split()
                # words[1] is the node name followed by ':' (stripped below)
                if self.mode == 'pdsh':
                    if len(words) == 4 and words[2] == "command" and \
                            words[3] == "timeout":
                        # e.g. "pdsh@host: node: command timeout" is
                        # ignored; no return code is recorded here
                        pass
                    elif len(words) == 8 and words[3] == "exited" and \
                            words[7].isdigit():
                        # e.g. "pdsh@host: node: ssh exited with exit code 1"
                        self._on_node_rc(words[1][:-1], int(words[7]))
                elif self.mode == 'pdcp':
                    # any pdcp status line is reported as ENOENT
                    self._on_node_rc(words[1][:-1], errno.ENOENT)
            except Exception:
                sys.stderr.write("%s\n" % sys.exc_info()[1])
                raise EngineClientError()
        else:
            # regular node output: "nodename: message"
            nodename, msg = line.split(': ', 1)
            if stderr:
                self._on_node_errline(nodename, msg)
            else:
                self._on_node_msgline(nodename, msg)

    def _handle_read(self):
        """
        Engine is telling us a read is available.
        """
        debug = self.task.info("debug", False)
        if debug:
            print_debug = self.task.info("print_debug")

        for msg in self._readlines():
            if debug:
                print_debug(self.task, "PDSH: %s" % msg)
            self._parse_line(msg, False)

    def _handle_error(self):
        """
        Engine is telling us an error read is available.
        """
        # use self.task, like _handle_read (this client is its own worker)
        debug = self.task.info("debug", False)
        if debug:
            print_debug = self.task.info("print_debug")

        for msg in self._readerrlines():
            if debug:
                print_debug(self.task, "PDSH@STDERR: %s" % msg)
            self._parse_line(msg, True)

    def _on_node_rc(self, node, rc):
        """
        Return code received from a node, update last* stuffs.

        The node is also recorded in closed_nodes so that _close() does
        not report it a second time.
        """
        DistantWorker._on_node_rc(self, node, rc)
        self.closed_nodes.add(node)
305