/* ARPDAS_QNX6 1.0 */
00001 /* Copyright 2007 by the President and Fellows of Harvard College */ 00002 #ifndef TMBFR_H_INCLUDED 00003 #define TMBFR_H_INCLUDED 00004 #include <pthread.h> 00005 #include "tm.h" 00006 00007 struct tm_ocb; 00008 struct tm_attr; 00009 #define THREAD_POOL_PARAM_T dispatch_context_t 00010 #define IOFUNC_OCB_T struct tm_ocb 00011 #define IOFUNC_ATTR_T struct tm_attr 00012 #include <sys/iofunc.h> 00013 #include <sys/dispatch.h> 00014 00015 /* Semantics of Data_Queue 00016 Data_Queue.first, .last are indices into row and range from 00017 [0..total_Qrows) 00018 .first is where the next row will be read from 00019 .last is where the next row will be written to 00020 first==last is empty unless full is asserted 00021 */ 00022 typedef struct dataqueue { 00023 char *raw; 00024 char **row; 00025 tm_hdrw_t output_tm_type; 00026 int pbuf_size; // nbQrow+nbDataHdr (or more) 00027 int total_size; 00028 int total_Qrows; 00029 int nbQrow; // may differ from nbrow if stripping MFCtr & Synch 00030 int nbDataHdr; 00031 int first; 00032 int last; 00033 int full; 00034 int nonblocking; 00035 } data_queue_t; 00036 00037 extern data_queue_t Data_Queue; // There can be only one 00038 00039 typedef struct tsqueue { 00040 int ref_count; 00041 tstamp_t TS; 00042 } TS_queue_t; 00043 00044 /* Semantics of the dq_descriptor 00045 next points to a later descriptor. A separate descriptor is 00046 required when a new TS arrives or a row is skipped. 00047 ref_count indicates how many OCBs point to this dqd 00048 starting_Qrow is the index into Data_Queue.row for the first data 00049 row of this dqd that is still present in the Data_Queue, or the location 00050 of the next record if no rows are present. 00051 n_Qrows is the number of Qrows of this dqd still present in the 00052 Data_Queue, hence must be <= Data_Queue.total_Qrows. 
00053 Qrows_expired indicates the number of Qrows belonging to this dqd 00054 that have been expired out of the Data_Queue 00055 min_reader is the number of rows in this dqd that the slowest 00056 reader has processed (including expired rows) 00057 TSq is the TS record our data is tied to 00058 MFCtr is the MFCtr for the starting row (possibly expired) of this 00059 dqd 00060 Row_num is the row number (0 <= Row_num < nrowminf) for the 00061 starting row (possibly expired) of this dqd 00062 00063 n_Qrows + Qrows_expired is the total number of Qrows for this dqd 00064 00065 To get the MFCtr and Row_Num for the current first row: 00066 XRow_Num = Row_Num + Qrows_expired 00067 NMinf = XRow_Num/tm->nrowminf 00068 MFCtr_start = MFCtr + NMinf 00069 Row_Num_start = XRow_Num % tm->nrowminf 00070 00071 If n_Qrows == 0 and Qrows_expired == 0, MFCtr and Row_num can be 00072 redefined. After that, progress simply involves updating 00073 starting_Qrow, n_Qrows and Qrows_expired. 00074 */ 00075 typedef struct dq_descriptor { 00076 struct dq_descriptor *next; 00077 int ref_count; 00078 int starting_Qrow; 00079 int n_Qrows; 00080 int Qrows_expired; 00081 int min_reader; 00082 TS_queue_t *TSq; 00083 mfc_t MFCtr; 00084 int Row_num; 00085 } dq_descriptor_t; 00086 00087 typedef struct { 00088 dq_descriptor_t *first; 00089 dq_descriptor_t *last; 00090 } DQD_Queue_t; 00091 00092 extern DQD_Queue_t DQD_Queue; 00093 00094 /* I have grouped related members into structs here purely 00095 to help make clear which members are related. 00096 If you don't like this approach, let me know. 00097 00098 Each OCB needs to point to a dq_descriptor and record its 00099 position within that descriptor. This may not apply exactly 00100 to the writer. 00101 00102 Also need a buffer to store partial frames when a request is small. 00103 It may be reasonable to delay allocation of this buffer, since 00104 most apps will request complete frames. 
00105 00106 If a read request for a partial row arrives, we buffer the entire 00107 header+1row in the ocb->part.buf and increment ocb->data.n_Qrows 00108 (since we're done with the row held in the dq now that we've copied 00109 it.) 00110 00111 ocb->data.n_Qrows is the number of rows from the start of dqd, 00112 including expired rows. 00113 00114 On write: 00115 nbrow_rec and nbhdr_rec are set when the first data record 00116 arrives. It is an error to change source data formats mid-stream. 00117 00118 The big challenge on writing is keeping track of where we are 00119 with respect to the received message, the current TM record and 00120 the destination buffer. The destination buffer is always less 00121 than or equal to the TM record size, but the record size may be 00122 smaller or larger than the message size. 00123 00124 off_queue keeps track of the number of bytes that have been 00125 copied into the Data_Queue. When the current transfer is 00126 completed (part.nbdata == 0), the Data_Queue and the affected 00127 dqds are updated to incorporate the new data. 00128 00129 ocb->part.nbdata records how many bytes are still expected in this sub-transfer 00130 For writes, this means how many bytes we must receive before we can take 00131 action. For reads, it means how many bytes are currently ready for transfer 00132 at ocb->part.nbdata. 
00133 00134 */ 00135 typedef struct tm_ocb { 00136 iofunc_ocb_t hdr; 00137 int state; 00138 struct tm_ocb *next_ocb; 00139 struct part_s { 00140 tm_hdrs_t hdr; 00141 char *dptr; // pointer into other buffers 00142 int nbdata; // How many bytes are still expected in this sub-transfer 00143 } part; 00144 struct data_s { 00145 dq_descriptor_t *dqd; // Which dq_desc we reference 00146 int n_Qrows; // The number of Qrows in dq we have already processed 00147 } data; 00148 union rw_u { 00149 struct write_s { 00150 char *buf; // allocated temp buffer 00151 int rcvid; // Who is writing 00152 int nbrow_rec; // bytes per row received 00153 int nbhdr_rec; // bytes in the header of data messages 00154 int nb_msg; // bytes remaining in this write 00155 int off_msg; // bytes already read from this write 00156 int nb_rec; // bytes remaining in this record 00157 int off_queue; // bytes already read in this queue block 00158 // int off_rec; // bytes read in this record: redundant 00159 // int nb_queue; // bytes remaining in this queue block 00160 // deemed redundant with part.nbdata 00161 } write; 00162 struct read_s { 00163 char *buf; // allocated temp buffer 00164 int rcvid; // Who requested 00165 int nbytes; // size of request 00166 int maxQrows; // max number of Qrows to be returned with this request 00167 int rows_missing; // cumulative count 00168 int blocked; // non-zero if we are blocking 00169 int ionotify; // non-zero if we want to be notified 00170 } read; 00171 } rw; 00172 } tm_ocb_t; 00173 00174 #define TM_STATE_HDR 0 00175 #define TM_STATE_INFO 1 00176 #define TM_STATE_DATA 2 00177 00178 /* Just Identify which node */ 00179 typedef struct tm_attr { 00180 iofunc_attr_t attr; 00181 int node_type; 00182 iofunc_notify_t notify[3]; /* notification list used by iofunc_notify*() */ 00183 } tm_attr_t; 00184 #define TM_DG 1 00185 #define TM_DCf 2 00186 #define TM_DCo 3 00187 00188 #endif