ARPDAS_QNX6 1.0
|
/* TMbfr2.c
 * Telemetry buffer resource manager: accepts a telemetry stream from a
 * single Data Generator (TM/DG) and fans it out to Data Client readers
 * (TM/DCf, TM/DCo) through a shared in-memory data queue.
 */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdarg.h>
#include <ctype.h>
#include "TMbfr.h"
#include "nortlib.h"
#include "nl_assert.h"
#include "oui.h"
#include "rundir.h"

/* Forward declarations for the resmgr handlers and queue helpers
 * defined below. */
static int io_read (resmgr_context_t *ctp, io_read_t *msg, RESMGR_OCB_T *ocb);
static int io_write(resmgr_context_t *ctp, io_write_t *msg, RESMGR_OCB_T *ocb);
static int io_notify(resmgr_context_t *ctp, io_notify_t *msg, RESMGR_OCB_T *ocb);
static void do_write( IOFUNC_OCB_T *ocb, int nonblock, int new_rows );
static int io_open( resmgr_context_t *ctp, io_open_t *msg,
                    IOFUNC_ATTR_T *attr, void *extra );
static void read_reply( RESMGR_OCB_T *ocb, int nonblock );
static int process_tm_info( IOFUNC_OCB_T *ocb );
/* data_state_eval is a function pointer selected by process_tm_info()
 * once the stream format (T1/T2/T3) is known. */
static int (*data_state_eval)( IOFUNC_OCB_T *ocb, int nonblock );
static int data_state_T1( IOFUNC_OCB_T *ocb, int nonblock );
static int data_state_T2( IOFUNC_OCB_T *ocb, int nonblock );
static int data_state_T3( IOFUNC_OCB_T *ocb, int nonblock );
static void queue_tstamp( tstamp_t *ts );
static dq_descriptor_t *new_dq_descriptor( TS_queue_t *TS );
static dq_descriptor_t *dq_deref( dq_descriptor_t *dqd, int use_next );
static dq_descriptor_t *dq_expire_check( dq_descriptor_t *dqd );
static void enqueue_read( IOFUNC_OCB_T *ocb, int nonblock );
static void run_read_queue(void);
static void lock_dq(void);
static void unlock_dq(void);
static int all_closed(void);
static int allocate_qrows( IOFUNC_OCB_T *ocb, int nrows_req, int nonblock );

/* Resource-manager plumbing: shared connect functions, separate I/O
 * function tables for the reader and writer device nodes, and the
 * attribute structures / attach ids for the three nodes. */
static resmgr_connect_funcs_t connect_funcs;
static resmgr_io_funcs_t rd_io_funcs, wr_io_funcs;
static IOFUNC_ATTR_T dg_attr, dcf_attr, dco_attr;
static int dg_id, dcf_id, dco_id;
static resmgr_attr_t resmgr_attr;
static dispatch_t *dpp;
static pthread_mutex_t dg_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t dq_mutex = PTHREAD_MUTEX_INITIALIZER;

/* dg_opened: 0 = DG never opened, 1 = DG open, 2 = DG closed again
 * (readers then see EOF). */
static int dg_opened = 0;
data_queue_t Data_Queue;
DQD_Queue_t DQD_Queue;
/* At most one writer can be blocked waiting for queue space. */
static tm_ocb_t *blocked_writer;
/* Singly-linked list of all open reader ocbs (see enqueue_reader). */
static tm_ocb_t *all_readers;

/* Compile-time-switched ring buffer of event ids for debugging. */
#define LOG_EVENTS 0
#if LOG_EVENTS
#define EVENT_LOG_SIZE 100
static int my_events[EVENT_LOG_SIZE];
static unsigned my_evt_num, my_evt_idx;
static void log_event(int evt_id) {
  my_events[my_evt_idx++] = evt_id;
  if ( my_evt_idx >= EVENT_LOG_SIZE )
    my_evt_idx = 0;
  ++my_evt_num;
}
#else
#define log_event(x)
#endif
/* enqueue_read() queues the specified ocb.
   Assumes dq is locked.
   If nonblock is set, returns error to the client.
   Should prioritize by some clever scheme including priority, but
   for starters, I'll just FIFO.
*/
static void enqueue_read( IOFUNC_OCB_T *ocb, int nonblock ) {
  if ( dg_opened == 2 ) {
    // DG has already closed (dg_opened==2): zero-byte reply == EOF
    MsgReply(ocb->rw.read.rcvid, 0, ocb->part.dptr, 0 );
  } else if ( nonblock ) {
    log_event(2);
    if ( MsgError( ocb->rw.read.rcvid, EAGAIN ) == -1 )
      nl_error( 2, "Error %d on MsgError", errno );
  } else {
    log_event(1);
    // Leave the client reply-blocked; run_read_queue() will wake it
    ocb->rw.read.blocked = 1;
  }
}

/* enqueue_reader(): push a newly created reader ocb onto the global
   all_readers list. Takes the dq lock itself. */
static void enqueue_reader( IOFUNC_OCB_T *ocb ) {
  lock_dq();
  ocb->next_ocb = all_readers;
  all_readers = ocb;
  unlock_dq();
}

/* dequeue_reader(): unlink a closing reader ocb from all_readers.
   The ocb is asserted to be on the list. */
static void dequeue_reader( IOFUNC_OCB_T *ocb ) {
  IOFUNC_OCB_T **rq;
  lock_dq();
  assert(all_readers != NULL);
  // Walk via pointer-to-pointer so unlinking needs no prev pointer
  for ( rq = &all_readers; *rq != 0 && *rq != ocb; rq = &(*rq)->next_ocb );
  assert(*rq == ocb);
  *rq = ocb->next_ocb;
  unlock_dq();
}

/* run_write_queue(): give the blocked writer, if any, another chance
   to allocate data-queue space and continue its transfer. */
static void run_write_queue(void) {
  if ( blocked_writer ) {
    log_event(3);
    int new_rows = data_state_eval(blocked_writer, 0);
    if ( blocked_writer->part.nbdata > 0 ) {
      log_event(4);
      do_write(blocked_writer, 0, new_rows);
    }
  }
}
/* run_read_queue() invokes read_reply() on all the blocked ocbs
   in the read queue.
   assumes dq is unlocked (since read_reply() will lock it as
   needed)
*/
static void run_read_queue(void) {
  tm_ocb_t *rq;
  lock_dq();
  rq = all_readers;
  unlock_dq();
  log_event(5);
  // NOTE(review): list traversed after dropping the lock — assumes no
  // concurrent unlink while we walk; confirm single-threaded dispatch.
  while ( rq ) {
    if ( rq->rw.read.blocked ) {
      rq->rw.read.blocked = 0;
      log_event(6);
      read_reply(rq, 0);
    }
    rq = rq->next_ocb;
  }
  // if (IOFUNC_NOTIFY_INPUT_CHECK(dcf_attr.notify, 1, 0))
  iofunc_notify_trigger(dcf_attr.notify, 1, IOFUNC_NOTIFY_INPUT);
  // if (IOFUNC_NOTIFY_INPUT_CHECK(dco_attr.notify, 1, 0))
  iofunc_notify_trigger(dco_attr.notify, 1, IOFUNC_NOTIFY_INPUT);
  run_write_queue();
}

