From d0de6ee645431e72fa18f8edaf261f8d545dff72 Mon Sep 17 00:00:00 2001 From: Dave Marples Date: Fri, 31 May 2024 18:16:12 +0100 Subject: [PATCH] Add OTAG format file save/playback and simplify threading arrangements --- Inc/cobs.h | 7 +- Src/cobs.c | 2 - Src/orbuculum.c | 186 ++++++++++++++++++------------------------------ meson.build | 1 + 4 files changed, 76 insertions(+), 120 deletions(-) diff --git a/Inc/cobs.h b/Inc/cobs.h index bbe5c4d3..2277843f 100644 --- a/Inc/cobs.h +++ b/Inc/cobs.h @@ -17,10 +17,11 @@ extern "C" { #endif -#define COBS_FRONTMATTER (10) -#define COBS_MAX_PACKET_LEN (4096) +#define COBS_FRONTMATTER (10) +#define COBS_MAX_PACKET_LEN (4096) +#define COBS_SYNC_CHAR (0) #define COBS_OVERALL_MAX_PACKET_LEN (COBS_MAX_PACKET_LEN+COBS_FRONTMATTER) -#define COBS_MAX_ENC_PACKET_LEN (COBS_OVERALL_MAX_PACKET_LEN + COBS_OVERALL_MAX_PACKET_LEN / 254) +#define COBS_MAX_ENC_PACKET_LEN (COBS_OVERALL_MAX_PACKET_LEN + COBS_OVERALL_MAX_PACKET_LEN / 254) enum COBSPumpState { diff --git a/Src/cobs.c b/Src/cobs.c index 478686c9..110f0f8c 100644 --- a/Src/cobs.c +++ b/Src/cobs.c @@ -15,8 +15,6 @@ #include #include "cobs.h" -#define COBS_SYNC_CHAR (0) - const uint8_t cobs_eop[COBS_EOP_LEN] = { COBS_SYNC_CHAR }; // ==================================================================================================== diff --git a/Src/orbuculum.c b/Src/orbuculum.c index 182979a8..96bdf942 100644 --- a/Src/orbuculum.c +++ b/Src/orbuculum.c @@ -53,21 +53,21 @@ #include "git_version_info.h" #include "generics.h" #include "tpiuDecoder.h" -#include "cobs.h" +#include "otag.h" #include "nwclient.h" #include "orbtraceIf.h" #include "stream.h" -/* How many transfer buffers from the source to allocate */ -#define NUM_RAW_BLOCKS (32) - #define MAX_LINE_LEN (1024) #define ORBTRACE "orbtrace" #define ORBTRACEENVNAME "ORBTRACE" -#define OTAG_MAX_PACKET_LEN (COBS_MAX_PACKET_LEN) -#define OTAG_MAX_ENC_PACKET_LEN (COBS_MAX_ENC_PACKET_LEN) -#define OTAG_EOP_LEN (COBS_EOP_LEN) +/* Multiple blocks are used for USB, otherwise just the one */ +#define NUM_RAW_BLOCKS (32) + +/* File header for OTAG formatted file */ +#define OTAG_SIG (const char*)"%%OTAG1.0.0%%" +#define OTAG_SIG_LEN (strlen(OTAG_SIG)) /* Record for options, either defaults or from command line */ struct Options @@ -107,7 +107,7 @@ struct handlers struct RunTime { struct TPIUDecoder t; /* TPIU decoder instance, in case we need it */ - struct COBS c; /* COBS instance for OTAG, in case we need it */ + struct OTAG otag; /* OTAG instance, in case we need it */ struct Frame otagPart; /* OTAG part frame for maintaining continuity across packets */ struct Frame otagOtg; /* Outgoing OTAG frame for legacy use */ @@ -115,13 +115,9 @@ struct RunTime uint64_t intervalBytes; /* Number of bytes transferred in current interval */ uint64_t intervalRawBytes; /* Number of bytes transferred in current interval */ - uint64_t otagDataLenRxed; /* Number of bytes of decoded OTAG data */ + uint64_t otagDataLenRxed; /* Number of bytes of decoded OTAG data */ pthread_t intervalThread; /* Thread reporting on intervals */ - pthread_t processThread; /* Thread for processing prior to distributing to clients */ - pthread_t usbThread; /* Thread for usb thread (this sometimes dies, so needs restarting) */ - pthread_cond_t dataForClients; /* Semaphore counting data for clients */ - pthread_mutex_t dataForClients_m; /* Mutex for counting data for clients */ bool ending; /* Flag indicating app is terminating */ bool errored; /* Flag indicating problem in reception process */ bool conn; /* Flag indicating that we have a good connection */ @@ -131,11 +127,10 @@ struct RunTime int opFileHandle; /* Handle if we're writing orb output locally */ struct Options *options; /* Command line options (reference to above) */ - int wp; /* Read and write pointers into transfer buffers */ - int rp; struct dataBlock rawBlock[NUM_RAW_BLOCKS]; /* Transfer buffers from the receiver */ struct nwclientsHandle *otagHandler; /* Handle to OTAG output handler */ + bool usingOTAG; /* Flag that OTAG protocol is in use */ int numHandlers; /* Number of TPIU channel handlers in use */ struct handlers *handler; @@ -709,7 +704,7 @@ void *_checkInterval( void *params ) else { /* Either raw ITM or OTag frames */ - if ( OrbtraceSupportsOTAG( _r.o ) ) + if ( _r.usingOTAG ) { int w = 1000 - ( ( r->otagDataLenRxed * 1000 ) / r->intervalRawBytes ); genericsPrintf( " Waste:%01d.%01d%%", w / 10, w % 10 ); @@ -751,15 +746,11 @@ static void _purgeBlock( struct RunTime *r ) /* The OTAG encoded version goes out on the combined OTAG channel, with a specific channel header */ int j = h->strippedBlock->fillLevel; - - const uint8_t frontMatter[1] = { h->channel }; const uint8_t *b = h->strippedBlock->buffer; while ( j ) { - COBSEncode( frontMatter, 1, - b, ( j < OTAG_MAX_PACKET_LEN ) ? j : OTAG_MAX_PACKET_LEN, - &r->otagOtg ); + OTAGEncode( h->channel, 0, b, ( j < OTAG_MAX_PACKET_LEN ) ? j : OTAG_MAX_PACKET_LEN, &r->otagOtg ); nwclientSend( _r.otagHandler, r->otagOtg.len, r->otagOtg.d ); b += ( j < OTAG_MAX_PACKET_LEN ) ? j : OTAG_MAX_PACKET_LEN; j -= ( j < OTAG_MAX_PACKET_LEN ) ? j : OTAG_MAX_PACKET_LEN; @@ -846,9 +837,9 @@ static void _otagIncoming( struct RunTime *r, const uint8_t *buffer, int length if ( r->otagPart.len ) { /* We already have a part cobs frame...complete it using whatever we've just received */ - flen = COBSgetFrameExtent( b, length ); + flen = OTAGgetFrameExtent( b, length ); - if ( COBSisEOFRAME( flen ) ) + if ( OTAGisEOFRAME( flen ) ) { /* There is an end of packet here...we can complete the send */ if ( ( flen - b + r->otagPart.len ) <= OTAG_MAX_ENC_PACKET_LEN ) @@ -886,15 +877,15 @@ static void _otagIncoming( struct RunTime *r, const uint8_t *buffer, int length while ( true ) { /* Flush any extra EOPs that may have existed in the original frame */ - while ( length && COBSisEOFRAME( b ) ) + while ( length && OTAGisEOFRAME( b ) ) { b++; length--; } - flen = COBSgetFrameExtent( b, length ); + flen = OTAGgetFrameExtent( b, length ); - if ( !( length && COBSisEOFRAME( flen ) ) ) + if ( !( length && OTAGisEOFRAME( flen ) ) ) { break; } @@ -917,7 +908,6 @@ static void _otagIncoming( struct RunTime *r, const uint8_t *buffer, int length static void _processBlock( struct RunTime *r, ssize_t fillLevel, uint8_t *buffer ) { - static const uint8_t frontMatter[1] = { DEFAULT_ITM_STREAM }; genericsReport( V_DEBUG, "RXED Packet of %d bytes%s" EOL, fillLevel, ( r->options->intervalReportTime ) ? EOL : "" ); if ( fillLevel ) @@ -949,8 +939,7 @@ static void _processBlock( struct RunTime *r, ssize_t fillLevel, uint8_t *buffer while ( fillLevel ) { - COBSEncode( frontMatter, 1, - b, + OTAGEncode( DEFAULT_ITM_STREAM, 0, b, ( fillLevel < OTAG_MAX_PACKET_LEN ) ? fillLevel : OTAG_MAX_PACKET_LEN, &r->otagOtg ); nwclientSend( _r.otagHandler, r->otagOtg.len, r->otagOtg.d ); @@ -962,7 +951,7 @@ static void _processBlock( struct RunTime *r, ssize_t fillLevel, uint8_t *buffer } // ==================================================================================================== -static void _OTAGpacketRxed( struct Frame *p, void *param ) +static void _OTAGpacketRxed( struct OTAGFrame *p, void *param ) /* Put the packet into the correct output buffer */ @@ -970,13 +959,13 @@ static void _OTAGpacketRxed( struct Frame *p, void *param ) int chIndex; struct handlers *h = _r.handler; - /* Record the length we received...first byte is channel number */ - _r.otagDataLenRxed += p->len - 1; + /* Record the length we received */ + _r.otagDataLenRxed += p->len; /* Search for channel */ for ( chIndex = 0; chIndex < _r.numHandlers; chIndex++ ) { - if ( h->channel == p->d[0] ) + if ( h->channel == p->tag ) { break; } @@ -987,7 +976,7 @@ static void _OTAGpacketRxed( struct Frame *p, void *param ) if ( ( chIndex != _r.numHandlers ) && ( h ) ) { /* We must have found a match for this at some point, so add it to the queue */ - for ( int i = 1; i < p->len; i++ ) + for ( int i = 0; i < p->len; i++ ) { h->strippedBlock->buffer[h->strippedBlock->fillLevel++] = p->d[i]; @@ -1001,18 +990,18 @@ static void _OTAGpacketRxed( struct Frame *p, void *param ) } } // ==================================================================================================== -static void _handleBlock( struct RunTime *r, ssize_t fillLevel, uint8_t *buffer, bool isOTag ) +static void _handleBlock( struct RunTime *r, ssize_t fillLevel, uint8_t *buffer ) /* Handle an incoming block in either 'conventional' or OTag format */ { - if ( isOTag ) + if ( r->usingOTAG ) { _otagIncoming( r, buffer, fillLevel ); /* We need to decode this so it can go through the 'normal' output channels too */ - COBSPump( &r->c, buffer, fillLevel, _OTAGpacketRxed, r ); + OTAGPump( &r->otag, buffer, fillLevel, _OTAGpacketRxed, r ); r->intervalRawBytes += fillLevel; _purgeBlock( r ); } @@ -1024,35 +1013,6 @@ static void _handleBlock( struct RunTime *r, ssize_t fillLevel, uint8_t *buffer, r->intervalRawBytes += fillLevel; } // ==================================================================================================== -static void *_processBlocksQueue( void *params ) - -/* Generic block processor task for received data */ - -{ - struct RunTime *r = ( struct RunTime * )params; - - while ( !r->ending ) - { - pthread_cond_wait( &r->dataForClients, &r->dataForClients_m ); - - if ( r->rp != r->wp ) - { - _processBlock( r, r->rawBlock[r->rp].fillLevel, r->rawBlock[r->rp].buffer ); - r->rp = ( r->rp + 1 ) % NUM_RAW_BLOCKS; - } - } - - return NULL; -} - -// ==================================================================================================== -static void _dataAvailable( struct RunTime *r ) - -{ - pthread_cond_signal( &r->dataForClients ); -} -// ==================================================================================================== - static void _usb_callback( struct libusb_transfer *t ) @@ -1070,7 +1030,7 @@ static void _usb_callback( struct libusb_transfer *t ) } } - _handleBlock( &_r, t->actual_length, t->buffer, OrbtraceSupportsOTAG( _r.o ) ); + _handleBlock( &_r, t->actual_length, t->buffer ); if ( ( t->status != LIBUSB_TRANSFER_COMPLETED ) && ( t->status != LIBUSB_TRANSFER_TIMED_OUT ) && @@ -1134,6 +1094,7 @@ void _actionOrbtraceCommand( struct RunTime *r, char *sn, enum ORBTraceDevice d static int _usbFeeder( struct RunTime *r ) { + bool firstRunThrough = true; int workingDev; /* Copy any part serial number across */ @@ -1182,7 +1143,9 @@ static int _usbFeeder( struct RunTime *r ) break; } - if ( OrbtraceSupportsOTAG( r->o ) ) + r->usingOTAG = OrbtraceSupportsOTAG( r->o ); + + if ( r->usingOTAG ) { genericsReport( V_INFO, "Orbtrace supports OTAG protocol" EOL ); @@ -1190,6 +1153,17 @@ static int _usbFeeder( struct RunTime *r ) { genericsReport( V_WARN, "TPIU decoding specified, but ORBTrace supports OTAG, are you sure?" EOL ); } + + if ( firstRunThrough && _r.opFileHandle ) + { + if ( write( _r.opFileHandle, OTAG_SIG, OTAG_SIG_LEN ) < 0 ) + { + genericsExit( -4, "Could not write OTAG signature to file (%s)" EOL, strerror( errno ) ); + } + } + + /* We only attempt to write the file header on the first run through */ + firstRunThrough = false; } genericsReport( V_DEBUG, "USB Interface claimed, ready for data" EOL ); @@ -1227,7 +1201,10 @@ static int _usbFeeder( struct RunTime *r ) } // ==================================================================================================== static int _nwserverFeeder( struct RunTime *r ) + { + struct dataBlock *rxBlock = &r->rawBlock[0]; + while ( true ) { struct Stream *stream = streamCreateSocket( r->options->nwserverHost, r->options->nwserverPort ); @@ -1243,21 +1220,16 @@ static int _nwserverFeeder( struct RunTime *r ) while ( !r->ending ) { - struct dataBlock *rxBlock = &r->rawBlock[r->wp]; - - size_t receivedSize; - enum ReceiveResult result = stream->receive( stream, rxBlock->buffer, USB_TRANSFER_SIZE, NULL, &receivedSize ); + size_t fl; + enum ReceiveResult result = stream->receive( stream, rxBlock->buffer, USB_TRANSFER_SIZE, NULL, &fl ); + rxBlock->fillLevel = fl; if ( result != RECEIVE_RESULT_OK ) { break; } - rxBlock->fillLevel = receivedSize; - r->wp = ( r->wp + 1 ) % NUM_RAW_BLOCKS; - - - _dataAvailable( r ); + _handleBlock( r, rxBlock->fillLevel, rxBlock->buffer ); } if ( !r->ending ) @@ -1325,7 +1297,7 @@ static int _serialFeeder( struct RunTime *r ) continue; } - struct dataBlock *rxBlock = &r->rawBlock[r->wp]; + struct dataBlock *rxBlock = &r->rawBlock[0]; DWORD transferSize = stats.cbInQue; @@ -1334,18 +1306,14 @@ static int _serialFeeder( struct RunTime *r ) transferSize = USB_TRANSFER_SIZE; } - DWORD readBytes = 0; - ReadFile( portHandle, rxBlock->buffer, transferSize, &readBytes, NULL ); - - rxBlock->fillLevel = readBytes; + ReadFile( portHandle, rxBlock->buffer, transferSize, &rxBlock->fillLevel, NULL ); if ( rxBlock->fillLevel <= 0 ) { break; } - r->wp = ( r->wp + 1 ) % NUM_RAW_BLOCKS; - _dataAvailable( r ); + _handleBlock( r, rxBlock->fillLevel, rxBlock->buffer ); } r->conn = false; @@ -1369,6 +1337,7 @@ static int _serialFeeder( struct RunTime *r ) static int _serialFeeder( struct RunTime *r ) { int ret; + struct dataBlock *rxBlock = &r->rawBlock[0]; while ( !r->ending ) { @@ -1412,15 +1381,12 @@ static int _serialFeeder( struct RunTime *r ) while ( !r->ending ) { - struct dataBlock *rxBlock = &r->rawBlock[r->wp]; - if ( ( rxBlock->fillLevel = read( r->f, rxBlock->buffer, USB_TRANSFER_SIZE ) ) <= 0 ) { break; } - r->wp = ( r->wp + 1 ) % NUM_RAW_BLOCKS; - _dataAvailable( r ); + _handleBlock( r, rxBlock->fillLevel, rxBlock->buffer ); } r->conn = false; @@ -1441,6 +1407,9 @@ static int _serialFeeder( struct RunTime *r ) static int _fileFeeder( struct RunTime *r ) { + struct dataBlock *rxBlock = &r->rawBlock[0]; + + if ( ( r->f = open( r->options->file, O_RDONLY ) ) < 0 ) { genericsExit( -4, "Can't open file %s" EOL, r->options->file ); @@ -1448,11 +1417,19 @@ static int _fileFeeder( struct RunTime *r ) r->conn = true; - while ( !r->ending ) + /* Start off by checking if this is OTAG formatted */ + rxBlock->fillLevel = read( r->f, rxBlock->buffer, OTAG_SIG_LEN ); + r->usingOTAG = ( ( OTAG_SIG_LEN == rxBlock->fillLevel ) && ( !strncmp( OTAG_SIG, ( char * )rxBlock->buffer, OTAG_SIG_LEN ) ) ); + genericsReport( V_INFO, "File is %sin OTAG format" EOL, ( r->usingOTAG ) ? "" : "not " ); + + if ( r->usingOTAG ) { - struct dataBlock *rxBlock = &r->rawBlock[r->wp]; + /* This is OTAG, so we need to read the first data after the header */ rxBlock->fillLevel = read( r->f, rxBlock->buffer, USB_TRANSFER_SIZE ); + } + while ( !r->ending ) + { if ( !rxBlock->fillLevel ) { if ( r->options->fileTerminate ) @@ -1467,22 +1444,14 @@ static int _fileFeeder( struct RunTime *r ) } } - int nwp = ( r->wp + 1 ) % NUM_RAW_BLOCKS; - - /* Spin waiting for buffer space to become available */ - while ( nwp == r->rp ) - { - usleep( INTERVAL_1MS ); - } - - r->wp = nwp; - - _dataAvailable( r ); + _handleBlock( r, rxBlock->fillLevel, rxBlock->buffer ); if ( r->options->paceDelay ) { usleep( r->options->paceDelay ); } + + rxBlock->fillLevel = read( r->f, rxBlock->buffer, USB_TRANSFER_SIZE ); } r->conn = false; @@ -1514,16 +1483,6 @@ int main( int argc, char *argv[] ) WSAStartup( MAKEWORD( 2, 2 ), &wsaData ); #endif - if ( pthread_mutex_init( &_r.dataForClients_m, NULL ) != 0 ) - { - genericsExit( -1, "Failed to establish mutex for condition variablee" EOL ); - } - - if ( pthread_cond_init( &_r.dataForClients, NULL ) != 0 ) - { - genericsExit( -1, "Failed to establish condition variablee" EOL ); - } - if ( !_processOptions( argc, argv, &_r ) ) { /* processOptions generates its own error messages */ @@ -1535,7 +1494,7 @@ int main( int argc, char *argv[] ) TPIUDecoderInit( &_r.t ); } - COBSInit( &_r.c ); + OTAGInit( &_r.otag ); genericsScreenHandling( !_r.options->mono ); @@ -1636,9 +1595,6 @@ int main( int argc, char *argv[] ) if ( ( _r.options->nwserverPort ) || ( _r.options->port ) || ( _r.options->file ) ) { - /* Start the distribution task */ - pthread_create( &_r.processThread, NULL, &_processBlocksQueue, &_r ); - if ( _r.options->nwserverPort ) { exit( _nwserverFeeder( &_r ) ); diff --git a/meson.build b/meson.build index 7bde989a..00936359 100644 --- a/meson.build +++ b/meson.build @@ -58,6 +58,7 @@ liborb = library('orb', 'Src/tpiuDecoder.c', 'Src/msgDecoder.c', 'Src/cobs.c', + 'Src/otag.c', 'Src/msgSeq.c', 'Src/traceDecoder.c', 'Src/generics.c',