Package ClusterShell :: Package Engine :: Module Poll
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Engine.Poll

  1  # 
  2  # Copyright CEA/DAM/DIF (2007, 2008, 2009) 
  3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  4  # 
  5  # This file is part of the ClusterShell library. 
  6  # 
  7  # This software is governed by the CeCILL-C license under French law and 
  8  # abiding by the rules of distribution of free software.  You can  use, 
  9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 10  # license as circulated by CEA, CNRS and INRIA at the following URL 
 11  # "http://www.cecill.info". 
 12  # 
 13  # As a counterpart to the access to the source code and  rights to copy, 
 14  # modify and redistribute granted by the license, users are provided only 
 15  # with a limited warranty  and the software's author,  the holder of the 
 16  # economic rights,  and the successive licensors  have only  limited 
 17  # liability. 
 18  # 
 19  # In this respect, the user's attention is drawn to the risks associated 
 20  # with loading,  using,  modifying and/or developing or reproducing the 
 21  # software by the user in light of its specific status of free software, 
 22  # that may mean  that it is complicated to manipulate,  and  that  also 
 23  # therefore means  that it is reserved for developers  and  experienced 
 24  # professionals having in-depth computer knowledge. Users are therefore 
 25  # encouraged to load and test the software's suitability as regards their 
 26  # requirements in conditions enabling the security of their systems and/or 
 27  # data to be ensured and,  more generally, to use and operate it in the 
 28  # same conditions as regards security. 
 29  # 
 30  # The fact that you are presently reading this means that you have had 
 31  # knowledge of the CeCILL-C license and that you accept its terms. 
 32  # 
 33  # $Id: Poll.py 162 2009-10-28 12:43:05Z st-cea $ 
 34   
 35  """ 
 36  A poll() based ClusterShell Engine. 
 37   
 38  The poll() system call is available on Linux and BSD. 
 39  """ 
 40   
 41  from Engine import * 
 42   
 43  from ClusterShell.Worker.EngineClient import EngineClientEOF 
 44   
 45  import errno 
 46  import os 
 47  import select 
 48  import signal 
 49  import sys 
 50  import time 
 51  import thread 
 52   
 53   
