Package ClusterShell :: Package Worker :: Module EngineClient
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Worker.EngineClient

  1  # 
  2  # Copyright CEA/DAM/DIF (2009) 
  3  #  Contributor: Stephane THIELL <stephane.thiell@cea.fr> 
  4  # 
  5  # This file is part of the ClusterShell library. 
  6  # 
  7  # This software is governed by the CeCILL-C license under French law and 
  8  # abiding by the rules of distribution of free software.  You can  use, 
  9  # modify and/ or redistribute the software under the terms of the CeCILL-C 
 10  # license as circulated by CEA, CNRS and INRIA at the following URL 
 11  # "http://www.cecill.info". 
 12  # 
 13  # As a counterpart to the access to the source code and  rights to copy, 
 14  # modify and redistribute granted by the license, users are provided only 
 15  # with a limited warranty  and the software's author,  the holder of the 
 16  # economic rights,  and the successive licensors  have only  limited 
 17  # liability. 
 18  # 
 19  # In this respect, the user's attention is drawn to the risks associated 
 20  # with loading,  using,  modifying and/or developing or reproducing the 
 21  # software by the user in light of its specific status of free software, 
 22  # that may mean  that it is complicated to manipulate,  and  that  also 
 23  # therefore means  that it is reserved for developers  and  experienced 
 24  # professionals having in-depth computer knowledge. Users are therefore 
 25  # encouraged to load and test the software's suitability as regards their 
 26  # requirements in conditions enabling the security of their systems and/or 
 27  # data to be ensured and,  more generally, to use and operate it in the 
 28  # same conditions as regards security. 
 29  # 
 30  # The fact that you are presently reading this means that you have had 
 31  # knowledge of the CeCILL-C license and that you accept its terms. 
 32  # 
 33  # $Id$ 
 34   
 35  """ 
 36  EngineClient 
 37   
 38  ClusterShell engine's client interface. 
 39   
 40  An engine client is similar to a process, you can start/stop it, read data from 
 41  it and write data to it. 
 42  """ 
 43   
 44  import fcntl 
 45  import popen2 
 46  import os 
 47   
 48  from ClusterShell.Engine.Engine import EngineBaseTimer 
 49   
 50   
51 -class EngineClientException(Exception):
52 """Generic EngineClient exception."""
53
54 -class EngineClientEOF(EngineClientException):
55 """EOF from client."""
56
57 -class EngineClientError(EngineClientException):
58 """Base EngineClient error exception."""
59
60 -class EngineClientNotSupportedError(EngineClientError):
61 """Operation not supported by EngineClient."""
62 63
64 -class EngineClient(EngineBaseTimer):
65 """ 66 Abstract class EngineClient. 67 """ 68
69 - def __init__(self, worker, timeout, autoclose):
70 """ 71 Initializer. Should be called from derived classes. 72 """ 73 EngineBaseTimer.__init__(self, timeout, -1, autoclose) 74 75 # engine-friendly variables 76 self._events = 0 # current configured set of interesting 77 # events (read, write) for client 78 self._new_events = 0 # new set of interesting events 79 self._processing = False # engine is working on us 80 81 # read-only public 82 self.registered = False # registered on engine or not 83 84 self.worker = worker 85 86 # initialize read and write buffers 87 self._rbuf = "" 88 self._wbuf = "" 89 self._weof = False # write-ends notification
90
91 - def _fire(self):
92 """ 93 Fire timeout timer. 94 """ 95 if self._engine: 96 self._engine.remove(self, did_timeout=True)
97
98 - def _start(self):
99 """ 100 Starts client and returns client instance as a convenience. 101 Derived classes must implement. 102 """ 103 raise NotImplementedError("Derived classes must implement.")
104
105 - def reader_fileno(self):
106 """ 107 Returns the reader file descriptor as an integer. 108 """ 109 raise NotImplementedError("Derived classes must implement.")
110
111 - def writer_fileno(self):
112 """ 113 Returns the writer file descriptor as an integer. 114 """ 115 raise NotImplementedError("Derived classes must implement.")
116
117 - def abort(self):
118 """ 119 Stop this client. 120 """ 121 if self._engine: 122 self._engine.remove(self)
123
124 - def _close(self, force, timeout):
125 """ 126 Close client. Called by the engine after client has been 127 unregistered. This method should handle all termination types 128 (normal, forced or on timeout). 129 Derived classes must implement. 130 """ 131 raise NotImplementedError("Derived classes must implement.")
132
133 - def _set_reading(self):
134 """ 135 Set reading state. 136 """ 137 self._engine.set_reading(self)
138
139 - def _set_writing(self):
140 """ 141 Set writing state. 142 """ 143 self._engine.set_writing(self)
144
145 - def _handle_read(self):
146 """ 147 Handle a read notification. Called by the engine as the result of an 148 event indicating that a read is available. 149 """ 150 raise NotImplementedError("Derived classes must implement.")
151
152 - def _handle_write(self):
153 """ 154 Handle a write notification. Called by the engine as the result of an 155 event indicating that a write can be performed now. 156 """ 157 if len(self._wbuf) > 0: 158 # write syscall 159 c = os.write(self.file_writer.fileno(), self._wbuf) 160 # dequeue written buffer 161 self._wbuf = self._wbuf[c:] 162 # check for possible ending 163 if self._weof and not self._wbuf: 164 self._close_writer() 165 else: 166 self._set_writing()
167
168 - def _exec_nonblock(self, commandlist):
169 """ 170 Utility method to launch a command with stdin/stdout file 171 descriptors configured in non-blocking mode. 172 """ 173 # Launch process in non-blocking mode 174 fid = popen2.Popen4(commandlist) 175 fl = fcntl.fcntl(fid.fromchild, fcntl.F_GETFL) 176 fcntl.fcntl(fid.fromchild, fcntl.F_SETFL, os.O_NDELAY) 177 fl = fcntl.fcntl(fid.tochild, fcntl.F_GETFL) 178 fcntl.fcntl(fid.tochild, fcntl.F_SETFL, os.O_NDELAY) 179 return fid
180
181 - def _readlines(self):
182 """ 183 Utility method to read client lines 184 """ 185 # read a chunk of data, may raise eof 186 readbuf = self._read() 187 assert len(readbuf) > 0, "assertion failed: len(readbuf) > 0" 188 189 # Current version implements line-buffered reads. If needed, we could 190 # easily provide direct, non-buffered, data reads in the future. 191 192 buf = self._rbuf + readbuf 193 lines = buf.splitlines(True) 194 self._rbuf = "" 195 for line in lines: 196 if line.endswith('\n'): 197 if line.endswith('\r\n'): 198 yield line[:-2] # trim CRLF 199 else: 200 # trim LF 201 yield line[:-1] # trim LF 202 else: 203 # keep partial line in buffer 204 self._rbuf = line
205 # breaking here 206
207 - def _write(self, buf):
208 """ 209 Add some data to be written to the client. 210 """ 211 fd = self.writer_fileno() 212 if fd: 213 assert not self.file_writer.closed 214 # TODO: write now if ready 215 self._wbuf += buf 216 self._set_writing() 217 else: 218 # bufferize until pipe is ready 219 self._wbuf += buf
220
221 - def _set_write_eof(self):
222 self._weof = True 223 if not self._wbuf: 224 # sendq empty, try to close writer now 225 self._close_writer()
226
227 - def _close_writer(self):
228 if self.file_writer and not self.file_writer.closed: 229 self._engine.unregister_writer(self) 230 self.file_writer.close() 231 self.file_writer = None
232