From 6b467b8c443302eb3f66e6f37bfc075e9217568f Mon Sep 17 00:00:00 2001 From: Dave Marples Date: Thu, 13 Jun 2024 00:24:09 +0100 Subject: [PATCH] Tidy up tag accounting --- Src/nwclient.c | 57 +++---- Src/orbuculum.c | 394 +++++++++++++++++++++--------------------------- Src/otag.c | 1 + 3 files changed, 201 insertions(+), 251 deletions(-) diff --git a/Src/nwclient.c b/Src/nwclient.c index 94d22ae6..e406b98c 100644 --- a/Src/nwclient.c +++ b/Src/nwclient.c @@ -212,40 +212,43 @@ void nwclientSend( struct nwclientsHandle *h, uint32_t len, const uint8_t *ipbuf { const struct timespec ts = {.tv_sec = 1, .tv_nsec = 0}; - if ( _lock_with_timeout( &h->clientList, &ts ) < 0 ) - { - genericsExit( -1, "Failed to acquire mutex" EOL ); - } - - /* Now kick all the clients that new data arrived for them to distribute */ - volatile struct nwClient *n = h->firstClient; - - while ( n ) + if ( h && h->firstClient ) { - ssize_t t = len; - ssize_t sent = 0; - void *p = ( void * )ipbuffer; - - while ( t && ( sent >= 0 ) ) + if ( _lock_with_timeout( &h->clientList, &ts ) < 0 ) { - sent = send( n->portNo, p, t, MSG_NOSIGNAL ); - p += sent; - t -= sent; + genericsExit( -1, "Failed to acquire mutex" EOL ); } - if ( t ) - { - volatile struct nwClient *newn = n->nextClient; - _clientRemoveNoLock( n ); - n = newn; - } - else + /* Now kick all the clients that new data arrived for them to distribute */ + volatile struct nwClient *n = h->firstClient; + + while ( n ) { - n = n->nextClient; + ssize_t t = len; + ssize_t sent = 0; + void *p = ( void * )ipbuffer; + + while ( t && ( sent >= 0 ) ) + { + sent = send( n->portNo, p, t, MSG_NOSIGNAL ); + p += sent; + t -= sent; + } + + if ( t ) + { + volatile struct nwClient *newn = n->nextClient; + _clientRemoveNoLock( n ); + n = newn; + } + else + { + n = n->nextClient; + } } - } - pthread_mutex_unlock( &h->clientList ); + pthread_mutex_unlock( &h->clientList ); + } } // ==================================================================================================== struct nwclientsHandle *nwclientStart( int port ) diff --git a/Src/orbuculum.c b/Src/orbuculum.c index 46ed1388..49a79d37 100644 --- a/Src/orbuculum.c +++ b/Src/orbuculum.c @@ -24,7 +24,6 @@ #include #include #include -#include #include #if defined OSX #include @@ -69,6 +68,18 @@ #define OTAG_SIG (const char*)"%%OTAG1.0.0%%" #define OTAG_SIG_LEN (strlen(OTAG_SIG)) +/* Number of potential tags */ +#define NUM_TAGS (256) +#define LAST_TAG_SEEN_TIME_NS (5L*1000*1000*1000) + +/* Record of transferred data per tag */ +struct TagDataCount +{ + uint64_t ts; + uint64_t totalData; + uint64_t intervalData; +}; + /* Record for options, either defaults or from command line */ struct Options { @@ -89,8 +100,9 @@ struct Options uint32_t intervalReportTime; /* If we want interval reports about performance */ bool mono; /* Supress colour in output */ int paceDelay; /* Delay between blocks of data transmission in file readout */ - char *channelList; /* List of TPIU channels to be serviced */ + char *channelList; /* List of channels to be exported over legacy connection */ bool hiresTime; /* Use hiresolution time (shorter timeouts...more accurate but higher load */ + bool legacy; /* Enable legacy ports (clean ITM etc flows on individual ports */ char *sn; /* Any part serial number for identifying a specific device */ /* Network link */ int listenPort; /* Listening port for network */ @@ -99,7 +111,6 @@ struct Options struct handlers { int channel; /* Channel number for this handler */ - uint64_t intervalBytes; /* Number of depacketised bytes output on this channel */ struct dataBlock *strippedBlock; /* Processed buffers for output to clients */ struct nwclientsHandle *n; /* Link to the network client subsystem */ }; @@ -113,11 +124,9 @@ struct RunTime struct Frame otagOtg; /* Outgoing OTAG frame for legacy use */ struct OrbtraceIf *o; /* For accessing ORBTrace devices + BMPs */ - uint64_t intervalBytes; /* Number of bytes transferred in current interval */ uint64_t intervalRawBytes; /* Number of bytes transferred in current interval */ - uint64_t otagDataLenRxed; /* Number of bytes of decoded OTAG data */ + uint64_t lastInterval; /* Timestamp of previous interval */ - pthread_t intervalThread; /* Thread reporting on intervals */ bool ending; /* Flag indicating app is terminating */ bool errored; /* Flag indicating problem in reception process */ bool conn; /* Flag indicating that we have a good connection */ @@ -132,9 +141,9 @@ struct RunTime struct nwclientsHandle *otagHandler; /* Handle to OTAG output handler */ bool usingOTAG; /* Flag that OTAG protocol is in use */ + struct TagDataCount tagCount[NUM_TAGS]; /* Data carried per tag/TPIU channel */ int numHandlers; /* Number of TPIU channel handlers in use */ struct handlers *handler; - struct nwclientsHandle *n; /* Link to the network client subsystem (used for non-TPIU/OTAG case) */ char *sn; /* Serial number for any device we've established contact with */ }; @@ -157,7 +166,7 @@ struct Options _options = { .listenPort = OTCLIENT_SERVER_PORT, .nwserverHost = NWSERVER_HOST, - .channelList = "1" + .channelList = "1", }; struct RunTime _r; @@ -323,8 +332,6 @@ static void _doExit( void ) { _r.ending = true; - nwclientShutdown( _r.n ); - if ( _r.opFileHandle ) { close( _r.opFileHandle ); @@ -342,7 +349,7 @@ static void _intHandler( int sig ) _doExit(); } // ==================================================================================================== -void _printHelp( const char *const progName ) +void _printHelp( const char *const progName, struct RunTime *r ) { genericsPrintf( "Usage: %s [options]" EOL, progName ); @@ -351,19 +358,20 @@ void _printHelp( const char *const progName ) genericsPrintf( " -f, --input-file: Take input from specified file" EOL ); genericsPrintf( " -h, --help: This help" EOL ); genericsPrintf( " -H, --hires: High resolution time (much higher CPU load though!)" EOL ); - genericsPrintf( " -l, --listen-port: Listen port for the incoming connections (defaults to %d)" EOL, NWCLIENT_SERVER_PORT ); + genericsPrintf( " -l, --listen-port: Listen port for incoming OTAG connections (defaults to %d)" EOL, r->options->listenPort ); + genericsPrintf( " -L, --legacy: Enable legacy channels (First on %d)" EOL, r->options->listenPort + LEGACY_SERVER_PORT_OFS ); genericsPrintf( " -m, --monitor: Output monitor information about the link at ms" EOL ); genericsPrintf( " -M, --no-colour: Supress colour in output" EOL ); - genericsPrintf( " -n, --serial-number: any part of serial number to differentiate specific OrbTrace device" EOL ); + genericsPrintf( " -n, --serial-number: any part of serial number to differentiate specific device" EOL ); genericsPrintf( " -o, --output-file: to be used for dump file" EOL ); - genericsPrintf( " -O, --orbtrace: \"\" run orbtrace with specified options on each new ORBTrace device connect" EOL ); + genericsPrintf( " -O, --orbtrace: \"\" run orbtrace with specified options on device connect" EOL ); genericsPrintf( " -p, --serial-port: to use" EOL ); - genericsPrintf( " -P, --pace: delay in block of data transmission to clients. Used when source is a file." EOL ); + genericsPrintf( " -P, --pace: delay in block of data transmission to clients" EOL ); genericsPrintf( " -s, --server: : to use" EOL ); - genericsPrintf( " -T, --tpiu: Decode TPIU channels (and strip TPIU framing from output flows)" EOL ); - genericsPrintf( " -t, --tag: List of streams to decode (and onward route) from OTag or TPIU frames" EOL ); + genericsPrintf( " -T, --tpiu: Strip TPIU framing from input flows (mostly not relevant)" EOL ); + genericsPrintf( " -t, --tag: TPIU streams to decode and onward route (Default %s)" EOL, r->options->channelList ); genericsPrintf( " -v, --verbose: Verbose mode 0(errors)..3(debug)" EOL ); - genericsPrintf( " -V, --version: Print version and exit" EOL ); + genericsPrintf( " -V, --version: Print version, connected usb devices, and exit" EOL ); } // ==================================================================================================== @@ -396,6 +404,7 @@ static struct option _longOptions[] = {"help", no_argument, NULL, 'h'}, {"hires", no_argument, NULL, 'H'}, {"listen-port", required_argument, NULL, 'l'}, + {"legacy", required_argument, NULL, 'L'}, {"monitor", required_argument, NULL, 'm'}, {"no-colour", no_argument, NULL, 'M'}, {"no-color", no_argument, NULL, 'M'}, @@ -418,7 +427,7 @@ bool _processOptions( int argc, char *argv[], struct RunTime *r ) int c, optionIndex = 0; #define DELIMITER ',' - while ( ( c = getopt_long ( argc, argv, "a:Ef:hHVl:m:Mn:o:O:p:P:s:Tt:v:", _longOptions, &optionIndex ) ) != -1 ) + while ( ( c = getopt_long ( argc, argv, "a:Ef:hHVLl:m:Mn:o:O:p:P:s:Tt:v:", _longOptions, &optionIndex ) ) != -1 ) switch ( c ) { // ------------------------------------ @@ -440,7 +449,7 @@ bool _processOptions( int argc, char *argv[], struct RunTime *r ) // ------------------------------------ case 'h': - _printHelp( argv[0] ); + _printHelp( argv[0], r ); return false; // ------------------------------------ @@ -456,6 +465,12 @@ bool _processOptions( int argc, char *argv[], struct RunTime *r ) // ------------------------------------ + case 'L': + r->options->legacy = !r->options->legacy; + break; + + // ------------------------------------ + case 'l': r->options->listenPort = atoi( optarg ); break; @@ -611,21 +626,19 @@ bool _processOptions( int argc, char *argv[], struct RunTime *r ) if ( r->options->nwserverPort ) { - genericsReport( V_INFO, "NW SERVER H&P : %s:%d" EOL, r->options->nwserverHost, r->options->nwserverPort ); + genericsReport( V_INFO, "NW Server : %s:%d" EOL, r->options->nwserverHost, r->options->nwserverPort ); } genericsReport( V_INFO, "Use/Strip TPIU : %s" EOL, r->options->useTPIU ? "True" : "False" ); - genericsReport( V_INFO, "Decode/Forward : %s" EOL, r->options->channelList ); + genericsReport( V_INFO, "Support Legacy : %s" EOL, ( r->options->legacy ) ? "On" : "Off" ); + genericsReport( V_INFO, "Decode/Forward : %s" EOL, r->options->channelList ? r->options->channelList : "None" ); if ( r->options->otcl ) { genericsReport( V_INFO, "Orbtrace CL : %s" EOL, r->options->otcl ); } - if ( r->options->hiresTime ) - { - genericsReport( V_INFO, "High Res Time" EOL ); - } + genericsReport( V_INFO, "OTAG Port : %d" EOL, r->options->listenPort ); if ( r->options->file ) { @@ -642,6 +655,11 @@ bool _processOptions( int argc, char *argv[], struct RunTime *r ) } } + if ( r->options->hiresTime ) + { + genericsReport( V_INFO, "High Res Time" EOL ); + } + if ( ( r->options->file ) && ( ( r->options->port ) || ( r->options->nwserverPort ) ) ) { genericsReport( V_ERROR, "Cannot specify file and port or NW Server at same time" EOL ); @@ -663,93 +681,98 @@ bool _processOptions( int argc, char *argv[], struct RunTime *r ) return true; } // ==================================================================================================== -void *_checkInterval( void *params ) +void _checkInterval( void *params ) /* Perform any interval reporting that may be needed */ { struct RunTime *r = ( struct RunTime * )params; + struct timespec ts; + uint64_t tnow; uint64_t snapInterval; - struct handlers *h; + int w; - while ( !r->ending ) + if ( r->options->intervalReportTime ) { - usleep( r->options->intervalReportTime * INTERVAL_1MS ); - - /* Grab the interval and scale to 1 second */ - snapInterval = r->intervalRawBytes * 1000 / r->options->intervalReportTime; + clock_gettime( CLOCK_REALTIME, &ts ); + tnow = ts.tv_sec * 1000000000L + ts.tv_nsec; - snapInterval *= 8; - - if ( r->conn ) + if ( tnow - r->lastInterval >= r->options->intervalReportTime * 1000000L ) { - genericsPrintf( C_CLR_LN C_DATA ); + r->lastInterval = tnow; - if ( snapInterval / 1000000 ) - { - genericsPrintf( "%4d.%d " C_RESET "MBits/sec ", snapInterval / 1000000, ( snapInterval * 1 / 100000 ) % 10 ); - } - else if ( snapInterval / 1000 ) - { - genericsPrintf( "%4d.%d " C_RESET "KBits/sec ", snapInterval / 1000, ( snapInterval / 100 ) % 10 ); - } - else - { - genericsPrintf( " %4d " C_RESET " Bits/sec ", snapInterval ); - } + /* Grab the interval and scale to 1 second */ + snapInterval = r->intervalRawBytes * 1000 / r->options->intervalReportTime; - h = r->handler; - uint64_t totalDat = 0; + snapInterval *= 8; - if ( r->intervalRawBytes ) + if ( r->conn ) { - if ( r->options->useTPIU ) - { - /* If we are decoding from TPIU then calculate per channel */ - for ( int chIndex = 0; chIndex < r->numHandlers; chIndex++ ) - { - genericsPrintf( " %d:%3d%% ", h->channel, ( h->intervalBytes * 100 ) / r->intervalRawBytes ); - totalDat += h->intervalBytes; - h->intervalBytes = 0; - h++; - } + genericsPrintf( C_CLR_LN C_DATA ); - genericsPrintf( " Waste:%3d%% ", 100 - ( ( totalDat * 100 ) / r->intervalRawBytes ) ); + if ( snapInterval / 1000000 ) + { + genericsPrintf( "%4d.%d " C_RESET "MBits/sec ", snapInterval / 1000000, ( snapInterval * 1 / 100000 ) % 10 ); + } + else if ( snapInterval / 1000 ) + { + genericsPrintf( "%4d.%d " C_RESET "KBits/sec ", snapInterval / 1000, ( snapInterval / 100 ) % 10 ); } else { - /* Either raw ITM or OTag frames */ - if ( _r.usingOTAG ) + genericsPrintf( " %4d " C_RESET " Bits/sec ", snapInterval ); + } + + uint64_t totalPct = 0; + + if ( r->intervalRawBytes ) + { + for ( int i = 0; i < NUM_TAGS; i++ ) { - int w = 1000 - ( ( r->otagDataLenRxed * 1000 ) / r->intervalRawBytes ); - genericsPrintf( " Waste:%01d.%01d%%", w / 10, w % 10 ); + w = 0; + + if ( r->tagCount[i].intervalData ) + { + w = ( r->tagCount[i].intervalData * 1000 ) / r->intervalRawBytes; + r->tagCount[i].ts = tnow; + totalPct += w; + } + + if ( tnow - r->tagCount[i].ts < LAST_TAG_SEEN_TIME_NS ) + { + genericsPrintf( " %d:" C_DATA"%2d%% " C_RESET, i, w / 10 ); + } + + r->tagCount[i].intervalData = 0; } + + /* Because we are still rxing data there can be some error here, we should really fix this with a mutex */ + w = ( totalPct < 1000 ) ? 1000 - totalPct : 0; + genericsPrintf( " Waste:" C_DATA "%2d.%01d%% " C_RESET, w / 10, w % 10 ); } - } - r->otagDataLenRxed = 0; - r->intervalRawBytes = 0; + if ( r->options->dataSpeed > 100 ) + { + /* Conversion to percentage done as a division to avoid overflow */ + uint32_t fullPercent = ( snapInterval * 100 ) / r->options->dataSpeed; + genericsPrintf( "(" C_DATA " %3d%% " C_RESET "full)", ( fullPercent > 100 ) ? 100 : fullPercent ); + } - if ( r->options->dataSpeed > 100 ) - { - /* Conversion to percentage done as a division to avoid overflow */ - uint32_t fullPercent = ( snapInterval * 100 ) / r->options->dataSpeed; - genericsPrintf( "(" C_DATA " %3d%% " C_RESET "full)", ( fullPercent > 100 ) ? 100 : fullPercent ); + genericsPrintf( " " C_RESET C_CLR_LN EOL C_PREV_LN ); } - genericsPrintf( C_RESET EOL C_PREV_LN ); + r->intervalRawBytes = 0; } } - - return NULL; } // ==================================================================================================== // Block decoders and handlers for the various line formats // ==================================================================================================== -static void _purgeBlock( struct RunTime *r ) +static void _purgeBlock( struct RunTime *r, bool createOTAG ) + +/* Send any packets to clients who want it, no matter where they originate from */ { - /* Send any packets to clients who want it, no matter where they originate from */ struct handlers *h = r->handler; int i = r->numHandlers; @@ -759,18 +782,20 @@ static void _purgeBlock( struct RunTime *r ) if ( h->strippedBlock->fillLevel ) { nwclientSend( h->n, h->strippedBlock->fillLevel, h->strippedBlock->buffer ); - h->intervalBytes += h->strippedBlock->fillLevel; - - /* The OTAG encoded version goes out on the combined OTAG channel, with a specific channel header */ - int j = h->strippedBlock->fillLevel; - const uint8_t *b = h->strippedBlock->buffer; - while ( j ) + if ( createOTAG ) { - OTAGEncode( h->channel, 0, b, ( j < OTAG_MAX_PACKET_LEN ) ? j : OTAG_MAX_PACKET_LEN, &r->otagOtg ); - nwclientSend( _r.otagHandler, r->otagOtg.len, r->otagOtg.d ); - b += ( j < OTAG_MAX_PACKET_LEN ) ? j : OTAG_MAX_PACKET_LEN; - j -= ( j < OTAG_MAX_PACKET_LEN ) ? j : OTAG_MAX_PACKET_LEN; + /* The OTAG encoded version goes out on the combined OTAG channel, with a specific channel header */ + int j = h->strippedBlock->fillLevel; + const uint8_t *b = h->strippedBlock->buffer; + + while ( j ) + { + OTAGEncode( h->channel, 0, b, ( j < OTAG_MAX_PACKET_LEN ) ? j : OTAG_MAX_PACKET_LEN, &r->otagOtg ); + nwclientSend( _r.otagHandler, r->otagOtg.len, r->otagOtg.d ); + b += ( j < OTAG_MAX_PACKET_LEN ) ? j : OTAG_MAX_PACKET_LEN; + j -= ( j < OTAG_MAX_PACKET_LEN ) ? j : OTAG_MAX_PACKET_LEN; + } } h->strippedBlock->fillLevel = 0; @@ -778,6 +803,8 @@ static void _purgeBlock( struct RunTime *r ) h++; } + + _checkInterval( r ); } // ==================================================================================================== static void _TPIUpacketRxed( enum TPIUPumpEvent e, struct TPIUPacket *p, void *param ) @@ -817,6 +844,9 @@ static void _TPIUpacketRxed( enum TPIUPumpEvent e, struct TPIUPacket *p, void *p } } + r->tagCount[p->packet[g].s].totalData++; + r->tagCount[p->packet[g].s].intervalData++; + if ( ( chIndex != r->numHandlers ) && ( h ) ) { /* We must have found a match for this at some point, so add it to the queue */ @@ -824,7 +854,7 @@ static void _TPIUpacketRxed( enum TPIUPumpEvent e, struct TPIUPacket *p, void *p } else { - genericsReport( V_INFO, "No handler for %d" EOL, p->packet[g].s ); + genericsReport( V_DEBUG, "No handler for tag %d" EOL, p->packet[g].s ); } } @@ -847,15 +877,16 @@ static void _TPIUpacketRxed( enum TPIUPumpEvent e, struct TPIUPacket *p, void *p static void _OTAGpacketRxed( struct OTAGFrame *p, void *param ) -/* Put the packet into the correct output buffer */ +/* OTAG packet received, account for it and reflect it to legacy buffers if needed */ { int chIndex; - struct RunTime* r = (struct RunTime*)param; - struct handlers* h = _r.handler; + struct RunTime *r = ( struct RunTime * )param; + struct handlers *h = _r.handler; - /* Record the length we received */ - r->otagDataLenRxed += p->len; + /* Account for this reception */ + r->tagCount[p->tag].totalData += p->len; + r->tagCount[p->tag].intervalData += p->len; /* Search for channel */ for ( chIndex = 0; chIndex < r->numHandlers; chIndex++ ) @@ -887,115 +918,23 @@ static void _OTAGpacketRxed( struct OTAGFrame *p, void *param ) // ==================================================================================================== -static void _processOTAGBlock( struct RunTime *r, const uint8_t *buffer, int length ) - -{ - const uint8_t *flen; - const uint8_t *b = buffer; - - /* We need to decode this so it can go through the 'normal' output channels too */ - OTAGPump( &r->otag, buffer, length, _OTAGpacketRxed, r ); - - if ( r->otagPart.len ) - { - /* We already have a part OTAG frame...complete it using whatever we've just received */ - flen = OTAGgetFrameExtent( b, length ); - - if ( OTAGisEOFRAME( flen ) ) - { - /* There is an end of packet here...we can complete the send */ - if ( ( flen - b + r->otagPart.len ) <= OTAG_MAX_ENC_PACKET_LEN ) - { - /* This is the rest of a part OTAG frame, complete it and send it */ - memcpy( &r->otagPart.d[r->otagPart.len], b, flen - b ); - r->otagPart.len += flen - b; - - nwclientSend( r->otagHandler, r->otagPart.len, r->otagPart.d ); - nwclientSend( r->otagHandler, OTAG_EOP_LEN, cobs_eop ); - } - - r->otagPart.len = 0; - length -= flen - b; - b = flen; - } - else - { - /* This is _another_ part packet ... add it to the one we are building */ - if ( ( length + r->otagPart.len ) <= OTAG_MAX_ENC_PACKET_LEN ) - { - memcpy( &r->otagPart.d[r->otagPart.len], b, length ); - r->otagPart.len += length; - } - else - { - r->otagPart.len = 0; - } - - length = 0; - } - } - - /* Now send out whatever complete frames we have in this block */ - while ( true ) - { - /* Flush any extra EOPs that may have existed in the original frame */ - while ( length && OTAGisEOFRAME( b ) ) - { - b++; - length--; - } - - flen = OTAGgetFrameExtent( b, length ); - - if ( !( length && OTAGisEOFRAME( flen ) ) ) - { - break; - } - - /* This is a complete OTAG frame to send */ - nwclientSend( r->otagHandler, flen - b, b ); - /* We need an end of packet on the end of every packet */ - nwclientSend( r->otagHandler, OTAG_EOP_LEN, cobs_eop ); - - length -= flen - b; - b = flen; - } - - /* Whatever we have left we keep for next time */ - memcpy( r->otagPart.d, b, length ); - r->otagPart.len = length; - - _purgeBlock( r ); -} -// ==================================================================================================== static void _processNonOTAGBlock( struct RunTime *r, ssize_t fillLevel, uint8_t *buffer ) -{ - genericsReport( V_DEBUG, "RXED Packet of %d bytes%s" EOL, fillLevel, ( r->options->intervalReportTime ) ? EOL : "" ); +/* Not an OTAG block, so might be TPIU or clean ITM...deal with both */ +{ if ( fillLevel ) { - /* Account for this reception */ - r->intervalRawBytes += fillLevel; - - if ( r->opFileHandle ) - { - if ( write( r->opFileHandle, buffer, fillLevel ) <= 0 ) - { - genericsExit( -3, "Writing to file failed" EOL ); - } - } - if ( r-> options->useTPIU ) { /* Strip the TPIU framing from this input */ TPIUPump2( &r->t, buffer, fillLevel, _TPIUpacketRxed, r ); - _purgeBlock( r ); } else { - /* Do it the old fashioned way and send out the unfettered block */ - nwclientSend( r->n, fillLevel, buffer ); + /* Not TPIU ... need to assume this is ITM on the first channel */ + r->tagCount[DEFAULT_ITM_STREAM].totalData += fillLevel; + r->tagCount[DEFAULT_ITM_STREAM].intervalData += fillLevel; /* The OTAG encoded version goes out on the default OTAG channel */ uint8_t *b = buffer; @@ -1015,18 +954,38 @@ static void _processNonOTAGBlock( struct RunTime *r, ssize_t fillLevel, uint8_t // ==================================================================================================== static void _handleBlock( struct RunTime *r, ssize_t fillLevel, uint8_t *buffer ) -/* Handle an incoming block in either 'conventional' or OTag format */ +/* Handle an incoming block from any source in either 'conventional' or OTag format */ { + genericsReport( V_DEBUG, "RXED Packet of %d bytes%s" EOL, fillLevel, ( r->options->intervalReportTime ) ? EOL : "" ); + + if ( r->opFileHandle ) + { + if ( write( r->opFileHandle, buffer, fillLevel ) <= 0 ) + { + genericsExit( -3, "Writing to file failed" EOL ); + } + } + if ( r->usingOTAG ) { - _processOTAGBlock( r, buffer, fillLevel ); + if ( r->options->intervalReportTime ) + { + /* We need to decode this so we can get the stats out of it .. we don't bother if we don't need stats */ + OTAGPump( &r->otag, buffer, fillLevel, _OTAGpacketRxed, r ); + } + + /* ...and reflect this packet to the outgoing OTAG channels */ + nwclientSend( r->otagHandler, fillLevel, buffer ); } else { _processNonOTAGBlock( r, fillLevel, buffer ); } + /* Send the block to clients, but only send OTAG if it wasn't OTAG already */ + _purgeBlock( r, !r->usingOTAG ); + r->intervalRawBytes += fillLevel; } // ==================================================================================================== @@ -1040,14 +999,6 @@ static void _usb_callback( struct libusb_transfer *t ) /* Whatever the status that comes back, there may be data... */ if ( t->actual_length > 0 ) { - if ( _r.opFileHandle ) - { - if ( write( _r.opFileHandle, t->buffer, t->actual_length ) < 0 ) - { - genericsExit( -4, "Writing to file failed (%s)" EOL, strerror( errno ) ); - } - } - _handleBlock( &_r, t->actual_length, t->buffer ); } @@ -1198,6 +1149,7 @@ static int _usbFeeder( struct RunTime *r ) { genericsReport( V_ERROR, "Error waiting for USB requests to complete %d" EOL, ret ); } + } /* ========================================================================================= */ @@ -1493,6 +1445,8 @@ static int _fileFeeder( struct RunTime *r ) int main( int argc, char *argv[] ) { + struct timespec ts; + /* This is set here to avoid huge .data section in startup image */ _r.options = &_options; @@ -1535,7 +1489,7 @@ int main( int argc, char *argv[] ) #endif - if ( _r.options->channelList && _r.options->useTPIU ) + if ( _r.options->channelList ) { /* Channel list is only needed for legacy ports that we are re-exporting (i.e. clean unencapsulated flows) */ char *c = _r.options->channelList; @@ -1570,35 +1524,27 @@ int main( int argc, char *argv[] ) _r.handler[_r.numHandlers].channel = x; _r.handler[_r.numHandlers].strippedBlock = ( struct dataBlock * )calloc( 1, sizeof( struct dataBlock ) ); - _r.handler[_r.numHandlers].n = nwclientStart( _r.options->listenPort + LEGACY_SERVER_PORT_OFS + _r.numHandlers ); - genericsReport( V_WARN, "Started Network interface for legacy channel %d on port %d" EOL, x, _r.options->listenPort + LEGACY_SERVER_PORT_OFS + _r.numHandlers ); + genericsReport( V_INFO, "Will decode tag %d" EOL, x ); + + if ( _r.options->legacy ) + { + _r.handler[_r.numHandlers].n = nwclientStart( _r.options->listenPort + LEGACY_SERVER_PORT_OFS + _r.numHandlers ); + genericsReport( V_INFO, "Exported Legacy Network interface for channel %d on port %d" EOL, x, _r.options->listenPort + LEGACY_SERVER_PORT_OFS + _r.numHandlers ); + } + _r.numHandlers++; x = 0; } } - - /* ...a blank line so this doesn't get erased by monitoring reports */ - if ( _r.options->intervalReportTime ) - { - genericsReport( V_WARN, EOL ); - } - } - else - { - if ( !( _r.n = nwclientStart( _r.options->listenPort + LEGACY_SERVER_PORT_OFS ) ) ) - { - genericsExit( -1, "Failed to make network server" EOL ); - } } /* The OTAG handler doesn't need a channel list ... it works on all channels */ _r.otagHandler = nwclientStart( _r.options->listenPort ); - genericsReport( V_WARN, "Started Network interface for OTAG on port %d" EOL, _r.options->listenPort ); + genericsReport( V_INFO, "Started Network interface for OTAG on port %d" EOL, _r.options->listenPort ); - if ( _r.options->intervalReportTime ) - { - pthread_create( &_r.intervalThread, NULL, &_checkInterval, &_r ); - } + /* Don't do anything with interval times for at least the first interval time */ + clock_gettime( CLOCK_REALTIME, &ts ); + _r.lastInterval = ts.tv_sec * 1000000000L + ts.tv_nsec; if ( _r.options->outfile ) { diff --git a/Src/otag.c b/Src/otag.c index bd8615b9..cc0a43d2 100644 --- a/Src/otag.c +++ b/Src/otag.c @@ -74,6 +74,7 @@ static void _pumpcb( struct Frame *p, void *param ) struct OTAG *t = ( struct OTAG * )param; t->f.len = p->len - 1; /* OTAG frames have the first element representing the tag */ + t->f.tag = p->d[0]; /* First byte of an OTAG frame is the tag */ t->f.d = &p->d[1]; /* This is the rest of the data */ /* Timestamp was already set for this cluster */