/* ARPDAS_QNX6 1.0 — TMbfr.c (documentation listing header) */
00001 /* TMbfr2.c */
00002 #include <errno.h>
00003 #include <stdio.h>
00004 #include <stdlib.h>
00005 #include <stddef.h>
00006 #include <stdlib.h>
00007 #include <string.h>
00008 #include <unistd.h>
00009 #include <stdarg.h>
00010 #include <ctype.h>
00011 #include "TMbfr.h"
00012 #include "nortlib.h"
00013 #include "nl_assert.h"
00014 #include "oui.h"
00015 #include "rundir.h"
00016 
00017 static int io_read (resmgr_context_t *ctp, io_read_t *msg, RESMGR_OCB_T *ocb);
00018 static int io_write(resmgr_context_t *ctp, io_write_t *msg, RESMGR_OCB_T *ocb);
00019 static int io_notify(resmgr_context_t *ctp, io_notify_t *msg, RESMGR_OCB_T *ocb);
00020 static void do_write( IOFUNC_OCB_T *ocb, int nonblock, int new_rows );
00021 static int io_open( resmgr_context_t *ctp, io_open_t *msg,
00022                     IOFUNC_ATTR_T *attr, void *extra );
00023 static void read_reply( RESMGR_OCB_T *ocb, int nonblock );
00024 static int process_tm_info( IOFUNC_OCB_T *ocb );
00025 static int (*data_state_eval)( IOFUNC_OCB_T *ocb, int nonblock );
00026 static int data_state_T1( IOFUNC_OCB_T *ocb, int nonblock );
00027 static int data_state_T2( IOFUNC_OCB_T *ocb, int nonblock );
00028 static int data_state_T3( IOFUNC_OCB_T *ocb, int nonblock );
00029 static void queue_tstamp( tstamp_t *ts );
00030 static dq_descriptor_t *new_dq_descriptor( TS_queue_t *TS );
00031 static dq_descriptor_t *dq_deref( dq_descriptor_t *dqd, int use_next );
00032 static dq_descriptor_t *dq_expire_check( dq_descriptor_t *dqd );
00033 static void enqueue_read( IOFUNC_OCB_T *ocb, int nonblock );
00034 static void run_read_queue(void);
00035 static void lock_dq(void);
00036 static void unlock_dq(void);
00037 static int all_closed(void);
00038 static int allocate_qrows( IOFUNC_OCB_T *ocb, int nrows_req, int nonblock );
00039 
/* Resource-manager plumbing: connect funcs are shared by all nodes;
   the reader (DC*) and writer (DG) nodes get separate io-func tables. */
static resmgr_connect_funcs_t    connect_funcs;
static resmgr_io_funcs_t         rd_io_funcs, wr_io_funcs;
/* Per-node attributes and resmgr attach ids for TM/DG, TM/DCf, TM/DCo */
static IOFUNC_ATTR_T             dg_attr, dcf_attr, dco_attr;
static int                       dg_id, dcf_id, dco_id;
static resmgr_attr_t             resmgr_attr;
static dispatch_t                *dpp;
static pthread_mutex_t           dg_mutex = PTHREAD_MUTEX_INITIALIZER; /* guards DG open state */
static pthread_mutex_t           dq_mutex = PTHREAD_MUTEX_INITIALIZER; /* guards the data queue */

/* dg_opened: 0 = writer never opened, 1 = writer opened (io_open),
   2 = writer closed again (ocb_free) => readers see EOF */
static int dg_opened = 0;
data_queue_t Data_Queue;
DQD_Queue_t DQD_Queue;
static tm_ocb_t *blocked_writer; /* writer ocb waiting for queue space, or NULL */
static tm_ocb_t *all_readers;    /* singly-linked list of every reader ocb */
00054 
/* Optional in-memory event trace for debugging: set LOG_EVENTS to 1 to
   record the last EVENT_LOG_SIZE event ids in a ring buffer that can be
   inspected from a debugger. Disabled builds compile log_event() away. */
#define LOG_EVENTS 0
#if LOG_EVENTS
  #define EVENT_LOG_SIZE 100
  static int my_events[EVENT_LOG_SIZE];
  static unsigned my_evt_num, my_evt_idx;
  /* Record evt_id in the ring; my_evt_num counts total events logged. */
  static void log_event(int evt_id) {
    my_events[my_evt_idx++] = evt_id;
    if ( my_evt_idx >= EVENT_LOG_SIZE )
      my_evt_idx = 0;
    ++my_evt_num;
  }
#else
  #define log_event(x)
#endif
00069 
00070 /* enqueue_read() queues the specified ocb.
00071    Assumes dq is locked.
00072    If nonblock is set, returns error to the client.
00073    Should prioritize by some clever scheme including priority, but
00074    for starters, I'll just FIFO.
00075  */
00076 static void enqueue_read( IOFUNC_OCB_T *ocb, int nonblock ) {
00077   if ( dg_opened == 2 ) {
00078     MsgReply(ocb->rw.read.rcvid, 0, ocb->part.dptr, 0 );
00079   } else if ( nonblock ) {
00080     log_event(2);
00081     if ( MsgError( ocb->rw.read.rcvid, EAGAIN ) == -1 )
00082       nl_error( 2, "Error %d on MsgError", errno );
00083   } else {
00084     log_event(1);
00085     ocb->rw.read.blocked = 1;
00086   }
00087 }
00088 
00089 static void enqueue_reader( IOFUNC_OCB_T *ocb ) {
00090   lock_dq();
00091   ocb->next_ocb = all_readers;
00092   all_readers = ocb;
00093   unlock_dq();
00094 }
00095 
00096 static void dequeue_reader( IOFUNC_OCB_T *ocb ) {
00097   IOFUNC_OCB_T **rq;
00098   lock_dq();
00099   assert(all_readers != NULL);
00100   for ( rq = &all_readers; *rq != 0 && *rq != ocb; rq = &(*rq)->next_ocb );
00101   assert(*rq == ocb);
00102   *rq = ocb->next_ocb;
00103   unlock_dq();
00104 }
00105 
00106 static void run_write_queue(void) {
00107   if ( blocked_writer ) {
00108     log_event(3);
00109     int new_rows = data_state_eval(blocked_writer, 0);
00110     if ( blocked_writer->part.nbdata > 0 ) {
00111       log_event(4);
00112       do_write(blocked_writer, 0, new_rows);
00113     }
00114   }
00115 }
00116 
00117 /* run_read_queue() invokes read_reply() on all the blocked ocbs
00118      in the read queue.
00119    assumes dq is unlocked (since read_reply() will lock it as
00120      needed)
00121  */
00122 static void run_read_queue(void) {
00123   tm_ocb_t *rq;
00124   lock_dq();
00125   rq = all_readers;
00126   unlock_dq();
00127   log_event(5);
00128   while ( rq ) {
00129     if ( rq->rw.read.blocked ) {
00130       rq->rw.read.blocked = 0;
00131       log_event(6);
00132       read_reply(rq, 0);
00133     }
00134     rq = rq->next_ocb;
00135   }
00136   // if (IOFUNC_NOTIFY_INPUT_CHECK(dcf_attr.notify, 1, 0))
00137     iofunc_notify_trigger(dcf_attr.notify, 1, IOFUNC_NOTIFY_INPUT);
00138   // if (IOFUNC_NOTIFY_INPUT_CHECK(dco_attr.notify, 1, 0))
00139     iofunc_notify_trigger(dco_attr.notify, 1, IOFUNC_NOTIFY_INPUT);
00140   run_write_queue();
00141 }
00142 
00143 // Return the minimum number of rows processed by any reader that is currently
00144 // referencing the specified dqd.  Assumes dq is locked.
00145 // ### should mark the min readers as laggards
00146 int min_reader( dq_descriptor_t *dqd ) {
00147   if ( dqd != DQD_Queue.first ) return 0;
00148   int min = dqd->Qrows_expired + dqd->n_Qrows;
00149   IOFUNC_OCB_T *ocb;
00150   for ( ocb = all_readers; ocb != 0; ocb = ocb->next_ocb ) {
00151     if ( ocb->data.dqd == dqd && ocb->data.n_Qrows < min )
00152       min = ocb->data.n_Qrows;
00153   }
00154   return min;
00155 }
00156 
00157 /* dq_deref() reduces reference count by one, does any work
00158    associated with ref count dropping to zero and returns the
00159    next dq_descriptor. dq must be locked.
00160  */
00161 static dq_descriptor_t *dq_deref( dq_descriptor_t *dqd, int use_next ) {
00162   dq_descriptor_t *next_dqd = dqd->next;
00163   assert(dqd->ref_count > 0);
00164   if (next_dqd != 0 && use_next)
00165     ++next_dqd->ref_count;
00166   if ( --dqd->ref_count == 0 )
00167     dq_expire_check(dqd);
00168   return next_dqd;
00169 }
00170 
/* dq_expire_check() frees leading descriptors that are fully drained:
   a descriptor can expire only when it has no references, no queued
   rows, a successor to take over, and sits at the head of DQD_Queue.
   Each expired descriptor also drops its timestamp's ref_count,
   freeing the TS when that reaches zero. Assumes dq is locked.
   Returns the first descriptor that could not be expired. */
