Package ClusterShell :: Package Worker :: Module Worker
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Worker.Worker

  1  # 
  2  # Copyright CEA/DAM/DIF (2007, 2008, 2009) 
  3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  4  # 
  5  # This file is part of the ClusterShell library. 
  6  # 
  7  # This software is governed by the CeCILL-C license under French law and 
  8  # abiding by the rules of distribution of free software.  You can  use, 
  9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 10  # license as circulated by CEA, CNRS and INRIA at the following URL 
 11  # "http://www.cecill.info". 
 12  # 
 13  # As a counterpart to the access to the source code and  rights to copy, 
 14  # modify and redistribute granted by the license, users are provided only 
 15  # with a limited warranty  and the software's author,  the holder of the 
 16  # economic rights,  and the successive licensors  have only  limited 
 17  # liability. 
 18  # 
 19  # In this respect, the user's attention is drawn to the risks associated 
 20  # with loading,  using,  modifying and/or developing or reproducing the 
 21  # software by the user in light of its specific status of free software, 
 22  # that may mean  that it is complicated to manipulate,  and  that  also 
 23  # therefore means  that it is reserved for developers  and  experienced 
 24  # professionals having in-depth computer knowledge. Users are therefore 
 25  # encouraged to load and test the software's suitability as regards their 
 26  # requirements in conditions enabling the security of their systems and/or 
 27  # data to be ensured and,  more generally, to use and operate it in the 
 28  # same conditions as regards security. 
 29  # 
 30  # The fact that you are presently reading this means that you have had 
 31  # knowledge of the CeCILL-C license and that you accept its terms. 
 32  # 
 33  # $Id: Worker.py 289 2010-07-12 21:30:00Z st-cea $ 
 34   
 35  """ 
 36  ClusterShell worker interface. 
 37   
 38  A worker is a generic object which provides "grouped" work in a specific task. 
 39  """ 
 40   
 41  from ClusterShell.Worker.EngineClient import EngineClient 
 42  from ClusterShell.NodeSet import NodeSet 
 43   
 44   
class WorkerException(Exception):
    """
    Generic worker exception.

    Root of the exception hierarchy defined by this module.
    """
47
class WorkerError(WorkerException):
    """
    Generic worker error.

    Derives from WorkerException, so handlers catching WorkerException
    also catch this error.
    """
50
class WorkerBadArgumentError(WorkerError):
    """
    Bad argument in worker error.

    Derives from WorkerError.
    """
53
class Worker(object):
    """
    Base class Worker.

    Holds the optional event handler (``eh``) and the task the worker is
    bound to (``task``, None until _set_task() is called). Derived classes
    must implement _engine_clients(), last_read() and last_error().
    """

    def __init__(self, handler):
        """
        Initializer. Should be called from derived classes.

        handler is stored as ``eh``; when it evaluates to false, _invoke()
        does nothing.
        """
        self.eh = handler       # associated EventHandler
        self.task = None        # bound later by _set_task()

    def _set_task(self, task):
        """
        Bind worker to task. Called by task.schedule()

        Raises WorkerError if the worker is already bound to a task.
        """
        if self.task is not None:
            # one-shot-only schedule supported for now
            raise WorkerError("worker has already been scheduled")
        self.task = task

    def _task_bound_check(self):
        """
        Raise WorkerError unless the worker has been bound to a task.
        """
        # Compare against None, exactly like _set_task() does, so that a
        # bound task object which happens to evaluate to false is not
        # mistaken for "unbound".
        if self.task is None:
            raise WorkerError("worker is not task bound")

    def _engine_clients(self):
        """
        Return a list of underlying engine clients.

        Abstract here: always raises NotImplementedError.
        """
        raise NotImplementedError("Derived classes must implement.")

    def _invoke(self, ev_type):
        """
        Invoke user EventHandler method if needed.

        ev_type is the event name (for example "ev_start"); it is forwarded
        to the handler's own _invoke() together with this worker. Nothing
        happens when no handler is set.
        """
        if self.eh:
            self.eh._invoke(ev_type, self)

    def last_read(self):
        """
        Get last read message from event handler.

        Abstract here: always raises NotImplementedError.
        """
        raise NotImplementedError("Derived classes must implement.")

    def last_error(self):
        """
        Get last error message from event handler.

        Abstract here: always raises NotImplementedError.
        """
        raise NotImplementedError("Derived classes must implement.")

    def did_timeout(self):
        """
        Return True if this worker aborted due to timeout.

        Raises WorkerError if the worker is not bound to a task.
        """
        self._task_bound_check()
        return self.task._num_timeout_by_worker(self) > 0

    def flush_buffers(self):
        """
        Flush any messages associated to this worker.

        Raises WorkerError if the worker is not bound to a task.
        """
        self._task_bound_check()
        self.task._flush_buffers_by_worker(self)

    def flush_errors(self):
        """
        Flush any error messages associated to this worker.

        Raises WorkerError if the worker is not bound to a task.
        """
        self._task_bound_check()
        self.task._flush_errors_by_worker(self)
