ARPDAS_QNX6 1.0
|
00001 /* DataQueue.c */ 00002 #include "DQ.h" 00003 #include "nortlib.h" 00004 #include "nl_assert.h" 00005 00006 /** 00007 * Base class for dq_data_ref and dq_tstamp_ref 00008 * These make up part of the control structure of data_queue. 00009 */ 00010 dq_ref::dq_ref(dqtype mytype) { 00011 next_dqr = 0; 00012 type = mytype; 00013 } 00014 00015 /** 00016 * Convenience function to append an item to the end of the linked list and return the new item 00017 */ 00018 dq_ref *dq_ref::next(dq_ref *dqr) { 00019 next_dqr = dqr; 00020 return dqr; 00021 } 00022 00023 dq_data_ref::dq_data_ref(mfc_t MFCtr, int mfrow, int Qrow_in, int nrows_in ) 00024 : dq_ref(dq_data) { 00025 MFCtr_start = MFCtr_next = MFCtr; 00026 row_start = row_next = mfrow; 00027 Qrow = Qrow_in; 00028 n_rows = 0; 00029 append_rows( nrows_in ); 00030 } 00031 00032 void dq_data_ref::append_rows( int nrows ) { 00033 row_next += nrows; 00034 MFCtr_next += row_next/tm_info.nrowminf; 00035 row_next = row_next % tm_info.nrowminf; 00036 n_rows += nrows; 00037 } 00038 00039 dq_tstamp_ref::dq_tstamp_ref( mfc_t MFCtr, time_t time ) : dq_ref(dq_tstamp) { 00040 TS.mfc_num = MFCtr; 00041 TS.secs = time; 00042 } 00043 00044 /** 00045 * data_queue base class constructor. 00046 * Determines the output_tm_type and allocates the queue storage. 00047 */ 00048 data_queue::data_queue( int n_Qrows, int low_water ) { 00049 total_Qrows = n_Qrows; 00050 dq_low_water = low_water; 00051 if ( low_water > n_Qrows ) 00052 nl_error( 3, "low_water must be <= n_Qrows" ); 00053 00054 raw = 0; 00055 row = 0; 00056 first = last = 0; 00057 full = false; 00058 last_dqr = first_dqr = 0; 00059 } 00060 00061 /** 00062 * General DG initialization. Assumes tm_info structure has been defined. 00063 * Establishes the connection to the TMbfr, specifying the O_NONBLOCK option for collection. 00064 * Initializes the queue itself. 00065 * Creates dispatch queue and registers "DG/cmd" device and initializes timer. 00066 */ 00067 void data_queue::init() { 00068 // Determine the output_tm_type 00069 nbQrow = tmi(nbrow); 00070 tm_info.nrowminf = tmi(nbminf)/tmi(nbrow); 00071 if (tm_info.nrowminf > 2) { 00072 output_tm_type = TMTYPE_DATA_T2; 00073 nbDataHdr = 10; 00074 } else if ( tmi(mfc_lsb)==0 && tmi(mfc_msb)==1 ) { 00075 output_tm_type = TMTYPE_DATA_T3; 00076 nbQrow -= 4; 00077 nbDataHdr = 8; 00078 } else { 00079 output_tm_type = TMTYPE_DATA_T1; 00080 nbDataHdr = 6; 00081 } 00082 if (nbQrow <= 0) nl_error(3,"nbQrow <= 0"); 00083 int total_size = nbQrow * total_Qrows; 00084 raw = new unsigned char[total_size]; 00085 if ( ! raw ) 00086 nl_error( 3, "memory allocation failure: raw" ); 00087 row = new unsigned char*[total_Qrows]; 00088 if ( ! row ) 00089 nl_error( 3, "memory allocation failure: row" ); 00090 int i; 00091 unsigned char *currow = raw; 00092 for ( i = 0; i < total_Qrows; i++ ) { 00093 row[i] = currow; 00094 currow += nbQrow; 00095 } 00096 } 00097 00098 void data_queue::lock(const char * by, int line) { 00099 by = by; 00100 line = line; 00101 } 00102 00103 void data_queue::unlock() {} 00104 00105 /** 00106 no longer a blocking function. Returns the largest number of contiguous rows currently free. 00107 Caller can decide whether that is adequate. 00108 Assumes the DQ is locked. 00109 */ 00110 int data_queue::allocate_rows(unsigned char **rowp) { 00111 int na; 00112 if ( full ) na = 0; 00113 else if ( first > last ) { 00114 na = first - last; 00115 } else na = total_Qrows - last; 00116 if ( rowp != NULL) *rowp = row[last]; 00117 return na; 00118 } 00119 00120 /** 00121 * MFCtr, mfrow are the MFCtr and minor frame row of the first row being committed. 00122 * Does not signal whoever is reading the queue 00123 * Assumes DQ is locked and unlocks before exit 00124 */ 00125 void data_queue::commit_rows( mfc_t MFCtr, int mfrow, int nrows ) { 00126 // we (the writer thread) own the last pointer, so we can read it without a lock, 00127 // but we must lock before writing 00128 nl_assert( !full ); 00129 nl_assert( last+nrows <= total_Qrows ); 00130 lock(__FILE__,__LINE__); 00131 // We need a new dqr if the last one is a dq_tstamp or my MFCtr,mfrow don't match the 'next' 00132 // elements in the current dqr 00133 dq_data_ref *dqdr = 0; 00134 if ( last_dqr && last_dqr->type == dq_data ) { 00135 dqdr = (dq_data_ref *)last_dqr; 00136 if ( MFCtr != dqdr->MFCtr_next || mfrow != dqdr->row_next ) 00137 dqdr = 0; 00138 } 00139 if ( dqdr == 0 ) { 00140 dqdr = new dq_data_ref(MFCtr, mfrow, last, nrows); // or retrieve from the free list? 00141 if ( last_dqr ) 00142 last_dqr = last_dqr->next(dqdr); 00143 else first_dqr = last_dqr = dqdr; 00144 } else dqdr->append_rows(nrows); 00145 last += nrows; 00146 if ( last == total_Qrows ) last = 0; 00147 if ( last == first ) full = 1; 00148 unlock(); 00149 } 00150 00151 /** 00152 * Does not signal whoever is reading the queue 00153 */ 00154 void data_queue::commit_tstamp( mfc_t MFCtr, time_t time ) { 00155 dq_tstamp_ref *dqt = new dq_tstamp_ref(MFCtr, time); 00156 lock(__FILE__,__LINE__); 00157 if ( last_dqr ) last_dqr = last_dqr->next(dqt); 00158 else first_dqr = last_dqr = dqt; 00159 unlock(); 00160 } 00161 void data_queue::retire_rows( dq_data_ref *dqd, int n_rows ) { 00162 lock(__FILE__,__LINE__); 00163 nl_assert( n_rows >= 0 ); 00164 nl_assert( dqd == first_dqr ); 00165 nl_assert( dqd->n_rows >= n_rows); 00166 nl_assert( dqd->Qrow == first ); 00167 if ( first < last ) { 00168 first += n_rows; 00169 if ( first > last ) 00170 nl_error( 4, "Underflow in retire_rows" ); 00171 } else { 00172 first += n_rows; 00173 if ( first >= total_Qrows ) { 00174 first -= total_Qrows; 00175 if ( first > last ) 00176 nl_error( 4, "Underflow after wrap in retire_rows" ); 00177 } 00178 } 00179 if (n_rows > 0) full = false; 00180 dqd->Qrow = first; 00181 dqd->n_rows -= n_rows; 00182 if ( dqd->n_rows == 0 && dqd->next_dqr ) { 00183 first_dqr = dqd->next_dqr; 00184 delete( dqd ); 00185 } else { 00186 dqd->row_start += n_rows; 00187 dqd->MFCtr_start += dqd->row_start / tm_info.nrowminf; 00188 dqd->row_start %= tm_info.nrowminf; 00189 } 00190 unlock(); 00191 } 00192 00193 void data_queue::retire_tstamp( dq_tstamp_ref *dqts ) { 00194 lock(__FILE__,__LINE__); 00195 nl_assert( dqts == first_dqr ); 00196 first_dqr = dqts->next_dqr; 00197 if ( first_dqr == 0 ) last_dqr = first_dqr; 00198 unlock(); 00199 delete(dqts); 00200 }