Package ClusterShell :: Package Engine :: Module Engine
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Engine.Engine

  1  # 
  2  # Copyright CEA/DAM/DIF (2007, 2008, 2009) 
  3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  4  # 
  5  # This file is part of the ClusterShell library. 
  6  # 
  7  # This software is governed by the CeCILL-C license under French law and 
  8  # abiding by the rules of distribution of free software.  You can  use, 
  9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 10  # license as circulated by CEA, CNRS and INRIA at the following URL 
 11  # "http://www.cecill.info". 
 12  # 
 13  # As a counterpart to the access to the source code and  rights to copy, 
 14  # modify and redistribute granted by the license, users are provided only 
 15  # with a limited warranty  and the software's author,  the holder of the 
 16  # economic rights,  and the successive licensors  have only  limited 
 17  # liability. 
 18  # 
 19  # In this respect, the user's attention is drawn to the risks associated 
 20  # with loading,  using,  modifying and/or developing or reproducing the 
 21  # software by the user in light of its specific status of free software, 
 22  # that may mean  that it is complicated to manipulate,  and  that  also 
 23  # therefore means  that it is reserved for developers  and  experienced 
 24  # professionals having in-depth computer knowledge. Users are therefore 
 25  # encouraged to load and test the software's suitability as regards their 
 26  # requirements in conditions enabling the security of their systems and/or 
 27  # data to be ensured and,  more generally, to use and operate it in the 
 28  # same conditions as regards security. 
 29  # 
 30  # The fact that you are presently reading this means that you have had 
 31  # knowledge of the CeCILL-C license and that you accept its terms. 
 32  # 
 33  # $Id: Engine.py 163 2009-10-29 23:28:39Z st-cea $ 
 34   
 35  """ 
 36  Interface of underlying Task's Engine. 
 37   
 38  An Engine implements a loop your thread enters and uses to call event handlers 
 39  in response to incoming events (from workers, timers, etc.). 
 40  """ 
 41   
 42  from sets import Set 
 43   
 44  import heapq 
 45  import thread 
 46  import time 
 47   
