CARMA C++
GenericTask.h
Go to the documentation of this file.
1 #ifndef GENERICTASK_H
2 #define GENERICTASK_H
3 
11 #include <vector>
12 #include <unistd.h>
13 #include <sys/socket.h> // shutdown()
14 #include <sys/time.h>
15 #include <sys/types.h>
16 
17 #include "carma/szautil/Debug.h"
18 #include "carma/szautil/IoLock.h"
19 #include "carma/szautil/Exception.h" // Definition of Error macro
20 #include "carma/szautil/FdSet.h"
22 #include "carma/szautil/PipeQ.h"
23 #include "carma/szautil/Thread.h"
24 
25 namespace sza {
26  namespace util {
27 
// GenericTask<Msg>: base class template for a task that is driven by
// messages of type Msg and that manages a collection of Thread objects.
//
// The member definitions further down show that Msg must provide a
// genericMsgType_ field and the values Msg::HEARTBEAT, Msg::RESTART and
// Msg::STOP.  Both constructors are protected, so the class can only be
// constructed from an inheriting class.
//
// NOTE(review): the definitions below also use two members, thread_ and
// msgq_, whose declarations are not visible in this listing (the gaps at
// listing lines 104 and 187); the cross-reference text at the end of the
// page gives them as "Thread* thread_" and "PipeQ<Msg> msgq_".
31  template<class Msg>
32  class GenericTask {
33 
34  public:
35 
     // Build a Msg whose genericMsgType_ is Msg::RESTART and pass it to
     // fwdTaskMsg().
40  void sendRestartMsg();
41 
     // Build a Msg whose genericMsgType_ is Msg::STOP and pass it to
     // fwdTaskMsg().
46  void sendStopMsg();
47 
     // Build a Msg whose genericMsgType_ is Msg::HEARTBEAT and pass it to
     // fwdTaskMsg().
52  void sendHeartBeatMsg();
53 
     // Forward a message to this task.  The base-class version puts the
     // message on msgq_ via sendMsg(); it is virtual so that inheriting
     // tasks can control their own message routing.
68  virtual void fwdTaskMsg(Msg* msg);
69 
70  protected:
71 
     // Default constructor: sets thread_ to 0, clears the read fd set and
     // registers the message queue fd in it.
76  GenericTask();
77 
     // As above, but stores 'thread' in thread_.
81  GenericTask(Thread* thread);
82 
     // Marks thread_ (if non-null) as not running, cancels all managed
     // threads and deletes every element of threads_.
88  virtual ~GenericTask();
89 
     // Put a message on this task's message queue (msgq_.sendMsg()).
96  void sendTaskMsg(Msg* msg);
97 
105 
     // The Thread objects managed by this task.  The pointers are deleted
     // by the destructor.
109  std::vector<Thread*> threads_;
110 
      // Start all threads: while threadsNeedStarting() is true, call
      // startThread() with the current getMinStartOrder().
119  void startThreads(void* arg);
120 
      // Start every thread that is not running and whose startOrder()
      // equals 'order'.
124  void startThread(void* arg, unsigned order);
125 
      // Smallest startOrder() among threads that are not running (0 if
      // there are none).
130  unsigned getMinStartOrder();
131 
      // True if at least one managed thread is not running.
135  bool threadsNeedStarting();
136 
      // Cancel all threads: while threadsNeedCancelling() is true, call
      // cancelThread() with the current getMinCancelOrder().
140  void cancelThreads();
141 
      // Cancel every running thread whose cancelOrder() equals 'order'.
145  void cancelThread(unsigned order);
146 
      // Smallest cancelOrder() among threads that are running (0 if there
      // are none).
151  unsigned getMinCancelOrder();
152 
      // True if at least one managed thread is running.
156  bool threadsNeedCancelling();
157 
      // For each thread reporting isPingable(): set its run state to false
      // and call ping(arg) on it.
164  void pingThreads(void* arg);
165 
      // Call raise(sigNo) on the first thread whose name matches 'name';
      // an error is reported via ReportError if no thread matches.
169  void raise(std::string name, int sigNo);
170 
      // Return the first thread whose name matches 'name'; ThrowError is
      // invoked if no thread matches.
175  Thread* getThread(std::string name);
176 
      // Return false (after reporting the thread's name) as soon as a
      // thread that is not running is found; return true otherwise.
181  bool threadsAreRunning();
182 
188 
      // File descriptors watched by this task.  The constructors register
      // the message queue fd here and serviceMsgQ() passes the read set to
      // select().
192  sza::util::FdSet fdSet_;
193 
194  // Fd utilities
195 
      // For fd >= 0: remove fd from fdSet_, then shutdown() and close() it.
      // Negative fds are ignored.
201  void shutdownConnection(int fd);
202 
      // Block in select() on fdSet_ and call processTaskMsg() for each
      // wake-up until a stop is requested.
210  virtual void serviceMsgQ(void);
211 
      // Invoked by processTaskMsg() when a Msg::RESTART message arrives.
      // NOTE(review): the base-class definition (listing line 329) is not
      // visible in this listing.
215  virtual void restart(void);
216 
      // Run the task.  The base-class version simply calls serviceMsgQ().
220  virtual void run(void);
221 
      // Read one message from msgq_ and dispatch on its genericMsgType_:
      // HEARTBEAT -> respondToHeartBeat(), RESTART -> restart(),
      // STOP -> *stop = true, anything else -> processMsg().
227  virtual void processTaskMsg(bool* stop);
228 
      // Hook for task-specific messages.  The base-class version is empty.
235  virtual void processMsg(Msg* msg);
236 
      // If thread_ is non-null, set its run state to true.
242  virtual void respondToHeartBeat();
243 
244  //------------------------------------------------------------
245  // Signals and timers
246  //------------------------------------------------------------
247 
      // NOTE(review): the base-class definitions of installTimer() and
      // installSignal() (listing lines 718 and 724) are not visible in this
      // listing; enableTimer() and addHandler() have empty bodies.
251  virtual void installTimer(Msg* msg);
252 
256  virtual void installSignal(Msg* msg);
257 
261  virtual void enableTimer(Msg* msg);
262 
266  virtual void addHandler(Msg* msg);
267 
268  }; // End class GenericTask
269 
270  // Template functions must be defined in the header file.
271 
275  template<class Msg>
277  {
278  thread_ = 0;
279 
280  // Register the message queue fd to be watched for readability
281 
282  fdSet_.zeroReadFdSet();
283  fdSet_.registerReadFd(msgq_.fd());
284  }
285 
289  template<class Msg>
291  {
292  thread_ = thread;
293 
294  // Register the message queue fd to be watched for readability
295 
296  fdSet_.zeroReadFdSet();
297  fdSet_.registerReadFd(msgq_.fd());
298  }
299 
303  template<class Msg>
305  {
306  // Broadcast to other threads that this task is shutting
307  // down.
308 
309  if(thread_ != 0)
310  thread_->setRunState(false);
311 
312  // Cancel all threads managed by this task.
313 
314  cancelThreads();
315 
316  // And delete any allocated memory. NB: the default
317  // destructor for vector<Thread*> will not delete memory
318  // pointed to by its elements.
319 
320  for(unsigned ithread=0; ithread < threads_.size(); ithread++)
321  if(threads_[ithread] != 0)
322  delete threads_[ithread];
323  }
324 
328  template<class Msg>
330 
335  template<class Msg>
337  {
338  serviceMsgQ();
339  }
340 
347  template<class Msg>
349  {
350  bool stop=false;
351  int nready; // number of file descriptors ready for reading
352 
353  if(msgq_.fd() < 0)
354  ThrowError("Received NULL file descriptor");
355 
356  // Loop, checking the message queue file descriptor for readability
357 
358  while(!stop && (nready=select(fdSet_.size(), fdSet_.readFdSet(),
359  NULL, NULL, NULL)) > 0) {
360 
361  // If no file descriptors were ready, throw an exception
362 
363  if(nready != 1)
364  ThrowError("Error");
365 
366  DBPRINT(true, Debug::DEBUG2, "About to call processTaskMsg: "
367  << "nready = " << nready);
368 
369 
370  processTaskMsg(&stop);
371  };
372  };
373 
377  template<class Msg>
379  {
380  Msg msg;
381 
382  msgq_.readMsg(&msg);
383 
384  switch (msg.genericMsgType_) {
385  case Msg::HEARTBEAT: // Is this a heartbeat request?
386  respondToHeartBeat();
387  break;
388  case Msg::RESTART: // Is this a request to restart?
389  restart();
390  break;
391  case Msg::STOP: // Did we receive a request to shut
392  // down?
393  *stop = true;
394  break;
395  default: // Else forward this message to the task-specific
396  // process method
397  processMsg(&msg);
398  break;
399  }
400  };
401 
// Default handler for task-specific messages.  processTaskMsg() calls this
// for any message whose type is not HEARTBEAT, RESTART or STOP.  The
// base-class body is empty, so 'msg' is ignored unless an inheriting task
// overrides this method.
405  template<class Msg>
406  void GenericTask<Msg>::processMsg(Msg* msg) {};
407 
412  template<class Msg>
414  {
415  if(thread_ != 0)
416  thread_->setRunState(true);
417  }
418 
422  template<class Msg>
424  {
425  Msg msg;
426  msg.genericMsgType_ = Msg::RESTART;
427 
428  // We use fwdTaskMsg() here instead of sendTaskMsg() so
429  // that inheriting tasks can control their own messages
430  // routing.
431 
432  fwdTaskMsg(&msg);
433  }
434 
438  template<class Msg>
440  {
441  Msg msg;
442  msg.genericMsgType_ = Msg::STOP;
443 
444  // We use fwdTaskMsg() here instead of sendTaskMsg() so
445  // that inheriting tasks can control their own messages
446  // routing.
447 
448  fwdTaskMsg(&msg);
449  };
450 
454  template<class Msg>
456  {
457  Msg msg;
458  msg.genericMsgType_ = Msg::HEARTBEAT;
459 
460  // We use fwdTaskMsg() here instead of sendTaskMsg() so
461  // that inheriting tasks can control their own message
462  // routing.
463 
464  fwdTaskMsg(&msg);
465  };
466 
471  template<class Msg>
473  {
474  msgq_.sendMsg(msg);
475  }
476 
481  template<class Msg>
483  {
484  msgq_.sendMsg(msg);
485  }
486 
490  template<class Msg>
492  {
493  // Start threads in priority order
494 
495  while(threadsNeedStarting()) {
496  unsigned order = getMinStartOrder();
497  startThread(arg, order);
498  }
499  }
500 
// Start every managed thread that is not yet running and whose
// startOrder() equals 'order'.
//
//   arg    Passed unchanged to Thread::start().
//   order  The start order to match.
//
// After each start() the thread's wasError_ flag is checked, and ThrowError
// is invoked with the thread's name if it is set.
// NOTE(review): if ThrowError throws (presumably it does -- confirm in
// Exception.h), any remaining threads with this order are left unstarted.
504  template<class Msg>
505  void GenericTask<Msg>::startThread(void* arg, unsigned order)
506  {
507  // Search through the list in first-in, first-started order
508 
509  for(unsigned ithread=0; ithread < threads_.size(); ithread++)
510  if(!threads_[ithread]->isRunning() && threads_[ithread]->startOrder()==order) {
511 
512  threads_[ithread]->start(arg);
513 
514  // Check the error code from the start up function
515 
516  if(threads_[ithread]->wasError_) {
517  ThrowError("Error in startup function for thread: "
518  << threads_[ithread]->name());
519  }
520  }
521  }
522 
526  template<class Msg>
528  {
529  for(unsigned ithread=0; ithread < threads_.size(); ithread++)
530  if(!threads_[ithread]->isRunning())
531  return true;
532  return false;
533  }
534 
538  template<class Msg>
540  {
541  for(unsigned ithread=0; ithread < threads_.size(); ithread++)
542  if(threads_[ithread]->isRunning())
543  return true;
544  return false;
545  }
546 
550  template<class Msg>
552  {
553  bool first=true;
554  unsigned minOrder=0;
555 
556  for(unsigned ithread=0; ithread < threads_.size(); ithread++) {
557  if(!threads_[ithread]->isRunning()) {
558 
559  if(first) {
560  minOrder = threads_[ithread]->startOrder();
561  first = false;
562  } else {
563  minOrder = threads_[ithread]->startOrder() < minOrder ?
564  threads_[ithread]->startOrder() : minOrder;
565  }
566  }
567 
568  }
569 
570  return minOrder;
571  }
572 
576  template<class Msg>
578  {
579  bool first=true;
580  unsigned minOrder=0;
581 
582  for(unsigned ithread=0; ithread < threads_.size(); ithread++) {
583  if(threads_[ithread]->isRunning()) {
584 
585  if(first) {
586  minOrder = threads_[ithread]->cancelOrder();
587  first = false;
588  } else {
589  minOrder = threads_[ithread]->cancelOrder() < minOrder ?
590  threads_[ithread]->cancelOrder() : minOrder;
591  }
592  }
593 
594  }
595 
596  return minOrder;
597  }
598 
607  template<class Msg>
609  {
610  // Cancel threads in first-started, last-canceled order.
611 
612  DBPRINT(true, Debug::DEBUG7, "Managing " << threads_.size()
613  << " threads");
614 
615  while(threadsNeedCancelling()) {
616  unsigned order = getMinCancelOrder();
617  cancelThread(order);
618  }
619 
620  DBPRINT(true, Debug::DEBUG7, "Leaving cancelThreads");
621  }
622 
// Cancel every managed thread that is currently running and whose
// cancelOrder() equals 'order'.
//
//   order  The cancel order to match.
//
// Threads that are not running are skipped, so calling this when nothing
// matches is a no-op.
631  template<class Msg>
632  void GenericTask<Msg>::cancelThread(unsigned order)
633  {
634  // Cancel threads in first-started, last-canceled order.
635 
      // The vector is walked from its last element back to its first.  The
      // index is a signed int so that the ">= 0" test can become false; an
      // empty vector gives a start value of -1 and the loop does not run.
636  for(int ithread=threads_.size()-1; ithread >=0; ithread--)
637  if(threads_[ithread]->isRunning() && threads_[ithread]->cancelOrder() == order)
638  threads_[ithread]->cancel();
639  }
640 
644  template<class Msg>
646  {
647  for(unsigned ithread=0; ithread < threads_.size(); ithread++)
648  if(threads_[ithread]->isPingable()) {
649  threads_[ithread]->setRunState(false);
650  threads_[ithread]->ping(arg);
651  }
652  }
653 
660  template<class Msg>
662  {
663  for(unsigned ithread=0; ithread < threads_.size(); ithread++) {
664  if(threads_[ithread]->matchName(name))
665  return threads_[ithread];
666  }
667 
668  ThrowError("No matching thread found");
669 
670  return 0;
671  };
672 
673 
674  // Raise a signal to a thread.
      //
      //   name   Name to look for; compared with Thread::matchName().
      //   sigNo  Signal number passed to Thread::raise().
      //
      // Only the first thread whose name matches receives the signal.
675 
676  template<class Msg>
677  void GenericTask<Msg>::raise(std::string name, int sigNo)
678  {
679  for(unsigned ithread=0; ithread < threads_.size(); ithread++) {
680  if(threads_[ithread]->matchName(name)) {
681  threads_[ithread]->raise(sigNo);
      // Stop at the first match so the "not found" report below is skipped.
682  return;
683  }
684  }
685 
      // No thread matched: the failure goes through ReportError here, whereas
      // getThread() uses ThrowError for the same condition.
686  ReportError("No matching thread found");
687  };
688 
689 
690  // Check the running status of all threads
691 
692  template<class Msg>
694  {
695  for(unsigned ithread=0; ithread < threads_.size(); ithread++) {
696  if(!threads_[ithread]->isRunning()) {
697  ReportSimpleError("No heartbeat response from thread "
698  << threads_[ithread]->strName());
699 
700  return false;
701  } else {
702  COUT("Thread "
703  << threads_[ithread]->strName()
704  << " is running.");
705  }
706  }
707  return true;
708  }
709 
710  //------------------------------------------------------------
711  // Signals and timers
712  //------------------------------------------------------------
713 
717  template<class Msg>
719 
723  template<class Msg>
725 
// Respond to a message to enable or disable a timer.  The base-class body
// is empty and ignores 'msg'; the method is declared virtual in the class,
// so inheriting tasks can override it.
729  template<class Msg>
730  void GenericTask<Msg>::enableTimer(Msg* msg) {};
731 
// Respond to a message to add or remove a handler.  The base-class body is
// empty and ignores 'msg'; the method is declared virtual in the class, so
// inheriting tasks can override it.
735  template<class Msg>
736  void GenericTask<Msg>::addHandler(Msg* msg) {};
737 
738  // Utilities
739 
743  template<class Msg>
745  {
746  if(fd >= 0) {
747  fdSet_.clear(fd);
748  ::shutdown(fd, 2);
749  ::close(fd);
750  }
751  }
752  }; // End namespace util
753 }; // End namespace sza
754 
755 #endif // End #ifndef
void raise(std::string name, int sigNo)
Raise a signal to a named thread.
Definition: GenericTask.h:677
virtual ~GenericTask()
Making the destructor virtual ensures that the right destructor will be called for classes which inhe...
Definition: GenericTask.h:304
virtual void installTimer(Msg *msg)
Respond to a message to install a timer.
Definition: GenericTask.h:718
bool threadsNeedCancelling()
Return true if there are still uncancelled threads.
Definition: GenericTask.h:539
void startThread(void *arg, unsigned order)
Start every thread that is not yet running and has the specified start order.
Definition: GenericTask.h:505
bool threadsNeedStarting()
Return true if there are still unstarted threads.
Definition: GenericTask.h:527
void pingThreads(void *arg)
A method to ping all pingable threads managed by this task.
Definition: GenericTask.h:645
Started: Wed Jan 14 11:00:24 PST 2004.
Thread * getThread(std::string name)
Return a pointer to the managed thread whose name matches the argument; an error is raised if no thread matches.
Definition: GenericTask.h:661
virtual void fwdTaskMsg(Msg *msg)
Forward a message to this task via its message queue.
Definition: GenericTask.h:482
virtual void serviceMsgQ(void)
This routine will simply block, servicing messages on the message queue.
Definition: GenericTask.h:348
void cancelThreads()
A method to cancel threads managed by this task.
Definition: GenericTask.h:608
void cancelThread(unsigned order)
Cancel every running thread that has the specified cancel order.
Definition: GenericTask.h:632
virtual void addHandler(Msg *msg)
Respond to a message to add/remove a handler.
Definition: GenericTask.h:736
unsigned getMinCancelOrder()
Method to return the minimum cancel order for threads which are currently running.
Definition: GenericTask.h:577
bool threadsAreRunning()
A method to test if the threads managed by this task are running.
Definition: GenericTask.h:693
void sendRestartMsg()
Method to send a restart message to this task via its message queue.
Definition: GenericTask.h:423
virtual void restart(void)
Restart this thread.
Definition: GenericTask.h:329
virtual void installSignal(Msg *msg)
Respond to a message to install a signal.
Definition: GenericTask.h:724
void sendHeartBeatMsg()
Method to send a heartbeat message to this task via its message queue.
Definition: GenericTask.h:455
All tasks will have the following functionality:
Definition: GenericTask.h:32
Tagged: Fri Nov 14 12:39:37 UTC 2003.
unsigned getMinStartOrder()
Method to return the minimum start order for threads which are not yet running.
Definition: GenericTask.h:551
virtual void enableTimer(Msg *msg)
Respond to a message to enable/disable a timer.
Definition: GenericTask.h:730
void shutdownConnection(int fd)
Shutdown a connection.
Definition: GenericTask.h:744
void sendTaskMsg(Msg *msg)
Send a message to this task via its message queue.
Definition: GenericTask.h:472
Thread * thread_
If this GenericTask object was instantiated by another thread, keep a pointer to it here...
Definition: GenericTask.h:104
PipeQ< Msg > msgq_
A message queue, implemented as a pipe, by which we can communicate with this task.
Definition: GenericTask.h:187
Template class for a message queue implemented using pipes.
Definition: PipeQ.h:20
std::vector< Thread * > threads_
A vector of Thread objects managed by this task.
Definition: GenericTask.h:109
virtual void processTaskMsg(bool *stop)
Process a message received on our message queue.
Definition: GenericTask.h:378
virtual void respondToHeartBeat()
Respond to a heartbeat message.
Definition: GenericTask.h:413
GenericTask()
Protected constructor ensures that the base class cannot be instantiated.
Definition: GenericTask.h:276
Started: Sun Dec 14 07:19:50 UTC 2003.
void startThreads(void *arg)
A method to start all threads.
Definition: GenericTask.h:491
sza::util::FdSet fdSet_
A set of file descriptors associated with this task.
Definition: GenericTask.h:192
virtual void run(void)
Run this task. The default implementation simply calls serviceMsgQ(); inheritors may override it.
Definition: GenericTask.h:336
Started: Thu Feb 26 22:08:23 UTC 2004.
virtual void processMsg(Msg *msg)
This method should be defined by each inheriting task to process its own task-specific messages...
Definition: GenericTask.h:406
void sendStopMsg()
Method to send a stop message to this task via its message queue.
Definition: GenericTask.h:439
Tagged: Fri Nov 14 12:39:33 UTC 2003.
Define a class to encapsulate thread handling.
Definition: Thread.h:57
Tagged: Fri Nov 14 12:39:35 UTC 2003.