1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 WorkerPdsh
37
38 ClusterShell worker for executing commands with LLNL pdsh.
39 """
40
41 from ClusterShell.NodeSet import NodeSet
42
43 from EngineClient import *
44 from Worker import DistantWorker, WorkerError
45
46 import errno
47 import fcntl
48 import os
49 import popen2
50 import signal
51
52
54 """
55 ClusterShell pdsh-based worker Class.
56
57 Remote Shell (pdsh) usage example:
58 worker = WorkerPdsh(nodeset, handler=MyEventHandler(),
59 timeout=30, command="/bin/hostname")
60 Remote Copy (pdcp) usage example:
61 worker = WorkerPdsh(nodeset, handler=MyEventHandler(),
62 timeout=30, source="/etc/my.conf",
63 dest="/etc/my.conf")
64 ...
65 task.schedule(worker) # schedule worker for execution
66 ...
67 task.resume() # run
68
69 Known Limitations:
70 * write() is not supported by WorkerPdsh
71 * return codes == 0 are not guaranteed when a timeout is used (rc > 0
72 are fine)
73 """
74
def __init__(self, nodes, handler, timeout, **kwargs):
    """
    Initialize Pdsh worker instance.

    The mode is selected by keyword argument:

      command=...           remote shell mode (self.mode == 'pdsh')
      source=..., dest=...  remote copy mode  (self.mode == 'pdcp')

    `command' takes precedence when both `command' and `source' are
    given.  The optional `autoclose' keyword (default False) is handed
    to EngineClient.__init__().

    Raises WorkerBadArgumentException when neither `command' nor
    `source' is given, and KeyError when `source' is given without
    `dest'.
    """
    DistantWorker.__init__(self, handler)
    EngineClient.__init__(self, self, timeout,
                          kwargs.get('autoclose', False))

    self.nodes = NodeSet(nodes)
    # nodes for which _on_node_rc() has already been called
    self.closed_nodes = NodeSet()

    # `key in dict' replaces the deprecated dict.has_key()
    if 'command' in kwargs:
        # PDSH
        self.command = kwargs['command']
        self.source = None
        self.dest = None
        self.mode = 'pdsh'
    elif 'source' in kwargs:
        # PDCP
        self.command = None
        self.source = kwargs['source']
        self.dest = kwargs['dest']
        self.mode = 'pdcp'
    else:
        raise WorkerBadArgumentException()

    self.fid = None
    # The read path keeps its partial-line buffer in self._buf, so make
    # sure it exists from construction; self.buf is kept as-is for
    # backward compatibility.
    self.buf = ""
    self._buf = ""
102
105
107 """
108 Start worker, initialize buffers, prepare command.
109 """
110
111 self._buf = ""
112
113 if self.command is not None:
114
115 cmd_l = [ self.task.info("pdsh_path") or "pdsh", "-b" ]
116
117 fanout = self.task.info("fanout", 0)
118 if fanout > 0:
119 cmd_l.append("-f %d" % fanout)
120
121
122
123
124 connect_timeout = self.task.info("connect_timeout", 0)
125 if connect_timeout > 0:
126 cmd_l.insert(0,
127 "PDSH_SSH_ARGS_APPEND=\"-o ConnectTimeout=%d\"" %
128 connect_timeout)
129
130 command_timeout = self.task.info("command_timeout", 0)
131 if command_timeout > 0:
132 cmd_l.append("-u %d" % command_timeout)
133
134 cmd_l.append("-w '%s'" % self.nodes)
135 cmd_l.append("'%s'" % self.command)
136
137 cmd = ' '.join(cmd_l)
138
139 if self.task.info("debug", False):
140 self.task.info("print_debug")(self.task, "PDSH: %s" % cmd)
141 else:
142
143 cmd_l = [ self.task.info("pdcp_path") or "pdcp", "-b" ]
144
145 fanout = self.task.info("fanout", 0)
146 if fanout > 0:
147 cmd_l.append("-f %d" % fanout)
148
149 connect_timeout = self.task.info("connect_timeout", 0)
150 if connect_timeout > 0:
151 cmd_l.append("-t %d" % connect_timeout)
152
153 cmd_l.append("-w '%s'" % self.nodes)
154
155 cmd_l.append("'%s'" % self.source)
156 cmd_l.append("'%s'" % self.dest)
157 cmd = ' '.join(cmd_l)
158
159 if self.task.info("debug", False):
160 self.task.info("print_debug")(self.task,"PDCP: %s" % cmd)
161
162 self.fid = self._exec_nonblock(cmd)
163
164 self._on_start()
165
166 return self
167
169 """
170 Return the reader file descriptor as an integer.
171 """
172 return self.fid.fromchild.fileno()
173
175 """
176 Return the writer file descriptor as an integer.
177 """
178 return self.fid.tochild.fileno()
179
def _read(self, size=-1):
    """
    Read data from process.

    `size' is passed unchanged to the read() method of the child's
    output file object (self.fid.fromchild).  Returns whatever read()
    returned.  self._set_reading() is called only when the read
    actually produced data.
    """
    result = self.fid.fromchild.read(size)
    # Test the length of the data, not the data itself: comparing a
    # string to 0 is always true on Python 2 (and a TypeError on
    # Python 3), which flagged even empty reads as activity.
    if len(result) > 0:
        self._set_reading()
    return result