48 -class EngineException(Exception):
49 """ 50 Base engine exception. 51 """
52
53 -class EngineAbortException(EngineException):
54 """ 55 Raised on user abort. 56 """
57 - def __init__(self, kill):
58 self.kill = kill
59
60 -class EngineTimeoutException(EngineException):
61 """ 62 Raised when a timeout is encountered. 63 """
64
65 -class EngineIllegalOperationError(EngineException):
66 """ 67 Error raised when an illegal operation has been performed. 68 """
69
70 -class EngineAlreadyRunningError(EngineIllegalOperationError):
71 """ 72 Error raised when the engine is already running. 73 """
74 75
76 -class EngineBaseTimer:
77 """ 78 Abstract class for ClusterShell's engine timer. Such a timer 79 requires a relative fire time (delay) in seconds (as float), and 80 supports an optional repeating interval in seconds (as float too). 81 82 See EngineTimer for more information about ClusterShell timers. 83 """ 84
85 - def __init__(self, fire_delay, interval=-1.0, autoclose=False):
86 """ 87 Create a base timer. 88 """ 89 self.fire_delay = fire_delay 90 self.interval = interval 91 self.autoclose = autoclose 92 self._engine = None 93 self._timercase = None
94
95 - def _set_engine(self, engine):
96 """ 97 Bind to engine, called by Engine. 98 """ 99 if self._engine: 100 # A timer can be registered to only one engine at a time. 101 raise EngineIllegalOperationError("Already bound to engine.") 102 103 self._engine = engine
104
105 - def invalidate(self):
106 """ 107 Invalidates a timer object, stopping it from ever firing again. 108 """ 109 if self._engine: 110 self._engine.timerq.invalidate(self) 111 self._engine = None
112
113 - def is_valid(self):
114 """ 115 Returns a boolean value that indicates whether an EngineTimer 116 object is valid and able to fire. 117 """ 118 return self._engine != None
119
120 - def set_nextfire(self, fire_delay, interval=-1):
121 """ 122 Set the next firing delay in seconds for an EngineTimer object. 123 124 The optional paramater `interval' sets the firing interval 125 of the timer. If not specified, the timer fires once and then 126 is automatically invalidated. 127 128 Time values are expressed in second using floating point 129 values. Precision is implementation (and system) dependent. 130 131 It is safe to call this method from the task owning this 132 timer object, in any event handlers, anywhere. 133 134 However, resetting a timer's next firing time may be a 135 relatively expensive operation. It is more efficient to let 136 timers autorepeat or to use this method from the timer's own 137 event handler callback (ie. from its ev_timer). 138 """ 139 if not self.is_valid(): 140 raise EngineIllegalOperationError("Operation on invalid timer.") 141 142 self.fire_delay = fire_delay 143 self.interval = interval 144 self._engine.timerq.reschedule(self)
145
146 - def _fire(self):
147 raise NotImplementedError("Derived classes must implement.")
148 149
150 -class EngineTimer(EngineBaseTimer):
151 """ 152 Concrete class EngineTimer 153 154 An EngineTimer object represents a timer bound to an engine that 155 fires at a preset time in the future. Timers can fire either only 156 once or repeatedly at fixed time intervals. Repeating timers can 157 also have their next firing time manually adjusted. 158 159 A timer is not a real-time mechanism; it fires when the task's 160 underlying engine to which the timer has been added is running and 161 able to check if the timer's firing time has passed. 162 """ 163
164 - def __init__(self, fire_delay, interval, autoclose, handler):
165 EngineBaseTimer.__init__(self, fire_delay, interval, autoclose) 166 self.eh = handler 167 assert self.eh != None, "An event handler is needed for timer."
168
169 - def _fire(self):
170 self.eh._invoke("ev_timer", self)
171
172 -class _EngineTimerQ:
173
174 - class _EngineTimerCase:
175 """ 176 Helper class that allows comparisons of fire times, to be easily used 177 in an heapq. 178 """
179 - def __init__(self, client):
180 self.client = client 181 self.client._timercase = self 182 # arm timer (first time) 183 assert self.client.fire_delay > 0 184 self.fire_date = self.client.fire_delay + time.time()
185
186 - def __cmp__(self, other):
187 if self.fire_date < other.fire_date: 188 return -1 189 elif self.fire_date > other.fire_date: 190 return 1 191 else: 192 return 0
193
194 - def arm(self, client):
195 assert client != None 196 self.client = client 197 self.client._timercase = self 198 # setup next firing date 199 time_current = time.time() 200 if self.client.fire_delay > 0: 201 self.fire_date = self.client.fire_delay + time_current 202 else: 203 interval = float(self.client.interval) 204 assert interval > 0 205 self.fire_date += interval 206 # If the firing time is delayed so far that it passes one 207 # or more of the scheduled firing times, reschedule the 208 # timer for the next scheduled firing time in the future. 209 while self.fire_date < time_current: 210 self.fire_date += interval
211
212 - def disarm(self):
213 client = self.client 214 client._timercase = None 215 self.client = None 216 return client
217
218 - def armed(self):
219 return self.client != None
220 221
222 - def __init__(self, engine):
223 """ 224 Initializer. 225 """ 226 self._engine = engine 227 self.timers = [] 228 self.armed_count = 0
229
230 - def __len__(self):
231 """ 232 Return the number of active timers. 233 """ 234 return self.armed_count
235
236 - def schedule(self, client):
237 """ 238 Insert and arm a client's timer. 239 """ 240 # arm only if fire is set 241 if client.fire_delay > 0: 242 heapq.heappush(self.timers, _EngineTimerQ._EngineTimerCase(client)) 243 self.armed_count += 1 244 if not client.autoclose: 245 self._engine.evlooprefcnt += 1
246
247 - def reschedule(self, client):
248 """ 249 Re-insert client's timer. 250 """ 251 if client._timercase: 252 self.invalidate(client) 253 self._dequeue_disarmed() 254 self.schedule(client)
255
256 - def invalidate(self, client):
257 """ 258 Invalidate client's timer. Current implementation doesn't really remove 259 the timer, but simply flags it as disarmed. 260 """ 261 if not client._timercase: 262 # if timer is being fire, invalidate its values 263 client.fire_delay = 0 264 client.interval = 0 265 return 266 267 if self.armed_count <= 0: 268 raise ValueError, "Engine client timer not found in timer queue" 269 270 client._timercase.disarm() 271 self.armed_count -= 1 272 if not client.autoclose: 273 self._engine.evlooprefcnt -= 1
274
275 - def _dequeue_disarmed(self):
276 """ 277 Dequeue disarmed timers (sort of garbage collection). 278 """ 279 while len(self.timers) > 0 and not self.timers[0].armed(): 280 heapq.heappop(self.timers)
281
282 - def fire(self):
283 """ 284 Remove the smallest timer from the queue and fire its associated client. 285 Raise IndexError if the queue is empty. 286 """ 287 self._dequeue_disarmed() 288 289 timercase = heapq.heappop(self.timers) 290 client = timercase.disarm() 291 292 client.fire_delay = 0 293 client._fire() 294 295 if client.fire_delay > 0 or client.interval > 0: 296 timercase.arm(client) 297 heapq.heappush(self.timers, timercase) 298 else: 299 self.armed_count -= 1 300 if not client.autoclose: 301 self._engine.evlooprefcnt -= 1
302
303 - def nextfire_delay(self):
304 """ 305 Return next timer fire delay (relative time). 306 """ 307 self._dequeue_disarmed() 308 if len(self.timers) > 0: 309 return max(0., self.timers[0].fire_date - time.time()) 310 311 return -1
312
313 - def expired(self):
314 """ 315 Has a timer expired? 316 """ 317 self._dequeue_disarmed() 318 return len(self.timers) > 0 and \ 319 (self.timers[0].fire_date - time.time()) <= 1e-2
320
321 - def clear(self):
322 """ 323 Stop and clear all timers. 324 """ 325 for timer in self.timers: 326 if timer.armed(): 327 timer.client.invalidate() 328 329 self.timers = [] 330 self.armed_count = 0
331 332
333 -class Engine:
334 """ 335 Interface for ClusterShell engine. Subclasses have to implement a runloop 336 listening for client events. 337 """ 338 339 # Engine client I/O event interest bits 340 E_READABLE = 0x1 341 E_WRITABLE = 0x2 342 E_ANY = 0x3 343
344 - def __init__(self, info):
345 """ 346 Initialize base class. 347 """ 348 # take a reference on info dict 349 self.info = info 350 351 # keep track of all clients 352 self._clients = Set() 353 354 # keep track of the number of registered clients 355 # note: len(self.reg_clients) <= configured fanout 356 self.reg_clients = 0 357 358 # keep track of registered file descriptors in a dict where keys 359 # are fileno and values are clients 360 self.reg_clifds = {} 361 362 # A boolean that indicates when reg_clifds has changed, or when 363 # some client interest event mask has changed. It is set by the 364 # base class, and reset by each engine implementation. 365 # Engines often deal with I/O events in chunk, and some event 366 # may lead to change to some other "client interest event mask" 367 # or could even register or close other clients. When such 368 # changes are made, this boolean is set to True, allowing the 369 # engine implementation to reconsider their events got by chunk. 370 self.reg_clifds_changed = False 371 372 # timer queue to handle both timers and clients timeout 373 self.timerq = _EngineTimerQ(self) 374 375 # reference count to the event loop (must include registered 376 # clients and timers configured WITHOUT autoclose) 377 self.evlooprefcnt = 0 378 379 self.joinable = True 380 # thread stuffs 381 self.run_lock = thread.allocate_lock() 382 self.start_lock = thread.allocate_lock() 383 self.start_lock.acquire()
384
385 - def clients(self):
386 """ 387 Get a copy of clients set. 388 """ 389 return self._clients.copy()
390
391 - def add(self, client):
392 """ 393 Add a client to engine. Subclasses that override this method 394 should call base class method. 395 """ 396 # bind to engine 397 client._set_engine(self) 398 399 # add to clients set 400 self._clients.add(client) 401 402 if self.run_lock.locked(): 403 # in-fly add if running 404 self.register(client._start())
405
406 - def remove(self, client, did_timeout=False):
407 """ 408 Remove a client from engine. Subclasses that override this 409 method should call base class method. 410 """ 411 self._debug("REMOVE %s" % client) 412 self._clients.remove(client) 413 414 if client.registered: 415 self.unregister(client) 416 client._close(force=False, timeout=did_timeout) 417 self.start_all()
418
419 - def clear(self, did_timeout=False):
420 """ 421 Remove all clients. Subclasses that override this method should 422 call base class method. 423 """ 424 while len(self._clients) > 0: 425 client = self._clients.pop() 426 if client.registered: 427 self.unregister(client) 428 client._close(force=True, timeout=did_timeout)
429
430 - def register(self, client):
431 """ 432 Register a client. Subclasses that override this method should 433 call base class method. 434 """ 435 assert client in self._clients 436 assert not client.registered 437 438 rfd = client.reader_fileno() 439 wfd = client.writer_fileno() 440 assert rfd != None or wfd != None 441 442 self._debug("REG %s(r%s,w%s)(autoclose=%s)" % (client.__class__.__name__, 443 rfd, wfd, client.autoclose)) 444 445 client._events = 0 446 client.registered = True 447 self.reg_clients += 1 448 449 if client.autoclose: 450 refcnt_inc = 0 451 else: 452 refcnt_inc = 1 453 454 if rfd != None: 455 self.reg_clifds[rfd] = client 456 self.reg_clifds_changed = True 457 client._events |= Engine.E_READABLE 458 self.evlooprefcnt += refcnt_inc 459 self._modify_specific(rfd, Engine.E_READABLE, 1) 460 if wfd != None: 461 self.reg_clifds[wfd] = client 462 self.reg_clifds_changed = True 463 client._events |= Engine.E_WRITABLE 464 self.evlooprefcnt += refcnt_inc 465 self._modify_specific(wfd, Engine.E_WRITABLE, 1) 466 467 client._new_events = client._events 468 469 # start timeout timer 470 self.timerq.schedule(client)
471
472 - def unregister_writer(self, client):
473 self._debug("UNREG WRITER r%s,w%s" % (client.reader_fileno(), client.writer_fileno())) 474 if client.autoclose: 475 refcnt_inc = 0 476 else: 477 refcnt_inc = 1 478 479 wfd = client.writer_fileno() 480 if wfd != None: 481 if client._events & Engine.E_WRITABLE: 482 self._modify_specific(wfd, Engine.E_WRITABLE, 0) 483 client._events &= ~Engine.E_WRITABLE 484 del self.reg_clifds[wfd] 485 self.reg_clifds_changed = True 486 self.evlooprefcnt -= refcnt_inc
487
488 - def unregister(self, client):
489 """ 490 Unregister a client. Subclasses that override this method should 491 call base class method. 492 """ 493 # sanity check 494 assert client.registered 495 self._debug("UNREG %s (r%s,w%s)" % (client.__class__.__name__, 496 client.reader_fileno(), client.writer_fileno())) 497 498 # remove timeout timer 499 self.timerq.invalidate(client) 500 501 if client.autoclose: 502 refcnt_inc = 0 503 else: 504 refcnt_inc = 1 505 506 # clear interest events 507 rfd = client.reader_fileno() 508 if rfd != None: 509 if client._events & Engine.E_READABLE: 510 self._modify_specific(rfd, Engine.E_READABLE, 0) 511 client._events &= ~Engine.E_READABLE 512 del self.reg_clifds[rfd] 513 self.reg_clifds_changed = True 514 self.evlooprefcnt -= refcnt_inc 515 516 wfd = client.writer_fileno() 517 if wfd != None: 518 if client._events & Engine.E_WRITABLE: 519 self._modify_specific(wfd, Engine.E_WRITABLE, 0) 520 client._events &= ~Engine.E_WRITABLE 521 del self.reg_clifds[wfd] 522 self.reg_clifds_changed = True 523 self.evlooprefcnt -= refcnt_inc 524 525 client._new_events = 0 526 client.registered = False 527 self.reg_clients -= 1
528
529 - def modify(self, client, set, clear):
530 """ 531 Modify the next loop interest events bitset for a client. 532 """ 533 self._debug("MODEV set:0x%x clear:0x%x %s" % (set, clear, client)) 534 client._new_events &= ~clear 535 client._new_events |= set 536 537 if not client._processing: 538 # modifying a non processing client? 539 self.reg_clifds_has_changed = True 540 # apply new_events now 541 self.set_events(client, client._new_events)
542
543 - def set_events(self, client, new_events):
544 """ 545 Set the active interest events bitset for a client. 546 """ 547 assert not client._processing 548 549 self._debug("SETEV new_events:0x%x events:0x%x %s" % (new_events, 550 client._events, client)) 551 552 if client.autoclose: 553 refcnt_inc = 0 554 else: 555 refcnt_inc = 1 556 557 chgbits = new_events ^ client._events 558 if chgbits == 0: 559 return 560 561 # configure interest events as appropriate 562 rfd = client.reader_fileno() 563 if rfd != None: 564 if chgbits & Engine.E_READABLE: 565 status = new_events & Engine.E_READABLE 566 self._modify_specific(rfd, Engine.E_READABLE, status) 567 if status: 568 client._events |= Engine.E_READABLE 569 else: 570 client._events &= ~Engine.E_READABLE 571 572 wfd = client.writer_fileno() 573 if wfd != None: 574 if chgbits & Engine.E_WRITABLE: 575 status = new_events & Engine.E_WRITABLE 576 self._modify_specific(wfd, Engine.E_WRITABLE, status) 577 if status: 578 client._events |= Engine.E_WRITABLE 579 else: 580 client._events &= ~Engine.E_WRITABLE 581 582 client._new_events = client._events
583
584 - def add_timer(self, timer):
585 """ 586 Add engine timer. 587 """ 588 timer._set_engine(self) 589 self.timerq.schedule(timer)
590
591 - def remove_timer(self, timer):
592 """ 593 Remove engine timer. 594 """ 595 self.timerq.invalidate(timer)
596
597 - def fire_timers(self):
598 """ 599 Fire expired timers for processing. 600 """ 601 while self.timerq.expired(): 602 self.timerq.fire()
603
604 - def start_all(self):
605 """ 606 Start and register all possible clients, in respect of task fanout. 607 """ 608 fanout = self.info["fanout"] 609 assert fanout > 0 610 if fanout <= self.reg_clients: 611 return 612 613 for client in self._clients: 614 if not client.registered: 615 self._debug("START CLIENT %s" % client.__class__.__name__) 616 self.register(client._start()) 617 if fanout <= self.reg_clients: 618 break
619
620 - def run(self, timeout):
621 """ 622 Run engine in calling thread. 623 """ 624 # change to running state 625 if not self.run_lock.acquire(0): 626 raise EngineAlreadyRunningError() 627 628 # start clients now 629 self.start_all() 630 631 # we're started 632 self.joinable = True 633 self.start_lock.release() 634 635 # note: try-except-finally not supported before python 2.5 636 try: 637 try: 638 self.runloop(timeout) 639 except Exception, e: 640 # any exceptions invalidate clients 641 self.clear(isinstance(e, EngineTimeoutException)) 642 raise 643 finally: 644 # cleanup 645 self.timerq.clear() 646 647 # change to idle state 648 self.joinable = False 649 self.start_lock.acquire() 650 self.run_lock.release()
651
652 - def runloop(self, timeout):
653 """ 654 Engine specific run loop. Derived classes must implement. 655 """ 656 raise NotImplementedError("Derived classes must implement.")
657
658 - def abort(self, kill):
659 """ 660 Abort runloop. 661 """ 662 if not self.start_lock.locked() and self.joinable: 663 raise EngineAbortException(kill) 664 else: 665 self.clear()
666
667 - def join(self):
668 """ 669 Block calling thread until runloop has finished. 670 """ 671 # make sure engine has started first 672 if not self.start_lock.acquire(0): # try 673 # check for joinable state 674 if not self.joinable: 675 return 676 self.start_lock.acquire() 677 self.start_lock.release() 678 679 # joined once run_lock is available 680 self.run_lock.acquire() 681 self.run_lock.release()
682
683 - def _debug(self, s):
684 # library engine debugging hook 685 pass
686