1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 A poll() based ClusterShell Engine.
37
38 The poll() system call is available on Linux and BSD.
39 """
40
41 from Engine import *
42
43 from ClusterShell.Worker.EngineClient import EngineClientEOF
44
45 import errno
46 import os
47 import select
48 import signal
49 import sys
50 import time
51 import thread
52
53
55 """
56 Poll Engine
57
58 ClusterShell engine using the select.poll mechanism (Linux poll()
59 syscall).
60 """
62 """
63 Initialize Engine.
64 """
65 Engine.__init__(self, info)
66 try:
67
68 self.polling = select.poll()
69 except AttributeError:
70 print >> sys.stderr, "Error: select.poll() not supported"
71 raise
72
73
74 self.exited = False
75
77 """
78 Engine-specific modifications after a interesting event change for
79 a file descriptor. Called automatically by Engine register/unregister and
80 set_events(). For the poll() engine, it reg/unreg or modifies the event mask
81 associated to a file descriptor.
82 """
83 self._debug("MODSPEC fd=%d event=%x setvalue=%d" % (fd, event, setvalue))
84
85 if setvalue:
86 eventmask = 0
87 if event == Engine.E_READABLE:
88 eventmask = select.POLLIN
89 elif event == Engine.E_WRITABLE:
90 eventmask = select.POLLOUT
91 self.polling.register(fd, eventmask)
92 else:
93 self.polling.unregister(fd)
94
101
108
110 """
111 Pdsh engine run(): start clients and properly get replies
112 """
113 if timeout == 0:
114 timeout = -1
115
116 start_time = time.time()
117
118
119 while self.evlooprefcnt > 0:
120 self._debug("LOOP evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" % \
121 (self.evlooprefcnt, self.reg_clifds.keys(), len(self.timerq)))
122 try:
123 timeo = self.timerq.nextfire_delay()
124 if timeout > 0 and timeo >= timeout:
125
126 self.timerq.clear()
127 timeo = timeout
128 elif timeo == -1:
129 timeo = timeout
130
131 self.reg_clifds_changed = False
132 evlist = self.polling.poll(timeo * 1000.0 + 1.0)
133
134 except select.error, (ex_errno, ex_strerror):
135
136 if ex_errno == errno.EINTR:
137 continue
138 elif ex_errno == errno.EINVAL:
139 print >>sys.stderr, \
140 "EnginePoll: please increase RLIMIT_NOFILE"
141 raise
142
143 for fd, event in evlist:
144
145 if event & select.POLLNVAL:
146 raise EngineException("Caught POLLNVAL on fd %d" % fd)
147
148 if self.reg_clifds_changed:
149 self._debug("REG CLIENTS CHANGED - Aborting current evlist")
150
151 break
152
153
154 if not self.reg_clifds.has_key(fd):
155 continue
156
157 client = self.reg_clifds[fd]
158
159
160 client._processing = True
161
162
163 if event & select.POLLERR:
164 self._debug("POLLERR %s" % client)
165 self.unregister_writer(client)
166 client.file_writer.close()
167 client.file_writer = None
168 continue
169
170
171 if event & select.POLLIN:
172 assert client._events & Engine.E_READABLE
173 self.modify(client, 0, Engine.E_READABLE)
174 try:
175 client._handle_read()
176 except EngineClientEOF, e:
177 self._debug("EngineClientEOF %s" % client)
178 self.remove(client)
179 continue
180
181
182
183 elif event & select.POLLHUP:
184 self._debug("POLLHUP fd=%d %s (r%s,w%s)" % (fd, client.__class__.__name__,
185 client.reader_fileno(), client.writer_fileno()))
186 self.remove(client)
187
188
189 if event & select.POLLOUT:
190 self._debug("POLLOUT fd=%d %s (r%s,w%s)" % (fd, client.__class__.__name__,
191 client.reader_fileno(), client.writer_fileno()))
192 assert client._events & Engine.E_WRITABLE
193 self.modify(client, 0, Engine.E_WRITABLE)
194 client._handle_write()
195
196
197 client._processing = False
198
199
200 if client.registered:
201 self.set_events(client, client._new_events)
202
203
204 if timeout > 0 and time.time() >= start_time + timeout:
205 raise EngineTimeoutException()
206
207
208 self.fire_timers()
209
210 self._debug("LOOP EXIT evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" % \
211 (self.evlooprefcnt, self.reg_clifds, len(self.timerq)))
212
214 """
215 Returns True if the engine has exited the runloop once.
216 """
217 return not self.running and self.exited
218