static dq_descriptor_t *dq_expire_check( dq_descriptor_t *dqd ) {
  assert(dqd->ref_count >= 0);
  while ( dqd->ref_count == 0 && dqd->next != NULL
       && dqd->n_Qrows == 0 && DQD_Queue.first == dqd ) {
    /* Can expire this dqd */
    dq_descriptor_t *next_dqd = dqd->next;
    assert(dqd->TSq->ref_count >= 0);
    if ( --dqd->TSq->ref_count == 0 ) {
      free_memory(dqd->TSq);
    }
    free_memory(dqd);
    DQD_Queue.first = next_dqd;
    dqd = next_dqd;
  }
  return dqd;
}
00187 
00188 static void lock_dq(void) {
00189   int rv = pthread_mutex_lock( &dq_mutex );
00190   if ( rv != EOK )
00191     nl_error( 4, "Error %d locking dq mutex", rv );
00192 }
00193 
00194 static void unlock_dq(void) {
00195   int rv = pthread_mutex_unlock( &dq_mutex );
00196   if ( rv != EOK )
00197     nl_error( 4, "Error %d unlocking dq mutex", rv );
00198 }
00199 
00200 static tm_ocb_t *ocb_calloc(resmgr_context_t *ctp, IOFUNC_ATTR_T *device) {
00201   tm_ocb_t *ocb = calloc( 1, sizeof(tm_ocb_t) );
00202   if ( ocb == 0 ) return 0;
00203   /* Initialize any other elements. */
00204   ocb->next_ocb = 0;
00205   // ocb->rw.read.buf = 0;
00206   ocb->data.dqd = 0;
00207   ocb->data.n_Qrows = 0;
00208   ocb->rw.read.rows_missing = 0;
00209   ocb->state = TM_STATE_HDR; // ### do this in a state init function?
00210   if (device->node_type == TM_DG ) {
00211     ocb->part.nbdata = sizeof(ocb->part.hdr); // ### and this
00212     ocb->part.dptr = (char *)&ocb->part.hdr; // ### and this
00213   } else {
00214     ocb->part.nbdata = 0;
00215     ocb->part.dptr = NULL;
00216     ocb->rw.read.blocked = 0;
00217     ocb->rw.read.ionotify = 0;
00218     enqueue_reader(ocb);
00219   }
00220   return ocb;
00221 }
00222 
00223 static void ocb_free(struct tm_ocb *ocb) {
00224   /* Be sure to remove this from the blocking list:
00225      Actually, there really is no way it should be on
00226      the blocking list. */
00227   // assert( ocb->rw.read.rcvid == 0 );
00228   // rcvid never gets reset
00229   lock_dq();
00230   if ( ocb->data.dqd != 0 )
00231     dq_deref(ocb->data.dqd, 0);
00232   unlock_dq();
00233   if (ocb->hdr.attr->node_type == TM_DG ) {
00234     dg_opened = 2;
00235     run_read_queue();
00236   } else {
00237     ocb->rw.read.blocked = 0;
00238     ocb->rw.read.ionotify = 0;
00239     if ( ocb->rw.read.buf ) free(ocb->rw.read.buf);
00240     dequeue_reader(ocb);
00241   }
00242   free( ocb );
00243 }
00244 
/* OCB allocation hooks handed to the iofunc layer via the mount
   structure: the resource manager calls ocb_calloc/ocb_free instead
   of its defaults on client open/close. */
static iofunc_funcs_t ocb_funcs = { /* our ocb allocating & freeing functions */
    _IOFUNC_NFUNCS,
    ocb_calloc,
    ocb_free
};

/* the mount structure, we have only one so we statically declare it */
static iofunc_mount_t mountpoint = { 0, 0, 0, 0, &ocb_funcs };
00253 
00254 static int setup_mount( char *namebase, int node_type, int mode,
00255          IOFUNC_ATTR_T *attr, resmgr_io_funcs_t *funcs ) {
00256   const char *server_name;
00257   int mnt_id;
00258 
00259   iofunc_attr_init((iofunc_attr_t *)attr, S_IFNAM | mode, 0, 0);
00260   attr->attr.nbytes = 0;
00261   attr->attr.mount = &mountpoint;
00262   attr->node_type = node_type;
00263   IOFUNC_NOTIFY_INIT(attr->notify);
00264 
00265   server_name = tm_dev_name( namebase );
00266   mnt_id = resmgr_attach(dpp,            /* dispatch handle        */
00267                      &resmgr_attr,   /* resource manager attrs */
00268                      server_name,    /* device name            */
00269                      _FTYPE_ANY,     /* open type              */
00270                      0,              /* flags                  */
00271                      &connect_funcs, /* connect routines       */
00272                      funcs,         /* I/O routines           */
00273                      attr);          /* handle                 */
00274   if( mnt_id == -1 )
00275     nl_error( 3, "Unable to attach name '%s'", server_name );
00276   return mnt_id;
00277 }
00278 
00279 static void shutdown_mount( int id, char *name ) {
00280   int rv = resmgr_detach( dpp, id, _RESMGR_DETACH_ALL );
00281   if ( rv < 0 )
00282     nl_error( 2, "Error %d from resmgr_detach(%s)", rv, name );
00283 }
00284 
00285 // all_closed() returns non-zero if all OCBs are closed
00286 static int all_closed(void) {
00287   return dg_attr.attr.count == 0 &&
00288          dcf_attr.attr.count == 0 &&
00289          dco_attr.attr.count == 0;
00290 }
00291 
/* main: initializes the dispatch layer, creates the TM/DG writer node
   and the TM/DCf and TM/DCo reader nodes, then runs the dispatch loop
   (single-threaded unless use_threads is enabled). The loop exits and
   the nodes are detached once the writer has opened at least once and
   every connection has closed. */