class EnginePoll(Engine):
    """
    Poll Engine

    ClusterShell engine using the select.poll mechanism (Linux poll()
    syscall).
    """

    def __init__(self, info):
        """
        Initialize Engine.

        Creates the select.poll() object used by runloop(). When the select
        module has no poll(), an error message is written to stderr and the
        AttributeError is re-raised.
        """
        Engine.__init__(self, info)
        try:
            # get a polling object
            self.polling = select.poll()
        except AttributeError:
            sys.stderr.write("Error: select.poll() not supported\n")
            raise

        # runloop-has-exited flag. Kept under a private name: an instance
        # attribute called "exited" would shadow the exited() method and
        # make engine.exited() fail with "'bool' object is not callable".
        self._exited = False

    def _modify_specific(self, fd, event, setvalue):
        """
        Engine-specific modifications after an interesting event change for
        a file descriptor. Called automatically by Engine register/unregister
        and set_events(). For the poll() engine, it registers/unregisters or
        modifies the event mask associated to a file descriptor.

        fd       -- file descriptor concerned
        event    -- Engine.E_READABLE or Engine.E_WRITABLE
        setvalue -- true to (re)register fd for event, false to unregister fd
        """
        self._debug("MODSPEC fd=%d event=%x setvalue=%d" % (fd, event, setvalue))

        if setvalue:
            # any other event value registers fd with an empty mask
            eventmask = 0
            if event == Engine.E_READABLE:
                eventmask = select.POLLIN
            elif event == Engine.E_WRITABLE:
                eventmask = select.POLLOUT
            self.polling.register(fd, eventmask)
        else:
            self.polling.unregister(fd)

    def set_reading(self, client):
        """
        Set client reading state.
        """
        # listen for readable events
        self.modify(client, Engine.E_READABLE, 0)

    def set_writing(self, client):
        """
        Set client writing state.
        """
        # listen for writable events
        self.modify(client, Engine.E_WRITABLE, 0)

    def runloop(self, timeout):
        """
        Poll engine run loop: wait for events with poll() and dispatch them
        to the registered clients until evlooprefcnt drops to zero.

        timeout -- task timeout in seconds; 0 disables the task timeout.

        Raises EngineTimeoutException once the task timeout has elapsed,
        EngineException when poll() reports POLLNVAL for a descriptor, and
        re-raises any select.error other than EINTR.
        """
        if timeout == 0:
            timeout = -1

        start_time = time.time()

        try:
            # run main event loop...
            while self.evlooprefcnt > 0:
                self._debug("LOOP evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" % \
                    (self.evlooprefcnt, self.reg_clifds.keys(), len(self.timerq)))
                try:
                    timeo = self.timerq.nextfire_delay()
                    if timeout > 0 and timeo >= timeout:
                        # task timeout may invalidate clients timeout
                        self.timerq.clear()
                        timeo = timeout
                    elif timeo == -1:
                        # presumably no timer is pending: fall back on the
                        # task timeout (-1 makes poll() block indefinitely)
                        timeo = timeout

                    self.reg_clifds_changed = False
                    # poll() takes milliseconds
                    evlist = self.polling.poll(timeo * 1000.0 + 1.0)

                except select.error:
                    # sys.exc_info() is used instead of binding the exception
                    # in the except clause, as that syntax differs between
                    # Python versions
                    ex_errno = sys.exc_info()[1].args[0]
                    # might get interrupted by a signal
                    if ex_errno == errno.EINTR:
                        continue
                    elif ex_errno == errno.EINVAL:
                        sys.stderr.write(
                            "EnginePoll: please increase RLIMIT_NOFILE\n")
                    raise

                for fd, event in evlist:

                    if event & select.POLLNVAL:
                        raise EngineException("Caught POLLNVAL on fd %d" % fd)

                    if self.reg_clifds_changed:
                        self._debug("REG CLIENTS CHANGED - Aborting current evlist")
                        # Oops, reconsider evlist by calling poll() again.
                        break

                    # get client instance
                    if fd not in self.reg_clifds:
                        continue

                    self._process_event(self.reg_clifds[fd], fd, event)

                # check for task runloop timeout
                if timeout > 0 and time.time() >= start_time + timeout:
                    raise EngineTimeoutException()

                # process clients timeout
                self.fire_timers()

            self._debug("LOOP EXIT evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" % \
                (self.evlooprefcnt, self.reg_clifds, len(self.timerq)))
        finally:
            # remember that the runloop has been left at least once, whether
            # normally or through an exception; exited() relies on this flag
            self._exited = True

    def _process_event(self, client, fd, event):
        """
        Handle a single poll() event reported for a registered client
        (helper of runloop()).
        """
        # process this client
        client._processing = True

        # check for poll error condition of some sort
        if event & select.POLLERR:
            self._debug("POLLERR %s" % client)
            self.unregister_writer(client)
            client.file_writer.close()
            client.file_writer = None
            # NOTE(review): client._processing is left True on this path
            # (unchanged historical behaviour) -- confirm it is intended.
            return

        # check for data to read
        if event & select.POLLIN:
            assert client._events & Engine.E_READABLE
            self.modify(client, 0, Engine.E_READABLE)
            try:
                client._handle_read()
            except EngineClientEOF:
                self._debug("EngineClientEOF %s" % client)
                self.remove(client)
                return

        # or check for end of stream (do not handle both at the same time
        # because handle_read() may perform a partial read)
        elif event & select.POLLHUP:
            self._debug("POLLHUP fd=%d %s (r%s,w%s)" % (fd, client.__class__.__name__,
                client.reader_fileno(), client.writer_fileno()))
            self.remove(client)

        # check for writing
        if event & select.POLLOUT:
            self._debug("POLLOUT fd=%d %s (r%s,w%s)" % (fd, client.__class__.__name__,
                client.reader_fileno(), client.writer_fileno()))
            assert client._events & Engine.E_WRITABLE
            self.modify(client, 0, Engine.E_WRITABLE)
            client._handle_write()

        # post processing
        client._processing = False

        # apply any changes that occurred during processing
        if client.registered:
            self.set_events(client, client._new_events)

    def exited(self):
        """
        Returns True if the engine has exited the runloop once.
        """
        return not self.running and self._exited
218