Package ClusterShell :: Package Worker :: Module Worker
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Worker.Worker

  1  # 
  2  # Copyright CEA/DAM/DIF (2007, 2008, 2009) 
  3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  4  # 
  5  # This file is part of the ClusterShell library. 
  6  # 
  7  # This software is governed by the CeCILL-C license under French law and 
  8  # abiding by the rules of distribution of free software.  You can  use, 
  9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 10  # license as circulated by CEA, CNRS and INRIA at the following URL 
 11  # "http://www.cecill.info". 
 12  # 
 13  # As a counterpart to the access to the source code and  rights to copy, 
 14  # modify and redistribute granted by the license, users are provided only 
 15  # with a limited warranty  and the software's author,  the holder of the 
 16  # economic rights,  and the successive licensors  have only  limited 
 17  # liability. 
 18  # 
 19  # In this respect, the user's attention is drawn to the risks associated 
 20  # with loading,  using,  modifying and/or developing or reproducing the 
 21  # software by the user in light of its specific status of free software, 
 22  # that may mean  that it is complicated to manipulate,  and  that  also 
 23  # therefore means  that it is reserved for developers  and  experienced 
 24  # professionals having in-depth computer knowledge. Users are therefore 
 25  # encouraged to load and test the software's suitability as regards their 
 26  # requirements in conditions enabling the security of their systems and/or 
 27  # data to be ensured and,  more generally, to use and operate it in the 
 28  # same conditions as regards security. 
 29  # 
 30  # The fact that you are presently reading this means that you have had 
 31  # knowledge of the CeCILL-C license and that you accept its terms. 
 32  # 
 33  # $Id: Worker.py 126 2009-07-13 21:35:07Z st-cea $ 
 34   
 35  """ 
 36  ClusterShell worker interface. 
 37   
 38  A worker is a generic object which provides "grouped" work in a specific task. 
 39  """ 
 40   
 41  from EngineClient import EngineClient, EngineClientEOF 
 42   
 43  from ClusterShell.Event import EventHandler 
 44  from ClusterShell.NodeSet import NodeSet 
 45   
 46   