int main(int argc, char **argv ) {
  int use_threads = 0;

  oui_init_options( argc, argv );
  nl_error( 0, "Startup" );
  setup_rundir();
  /* initialize dispatch interface */
  if((dpp = dispatch_create()) == NULL) {
      nl_error(3, "Unable to allocate dispatch handle.");
  }

  /* initialize resource manager attributes. */
  /* planning to share this struct between rd and wr */
  memset(&resmgr_attr, 0, sizeof resmgr_attr);
  // resmgr_attr.nparts_max = 0;
  // resmgr_attr.msg_max_size = 0;

  /* initialize functions for handling messages */
  iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs, 
                   _RESMGR_IO_NFUNCS, &rd_io_funcs);
  rd_io_funcs.read = io_read;
  rd_io_funcs.notify = io_notify;

  iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs, 
                   _RESMGR_IO_NFUNCS, &wr_io_funcs);
  wr_io_funcs.write = io_write;
  connect_funcs.open = io_open;

  /* one writable writer node, two read-only reader nodes */
  dg_id = setup_mount( "TM/DG", TM_DG, 0664, &dg_attr, &wr_io_funcs );
  dcf_id = setup_mount( "TM/DCf", TM_DCf, 0444, &dcf_attr, &rd_io_funcs );
  dco_id = setup_mount( "TM/DCo", TM_DCo, 0444, &dco_attr, &rd_io_funcs );

  if ( use_threads ) {
    thread_pool_attr_t   pool_attr;
    thread_pool_t        *tpp;

    /* initialize thread pool attributes */
    memset(&pool_attr, 0, sizeof pool_attr);
    pool_attr.handle = dpp;
    pool_attr.context_alloc = dispatch_context_alloc;
    pool_attr.block_func = dispatch_block;
    pool_attr.handler_func = dispatch_handler;
    pool_attr.context_free = dispatch_context_free;
    pool_attr.lo_water = 2;
    pool_attr.hi_water = 4;
    pool_attr.increment = 1;
    pool_attr.maximum = 50;     /* allocate a thread pool handle */
    if((tpp = thread_pool_create(&pool_attr,
                                 POOL_FLAG_EXIT_SELF)) == NULL) {
        nl_error(3, "Unable to initialize thread pool");
    }     /* start the threads, will not return */
    thread_pool_start(tpp);
  } else {
    dispatch_context_t   *ctp;
    ctp = dispatch_context_alloc(dpp);
    while ( 1 ) {
      if ((ctp = dispatch_block(ctp)) == NULL) {
        nl_error( 2, "block error\n" );
        return 1;
      }
      // printf( "  type = %d,%d  attr.count = %d\n",
      //   ctp->resmgr_context.msg->type,
      //   ctp->resmgr_context.msg->connect.subtype, attr.count );
      dispatch_handler(ctp);
      /* Shut down once the writer has opened (dg_opened != 0), no
         OCBs remain, and rcvid == 0 — NOTE(review): rcvid == 0
         appears to distinguish non-message events; confirm against
         QNX dispatch semantics. */
      if ( dg_opened && ctp->resmgr_context.rcvid == 0
            && all_closed() ) {
        break;
      }
    }
  }
  shutdown_mount( dg_id, "TM/DG" );
  shutdown_mount( dcf_id, "TM/DCf" );
  shutdown_mount( dco_id, "TM/DCo" );
  nl_error( 0, "Shutdown" );
  return 0;
}
00368 
/* Connect-function open handler. The DG (writer) node is exclusive:
   an open succeeds only if no writer is currently open (count == 0)
   and none has ever opened (dg_opened == 0); otherwise EBUSY. The
   dg_mutex serializes that check against concurrent opens. Reader
   nodes fall through to the default open semantics. */
static int io_open( resmgr_context_t *ctp, io_open_t *msg,
                    IOFUNC_ATTR_T *attr, void *extra ) {
  // Check to make sure it isn't already open
  if ( attr->node_type == TM_DG ) {
    int rv = pthread_mutex_lock( &dg_mutex );
    if ( rv == EOK ) {
      int count = attr->attr.count;
      if ( count == 0 && dg_opened == 0 ) {
        rv = iofunc_open_default( ctp, msg, &attr->attr, extra );
        if ( rv == EOK ) {
          assert( attr->attr.count );
          dg_opened = 1;
        }
      } else rv = EBUSY;
      pthread_mutex_unlock( &dg_mutex );
      return rv;
    } else nl_error( 3, "pthread_mutex_lock returned error %d", rv );
  }
  return iofunc_open_default( ctp, msg, &attr->attr, extra );
}
00389 
/* io_notify handler for the reader nodes: determines which conditions
   are currently satisfied and hands off to iofunc_notify(). Output is
   always available; input is signalled when the writer has quit
   (dg_opened == 2, EOF pending), when this ocb already has staged
   reply bytes (part.nbdata), or when unread data exists in the queue. */
int io_notify(resmgr_context_t *ctp, io_notify_t *msg, RESMGR_OCB_T *ocb) {
  tm_attr_t *dattr = (tm_attr_t *) ocb->hdr.attr;
  int trig;

  log_event(7);
  trig = _NOTIFY_COND_OUTPUT;         /* clients can always give us data */
  lock_dq();
  if ( dg_opened == 2 || ocb->part.nbdata )
    trig |= _NOTIFY_COND_INPUT;
  else if ( ocb->data.dqd == 0 ) {
    /* Reader holds no descriptor yet: input is ready as soon as the
       DQD queue has anything in it. */
    if ( DQD_Queue.first )
      trig |= _NOTIFY_COND_INPUT;
  } else {
    dq_descriptor_t *dqd = ocb->data.dqd;
    if ( dqd->next != 0)
      trig |= _NOTIFY_COND_INPUT;
    else {
      /* Count rows in dqd this reader has not consumed yet */
      int nQrows_ready = dqd->n_Qrows;
      if ( ocb->data.n_Qrows > dqd->Qrows_expired )
        nQrows_ready -= ocb->data.n_Qrows - dqd->Qrows_expired;
      if ( nQrows_ready )
        trig |= _NOTIFY_COND_INPUT;
    }
  }
  if (trig & _NOTIFY_COND_INPUT) log_event(8);
  unlock_dq();
  return (iofunc_notify(ctp, msg, dattr->notify, trig, NULL, NULL));
}
00418 
// This is where data is received from the DG writer.
/* io_write: validates the incoming message (no xtypes, no combine
   messages), records its byte count and rcvid in the ocb, and calls
   do_write() to consume it. The reply is deferred (_RESMGR_NOREPLY):
   do_write()/run_write_queue() reply when the message is processed. */