124
class DistantWorker(Worker):
    """
    Base class DistantWorker, which provides a useful set of setters/getters
    to use with distant workers like ssh or pdsh.
    """

    def __init__(self, handler):
        """
        Initialize the "last event" state of the worker.

        handler is passed unchanged to Worker.__init__().
        """
        Worker.__init__(self, handler)

        self._last_node = None
        self._last_msg = None
        # Initialized here so that last_error() returns (node, None)
        # instead of raising AttributeError when no error line has been
        # received yet.
        self._last_errmsg = None
        self._last_rc = 0
        self.started = False

    def _on_start(self):
        """
        Starting: fire "ev_start" once, on the first call only.
        """
        if not self.started:
            self.started = True
            self._invoke("ev_start")

    def _on_node_msgline(self, node, msg):
        """
        Message received from node, update last* stuffs.

        The message is also recorded in the task under the (worker, node)
        source, then "ev_read" is fired.
        """
        self._last_node = node
        self._last_msg = msg

        self.task._msg_add((self, node), msg)

        self._invoke("ev_read")

    def _on_node_errline(self, node, msg):
        """
        Error message received from node, update last* stuffs.

        The message is also recorded in the task under the (worker, node)
        source, then "ev_error" is fired.
        """
        self._last_node = node
        self._last_errmsg = msg

        self.task._errmsg_add((self, node), msg)

        self._invoke("ev_error")

    def _on_node_rc(self, node, rc):
        """
        Return code received from a node, update last* stuffs.

        The return code is also recorded in the task under the
        (worker, node) source, then "ev_hup" is fired.
        """
        self._last_node = node
        self._last_rc = rc

        self.task._rc_set((self, node), rc)

        self._invoke("ev_hup")

    def _on_node_timeout(self, node):
        """
        Update on node timeout.

        Records the timeout in the task; no event is fired from here.
        """
        # Update _last_node to allow node resolution after ev_timeout.
        self._last_node = node

        self.task._timeout_add((self, node))

    def last_node(self):
        """
        Get last node, useful to get the node in an EventHandler
        callback like ev_timeout().
        """
        return self._last_node

    def last_read(self):
        """
        Get last (node, buffer), useful in an EventHandler.ev_read()
        """
        return self._last_node, self._last_msg

    def last_error(self):
        """
        Get last (node, error_buffer), useful in an EventHandler.ev_error()

        error_buffer is None as long as no error line has been received.
        """
        return self._last_node, self._last_errmsg

    def last_retcode(self):
        """
        Get last (node, rc), useful in an EventHandler.ev_hup()
        """
        return self._last_node, self._last_rc

    def node_buffer(self, node):
        """
        Get specific node buffer.

        Raises WorkerError if the worker is not bound to a task.
        """
        self._task_bound_check()
        return self.task._msg_by_source((self, node))

    def node_error(self, node):
        """
        Get specific node error buffer.

        Raises WorkerError if the worker is not bound to a task.
        """
        self._task_bound_check()
        return self.task._errmsg_by_source((self, node))

    # Alternate name for node_error().
    node_error_buffer = node_error

    def node_retcode(self, node):
        """
        Get specific node return code. Raises a KeyError if command on
        node has not yet finished (no return code available), or if
        node is not known by this worker.
        """
        self._task_bound_check()
        try:
            rc = self.task._rc_by_source((self, node))
        except KeyError:
            # Re-raise with the node alone as key, rather than the
            # internal (worker, node) source tuple.
            raise KeyError(node)
        return rc

    # Alternate name for node_retcode().
    node_rc = node_retcode

    def iter_buffers(self, match_keys=None):
        """
        Returns an iterator over available buffers and associated
        NodeSet. If the optional parameter match_keys is defined, only
        keys found in match_keys are returned.

        This is a generator: the task-bound check runs when iteration
        starts, not when the method is called.
        """
        self._task_bound_check()
        for msg, keys in self.task._call_tree_matcher(
                self.task._msgtree.walk, match_keys, self):
            yield msg, NodeSet.fromlist(keys)

    def iter_errors(self, match_keys=None):
        """
        Returns an iterator over available error buffers and associated
        NodeSet. If the optional parameter match_keys is defined, only
        keys found in match_keys are returned.

        This is a generator: the task-bound check runs when iteration
        starts, not when the method is called.
        """
        self._task_bound_check()
        for msg, keys in self.task._call_tree_matcher(
                self.task._errtree.walk, match_keys, self):
            yield msg, NodeSet.fromlist(keys)

    def iter_node_buffers(self, match_keys=None):
        """
        Returns an iterator over each node and associated buffer.

        match_keys is forwarded to the task's tree matcher.
        """
        self._task_bound_check()
        return self.task._call_tree_matcher(self.task._msgtree.items,
                                            match_keys, self)

    def iter_node_errors(self, match_keys=None):
        """
        Returns an iterator over each node and associated error buffer.

        match_keys is forwarded to the task's tree matcher.
        """
        self._task_bound_check()
        return self.task._call_tree_matcher(self.task._errtree.items,
                                            match_keys, self)

    def iter_retcodes(self, match_keys=None):
        """
        Returns an iterator over return codes and associated NodeSet.
        If the optional parameter match_keys is defined, only keys
        found in match_keys are returned.

        This is a generator: the task-bound check runs when iteration
        starts, not when the method is called.
        """
        self._task_bound_check()
        for rc, keys in self.task._rc_iter_by_worker(self, match_keys):
            yield rc, NodeSet.fromlist(keys)

    def iter_node_retcodes(self):
        """
        Returns an iterator over each node and associated return code.
        """
        self._task_bound_check()
        return self.task._krc_iter_by_worker(self)

    def num_timeout(self):
        """
        Return the number of timed out "keys" (ie. nodes) for this worker.
        """
        self._task_bound_check()
        return self.task._num_timeout_by_worker(self)

    def iter_keys_timeout(self):
        """
        Iterate over timed out keys (ie. nodes) for a specific worker.
        """
        self._task_bound_check()
        return self.task._iter_keys_timeout_by_worker(self)
