ARPDAS_QNX6 1.0
DQ.cc
Go to the documentation of this file.
00001 /* DataQueue.c */
00002 #include "DQ.h"
00003 #include "nortlib.h"
00004 #include "nl_assert.h"
00005 
00006 /**
00007  * Base class for dq_data_ref and dq_tstamp_ref
00008  * These make up part of the control structure of data_queue.
00009  */
00010 dq_ref::dq_ref(dqtype mytype) {
00011   next_dqr = 0;
00012   type = mytype;
00013 }
00014 
00015 /**
00016  * Convenience function to append an item to the end of the linked list and return the new item
00017  */
00018 dq_ref *dq_ref::next(dq_ref *dqr) {
00019   next_dqr = dqr;
00020   return dqr;
00021 }
00022 
00023 dq_data_ref::dq_data_ref(mfc_t MFCtr, int mfrow, int Qrow_in, int nrows_in )
00024       : dq_ref(dq_data) {
00025   MFCtr_start = MFCtr_next = MFCtr;
00026   row_start = row_next = mfrow;
00027   Qrow = Qrow_in;
00028   n_rows = 0;
00029   append_rows( nrows_in );
00030 }
00031 
00032 void dq_data_ref::append_rows( int nrows ) {
00033   row_next += nrows;
00034   MFCtr_next += row_next/tm_info.nrowminf;
00035   row_next = row_next % tm_info.nrowminf;
00036   n_rows += nrows;
00037 }
00038 
00039 dq_tstamp_ref::dq_tstamp_ref( mfc_t MFCtr, time_t time ) : dq_ref(dq_tstamp) {
00040   TS.mfc_num = MFCtr;
00041   TS.secs = time;
00042 }
00043 
00044 /**
00045   * data_queue base class constructor.
00046   * Determines the output_tm_type and allocates the queue storage.
00047   */
00048 data_queue::data_queue( int n_Qrows, int low_water ) {
00049   total_Qrows = n_Qrows;
00050   dq_low_water = low_water;
00051   if ( low_water > n_Qrows )
00052     nl_error( 3, "low_water must be <= n_Qrows" );
00053 
00054   raw = 0;
00055   row = 0;
00056   first = last = 0;
00057   full = false;
00058   last_dqr = first_dqr = 0;
00059 }
00060 
00061 /**
00062  * General DG initialization. Assumes tm_info structure has been defined.
00063  * Establishes the connection to the TMbfr, specifying the O_NONBLOCK option for collection.
00064  * Initializes the queue itself.
00065  * Creates dispatch queue and registers "DG/cmd" device and initializes timer.
00066  */
00067 void data_queue::init() {
00068   // Determine the output_tm_type
00069   nbQrow = tmi(nbrow);
00070   tm_info.nrowminf = tmi(nbminf)/tmi(nbrow);
00071   if (tm_info.nrowminf > 2) {
00072     output_tm_type = TMTYPE_DATA_T2;
00073     nbDataHdr = 10;
00074   } else if ( tmi(mfc_lsb)==0 && tmi(mfc_msb)==1 ) {
00075     output_tm_type = TMTYPE_DATA_T3;
00076     nbQrow -= 4;
00077     nbDataHdr = 8;
00078   } else {
00079     output_tm_type = TMTYPE_DATA_T1;
00080     nbDataHdr = 6;
00081   }
00082   if (nbQrow <= 0) nl_error(3,"nbQrow <= 0");
00083   int total_size = nbQrow * total_Qrows;
00084   raw = new unsigned char[total_size];
00085   if ( ! raw )
00086     nl_error( 3, "memory allocation failure: raw" );
00087   row = new unsigned char*[total_Qrows];
00088   if ( ! row )
00089     nl_error( 3, "memory allocation failure: row" );
00090   int i;
00091   unsigned char *currow = raw;
00092   for ( i = 0; i < total_Qrows; i++ ) {
00093     row[i] = currow;
00094     currow += nbQrow;
00095   }
00096 }
00097 
00098 void data_queue::lock(const char * by, int line) {
00099   by = by;
00100   line = line;
00101 }
00102 
00103 void data_queue::unlock() {}
00104 
00105 /**
00106   no longer a blocking function. Returns the largest number of contiguous rows currently free.
00107   Caller can decide whether that is adequate.
00108   Assumes the DQ is locked.
00109   */
00110 int data_queue::allocate_rows(unsigned char **rowp) {
00111   int na;
00112   if ( full ) na = 0;
00113   else if ( first > last ) {
00114     na = first - last;
00115   } else na = total_Qrows - last;
00116   if ( rowp != NULL) *rowp = row[last];
00117   return na;
00118 }
00119 
00120 /**
00121  *  MFCtr, mfrow are the MFCtr and minor frame row of the first row being committed.
00122  * Does not signal whoever is reading the queue
00123  * Assumes DQ is locked and unlocks before exit
00124  */
00125 void data_queue::commit_rows( mfc_t MFCtr, int mfrow, int nrows ) {
00126   // we (the writer thread) own the last pointer, so we can read it without a lock,
00127   // but we must lock before writing
00128   nl_assert( !full );
00129   nl_assert( last+nrows <= total_Qrows );
00130   lock(__FILE__,__LINE__);
00131   // We need a new dqr if the last one is a dq_tstamp or my MFCtr,mfrow don't match the 'next'
00132   // elements in the current dqr
00133   dq_data_ref *dqdr = 0;
00134   if ( last_dqr && last_dqr->type == dq_data ) {
00135     dqdr = (dq_data_ref *)last_dqr;
00136     if ( MFCtr != dqdr->MFCtr_next || mfrow != dqdr->row_next )
00137       dqdr = 0;
00138   }
00139   if ( dqdr == 0 ) {
00140     dqdr = new dq_data_ref(MFCtr, mfrow, last, nrows); // or retrieve from the free list?
00141     if ( last_dqr )
00142       last_dqr = last_dqr->next(dqdr);
00143     else first_dqr = last_dqr = dqdr;
00144   } else dqdr->append_rows(nrows);
00145   last += nrows;
00146   if ( last == total_Qrows ) last = 0;
00147   if ( last == first ) full = 1;
00148   unlock();
00149 }
00150 
00151 /**
00152  * Does not signal whoever is reading the queue
00153  */
00154 void data_queue::commit_tstamp( mfc_t MFCtr, time_t time ) {
00155   dq_tstamp_ref *dqt = new dq_tstamp_ref(MFCtr, time);
00156   lock(__FILE__,__LINE__);
00157   if ( last_dqr ) last_dqr = last_dqr->next(dqt);
00158   else first_dqr = last_dqr = dqt;
00159   unlock();
00160 }
00161 void data_queue::retire_rows( dq_data_ref *dqd, int n_rows ) {
00162   lock(__FILE__,__LINE__);
00163   nl_assert( n_rows >= 0 );
00164   nl_assert( dqd == first_dqr );
00165   nl_assert( dqd->n_rows >= n_rows);
00166   nl_assert( dqd->Qrow == first );
00167   if ( first < last ) {
00168     first += n_rows;
00169     if ( first > last )
00170       nl_error( 4, "Underflow in retire_rows" );
00171   } else {
00172     first += n_rows;
00173     if ( first >= total_Qrows ) {
00174       first -= total_Qrows;
00175       if ( first > last )
00176         nl_error( 4, "Underflow after wrap in retire_rows" );
00177     }
00178   }
00179   if (n_rows > 0) full = false;
00180   dqd->Qrow = first;
00181   dqd->n_rows -= n_rows;
00182   if ( dqd->n_rows == 0 && dqd->next_dqr ) {
00183     first_dqr = dqd->next_dqr;
00184     delete( dqd );
00185   } else {
00186     dqd->row_start += n_rows;
00187     dqd->MFCtr_start += dqd->row_start / tm_info.nrowminf;
00188     dqd->row_start %= tm_info.nrowminf;
00189   }
00190   unlock();
00191 }
00192 
00193 void data_queue::retire_tstamp( dq_tstamp_ref *dqts ) {
00194   lock(__FILE__,__LINE__);
00195   nl_assert( dqts == first_dqr );
00196   first_dqr = dqts->next_dqr;
00197   if ( first_dqr == 0 ) last_dqr = first_dqr;
00198   unlock();
00199   delete(dqts);
00200 }
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Defines