static int io_write( resmgr_context_t *ctp, io_write_t *msg, RESMGR_OCB_T *ocb ) {
  int status, nonblock;

  log_event(9);
  status = iofunc_write_verify(ctp, msg, (iofunc_ocb_t *)ocb, &nonblock);
  if ( status != EOK )
    return status;

  if ((msg->i.xtype &_IO_XTYPE_MASK) != _IO_XTYPE_NONE )
    return ENOSYS;

  // iofunc_write_verify should have stopped us if this is a DC node,
  // since they have no write permissions, but just check to make sure.
  assert( ocb->hdr.attr->node_type == TM_DG );

  // Assert this isn't a combine message
  if (msg->i.combine_len & _IO_COMBINE_FLAG)
    return (ENOSYS);
  assert(ctp->offset == 0);

  // Store the information we need from the message
  ocb->rw.write.nb_msg = msg->i.nbytes;
  ocb->rw.write.off_msg = 0; // sizeof(msg->i);
  ocb->rw.write.rcvid = ctp->rcvid;

  do_write( ocb, nonblock, 0 );
  return _RESMGR_NOREPLY;
}
00448 
// ### figure out how to get back into this routine after
// being blocked.
// Writer can only be blocked when processing data, since
// INIT and TSTAMP records can always be processed immediately.
// The data blocking can occur in either TM_STATE_HDR after
// we've looked at the header and decided we don't have room
// for the data or in TM_STATE_DATA after we've read some of
// the data from a message/record and run out of room.
// The trickiest part here is in the transition from TM_STATE_HDR
// to TM_STATE_DATA, we have to copy the surplus header info
// into the allocated space. If we can get allocate_qrows to
// do that based on the state (and update the state), then
// coming out of blocking involves calling data_state_eval,
// and then calling do_write if space is available.
/* do_write() consumes the writer's current message: it repeatedly
   MsgRead()s bytes into the current destination (ocb->part.hdr,
   tm_info, or Data Queue rows, per ocb->state) until the message is
   exhausted (nb_msg == 0) or the queue has no room (nbdata == 0).
   On completion it MsgReply()s to the client; when out of room it
   records itself as blocked_writer and wakes readers, which will
   eventually free space and resume it via run_write_queue(). */
