CARMA C++
ServerTask.h
1 #ifndef SZA_UTIL_SERVERTASK_H
2 #define SZA_UTIL_SERVERTASK_H
3 
11 #include <list>
12 #include <string>
13 #include <stdio.h>
14 
15 #include "carma/szautil/FdSet.h"
16 #include "carma/szautil/NetDat.h"
19 #include "carma/szautil/Runnable.h"
24 #include "carma/szautil/TimeOut.h"
25 
26 namespace sza {
27  namespace util {
28 
29  class Port;
30  class SignalTask;
31  class TcpListener;
32 
33  template <class Msg>
34  class ServerTask : public sza::util::SpawnableTask<Msg> {
35  public:
36 
37  //=======================================================================
38  // Class for managing client data for this server
39  //=======================================================================
40 
41  class ServerData {
42  public:
43  ServerData() {};
44  virtual ~ServerData() {};
45  };
46 
47  //=======================================================================
48  // Class for managing client-specific connections
49  //=======================================================================
50 
51  class ServerConnection {
52  public:
53 
54  // A queue of messages to be send to this client
55 
56  PipeQueue::MsgQueue msgQueue_;
57 
58  // A Handler for sending/receiving messages to/from this client
59 
60  NetHandler handler_;
61 
62  // A temporary buffer for storing data received from this
63  // client
64 
65  std::vector<unsigned char> bytes_;
66 
67  // A handle to our parent
68 
69  ServerTask<Msg>* parent_;
70 
71  // Client-specific data
72 
73  ServerData* data_;
74 
75  // True when this connection has been initialized, whatever
76  // that means for a particular type of server
77 
78  bool initialized_;
79 
80  // True when a send is pending
81 
82  bool sendPending_;
83 
84  //------------------------------------------------------------
85  // Methods of this class
86  //------------------------------------------------------------
87 
88  ServerConnection(int fd,
89  unsigned readBufSize, unsigned sendBufSize,
90  ServerTask* parent) {
91  handler_.attach(fd);
92  handler_.setReadBuffer(0, readBufSize);
93  handler_.setSendBuffer(0, sendBufSize);
94  parent_ = parent;
95  initialized_ = false;
96  sendPending_ = false;
97 
98  handler_.installReadHandler(ServerTask::readHandler, (void*)this);
99  handler_.installSendHandler(ServerTask::sendHandler, (void*)this);
100 
101  handler_.installReadErrorHandler(ServerTask::errHandler, (void*)this);
102  handler_.installSendErrorHandler(ServerTask::errHandler, (void*)this);
103 
104  data_ = 0;
105  };
106 
107  //------------------------------------------------------------
108  // Destructor for ServerConnection class
109  //------------------------------------------------------------
110 
111  ~ServerConnection() {
112 
113  // If any handlers were attached, detach them now
114 
115  if(handler_.getFd() > 0) {
116  close(handler_.getFd());
117 
118  handler_.attach(-1);
119 
120  ::close(handler_.getFd());
121  }
122 
123  // Delete any data associated with this connection too
124 
125  if(data_) {
126  delete data_;
127  data_ = 0;
128  }
129  };
130 
131  void setSendBufferSize(unsigned size) {
132  handler_.setSendBuffer(0, size);
133  };
134 
135  void setReadBufferSize(unsigned size) {
136  handler_.setReadBuffer(0, size);
137  };
138 
139  //------------------------------------------------------------
140  // Private method to pack data intended for a client
141  //------------------------------------------------------------
142 
143  void packClientData(NetDat& dat) {
144  std::vector<unsigned char>& data = dat.getSerializedData();
145  unsigned datSize = data.size();
146 
147  packClientData(&data[0], datSize);
148  };
149 
150  //------------------------------------------------------------
151  // Private method to pack data intended for a client. If our
152  // message queue for this client is currently empty, stage it
153  // directly into the network handler, else push it onto the
154  // message queue for later sending.
155  //------------------------------------------------------------
156 
157  void packClientData(unsigned char* buffer, unsigned nbyte) {
158  if(!sendPending_) {
159  stageClientData(buffer, nbyte);
160  } else {
161  msgQueue_.push(buffer, nbyte);
162  }
163  };
164 
165  //------------------------------------------------------------
166  // Private method to pack data intended for a client
167  //------------------------------------------------------------
168 
169  void stageClientData(unsigned char* buffer, unsigned datSize) {
170 
171  // Resize the send buffer if this object is larger than the
172  // current send buffer size
173 
174  if(datSize+8 > parent_->sendBufSize_) {
175  parent_->setSendBufSize(datSize + 8);
176  setSendBufferSize(datSize + 8);
177  }
178 
179  // Now send the data
180 
181  handler_.getSendStr()->startPut(datSize);
182  handler_.getSendStr()->putChar(datSize, buffer);
183  handler_.getSendStr()->endPut();
184 
185  // Set our pending flag to true
186 
187  sendPending_ = true;
188 
189  // And register this client fd to be watched for writability
190 
191  parent_->GenericTask<Msg>::fdSet_.registerWriteFd(handler_.getSendFd());
192  };
193 
194  //------------------------------------------------------------
195  // Check this client's message queue for pending messages. If
196  // the queue is now empty, stage the next message to be sent
197  //------------------------------------------------------------
198 
199  void checkMsgQueue() {
200  if(!msgQueue_.empty()) {
201  PipeQueue::QueueNode& node = msgQueue_.front();
202  stageClientData(node.buffer_, node.nbyte_);
203  msgQueue_.pop();
204  }
205  };
206 
207  };
208 
209  //=======================================================================
210  // Methods of ServerTask class
211  //=======================================================================
212 
216  std::list<ServerConnection*> clients_;
217 
218  //------------------------------------------------------------
219  // Constructor for serial connections
220  //------------------------------------------------------------
221 
222  ServerTask(bool spawnThread, int listenPort, unsigned readBufSize=0,
223  unsigned sendBufSize=0) : SpawnableTask<Msg>(spawnThread)
224  {
225  initMembers(listenPort, readBufSize, sendBufSize);
226  };
227 
228  //------------------------------------------------------------
229  // Destructor
230  //------------------------------------------------------------
231 
232  virtual ~ServerTask() {
233  // Shut down the signal task
234 
235  if(signalTask_ != 0) {
236  delete signalTask_;
237  signalTask_ = 0;
238  }
239 
240  // Free any memory allocated in this class
241 
242 
243  typename std::list<ServerConnection*>::iterator iClient;
244  for(iClient=clients_.begin(); iClient != clients_.end(); iClient++) {
245  delete *iClient;
246  }
247 
248  // Free the listener
249 
250  if(listener_ != 0) {
251  delete listener_;
252  listener_ = 0;
253  }
254  };
255 
256  //------------------------------------------------------------
257  // Process task messages
258  //------------------------------------------------------------
259 
260  virtual void processTaskMsg(bool* stop) {
262  };
263 
264  //------------------------------------------------------------
265  // Block in select
266  //------------------------------------------------------------
267 
268  virtual void serviceMsgQ() {
269  int nready=0;
270 
271  // On entry to the loop, timeout immediately
272 
273  timeOut_.setIntervalInSeconds(0);
274  timeOut_.activate(true);
275 
276  do {
277 
278  // Block in select() until one or more file descriptors are readable
279 
280  if((nready=select(GenericTask<Msg>::fdSet_.size(),
281  GenericTask<Msg>::fdSet_.readFdSet(),
282  GenericTask<Msg>::fdSet_.writeFdSet(),
283  NULL,
284  timeOut_.tVal())) < 0) {
285  ThrowSysError("select()");
286  }
287 
288  if(nready > 0) {
289 
290  // Service a select()able event
291 
292  serviceSelect();
293 
294  } else {
295 
296  // Do whatever it is we are supposed to do on timeout
297 
298  timeOutAction();
299 
300  // And reset the timeout
301 
302  timeOut_.setIntervalInSeconds(timeOutSeconds_);
303  timeOut_.reset();
304  }
305 
306  } while(!stop_);
307 
308  };
309 
310  protected:
311 
312  unsigned readBufSize_;
313 
314  unsigned sendBufSize_;
315 
316  TimeOut timeOut_;
317 
318  //------------------------------------------------------------
319  // Service a select()able event
320  //------------------------------------------------------------
321 
322  virtual void serviceSelect() {
323 
324  // Check our message queue for task messages
325 
326  if(GenericTask<Msg>::fdSet_.isSetInRead(GenericTask<Msg>::msgq_.fd())) {
327  processTaskMsg(&stop_);
328  }
329 
330  // Service requests received over the socket connection
331 
332  if(listener_ != 0 && GenericTask<Msg>::fdSet_.isSetInRead(listener_->getFd()))
333  acceptConnection();
334 
335  // Check connected clients for data
336 
337  checkClientsForReadableData();
338 
339  // Check connected clients for sendable data
340 
341  checkClientsForWritability();
342  };
343 
344  //------------------------------------------------------------
345  // Method called when we time out in our select loop
346  //------------------------------------------------------------
347 
348  virtual void timeOutAction() {};
349 
350  //------------------------------------------------------------
351  // Method called when data have been completely read from a client
352  //------------------------------------------------------------
353 
354  virtual void readClientData(ServerConnection* conn) {};
355 
356  //------------------------------------------------------------
357  // Method called immediately after a client has connected
358  //------------------------------------------------------------
359 
360  virtual void acceptClientAction(ServerConnection* conn) {};
361 
362  //------------------------------------------------------------
363  // Method to send data to all connected clients
364  //------------------------------------------------------------
365 
366  void sendClientData(NetDat& dat, ServerConnection* client) {
367 
368  // If a client was specified, send only to that client.
369 
370  if(client) {
371  client->packClientData(dat);
372 
373  // Else send to all clients, but only clients that are
374  // initialized. This is to prevent a server from packing data to
375  // all clients that may overwrite the initialization data that a
376  // client is waiting for
377 
378  } else {
379  typename std::list<ServerConnection*>::iterator iClient;
380  for(iClient=clients_.begin(); iClient != clients_.end(); iClient++) {
381  ServerConnection* client = *iClient;
382 
383  // Only send to this client if the client is initialized
384 
385  if(client->initialized_)
386  client->packClientData(dat);
387  }
388  }
389  };
390 
391  void setReadBufSize(unsigned size) {
392  readBufSize_ = size;
393  };
394 
395  void setSendBufSize(unsigned size) {
396  sendBufSize_ = size;
397  };
398 
399  void setTimeOutSeconds(unsigned int seconds) {
400  timeOutSeconds_ = seconds;
401  };
402 
403  // The timeout for our select loop
404 
405  unsigned int timeOutSeconds_;
406 
410  SignalTask* signalTask_;
411 
415  bool stop_;
416 
420  TcpListener* listener_;
421 
422  //------------------------------------------------------------
423  // Initialize members
424  //------------------------------------------------------------
425 
426  void initMembers(int listenPort, unsigned readBufSize, unsigned sendBufSize) {
427  listener_ = 0;
428  signalTask_ = 0;
429  stop_ = false;
430  readBufSize_ = readBufSize;
431  sendBufSize_ = sendBufSize;
432 
433  // We will set the timeout to 1 second for our select() loop
434 
435  setTimeOutSeconds(1);
436 
437  // Finally, spawn a thread for managing signal handling
438 
439  signalTask_ = new SignalTask(true);
440 
441  // And install a signal handler for SIGINT
442 
443  signalTask_->sendInstallSignalMsg(SIGINT, &shutDown, this);
444 
445  // Start listening on the requested port
446 
447  listen(listenPort);
448  };
449 
450  //------------------------------------------------------------
451  // Set the port number on which we should listen for connection
452  // requests. Also sets the queue length
453  //------------------------------------------------------------
454 
455  void listen(unsigned port, unsigned nClients = 5) {
456  if(nClients > 0) {
457  listener_ = new TcpListener(port, nClients);
458  GenericTask<Msg>::fdSet_.registerReadFd(listener_->getFd());
459  }
460  };
461 
462  //------------------------------------------------------------
463  // Accept a client connection
464  //------------------------------------------------------------
465 
466  void acceptConnection() {
467  int fd = -1;
468 
469  // Allow the caller to connect. The fd returned will be configured
470  // for blocking I/O.
471 
472  fd = listener_->acceptConnection(true);
473 
474  // Insert a new connection into the list
475 
476  ServerConnection* conn = new ServerConnection(fd, readBufSize_, sendBufSize_, this);
477 
478  // And register the descriptor to be watched for input
479 
480  GenericTask<Msg>::fdSet_.registerReadFd(fd);
481 
482  // Do anything inheritors define when a client connects
483 
484  CTOUT("About to call acceptClientAction with conn = " << conn);
485  acceptClientAction(conn);
486 
487  // Only after acceptClientAction is done should we attempt to insert
488  // the client in the list
489 
490  clients_.insert(clients_.begin(), conn);
491  };
492 
493 
494  private:
495 
496  //------------------------------------------------------------
497  // A shutdown method
498  //------------------------------------------------------------
499 
500  static SIGNALTASK_HANDLER_FN(shutDown) {
501  ServerTask* server = (ServerTask*) args;
502  server->sendStopMsg();
503  };
504 
505  //------------------------------------------------------------
506  // Check clients for data to be read
507  //------------------------------------------------------------
508 
509  void checkClientsForReadableData() {
510  std::vector<ServerConnection*> disconnectedClients_;
511 
512  typename std::list<ServerConnection*>::iterator iClient;
513  for(iClient=clients_.begin(); iClient != clients_.end(); iClient++) {
514 
515  ServerConnection* client = *iClient;
516 
517  if(GenericTask<Msg>::fdSet_.isSetInRead(client->handler_.getReadFd())) {
518 
519  client->handler_.read();
520 
521  // If after processing messages from this client, the client is
522  // disconnected, mark it for removal
523 
524  if(client->handler_.getReadFd() < 0)
525  disconnectedClients_.push_back(client);
526  }
527  }
528 
529  // Finally, remove any clients that were disconnected after reading
530 
531  typename std::vector<ServerConnection*>::iterator iDisClient;
532  for(iDisClient=disconnectedClients_.begin(); iDisClient != disconnectedClients_.end(); iDisClient++) {
533  ServerConnection* client = *iDisClient;
534  clients_.remove(client);
535  delete client;
536  }
537  };
538 
539  //------------------------------------------------------------
540  // Check clients for writability
541  //------------------------------------------------------------
542 
543  void checkClientsForWritability() {
544  typename std::list<ServerConnection*>::iterator iClient;
545  for(iClient=clients_.begin(); iClient != clients_.end(); iClient++)
546  {
547  ServerConnection* client = *iClient;
548 
549  if(GenericTask<Msg>::fdSet_.isSetInWrite(client->handler_.getSendFd()))
550  client->handler_.send();
551  }
552  };
553 
554  //------------------------------------------------------------
555  // Static method to be called when a message is fully read froym
556  // a client
557  //------------------------------------------------------------
558 
559  static NET_READ_HANDLER(readHandler) {
560  ServerConnection* conn = (ServerConnection*)arg;
561  conn->parent_->readClientData(conn);
562  };
563 
564  //------------------------------------------------------------
565  // Static method to be called when a message is fully sent to a
566  // client
567  //------------------------------------------------------------
568 
569  static NET_SEND_HANDLER(sendHandler) {
570  ServerConnection* conn = (ServerConnection*)arg;
571  conn->parent_->GenericTask<Msg>::fdSet_.clearFromWriteFdSet(conn->handler_.getFd());
572 
573  // Only after the first send is completed can we consider the client
574  // initialized
575 
576  conn->initialized_ = true;
577 
578  // Set our pending flag to false
579 
580  conn->sendPending_ = false;
581 
582  // Check msg queue for other queued messages
583 
584  conn->checkMsgQueue();
585  };
586 
587  //------------------------------------------------------------
588  // Static method to be called when a message is fully read from
589  // a client
590  //------------------------------------------------------------
591 
592  static NET_ERROR_HANDLER(errHandler) {
593  ServerConnection* conn = (ServerConnection*)arg;
594  conn->parent_->GenericTask<Msg>::fdSet_.clearFromReadFdSet(conn->handler_.getReadFd());
595  conn->parent_->GenericTask<Msg>::fdSet_.clearFromWriteFdSet(conn->handler_.getSendFd());
596  conn->handler_.attach(-1);
597 
598  // Set our pending flag to false
599 
600  conn->sendPending_ = false;
601  };
602 
603  }; // End class Server
604 
605  } // End namespace util
606 } // End namespace sza
607 
608 
609 
610 #endif // End #ifndef SZA_UTIL_SERVERTASK_H
Tagged: Wed Jul 6 13:41:09 PDT 2005.
Tagged: Fri Nov 14 12:39:36 UTC 2003.
Tagged: Sun Apr 4 22:36:40 UTC 2004.
SpawnableTask(bool spawn)
Constructor.
Definition: SpawnableTask.h:43
Started: Tue Mar 2 13:43:07 UTC 2004.
virtual void processTaskMsg(bool *stop)
Process a message received on our message queue.
Definition: GenericTask.h:378
Tagged: Fri Nov 14 12:39:36 UTC 2003.
Tagged: Tue May 2 16:31:46 PDT 2006.
......................................................................
Definition: SpawnableTask.h:34
Started: Thu Feb 26 22:08:23 UTC 2004.
Tagged: Fri Nov 14 12:39:34 UTC 2003.
Tagged: Fri Jan 26 16:49:57 NZDT 2007.