// Return the minimum number of rows processed by any reader that is currently
// referencing the specified dqd. Assumes dq is locked.
// ### should mark the min readers as laggards
int min_reader( dq_descriptor_t *dqd ) {
  // Readers can only lag on the head descriptor; any other dqd
  // cannot have rows expired out from under a reader.
  if ( dqd != DQD_Queue.first ) return 0;
  int min = dqd->Qrows_expired + dqd->n_Qrows;
  IOFUNC_OCB_T *ocb;
  for ( ocb = all_readers; ocb != 0; ocb = ocb->next_ocb ) {
    if ( ocb->data.dqd == dqd && ocb->data.n_Qrows < min )
      min = ocb->data.n_Qrows;
  }
  return min;
}
00160 */ 00161 static dq_descriptor_t *dq_deref( dq_descriptor_t *dqd, int use_next ) { 00162 dq_descriptor_t *next_dqd = dqd->next; 00163 assert(dqd->ref_count > 0); 00164 if (next_dqd != 0 && use_next) 00165 ++next_dqd->ref_count; 00166 if ( --dqd->ref_count == 0 ) 00167 dq_expire_check(dqd); 00168 return next_dqd; 00169 } 00170 00171 static dq_descriptor_t *dq_expire_check( dq_descriptor_t *dqd ) { 00172 assert(dqd->ref_count >= 0); 00173 while ( dqd->ref_count == 0 && dqd->next != NULL 00174 && dqd->n_Qrows == 0 && DQD_Queue.first == dqd ) { 00175 /* Can expire this dqd */ 00176 dq_descriptor_t *next_dqd = dqd->next; 00177 assert(dqd->TSq->ref_count >= 0); 00178 if ( --dqd->TSq->ref_count == 0 ) { 00179 free_memory(dqd->TSq); 00180 } 00181 free_memory(dqd); 00182 DQD_Queue.first = next_dqd; 00183 dqd = next_dqd; 00184 } 00185 return dqd; 00186 } 00187 00188 static void lock_dq(void) { 00189 int rv = pthread_mutex_lock( &dq_mutex ); 00190 if ( rv != EOK ) 00191 nl_error( 4, "Error %d locking dq mutex", rv ); 00192 } 00193 00194 static void unlock_dq(void) { 00195 int rv = pthread_mutex_unlock( &dq_mutex ); 00196 if ( rv != EOK ) 00197 nl_error( 4, "Error %d unlocking dq mutex", rv ); 00198 } 00199 00200 static tm_ocb_t *ocb_calloc(resmgr_context_t *ctp, IOFUNC_ATTR_T *device) { 00201 tm_ocb_t *ocb = calloc( 1, sizeof(tm_ocb_t) ); 00202 if ( ocb == 0 ) return 0; 00203 /* Initialize any other elements. */ 00204 ocb->next_ocb = 0; 00205 // ocb->rw.read.buf = 0; 00206 ocb->data.dqd = 0; 00207 ocb->data.n_Qrows = 0; 00208 ocb->rw.read.rows_missing = 0; 00209 ocb->state = TM_STATE_HDR; // ### do this in a state init function? 
00210 if (device->node_type == TM_DG ) { 00211 ocb->part.nbdata = sizeof(ocb->part.hdr); // ### and this 00212 ocb->part.dptr = (char *)&ocb->part.hdr; // ### and this 00213 } else { 00214 ocb->part.nbdata = 0; 00215 ocb->part.dptr = NULL; 00216 ocb->rw.read.blocked = 0; 00217 ocb->rw.read.ionotify = 0; 00218 enqueue_reader(ocb); 00219 } 00220 return ocb; 00221 } 00222 00223 static void ocb_free(struct tm_ocb *ocb) { 00224 /* Be sure to remove this from the blocking list: 00225 Actually, there really is no way it should be on 00226 the blocking list. */ 00227 // assert( ocb->rw.read.rcvid == 0 ); 00228 // rcvid never gets reset 00229 lock_dq(); 00230 if ( ocb->data.dqd != 0 ) 00231 dq_deref(ocb->data.dqd, 0); 00232 unlock_dq(); 00233 if (ocb->hdr.attr->node_type == TM_DG ) { 00234 dg_opened = 2; 00235 run_read_queue(); 00236 } else { 00237 ocb->rw.read.blocked = 0; 00238 ocb->rw.read.ionotify = 0; 00239 if ( ocb->rw.read.buf ) free(ocb->rw.read.buf); 00240 dequeue_reader(ocb); 00241 } 00242 free( ocb ); 00243 } 00244 00245 static iofunc_funcs_t ocb_funcs = { /* our ocb allocating & freeing functions */ 00246 _IOFUNC_NFUNCS, 00247 ocb_calloc, 00248 ocb_free 00249 }; 00250 00251 /* the mount structure, we have only one so we statically declare it */ 00252 static iofunc_mount_t mountpoint = { 0, 0, 0, 0, &ocb_funcs }; 00253 00254 static int setup_mount( char *namebase, int node_type, int mode, 00255 IOFUNC_ATTR_T *attr, resmgr_io_funcs_t *funcs ) { 00256 const char *server_name; 00257 int mnt_id; 00258 00259 iofunc_attr_init((iofunc_attr_t *)attr, S_IFNAM | mode, 0, 0); 00260 attr->attr.nbytes = 0; 00261 attr->attr.mount = &mountpoint; 00262 attr->node_type = node_type; 00263 IOFUNC_NOTIFY_INIT(attr->notify); 00264 00265 server_name = tm_dev_name( namebase ); 00266 mnt_id = resmgr_attach(dpp, /* dispatch handle */ 00267 &resmgr_attr, /* resource manager attrs */ 00268 server_name, /* device name */ 00269 _FTYPE_ANY, /* open type */ 00270 0, /* flags */ 00271 
&connect_funcs, /* connect routines */ 00272 funcs, /* I/O routines */ 00273 attr); /* handle */ 00274 if( mnt_id == -1 ) 00275 nl_error( 3, "Unable to attach name '%s'", server_name ); 00276 return mnt_id; 00277 } 00278 00279 static void shutdown_mount( int id, char *name ) { 00280 int rv = resmgr_detach( dpp, id, _RESMGR_DETACH_ALL ); 00281 if ( rv < 0 ) 00282 nl_error( 2, "Error %d from resmgr_detach(%s)", rv, name ); 00283 } 00284 00285 // all_closed() returns non-zero if all OCBs are closed 00286 static int all_closed(void) { 00287 return dg_attr.attr.count == 0 && 00288 dcf_attr.attr.count == 0 && 00289 dco_attr.attr.count == 0; 00290 } 00291 00292 int main(int argc, char **argv ) { 00293 int use_threads = 0; 00294 00295 oui_init_options( argc, argv ); 00296 nl_error( 0, "Startup" ); 00297 setup_rundir(); 00298 /* initialize dispatch interface */ 00299 if((dpp = dispatch_create()) == NULL) { 00300 nl_error(3, "Unable to allocate dispatch handle."); 00301 } 00302 00303 /* initialize resource manager attributes. 
*/ 00304 /* planning to share this struct between rd and wr */ 00305 memset(&resmgr_attr, 0, sizeof resmgr_attr); 00306 // resmgr_attr.nparts_max = 0; 00307 // resmgr_attr.msg_max_size = 0; 00308 00309 /* initialize functions for handling messages */ 00310 iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs, 00311 _RESMGR_IO_NFUNCS, &rd_io_funcs); 00312 rd_io_funcs.read = io_read; 00313 rd_io_funcs.notify = io_notify; 00314 00315 iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs, 00316 _RESMGR_IO_NFUNCS, &wr_io_funcs); 00317 wr_io_funcs.write = io_write; 00318 connect_funcs.open = io_open; 00319 00320 dg_id = setup_mount( "TM/DG", TM_DG, 0664, &dg_attr, &wr_io_funcs ); 00321 dcf_id = setup_mount( "TM/DCf", TM_DCf, 0444, &dcf_attr, &rd_io_funcs ); 00322 dco_id = setup_mount( "TM/DCo", TM_DCo, 0444, &dco_attr, &rd_io_funcs ); 00323 00324 if ( use_threads ) { 00325 thread_pool_attr_t pool_attr; 00326 thread_pool_t *tpp; 00327 00328 /* initialize thread pool attributes */ 00329 memset(&pool_attr, 0, sizeof pool_attr); 00330 pool_attr.handle = dpp; 00331 pool_attr.context_alloc = dispatch_context_alloc; 00332 pool_attr.block_func = dispatch_block; 00333 pool_attr.handler_func = dispatch_handler; 00334 pool_attr.context_free = dispatch_context_free; 00335 pool_attr.lo_water = 2; 00336 pool_attr.hi_water = 4; 00337 pool_attr.increment = 1; 00338 pool_attr.maximum = 50; /* allocate a thread pool handle */ 00339 if((tpp = thread_pool_create(&pool_attr, 00340 POOL_FLAG_EXIT_SELF)) == NULL) { 00341 nl_error(3, "Unable to initialize thread pool"); 00342 } /* start the threads, will not return */ 00343 thread_pool_start(tpp); 00344 } else { 00345 dispatch_context_t *ctp; 00346 ctp = dispatch_context_alloc(dpp); 00347 while ( 1 ) { 00348 if ((ctp = dispatch_block(ctp)) == NULL) { 00349 nl_error( 2, "block error\n" ); 00350 return 1; 00351 } 00352 // printf( " type = %d,%d attr.count = %d\n", 00353 // ctp->resmgr_context.msg->type, 00354 // 
ctp->resmgr_context.msg->connect.subtype, attr.count ); 00355 dispatch_handler(ctp); 00356 if ( dg_opened && ctp->resmgr_context.rcvid == 0 00357 && all_closed() ) { 00358 break; 00359 } 00360 } 00361 } 00362 shutdown_mount( dg_id, "TM/DG" ); 00363 shutdown_mount( dcf_id, "TM/DCf" ); 00364 shutdown_mount( dco_id, "TM/DCo" ); 00365 nl_error( 0, "Shutdown" ); 00366 return 0; 00367 } 00368 00369 static int io_open( resmgr_context_t *ctp, io_open_t *msg, 00370 IOFUNC_ATTR_T *attr, void *extra ) { 00371 // Check to make sure it isn't already open 00372 if ( attr->node_type == TM_DG ) { 00373 int rv = pthread_mutex_lock( &dg_mutex ); 00374 if ( rv == EOK ) { 00375 int count = attr->attr.count; 00376 if ( count == 0 && dg_opened == 0 ) { 00377 rv = iofunc_open_default( ctp, msg, &attr->attr, extra ); 00378 if ( rv == EOK ) { 00379 assert( attr->attr.count ); 00380 dg_opened = 1; 00381 } 00382 } else rv = EBUSY; 00383 pthread_mutex_unlock( &dg_mutex ); 00384 return rv; 00385 } else nl_error( 3, "pthread_mutex_lock returned error %d", rv ); 00386 } 00387 return iofunc_open_default( ctp, msg, &attr->attr, extra ); 00388 } 00389 00390 int io_notify(resmgr_context_t *ctp, io_notify_t *msg, RESMGR_OCB_T *ocb) { 00391 tm_attr_t *dattr = (tm_attr_t *) ocb->hdr.attr; 00392 int trig; 00393 00394 log_event(7); 00395 trig = _NOTIFY_COND_OUTPUT; /* clients can always give us data */ 00396 lock_dq(); 00397 if ( dg_opened == 2 || ocb->part.nbdata ) 00398 trig |= _NOTIFY_COND_INPUT; 00399 else if ( ocb->data.dqd == 0 ) { 00400 if ( DQD_Queue.first ) 00401 trig |= _NOTIFY_COND_INPUT; 00402 } else { 00403 dq_descriptor_t *dqd = ocb->data.dqd; 00404 if ( dqd->next != 0) 00405 trig |= _NOTIFY_COND_INPUT; 00406 else { 00407 int nQrows_ready = dqd->n_Qrows; 00408 if ( ocb->data.n_Qrows > dqd->Qrows_expired ) 00409 nQrows_ready -= ocb->data.n_Qrows - dqd->Qrows_expired; 00410 if ( nQrows_ready ) 00411 trig |= _NOTIFY_COND_INPUT; 00412 } 00413 } 00414 if (trig & _NOTIFY_COND_INPUT) log_event(8); 
00415 unlock_dq(); 00416 return (iofunc_notify(ctp, msg, dattr->notify, trig, NULL, NULL)); 00417 } 00418 00419 // This is where data is recieved. 00420 static int io_write( resmgr_context_t *ctp, io_write_t *msg, RESMGR_OCB_T *ocb ) { 00421 int status, nonblock; 00422 00423 log_event(9); 00424 status = iofunc_write_verify(ctp, msg, (iofunc_ocb_t *)ocb, &nonblock); 00425 if ( status != EOK ) 00426 return status; 00427 00428 if ((msg->i.xtype &_IO_XTYPE_MASK) != _IO_XTYPE_NONE ) 00429 return ENOSYS; 00430 00431 // iofunc_write_verify should have stopped us if this is a DC node, 00432 // since they have no write permissions, but just check to make sure. 00433 assert( ocb->hdr.attr->node_type == TM_DG ); 00434 00435 // Assert this isn't a combine message 00436 if (msg->i.combine_len & _IO_COMBINE_FLAG) 00437 return (ENOSYS); 00438 assert(ctp->offset == 0); 00439 00440 // Store the information we need from the message 00441 ocb->rw.write.nb_msg = msg->i.nbytes; 00442 ocb->rw.write.off_msg = 0; // sizeof(msg->i); 00443 ocb->rw.write.rcvid = ctp->rcvid; 00444 00445 do_write( ocb, nonblock, 0 ); 00446 return _RESMGR_NOREPLY; 00447 } 00448 00449 // ### figure out how to get back into this routine after 00450 // being blocked. 00451 // Writer can only be blocked when processing data, since 00452 // INIT and TSTAMP records can always be processed immediately. 00453 // The data blocking can occur in either TM_STATE_HDR after 00454 // we've looked at the header and decided we don't have room 00455 // for the data or in TM_STATE_DATA after we've read some of 00456 // the data from a message/record and run out of room. 00457 // The trickiest part here is in the transition from TM_STATE_HDR 00458 // to TM_STATE_DATA, we have to copy the surplus header info 00459 // into the allocated space. 
If we can get allocate_qrows to 00460 // do that based on the state (and update the state), then 00461 // coming out of blocking involves calling data_state_eval, 00462 // and then calling do_write if space is available. 00463 static void do_write( IOFUNC_OCB_T *ocb, int nonblock, int new_rows ) { 00464 blocked_writer = 0; 00465 00466 // _IO_SET_WRITE_NBYTES( ctp, msg->i.nbytes ); 00467 // Not necessary since we'll handle the return ourselves 00468 // However, that means we need to store the total message size 00469 // Use off_msg for total size. 00470 00471 // We loop here as long as we have work to do. We can get out 00472 // if we've processed all the data in the message (nb_msg == 0) 00473 // or exhausted all the available space in the DQ (nbdata == 0) 00474 while ( ocb->rw.write.nb_msg > 0 && ocb->part.nbdata > 0 ) { 00475 int nbread; 00476 nbread = ocb->rw.write.nb_msg < ocb->part.nbdata ? 00477 ocb->rw.write.nb_msg : ocb->part.nbdata; 00478 if ( MsgRead( ocb->rw.write.rcvid, ocb->part.dptr, nbread, 00479 ocb->rw.write.off_msg + sizeof(struct _io_write) ) < 0 ) { 00480 // These errors are all pretty serious, and since writes 00481 // come from our DG, they are very bad news. 00482 // Retrying MsgRead() doesn't make sense, and if we ignore 00483 // this message, we won't know where we are when the next 00484 // message comes in. A reasonable assumption would be that 00485 // the next message would start at the beginning of a header, 00486 // but that's not guaranteed. 
00487 nl_error( 2, "Error %d from MsgRead: %s", errno, strerror(errno) ); 00488 MsgError( ocb->rw.write.rcvid, EFAULT ); 00489 ocb->state = TM_STATE_HDR; 00490 ocb->part.nbdata = sizeof( ocb->part.hdr ); 00491 ocb->part.dptr = (char *)&ocb->part.hdr; 00492 return; 00493 } 00494 ocb->part.dptr += nbread; 00495 ocb->part.nbdata -= nbread; 00496 ocb->rw.write.nb_msg -= nbread; 00497 ocb->rw.write.off_msg += nbread; 00498 if ( ocb->state == TM_STATE_DATA ) { 00499 ocb->rw.write.nb_rec -= nbread; 00500 // ocb->rw.write.off_rec += nbread; 00501 ocb->rw.write.off_queue += nbread; 00502 } 00503 assert(ocb->part.nbdata >= 0); 00504 if ( ocb->part.nbdata == 0 ) { 00505 switch ( ocb->state ) { 00506 case TM_STATE_HDR: 00507 if ( ocb->part.hdr.s.hdr.tm_id != TMHDR_WORD ) { 00508 nl_error( 2, "Invalid Message header" ); 00509 MsgError( ocb->rw.write.rcvid, EINVAL ); 00510 ocb->part.nbdata = sizeof( ocb->part.hdr ); 00511 ocb->part.dptr = (char *)&ocb->part.hdr; 00512 return; 00513 } 00514 switch ( ocb->part.hdr.s.hdr.tm_type ) { 00515 case TMTYPE_INIT: 00516 if ( DQD_Queue.last ) 00517 nl_error( 3, "Second TMTYPE_INIT received" ); 00518 ocb->rw.write.off_queue = sizeof(tm_hdrs_t)-sizeof(tm_hdr_t); 00519 ocb->part.nbdata = sizeof(tm_info) - ocb->rw.write.off_queue; 00520 ocb->part.dptr = (char *)&tm_info; 00521 memcpy( ocb->part.dptr, &ocb->part.hdr.raw[sizeof(tm_hdr_t)], 00522 ocb->rw.write.off_queue ); 00523 ocb->part.dptr += ocb->rw.write.off_queue; 00524 ocb->state = TM_STATE_INFO; 00525 break; 00526 case TMTYPE_TSTAMP: 00527 if ( DQD_Queue.last == 0 ) 00528 nl_error( 3, "TMTYPE_TSTAMP received before _INIT" ); 00529 lock_dq(); 00530 queue_tstamp( &ocb->part.hdr.s.u.ts ); 00531 unlock_dq(); 00532 new_rows++; 00533 // ocb->state = TM_STATE_HDR; // already there! 
00534 ocb->part.nbdata = sizeof( ocb->part.hdr ); 00535 ocb->part.dptr = (char *)&ocb->part.hdr; 00536 break; 00537 case TMTYPE_DATA_T1: 00538 case TMTYPE_DATA_T2: 00539 case TMTYPE_DATA_T3: 00540 case TMTYPE_DATA_T4: 00541 if ( DQD_Queue.last == 0 ) 00542 nl_error( 3, "Second TMTYPE_DATA* received before _INIT" ); 00543 assert( data_state_eval != 0 ); 00544 assert( ocb->part.hdr.s.hdr.tm_type == Data_Queue.output_tm_type ); 00545 ocb->rw.write.nb_rec = ocb->part.hdr.s.u.dhdr.n_rows * 00546 ocb->rw.write.nbrow_rec; 00547 ocb->rw.write.off_queue = 0; 00548 new_rows += data_state_eval(ocb, nonblock); 00549 // ### Make sure data_state_eval returns the number 00550 // of rows that have been completely added to DQ 00551 if ( ocb->part.nbdata ) { 00552 ocb->state = TM_STATE_DATA; 00553 } // else break out 00554 break; 00555 default: 00556 nl_error( 4, "Invalid state" ); 00557 } 00558 break; 00559 case TM_STATE_INFO: 00560 // ### Check return value 00561 process_tm_info(ocb); 00562 Data_Queue.nonblocking = 1; // nonblock; 00563 new_rows++; 00564 ocb->state = TM_STATE_HDR; //### Use state-init function? 00565 ocb->part.nbdata = sizeof(ocb->part.hdr); 00566 ocb->part.dptr = (char *)&ocb->part.hdr; 00567 break; 00568 case TM_STATE_DATA: 00569 new_rows += data_state_eval(ocb, nonblock); 00570 if ( ocb->rw.write.nb_rec <= 0 ) { 00571 ocb->state = TM_STATE_HDR; 00572 ocb->part.nbdata = sizeof(ocb->part.hdr); 00573 ocb->part.dptr = (char *)&ocb->part.hdr; 00574 } 00575 break; 00576 } 00577 } 00578 } 00579 if ( ocb->rw.write.nb_msg == 0 ) { 00580 MsgReply( ocb->rw.write.rcvid, ocb->rw.write.off_msg, 0, 0 ); 00581 if ( new_rows ) run_read_queue(); 00582 //### Mark us as not blocked: maybe that's nbdata != 0? 
00583 } else { 00584 // We must have nbdata == 0 meaning we're going to block 00585 assert(!nonblock); 00586 blocked_writer = ocb; 00587 run_read_queue(); 00588 } 00589 } 00590 00591 00592 // allocate_qrows() is responsible for finding space in the 00593 // Data_Queue for incoming data. It returns the number of 00594 // contiguous rows currently available starting at Data_Queue.last. 00595 // If nonblock, allocate_qrows is guaranteed to return a 00596 // non-zero value. If nonblock is zero and clients have not 00597 // finished reading the oldest data in the queue, allocate_qrows 00598 // will return zero. 00599 // This routine assumes the dq is locked (lock_dq()). 00600 static int allocate_qrows( IOFUNC_OCB_T *ocb, int nrows_req, int nonblock ) { 00601 int nrows_free; 00602 // dqd is used to find old data to expire, not for appending new data 00603 // Hence we start at the front of the queue, not the end. 00604 dq_descriptor_t *dqd = DQD_Queue.first; 00605 00606 assert(nrows_req >= 0); 00607 // If we are nonblocking and are requesting more than total_Qrows, we are 00608 // going to expire rows from this transfer without ever committing them 00609 // to the queue. This may be handled implicitly by the while loop below. 00610 // This should of course never happen! 
00611 if ( !Data_Queue.full && Data_Queue.last >= Data_Queue.first ) { 00612 nrows_free = Data_Queue.total_Qrows - Data_Queue.last; 00613 if ( nrows_free > nrows_req ) 00614 nrows_free = nrows_req; 00615 // and we're done because any additional rows would be 00616 // non-contiguous 00617 } else { 00618 nrows_free = Data_Queue.first - Data_Queue.last; 00619 if ( nrows_free > nrows_req ) 00620 nrows_free = nrows_req; 00621 nrows_req -= nrows_free; 00622 // if there is enough room, use it now, otherwise 00623 while ( nrows_req > 0 ) { 00624 int n_expire, opt_expire; 00625 while ( dqd->n_Qrows == 0 && dqd->next ) 00626 dqd = dqd->next; 00627 assert( dqd->starting_Qrow == Data_Queue.first ); 00628 // We can expire the minimum of: 00629 // nrows_req, 00630 // Data_Queue.total_Qrows - Data_Queue.last - nrows_free 00631 // dqd->n_Qrows == all rows in dqd 00632 // Data_Queue.total_Qrows - dqd->starting_Qrow(largest contiguous) 00633 // dqd->min_reader - dqd->Qrows_expired 00634 // The third condition only applies if blocking 00635 // The min_reader method must return 0 if any prior dqd has 00636 // readers. We won't get here unless the prior dqds are empty, 00637 // so the only thing that would prevent the prior dqds from 00638 // expiring would be readers, so min_read will return 0 unless 00639 // this dqd is the first. 00640 n_expire = nrows_req; 00641 opt_expire = Data_Queue.total_Qrows - Data_Queue.last; 00642 opt_expire = (opt_expire >= nrows_free) ? 
00643 opt_expire - nrows_free : 0; 00644 if ( opt_expire < n_expire ) n_expire = opt_expire; 00645 if ( dqd->n_Qrows < n_expire ) n_expire = dqd->n_Qrows; 00646 opt_expire = Data_Queue.total_Qrows - dqd->starting_Qrow; 00647 if ( opt_expire < n_expire ) n_expire = opt_expire; 00648 if ( !nonblock ) { 00649 int min_rdr = min_reader(dqd); 00650 if (min_rdr > dqd->Qrows_expired) { 00651 opt_expire = min_rdr - dqd->Qrows_expired; 00652 if ( opt_expire >= 0 && opt_expire < n_expire ) 00653 n_expire = opt_expire; 00654 } else n_expire = 0; 00655 } 00656 assert(n_expire >= 0); 00657 if ( n_expire ) { 00658 dqd->starting_Qrow += n_expire; 00659 if ( dqd->starting_Qrow == Data_Queue.total_Qrows ) 00660 dqd->starting_Qrow = 0; 00661 Data_Queue.first = dqd->starting_Qrow; 00662 Data_Queue.full = 0; 00663 dqd->n_Qrows -= n_expire; 00664 dqd->Qrows_expired += n_expire; 00665 nrows_free += n_expire; 00666 nrows_req -= n_expire; 00667 assert(Data_Queue.last+nrows_free <= Data_Queue.total_Qrows); 00668 if ( dqd->n_Qrows == 0 && dqd->next ) 00669 dqd = dq_expire_check(dqd); 00670 else break; 00671 } else break; 00672 } 00673 } 00674 assert( dqd->n_Qrows == 0 || dqd->starting_Qrow == Data_Queue.first ); 00675 assert(nrows_free >= 0); 00676 assert(Data_Queue.last+nrows_free <= Data_Queue.total_Qrows); 00677 ocb->part.nbdata = nrows_free * Data_Queue.nbQrow; 00678 ocb->part.dptr = Data_Queue.row[Data_Queue.last]; 00679 ocb->rw.write.off_queue = 0; 00680 if ( nrows_free ) { 00681 switch ( ocb->state ) { 00682 case TM_STATE_HDR: 00683 // ### We need to move hdr data into Data_Queue? 
00684 // sizeof(tm_hdrs_t) is how much we've read into ocb->part.hdr 00685 // ocb->rw.write.nbhdr_rec is how much is actually header 00686 // The rest is TM data that needs to be copied into the DQ 00687 ocb->rw.write.off_queue = sizeof(tm_hdrs_t) - ocb->rw.write.nbhdr_rec; 00688 // This could actually happen, but it shouldn't 00689 assert( ocb->rw.write.off_queue <= ocb->part.nbdata ); 00690 ocb->part.nbdata -= ocb->rw.write.off_queue; 00691 memcpy( ocb->part.dptr, 00692 &ocb->part.hdr.raw[ocb->rw.write.nbhdr_rec], 00693 ocb->rw.write.off_queue ); 00694 ocb->part.dptr += ocb->rw.write.off_queue; 00695 ocb->rw.write.nb_rec -= ocb->rw.write.off_queue; 00696 ocb->state = TM_STATE_DATA; 00697 break; 00698 case TM_STATE_DATA: 00699 break; 00700 default: 00701 nl_error( 4, "Invalid state in allocate_qrows" ); 00702 } 00703 } 00704 return nrows_free; 00705 } 00706 00707 // The job of data_state_eval is to decide how big the next 00708 // chunk of data should be and where it should go. 00709 // This involves finding space in the data queue and 00710 // setting part.dptr and part.nbdata. We may need to expire 00711 // rows from the data queue, but if we aren't nonblocking, we 00712 // might block to allow the readers to handle what we've already 00713 // got. 00714 // data_state_eval() does not transfer any data via ReadMessage, 00715 // but allocate_qrows() will copy any bytes from the hdr if 00716 // ocb->state == TM_STATE_HDR 00717 // 00718 // Returns the number of rows that have been completed in the 00719 // data queue. 00720 00721 // First check the data we've already moved into the queue 00722 // The rows we read in will always begin at Data_Queue.last, 00723 // which should be consistent with the end of the current 00724 // dqd data set. The number of rows is also guaranteed to 00725 // fit within the Data_Queue without wrapping, so it should 00726 // be safe to add nrrecd to Data_Queue.last, though it may 00727 // be necessary to set last to zero afterwards. 
00728 // T1->T1 just add to current dqd without checks 00729 // T2->T2 Copy straight in, then verify continuity with previous 00730 // records. 00731 // T3->T3 Copy straight in. Verify consecutive, etc. 00732 00733 static int data_state_T1( IOFUNC_OCB_T *ocb, int nonblock ) { 00734 int nrowsfree, tot_nrrecd = 0; 00735 lock_dq(); 00736 do { 00737 int nrrecd = ocb->rw.write.off_queue/Data_Queue.nbQrow; 00738 nrowsfree = 0; 00739 assert(ocb->part.nbdata == 0); 00740 assert(ocb->rw.write.off_queue == nrrecd*Data_Queue.nbQrow); 00741 if ( nrrecd ) { 00742 Data_Queue.last += nrrecd; 00743 assert(Data_Queue.last <= Data_Queue.total_Qrows ); 00744 if ( Data_Queue.last == Data_Queue.total_Qrows ) 00745 Data_Queue.last = 0; 00746 DQD_Queue.last->n_Qrows += nrrecd; 00747 ocb->rw.write.off_queue = 0; 00748 tot_nrrecd += nrrecd; 00749 // Don't need to update dptr (and can't anyway!) because nbdata==0 00750 // allocate_qrows will update nbdata, dptr and off_queue. 00751 } 00752 // Now look at what we have yet to move. 00753 if ( ocb->rw.write.nb_rec ) { 00754 int nrows = ocb->rw.write.nb_rec/ocb->rw.write.nbrow_rec; 00755 if ( nrows == 0 ) ++nrows; 00756 nrowsfree = allocate_qrows( ocb, nrows, nonblock ); 00757 // Handle the unlikely case that an entire row arrived in the 00758 // 4 bytes of excess header. This can only happen if the row 00759 // consisted entirely of MFCtr and Synch. 
00760 // if/when this happens, nrowsfree will be non-zero, but nbdata 00761 // may be zero 00762 } 00763 } while ( nrowsfree && ocb->part.nbdata == 0 ); 00764 unlock_dq(); 00765 return tot_nrrecd; 00766 } 00767 00768 static int data_state_T2( IOFUNC_OCB_T *ocb, int nonblock ) { 00769 // int nrrecd = ocb->rw.write.off_queue/ocb->rw.write.nbrow_rec; 00770 assert(ocb->part.nbdata == 0); 00771 nl_error(4, "Not implemented"); 00772 return 0; 00773 } 00774 00775 static int data_state_T3( IOFUNC_OCB_T *ocb, int nonblock ) { 00776 // int nrrecd = ocb->rw.write.off_queue/ocb->rw.write.nbrow_rec; 00777 // assert(ocb->part.nbdata == 0); // redundant here: checked again in the loop below 00778 int nrowsfree, tot_nrrecd = 0; 00779 lock_dq(); 00780 do { 00781 int nrrecd = ocb->rw.write.off_queue/Data_Queue.nbQrow; 00782 nrowsfree = 0; 00783 assert(ocb->part.nbdata == 0); 00784 assert(ocb->rw.write.off_queue == nrrecd*Data_Queue.nbQrow); 00785 if ( nrrecd ) { 00786 Data_Queue.last += nrrecd; 00787 assert(Data_Queue.last <= Data_Queue.total_Qrows ); 00788 if ( Data_Queue.last == Data_Queue.total_Qrows ) 00789 Data_Queue.last = 0; 00790 if ( Data_Queue.last == Data_Queue.first ) 00791 Data_Queue.full = 1; 00792 DQD_Queue.last->n_Qrows += nrrecd; 00793 ocb->rw.write.off_queue = 0; 00794 tot_nrrecd += nrrecd; 00795 // Don't need to update dptr (and can't anyway!) because nbdata==0 00796 // allocate_qrows will update nbdata, dptr and off_queue. 00797 } 00798 // Now look at what we have yet to move. 
00799 if ( ocb->rw.write.nb_rec ) { 00800 int nrows = ocb->rw.write.nb_rec/ocb->rw.write.nbrow_rec; 00801 if ( nrows == 0 ) nrows++; 00802 if ( nrows == ocb->part.hdr.s.u.dhdr.n_rows ) { 00803 dq_descriptor_t *dqd = DQD_Queue.last; 00804 if (dqd->MFCtr + dqd->Qrows_expired + dqd->n_Qrows != 00805 ocb->part.hdr.s.u.dhdr.mfctr) { 00806 dqd = new_dq_descriptor(NULL); 00807 dqd->MFCtr = ocb->part.hdr.s.u.dhdr.mfctr; 00808 } 00809 } 00810 nrowsfree = allocate_qrows( ocb, nrows, nonblock ); 00811 // Handle the case that an entire row arrived in the 00812 // 2 bytes of excess header. This could happen. 00813 } 00814 } while ( nrowsfree && ocb->part.nbdata == 0 ); 00815 unlock_dq(); 00816 return tot_nrrecd; 00817 } 00818 00819 /* queue_tstamp(): Create a new tstamp record in the Tstamp_Queue 00820 and a new dq_descriptor that references it. 00821 This currently assumes the dq_mutex is locked. 00822 */ 00823 static void queue_tstamp( tstamp_t *ts ) { 00824 TS_queue_t *new_TS = new_memory(sizeof(TS_queue_t)); 00825 dq_descriptor_t *new_dqd; 00826 00827 new_TS->TS = *ts; 00828 new_TS->ref_count = 0; 00829 00830 new_dqd = new_dq_descriptor( new_TS ); 00831 } 00832 00833 static dq_descriptor_t *new_dq_descriptor( TS_queue_t *TS ) { 00834 dq_descriptor_t *new_dqd = new_memory(sizeof(dq_descriptor_t)); 00835 00836 if ( TS == NULL ) TS = DQD_Queue.last->TSq; 00837 TS->ref_count++; 00838 00839 new_dqd->next = 0; 00840 new_dqd->ref_count = 0; 00841 new_dqd->starting_Qrow = Data_Queue.last; 00842 new_dqd->n_Qrows = 0; 00843 new_dqd->Qrows_expired = 0; 00844 new_dqd->TSq = TS; 00845 new_dqd->MFCtr = 0; 00846 new_dqd->Row_num = 0; 00847 if ( DQD_Queue.last ) 00848 DQD_Queue.last->next = new_dqd; 00849 else DQD_Queue.first = new_dqd; 00850 DQD_Queue.last = new_dqd; 00851 return new_dqd; 00852 } 00853 00854 /* As soon as tm_info has been received, we can decide what 00855 data format to output, how much buffer space to allocate 00856 and in what configuration. 
We can then create the first 00857 timestamp record (with the TS in the tm_info) and the 00858 first dq_descriptor, albeit with no Qrows, but refrencing 00859 the the first timestamp. Then we can check to see if any 00860 readers are waiting, and initialize them. 00861 */ 00862 static int process_tm_info( IOFUNC_OCB_T *ocb ) { 00863 char *rowptr; 00864 int i; 00865 00866 // Perform sanity checks 00867 if (tmi(nbminf) == 0 || 00868 tmi(nbrow) == 0 || 00869 tmi(nrowmajf) == 0 || 00870 tmi(nrowsper) == 0 || 00871 tmi(nsecsper) == 0 || 00872 tmi(mfc_lsb) == tmi(mfc_msb) || 00873 tmi(mfc_lsb) >= tmi(nbrow) || 00874 tmi(mfc_msb) >= tmi(nbrow) || 00875 tmi(nbminf) < tmi(nbrow) || 00876 tmi(nbminf) % tmi(nbrow) != 0 || 00877 tm_info.nrowminf != tmi(nbminf)/tmi(nbrow)) { 00878 nl_error( 2, "Sanity Checks failed on incoming stream" ); 00879 return EINVAL; 00880 } 00881 00882 // What data format should we output? 00883 lock_dq(); 00884 ocb->rw.write.buf = NULL; 00885 Data_Queue.nbQrow = tmi(nbrow); 00886 if ( tmi(mfc_lsb) == 0 && tmi(mfc_msb) == 1 00887 && tm_info.nrowminf == 1 ) { 00888 Data_Queue.output_tm_type = TMTYPE_DATA_T3; 00889 Data_Queue.nbQrow -= 4; 00890 Data_Queue.nbDataHdr = TM_HDR_SIZE_T3; 00891 ocb->rw.write.nbhdr_rec = TM_HDR_SIZE_T3; 00892 ocb->rw.write.nbrow_rec = tmi(nbrow) - 4; 00893 data_state_eval = data_state_T3; 00894 } else if ( tm_info.nrowminf == 1 ) { 00895 Data_Queue.output_tm_type = TMTYPE_DATA_T1; 00896 Data_Queue.nbDataHdr = TM_HDR_SIZE_T1; 00897 ocb->rw.write.nbhdr_rec = TM_HDR_SIZE_T1; 00898 data_state_eval = data_state_T1; 00899 if ( tmi(nbrow) <= 4 ) 00900 nl_error( 3, "TM Frame with no non-synch data not supported" ); 00901 ocb->rw.write.nbrow_rec = tmi(nbrow); 00902 data_state_eval = data_state_T1; 00903 } else { 00904 Data_Queue.output_tm_type = TMTYPE_DATA_T2; 00905 Data_Queue.nbDataHdr = TM_HDR_SIZE_T2; 00906 ocb->rw.write.nbhdr_rec = TM_HDR_SIZE_T2; 00907 ocb->rw.write.nbrow_rec = tmi(nbrow); 00908 data_state_eval = data_state_T2; 
00909 } 00910 Data_Queue.pbuf_size = Data_Queue.nbDataHdr + Data_Queue.nbQrow; 00911 if ( Data_Queue.pbuf_size < sizeof(tm_hdr_t) + sizeof(tm_info_t) ) 00912 Data_Queue.pbuf_size = sizeof(tm_hdr_t) + sizeof(tm_info_t); 00913 00914 // how much buffer space to allocate? 00915 // Let's default to one minute's worth, but make sure we get 00916 // an integral number of minor frames 00917 Data_Queue.total_Qrows = tm_info.nrowminf * 00918 ( ( tmi(nrowsper) * 60 + tmi(nsecsper)*tm_info.nrowminf - 1 ) 00919 / (tmi(nsecsper)*tm_info.nrowminf) ); 00920 Data_Queue.total_size = 00921 Data_Queue.nbQrow * Data_Queue.total_Qrows; 00922 Data_Queue.first = Data_Queue.last = Data_Queue.full = 0; 00923 Data_Queue.raw = new_memory(Data_Queue.total_size); 00924 Data_Queue.row = new_memory(Data_Queue.total_Qrows * sizeof(char *)); 00925 rowptr = Data_Queue.raw; 00926 for ( i = 0; i < Data_Queue.total_Qrows; i++ ) { 00927 Data_Queue.row[i] = rowptr; 00928 rowptr += Data_Queue.nbQrow; 00929 } 00930 00931 queue_tstamp( &tm_info.t_stmp ); 00932 unlock_dq(); 00933 return 0; 00934 } 00935 00936 00937 /* do_read_reply handles the actual reply after the message 00938 has been packed into iov_t structs. It may allocate a 00939 partial buffer if the request size is smaller than the 00940 message size. read_reply() may consult the request size 00941 to decide how large a message to return, of course, but 00942 the request size may be smaller than the smallest 00943 message. 
00944 */ 00945 static void do_read_reply( RESMGR_OCB_T *ocb, int nb, 00946 iov_t *iov, int n_parts ) { 00947 int nreq = ocb->rw.read.nbytes; 00948 if ( nreq < nb ) { 00949 int i; 00950 char *p; 00951 00952 if ( ocb->rw.read.buf == 0 ) 00953 ocb->rw.read.buf = new_memory(Data_Queue.pbuf_size); 00954 assert( nb <= Data_Queue.pbuf_size ); 00955 p = ocb->rw.read.buf; 00956 for ( i = 0; i < n_parts; i++ ) { 00957 int len = GETIOVLEN( &iov[i] ); 00958 memcpy( p, GETIOVBASE( &iov[i] ), len ); 00959 p += len; 00960 } 00961 ocb->part.dptr = ocb->rw.read.buf; 00962 ocb->part.nbdata = nb; 00963 MsgReply( ocb->rw.read.rcvid, nreq, ocb->part.dptr, nreq ); 00964 ocb->part.dptr += nreq; 00965 ocb->part.nbdata -= nreq; 00966 } else { 00967 MsgReplyv( ocb->rw.read.rcvid, nb, iov, n_parts ); 00968 } 00969 } 00970 00971 /* read_reply(ocb) is called when we know we have 00972 something to return on a read request. 00973 First determine either the largest complete record that is less 00974 than the requested size or the smallest complete record. If the 00975 smallest record is larger than the request size, allocate the 00976 partial buffer and copy the record into it. 00977 Partial buffer size, if allocated, should be the larger of the size 00978 of the tm_info message or a one-row data message (based on the 00979 assumption that if a small request comes in, the smallest full 00980 message will be chosen for output) 00981 00982 It is assumed that ocb has already been removed from whatever wait 00983 queue it might have been on. 
00984 */ 00985 static void read_reply( RESMGR_OCB_T *ocb, int nonblock ) { 00986 iov_t iov[3]; 00987 int nb; 00988 00989 if ( ocb->part.nbdata ) { 00990 nb = ocb->rw.read.nbytes; 00991 if ( ocb->part.nbdata < nb ) 00992 nb = ocb->part.nbdata; 00993 MsgReply( ocb->rw.read.rcvid, nb, ocb->part.dptr, nb ); 00994 ocb->part.dptr += nb; 00995 ocb->part.nbdata -= nb; 00996 // New state was already defined (or irrelevant) 00997 } else if ( ocb->data.dqd == 0 ) { 00998 lock_dq(); 00999 if ( DQD_Queue.first ) { 01000 ocb->data.dqd = DQD_Queue.first; 01001 ++ocb->data.dqd->ref_count; 01002 ocb->data.n_Qrows = 0; 01003 01004 ocb->state = TM_STATE_HDR; //### delete 01005 ocb->part.hdr.s.hdr.tm_id = TMHDR_WORD; 01006 ocb->part.hdr.s.hdr.tm_type = TMTYPE_INIT; 01007 01008 // Message consists of 01009 // tm_hdr_t (TMHDR_WORD, TMTYPE_INIT) 01010 // tm_info_t with the current timestamp 01011 SETIOV( &iov[0], &ocb->part.hdr.s.hdr, 01012 sizeof(ocb->part.hdr.s.hdr) ); 01013 SETIOV( &iov[1], &tm_info, sizeof(tm_info)-sizeof(tstamp_t) ); 01014 SETIOV( &iov[2], &ocb->data.dqd->TSq->TS, sizeof(tstamp_t) ); 01015 nb = sizeof(tm_hdr_t) + sizeof(tm_info_t); 01016 do_read_reply( ocb, nb, iov, 3 ); 01017 } else enqueue_read( ocb, nonblock ); 01018 unlock_dq(); 01019 } else { 01020 /* I've handled ocb->data.n_Qrows */ 01021 dq_descriptor_t *dqd = ocb->data.dqd; 01022 01023 lock_dq(); 01024 while (dqd) { 01025 int nQrows_ready; 01026 01027 /* DQD has a total of dqd->Qrows_expired + dqd->n_Qrows */ 01028 if ( ocb->data.n_Qrows < dqd->Qrows_expired ) { 01029 // then we've missed some data: make a note and set 01030 int n_missed = dqd->Qrows_expired - ocb->data.n_Qrows; 01031 ocb->rw.read.rows_missing += n_missed; 01032 ocb->data.n_Qrows = dqd->Qrows_expired; 01033 } 01034 nQrows_ready = dqd->n_Qrows + dqd->Qrows_expired 01035 - ocb->data.n_Qrows; 01036 assert( nQrows_ready >= 0 ); 01037 if ( nQrows_ready > 0 ) { 01038 // ### make sure rw.read.maxQrows < Data_Queue.total_Qrows 01039 // (it was not! 
fixed in io_read(). 01040 if ( blocked_writer == 0 && dg_opened < 2 && 01041 dqd->next == 0 && nQrows_ready < ocb->rw.read.maxQrows 01042 && ocb->hdr.attr->node_type == TM_DCo && !nonblock ) { 01043 // We want to wait for more 01044 // ### reasons not to wait: 01045 // ### DG is blocked or has terminated 01046 enqueue_read( ocb, nonblock ); 01047 } else { 01048 int XRow_Num, NMinf, Row_Num_start, n_iov; 01049 mfc_t MFCtr_start; 01050 int Qrow_start, nQ1, nQ2; 01051 01052 if ( nQrows_ready > ocb->rw.read.maxQrows ) 01053 nQrows_ready = ocb->rw.read.maxQrows; 01054 ocb->part.hdr.s.hdr.tm_id = TMHDR_WORD; 01055 ocb->part.hdr.s.hdr.tm_type = Data_Queue.output_tm_type; 01056 ocb->part.hdr.s.u.dhdr.n_rows = nQrows_ready; 01057 XRow_Num = dqd->Row_num + ocb->data.n_Qrows; 01058 NMinf = XRow_Num/tm_info.nrowminf; 01059 MFCtr_start = dqd->MFCtr + NMinf; 01060 Row_Num_start = XRow_Num % tm_info.nrowminf; 01061 switch ( Data_Queue.output_tm_type ) { 01062 case TMTYPE_DATA_T1: break; 01063 case TMTYPE_DATA_T2: 01064 ocb->part.hdr.s.u.dhdr.mfctr = MFCtr_start; 01065 ocb->part.hdr.s.u.dhdr.rownum = Row_Num_start; 01066 break; 01067 case TMTYPE_DATA_T3: 01068 ocb->part.hdr.s.u.dhdr.mfctr = MFCtr_start; 01069 break; 01070 default: 01071 nl_error(4,"Invalid output_tm_type" ); 01072 } 01073 SETIOV( &iov[0], &ocb->part.hdr.s.hdr, Data_Queue.nbDataHdr ); 01074 Qrow_start = dqd->starting_Qrow + ocb->data.n_Qrows - 01075 dqd->Qrows_expired; 01076 if ( Qrow_start > Data_Queue.total_Qrows ) 01077 Qrow_start -= Data_Queue.total_Qrows; 01078 nQ1 = nQrows_ready; 01079 nQ2 = Qrow_start + nQ1 - Data_Queue.total_Qrows; 01080 if ( nQ2 > 0 ) { 01081 nQ1 -= nQ2; 01082 SETIOV( &iov[2], Data_Queue.row[0], nQ2 * Data_Queue.nbQrow ); 01083 n_iov = 3; 01084 } else n_iov = 2; 01085 SETIOV( &iov[1], Data_Queue.row[Qrow_start], 01086 nQ1 * Data_Queue.nbQrow ); 01087 ocb->data.n_Qrows += nQrows_ready; 01088 do_read_reply( ocb, 01089 Data_Queue.nbDataHdr + nQrows_ready * Data_Queue.nbQrow, 01090 iov, n_iov 
); 01091 } 01092 break; // out of while(dqd) 01093 } else if ( dqd->next ) { 01094 int do_TS = dqd->TSq != dqd->next->TSq; 01095 dqd = dq_deref(dqd, 1); 01096 // dqd->ref_count++; 01097 ocb->data.dqd = dqd; 01098 ocb->data.n_Qrows = 0; 01099 if ( do_TS ) { 01100 ocb->part.hdr.s.hdr.tm_id = TMHDR_WORD; 01101 ocb->part.hdr.s.hdr.tm_type = TMTYPE_TSTAMP; 01102 SETIOV( &iov[0], &ocb->part.hdr.s.hdr, sizeof(tm_hdr_t) ); 01103 SETIOV( &iov[1], &dqd->TSq->TS, sizeof(tstamp_t) ); 01104 do_read_reply( ocb, sizeof(tm_hdr_t)+sizeof(tstamp_t), 01105 iov, 2 ); 01106 break; 01107 } // else loop through again 01108 } else { 01109 enqueue_read( ocb, nonblock ); 01110 break; 01111 } 01112 } 01113 unlock_dq(); 01114 } 01115 } 01116 01117 static int io_read (resmgr_context_t *ctp, io_read_t *msg, RESMGR_OCB_T *ocb) { 01118 int status, nonblock = 0; 01119 // IOFUNC_ATTR_T *handle = ocb->hdr.attr; 01120 01121 log_event(10); 01122 if ((status = iofunc_read_verify( ctp, msg, 01123 (iofunc_ocb_t *)ocb, &nonblock)) != EOK) 01124 return (status); 01125 01126 if ((msg->i.xtype & _IO_XTYPE_MASK) != _IO_XTYPE_NONE) 01127 return (ENOSYS); 01128 01129 ocb->rw.read.rcvid = ctp->rcvid; 01130 ocb->rw.read.nbytes = msg->i.nbytes; 01131 if ( ocb->data.dqd ) { 01132 int nbData = ocb->rw.read.nbytes - Data_Queue.nbDataHdr; 01133 ocb->rw.read.maxQrows = 01134 nbData < Data_Queue.nbQrow ? 1 : nbData/Data_Queue.nbQrow; 01135 if ( ocb->rw.read.maxQrows > Data_Queue.total_Qrows ) 01136 ocb->rw.read.maxQrows = Data_Queue.total_Qrows; 01137 } 01138 read_reply( ocb, nonblock ); 01139 run_write_queue(); 01140 return _RESMGR_NOREPLY; 01141 }