static void do_write( IOFUNC_OCB_T *ocb, int nonblock, int new_rows ) {
  blocked_writer = 0;
  
  // _IO_SET_WRITE_NBYTES( ctp, msg->i.nbytes );
  // Not necessary since we'll handle the return ourselves
  // However, that means we need to store the total message size
  // Use off_msg for total size.
  
  // We loop here as long as we have work to do. We can get out
  // if we've processed all the data in the message (nb_msg == 0)
  // or exhausted all the available space in the DQ (nbdata == 0)
  while ( ocb->rw.write.nb_msg > 0 && ocb->part.nbdata > 0 ) {
    int nbread;
    nbread = ocb->rw.write.nb_msg < ocb->part.nbdata ?
      ocb->rw.write.nb_msg : ocb->part.nbdata;
    if ( MsgRead( ocb->rw.write.rcvid, ocb->part.dptr, nbread,
          ocb->rw.write.off_msg + sizeof(struct _io_write) ) < 0 ) {
      // These errors are all pretty serious, and since writes
      // come from our DG, they are very bad news.
      // Retrying MsgRead() doesn't make sense, and if we ignore
      // this message, we won't know where we are when the next
      // message comes in. A reasonable assumption would be that
      // the next message would start at the beginning of a header,
      // but that's not guaranteed.
      nl_error( 2, "Error %d from MsgRead: %s", errno, strerror(errno) );
      MsgError( ocb->rw.write.rcvid, EFAULT );
      ocb->state = TM_STATE_HDR;
      ocb->part.nbdata = sizeof( ocb->part.hdr );
      ocb->part.dptr = (char *)&ocb->part.hdr;
      return;
    }
    /* Advance all cursors past the bytes just transferred */
    ocb->part.dptr += nbread;
    ocb->part.nbdata -= nbread;
    ocb->rw.write.nb_msg -= nbread;
    ocb->rw.write.off_msg += nbread;
    if ( ocb->state == TM_STATE_DATA ) {
      ocb->rw.write.nb_rec -= nbread;
      // ocb->rw.write.off_rec += nbread;
      ocb->rw.write.off_queue += nbread;
    }
    assert(ocb->part.nbdata >= 0);
    /* Current chunk complete: dispatch on the ocb state machine */
    if ( ocb->part.nbdata == 0 ) {
      switch ( ocb->state ) {
        case TM_STATE_HDR:
          if ( ocb->part.hdr.s.hdr.tm_id != TMHDR_WORD ) {
            nl_error( 2, "Invalid Message header" );
            MsgError( ocb->rw.write.rcvid, EINVAL );
            ocb->part.nbdata = sizeof( ocb->part.hdr );
            ocb->part.dptr = (char *)&ocb->part.hdr;
            return;
          }
          switch ( ocb->part.hdr.s.hdr.tm_type ) {
            case TMTYPE_INIT:
              if ( DQD_Queue.last )
                nl_error( 3, "Second TMTYPE_INIT received" );
              /* Excess header bytes are the start of tm_info */
              ocb->rw.write.off_queue = sizeof(tm_hdrs_t)-sizeof(tm_hdr_t);
              ocb->part.nbdata = sizeof(tm_info) - ocb->rw.write.off_queue;
              ocb->part.dptr = (char *)&tm_info;
              memcpy( ocb->part.dptr, &ocb->part.hdr.raw[sizeof(tm_hdr_t)],
                      ocb->rw.write.off_queue );
              ocb->part.dptr += ocb->rw.write.off_queue;
              ocb->state = TM_STATE_INFO;
              break;
            case TMTYPE_TSTAMP:
              if ( DQD_Queue.last == 0 )
                nl_error( 3, "TMTYPE_TSTAMP received before _INIT" );
              lock_dq();
              queue_tstamp( &ocb->part.hdr.s.u.ts );
              unlock_dq();
              new_rows++;
              // ocb->state = TM_STATE_HDR; // already there!
              ocb->part.nbdata = sizeof( ocb->part.hdr );
              ocb->part.dptr = (char *)&ocb->part.hdr;
              break;
            case TMTYPE_DATA_T1:
            case TMTYPE_DATA_T2:
            case TMTYPE_DATA_T3:
            case TMTYPE_DATA_T4:
              if ( DQD_Queue.last == 0 )
                nl_error( 3, "Second TMTYPE_DATA* received before _INIT" );
              assert( data_state_eval != 0 );
              assert( ocb->part.hdr.s.hdr.tm_type == Data_Queue.output_tm_type );
              ocb->rw.write.nb_rec = ocb->part.hdr.s.u.dhdr.n_rows *
                ocb->rw.write.nbrow_rec;
              ocb->rw.write.off_queue = 0;
              new_rows += data_state_eval(ocb, nonblock);
              // ### Make sure data_state_eval returns the number
              // of rows that have been completely added to DQ
              if ( ocb->part.nbdata ) {
                ocb->state = TM_STATE_DATA;
              } // else break out
              break;
            default:
              nl_error( 4, "Invalid state" );
          }
          break;
        case TM_STATE_INFO:
          // ### Check return value
          process_tm_info(ocb);
          Data_Queue.nonblocking = 1; // nonblock;
          new_rows++;
          ocb->state = TM_STATE_HDR; //### Use state-init function?
          ocb->part.nbdata = sizeof(ocb->part.hdr);
          ocb->part.dptr = (char *)&ocb->part.hdr;
          break;
        case TM_STATE_DATA:
          new_rows += data_state_eval(ocb, nonblock);
          if ( ocb->rw.write.nb_rec <= 0 ) {
            /* record complete: go back to collecting a header */
            ocb->state = TM_STATE_HDR;
            ocb->part.nbdata = sizeof(ocb->part.hdr);
            ocb->part.dptr = (char *)&ocb->part.hdr;
          }
          break;
      }
    }
  }
  if ( ocb->rw.write.nb_msg == 0 ) {
    /* Message fully consumed: reply with total bytes written */
    MsgReply( ocb->rw.write.rcvid, ocb->rw.write.off_msg, 0, 0 );
    if ( new_rows ) run_read_queue();
    //### Mark us as not blocked: maybe that's nbdata != 0?
  } else {
    // We must have nbdata == 0 meaning we're going to block
    assert(!nonblock);
    blocked_writer = ocb;
    run_read_queue();
  }
}
00590 
00591 
// allocate_qrows() is responsible for finding space in the
// Data_Queue for incoming data. It returns the number of
// contiguous rows currently available starting at Data_Queue.last.
// If nonblock, allocate_qrows is guaranteed to return a
// non-zero value. If nonblock is zero and clients have not
// finished reading the oldest data in the queue, allocate_qrows
// will return zero.
// This routine assumes the dq is locked (lock_dq()).
// NOTE(review): in nonblocking mode this may expire rows that
// readers have not consumed (readers lag behind).
static int allocate_qrows( IOFUNC_OCB_T *ocb, int nrows_req, int nonblock ) {
  int nrows_free;
  // dqd is used to find old data to expire, not for appending new data
  // Hence we start at the front of the queue, not the end.
  dq_descriptor_t *dqd = DQD_Queue.first;

  assert(nrows_req >= 0);
  // If we are nonblocking and are requesting more than total_Qrows, we are
  // going to expire rows from this transfer without ever committing them
  // to the queue. This may be handled implicitly by the while loop below.
  // This should of course never happen!
  if ( !Data_Queue.full && Data_Queue.last >= Data_Queue.first ) {
    /* Unwrapped case: free space runs from .last to the end of the
       queue buffer. */
    nrows_free = Data_Queue.total_Qrows - Data_Queue.last;
    if ( nrows_free > nrows_req )
      nrows_free = nrows_req;
    // and we're done because any additional rows would be
    // non-contiguous
  } else {
    /* Wrapped (or full) case: free space runs from .last up to .first;
       beyond that we must expire old rows to make room. */
    nrows_free = Data_Queue.first - Data_Queue.last;
    if ( nrows_free > nrows_req )
      nrows_free = nrows_req;
    nrows_req -= nrows_free;
    // if there is enough room, use it now, otherwise
    while ( nrows_req > 0 ) {
      int n_expire, opt_expire;
      /* Skip empty descriptors to find the one holding the oldest rows */
      while ( dqd->n_Qrows == 0 && dqd->next )
        dqd = dqd->next;
      assert( dqd->starting_Qrow == Data_Queue.first );
      // We can expire the minimum of:
      //   nrows_req,
      //   Data_Queue.total_Qrows - Data_Queue.last - nrows_free
      //   dqd->n_Qrows == all rows in dqd
      //   Data_Queue.total_Qrows - dqd->starting_Qrow(largest contiguous)
      //   dqd->min_reader - dqd->Qrows_expired
      // The third condition only applies if blocking
      // The min_reader method must return 0 if any prior dqd has
      // readers. We won't get here unless the prior dqds are empty,
      // so the only thing that would prevent the prior dqds from
      // expiring would be readers, so min_read will return 0 unless
      // this dqd is the first.
      n_expire = nrows_req;
      opt_expire = Data_Queue.total_Qrows - Data_Queue.last;
      opt_expire = (opt_expire >= nrows_free) ?
          opt_expire - nrows_free : 0;
      if ( opt_expire < n_expire ) n_expire = opt_expire;
      if ( dqd->n_Qrows < n_expire ) n_expire = dqd->n_Qrows;
      opt_expire = Data_Queue.total_Qrows - dqd->starting_Qrow;
      if ( opt_expire < n_expire ) n_expire = opt_expire;
      if ( !nonblock ) {
        /* When blocking, never expire rows a reader still needs */
        int min_rdr = min_reader(dqd);
        if (min_rdr > dqd->Qrows_expired) {
          opt_expire = min_rdr - dqd->Qrows_expired;
          if ( opt_expire >= 0 && opt_expire < n_expire )
            n_expire = opt_expire;
        } else n_expire = 0;
      }
      assert(n_expire >= 0);
      if ( n_expire ) {
        /* Reclaim n_expire rows from the front of the queue */
        dqd->starting_Qrow += n_expire;
        if ( dqd->starting_Qrow == Data_Queue.total_Qrows )
          dqd->starting_Qrow = 0;
        Data_Queue.first = dqd->starting_Qrow;
        Data_Queue.full = 0;
        dqd->n_Qrows -= n_expire;
        dqd->Qrows_expired += n_expire;
        nrows_free += n_expire;
        nrows_req -= n_expire;
        assert(Data_Queue.last+nrows_free <= Data_Queue.total_Qrows);
        if ( dqd->n_Qrows == 0 && dqd->next )
          dqd = dq_expire_check(dqd);
        else break;
      } else break;
    }
  }
  assert( dqd->n_Qrows == 0 || dqd->starting_Qrow == Data_Queue.first );
  assert(nrows_free >= 0);
  assert(Data_Queue.last+nrows_free <= Data_Queue.total_Qrows);
  /* Point the writer's transfer at the allocated region */
  ocb->part.nbdata = nrows_free * Data_Queue.nbQrow;
  ocb->part.dptr = Data_Queue.row[Data_Queue.last];
  ocb->rw.write.off_queue = 0;
  if ( nrows_free ) {
    switch ( ocb->state ) {
      case TM_STATE_HDR:
        // ### We need to move hdr data into Data_Queue?
        // sizeof(tm_hdrs_t) is how much we've read into ocb->part.hdr
        // ocb->rw.write.nbhdr_rec is how much is actually header
        // The rest is TM data that needs to be copied into the DQ
        ocb->rw.write.off_queue = sizeof(tm_hdrs_t) - ocb->rw.write.nbhdr_rec;
        // This could actually happen, but it shouldn't
        assert( ocb->rw.write.off_queue <= ocb->part.nbdata );
        ocb->part.nbdata -= ocb->rw.write.off_queue;
        memcpy( ocb->part.dptr,
                &ocb->part.hdr.raw[ocb->rw.write.nbhdr_rec],
                ocb->rw.write.off_queue );
        ocb->part.dptr += ocb->rw.write.off_queue;
        ocb->rw.write.nb_rec -= ocb->rw.write.off_queue;
        ocb->state = TM_STATE_DATA;
        break;
      case TM_STATE_DATA:
        break;
      default:
        nl_error( 4, "Invalid state in allocate_qrows" );
    }
  }
  return nrows_free;
}
00706 
00707 // The job of data_state_eval is to decide how big the next
00708 // chunk of data should be and where it should go.
00709 // This involves finding space in the data queue and
00710 // setting part.dptr and part.nbdata. We may need to expire
00711 // rows from the data queue, but if we aren't nonblocking, we
00712 // might block to allow the readers to handle what we've already
00713 // got.
00714 // data_state_eval() does not transfer any data via ReadMessage,
00715 // but allocate_qrows() will copy any bytes from the hdr if
00716 // ocb->state == TM_STATE_HDR
00717 //
00718 // Returns the number of rows that have been completed in the
00719 // data queue.
00720 
00721 // First check the data we've already moved into the queue
00722 // The rows we read in will always begin at Data_Queue.last,
00723 // which should be consistent with the end of the current
00724 // dqd data set. The number of rows is also guaranteed to
00725 // fit within the Data_Queue without wrapping, so it should
00726 // be safe to add nrrecd to Data_Queue.last, though it may
00727 // be necessary to set last to zero afterwards.
00728 //   T1->T1 just add to current dqd without checks
00729 //   T2->T2 Copy straight in, then verify continuity with previous
00730 // records.
00731 //   T3->T3 Copy straight in. Verify consecutive, etc.
00732 
/* data_state_T1(): advance the data queue for TMTYPE_DATA_T1 input.
   First commits any whole rows already deposited at Data_Queue.last
   (measured by rw.write.off_queue), then calls allocate_qrows() for
   whatever remains of the current write (rw.write.nb_rec bytes).
   allocate_qrows() may block unless nonblock is set.
   Locks dq_mutex for the duration.
   Returns the number of rows committed to the data queue. */
