1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 ClusterShell worker interface.
37
38 A worker is a generic object which provides "grouped" work in a specific task.
39 """
40
41 from ClusterShell.Worker.EngineClient import EngineClient
42 from ClusterShell.NodeSet import NodeSet
43
44
46 """Generic worker exception."""
47
49 """Generic worker error."""
50
52 """Bad argument in worker error."""
53
55 """
56 Base class Worker.
57 """
58
60 """
61 Initializer. Should be called from derived classes.
62 """
63 self.eh = handler
64 self.task = None
65
67 """
68 Bind worker to task. Called by task.schedule()
69 """
70 if self.task is not None:
71
72 raise WorkerError("worker has already been scheduled")
73 self.task = task
74
76 if not self.task:
77 raise WorkerError("worker is not task bound")
78
80 """
81 Return a list of underlying engine clients.
82 """
83 raise NotImplementedError("Derived classes must implement.")
84
86 """
87 Invoke user EventHandler method if needed.
88 """
89 if self.eh:
90 self.eh._invoke(ev_type, self)
91
93 """
94 Get last read message from event handler.
95 """
96 raise NotImplementedError("Derived classes must implement.")
97
99 """
100 Get last error message from event handler.
101 """
102 raise NotImplementedError("Derived classes must implement.")
103
110
117
124
126 """
127 Base class DistantWorker, which provides a useful set of setters/getters
128 to use with distant workers like ssh or pdsh.
129 """
130
132 Worker.__init__(self, handler)
133
134 self._last_node = None
135 self._last_msg = None
136 self._last_rc = 0
137 self.started = False
138
140 """
141 Starting
142 """
143 if not self.started:
144 self.started = True
145 self._invoke("ev_start")
146
148 """
149 Message received from node, update last* stuffs.
150 """
151 self._last_node = node
152 self._last_msg = msg
153
154 self.task._msg_add((self, node), msg)
155
156 self._invoke("ev_read")
157
159 """
160 Error message received from node, update last* stuffs.
161 """
162 self._last_node = node
163 self._last_errmsg = msg
164
165 self.task._errmsg_add((self, node), msg)
166
167 self._invoke("ev_error")
168
170 """
171 Return code received from a node, update last* stuffs.
172 """
173 self._last_node = node
174 self._last_rc = rc
175
176 self.task._rc_set((self, node), rc)
177
178 self._invoke("ev_hup")
179
181 """
182 Update on node timeout.
183 """
184
185 self._last_node = node
186
187 self.task._timeout_add((self, node))
188
190 """
191 Get last node, useful to get the node in an EventHandler
192 callback like ev_timeout().
193 """
194 return self._last_node
195
197 """
198 Get last (node, buffer), useful in an EventHandler.ev_read()
199 """
200 return self._last_node, self._last_msg
201
203 """
204 Get last (node, error_buffer), useful in an EventHandler.ev_error()
205 """
206 return self._last_node, self._last_errmsg
207
209 """
210 Get last (node, rc), useful in an EventHandler.ev_hup()
211 """
212 return self._last_node, self._last_rc
213
220
227
228 node_error_buffer = node_error
229
231 """
232 Get specific node return code. Raises a KeyError if command on
233 node has not yet finished (no return code available), or if
234 node is not known by this worker.
235 """
236 self._task_bound_check()
237 try:
238 rc = self.task._rc_by_source((self, node))
239 except KeyError:
240 raise KeyError(node)
241 return rc
242
243 node_rc = node_retcode
244
246 """
247 Returns an iterator over available buffers and associated
248 NodeSet. If the optional parameter match_keys is defined, only
249 keys found in match_keys are returned.
250 """
251 self._task_bound_check()
252 for msg, keys in self.task._call_tree_matcher( \
253 self.task._msgtree.walk, match_keys, self):
254 yield msg, NodeSet.fromlist(keys)
255
257 """
258 Returns an iterator over available error buffers and associated
259 NodeSet. If the optional parameter match_keys is defined, only
260 keys found in match_keys are returned.
261 """
262 self._task_bound_check()
263 for msg, keys in self.task._call_tree_matcher( \
264 self.task._errtree.walk, match_keys, self):
265 yield msg, NodeSet.fromlist(keys)
266
274
276 """
277 Returns an iterator over each node and associated error buffer.
278 """
279 self._task_bound_check()
280 return self.task._call_tree_matcher(self.task._errtree.items,
281 match_keys, self)
282
284 """
285 Returns an iterator over return codes and associated NodeSet.
286 If the optional parameter match_keys is defined, only keys
287 found in match_keys are returned.
288 """
289 self._task_bound_check()
290 for rc, keys in self.task._rc_iter_by_worker(self, match_keys):
291 yield rc, NodeSet.fromlist(keys)
292
299
306
313
315 """
316 Implements a simple Worker being itself an EngineClient.
317 """
318
def __init__(self, file_reader, file_writer, file_error, key, handler,
             stderr=False, timeout=-1, autoclose=False):
    """
    Initialize worker.

    file_reader, file_writer and file_error are stored as-is on the
    instance; any of them may be None. When key is None, the worker
    itself is used as its own source key. handler is passed to
    Worker.__init__(); stderr, timeout and autoclose are forwarded to
    EngineClient.__init__().
    """
    Worker.__init__(self, handler)
    EngineClient.__init__(self, self, stderr, timeout, autoclose)

    self.last_msg = None
    # Initialize the last error message too, so that last_error() returns
    # None instead of raising AttributeError when it is called before any
    # error line has been received.
    self.last_errmsg = None

    # The worker acts as its own source key unless one is provided.
    self.key = self if key is None else key

    self.file_reader = file_reader
    self.file_writer = file_writer
    self.file_error = file_error
335
337 """
338 Return a list of underlying engine clients.
339 """
340 return [self]
341
343 """
344 Source key for this worker is free for use. Use this method to
345 set the custom source key for this worker.
346 """
347 self.key = key
348
350 """
351 Start worker.
352 """
353 self._invoke("ev_start")
354
355 return self
356
358 """
359 Returns the standard error reader file descriptor as an integer.
360 """
361 if self.file_error and not self.file_error.closed:
362 return self.file_error.fileno()
363
364 return None
365
367 """
368 Returns the reader file descriptor as an integer.
369 """
370 if self.file_reader and not self.file_reader.closed:
371 return self.file_reader.fileno()
372
373 return None
374
376 """
377 Returns the writer file descriptor as an integer.
378 """
379 if self.file_writer and not self.file_writer.closed:
380 return self.file_writer.fileno()
381
382 return None
383
def _read(self, size=4096):
    """
    Read data from process.

    Delegates to EngineClient._read() with the requested size and
    returns whatever that call returns.
    """
    data = EngineClient._read(self, size)
    return data
389
395
def _close(self, force, timeout):
    """
    Close worker. Called by engine after worker has been
    unregistered. This method should handle all termination types
    (normal, forced or on timeout).

    force -- when true, any data left in the read buffer is dropped
             instead of being delivered as a last message.
    timeout -- when true, self._on_timeout() is called before the
               final "ev_close" event is invoked.
    """
    if not force and self._rbuf:
        # Deliver whatever is still sitting in the read buffer as a
        # final message (presumably a trailing line with no newline).
        self.worker._on_msgline(self._rbuf)

    # Close every stream that was provided, in reader/writer/error order.
    for stream in (self.file_reader, self.file_writer, self.file_error):
        if stream is not None:
            stream.close()

    if timeout:
        self._on_timeout()

    self._invoke("ev_close")
418
420 """
421 Engine is telling us there is data available for reading.
422 """
423 debug = self.task.info("debug", False)
424 if debug:
425 print_debug = self.task.info("print_debug")
426
427 for msg in self._readlines():
428 if debug:
429 print_debug(self.task, "LINE %s" % msg)
430 self._on_msgline(msg)
431
433 """
434 Engine is telling us there is error available for reading.
435 """
436 debug = self.task.info("debug", False)
437 if debug:
438 print_debug = self.task.info("print_debug")
439
440 for msg in self._readerrlines():
441 if debug:
442 print_debug(self.task, "LINE@STDERR %s" % msg)
443 self._on_errmsgline(msg)
444
446 """
447 Read last msg, useful in an EventHandler.
448 """
449 return self.last_msg
450
452 """
453 Get last error message from event handler.
454 """
455 return self.last_errmsg
456
458 """
459 Add a message.
460 """
461
462 self.last_msg = msg
463
464
465 self.task._msg_add((self, self.key), msg)
466
467 self._invoke("ev_read")
468
470 """
471 Add an error message.
472 """
473
474 self.last_errmsg = msg
475
476
477 self.task._errmsg_add((self, self.key), msg)
478
479 self._invoke("ev_error")
480
482 """
483 Update on timeout.
484 """
485 self.task._timeout_add((self, self.key))
486
487
488 self._invoke("ev_timeout")
489
499
509
511 """
512 Write to worker.
513 """
514 self._write(buf)
515
517 """
518 Tell worker to close its writer file descriptor once flushed. Do not
519 perform writes after this call.
520 """
521 self._set_write_eof()
522