CARMA C++
IPQbufferBase.h
Go to the documentation of this file.
1 #ifndef CARMA_UTIL_IPQBUFFERBASE_H
2 #define CARMA_UTIL_IPQBUFFERBASE_H
3 
19 #include "carma/util/IPQinterface.h"
20 
21 #include <string>
22 
23 namespace carma {
24  namespace util {
25 
26  // Forward declaration
27  class PthreadRWLock;
28  class ScopedFlockManager;
29 
77 class IPQbufferBase : public carma::util::IPQinterface {
78 protected:
100  IPQbufferBase( void * localElement,
101  int elementSize,
102  const ::std::string& filename,
103  bool isCreator = false,
104  int nElements = 0,
105  unsigned int testOffset = 0 );
106 public:
113  virtual ~IPQbufferBase() ;
114 
118  ::std::string getFileName( ) const;
119 
126  int getQueueSize() const ;
127 
132  int getElementSize() const ;
133 
139  int getNumAvailable() const ;
140 
145  bool isEmpty() const ;
146 
152  bool isDataAvailable() const;
153 
157  void setNoneAvailable();
158 
170  unsigned int read();
171 
172 
173 private:
174 
191  unsigned int readNoLock();
192 
193 public:
194 
205  bool readNewest();
206 
218 
219 private:
220 
230  bool readNewestNoLock();
231 
232 public:
233 
240  unsigned int getLostElementCount() const;
241 
243  unsigned int getPutOffset() const ;
245  unsigned int getGetOffset() const ;
247  unsigned int getMaxOffset() const ;
248 
249 protected:
250 
256  void init() ;
257 
263  void write() ;
264 
273  virtual bool openBuffer() = 0;
274 
283  virtual bool createBuffer() = 0;
284 
289  void trimShmemFilename();
290 
294  ::std::string getTrimmedFilename();
295 
296 private:
297  void copyMemory( void * const dest,
298  const void * const src,
299  const size_t bytes ) const;
300 
307  bool readHeader(bool keepOpen) ;
308 
312  bool doesElementSizeMatch() ;
313 
317  bool hasQueueChanged() ;
318 
322  bool writeHeader() ;
323 
330  void createSemFilename();
331 
335  bool openSemaphore();
336 
340  bool createSemaphore();
341 
342  // Internal routine that handles wraparound.
343  // Will lock the flock manager if the read/write pointers are adjacent
344  // returns adjusted readoffset
345  unsigned int adjustReadOffset(bool& lock, ScopedFlockManager& flockManager,
346  unsigned int readOffset);
347 
348  unsigned int internalRead(bool lock) ;
349  bool internalReadNewest(bool lock) ;
350 
351 private:
352  class BlockedOnSemaphoreQuitRequestHandler;
353 
354  friend class BlockedOnSemaphoreQuitRequestHandler;
355 
356  static const int MEMORYMAPPEDFILE_MAGIC = 0x00000100;
357 
358  struct ControlBlock {
359  // An index for the next element to be written.
360  // Steadily increments through the full range of a long,
361  // with modulos used to put it into the finite queue
362  volatile unsigned int putOffset;
363  };
364 
365  struct MapfileHeader {
366  int magic;
367  unsigned int headerSize;
368  unsigned int controlBlockOffset;
369  unsigned int controlBlockSize;
370  // Offset in bytes from the start of the shmem to start of queue area
371  unsigned int queueOffset;
372  // The size of a single element in the queue, in bytes
373  unsigned int queueElementSize;
374  // The number of elements allocated for the queue
375  unsigned int queueElements;
376  };
377 
378 protected:
379  const int protectionMask_;
380  const int openMask_;
381 
382 private:
383  const ::std::string filename_; // As input
384 
385  // The local copy of the data in an element.
386  // It has been read from the queue and placed here - or it is
387  // the copy that will be transferred into the queue on writes.
388  void * const localElement_;
389  const size_t localElementSize_;
390  const bool isCreator_;
391 
392 protected:
393  bool debug_;
394  int fileDescriptor_;
395 
396 private:
397  int semDescriptor_; // Semaphore descriptor...
398 
399  void* mmapAddr_;
400  size_t mmapSize_;
401 
402  size_t controlBlockOffset_;
403  volatile ControlBlock* controlBlock_;
404 
405  // Offset in bytes from the start of the shmem area to start of queue area
406  size_t queueOffset_;
407  // Size of an element in bytes
408  size_t queueElementSize_;
409  // Number of elements allocated for the queue
410  unsigned int queueElements_;
411  char* queue_; // Absolute pointe to beginning of the queue
412 
413  bool isNewFile_;
414  bool isFileOpen_;
415 
416  // Offset where next data will be read
417  unsigned int getOffset_;
418  unsigned int nLostElements_;
419 
420  ::std::string trimmedFilename_;
421  ::std::string displayFilename_;
422  ::std::string semFilename_; // Semaphore filename
423 
424  MapfileHeader header_;
425  PthreadRWLock& rwLock_;
426  unsigned int maxOffset_; // Max for put/get pointers (wraparound)
427  // To help resolve emptiness at wraparound time
428  bool empty_;
429  unsigned int testOffset_;
430 };
431 
432 } } // End namespace carma::util
433 
434 #endif // CARMA_UTIL_IPQBUFFERBASE_H
unsigned int getMaxOffset() const
Useful only for debugging.
int getNumAvailable() const
Gets the number of unread elements available in the queue.
bool readNewestConditionalCopy()
Reads and copies the newest element from the queue into the localElement buffer, but only if has not ...
IPQ (InterProcessQueue) provides a generic way for information to be shared between processes or thre...
Definition: IPQbufferBase.h:77
A simple wrapper class that makes use of ::pthread_rwlock_t easier in a C++ world.
Definition: PthreadRWLock.h:46
::std::string getTrimmedFilename()
Get the trimmed filename.
bool isEmpty() const
Checks to see if the queue is empty.
bool readNewest()
Get the newest element from the queue (with locking).
unsigned int getLostElementCount() const
Get the number of elements lost on the last read.
IPQbufferBase(void *localElement, int elementSize, const ::std::string &filename, bool isCreator=false, int nElements=0, unsigned int testOffset=0)
Constructor for a read/write queue.
virtual ~IPQbufferBase()
Destructor Making this d&#39;tor virtual causes the d&#39;tors of inheriting classes to be called first when ...
::std::string getFileName() const
Get the filename.
void trimShmemFilename()
Shared memory filenames must contain only one &#39;/&#39; and it must be the first character; this routine fi...
unsigned int getGetOffset() const
Useful only for debugging.
virtual bool openBuffer()=0
Open an existing file or shared memory.
unsigned int getPutOffset() const
Used internally, but made public for debugging.
unsigned int read()
Read the oldest unread element from the queue.
void setNoneAvailable()
Sets the number of available (unread) elements to zero (catches up)
int getQueueSize() const
Returns the allocated size of the queue.
int getElementSize() const
Returns the size of each element in the queue in bytes.
bool isDataAvailable() const
Checks to see if there are any unread elements (would a read not block?).
virtual bool createBuffer()=0
Create a new file or shared memory.
void write()
Put an element into the queue.
void init()
This does all the real constructor work.