static int data_state_T1( IOFUNC_OCB_T *ocb, int nonblock ) {
  int nrowsfree, tot_nrrecd = 0;
  lock_dq();
  do {
    // Whole rows already deposited at Data_Queue.last
    int nrrecd = ocb->rw.write.off_queue/Data_Queue.nbQrow;
    nrowsfree = 0;
    assert(ocb->part.nbdata == 0);
    // off_queue must be an exact multiple of the queue row size
    assert(ocb->rw.write.off_queue == nrrecd*Data_Queue.nbQrow);
    if ( nrrecd ) {
      Data_Queue.last += nrrecd;
      assert(Data_Queue.last <= Data_Queue.total_Qrows );
      if ( Data_Queue.last == Data_Queue.total_Qrows )
        Data_Queue.last = 0; // wrap: rows never straddle the end of the buffer
      DQD_Queue.last->n_Qrows += nrrecd;
      ocb->rw.write.off_queue = 0;
      tot_nrrecd += nrrecd;
      // Don't need to update dptr (and can't anyway!) because nbdata==0
      // allocate_qrows will update nbdata, dptr and off_queue.
    }
    // Now look at what we have yet to move.
    if ( ocb->rw.write.nb_rec ) {
      int nrows = ocb->rw.write.nb_rec/ocb->rw.write.nbrow_rec;
      if ( nrows == 0 ) ++nrows; // partial row pending: still need one slot
      nrowsfree = allocate_qrows( ocb, nrows, nonblock );
      // Handle the unlikely case that an entire row arrived in the
      // 4 bytes of excess header. This can only happen if the row
      // consisted entirely of MFCtr and Synch.
      // if/when this happens, nrowsfree will be non-zero, but nbdata
      // may be zero
    }
  } while ( nrowsfree && ocb->part.nbdata == 0 );
  unlock_dq();
  return tot_nrrecd;
}
00767 
00768 static int data_state_T2( IOFUNC_OCB_T *ocb, int nonblock ) {
00769   // int nrrecd = ocb->rw.write.off_queue/ocb->rw.write.nbrow_rec;
00770   assert(ocb->part.nbdata == 0);
00771   nl_error(4, "Not implemented");
00772   return 0;
00773 }
00774 
/* data_state_T3(): advance the data queue for TMTYPE_DATA_T3 input.
   Same commit loop as data_state_T1(), with two additions: the queue
   full flag is maintained, and each incoming record's MFCtr is checked
   for continuity with the current descriptor; a discontinuity opens a
   new dq_descriptor (inheriting the current timestamp) at the new MFCtr.
   Locks dq_mutex for the duration.
   Returns the number of rows committed to the data queue. */
