Package ClusterShell :: Package Engine :: Module Engine
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Engine.Engine

  1  # 
  2  # Copyright CEA/DAM/DIF (2007, 2008, 2009, 2010) 
  3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  4  # 
  5  # This file is part of the ClusterShell library. 
  6  # 
  7  # This software is governed by the CeCILL-C license under French law and 
  8  # abiding by the rules of distribution of free software.  You can  use, 
  9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 10  # license as circulated by CEA, CNRS and INRIA at the following URL 
 11  # "http://www.cecill.info". 
 12  # 
 13  # As a counterpart to the access to the source code and  rights to copy, 
 14  # modify and redistribute granted by the license, users are provided only 
 15  # with a limited warranty  and the software's author,  the holder of the 
 16  # economic rights,  and the successive licensors  have only  limited 
 17  # liability. 
 18  # 
 19  # In this respect, the user's attention is drawn to the risks associated 
 20  # with loading,  using,  modifying and/or developing or reproducing the 
 21  # software by the user in light of its specific status of free software, 
 22  # that may mean  that it is complicated to manipulate,  and  that  also 
 23  # therefore means  that it is reserved for developers  and  experienced 
 24  # professionals having in-depth computer knowledge. Users are therefore 
 25  # encouraged to load and test the software's suitability as regards their 
 26  # requirements in conditions enabling the security of their systems and/or 
 27  # data to be ensured and,  more generally, to use and operate it in the 
 28  # same conditions as regards security. 
 29  # 
 30  # The fact that you are presently reading this means that you have had 
 31  # knowledge of the CeCILL-C license and that you accept its terms. 
 32  # 
 33  # $Id: Engine.py 295 2010-07-21 19:55:15Z st-cea $ 
 34   
 35  """ 
 36  Interface of underlying Task's Engine. 
 37   
 38  An Engine implements a loop your thread enters and uses to call event handlers 
 39  in response to incoming events (from workers, timers, etc.). 
 40  """ 
 41   
 42  import errno 
 43  import heapq 
 44  import time 
 45   
 46   
