ARPDAS_QNX6 1.0
TMbfr.h
Go to the documentation of this file.
00001 /* Copyright 2007 by the President and Fellows of Harvard College */
00002 #ifndef TMBFR_H_INCLUDED
00003 #define TMBFR_H_INCLUDED
00004 #include <pthread.h>
00005 #include "tm.h"
00006 
00007 struct tm_ocb; // forward declaration; the full definition appears later in this header
00008 struct tm_attr; // forward declaration; the full definition appears later in this header
00009 #define THREAD_POOL_PARAM_T dispatch_context_t /* these three overrides deliberately precede the <sys/iofunc.h> and <sys/dispatch.h> includes that follow */
00010 #define IOFUNC_OCB_T struct tm_ocb /* presumably makes the iofunc layer use our extended OCB type -- confirm against QNX docs */
00011 #define IOFUNC_ATTR_T struct tm_attr /* presumably makes the iofunc layer use our extended attribute type -- confirm against QNX docs */
00012 #include <sys/iofunc.h>
00013 #include <sys/dispatch.h>
00014 
00015 /* Semantics of Data_Queue
00016    Data_Queue.first, .last are indices into row and range from
00017      [0..total_Qrows)
00018    .first is where the next row will be read from
00019    .last is where the next row will be written to
00020    first==last is empty unless full is asserted
00021 */
00022 typedef struct dataqueue { // Row queue indexed by first/last; first==last means empty unless full is set
00023   char *raw; // presumably the backing storage that row[] points into -- confirm
00024   char **row; // per-row pointers; first and last are indices into this array
00025   tm_hdrw_t output_tm_type; // NOTE(review): looks like the header type used for output records -- confirm
00026   int pbuf_size; // nbQrow+nbDataHdr (or more)
00027   int total_size; // presumably the size in bytes of raw -- confirm
00028   int total_Qrows; // number of rows in the queue; first and last range over [0..total_Qrows)
00029   int nbQrow; // bytes per queue row (presumably); may differ from nbrow if stripping MFCtr & Synch
00030   int nbDataHdr; // presumably bytes in the data record header -- confirm
00031   int first; // index of the row the next read will come from
00032   int last; // index of the row the next write will go to
00033   int full; // non-zero when first==last means full rather than empty
00034   int nonblocking; // NOTE(review): meaning not visible in this header -- confirm with users of Data_Queue
00035 } data_queue_t;
00036 
00037 extern data_queue_t Data_Queue; // There can be only one: declared here, defined in another translation unit
00038 
00039 typedef struct tsqueue { // A time stamp record that dq_descriptors refer to through their TSq member
00040   int ref_count; // presumably the number of references currently held on this record -- confirm
00041   tstamp_t TS; // the time stamp itself
00042 } TS_queue_t;
00043 
00044 /* Semantics of the dq_descriptor
00045    next points to a later descriptor. A separate descriptor is
00046      required when a new TS arrives or a row is skipped.
00047    ref_count indicates how many OCBs point to this dqd
00048    starting_Qrow is the index into Data_Queue.row for the first data
00049      row of this dqd that is still present in the Data_Queue, or the location
00050      of the next record if no rows are present.
00051    n_Qrows is the number of Qrows of this dqd still present in the
00052      Data_Queue, hence must be <= Data_Queue.total_Qrows.
00053    Qrows_expired indicates the number of Qrows belonging to this dqd
00054      that have been expired out of the Data_Queue
00055    min_reader is the number of rows in this dqd that the slowest
00056      reader has processed (including expired rows)
00057    TSq is the TS record our data is tied to
00058    MFCtr is the MFCtr for the starting row (possibly expired) of this
00059      dqd
00060    Row_num is the row number (0 <= Row_num < nrowminf) for the
00061      starting row (possibly expired) of this dqd
00062 
00063    n_Qrows + Qrows_expired is the total number of Qrows for this dqd
00064    
00065    To get the MFCtr and Row_num for the current first row:
00066    XRow_Num = Row_num + Qrows_expired
00067    NMinf = XRow_Num/tm->nrowminf
00068    MFCtr_start = MFCtr + NMinf
00069    Row_Num_start = XRow_Num % tm->nrowminf
00070    
00071    If n_Qrows == 0 and Qrows_expired == 0, MFCtr and Row_num can be
00072    redefined. After that, progress simply involves updating
00073    starting_Qrow, n_Qrows and Qrows_expired.
00074 */
00075 typedef struct dq_descriptor { // Describes one run of Data_Queue rows tied to a single TS record
00076   struct dq_descriptor *next; // a later descriptor; a new one is needed when a new TS arrives or a row is skipped
00077   int ref_count; // how many OCBs point to this dqd
00078   int starting_Qrow; // index into Data_Queue.row of the first row still present, or of the next record if none are
00079   int n_Qrows; // rows of this dqd still present in the Data_Queue (<= Data_Queue.total_Qrows)
00080   int Qrows_expired; // rows of this dqd already expired out of the Data_Queue
00081   int min_reader; // rows of this dqd the slowest reader has processed, including expired rows
00082   TS_queue_t *TSq; // the TS record this data is tied to
00083   mfc_t MFCtr; // MFCtr of the starting row (possibly expired) of this dqd
00084   int Row_num; // row number (0 <= Row_num < nrowminf) of the starting row (possibly expired)
00085 } dq_descriptor_t;
00086 
00087 typedef struct { // End pointers for the list of dq_descriptors linked through their next members
00088   dq_descriptor_t *first; // presumably the earliest descriptor still in use -- confirm
00089   dq_descriptor_t *last; // presumably the most recent descriptor -- confirm
00090 } DQD_Queue_t;
00091 
00092 extern DQD_Queue_t DQD_Queue; // declared here, defined in another translation unit
00093 
00094 /* I have grouped related members into structs here purely
00095    to help make clear which members are related.
00096    If you don't like this approach, let me know.
00097    
00098    Each OCB needs to point to a dq_descriptor and record its
00099    position within that descriptor. This may not apply exactly
00100    to the writer.
00101    
00102    Also need a buffer to store partial frames when a request is small.
00103    It may be reasonable to delay allocation of this buffer, since
00104    most apps will request complete frames.
00105    
00106    If a read request for a partial row arrives, we buffer the entire
00107    header+1row in the ocb->part.buf and increment ocb->data.n_Qrows
00108    (since we're done with the row held in the dq now that we've copied
00109    it.)
00110    
00111    ocb->data.n_Qrows is the number of rows from the start of dqd,
00112    including expired rows.
00113    
00114    On write:
00115      nbrow_rec and nbhdr_rec are set when the first data record
00116      arrives. It is an error to change source data formats mid-stream.
00117      
00118      The big challenge on writing is keeping track of where we are
00119      with respect to the received message, the current TM record and
00120      the destination buffer. The destination buffer is always less
00121      than or equal to the TM record size, but the record size may be
00122      smaller or larger than the message size.
00123      
00124      off_queue keeps track of the number of bytes that have been
00125      copied into the Data_Queue. When the current transfer is
00126      completed (part.nbdata == 0), the Data_Queue and the affected
00127      dqds are updated to incorporate the new data.
00128      
00129      ocb->part.nbdata records how many bytes are still expected in this sub-transfer.
00130      For writes, this means how many bytes we must receive before we can take
00131      action. For reads, it means how many bytes are currently ready for transfer
00132      at ocb->part.dptr.
00133      
00134 */
00135 typedef struct tm_ocb { // Per-open control block; this is the type IOFUNC_OCB_T is defined as
00136   iofunc_ocb_t hdr; // standard iofunc OCB; presumably must stay the first member -- confirm against QNX docs
00137   int state; // presumably one of the TM_STATE_* values -- confirm
00138   struct tm_ocb *next_ocb; // link to another OCB; NOTE(review): list ownership is not visible in this header
00139   struct part_s { // state of the sub-transfer currently in progress
00140     tm_hdrs_t hdr; // header storage for the current record
00141     char *dptr; // pointer into other buffers
00142     int nbdata; // writes: bytes still expected in this sub-transfer; reads: bytes currently ready for transfer
00143   } part;
00144   struct data_s { // this OCB's position within the descriptor list
00145     dq_descriptor_t *dqd; // Which dq_desc we reference
00146     int n_Qrows; // rows already processed, counted from the start of dqd and including expired rows
00147   } data;
00148   union rw_u { // an OCB uses either the write view or the read view
00149     struct write_s {
00150       char *buf; // allocated temp buffer
00151       int rcvid; // Who is writing
00152       int nbrow_rec; // bytes per row received; set when the first data record arrives
00153       int nbhdr_rec; // bytes in the header of data messages; set when the first data record arrives
00154       int nb_msg; // bytes remaining in this write
00155       int off_msg; // bytes already read from this write
00156       int nb_rec; // bytes remaining in this record
00157       int off_queue; // bytes already copied into the Data_Queue for the current transfer
00158       // int off_rec; // bytes read in this record: redundant
00159       // int nb_queue; // bytes remaining in this queue block
00160       // deemed redundant with part.nbdata
00161     } write;
00162     struct read_s {
00163       char *buf; // allocated temp buffer
00164       int rcvid; // Who requested
00165       int nbytes; // size of request
00166       int maxQrows; // max number of Qrows to be returned with this request
00167       int rows_missing; // cumulative count
00168       int blocked; // non-zero if we are blocking
00169       int ionotify; // non-zero if we want to be notified
00170     } read;
00171   } rw;
00172 } tm_ocb_t;
00173 
00174 #define TM_STATE_HDR 0 /* presumably a value for tm_ocb.state: header phase -- confirm */
00175 #define TM_STATE_INFO 1 /* presumably a value for tm_ocb.state: info phase -- confirm */
00176 #define TM_STATE_DATA 2 /* presumably a value for tm_ocb.state: data phase -- confirm */
00177 
00178 /* Per-node attribute structure (the type IOFUNC_ATTR_T is defined as); beyond the standard attributes it just identifies which node */
00179 typedef struct tm_attr {
00180   iofunc_attr_t attr; // standard iofunc attributes; presumably must stay the first member -- confirm against QNX docs
00181   int node_type; // presumably one of TM_DG, TM_DCf, TM_DCo -- confirm
00182   iofunc_notify_t notify[3];  /* notification list used by iofunc_notify*() */
00183 } tm_attr_t;
00184 #define TM_DG 1 /* presumably a tm_attr.node_type code; NOTE(review): the name suggests a data generator node -- confirm */
00185 #define TM_DCf 2 /* presumably a tm_attr.node_type code; NOTE(review): the name suggests a data client variant -- confirm */
00186 #define TM_DCo 3 /* presumably a tm_attr.node_type code; NOTE(review): the name suggests a data client variant -- confirm */
00187 
00188 #endif
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Defines