13 #include <sys/socket.h>
15 #include <sys/types.h>
18 #include "carma/szautil/IoLock.h"
169 void raise(std::string name,
int sigNo);
220 virtual void run(
void);
282 fdSet_.zeroReadFdSet();
283 fdSet_.registerReadFd(msgq_.fd());
296 fdSet_.zeroReadFdSet();
297 fdSet_.registerReadFd(msgq_.fd());
310 thread_->setRunState(
false);
320 for(
unsigned ithread=0; ithread < threads_.size(); ithread++)
321 if(threads_[ithread] != 0)
322 delete threads_[ithread];
354 ThrowError(
"Received NULL file descriptor");
358 while(!stop && (nready=select(fdSet_.size(), fdSet_.readFdSet(),
359 NULL, NULL, NULL)) > 0) {
366 DBPRINT(
true, Debug::DEBUG2,
"About to call processTaskMsg: "
367 <<
"nready = " << nready);
370 processTaskMsg(&stop);
384 switch (msg.genericMsgType_) {
386 respondToHeartBeat();
416 thread_->setRunState(
true);
426 msg.genericMsgType_ = Msg::RESTART;
442 msg.genericMsgType_ = Msg::STOP;
458 msg.genericMsgType_ = Msg::HEARTBEAT;
495 while(threadsNeedStarting()) {
496 unsigned order = getMinStartOrder();
497 startThread(arg, order);
509 for(
unsigned ithread=0; ithread < threads_.size(); ithread++)
510 if(!threads_[ithread]->isRunning() && threads_[ithread]->startOrder()==order) {
512 threads_[ithread]->start(arg);
516 if(threads_[ithread]->wasError_) {
517 ThrowError(
"Error in startup function for thread: "
518 << threads_[ithread]->name());
529 for(
unsigned ithread=0; ithread < threads_.size(); ithread++)
530 if(!threads_[ithread]->isRunning())
541 for(
unsigned ithread=0; ithread < threads_.size(); ithread++)
542 if(threads_[ithread]->isRunning())
556 for(
unsigned ithread=0; ithread < threads_.size(); ithread++) {
557 if(!threads_[ithread]->isRunning()) {
560 minOrder = threads_[ithread]->startOrder();
563 minOrder = threads_[ithread]->startOrder() < minOrder ?
564 threads_[ithread]->startOrder() : minOrder;
582 for(
unsigned ithread=0; ithread < threads_.size(); ithread++) {
583 if(threads_[ithread]->isRunning()) {
586 minOrder = threads_[ithread]->cancelOrder();
589 minOrder = threads_[ithread]->cancelOrder() < minOrder ?
590 threads_[ithread]->cancelOrder() : minOrder;
612 DBPRINT(
true, Debug::DEBUG7,
"Managing " << threads_.size()
615 while(threadsNeedCancelling()) {
616 unsigned order = getMinCancelOrder();
620 DBPRINT(
true, Debug::DEBUG7,
"Leaving cancelThreads");
636 for(
int ithread=threads_.size()-1; ithread >=0; ithread--)
637 if(threads_[ithread]->isRunning() && threads_[ithread]->cancelOrder() == order)
638 threads_[ithread]->cancel();
647 for(
unsigned ithread=0; ithread < threads_.size(); ithread++)
648 if(threads_[ithread]->isPingable()) {
649 threads_[ithread]->setRunState(
false);
650 threads_[ithread]->ping(arg);
663 for(
unsigned ithread=0; ithread < threads_.size(); ithread++) {
664 if(threads_[ithread]->matchName(name))
665 return threads_[ithread];
668 ThrowError(
"No matching thread found");
679 for(
unsigned ithread=0; ithread < threads_.size(); ithread++) {
680 if(threads_[ithread]->matchName(name)) {
681 threads_[ithread]->raise(sigNo);
686 ReportError(
"No matching thread found");
695 for(
unsigned ithread=0; ithread < threads_.size(); ithread++) {
696 if(!threads_[ithread]->isRunning()) {
697 ReportSimpleError(
"No heartbeat response from thread "
698 << threads_[ithread]->strName());
703 << threads_[ithread]->strName()
755 #endif // End #ifndef
void raise(std::string name, int sigNo)
Raise a signal to a named thread.
virtual ~GenericTask()
Making the destructor virtual ensures that the right destructor will be called for classes which inhe...
virtual void installTimer(Msg *msg)
Respond to a message to install a timer.
bool threadsNeedCancelling()
Return true if there are still uncancelled threads.
void startThread(void *arg, unsigned order)
Start the next thread with the specified order.
bool threadsNeedStarting()
Return true if there are still unstarted threads.
void pingThreads(void *arg)
A method to ping all pingable threads managed by this task.
Started: Wed Jan 14 11:00:24 PST 2004.
Thread * getThread(std::string name)
A method to start all threads managed by this task running.
virtual void fwdTaskMsg(Msg *msg)
Forward a message to this task via its message queue.
virtual void serviceMsgQ(void)
This routine will simply block, servicing messages on the message queue.
void cancelThreads()
A method to cancel threads managed by this task.
void cancelThread(unsigned order)
Cancel the next thread with the specified order.
virtual void addHandler(Msg *msg)
Respond to a message to add/remove a handler.
unsigned getMinCancelOrder()
Method to return the minimum cancel order for threads which are not yet running.
bool threadsAreRunning()
A method to test if the threads managed by this task are running.
void sendRestartMsg()
Method to send a stop message to this task via its message queue.
virtual void restart(void)
Restart this thread.
virtual void installSignal(Msg *msg)
Respond to a message to install a signal.
void sendHeartBeatMsg()
Method to send a heartbeat message to this task via its message queue.
All tasks will have the following functionality:
Tagged: Fri Nov 14 12:39:37 UTC 2003.
unsigned getMinStartOrder()
Method to return the minimum start order for threads which are not yet running.
virtual void enableTimer(Msg *msg)
Respond to a message to enable/disable a timer.
void shutdownConnection(int fd)
Shutdown a connection.
void sendTaskMsg(Msg *msg)
Send a message to this task via its message queue.
Thread * thread_
If this GenericTask object was instantiated by another thread, keep a pointer to it here...
PipeQ< Msg > msgq_
A message queue, implemented as a pipe, by which we can communicate with this task.
Template class for a message queue implemented using pipes.
std::vector< Thread * > threads_
A vector of Thread objects managed by this task.
virtual void processTaskMsg(bool *stop)
Process a message received on our message queue.
virtual void respondToHeartBeat()
Respond to a heartbeat message.
GenericTask()
Protected constructor ensures that the base class cannot be instantiated.
Started: Sun Dec 14 07:19:50 UTC 2003.
void startThreads(void *arg)
A method to start all threads.
sza::util::FdSet fdSet_
A set of file descriptors associated with this task.
virtual void run(void)
Force inheritors to define a run method.
Started: Thu Feb 26 22:08:23 UTC 2004.
virtual void processMsg(Msg *msg)
This method should be defined by each inheriting task to process its own task-specific messages...
void sendStopMsg()
Method to send a stop message to this task via its message queue.
Tagged: Fri Nov 14 12:39:33 UTC 2003.
Define a class to encapsulate thread handling.
Tagged: Fri Nov 14 12:39:35 UTC 2003.