1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Interface of underlying Task's Engine.
37
38 An Engine implements a loop your thread enters and uses to call event handlers
39 in response to incoming events (from workers, timers, etc.).
40 """
41
42 from sets import Set
43
44 import heapq
45 import thread
46 import time
47
49 """
50 Base engine exception.
51 """
52
54 """
55 Raised on user abort.
56 """
59
61 """
62 Raised when a timeout is encountered.
63 """
64
66 """
67 Error raised when an illegal operation has been performed.
68 """
69
71 """
72 Error raised when the engine is already running.
73 """
74
75
77 """
78 Abstract class for ClusterShell's engine timer. Such a timer
79 requires a relative fire time (delay) in seconds (as float), and
80 supports an optional repeating interval in seconds (as float too).
81
82 See EngineTimer for more information about ClusterShell timers.
83 """
84
85 - def __init__(self, fire_delay, interval=-1.0, autoclose=False):
86 """
87 Create a base timer.
88 """
89 self.fire_delay = fire_delay
90 self.interval = interval
91 self.autoclose = autoclose
92 self._engine = None
93 self._timercase = None
94
96 """
97 Bind to engine, called by Engine.
98 """
99 if self._engine:
100
101 raise EngineIllegalOperationError("Already bound to engine.")
102
103 self._engine = engine
104
106 """
107 Invalidates a timer object, stopping it from ever firing again.
108 """
109 if self._engine:
110 self._engine.timerq.invalidate(self)
111 self._engine = None
112
114 """
115 Returns a boolean value that indicates whether an EngineTimer
116 object is valid and able to fire.
117 """
118 return self._engine != None
119
121 """
122 Set the next firing delay in seconds for an EngineTimer object.
123
124 The optional paramater `interval' sets the firing interval
125 of the timer. If not specified, the timer fires once and then
126 is automatically invalidated.
127
128 Time values are expressed in second using floating point
129 values. Precision is implementation (and system) dependent.
130
131 It is safe to call this method from the task owning this
132 timer object, in any event handlers, anywhere.
133
134 However, resetting a timer's next firing time may be a
135 relatively expensive operation. It is more efficient to let
136 timers autorepeat or to use this method from the timer's own
137 event handler callback (ie. from its ev_timer).
138 """
139 if not self.is_valid():
140 raise EngineIllegalOperationError("Operation on invalid timer.")
141
142 self.fire_delay = fire_delay
143 self.interval = interval
144 self._engine.timerq.reschedule(self)
145
147 raise NotImplementedError("Derived classes must implement.")
148
149
151 """
152 Concrete class EngineTimer
153
154 An EngineTimer object represents a timer bound to an engine that
155 fires at a preset time in the future. Timers can fire either only
156 once or repeatedly at fixed time intervals. Repeating timers can
157 also have their next firing time manually adjusted.
158
159 A timer is not a real-time mechanism; it fires when the task's
160 underlying engine to which the timer has been added is running and
161 able to check if the timer's firing time has passed.
162 """
163
164 - def __init__(self, fire_delay, interval, autoclose, handler):
165 EngineBaseTimer.__init__(self, fire_delay, interval, autoclose)
166 self.eh = handler
167 assert self.eh != None, "An event handler is needed for timer."
168
170 self.eh._invoke("ev_timer", self)
171
173
175 """
176 Helper class that allows comparisons of fire times, to be easily used
177 in an heapq.
178 """
180 self.client = client
181 self.client._timercase = self
182
183 assert self.client.fire_delay > 0
184 self.fire_date = self.client.fire_delay + time.time()
185
187 if self.fire_date < other.fire_date:
188 return -1
189 elif self.fire_date > other.fire_date:
190 return 1
191 else:
192 return 0
193
194 - def arm(self, client):
195 assert client != None
196 self.client = client
197 self.client._timercase = self
198
199 time_current = time.time()
200 if self.client.fire_delay > 0:
201 self.fire_date = self.client.fire_delay + time_current
202 else:
203 interval = float(self.client.interval)
204 assert interval > 0
205 self.fire_date += interval
206
207
208
209 while self.fire_date < time_current:
210 self.fire_date += interval
211
213 client = self.client
214 client._timercase = None
215 self.client = None
216 return client
217
219 return self.client != None
220
221
223 """
224 Initializer.
225 """
226 self._engine = engine
227 self.timers = []
228 self.armed_count = 0
229
231 """
232 Return the number of active timers.
233 """
234 return self.armed_count
235
237 """
238 Insert and arm a client's timer.
239 """
240
241 if client.fire_delay > 0:
242 heapq.heappush(self.timers, _EngineTimerQ._EngineTimerCase(client))
243 self.armed_count += 1
244 if not client.autoclose:
245 self._engine.evlooprefcnt += 1
246
255
257 """
258 Invalidate client's timer. Current implementation doesn't really remove
259 the timer, but simply flags it as disarmed.
260 """
261 if not client._timercase:
262
263 client.fire_delay = 0
264 client.interval = 0
265 return
266
267 if self.armed_count <= 0:
268 raise ValueError, "Engine client timer not found in timer queue"
269
270 client._timercase.disarm()
271 self.armed_count -= 1
272 if not client.autoclose:
273 self._engine.evlooprefcnt -= 1
274
276 """
277 Dequeue disarmed timers (sort of garbage collection).
278 """
279 while len(self.timers) > 0 and not self.timers[0].armed():
280 heapq.heappop(self.timers)
281
283 """
284 Remove the smallest timer from the queue and fire its associated client.
285 Raise IndexError if the queue is empty.
286 """
287 self._dequeue_disarmed()
288
289 timercase = heapq.heappop(self.timers)
290 client = timercase.disarm()
291
292 client.fire_delay = 0
293 client._fire()
294
295 if client.fire_delay > 0 or client.interval > 0:
296 timercase.arm(client)
297 heapq.heappush(self.timers, timercase)
298 else:
299 self.armed_count -= 1
300 if not client.autoclose:
301 self._engine.evlooprefcnt -= 1
302
304 """
305 Return next timer fire delay (relative time).
306 """
307 self._dequeue_disarmed()
308 if len(self.timers) > 0:
309 return max(0., self.timers[0].fire_date - time.time())
310
311 return -1
312
314 """
315 Has a timer expired?
316 """
317 self._dequeue_disarmed()
318 return len(self.timers) > 0 and \
319 (self.timers[0].fire_date - time.time()) <= 1e-2
320
322 """
323 Stop and clear all timers.
324 """
325 for timer in self.timers:
326 if timer.armed():
327 timer.client.invalidate()
328
329 self.timers = []
330 self.armed_count = 0
331
332
334 """
335 Interface for ClusterShell engine. Subclasses have to implement a runloop
336 listening for client events.
337 """
338
339
340 E_READABLE = 0x1
341 E_WRITABLE = 0x2
342 E_ANY = 0x3
343
345 """
346 Initialize base class.
347 """
348
349 self.info = info
350
351
352 self._clients = Set()
353
354
355
356 self.reg_clients = 0
357
358
359
360 self.reg_clifds = {}
361
362
363
364
365
366
367
368
369
370 self.reg_clifds_changed = False
371
372
373 self.timerq = _EngineTimerQ(self)
374
375
376
377 self.evlooprefcnt = 0
378
379 self.joinable = True
380
381 self.run_lock = thread.allocate_lock()
382 self.start_lock = thread.allocate_lock()
383 self.start_lock.acquire()
384
386 """
387 Get a copy of clients set.
388 """
389 return self._clients.copy()
390
391 - def add(self, client):
392 """
393 Add a client to engine. Subclasses that override this method
394 should call base class method.
395 """
396
397 client._set_engine(self)
398
399
400 self._clients.add(client)
401
402 if self.run_lock.locked():
403
404 self.register(client._start())
405
406 - def remove(self, client, did_timeout=False):
407 """
408 Remove a client from engine. Subclasses that override this
409 method should call base class method.
410 """
411 self._debug("REMOVE %s" % client)
412 self._clients.remove(client)
413
414 if client.registered:
415 self.unregister(client)
416 client._close(force=False, timeout=did_timeout)
417 self.start_all()
418
419 - def clear(self, did_timeout=False):
420 """
421 Remove all clients. Subclasses that override this method should
422 call base class method.
423 """
424 while len(self._clients) > 0:
425 client = self._clients.pop()
426 if client.registered:
427 self.unregister(client)
428 client._close(force=True, timeout=did_timeout)
429
431 """
432 Register a client. Subclasses that override this method should
433 call base class method.
434 """
435 assert client in self._clients
436 assert not client.registered
437
438 rfd = client.reader_fileno()
439 wfd = client.writer_fileno()
440 assert rfd != None or wfd != None
441
442 self._debug("REG %s(r%s,w%s)(autoclose=%s)" % (client.__class__.__name__,
443 rfd, wfd, client.autoclose))
444
445 client._events = 0
446 client.registered = True
447 self.reg_clients += 1
448
449 if client.autoclose:
450 refcnt_inc = 0
451 else:
452 refcnt_inc = 1
453
454 if rfd != None:
455 self.reg_clifds[rfd] = client
456 self.reg_clifds_changed = True
457 client._events |= Engine.E_READABLE
458 self.evlooprefcnt += refcnt_inc
459 self._modify_specific(rfd, Engine.E_READABLE, 1)
460 if wfd != None:
461 self.reg_clifds[wfd] = client
462 self.reg_clifds_changed = True
463 client._events |= Engine.E_WRITABLE
464 self.evlooprefcnt += refcnt_inc
465 self._modify_specific(wfd, Engine.E_WRITABLE, 1)
466
467 client._new_events = client._events
468
469
470 self.timerq.schedule(client)
471
487
489 """
490 Unregister a client. Subclasses that override this method should
491 call base class method.
492 """
493
494 assert client.registered
495 self._debug("UNREG %s (r%s,w%s)" % (client.__class__.__name__,
496 client.reader_fileno(), client.writer_fileno()))
497
498
499 self.timerq.invalidate(client)
500
501 if client.autoclose:
502 refcnt_inc = 0
503 else:
504 refcnt_inc = 1
505
506
507 rfd = client.reader_fileno()
508 if rfd != None:
509 if client._events & Engine.E_READABLE:
510 self._modify_specific(rfd, Engine.E_READABLE, 0)
511 client._events &= ~Engine.E_READABLE
512 del self.reg_clifds[rfd]
513 self.reg_clifds_changed = True
514 self.evlooprefcnt -= refcnt_inc
515
516 wfd = client.writer_fileno()
517 if wfd != None:
518 if client._events & Engine.E_WRITABLE:
519 self._modify_specific(wfd, Engine.E_WRITABLE, 0)
520 client._events &= ~Engine.E_WRITABLE
521 del self.reg_clifds[wfd]
522 self.reg_clifds_changed = True
523 self.evlooprefcnt -= refcnt_inc
524
525 client._new_events = 0
526 client.registered = False
527 self.reg_clients -= 1
528
529 - def modify(self, client, set, clear):
530 """
531 Modify the next loop interest events bitset for a client.
532 """
533 self._debug("MODEV set:0x%x clear:0x%x %s" % (set, clear, client))
534 client._new_events &= ~clear
535 client._new_events |= set
536
537 if not client._processing:
538
539 self.reg_clifds_has_changed = True
540
541 self.set_events(client, client._new_events)
542
583
590
596
598 """
599 Fire expired timers for processing.
600 """
601 while self.timerq.expired():
602 self.timerq.fire()
603
605 """
606 Start and register all possible clients, in respect of task fanout.
607 """
608 fanout = self.info["fanout"]
609 assert fanout > 0
610 if fanout <= self.reg_clients:
611 return
612
613 for client in self._clients:
614 if not client.registered:
615 self._debug("START CLIENT %s" % client.__class__.__name__)
616 self.register(client._start())
617 if fanout <= self.reg_clients:
618 break
619
620 - def run(self, timeout):
621 """
622 Run engine in calling thread.
623 """
624
625 if not self.run_lock.acquire(0):
626 raise EngineAlreadyRunningError()
627
628
629 self.start_all()
630
631
632 self.joinable = True
633 self.start_lock.release()
634
635
636 try:
637 try:
638 self.runloop(timeout)
639 except Exception, e:
640
641 self.clear(isinstance(e, EngineTimeoutException))
642 raise
643 finally:
644
645 self.timerq.clear()
646
647
648 self.joinable = False
649 self.start_lock.acquire()
650 self.run_lock.release()
651
653 """
654 Engine specific run loop. Derived classes must implement.
655 """
656 raise NotImplementedError("Derived classes must implement.")
657
659 """
660 Abort runloop.
661 """
662 if not self.start_lock.locked() and self.joinable:
663 raise EngineAbortException(kill)
664 else:
665 self.clear()
666
668 """
669 Block calling thread until runloop has finished.
670 """
671
672 if not self.start_lock.acquire(0):
673
674 if not self.joinable:
675 return
676 self.start_lock.acquire()
677 self.start_lock.release()
678
679
680 self.run_lock.acquire()
681 self.run_lock.release()
682
686