static int data_state_T3( IOFUNC_OCB_T *ocb, int nonblock ) {
  // int nrrecd = ocb->rw.write.off_queue/ocb->rw.write.nbrow_rec;
  // assert(ocb->part.nbdata == 0); // redundant here: checked again in the loop below
  int nrowsfree, tot_nrrecd = 0;
  lock_dq();
  do {
    // Whole rows already deposited at Data_Queue.last
    int nrrecd = ocb->rw.write.off_queue/Data_Queue.nbQrow;
    nrowsfree = 0;
    assert(ocb->part.nbdata == 0);
    // off_queue must be an exact multiple of the queue row size
    assert(ocb->rw.write.off_queue == nrrecd*Data_Queue.nbQrow);
    if ( nrrecd ) {
      Data_Queue.last += nrrecd;
      assert(Data_Queue.last <= Data_Queue.total_Qrows );
      if ( Data_Queue.last == Data_Queue.total_Qrows )
        Data_Queue.last = 0; // wrap: rows never straddle the end of the buffer
      if ( Data_Queue.last == Data_Queue.first )
        Data_Queue.full = 1; // writer has caught up with the oldest row
      DQD_Queue.last->n_Qrows += nrrecd;
      ocb->rw.write.off_queue = 0;
      tot_nrrecd += nrrecd;
      // Don't need to update dptr (and can't anyway!) because nbdata==0
      // allocate_qrows will update nbdata, dptr and off_queue.
    }
    // Now look at what we have yet to move.
    if ( ocb->rw.write.nb_rec ) {
      int nrows = ocb->rw.write.nb_rec/ocb->rw.write.nbrow_rec;
      if ( nrows == 0 ) nrows++; // partial row pending: still need one slot
      if ( nrows == ocb->part.hdr.s.u.dhdr.n_rows ) {
        // Start of a new record: verify MFCtr continuity with the
        // current descriptor; if broken, open a new descriptor that
        // inherits the current timestamp (new_dq_descriptor(NULL)).
        dq_descriptor_t *dqd = DQD_Queue.last;
        if (dqd->MFCtr + dqd->Qrows_expired + dqd->n_Qrows !=
            ocb->part.hdr.s.u.dhdr.mfctr) {
          dqd = new_dq_descriptor(NULL);
          dqd->MFCtr = ocb->part.hdr.s.u.dhdr.mfctr;
        }
      }
      nrowsfree = allocate_qrows( ocb, nrows, nonblock );
      // Handle the case that an entire row arrived in the
      // 2 bytes of excess header. This could happen.
    }
  } while ( nrowsfree && ocb->part.nbdata == 0 );
  unlock_dq();
  return tot_nrrecd;
}
00818 
00819 /* queue_tstamp(): Create a new tstamp record in the Tstamp_Queue
00820    and a new dq_descriptor that references it.
00821    This currently assumes the dq_mutex is locked.
00822 */
00823 static void queue_tstamp( tstamp_t *ts ) {
00824   TS_queue_t *new_TS = new_memory(sizeof(TS_queue_t));
00825   dq_descriptor_t *new_dqd;
00826   
00827   new_TS->TS = *ts;
00828   new_TS->ref_count = 0;
00829 
00830   new_dqd = new_dq_descriptor( new_TS );
00831 }
00832 
00833 static dq_descriptor_t *new_dq_descriptor( TS_queue_t *TS ) {
00834   dq_descriptor_t *new_dqd = new_memory(sizeof(dq_descriptor_t));
00835 
00836   if ( TS == NULL ) TS = DQD_Queue.last->TSq;
00837   TS->ref_count++;
00838 
00839   new_dqd->next = 0;
00840   new_dqd->ref_count = 0;
00841   new_dqd->starting_Qrow = Data_Queue.last;
00842   new_dqd->n_Qrows = 0;
00843   new_dqd->Qrows_expired = 0;
00844   new_dqd->TSq = TS;
00845   new_dqd->MFCtr = 0;
00846   new_dqd->Row_num = 0;
00847   if ( DQD_Queue.last )
00848     DQD_Queue.last->next = new_dqd;
00849   else DQD_Queue.first = new_dqd;
00850   DQD_Queue.last = new_dqd;
00851   return new_dqd;
00852 }
00853 
00854 /* As soon as tm_info has been received, we can decide what
00855    data format to output, how much buffer space to allocate
00856    and in what configuration. We can then create the first
00857    timestamp record (with the TS in the tm_info) and the
   first dq_descriptor, albeit with no Qrows, but referencing
   the first timestamp. Then we can check to see if any
00860    readers are waiting, and initialize them.
00861 */
00862 static int process_tm_info( IOFUNC_OCB_T *ocb ) {
00863   char *rowptr;
00864   int i;
00865 
00866   // Perform sanity checks
00867   if (tmi(nbminf) == 0 ||
00868       tmi(nbrow) == 0 ||
00869       tmi(nrowmajf) == 0 ||
00870       tmi(nrowsper) == 0 ||
00871       tmi(nsecsper) == 0 ||
00872       tmi(mfc_lsb) == tmi(mfc_msb) ||
00873       tmi(mfc_lsb) >= tmi(nbrow) ||
00874       tmi(mfc_msb) >= tmi(nbrow) ||
00875       tmi(nbminf) < tmi(nbrow) ||
00876       tmi(nbminf) % tmi(nbrow) != 0 ||
00877       tm_info.nrowminf != tmi(nbminf)/tmi(nbrow)) {
00878     nl_error( 2, "Sanity Checks failed on incoming stream" );
00879     return EINVAL;
00880   }
00881 
00882   // What data format should we output?
00883   lock_dq();
00884   ocb->rw.write.buf = NULL;
00885   Data_Queue.nbQrow = tmi(nbrow);
00886   if ( tmi(mfc_lsb) == 0 && tmi(mfc_msb) == 1
00887        && tm_info.nrowminf == 1 ) {
00888     Data_Queue.output_tm_type = TMTYPE_DATA_T3;
00889     Data_Queue.nbQrow -= 4;
00890     Data_Queue.nbDataHdr = TM_HDR_SIZE_T3;
00891                 ocb->rw.write.nbhdr_rec = TM_HDR_SIZE_T3;
00892                 ocb->rw.write.nbrow_rec = tmi(nbrow) - 4;
00893                 data_state_eval = data_state_T3;
00894   } else if ( tm_info.nrowminf == 1 ) {
00895     Data_Queue.output_tm_type = TMTYPE_DATA_T1;
00896     Data_Queue.nbDataHdr = TM_HDR_SIZE_T1;
00897                 ocb->rw.write.nbhdr_rec = TM_HDR_SIZE_T1;
00898                 data_state_eval = data_state_T1;
00899     if ( tmi(nbrow) <= 4 )
00900       nl_error( 3, "TM Frame with no non-synch data not supported" );
00901                 ocb->rw.write.nbrow_rec = tmi(nbrow);
00902                 data_state_eval = data_state_T1;
00903   } else {
00904     Data_Queue.output_tm_type = TMTYPE_DATA_T2;
00905     Data_Queue.nbDataHdr = TM_HDR_SIZE_T2;
00906                 ocb->rw.write.nbhdr_rec = TM_HDR_SIZE_T2;
00907                 ocb->rw.write.nbrow_rec = tmi(nbrow);
00908                 data_state_eval = data_state_T2;
00909   }
00910   Data_Queue.pbuf_size = Data_Queue.nbDataHdr + Data_Queue.nbQrow;
00911   if ( Data_Queue.pbuf_size < sizeof(tm_hdr_t) + sizeof(tm_info_t) )
00912     Data_Queue.pbuf_size = sizeof(tm_hdr_t) + sizeof(tm_info_t);
00913 
00914   // how much buffer space to allocate?
00915   // Let's default to one minute's worth, but make sure we get
00916   // an integral number of minor frames
00917   Data_Queue.total_Qrows = tm_info.nrowminf *
00918     ( ( tmi(nrowsper) * 60 + tmi(nsecsper)*tm_info.nrowminf - 1 )
00919         / (tmi(nsecsper)*tm_info.nrowminf) );
00920   Data_Queue.total_size =
00921     Data_Queue.nbQrow * Data_Queue.total_Qrows;
00922   Data_Queue.first = Data_Queue.last = Data_Queue.full = 0;
00923   Data_Queue.raw = new_memory(Data_Queue.total_size);
00924   Data_Queue.row = new_memory(Data_Queue.total_Qrows * sizeof(char *));
00925   rowptr = Data_Queue.raw;
00926   for ( i = 0; i < Data_Queue.total_Qrows; i++ ) {
00927     Data_Queue.row[i] = rowptr;
00928     rowptr += Data_Queue.nbQrow;
00929   }
00930   
00931   queue_tstamp( &tm_info.t_stmp );
00932   unlock_dq();
00933   return 0;
00934 }
00935 
00936 
00937 /* do_read_reply handles the actual reply after the message
00938    has been packed into iov_t structs. It may allocate a
00939    partial buffer if the request size is smaller than the
00940    message size. read_reply() may consult the request size
00941    to decide how large a message to return, of course, but
00942    the request size may be smaller than the smallest
00943    message.
00944 */
00945 static void do_read_reply( RESMGR_OCB_T *ocb, int nb,
00946                         iov_t *iov, int n_parts ) {
00947   int nreq = ocb->rw.read.nbytes;
00948   if ( nreq < nb ) {
00949     int i;
00950     char *p;
00951     
00952     if ( ocb->rw.read.buf == 0 )
00953       ocb->rw.read.buf = new_memory(Data_Queue.pbuf_size);
00954     assert( nb <= Data_Queue.pbuf_size );
00955     p = ocb->rw.read.buf;
00956     for ( i = 0; i < n_parts; i++ ) {
00957       int len = GETIOVLEN( &iov[i] );
00958       memcpy( p, GETIOVBASE( &iov[i] ), len );
00959       p += len;
00960     }
00961     ocb->part.dptr = ocb->rw.read.buf;
00962     ocb->part.nbdata = nb;
00963     MsgReply( ocb->rw.read.rcvid, nreq, ocb->part.dptr, nreq );
00964     ocb->part.dptr += nreq;
00965     ocb->part.nbdata -= nreq;
00966   } else {
00967     MsgReplyv( ocb->rw.read.rcvid, nb, iov, n_parts );
00968   }
00969 }
00970 
00971 /* read_reply(ocb) is called when we know we have
00972    something to return on a read request.
00973    First determine either the largest complete record that is less
00974    than the requested size or the smallest complete record. If the
00975    smallest record is larger than the request size, allocate the
00976    partial buffer and copy the record into it.
00977    Partial buffer size, if allocated, should be the larger of the size
00978    of the tm_info message or a one-row data message (based on the
00979    assumption that if a small request comes in, the smallest full
00980    message will be chosen for output)
00981 
00982    It is assumed that ocb has already been removed from whatever wait
00983    queue it might have been on.
00984 */
00985 static void read_reply( RESMGR_OCB_T *ocb, int nonblock ) {
00986   iov_t iov[3];
00987   int nb;
00988   
00989   if ( ocb->part.nbdata ) {
00990     nb = ocb->rw.read.nbytes;
00991     if ( ocb->part.nbdata < nb )
00992       nb = ocb->part.nbdata;
00993     MsgReply( ocb->rw.read.rcvid, nb, ocb->part.dptr, nb );
00994     ocb->part.dptr += nb;
00995     ocb->part.nbdata -= nb;
00996     // New state was already defined (or irrelevant)
00997   } else if ( ocb->data.dqd == 0 ) {
00998     lock_dq();
00999     if ( DQD_Queue.first ) {
01000       ocb->data.dqd = DQD_Queue.first;
01001       ++ocb->data.dqd->ref_count;
01002       ocb->data.n_Qrows = 0;
01003     
01004       ocb->state = TM_STATE_HDR; //### delete
01005       ocb->part.hdr.s.hdr.tm_id = TMHDR_WORD;
01006       ocb->part.hdr.s.hdr.tm_type = TMTYPE_INIT;
01007 
01008       // Message consists of
01009       //   tm_hdr_t (TMHDR_WORD, TMTYPE_INIT)
01010       //   tm_info_t with the current timestamp
01011       SETIOV( &iov[0], &ocb->part.hdr.s.hdr,
01012         sizeof(ocb->part.hdr.s.hdr) );
01013       SETIOV( &iov[1], &tm_info, sizeof(tm_info)-sizeof(tstamp_t) );
01014       SETIOV( &iov[2], &ocb->data.dqd->TSq->TS, sizeof(tstamp_t) );
01015       nb = sizeof(tm_hdr_t) + sizeof(tm_info_t);
01016       do_read_reply( ocb, nb, iov, 3 );
01017     } else enqueue_read( ocb, nonblock );
01018     unlock_dq();
01019   } else {
01020     /* I've handled ocb->data.n_Qrows */
01021     dq_descriptor_t *dqd = ocb->data.dqd;
01022 
01023     lock_dq();
01024     while (dqd) {
01025       int nQrows_ready;
01026       
01027       /* DQD has a total of dqd->Qrows_expired + dqd->n_Qrows */
01028       if ( ocb->data.n_Qrows < dqd->Qrows_expired ) {
01029         // then we've missed some data: make a note and set
01030         int n_missed = dqd->Qrows_expired - ocb->data.n_Qrows;
01031         ocb->rw.read.rows_missing += n_missed;
01032         ocb->data.n_Qrows = dqd->Qrows_expired;
01033       }
01034       nQrows_ready = dqd->n_Qrows + dqd->Qrows_expired
01035                       - ocb->data.n_Qrows;
01036       assert( nQrows_ready >= 0 );
01037       if ( nQrows_ready > 0 ) {
01038         // ### make sure rw.read.maxQrows < Data_Queue.total_Qrows
01039         // (it was not! fixed in io_read().
01040         if ( blocked_writer == 0 && dg_opened < 2 &&
01041              dqd->next == 0 && nQrows_ready < ocb->rw.read.maxQrows
01042              && ocb->hdr.attr->node_type == TM_DCo && !nonblock ) {
01043           // We want to wait for more
01044           // ### reasons not to wait:
01045           // ###   DG is blocked or has terminated
01046           enqueue_read( ocb, nonblock );
01047         } else {
01048           int XRow_Num, NMinf, Row_Num_start, n_iov;
01049           mfc_t MFCtr_start;
01050           int Qrow_start, nQ1, nQ2;
01051           
01052           if ( nQrows_ready > ocb->rw.read.maxQrows )
01053             nQrows_ready = ocb->rw.read.maxQrows;
01054           ocb->part.hdr.s.hdr.tm_id = TMHDR_WORD;
01055           ocb->part.hdr.s.hdr.tm_type = Data_Queue.output_tm_type;
01056           ocb->part.hdr.s.u.dhdr.n_rows = nQrows_ready;
01057           XRow_Num = dqd->Row_num + ocb->data.n_Qrows;
01058           NMinf = XRow_Num/tm_info.nrowminf;
01059           MFCtr_start = dqd->MFCtr + NMinf;
01060           Row_Num_start = XRow_Num % tm_info.nrowminf;
01061           switch ( Data_Queue.output_tm_type ) {
01062             case TMTYPE_DATA_T1: break;
01063             case TMTYPE_DATA_T2:
01064               ocb->part.hdr.s.u.dhdr.mfctr = MFCtr_start;
01065               ocb->part.hdr.s.u.dhdr.rownum = Row_Num_start;
01066               break;
01067             case TMTYPE_DATA_T3:
01068               ocb->part.hdr.s.u.dhdr.mfctr = MFCtr_start;
01069               break;
01070             default:
01071               nl_error(4,"Invalid output_tm_type" );
01072           }
01073           SETIOV( &iov[0], &ocb->part.hdr.s.hdr, Data_Queue.nbDataHdr );
01074           Qrow_start = dqd->starting_Qrow + ocb->data.n_Qrows -
01075                           dqd->Qrows_expired;
01076           if ( Qrow_start > Data_Queue.total_Qrows )
01077             Qrow_start -= Data_Queue.total_Qrows;
01078           nQ1 = nQrows_ready;
01079           nQ2 = Qrow_start + nQ1 - Data_Queue.total_Qrows;
01080           if ( nQ2 > 0 ) {
01081             nQ1 -= nQ2;
01082             SETIOV( &iov[2], Data_Queue.row[0], nQ2 * Data_Queue.nbQrow );
01083             n_iov = 3;
01084           } else n_iov = 2;
01085           SETIOV( &iov[1], Data_Queue.row[Qrow_start],
01086                       nQ1 * Data_Queue.nbQrow );
01087           ocb->data.n_Qrows += nQrows_ready;
01088           do_read_reply( ocb,
01089             Data_Queue.nbDataHdr + nQrows_ready * Data_Queue.nbQrow,
01090             iov, n_iov );
01091         }
01092         break; // out of while(dqd)
01093       } else if ( dqd->next ) {
01094         int do_TS = dqd->TSq != dqd->next->TSq;
01095         dqd = dq_deref(dqd, 1);
01096         // dqd->ref_count++;
01097         ocb->data.dqd = dqd;
01098         ocb->data.n_Qrows = 0;
01099         if ( do_TS ) {
01100           ocb->part.hdr.s.hdr.tm_id = TMHDR_WORD;
01101           ocb->part.hdr.s.hdr.tm_type = TMTYPE_TSTAMP;
01102           SETIOV( &iov[0], &ocb->part.hdr.s.hdr, sizeof(tm_hdr_t) );
01103           SETIOV( &iov[1], &dqd->TSq->TS, sizeof(tstamp_t) );
01104           do_read_reply( ocb, sizeof(tm_hdr_t)+sizeof(tstamp_t),
01105             iov, 2 );
01106           break;
01107         } // else loop through again
01108       } else {
01109         enqueue_read( ocb, nonblock );
01110         break;
01111       }
01112     }
01113     unlock_dq();
01114   }
01115 }
01116 
01117 static int io_read (resmgr_context_t *ctp, io_read_t *msg, RESMGR_OCB_T *ocb) {
01118   int status, nonblock = 0;
01119   // IOFUNC_ATTR_T *handle = ocb->hdr.attr;
01120 
01121   log_event(10);
01122   if ((status = iofunc_read_verify( ctp, msg,
01123                      (iofunc_ocb_t *)ocb, &nonblock)) != EOK)
01124     return (status);
01125       
01126   if ((msg->i.xtype & _IO_XTYPE_MASK) != _IO_XTYPE_NONE)
01127     return (ENOSYS);
01128 
01129   ocb->rw.read.rcvid = ctp->rcvid;
01130   ocb->rw.read.nbytes = msg->i.nbytes;
01131   if ( ocb->data.dqd ) {
01132     int nbData = ocb->rw.read.nbytes - Data_Queue.nbDataHdr;
01133     ocb->rw.read.maxQrows =
01134       nbData < Data_Queue.nbQrow ? 1 : nbData/Data_Queue.nbQrow;
01135     if ( ocb->rw.read.maxQrows > Data_Queue.total_Qrows )
01136       ocb->rw.read.maxQrows = Data_Queue.total_Qrows;
01137   }
01138   read_reply( ocb, nonblock );
01139   run_write_queue();
01140   return _RESMGR_NOREPLY;
01141 }
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Defines