313
class WorkerSimple(EngineClient, Worker):
    """
    Implements a simple Worker being itself an EngineClient.
    """

    def __init__(self, file_reader, file_writer, file_error, key, handler,
                 stderr=False, timeout=-1, autoclose=False):
        """
        Initialize worker.

        file_reader, file_writer and file_error are the file objects used
        for reading, writing and reading errors; each may be None.
        key is the source key under which messages are recorded in the
        task; when None, the worker itself is used as key.
        handler is passed to Worker.__init__(); stderr, timeout and
        autoclose are passed to EngineClient.__init__().
        """
        Worker.__init__(self, handler)
        # This object is its own engine client, hence the second "self".
        EngineClient.__init__(self, self, stderr, timeout, autoclose)

        self.last_msg = None
        # Initialized here so that last_error() returns None instead of
        # raising AttributeError when no error line has been received yet.
        self.last_errmsg = None
        if key is None: # allow key=0
            self.key = self
        else:
            self.key = key
        self.file_reader = file_reader
        self.file_writer = file_writer
        self.file_error = file_error

    def _engine_clients(self):
        """
        Return a list of underlying engine clients.
        """
        return [self]

    def set_key(self, key):
        """
        Source key for this worker is free for use. Use this method to
        set the custom source key for this worker.
        """
        self.key = key

    def _start(self):
        """
        Start worker: fire "ev_start" and return self.
        """
        self._invoke("ev_start")

        return self

    def error_fileno(self):
        """
        Returns the standard error reader file descriptor as an integer,
        or None when there is no error file or it is closed.
        """
        if self.file_error and not self.file_error.closed:
            return self.file_error.fileno()

        return None

    def reader_fileno(self):
        """
        Returns the reader file descriptor as an integer, or None when
        there is no reader file or it is closed.
        """
        if self.file_reader and not self.file_reader.closed:
            return self.file_reader.fileno()

        return None

    def writer_fileno(self):
        """
        Returns the writer file descriptor as an integer, or None when
        there is no writer file or it is closed.
        """
        if self.file_writer and not self.file_writer.closed:
            return self.file_writer.fileno()

        return None

    def _read(self, size=4096):
        """
        Read data from process.

        Delegates to EngineClient._read() with the given size.
        """
        return EngineClient._read(self, size)

    def _readerr(self, size=4096):
        """
        Read error data from process.

        Delegates to EngineClient._readerr() with the given size.
        """
        return EngineClient._readerr(self, size)

    def _close(self, force, timeout):
        """
        Close worker. Called by engine after worker has been
        unregistered. This method should handle all termination types
        (normal, forced or on timeout).

        Closes the reader, writer and error files that are set, fires the
        timeout handling when timeout is true, then fires "ev_close".
        """
        if not force and self._rbuf:
            # We still have some read data available in buffer, but no
            # EOL. Generate a final message before closing.
            # NOTE(review): self.worker is presumably this same object
            # (it is the worker argument given to EngineClient.__init__)
            # -- confirm against EngineClient.
            self.worker._on_msgline(self._rbuf)

        if self.file_reader is not None:
            self.file_reader.close()
        if self.file_writer is not None:
            self.file_writer.close()
        if self.file_error is not None:
            self.file_error.close()

        if timeout:
            self._on_timeout()

        self._invoke("ev_close")

    def _handle_read(self):
        """
        Engine is telling us there is data available for reading.

        Each line obtained from _readlines() is passed to _on_msgline();
        when the task "debug" info is set, it is first printed through
        the task "print_debug" callable.
        """
        debug = self.task.info("debug", False)
        if debug:
            print_debug = self.task.info("print_debug")

        for msg in self._readlines():
            if debug:
                print_debug(self.task, "LINE %s" % msg)
            self._on_msgline(msg)

    def _handle_error(self):
        """
        Engine is telling us there is error available for reading.

        Each line obtained from _readerrlines() is passed to
        _on_errmsgline(); when the task "debug" info is set, it is first
        printed through the task "print_debug" callable.
        """
        debug = self.task.info("debug", False)
        if debug:
            print_debug = self.task.info("print_debug")

        for msg in self._readerrlines():
            if debug:
                print_debug(self.task, "LINE@STDERR %s" % msg)
            self._on_errmsgline(msg)

    def last_read(self):
        """
        Read last msg, useful in an EventHandler.

        Returns None as long as no message has been received.
        """
        return self.last_msg

    def last_error(self):
        """
        Get last error message from event handler.

        Returns None as long as no error message has been received.
        """
        return self.last_errmsg

    def _on_msgline(self, msg):
        """
        Add a message.

        Records it in the task under the (worker, key) source and fires
        "ev_read".
        """
        # add last msg to local buffer
        self.last_msg = msg

        # update task
        self.task._msg_add((self, self.key), msg)

        self._invoke("ev_read")

    def _on_errmsgline(self, msg):
        """
        Add an error message.

        Records it in the task under the (worker, key) source and fires
        "ev_error".
        """
        # add last error msg to local buffer
        self.last_errmsg = msg

        # update task
        self.task._errmsg_add((self, self.key), msg)

        self._invoke("ev_error")

    def _on_timeout(self):
        """
        Update on timeout.
        """
        self.task._timeout_add((self, self.key))

        # trigger timeout event
        self._invoke("ev_timeout")

    def read(self):
        """
        Read worker buffer.

        Returns the first buffer found for this worker as a str, or None
        when the task holds no message for it. Raises WorkerError if the
        worker is not bound to a task.
        """
        self._task_bound_check()
        for key, msg in self.task._call_tree_matcher(self.task._msgtree.items,
                                                     worker=self):
            assert key == self.key
            return str(msg)

    def error(self):
        """
        Read worker error buffer.

        Returns the first error buffer found for this worker as a str, or
        None when the task holds no error message for it. Raises
        WorkerError if the worker is not bound to a task.
        """
        self._task_bound_check()
        for key, msg in self.task._call_tree_matcher(self.task._errtree.items,
                                                     worker=self):
            assert key == self.key
            return str(msg)

    def write(self, buf):
        """
        Write to worker.

        Delegates to _write().
        """
        self._write(buf)

    def set_write_eof(self):
        """
        Tell worker to close its writer file descriptor once flushed. Do not
        perform writes after this call.
        """
        self._set_write_eof()
522