1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 EngineClient
37
38 ClusterShell engine's client interface.
39
40 An engine client is similar to a process, you can start/stop it, read data from
41 it and write data to it.
42 """
43
44 import fcntl
45 import popen2
46 import os
47
48 from ClusterShell.Engine.Engine import EngineBaseTimer
49
50
52 """Generic EngineClient exception."""
53
55 """EOF from client."""
56
58 """Base EngineClient error exception."""
59
61 """Operation not supported by EngineClient."""
62
63
65 """
66 Abstract class EngineClient.
67 """
68
69 - def __init__(self, worker, timeout, autoclose):
70 """
71 Initializer. Should be called from derived classes.
72 """
73 EngineBaseTimer.__init__(self, timeout, -1, autoclose)
74
75
76 self._events = 0
77
78 self._new_events = 0
79 self._processing = False
80
81
82 self.registered = False
83
84 self.worker = worker
85
86
87 self._rbuf = ""
88 self._wbuf = ""
89 self._weof = False
90
92 """
93 Fire timeout timer.
94 """
95 if self._engine:
96 self._engine.remove(self, did_timeout=True)
97
99 """
100 Starts client and returns client instance as a convenience.
101 Derived classes must implement.
102 """
103 raise NotImplementedError("Derived classes must implement.")
104
106 """
107 Returns the reader file descriptor as an integer.
108 """
109 raise NotImplementedError("Derived classes must implement.")
110
112 """
113 Returns the writer file descriptor as an integer.
114 """
115 raise NotImplementedError("Derived classes must implement.")
116
118 """
119 Stop this client.
120 """
121 if self._engine:
122 self._engine.remove(self)
123
124 - def _close(self, force, timeout):
125 """
126 Close client. Called by the engine after client has been
127 unregistered. This method should handle all termination types
128 (normal, forced or on timeout).
129 Derived classes must implement.
130 """
131 raise NotImplementedError("Derived classes must implement.")
132
134 """
135 Set reading state.
136 """
137 self._engine.set_reading(self)
138
140 """
141 Set writing state.
142 """
143 self._engine.set_writing(self)
144
146 """
147 Handle a read notification. Called by the engine as the result of an
148 event indicating that a read is available.
149 """
150 raise NotImplementedError("Derived classes must implement.")
151
153 """
154 Handle a write notification. Called by the engine as the result of an
155 event indicating that a write can be performed now.
156 """
157 if len(self._wbuf) > 0:
158
159 c = os.write(self.file_writer.fileno(), self._wbuf)
160
161 self._wbuf = self._wbuf[c:]
162
163 if self._weof and not self._wbuf:
164 self._close_writer()
165 else:
166 self._set_writing()
167
169 """
170 Utility method to launch a command with stdin/stdout file
171 descriptors configured in non-blocking mode.
172 """
173
174 fid = popen2.Popen4(commandlist)
175 fl = fcntl.fcntl(fid.fromchild, fcntl.F_GETFL)
176 fcntl.fcntl(fid.fromchild, fcntl.F_SETFL, os.O_NDELAY)
177 fl = fcntl.fcntl(fid.tochild, fcntl.F_GETFL)
178 fcntl.fcntl(fid.tochild, fcntl.F_SETFL, os.O_NDELAY)
179 return fid
180
182 """
183 Utility method to read client lines
184 """
185
186 readbuf = self._read()
187 assert len(readbuf) > 0, "assertion failed: len(readbuf) > 0"
188
189
190
191
192 buf = self._rbuf + readbuf
193 lines = buf.splitlines(True)
194 self._rbuf = ""
195 for line in lines:
196 if line.endswith('\n'):
197 if line.endswith('\r\n'):
198 yield line[:-2]
199 else:
200
201 yield line[:-1]
202 else:
203
204 self._rbuf = line
205
206
208 """
209 Add some data to be written to the client.
210 """
211 fd = self.writer_fileno()
212 if fd:
213 assert not self.file_writer.closed
214
215 self._wbuf += buf
216 self._set_writing()
217 else:
218
219 self._wbuf += buf
220
222 self._weof = True
223 if not self._wbuf:
224
225 self._close_writer()
226
228 if self.file_writer and not self.file_writer.closed:
229 self._engine.unregister_writer(self)
230 self.file_writer.close()
231 self.file_writer = None
232