1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Interface of underlying Task's Engine.
37
38 An Engine implements a loop your thread enters and uses to call event handlers
39 in response to incoming events (from workers, timers, etc.).
40 """
41
42 import errno
43 import heapq
44 import time
45
46
48 """
49 Base engine exception.
50 """
51
53 """
54 Raised on user abort.
55 """
58
60 """
61 Raised when a timeout is encountered.
62 """
63
65 """
66 Error raised when an illegal operation has been performed.
67 """
68
70 """
71 Error raised when the engine is already running.
72 """
73
75 """
76 Error raised when the engine mechanism is not supported.
77 """
78
79
81 """
82 Abstract class for ClusterShell's engine timer. Such a timer
83 requires a relative fire time (delay) in seconds (as float), and
84 supports an optional repeating interval in seconds (as float too).
85
86 See EngineTimer for more information about ClusterShell timers.
87 """
88
89 - def __init__(self, fire_delay, interval=-1.0, autoclose=False):
90 """
91 Create a base timer.
92 """
93 self.fire_delay = fire_delay
94 self.interval = interval
95 self.autoclose = autoclose
96 self._engine = None
97 self._timercase = None
98
100 """
101 Bind to engine, called by Engine.
102 """
103 if self._engine:
104
105 raise EngineIllegalOperationError("Already bound to engine.")
106
107 self._engine = engine
108
110 """
111 Invalidates a timer object, stopping it from ever firing again.
112 """
113 if self._engine:
114 self._engine.timerq.invalidate(self)
115 self._engine = None
116
118 """
119 Returns a boolean value that indicates whether an EngineTimer
120 object is valid and able to fire.
121 """
122 return self._engine != None
123
125 """
126 Set the next firing delay in seconds for an EngineTimer object.
127
128 The optional paramater `interval' sets the firing interval
129 of the timer. If not specified, the timer fires once and then
130 is automatically invalidated.
131
132 Time values are expressed in second using floating point
133 values. Precision is implementation (and system) dependent.
134
135 It is safe to call this method from the task owning this
136 timer object, in any event handlers, anywhere.
137
138 However, resetting a timer's next firing time may be a
139 relatively expensive operation. It is more efficient to let
140 timers autorepeat or to use this method from the timer's own
141 event handler callback (ie. from its ev_timer).
142 """
143 if not self.is_valid():
144 raise EngineIllegalOperationError("Operation on invalid timer.")
145
146 self.fire_delay = fire_delay
147 self.interval = interval
148 self._engine.timerq.reschedule(self)
149
151 raise NotImplementedError("Derived classes must implement.")
152
153
155 """
156 Concrete class EngineTimer
157
158 An EngineTimer object represents a timer bound to an engine that
159 fires at a preset time in the future. Timers can fire either only
160 once or repeatedly at fixed time intervals. Repeating timers can
161 also have their next firing time manually adjusted.
162
163 A timer is not a real-time mechanism; it fires when the task's
164 underlying engine to which the timer has been added is running and
165 able to check if the timer's firing time has passed.
166 """
167
168 - def __init__(self, fire_delay, interval, autoclose, handler):
169 EngineBaseTimer.__init__(self, fire_delay, interval, autoclose)
170 self.eh = handler
171 assert self.eh != None, "An event handler is needed for timer."
172
174 self.eh._invoke("ev_timer", self)
175
177
179 """
180 Helper class that allows comparisons of fire times, to be easily used
181 in an heapq.
182 """
184 self.client = client
185 self.client._timercase = self
186
187 assert self.client.fire_delay > 0
188 self.fire_date = self.client.fire_delay + time.time()
189
191 return cmp(self.fire_date, other.fire_date)
192
193 - def arm(self, client):
194 assert client != None
195 self.client = client
196 self.client._timercase = self
197
198 time_current = time.time()
199 if self.client.fire_delay > 0:
200 self.fire_date = self.client.fire_delay + time_current
201 else:
202 interval = float(self.client.interval)
203 assert interval > 0
204 self.fire_date += interval
205
206
207
208 while self.fire_date < time_current:
209 self.fire_date += interval
210
212 client = self.client
213 client._timercase = None
214 self.client = None
215 return client
216
218 return self.client != None
219
220
222 """
223 Initializer.
224 """
225 self._engine = engine
226 self.timers = []
227 self.armed_count = 0
228
230 """
231 Return the number of active timers.
232 """
233 return self.armed_count
234
236 """
237 Insert and arm a client's timer.
238 """
239
240 if client.fire_delay > 0:
241 heapq.heappush(self.timers, _EngineTimerQ._EngineTimerCase(client))
242 self.armed_count += 1
243 if not client.autoclose:
244 self._engine.evlooprefcnt += 1
245
254
256 """
257 Invalidate client's timer. Current implementation doesn't really remove
258 the timer, but simply flags it as disarmed.
259 """
260 if not client._timercase:
261
262 client.fire_delay = 0
263 client.interval = 0
264 return
265
266 if self.armed_count <= 0:
267 raise ValueError, "Engine client timer not found in timer queue"
268
269 client._timercase.disarm()
270 self.armed_count -= 1
271 if not client.autoclose:
272 self._engine.evlooprefcnt -= 1
273
275 """
276 Dequeue disarmed timers (sort of garbage collection).
277 """
278 while len(self.timers) > 0 and not self.timers[0].armed():
279 heapq.heappop(self.timers)
280
282 """
283 Remove the smallest timer from the queue and fire its associated client.
284 Raise IndexError if the queue is empty.
285 """
286 self._dequeue_disarmed()
287
288 timercase = heapq.heappop(self.timers)
289 client = timercase.disarm()
290
291 client.fire_delay = 0
292 client._fire()
293
294 if client.fire_delay > 0 or client.interval > 0:
295 timercase.arm(client)
296 heapq.heappush(self.timers, timercase)
297 else:
298 self.armed_count -= 1
299 if not client.autoclose:
300 self._engine.evlooprefcnt -= 1
301
303 """
304 Return next timer fire delay (relative time).
305 """
306 self._dequeue_disarmed()
307 if len(self.timers) > 0:
308 return max(0., self.timers[0].fire_date - time.time())
309
310 return -1
311
313 """
314 Has a timer expired?
315 """
316 self._dequeue_disarmed()
317 return len(self.timers) > 0 and \
318 (self.timers[0].fire_date - time.time()) <= 1e-2
319
321 """
322 Stop and clear all timers.
323 """
324 for timer in self.timers:
325 if timer.armed():
326 timer.client.invalidate()
327
328 self.timers = []
329 self.armed_count = 0
330
331
333 """
334 Interface for ClusterShell engine. Subclasses have to implement a runloop
335 listening for client events.
336 """
337
338
339 E_READ = 0x1
340 E_ERROR = 0x2
341 E_WRITE = 0x4
342 E_ANY = E_READ | E_ERROR | E_WRITE
343
344 identifier = "(none)"
345
347 """
348 Initialize base class.
349 """
350
351 self.info = info
352
353
354 self.info['engine'] = self.identifier
355
356
357 self._clients = set()
358 self._ports = set()
359
360
361 self.reg_clients = 0
362
363
364
365 self.reg_clifds = {}
366
367
368
369
370
371
372
373
374
375 self.reg_clifds_changed = False
376
377
378 self.timerq = _EngineTimerQ(self)
379
380
381
382 self.evlooprefcnt = 0
383
384
385 self.running = False
386
387 self._exited = False
388
390 """
391 Get a copy of clients set.
392 """
393 return self._clients.copy()
394
396 """
397 Get a copy of ports set.
398 """
399 return self._ports.copy()
400
415
416 - def add(self, client):
417 """
418 Add a client to engine. Subclasses that override this method
419 should call base class method.
420 """
421
422 client._set_engine(self)
423
424 if client.delayable:
425
426 self._clients.add(client)
427 else:
428
429 self._ports.add(client)
430
431 if self.running:
432
433 if not client.delayable:
434 self.register(client)
435 elif self.info["fanout"] > self.reg_clients:
436 self.register(client._start())
437
438 - def _remove(self, client, did_timeout=False, force=False):
439 """
440 Remove a client from engine (subroutine).
441 """
442 if client.registered:
443 self.unregister(client)
444 client._close(force=force, timeout=did_timeout)
445
446 - def remove(self, client, did_timeout=False):
447 """
448 Remove a client from engine. Subclasses that override this
449 method should call base class method.
450 """
451 self._debug("REMOVE %s" % client)
452 if client.delayable:
453 self._clients.remove(client)
454 else:
455 self._ports.remove(client)
456 self._remove(client, did_timeout, force=False)
457 self.start_all()
458
459 - def clear(self, did_timeout=False, clear_ports=False):
460 """
461 Remove all clients. Subclasses that override this method should
462 call base class method.
463 """
464 all_clients = [self._clients]
465 if clear_ports:
466 all_clients.append(self._ports)
467
468 for clients in all_clients:
469 while len(clients) > 0:
470 client = clients.pop()
471 self._remove(client, did_timeout, force=True)
472
474 """
475 Register an engine client. Subclasses that override this method
476 should call base class method.
477 """
478 assert client in self._clients or client in self._ports
479 assert not client.registered
480
481 efd = client.error_fileno()
482 rfd = client.reader_fileno()
483 wfd = client.writer_fileno()
484 assert rfd != None or wfd != None
485
486 self._debug("REG %s(e%s,r%s,w%s)(autoclose=%s)" % \
487 (client.__class__.__name__, efd, rfd, wfd,
488 client.autoclose))
489
490 client._events = 0
491 client.registered = True
492
493 if client.delayable:
494 self.reg_clients += 1
495
496 if client.autoclose:
497 refcnt_inc = 0
498 else:
499 refcnt_inc = 1
500
501 if efd != None:
502 self.reg_clifds[efd] = client
503 self.reg_clifds_changed = True
504 client._events |= Engine.E_ERROR
505 self.evlooprefcnt += refcnt_inc
506 self._register_specific(efd, Engine.E_ERROR)
507 if rfd != None:
508 self.reg_clifds[rfd] = client
509 self.reg_clifds_changed = True
510 client._events |= Engine.E_READ
511 self.evlooprefcnt += refcnt_inc
512 self._register_specific(rfd, Engine.E_READ)
513 if wfd != None:
514 self.reg_clifds[wfd] = client
515 self.reg_clifds_changed = True
516 client._events |= Engine.E_WRITE
517 self.evlooprefcnt += refcnt_inc
518 self._register_specific(wfd, Engine.E_WRITE)
519
520 client._new_events = client._events
521
522
523 self.timerq.schedule(client)
524
540
542 """
543 Unregister a client. Subclasses that override this method should
544 call base class method.
545 """
546
547 assert client.registered
548 self._debug("UNREG %s (r%s,e%s,w%s)" % (client.__class__.__name__,
549 client.reader_fileno(), client.error_fileno(),
550 client.writer_fileno()))
551
552
553 self.timerq.invalidate(client)
554
555 if client.autoclose:
556 refcnt_inc = 0
557 else:
558 refcnt_inc = 1
559
560
561 efd = client.error_fileno()
562 if efd != None:
563 self._unregister_specific(efd, client._events & Engine.E_ERROR)
564 client._events &= ~Engine.E_ERROR
565 del self.reg_clifds[efd]
566 self.reg_clifds_changed = True
567 self.evlooprefcnt -= refcnt_inc
568
569 rfd = client.reader_fileno()
570 if rfd != None:
571 self._unregister_specific(rfd, client._events & Engine.E_READ)
572 client._events &= ~Engine.E_READ
573 del self.reg_clifds[rfd]
574 self.reg_clifds_changed = True
575 self.evlooprefcnt -= refcnt_inc
576
577 wfd = client.writer_fileno()
578 if wfd != None:
579 self._unregister_specific(wfd, client._events & Engine.E_WRITE)
580 client._events &= ~Engine.E_WRITE
581 del self.reg_clifds[wfd]
582 self.reg_clifds_changed = True
583 self.evlooprefcnt -= refcnt_inc
584
585 client._new_events = 0
586 client.registered = False
587 if client.delayable:
588 self.reg_clients -= 1
589
590 - def modify(self, client, setmask, clearmask):
591 """
592 Modify the next loop interest events bitset for a client.
593 """
594 self._debug("MODEV set:0x%x clear:0x%x %s" % (setmask, clearmask,
595 client))
596 client._new_events &= ~clearmask
597 client._new_events |= setmask
598
599 if not client._processing:
600
601 self.reg_clifds_changed = True
602
603 self.set_events(client, client._new_events)
604
606 """Engine-specific register fd for event method."""
607 raise NotImplementedError("Derived classes must implement.")
608
610 """Engine-specific unregister fd method."""
611 raise NotImplementedError("Derived classes must implement.")
612
614 """Engine-specific modify fd for event method."""
615 raise NotImplementedError("Derived classes must implement.")
616
662
664 """
665 Set client reading state.
666 """
667
668 self.modify(client, Engine.E_READ, 0)
669
671 """
672 Set client reading error state.
673 """
674
675 self.modify(client, Engine.E_ERROR, 0)
676
683
690
696
698 """
699 Fire expired timers for processing.
700 """
701 while self.timerq.expired():
702 self.timerq.fire()
703
705 """
706 Start and register all port clients.
707 """
708
709 for port in self._ports:
710 if not port.registered:
711 self._debug("START PORT %s" % port)
712 self.register(port)
713
715 """
716 Start and register all other possible clients, in respect of task fanout.
717 """
718
719 fanout = self.info["fanout"]
720 assert fanout > 0
721 if fanout <= self.reg_clients:
722 return
723
724
725 for client in self._clients:
726 if not client.registered:
727 self._debug("START CLIENT %s" % client.__class__.__name__)
728 self.register(client._start())
729 if fanout <= self.reg_clients:
730 break
731
732 - def run(self, timeout):
762
764 """
765 Peek in ports for possible early pending messages.
766 This method simply tries to read port pipes in non-
767 blocking mode.
768 """
769
770
771 ports = self._ports.copy()
772 for port in ports:
773 try:
774 port._handle_read()
775 except (IOError, OSError), (err, strerr):
776 if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
777
778 return
779
780 raise
781
783 """
784 Engine specific run loop. Derived classes must implement.
785 """
786 raise NotImplementedError("Derived classes must implement.")
787
796
798 """
799 Returns True if the engine has exited the runloop once.
800 """
801 return not self.running and self._exited
802
807