1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 ClusterShell worker interface.
37
38 A worker is a generic object which provides "grouped" work in a specific task.
39 """
40
41 from EngineClient import EngineClient, EngineClientEOF
42
43 from ClusterShell.Event import EventHandler
44 from ClusterShell.NodeSet import NodeSet
45
46
48 """Generic worker exception."""
49
51 """Generic worker error."""
52
54 """
55 Base class Worker.
56 """
57
59 """
60 Initializer. Should be called from derived classes.
61 """
62 self.eh = handler
63 self.task = None
64
66 """
67 Bind worker to task. Called by task.schedule()
68 """
69 if self.task is not None:
70
71 raise WorkerError()
72
73 self.task = task
74
76 """
77 Return a list of underlying engine clients.
78 """
79 raise NotImplementedError("Derived classes must implement.")
80
82 """
83 Starts worker and returns worker instance as a convenience.
84 Derived classes must implement.
85 """
86 raise NotImplementedError("Derived classes must implement.")
87
89 """
90 Invoke user EventHandler method if needed.
91 """
92 if self.eh:
93 self.eh._invoke(ev_type, self)
94
96 """
97 Get last read message from event handler.
98 """
99 raise NotImplementedError("Derived classes must implement.")
100
102 """
103 Stop this worker.
104 """
105 raise NotImplementedError("Derived classes must implement.")
106
108 """
109 Return True if this worker aborted due to timeout.
110 """
111 return self.task._num_timeout_by_worker(self) > 0
112
113
115 """
116 Base class DistantWorker, which provides a useful set of setters/getters
117 to use with distant workers like ssh or pdsh.
118 """
119
121 Worker.__init__(self, handler)
122
123 self._last_node = None
124 self._last_msg = None
125 self._last_rc = 0
126 self.started = False
127
129 """
130 Mark the worker as started and invoke ev_start (first call only).
131 """
132 if not self.started:
133 self.started = True
134 self._invoke("ev_start")
135
137 """
138 Message received from a node; update the last node and last message.
139 """
140 self._last_node = node
141 self._last_msg = msg
142
143 self.task._msg_add((self, node), msg)
144
145 self._invoke("ev_read")
146
148 """
149 Return code received from a node; update the last node and last return code.
150 """
151 self._last_node = node
152 self._last_rc = rc
153
154 self.task._rc_set((self, node), rc)
155
156 self._invoke("ev_hup")
157
159 """
160 Update on node timeout.
161 """
162
163 self._last_node = node
164
165 self.task._timeout_add((self, node))
166
168 """
169 Get last node, useful to get the node in an EventHandler
170 callback like ev_timeout().
171 """
172 return self._last_node
173
175 """
176 Get last (node, buffer), useful in an EventHandler.ev_read()
177 """
178 return self._last_node, self._last_msg
179
181 """
182 Get last (node, rc), useful in an EventHandler.ev_hup()
183 """
184 return self._last_node, self._last_rc
185
187 """
188 Get specific node buffer.
189 """
190 return self.task._msg_by_source((self, node))
191
193 """
194 Get specific node return code.
195 """
196 return self.task._rc_by_source((self, node))
197
199 """
200 Returns an iterator over available buffers and associated
201 NodeSet. If the optional parameter match_keys is defined, only
202 keys found in match_keys are returned.
203 """
204 for msg, keys in self.task._msg_iter_by_worker(self, match_keys):
205 yield msg, NodeSet.fromlist(keys)
206
208 """
209 Returns an iterator over each node and associated buffer.
210 """
211 return self.task._kmsg_iter_by_worker(self)
212
214 """
215 Returns an iterator over return codes and associated NodeSet.
216 If the optional parameter match_keys is defined, only keys
217 found in match_keys are returned.
218 """
219 for rc, keys in self.task._rc_iter_by_worker(self, match_keys):
220 yield rc, NodeSet.fromlist(keys)
221
223 """
224 Returns an iterator over each node and associated return code.
225 """
226 return self.task._krc_iter_by_worker(self)
227
229 """
230 Return the number of timed-out "keys" (i.e. nodes) for this worker.
231 """
232 return self.task._num_timeout_by_worker(self)
233
239
240
242 """
243 Implements a simple Worker being itself an EngineClient.
244 """
245
def __init__(self, file_reader, file_writer, file_error, key, handler,
             timeout, autoclose=False):
    """
    Set up a simple worker around the given file objects.

    Both base initializers run first (Worker with the event handler,
    EngineClient with this instance as its worker plus the timeout and
    autoclose settings). The reader, writer and error file objects are
    then stored on the instance together with the source key; when
    `key` is falsy the worker instance itself is used as the key.
    """
    Worker.__init__(self, handler)
    EngineClient.__init__(self, self, timeout, autoclose)

    # File objects this worker operates on.
    self.file_reader = file_reader
    self.file_writer = file_writer
    self.file_error = file_error

    # Source key and last-message bookkeeping.
    self.key = key or self
    self.last_msg = None
258
260 """
261 Return a list of underlying engine clients.
262 """
263 return [self]
264
266 """
267 Source key for this worker is free for use. Use this method to
268 set the custom source key for this worker.
269 """
270 self.key = key
271
273 """
274 Start worker.
275 """
276 self._invoke("ev_start")
277
278 return self
279
281 """
282 Returns the reader file descriptor as an integer.
283 """
284 if self.file_reader:
285 return self.file_reader.fileno()
286
287 return None
288
290 """
291 Returns the writer file descriptor as an integer.
292 """
293 if self.file_writer:
294 return self.file_writer.fileno()
295
296 return None
297
def _read(self, size=4096):
    """
    Read data from the reader file object.

    `size` is passed straight to `self.file_reader.read()` (default
    4096). Returns whatever that call produced when it is non-empty.

    Raises EngineClientEOF when the read yields an empty result, in
    which case `_set_reading()` is not called.
    """
    result = self.file_reader.read(size)
    if not result:
        # Nothing was read: report end of file to the caller.
        raise EngineClientEOF()
    self._set_reading()
    return result
308
def _close(self, force, timeout):
    """
    Close worker. Called by engine after worker has been unregistered.
    This method should handle all termination types (normal, forced or
    on timeout).

    Every file object that is set (reader, writer, error, in that
    order) is closed. When `timeout` is true, `_on_timeout()` is called
    next. The "ev_close" event is always invoked last.

    `force` is accepted for interface compatibility; this
    implementation does not read it.
    """
    # Identity test against None: a stream object with custom
    # comparison operators must still be closed.
    for stream in (self.file_reader, self.file_writer, self.file_error):
        if stream is not None:
            stream.close()

    if timeout:
        self._on_timeout()

    self._invoke("ev_close")
326
328 """
329 Engine is telling us a read is available.
330 """
331 debug = self.task.info("debug", False)
332 if debug:
333 print_debug = self.task.info("print_debug")
334
335 for msg in self._readlines():
336 if debug:
337 print_debug(self.task, "LINE %s" % msg)
338 self._on_msgline(msg)
339
341 """
342 Read last msg, useful in an EventHandler.
343 """
344 return self.last_msg
345
347 """
348 Add a message.
349 """
350
351 self.last_msg = msg
352
353
354 self.task._msg_add((self, self.key), msg)
355
356 self._invoke("ev_read")
357
359 """
360 Update on timeout.
361 """
362 self.task._timeout_add((self, self.key))
363
364
365 self._invoke("ev_timeout")
366
368 """
369 Read worker buffer.
370 """
371 for key, msg in self.task._kmsg_iter_by_worker(self):
372 assert key == self.key
373 return msg
374
376 """
377 Write to worker.
378 """
379 self._write(buf)
380
382 """
383 Tell worker to close its writer file descriptor once flushed. Do not
384 perform writes after this call.
385 """
386 self._set_write_eof()
387