47 -class WorkerException(Exception):
48 """Generic worker exception."""
49
50 -class WorkerError(WorkerException):
51 """Generic worker error."""
52
53 -class Worker(object):
54 """ 55 Base class Worker. 56 """ 57
58 - def __init__(self, handler):
59 """ 60 Initializer. Should be called from derived classes. 61 """ 62 self.eh = handler # associated EventHandler 63 self.task = None
64
65 - def _set_task(self, task):
66 """ 67 Bind worker to task. Called by task.schedule() 68 """ 69 if self.task is not None: 70 # one-shot-only schedule supported for now 71 raise WorkerError() 72 73 self.task = task
74
75 - def _engine_clients(self):
76 """ 77 Return a list of underlying engine clients. 78 """ 79 raise NotImplementedError("Derived classes must implement.")
80
81 - def _start(self):
82 """ 83 Starts worker and returns worker instance as a convenience. 84 Derived classes must implement. 85 """ 86 raise NotImplementedError("Derived classes must implement.")
87
88 - def _invoke(self, ev_type):
89 """ 90 Invoke user EventHandler method if needed. 91 """ 92 if self.eh: 93 self.eh._invoke(ev_type, self)
94
95 - def last_read(self):
96 """ 97 Get last read message from event handler. 98 """ 99 raise NotImplementedError("Derived classes must implement.")
100
101 - def abort(self):
102 """ 103 Stop this worker. 104 """ 105 raise NotImplementedError("Derived classes must implement.")
106
107 - def did_timeout(self):
108 """ 109 Return True if this worker aborted due to timeout. 110 """ 111 return self.task._num_timeout_by_worker(self) > 0
112 113
114 -class DistantWorker(Worker):
115 """ 116 Base class DistantWorker, which provides a useful set of setters/getters 117 to use with distant workers like ssh or pdsh. 118 """ 119
120 - def __init__(self, handler):
121 Worker.__init__(self, handler) 122 123 self._last_node = None 124 self._last_msg = None 125 self._last_rc = 0 126 self.started = False
127
128 - def _on_start(self):
129 """ 130 Starting 131 """ 132 if not self.started: 133 self.started = True 134 self._invoke("ev_start")
135
136 - def _on_node_msgline(self, node, msg):
137 """ 138 Message received from node, update last* stuffs. 139 """ 140 self._last_node = node 141 self._last_msg = msg 142 143 self.task._msg_add((self, node), msg) 144 145 self._invoke("ev_read")
146
147 - def _on_node_rc(self, node, rc):
148 """ 149 Return code received from a node, update last* stuffs. 150 """ 151 self._last_node = node 152 self._last_rc = rc 153 154 self.task._rc_set((self, node), rc) 155 156 self._invoke("ev_hup")
157
158 - def _on_node_timeout(self, node):
159 """ 160 Update on node timeout. 161 """ 162 # Update _last_node to allow node resolution after ev_timeout. 163 self._last_node = node 164 165 self.task._timeout_add((self, node))
166
167 - def last_node(self):
168 """ 169 Get last node, useful to get the node in an EventHandler 170 callback like ev_timeout(). 171 """ 172 return self._last_node
173
174 - def last_read(self):
175 """ 176 Get last (node, buffer), useful in an EventHandler.ev_read() 177 """ 178 return self._last_node, self._last_msg
179
180 - def last_retcode(self):
181 """ 182 Get last (node, rc), useful in an EventHandler.ev_hup() 183 """ 184 return self._last_node, self._last_rc
185
186 - def node_buffer(self, node):
187 """ 188 Get specific node buffer. 189 """ 190 return self.task._msg_by_source((self, node))
191
192 - def node_rc(self, node):
193 """ 194 Get specific node return code. 195 """ 196 return self.task._rc_by_source((self, node))
197
198 - def iter_buffers(self, match_keys=None):
199 """ 200 Returns an iterator over available buffers and associated 201 NodeSet. If the optional parameter match_keys is defined, only 202 keys found in match_keys are returned. 203 """ 204 for msg, keys in self.task._msg_iter_by_worker(self, match_keys): 205 yield msg, NodeSet.fromlist(keys)
206
207 - def iter_node_buffers(self):
208 """ 209 Returns an iterator over each node and associated buffer. 210 """ 211 return self.task._kmsg_iter_by_worker(self)
212
213 - def iter_retcodes(self, match_keys=None):
214 """ 215 Returns an iterator over return codes and associated NodeSet. 216 If the optional parameter match_keys is defined, only keys 217 found in match_keys are returned. 218 """ 219 for rc, keys in self.task._rc_iter_by_worker(self, match_keys): 220 yield rc, NodeSet.fromlist(keys)
221
222 - def iter_node_retcodes(self):
223 """ 224 Returns an iterator over each node and associated return code. 225 """ 226 return self.task._krc_iter_by_worker(self)
227
228 - def num_timeout(self):
229 """ 230 Return the number of timed out "keys" (ie. nodes) for this worker. 231 """ 232 return self.task._num_timeout_by_worker(self)
233
234 - def iter_keys_timeout(self):
235 """ 236 Iterate over timed out keys (ie. nodes) for a specific worker. 237 """ 238 return self.task._iter_keys_timeout_by_worker(self)
239 240
241 -class WorkerSimple(EngineClient,Worker):
242 """ 243 Implements a simple Worker being itself an EngineClient. 244 """ 245
246 - def __init__(self, file_reader, file_writer, file_error, key, handler, timeout, autoclose=False):
247 """ 248 Initialize worker. 249 """ 250 Worker.__init__(self, handler) 251 EngineClient.__init__(self, self, timeout, autoclose) 252 253 self.last_msg = None 254 self.key = key or self 255 self.file_reader = file_reader 256 self.file_writer = file_writer 257 self.file_error = file_error
258
259 - def _engine_clients(self):
260 """ 261 Return a list of underlying engine clients. 262 """ 263 return [self]
264
265 - def set_key(self, key):
266 """ 267 Source key for this worker is free for use. Use this method to 268 set the custom source key for this worker. 269 """ 270 self.key = key
271
272 - def _start(self):
273 """ 274 Start worker. 275 """ 276 self._invoke("ev_start") 277 278 return self
279
280 - def reader_fileno(self):
281 """ 282 Returns the reader file descriptor as an integer. 283 """ 284 if self.file_reader: 285 return self.file_reader.fileno() 286 287 return None
288
289 - def writer_fileno(self):
290 """ 291 Returns the writer file descriptor as an integer. 292 """ 293 if self.file_writer: 294 return self.file_writer.fileno() 295 296 return None
297
298 - def _read(self, size=4096):
299 """ 300 Read data from process. 301 """ 302 #result = self.file_reader.read(size) 303 result = self.file_reader.read(size) 304 if not len(result): 305 raise EngineClientEOF() 306 self._set_reading() 307 return result
308
309 - def _close(self, force, timeout):
310 """ 311 Close worker. Called by engine after worker has been 312 unregistered. This method should handle all termination types 313 (normal, forced or on timeout). 314 """ 315 if self.file_reader != None: 316 self.file_reader.close() 317 if self.file_writer != None: 318 self.file_writer.close() 319 if self.file_error != None: 320 self.file_error.close() 321 322 if timeout: 323 self._on_timeout() 324 325 self._invoke("ev_close")
326
327 - def _handle_read(self):
328 """ 329 Engine is telling us a read is available. 330 """ 331 debug = self.task.info("debug", False) 332 if debug: 333 print_debug = self.task.info("print_debug") 334 335 for msg in self._readlines(): 336 if debug: 337 print_debug(self.task, "LINE %s" % msg) 338 self._on_msgline(msg)
339
340 - def last_read(self):
341 """ 342 Read last msg, useful in an EventHandler. 343 """ 344 return self.last_msg
345
346 - def _on_msgline(self, msg):
347 """ 348 Add a message. 349 """ 350 # add last msg to local buffer 351 self.last_msg = msg 352 353 # update task 354 self.task._msg_add((self, self.key), msg) 355 356 self._invoke("ev_read")
357
358 - def _on_timeout(self):
359 """ 360 Update on timeout. 361 """ 362 self.task._timeout_add((self, self.key)) 363 364 # trigger timeout event 365 self._invoke("ev_timeout")
366
367 - def read(self):
368 """ 369 Read worker buffer. 370 """ 371 for key, msg in self.task._kmsg_iter_by_worker(self): 372 assert key == self.key 373 return msg
374
375 - def write(self, buf):
376 """ 377 Write to worker. 378 """ 379 self._write(buf)
380
381 - def set_write_eof(self):
382 """ 383 Tell worker to close its writer file descriptor once flushed. Do not 384 perform writes after this call. 385 """ 386 self._set_write_eof()
387