1 #ifndef SZA_UTIL_SERVERTASK_H
2 #define SZA_UTIL_SERVERTASK_H
19 #include "carma/szautil/Runnable.h"
44 virtual ~ServerData() {};
51 class ServerConnection {
56 PipeQueue::MsgQueue msgQueue_;
65 std::vector<unsigned char> bytes_;
69 ServerTask<Msg>* parent_;
88 ServerConnection(
int fd,
89 unsigned readBufSize,
unsigned sendBufSize,
92 handler_.setReadBuffer(0, readBufSize);
93 handler_.setSendBuffer(0, sendBufSize);
98 handler_.installReadHandler(ServerTask::readHandler, (
void*)
this);
99 handler_.installSendHandler(ServerTask::sendHandler, (
void*)
this);
101 handler_.installReadErrorHandler(ServerTask::errHandler, (
void*)
this);
102 handler_.installSendErrorHandler(ServerTask::errHandler, (
void*)
this);
111 ~ServerConnection() {
115 if(handler_.getFd() > 0) {
116 close(handler_.getFd());
120 ::close(handler_.getFd());
131 void setSendBufferSize(
unsigned size) {
132 handler_.setSendBuffer(0, size);
135 void setReadBufferSize(
unsigned size) {
136 handler_.setReadBuffer(0, size);
143 void packClientData(NetDat& dat) {
144 std::vector<unsigned char>& data = dat.getSerializedData();
145 unsigned datSize = data.size();
147 packClientData(&data[0], datSize);
157 void packClientData(
unsigned char* buffer,
unsigned nbyte) {
159 stageClientData(buffer, nbyte);
161 msgQueue_.push(buffer, nbyte);
169 void stageClientData(
unsigned char* buffer,
unsigned datSize) {
174 if(datSize+8 > parent_->sendBufSize_) {
175 parent_->setSendBufSize(datSize + 8);
176 setSendBufferSize(datSize + 8);
181 handler_.getSendStr()->startPut(datSize);
182 handler_.getSendStr()->putChar(datSize, buffer);
183 handler_.getSendStr()->endPut();
191 parent_->GenericTask<Msg>::fdSet_.registerWriteFd(handler_.getSendFd());
199 void checkMsgQueue() {
200 if(!msgQueue_.empty()) {
201 PipeQueue::QueueNode& node = msgQueue_.front();
202 stageClientData(node.buffer_, node.nbyte_);
216 std::list<ServerConnection*> clients_;
222 ServerTask(
bool spawnThread,
int listenPort,
unsigned readBufSize=0,
225 initMembers(listenPort, readBufSize, sendBufSize);
232 virtual ~ServerTask() {
235 if(signalTask_ != 0) {
243 typename std::list<ServerConnection*>::iterator iClient;
244 for(iClient=clients_.begin(); iClient != clients_.end(); iClient++) {
260 virtual void processTaskMsg(
bool* stop) {
268 virtual void serviceMsgQ() {
273 timeOut_.setIntervalInSeconds(0);
274 timeOut_.activate(
true);
280 if((nready=select(GenericTask<Msg>::fdSet_.size(),
281 GenericTask<Msg>::fdSet_.readFdSet(),
282 GenericTask<Msg>::fdSet_.writeFdSet(),
284 timeOut_.tVal())) < 0) {
285 ThrowSysError(
"select()");
302 timeOut_.setIntervalInSeconds(timeOutSeconds_);
312 unsigned readBufSize_;
314 unsigned sendBufSize_;
322 virtual void serviceSelect() {
326 if(GenericTask<Msg>::fdSet_.isSetInRead(GenericTask<Msg>::msgq_.fd())) {
327 processTaskMsg(&stop_);
332 if(listener_ != 0 && GenericTask<Msg>::fdSet_.isSetInRead(listener_->getFd()))
337 checkClientsForReadableData();
341 checkClientsForWritability();
348 virtual void timeOutAction() {};
354 virtual void readClientData(ServerConnection* conn) {};
360 virtual void acceptClientAction(ServerConnection* conn) {};
366 void sendClientData(NetDat& dat, ServerConnection* client) {
371 client->packClientData(dat);
379 typename std::list<ServerConnection*>::iterator iClient;
380 for(iClient=clients_.begin(); iClient != clients_.end(); iClient++) {
381 ServerConnection* client = *iClient;
385 if(client->initialized_)
386 client->packClientData(dat);
391 void setReadBufSize(
unsigned size) {
395 void setSendBufSize(
unsigned size) {
399 void setTimeOutSeconds(
unsigned int seconds) {
400 timeOutSeconds_ = seconds;
405 unsigned int timeOutSeconds_;
410 SignalTask* signalTask_;
420 TcpListener* listener_;
426 void initMembers(
int listenPort,
unsigned readBufSize,
unsigned sendBufSize) {
430 readBufSize_ = readBufSize;
431 sendBufSize_ = sendBufSize;
435 setTimeOutSeconds(1);
439 signalTask_ =
new SignalTask(
true);
443 signalTask_->sendInstallSignalMsg(SIGINT, &shutDown,
this);
455 void listen(
unsigned port,
unsigned nClients = 5) {
457 listener_ =
new TcpListener(port, nClients);
458 GenericTask<Msg>::fdSet_.registerReadFd(listener_->getFd());
466 void acceptConnection() {
472 fd = listener_->acceptConnection(
true);
476 ServerConnection* conn =
new ServerConnection(fd, readBufSize_, sendBufSize_,
this);
480 GenericTask<Msg>::fdSet_.registerReadFd(fd);
484 CTOUT(
"About to call acceptClientAction with conn = " << conn);
485 acceptClientAction(conn);
490 clients_.insert(clients_.begin(), conn);
500 static SIGNALTASK_HANDLER_FN(shutDown) {
501 ServerTask* server = (ServerTask*) args;
502 server->sendStopMsg();
509 void checkClientsForReadableData() {
510 std::vector<ServerConnection*> disconnectedClients_;
512 typename std::list<ServerConnection*>::iterator iClient;
513 for(iClient=clients_.begin(); iClient != clients_.end(); iClient++) {
515 ServerConnection* client = *iClient;
517 if(GenericTask<Msg>::fdSet_.isSetInRead(client->handler_.getReadFd())) {
519 client->handler_.read();
524 if(client->handler_.getReadFd() < 0)
525 disconnectedClients_.push_back(client);
531 typename std::vector<ServerConnection*>::iterator iDisClient;
532 for(iDisClient=disconnectedClients_.begin(); iDisClient != disconnectedClients_.end(); iDisClient++) {
533 ServerConnection* client = *iDisClient;
534 clients_.remove(client);
543 void checkClientsForWritability() {
544 typename std::list<ServerConnection*>::iterator iClient;
545 for(iClient=clients_.begin(); iClient != clients_.end(); iClient++)
547 ServerConnection* client = *iClient;
549 if(GenericTask<Msg>::fdSet_.isSetInWrite(client->handler_.getSendFd()))
550 client->handler_.send();
559 static NET_READ_HANDLER(readHandler) {
560 ServerConnection* conn = (ServerConnection*)arg;
561 conn->parent_->readClientData(conn);
569 static NET_SEND_HANDLER(sendHandler) {
570 ServerConnection* conn = (ServerConnection*)arg;
571 conn->parent_->GenericTask<Msg>::fdSet_.clearFromWriteFdSet(conn->handler_.getFd());
576 conn->initialized_ =
true;
580 conn->sendPending_ =
false;
584 conn->checkMsgQueue();
592 static NET_ERROR_HANDLER(errHandler) {
593 ServerConnection* conn = (ServerConnection*)arg;
594 conn->parent_->GenericTask<Msg>::fdSet_.clearFromReadFdSet(conn->handler_.getReadFd());
595 conn->parent_->GenericTask<Msg>::fdSet_.clearFromWriteFdSet(conn->handler_.getSendFd());
596 conn->handler_.attach(-1);
600 conn->sendPending_ =
false;
610 #endif // End #ifndef SZA_UTIL_SERVERTASK_H
Tagged: Wed Jul 6 13:41:09 PDT 2005.
Tagged: Fri Nov 14 12:39:36 UTC 2003.
Tagged: Sun Apr 4 22:36:40 UTC 2004.
SpawnableTask(bool spawn)
Constructor.
Started: Tue Mar 2 13:43:07 UTC 2004.
virtual void processTaskMsg(bool *stop)
Process a message received on our message queue.
Tagged: Fri Nov 14 12:39:36 UTC 2003.
Tagged: Tue May 2 16:31:46 PDT 2006.
......................................................................
Started: Thu Feb 26 22:08:23 UTC 2004.
Tagged: Fri Nov 14 12:39:34 UTC 2003.
Tagged: Fri Jan 26 16:49:57 NZDT 2007.