47 -class EngineException(Exception):
48 """ 49 Base engine exception. 50 """
51
52 -class EngineAbortException(EngineException):
53 """ 54 Raised on user abort. 55 """
56 - def __init__(self, kill):
57 self.kill = kill
58
59 -class EngineTimeoutException(EngineException):
60 """ 61 Raised when a timeout is encountered. 62 """
63
64 -class EngineIllegalOperationError(EngineException):
65 """ 66 Error raised when an illegal operation has been performed. 67 """
68
69 -class EngineAlreadyRunningError(EngineIllegalOperationError):
70 """ 71 Error raised when the engine is already running. 72 """
73
74 -class EngineNotSupportedError(EngineException):
75 """ 76 Error raised when the engine mechanism is not supported. 77 """
78 79
80 -class EngineBaseTimer:
81 """ 82 Abstract class for ClusterShell's engine timer. Such a timer 83 requires a relative fire time (delay) in seconds (as float), and 84 supports an optional repeating interval in seconds (as float too). 85 86 See EngineTimer for more information about ClusterShell timers. 87 """ 88
89 - def __init__(self, fire_delay, interval=-1.0, autoclose=False):
90 """ 91 Create a base timer. 92 """ 93 self.fire_delay = fire_delay 94 self.interval = interval 95 self.autoclose = autoclose 96 self._engine = None 97 self._timercase = None
98
99 - def _set_engine(self, engine):
100 """ 101 Bind to engine, called by Engine. 102 """ 103 if self._engine: 104 # A timer can be registered to only one engine at a time. 105 raise EngineIllegalOperationError("Already bound to engine.") 106 107 self._engine = engine
108
109 - def invalidate(self):
110 """ 111 Invalidates a timer object, stopping it from ever firing again. 112 """ 113 if self._engine: 114 self._engine.timerq.invalidate(self) 115 self._engine = None
116
117 - def is_valid(self):
118 """ 119 Returns a boolean value that indicates whether an EngineTimer 120 object is valid and able to fire. 121 """ 122 return self._engine != None
123
124 - def set_nextfire(self, fire_delay, interval=-1):
125 """ 126 Set the next firing delay in seconds for an EngineTimer object. 127 128 The optional paramater `interval' sets the firing interval 129 of the timer. If not specified, the timer fires once and then 130 is automatically invalidated. 131 132 Time values are expressed in second using floating point 133 values. Precision is implementation (and system) dependent. 134 135 It is safe to call this method from the task owning this 136 timer object, in any event handlers, anywhere. 137 138 However, resetting a timer's next firing time may be a 139 relatively expensive operation. It is more efficient to let 140 timers autorepeat or to use this method from the timer's own 141 event handler callback (ie. from its ev_timer). 142 """ 143 if not self.is_valid(): 144 raise EngineIllegalOperationError("Operation on invalid timer.") 145 146 self.fire_delay = fire_delay 147 self.interval = interval 148 self._engine.timerq.reschedule(self)
149
150 - def _fire(self):
151 raise NotImplementedError("Derived classes must implement.")
152 153
154 -class EngineTimer(EngineBaseTimer):
155 """ 156 Concrete class EngineTimer 157 158 An EngineTimer object represents a timer bound to an engine that 159 fires at a preset time in the future. Timers can fire either only 160 once or repeatedly at fixed time intervals. Repeating timers can 161 also have their next firing time manually adjusted. 162 163 A timer is not a real-time mechanism; it fires when the task's 164 underlying engine to which the timer has been added is running and 165 able to check if the timer's firing time has passed. 166 """ 167
168 - def __init__(self, fire_delay, interval, autoclose, handler):
169 EngineBaseTimer.__init__(self, fire_delay, interval, autoclose) 170 self.eh = handler 171 assert self.eh != None, "An event handler is needed for timer."
172
173 - def _fire(self):
174 self.eh._invoke("ev_timer", self)
175
176 -class _EngineTimerQ:
177
178 - class _EngineTimerCase:
179 """ 180 Helper class that allows comparisons of fire times, to be easily used 181 in an heapq. 182 """
183 - def __init__(self, client):
184 self.client = client 185 self.client._timercase = self 186 # arm timer (first time) 187 assert self.client.fire_delay > 0 188 self.fire_date = self.client.fire_delay + time.time()
189
190 - def __cmp__(self, other):
191 return cmp(self.fire_date, other.fire_date)
192
193 - def arm(self, client):
194 assert client != None 195 self.client = client 196 self.client._timercase = self 197 # setup next firing date 198 time_current = time.time() 199 if self.client.fire_delay > 0: 200 self.fire_date = self.client.fire_delay + time_current 201 else: 202 interval = float(self.client.interval) 203 assert interval > 0 204 self.fire_date += interval 205 # If the firing time is delayed so far that it passes one 206 # or more of the scheduled firing times, reschedule the 207 # timer for the next scheduled firing time in the future. 208 while self.fire_date < time_current: 209 self.fire_date += interval
210
211 - def disarm(self):
212 client = self.client 213 client._timercase = None 214 self.client = None 215 return client
216
217 - def armed(self):
218 return self.client != None
219 220
221 - def __init__(self, engine):
222 """ 223 Initializer. 224 """ 225 self._engine = engine 226 self.timers = [] 227 self.armed_count = 0
228
229 - def __len__(self):
230 """ 231 Return the number of active timers. 232 """ 233 return self.armed_count
234
235 - def schedule(self, client):
236 """ 237 Insert and arm a client's timer. 238 """ 239 # arm only if fire is set 240 if client.fire_delay > 0: 241 heapq.heappush(self.timers, _EngineTimerQ._EngineTimerCase(client)) 242 self.armed_count += 1 243 if not client.autoclose: 244 self._engine.evlooprefcnt += 1
245
246 - def reschedule(self, client):
247 """ 248 Re-insert client's timer. 249 """ 250 if client._timercase: 251 self.invalidate(client) 252 self._dequeue_disarmed() 253 self.schedule(client)
254
255 - def invalidate(self, client):
256 """ 257 Invalidate client's timer. Current implementation doesn't really remove 258 the timer, but simply flags it as disarmed. 259 """ 260 if not client._timercase: 261 # if timer is being fire, invalidate its values 262 client.fire_delay = 0 263 client.interval = 0 264 return 265 266 if self.armed_count <= 0: 267 raise ValueError, "Engine client timer not found in timer queue" 268 269 client._timercase.disarm() 270 self.armed_count -= 1 271 if not client.autoclose: 272 self._engine.evlooprefcnt -= 1
273
274 - def _dequeue_disarmed(self):
275 """ 276 Dequeue disarmed timers (sort of garbage collection). 277 """ 278 while len(self.timers) > 0 and not self.timers[0].armed(): 279 heapq.heappop(self.timers)
280
281 - def fire(self):
282 """ 283 Remove the smallest timer from the queue and fire its associated client. 284 Raise IndexError if the queue is empty. 285 """ 286 self._dequeue_disarmed() 287 288 timercase = heapq.heappop(self.timers) 289 client = timercase.disarm() 290 291 client.fire_delay = 0 292 client._fire() 293 294 if client.fire_delay > 0 or client.interval > 0: 295 timercase.arm(client) 296 heapq.heappush(self.timers, timercase) 297 else: 298 self.armed_count -= 1 299 if not client.autoclose: 300 self._engine.evlooprefcnt -= 1
301
302 - def nextfire_delay(self):
303 """ 304 Return next timer fire delay (relative time). 305 """ 306 self._dequeue_disarmed() 307 if len(self.timers) > 0: 308 return max(0., self.timers[0].fire_date - time.time()) 309 310 return -1
311
312 - def expired(self):
313 """ 314 Has a timer expired? 315 """ 316 self._dequeue_disarmed() 317 return len(self.timers) > 0 and \ 318 (self.timers[0].fire_date - time.time()) <= 1e-2
319
320 - def clear(self):
321 """ 322 Stop and clear all timers. 323 """ 324 for timer in self.timers: 325 if timer.armed(): 326 timer.client.invalidate() 327 328 self.timers = [] 329 self.armed_count = 0
330 331
332 -class Engine:
333 """ 334 Interface for ClusterShell engine. Subclasses have to implement a runloop 335 listening for client events. 336 """ 337 338 # Engine client I/O event interest bits 339 E_READ = 0x1 340 E_ERROR = 0x2 341 E_WRITE = 0x4 342 E_ANY = E_READ | E_ERROR | E_WRITE 343 344 identifier = "(none)" 345
346 - def __init__(self, info):
347 """ 348 Initialize base class. 349 """ 350 # take a reference on info dict 351 self.info = info 352 353 # and update engine id 354 self.info['engine'] = self.identifier 355 356 # keep track of all clients 357 self._clients = set() 358 self._ports = set() 359 360 # keep track of the number of registered clients (delayable only) 361 self.reg_clients = 0 362 363 # keep track of registered file descriptors in a dict where keys 364 # are fileno and values are clients 365 self.reg_clifds = {} 366 367 # A boolean that indicates when reg_clifds has changed, or when 368 # some client interest event mask has changed. It is set by the 369 # base class, and reset by each engine implementation. 370 # Engines often deal with I/O events in chunk, and some event 371 # may lead to change to some other "client interest event mask" 372 # or could even register or close other clients. When such 373 # changes are made, this boolean is set to True, allowing the 374 # engine implementation to reconsider their events got by chunk. 375 self.reg_clifds_changed = False 376 377 # timer queue to handle both timers and clients timeout 378 self.timerq = _EngineTimerQ(self) 379 380 # reference count to the event loop (must include registered 381 # clients and timers configured WITHOUT autoclose) 382 self.evlooprefcnt = 0 383 384 # running state 385 self.running = False 386 # runloop-has-exited flag 387 self._exited = False
388
389 - def clients(self):
390 """ 391 Get a copy of clients set. 392 """ 393 return self._clients.copy()
394
395 - def ports(self):
396 """ 397 Get a copy of ports set. 398 """ 399 return self._ports.copy()
400
401 - def _fd2client(self, fd):
402 fdev = None 403 client = self.reg_clifds.get(fd) 404 if client: 405 try: 406 if fd == client.reader_fileno(): 407 fdev = Engine.E_READ 408 elif fd == client.error_fileno(): 409 fdev = Engine.E_ERROR 410 elif fd == client.writer_fileno(): 411 fdev = Engine.E_WRITE 412 except: 413 return (client, Engine.E_ERROR) 414 return (client, fdev)
415
416 - def add(self, client):
417 """ 418 Add a client to engine. Subclasses that override this method 419 should call base class method. 420 """ 421 # bind to engine 422 client._set_engine(self) 423 424 if client.delayable: 425 # add to regular client set 426 self._clients.add(client) 427 else: 428 # add to port set (non-delayable) 429 self._ports.add(client) 430 431 if self.running: 432 # in-fly add if running 433 if not client.delayable: 434 self.register(client) 435 elif self.info["fanout"] > self.reg_clients: 436 self.register(client._start())
437
438 - def _remove(self, client, did_timeout=False, force=False):
439 """ 440 Remove a client from engine (subroutine). 441 """ 442 if client.registered: 443 self.unregister(client) 444 client._close(force=force, timeout=did_timeout)
445
446 - def remove(self, client, did_timeout=False):
447 """ 448 Remove a client from engine. Subclasses that override this 449 method should call base class method. 450 """ 451 self._debug("REMOVE %s" % client) 452 if client.delayable: 453 self._clients.remove(client) 454 else: 455 self._ports.remove(client) 456 self._remove(client, did_timeout, force=False) 457 self.start_all()
458
459 - def clear(self, did_timeout=False, clear_ports=False):
460 """ 461 Remove all clients. Subclasses that override this method should 462 call base class method. 463 """ 464 all_clients = [self._clients] 465 if clear_ports: 466 all_clients.append(self._ports) 467 468 for clients in all_clients: 469 while len(clients) > 0: 470 client = clients.pop() 471 self._remove(client, did_timeout, force=True)
472
473 - def register(self, client):
474 """ 475 Register an engine client. Subclasses that override this method 476 should call base class method. 477 """ 478 assert client in self._clients or client in self._ports 479 assert not client.registered 480 481 efd = client.error_fileno() 482 rfd = client.reader_fileno() 483 wfd = client.writer_fileno() 484 assert rfd != None or wfd != None 485 486 self._debug("REG %s(e%s,r%s,w%s)(autoclose=%s)" % \ 487 (client.__class__.__name__, efd, rfd, wfd, 488 client.autoclose)) 489 490 client._events = 0 491 client.registered = True 492 493 if client.delayable: 494 self.reg_clients += 1 495 496 if client.autoclose: 497 refcnt_inc = 0 498 else: 499 refcnt_inc = 1 500 501 if efd != None: 502 self.reg_clifds[efd] = client 503 self.reg_clifds_changed = True 504 client._events |= Engine.E_ERROR 505 self.evlooprefcnt += refcnt_inc 506 self._register_specific(efd, Engine.E_ERROR) 507 if rfd != None: 508 self.reg_clifds[rfd] = client 509 self.reg_clifds_changed = True 510 client._events |= Engine.E_READ 511 self.evlooprefcnt += refcnt_inc 512 self._register_specific(rfd, Engine.E_READ) 513 if wfd != None: 514 self.reg_clifds[wfd] = client 515 self.reg_clifds_changed = True 516 client._events |= Engine.E_WRITE 517 self.evlooprefcnt += refcnt_inc 518 self._register_specific(wfd, Engine.E_WRITE) 519 520 client._new_events = client._events 521 522 # start timeout timer 523 self.timerq.schedule(client)
524
525 - def unregister_writer(self, client):
526 self._debug("UNREG WRITER r%s,w%s" % (client.reader_fileno(), \ 527 client.writer_fileno())) 528 if client.autoclose: 529 refcnt_inc = 0 530 else: 531 refcnt_inc = 1 532 533 wfd = client.writer_fileno() 534 if wfd != None: 535 self._unregister_specific(wfd, client._events & Engine.E_WRITE) 536 client._events &= ~Engine.E_WRITE 537 del self.reg_clifds[wfd] 538 self.reg_clifds_changed = True 539 self.evlooprefcnt -= refcnt_inc
540
541 - def unregister(self, client):
542 """ 543 Unregister a client. Subclasses that override this method should 544 call base class method. 545 """ 546 # sanity check 547 assert client.registered 548 self._debug("UNREG %s (r%s,e%s,w%s)" % (client.__class__.__name__, 549 client.reader_fileno(), client.error_fileno(), 550 client.writer_fileno())) 551 552 # remove timeout timer 553 self.timerq.invalidate(client) 554 555 if client.autoclose: 556 refcnt_inc = 0 557 else: 558 refcnt_inc = 1 559 560 # clear interest events 561 efd = client.error_fileno() 562 if efd != None: 563 self._unregister_specific(efd, client._events & Engine.E_ERROR) 564 client._events &= ~Engine.E_ERROR 565 del self.reg_clifds[efd] 566 self.reg_clifds_changed = True 567 self.evlooprefcnt -= refcnt_inc 568 569 rfd = client.reader_fileno() 570 if rfd != None: 571 self._unregister_specific(rfd, client._events & Engine.E_READ) 572 client._events &= ~Engine.E_READ 573 del self.reg_clifds[rfd] 574 self.reg_clifds_changed = True 575 self.evlooprefcnt -= refcnt_inc 576 577 wfd = client.writer_fileno() 578 if wfd != None: 579 self._unregister_specific(wfd, client._events & Engine.E_WRITE) 580 client._events &= ~Engine.E_WRITE 581 del self.reg_clifds[wfd] 582 self.reg_clifds_changed = True 583 self.evlooprefcnt -= refcnt_inc 584 585 client._new_events = 0 586 client.registered = False 587 if client.delayable: 588 self.reg_clients -= 1
589
590 - def modify(self, client, setmask, clearmask):
591 """ 592 Modify the next loop interest events bitset for a client. 593 """ 594 self._debug("MODEV set:0x%x clear:0x%x %s" % (setmask, clearmask, 595 client)) 596 client._new_events &= ~clearmask 597 client._new_events |= setmask 598 599 if not client._processing: 600 # modifying a non processing client? 601 self.reg_clifds_changed = True 602 # apply new_events now 603 self.set_events(client, client._new_events)
604
605 - def _register_specific(self, fd, event):
606 """Engine-specific register fd for event method.""" 607 raise NotImplementedError("Derived classes must implement.")
608
609 - def _unregister_specific(self, fd, ev_is_set):
610 """Engine-specific unregister fd method.""" 611 raise NotImplementedError("Derived classes must implement.")
612
613 - def _modify_specific(self, fd, event, setvalue):
614 """Engine-specific modify fd for event method.""" 615 raise NotImplementedError("Derived classes must implement.")
616
617 - def set_events(self, client, new_events):
618 """ 619 Set the active interest events bitset for a client. 620 """ 621 assert not client._processing 622 623 self._debug("SETEV new_events:0x%x events:0x%x %s" % (new_events, 624 client._events, client)) 625 626 chgbits = new_events ^ client._events 627 if chgbits == 0: 628 return 629 630 # configure interest events as appropriate 631 efd = client.error_fileno() 632 if efd != None: 633 if chgbits & Engine.E_ERROR: 634 status = new_events & Engine.E_ERROR 635 self._modify_specific(efd, Engine.E_ERROR, status) 636 if status: 637 client._events |= Engine.E_ERROR 638 else: 639 client._events &= ~Engine.E_ERROR 640 641 rfd = client.reader_fileno() 642 if rfd != None: 643 if chgbits & Engine.E_READ: 644 status = new_events & Engine.E_READ 645 self._modify_specific(rfd, Engine.E_READ, status) 646 if status: 647 client._events |= Engine.E_READ 648 else: 649 client._events &= ~Engine.E_READ 650 651 wfd = client.writer_fileno() 652 if wfd != None: 653 if chgbits & Engine.E_WRITE: 654 status = new_events & Engine.E_WRITE 655 self._modify_specific(wfd, Engine.E_WRITE, status) 656 if status: 657 client._events |= Engine.E_WRITE 658 else: 659 client._events &= ~Engine.E_WRITE 660 661 client._new_events = client._events
662
663 - def set_reading(self, client):
664 """ 665 Set client reading state. 666 """ 667 # listen for readable events 668 self.modify(client, Engine.E_READ, 0)
669
670 - def set_reading_error(self, client):
671 """ 672 Set client reading error state. 673 """ 674 # listen for readable events 675 self.modify(client, Engine.E_ERROR, 0)
676
677 - def set_writing(self, client):
678 """ 679 Set client writing state. 680 """ 681 # listen for writable events 682 self.modify(client, Engine.E_WRITE, 0)
683
684 - def add_timer(self, timer):
685 """ 686 Add engine timer. 687 """ 688 timer._set_engine(self) 689 self.timerq.schedule(timer)
690
691 - def remove_timer(self, timer):
692 """ 693 Remove engine timer. 694 """ 695 self.timerq.invalidate(timer)
696
697 - def fire_timers(self):
698 """ 699 Fire expired timers for processing. 700 """ 701 while self.timerq.expired(): 702 self.timerq.fire()
703
704 - def start_ports(self):
705 """ 706 Start and register all port clients. 707 """ 708 # Ports are special, non-delayable engine clients 709 for port in self._ports: 710 if not port.registered: 711 self._debug("START PORT %s" % port) 712 self.register(port)
713
714 - def start_all(self):
715 """ 716 Start and register all other possible clients, in respect of task fanout. 717 """ 718 # Get current fanout value 719 fanout = self.info["fanout"] 720 assert fanout > 0 721 if fanout <= self.reg_clients: 722 return 723 724 # Register regular engine clients within the fanout limit 725 for client in self._clients: 726 if not client.registered: 727 self._debug("START CLIENT %s" % client.__class__.__name__) 728 self.register(client._start()) 729 if fanout <= self.reg_clients: 730 break
731
732 - def run(self, timeout):
733 """ 734 Run engine in calling thread. 735 """ 736 # change to running state 737 if self.running: 738 raise EngineAlreadyRunningError() 739 self.running = True 740 741 # start port clients 742 self.start_ports() 743 744 # peek in ports for early pending messages 745 self.snoop_ports() 746 747 # start all other clients 748 self.start_all() 749 750 # note: try-except-finally not supported before python 2.5 751 try: 752 try: 753 self.runloop(timeout) 754 except Exception, e: 755 # any exceptions invalidate clients 756 self.clear(isinstance(e, EngineTimeoutException)) 757 raise 758 finally: 759 # cleanup 760 self.timerq.clear() 761 self.running = False
762
763 - def snoop_ports(self):
764 """ 765 Peek in ports for possible early pending messages. 766 This method simply tries to read port pipes in non- 767 blocking mode. 768 """ 769 # make a copy so that early messages on installed ports may 770 # lead to new ports 771 ports = self._ports.copy() 772 for port in ports: 773 try: 774 port._handle_read() 775 except (IOError, OSError), (err, strerr): 776 if err == errno.EAGAIN or err == errno.EWOULDBLOCK: 777 # no pending message 778 return 779 # raise any other error 780 raise
781
782 - def runloop(self, timeout):
783 """ 784 Engine specific run loop. Derived classes must implement. 785 """ 786 raise NotImplementedError("Derived classes must implement.")
787
788 - def abort(self, kill):
789 """ 790 Abort runloop. 791 """ 792 if self.running: 793 raise EngineAbortException(kill) 794 795 self.clear(clear_ports=kill)
796
797 - def exited(self):
798 """ 799 Returns True if the engine has exited the runloop once. 800 """ 801 return not self.running and self._exited
802
803 - def _debug(self, s):
804 # library engine debugging hook 805 #print s 806 pass
807