188
190 """
191 Write data to process. Not supported with Pdsh worker.
192 """
193 raise EngineClientNotSupportedError("writing is not supported by pdsh worker")
194
def _close(self, force, timeout):
    """
    Close worker. Called by engine after worker has been
    unregistered. This method should handle all termination types
    (normal, forced or on timeout).

    force   -- true when the close is forced
    timeout -- true when the close is caused by a timeout

    Forced or timed-out close: the child is polled and sent SIGKILL
    when poll() returns -1; "ev_timeout" is invoked when `timeout' is
    set.  Normal close: the child is waited for, and WorkerError is
    raised if it exited with a non-zero status.

    Both pipe ends are closed in every case, including when an
    exception is raised.  Afterwards every node without a recorded
    return code gets _on_node_timeout() (timeout) or
    _on_node_rc(node, 0) (otherwise), and "ev_close" is invoked.
    """
    try:
        if force or timeout:
            status = self.fid.poll()
            if status == -1:
                # NOTE(review): -1 is presumably poll()'s "still
                # running" value (popen2-style child) -- confirm
                # against _exec_nonblock().
                os.kill(self.fid.pid, signal.SIGKILL)
            if timeout:
                self._invoke("ev_timeout")
        else:
            status = self.fid.wait()
            if os.WIFEXITED(status):
                rc = os.WEXITSTATUS(status)
                if rc != 0:
                    raise WorkerError("Cannot run pdsh (error %d)" % rc)
    finally:
        # Release the pipes even on the error path; previously they
        # were leaked when WorkerError was raised.
        self.fid.tochild.close()
        self.fid.fromchild.close()

    # nodes that never reported a return code
    remaining = self.nodes - self.closed_nodes
    if timeout:
        for node in remaining:
            self._on_node_timeout(node)
    else:
        for node in remaining:
            self._on_node_rc(node, 0)

    self._invoke("ev_close")
227
def _handle_read(self):
    """
    Engine is telling us a read is available.

    Reads one chunk with self._read(), prepends any partial line kept
    from the previous call, and processes every complete line:

    * lines starting with "pdsh@", "pdcp@" or "sending " are status
      lines.  In 'pdsh' mode an 8-word "... exited ... <digits>" line
      records that return code for the node named in the second word
      (trailing character stripped); a 4-word "... command timeout"
      line is ignored.  In 'pdcp' mode any status line records
      errno.ENOENT for that node.
    * any other line is split at the first ": " into node name and
      message, and passed to self._on_node_msgline() without its
      trailing LF / CRLF.

    A trailing line without a newline is kept in self._buf for the next
    call.

    Raises EngineClientError when handling a status line fails (the
    underlying error is written to stderr first).
    """
    # `sys' is not among the module-level imports visible here and is
    # only needed on the error path.
    import sys

    debug = self.task.info("debug", False)
    if debug:
        print_debug = self.task.info("print_debug")

    # read a chunk
    readbuf = self._read()
    assert len(readbuf) > 0, "_handle_read() called with no data to read"

    # prepend what was left over from the previous read
    buf = self._buf + readbuf
    lines = buf.splitlines(True)
    self._buf = ""
    for line in lines:
        if debug:
            print_debug(self.task, "LINE: %s" % line[:-1])

        if not line.endswith('\n'):
            # keep partial line in buffer until the rest arrives
            self._buf = line
            continue

        if line.startswith("pdsh@") or line.startswith("pdcp@") or \
           line.startswith("sending "):
            try:
                words = line.split()
                # set return code for the node the status line is about
                if self.mode == 'pdsh':
                    if len(words) == 4 and words[2] == "command" and \
                       words[3] == "timeout":
                        pass
                    elif len(words) == 8 and words[3] == "exited" and \
                         words[7].isdigit():
                        self._on_node_rc(words[1][:-1], int(words[7]))
                elif self.mode == 'pdcp':
                    self._on_node_rc(words[1][:-1], errno.ENOENT)
            except Exception:
                # sys.exc_info() / stderr.write() instead of the
                # Python-2-only "except X, e" and "print >>" forms.
                sys.stderr.write("%s\n" % (sys.exc_info()[1],))
                raise EngineClientError()
        else:
            # split pdsh reply "nodename: msg"
            # NOTE(review): a line without ": " makes this unpacking
            # raise ValueError -- confirm whether pdsh can emit one.
            nodename, msg = line.split(': ', 1)
            if msg.endswith('\n'):
                if msg.endswith('\r\n'):
                    msgline = msg[:-2]  # trim CRLF
                else:
                    msgline = msg[:-1]  # trim LF
                self._on_node_msgline(nodename, msgline)
289
def _on_node_rc(self, node, rc):
    """
    Handle a return code received from a node.

    Forwards `node' and `rc' to DistantWorker._on_node_rc(), then adds
    the node to self.closed_nodes.
    """
    DistantWorker._on_node_rc(self, node, rc)
    self.closed_nodes.add(node)
296