CARMA C++
ClientTask.h
Go to the documentation of this file.
1 #ifndef SZA_UTIL_CLIENTTASK_H
2 #define SZA_UTIL_CLIENTTASK_H
3 
12 #include "carma/szautil/FdSet.h"
13 #include "carma/szautil/NetDat.h"
15 #include "carma/szautil/Runnable.h"
18 #include "carma/szautil/TimeVal.h"
19 
20 namespace sza {
21  namespace util {
22 
23  template <class Msg>
24  class ClientTask : public sza::util::SpawnableTask<Msg> {
25  public:
26 
30  ClientTask(bool spawn, std::string host, unsigned connectPort,
31  unsigned readBufSize=0, unsigned sendBufSize=0) :
32  SpawnableTask<Msg>(spawn) {
33  initMembers(host, connectPort, readBufSize, sendBufSize);
34  };
35 
39  virtual ~ClientTask() {
40  disconnect();
41  };
42 
43  //-----------------------------------------------------------------------
44  // Initialization method
45  //-----------------------------------------------------------------------
46 
47  void initMembers(std::string host, unsigned port,
48  unsigned readBufSize, unsigned sendBufSize) {
49  tcp_.setHost(host);
50  tcp_.setPort(port);
51 
52  handler_.installReadHandler(readHandler, (void*)this);
53  handler_.installSendHandler(sendHandler, (void*)this);
54 
55  handler_.installReadErrorHandler(errHandler, (void*)this);
56  handler_.installSendErrorHandler(errHandler, (void*)this);
57 
58  stop_ = false;
59 
60  timeOut_.setSeconds(0);
61  timeOutPtr_ = timeOut_.timeVal();
62  };
63 
64  //-----------------------------------------------------------------------
65  // Disconnect from server
66  //-----------------------------------------------------------------------
67 
68  void disconnect() {
69 
70  tcp_.disconnect();
71 
72  GenericTask<Msg>::fdSet_.clearFromReadFdSet(handler_.getReadFd());
73  GenericTask<Msg>::fdSet_.clearFromWriteFdSet(handler_.getSendFd());
74 
75  handler_.attach(-1);
76 
77  // And set a timer to reconnect!
78 
79  timeOut_.setSeconds(1);
80  timeOutPtr_ = timeOut_.timeVal();
81  };
82 
83  //-----------------------------------------------------------------------
84  // Connect to server
85  //-----------------------------------------------------------------------
86 
87  bool connect() {
88  if(tcp_.connectToServer(true) > 0) {
89  handler_.attach(tcp_.getFd());
90  GenericTask<Msg>::fdSet_.registerReadFd(handler_.getReadFd());
91  timeOutPtr_ = NULL;
92  return true;
93  } else {
94  timeOut_.setSeconds(1);
95  timeOut_.reset();
96  return false;
97  }
98  };
99 
100  //-----------------------------------------------------------------------
101  // Send data to the server
102  //-----------------------------------------------------------------------
103 
104  void sendServerData(NetDat& dat) {
105  std::vector<unsigned char>& data = dat.getSerializedData();
106  unsigned datSize = data.size();
107 
108  // Resize the send buffer if this object is larger than the
109  // current send buffer size
110 
111  if(datSize+8 > sendBufSize_)
112  setSendBufSize(datSize + 8);
113 
114  handler_.getSendStr()->startPut(data.size());
115  handler_.getSendStr()->putChar(data.size(), &data[0]);
116  handler_.getSendStr()->endPut();
117 
118  // And register this client fd to be watched for writability
119 
120  GenericTask<Msg>::fdSet_.registerWriteFd(handler_.getSendFd());
121  };
122 
123  //-----------------------------------------------------------------------
124  // Static method to be called when a message is fully read
125  // from the server
126  //-----------------------------------------------------------------------
127 
128  static NET_READ_HANDLER(readHandler) {
129  ClientTask* client = (ClientTask*)arg;
130  client->readServerData(client->handler_);
131  };
132 
133  //-----------------------------------------------------------------------
134  // Static method to be called when a message is fully sent to
135  // the server
136  //-----------------------------------------------------------------------
137 
138  static NET_SEND_HANDLER(sendHandler) {
139  ClientTask* client = (ClientTask*)arg;
140  client->fdSet_.clearFromWriteFdSet(client->handler_.getSendFd());
141  };
142 
143  //-----------------------------------------------------------------------
144  // Static method to be called when an error occurs reading or
145  // sending data
146  //-----------------------------------------------------------------------
147 
148  static NET_ERROR_HANDLER(errHandler) {
149  ClientTask* client = (ClientTask*)arg;
150  client->disconnect();
151  };
152 
153  //-----------------------------------------------------------------------
154  // Block in select
155  //-----------------------------------------------------------------------
156 
157  virtual void serviceMsgQ() {
158 
159  int nready=0;
160 
161  do {
162 
163  // Block in select() until one or more file descriptors are readable
164 
165  if((nready=select(GenericTask<Msg>::fdSet_.size(),
166  GenericTask<Msg>::fdSet_.readFdSet(),
167  GenericTask<Msg>::fdSet_.writeFdSet(),
168  NULL, timeOutPtr_)) < 0)
169  {
170  ThrowSysError("select()");
171  }
172 
173  if(nready > 0) {
174 
175  // read data received over the socket connection
176 
177  if(GenericTask<Msg>::fdSet_.isSetInRead(handler_.getReadFd()))
178  handler_.read();
179 
180  // send data over the socket connection
181 
182  if(GenericTask<Msg>::fdSet_.isSetInWrite(handler_.getSendFd()))
183  handler_.send();
184 
185  // Else check our message queue for task messages
186 
187  if(GenericTask<Msg>::fdSet_.isSetInRead(GenericTask<Msg>::msgq_.fd())) {
188  processTaskMsg(&stop_);
189  }
190 
191  } else
192  connect();
193 
194  } while(!stop_);
195  };
196 
197  void setReadBufSize(unsigned size) {
198  handler_.setReadBuffer(0, size);
199  readBufSize_ = size;
200  };
201 
202  void setSendBufSize(unsigned size) {
203  handler_.setSendBuffer(0, size);
204  sendBufSize_ = size;
205  };
206 
207  void readServerData(NetHandler& handler) {
208  int size;
209 
210  handler.getReadStr()->startGet(&size);
211 
212  // Changing this now to allow for variable network object sizes.
213  // The network buffer can change size if a message larger than the
214  // previously allocated buffer size is encountered, therefore our
215  // byte array will be resized accordingly.
216  //
217  // Note that the network buffer only changes size to accomodate
218  // larger messages, so that we do not reallocate just because a
219  // shorter message was encountered.
220 
221  if(size > bytes_.size()) {
222  bytes_.resize(size);
223  }
224 
225  handler.getReadStr()->getChar(size, &bytes_[0]);
226  handler.getReadStr()->endGet();
227 
228  sizeInBytesOfLastMessage_ = size;
229 
230  processServerData();
231  };
232 
233  virtual void processTaskMsg(bool* stop) {
235  }
236 
237  virtual void processServerData() {COUT("Inside base-class procesServerData");};
238 
239  TimeVal timeOut_;
240  struct timeval* timeOutPtr_;
241 
242  // A byte array into which serialized data will be returned
243 
244  std::vector<unsigned char> bytes_;
245 
246  // The size in bytes of the last message read. This is to allow
247  // for the case where a message is smaller than the allocated
248  // bytes_ array.
249 
250  unsigned sizeInBytesOfLastMessage_;
251 
252  unsigned sendBufSize_;
253  unsigned readBufSize_;
254 
255  bool stop_;
256 
257  // The connection manager
258 
259  TcpClient tcp_;
260 
261  NetHandler handler_;
262 
263  }; // End class ClientTask
264 
265  } // End namespace util
266 } // End namespace sza
267 
268 
269 #endif // End #ifndef SZA_UTIL_CLIENTTASK_H
Tagged: Wed Jul 6 13:41:09 PDT 2005.
Tagged: Sun Apr 4 22:36:40 UTC 2004.
Tagged: Fri Nov 14 12:39:38 UTC 2003.
SpawnableTask(bool spawn)
Constructor.
Definition: SpawnableTask.h:43
virtual void processTaskMsg(bool *stop)
Process a message received on our message queue.
Definition: GenericTask.h:378
Started: Sat Mar 6 14:44:17 UTC 2004.
......................................................................
Definition: SpawnableTask.h:34
Started: Thu Feb 26 22:08:23 UTC 2004.
Tagged: Fri Nov 14 12:39:33 UTC 2003.
Tagged: Fri Jan 26 16:49:57 NZDT 2007.