diff --git a/.github/workflows/pkg-build.yaml b/.github/workflows/pkg-build.yaml index 3d0b9283..08b0f901 100644 --- a/.github/workflows/pkg-build.yaml +++ b/.github/workflows/pkg-build.yaml @@ -16,10 +16,9 @@ jobs: arch: - amd64 target: + - "debian:bookworm" - "debian:buster" - "debian:bullseye" - - "debian:bookworm" - - "ubuntu:bionic" - "ubuntu:focal" - "ubuntu:jammy" @@ -30,16 +29,91 @@ jobs: run: ./debpkg-setup.sh - name: Build packages run: ./debpkg-build.sh - - name: Store packages + - name: Set environment variables for upload + run: echo DIRNAME=${{ matrix.target }} | tr ':' '_' >> $GITHUB_ENV + - name: Copy packages to uploadable location run: | - export DIRNAME=`echo ${{ matrix.target }} | tr ':' '_'` mkdir -p packages/${DIRNAME} cp ../*.deb packages/${DIRNAME}/ - - name: Publish packages to cloudsmith - uses: wandnz/action-cloudsmith-upload-packages@v1.6 + - name: Store packages + uses: actions/upload-artifact@v3 with: - path: packages/ - repo: ${{ secrets.CLOUDSMITH_OWNER }}/libtrace - username: salcock - api_key: ${{ secrets.CLOUDSMITH_API_KEY }} + name: packages-${{ env.DIRNAME }} + path: packages/${{ env.DIRNAME }}/*.deb + retention-days: 1 + test: + runs-on: ubuntu-latest + container: + image: ${{ matrix.target }} + strategy: + fail-fast: false + matrix: + arch: + - amd64 + target: + - "debian:buster" + - "debian:bullseye" + - "ubuntu:focal" + - "ubuntu:jammy" + - "debian:bookworm" + needs: build + steps: + - name: Set environment variables for download + run: echo DIRNAME=${{ matrix.target }} | tr ':' '_' >> $GITHUB_ENV + - name: Download artifact + uses: actions/download-artifact@v3 + with: + name: packages-${{ env.DIRNAME }} + - name: Add repositories + run: | + apt update -y + apt install -y apt-transport-https curl + curl -1sLf 'https://dl.cloudsmith.io/public/wand/libwandio/cfg/setup/bash.deb.sh' | bash + curl -1sLf 'https://dl.cloudsmith.io/public/wand/libwandder/cfg/setup/bash.deb.sh' | bash + - name: Test package install + env: + DEBIAN_FRONTEND: noninteractive + run: | + apt update -y + find . -name "*.deb" | xargs apt install -y + - name: Test package removal + env: + DEBIAN_FRONTEND: noninteractive + run: | + apt remove -y --purge libtrace4 + + publish: + runs-on: ubuntu-latest + container: + image: ${{ matrix.target }} + strategy: + fail-fast: false + matrix: + arch: + - amd64 + target: + - "debian:buster" + - "debian:bullseye" + - "debian:bookworm" + - "ubuntu:focal" + - "ubuntu:jammy" + needs: test + steps: + - name: Set environment variables for download + run: echo DIRNAME=${{ matrix.target }} | tr ':' '_' >> $GITHUB_ENV + - name: Download artifact + uses: actions/download-artifact@v3 + with: + name: packages-${{ env.DIRNAME }} + - name: Copy packages + run: | + mkdir -p packages/${DIRNAME} + find . -name "*.deb" | xargs cp -t packages/${DIRNAME}/ + - name: Publish package to cloudsmith + uses: wanduow/action-cloudsmith-upload-packages@v1 + with: + path: packages/ + repo: ${{ secrets.CLOUDSMITH_OWNER }}/libtrace + username: salcock + api_key: ${{ secrets.CLOUDSMITH_API_KEY }} diff --git a/.github/workflows/rpm-build.yaml b/.github/workflows/rpm-build.yaml index ce88c231..676dfe5f 100644 --- a/.github/workflows/rpm-build.yaml +++ b/.github/workflows/rpm-build.yaml @@ -1,4 +1,4 @@ -name: Packaging for Centos and Fedora +name: Packaging for RPM on: push: @@ -7,40 +7,126 @@ on: jobs: build: - runs-on: ubuntu-latest - container: - image: ${{ matrix.target }} - strategy: - fail-fast: false - matrix: - arch: - - amd64 - target: - - "centos:7" - - "fedora:37" - - "fedora:38" - - "rockylinux:8" - - "rockylinux:9" - - - steps: + runs-on: ubuntu-latest + container: + image: ${{ matrix.target }} + strategy: + fail-fast: false + matrix: + arch: + - amd64 + target: + - "rockylinux:8" + - "rockylinux:9" + - "fedora:39" + - "fedora:38" + - "centos:7" + + steps: - name: Checkout repo uses: actions/checkout@v3 - name: Install prereq packages run: ./rpmpkg-setup.sh ${{ matrix.target }} - name: Build packages - run: ./rpmpkg-build.sh ${{ matrix.target }} - - name: Store packages + run: ./rpmpkg-build.sh + - name: Set environment variables for upload + run: echo DIRNAME=${{ matrix.target }} | tr ':' '_' >> $GITHUB_ENV + - name: Copy packages to uploadable location run: | - export DIRNAME=`echo ${{ matrix.target }} | tr ':' '_'` mkdir -p packages/${DIRNAME} cp ~/rpmbuild/RPMS/x86_64/*.rpm packages/${DIRNAME}/ - - name: Publish packages to cloudsmith - uses: salcock/action-cloudsmith-upload-packages@v1.7 + - name: Store packages + uses: actions/upload-artifact@v3 with: - path: packages/ - repo: ${{ secrets.CLOUDSMITH_OWNER }}/libtrace - username: salcock - api_key: ${{ secrets.CLOUDSMITH_API_KEY }} + name: packages-${{ env.DIRNAME }} + path: packages/${{ env.DIRNAME }}/*.rpm + retention-days: 1 + + test: + runs-on: ubuntu-latest + container: + image: ${{ matrix.target }} + strategy: + fail-fast: false + matrix: + arch: + - amd64 + target: + - "rockylinux:8" + - "rockylinux:9" + - "fedora:39" + - "fedora:38" + - "centos:7" + needs: build + steps: + - name: Set environment variables for download + run: echo DIRNAME=${{ matrix.target }} | tr ':' '_' >> $GITHUB_ENV + - name: Download artifact + uses: actions/download-artifact@v3 + with: + name: packages-${{ env.DIRNAME }} + - name: Add repositories + env: + TARGET: ${{ matrix.target }} + run: | + curl -1sLf 'https://dl.cloudsmith.io/public/wand/libwandio/cfg/setup/bash.rpm.sh' | bash + curl -1sLf 'https://dl.cloudsmith.io/public/wand/libwandder/cfg/setup/bash.rpm.sh' | bash + if [ "${TARGET}" == "centos:7" ]; then + yum install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm || true + fi + if [[ "${TARGET}" =~ rocky* ]]; then + dnf install -y dnf-plugins-core epel-release || true + dnf config-manager --set-enabled powertools || true + /usr/bin/crb enable || true + fi + - name: Test package install + run: | + yum install -y findutils + find . -name "*.rpm" | xargs yum install -y + - name: Test package removal + run: | + rpm -e libtrace4-tools + rpm -e libpacketdump4-devel + rpm -e libtrace4-devel + rpm -e libpacketdump4 + rpm -e libtrace4 + + + publish: + runs-on: ubuntu-latest + container: + image: ${{ matrix.target }} + strategy: + fail-fast: false + matrix: + arch: + - amd64 + target: + - "rockylinux:8" + - "rockylinux:9" + - "fedora:39" + - "fedora:38" + - "centos:7" + needs: test + steps: + - name: Set environment variables for download + run: echo DIRNAME=${{ matrix.target }} | tr ':' '_' >> $GITHUB_ENV + - name: Download artifact + uses: actions/download-artifact@v3 + with: + name: packages-${{ env.DIRNAME }} + - name: Copy packages + run: | + mkdir -p packages/${DIRNAME} + yum install -y findutils + find . -name "*.rpm" | xargs cp -t packages/${DIRNAME}/ + - name: Publish package to cloudsmith + uses: wanduow/action-cloudsmith-upload-packages@v1 + with: + path: packages/ + repo: ${{ secrets.CLOUDSMITH_OWNER }}/libtrace + username: salcock + api_key: ${{ secrets.CLOUDSMITH_API_KEY }} + diff --git a/README b/README index 568f35fa..9df3f106 100644 --- a/README +++ b/README @@ -1,4 +1,4 @@ -libtrace 4.0.23 +libtrace 4.0.24 Code and documentation added since version 4.0.20 is Copyright (c) 2023 Shane Alcock and has been contributed as per diff --git a/configure.in b/configure.in index 06ba372c..72e56831 100644 --- a/configure.in +++ b/configure.in @@ -3,11 +3,11 @@ # Now you only need to update the version number in two places - below, # and in the README -AC_INIT([libtrace],[4.0.23],[shane@alcock.co.nz],[libtrace]) +AC_INIT([libtrace],[4.0.24],[shane@alcock.co.nz],[libtrace]) LIBTRACE_MAJOR=4 LIBTRACE_MID=0 -LIBTRACE_MINOR=23 +LIBTRACE_MINOR=24 # OpenSolaris hides libraries like libncurses in /usr/gnu/lib, which is not # searched by default - add it to LDFLAGS so we at least have a chance of diff --git a/debian/changelog b/debian/changelog index fdf18b27..9748c1aa 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,24 @@ +libtrace4 (4.0.24-1) unstable; urgency=medium + + * ndagtcp: fix miscalculation of ERF header length that would + prevent packets from being able to be decoded correctly. + * traceucast: do not exit if the client disconnects, instead try to + reconnect and resume unicasting. + * traceucast: fix memory errors when attempting to send a captured + packet that is larger than 10K bytes. + * ndagtcp: fix issue that would cause input to silently stop reading + if it received a packet larger than 10K bytes. + * ndagtcp: fix lock-up that occurred when trying to halt a program + that was reading from an ndagtcp input. + * ndagtcp: fix erroneous "Malformed beacon" message when an ndagtcp + input reconnects to traceucast. + * traceucast: fix getaddrinfo memory leak. + * traceucast: fix endless reconnection loop bug. + * traceucast: fix race condition that would prevent Ctrl-C from + halting traceucast properly. + + -- Shane Alcock Wed, 24 Jan 2024 13:43:51 +1300 + libtrace4 (4.0.23-1) unstable; urgency=medium * Add new tool: traceucast, a TCP unicast variant of tracemcast. diff --git a/lib/Makefile.am b/lib/Makefile.am index fafa62b5..f4f0e296 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -146,7 +146,7 @@ endif AM_CPPFLAGS= @ADD_INCLS@ libtrace_la_LIBADD = @LIBTRACE_LIBS@ @LTLIBOBJS@ $(DPDKLIBS) -libtrace_la_LDFLAGS=-version-info 7:7:0 @ADD_LDFLAGS@ +libtrace_la_LDFLAGS=-version-info 7:8:0 @ADD_LDFLAGS@ dagapi.c: cp @DAG_TOOLS_DIR@/dagapi.c . diff --git a/lib/format_erf.c b/lib/format_erf.c index c51fcddc..3665f49d 100644 --- a/lib/format_erf.c +++ b/lib/format_erf.c @@ -154,6 +154,7 @@ int erf_get_padding(const libtrace_packet_t *packet) case TRACE_FORMAT_NDAG: case TRACE_FORMAT_RAWERF: case TRACE_FORMAT_DPDK_NDAG: + case TRACE_FORMAT_NDAG_TCP: switch((erfptr->type & 0x7f)) { case TYPE_ETH: case TYPE_COLOR_ETH: diff --git a/lib/format_ndag.c b/lib/format_ndag.c index c6eb35c1..90a13844 100644 --- a/lib/format_ndag.c +++ b/lib/format_ndag.c @@ -50,7 +50,7 @@ #include "format_ndag.h" #define NDAG_IDLE_TIMEOUT (600) -#define ENCAP_BUFSIZE (10000) +#define ENCAP_BUFSIZE (65536) #define CTRL_BUF_SIZE (10000) #define ENCAP_BUFFERS (1000) @@ -440,7 +440,7 @@ static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf, numstreams = ntohs(*ptr); ptr ++; - if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t))) + if ((uint32_t)msgsize < ((numstreams + 1) * sizeof(uint16_t))) { fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n"); fprintf(stderr, "%u %u\n", msgsize, numstreams); @@ -556,7 +556,7 @@ static int accept_ndagtcp_connection(libtrace_t *libtrace, fcntl(sock, F_SETFL, O_NONBLOCK); - while (is_halted(libtrace) == -1) { + while (is_halted(libtrace) == -1 && !ndag_paused) { r = select_on_sock(sock); if (r < 0) { fprintf(stderr, "Error in select while accepting connection on socket for %s:%s -- %s\n", @@ -1098,7 +1098,7 @@ static int ndag_prepare_packet_stream_encaperf(libtrace_t *restrict libtrace, nr = ssock->nextreadind; available = ssock->savedsize[nr] - (ssock->nextread - ssock->saved[nr]); - if (ssock->nextrlen == 0 || ssock->nextrlen > ENCAP_BUFSIZE) { + if (ssock->nextrlen == 0) { return -1; } @@ -1175,7 +1175,7 @@ static int ndag_prepare_packet_stream_corsarotag(libtrace_t *restrict libtrace, nr = ssock->nextreadind; available = ssock->savedsize[nr] - (ssock->nextread - ssock->saved[nr]); - if (ssock->nextrlen == 0 || ssock->nextrlen > ENCAP_BUFSIZE) { + if (ssock->nextrlen == 0) { return -1; } @@ -1574,7 +1574,6 @@ static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv, ssock->sock = -1; return 0; } - ssock->startidle = 0; ssock->savedsize[ssock->nextwriteind] = ret; diff --git a/rpm/libtrace4.spec b/rpm/libtrace4.spec index 11e8935f..08e2074d 100644 --- a/rpm/libtrace4.spec +++ b/rpm/libtrace4.spec @@ -1,5 +1,5 @@ Name: libtrace4 -Version: 4.0.23 +Version: 4.0.24 Release: 1%{?dist} Summary: C Library for capturing and analysing network packets @@ -127,6 +127,9 @@ find $RPM_BUILD_ROOT -name '*.la' -exec rm -f {} ';' %changelog +* Wed Jan 24 2024 Shane Alcock - 4.0.24-1 +- Updated for 4.0.24 release + * Fri Nov 10 2023 Shane Alcock - 4.0.23-1 - Updated for 4.0.23 release diff --git a/tools/tracemcast/traceucast.c b/tools/tracemcast/traceucast.c index 0b115a9f..91c6af5e 100644 --- a/tools/tracemcast/traceucast.c +++ b/tools/tracemcast/traceucast.c @@ -100,6 +100,7 @@ typedef struct read_thread_data { int streamfd; uint8_t *pbuffer; + uint32_t bufsize; ndag_encap_t *encaphdr; uint8_t *writeptr; uint32_t seqno; @@ -118,7 +119,7 @@ volatile int halted = 0; static void cleanup_signal(int signal UNUSED) { if (currenttrace) { - trace_pstop(currenttrace); + trace_interrupt(); } halted = 1; } @@ -130,7 +131,7 @@ static int create_stream_socket(uint16_t port, char *clientaddr, struct addrinfo *gotten; char portstr[16]; int sock; - int bufsize, reuse=1; + int bufsize, reuse=1, connected = 0; hints.ai_family = PF_UNSPEC; hints.ai_socktype = SOCK_STREAM; @@ -146,6 +147,9 @@ static int create_stream_socket(uint16_t port, char *clientaddr, return -1; } if (targetinfo) { + if (*targetinfo) { + free(*targetinfo); + } *targetinfo = gotten; } @@ -201,9 +205,14 @@ static int create_stream_socket(uint16_t port, char *clientaddr, } else { fprintf(stderr, "traceucast connected to %s:%s\n", clientaddr, portstr); + connected = 1; break; } } + if (!connected && sock > 0) { + close(sock); + sock = -1; + } sockcreateover: if (targetinfo == NULL) { @@ -242,6 +251,7 @@ static void *init_reader_thread(libtrace_t *trace, rdata->streamport = gparams->firstport + rdata->threadid; rdata->streamfd = -1; rdata->pbuffer = calloc(MAX_PACKET_SIZE, sizeof(uint8_t)); + rdata->bufsize = MAX_PACKET_SIZE; rdata->writeptr = rdata->pbuffer; rdata->seqno = 1; rdata->target = NULL; @@ -255,66 +265,158 @@ static void *init_reader_thread(libtrace_t *trace, return rdata; } -static int send_ndag_packet(read_thread_data_t *rdata) { +static int connect_stream_fd(read_thread_data_t *rdata, + struct global_params *gparams) +{ - int s; - int rem = (rdata->writeptr - rdata->pbuffer); - int sentsofar = 0; - int ret = 0; - int attempts = 0; - int backoff = 5000; + int fd; + uint8_t block; - rdata->encaphdr->recordcount = ntohs(rdata->reccount); + if (rdata->livesource) { + block = 0; + } else { + block = 1; + } + fd = create_stream_socket(rdata->streamport, gparams->clientaddr, + &(rdata->target), block); - while (rem > 0) { - s = send(rdata->streamfd, rdata->pbuffer + sentsofar, rem, MSG_DONTWAIT); + if (fd == 0) { + return 0; + } - if (s < 0) { - if ((errno == EAGAIN || errno == EWOULDBLOCK) && attempts < 20) { - attempts ++; - usleep(backoff); - backoff = backoff * 2; - if (backoff > 1000000) { - backoff = 1000000; + if (fd == -1) { + if (errno != ECONNREFUSED) { + fprintf(stderr, + "traceucast: failed to create TCP socket for " + "reader thread %d: %s\n", + rdata->threadid, strerror(errno)); + return -1; + } else { + return 0; } - continue; - } - fprintf(stderr, "traceucast: thread %d failed to send streamed ERF packet: %s\n", - rdata->threadid, strerror(errno)); - fprintf(stderr, "%u\n", rdata->seqno); - ret = -1; - break; + } else if (rdata->target == NULL) { + fprintf( + stderr, + "traceucast: failed to get addrinfo for reader socket %d\n", + rdata->threadid); + close(rdata->streamfd); + rdata->streamfd = -1; + return -1; } + rdata->streamfd = fd; + return fd; +} - sentsofar += s; - rem -= s; - } +#define HANDLE_SEND_ERROR \ + if (s < 0) { \ + if ((errno == EAGAIN || errno == EWOULDBLOCK) && \ + attempts < 20) { \ + attempts++; \ + usleep(backoff); \ + backoff = backoff * 2; \ + if (backoff > 1000000) { \ + backoff = 1000000; \ + } \ + continue; \ + } \ + fprintf(stderr, \ + "traceucast: thread %d failed to send streamed ERF " \ + "packet: %s\n", \ + rdata->threadid, strerror(errno)); \ + close(rdata->streamfd); \ + rdata->streamfd = -1; \ + usleep(200000); \ + continue; \ + } - rdata->writeptr = rdata->pbuffer; - rdata->encaphdr = NULL; - rdata->reccount = 0; - return ret; +static int send_ndag_packet(read_thread_data_t *rdata, + struct global_params *gparams) +{ + + int s, r; + int rem = (rdata->writeptr - rdata->pbuffer); + int sentsofar = 0; + int attempts = 0; + int backoff = 5000; + + int firstsend = 0; + int fs_amount = 0; + + rdata->encaphdr->recordcount = ntohs(rdata->reccount); + + while (rem > 0 && !halted) { + if (rdata->streamfd == -1) { + if ((r = connect_stream_fd(rdata, gparams)) < 0) { + rdata->failed = 1; + trace_interrupt(); + return -1; + } + if (r == 0) { + if (rdata->livesource) { + return 0; + } + sleep(1); + continue; + } + fprintf(stderr, + "traceucast: streaming thread %d established " + "connection\n", + rdata->threadid); + } + + if (firstsend == 0 && rem > 8) { + /* try to detect a broken pipe by attempting a "canary" + * send of 8 bytes so that the main send is more likely + * to trigger EPIPE + */ + s = send(rdata->streamfd, rdata->pbuffer, 8, + MSG_DONTWAIT | MSG_NOSIGNAL); + HANDLE_SEND_ERROR + fs_amount = s; + + s = send(rdata->streamfd, rdata->pbuffer + fs_amount, + rem - fs_amount, MSG_DONTWAIT | MSG_NOSIGNAL); + HANDLE_SEND_ERROR + sentsofar += (s + fs_amount); + rem -= (s + fs_amount); + firstsend = 1; + } else { + s = send(rdata->streamfd, rdata->pbuffer + sentsofar, + rem, MSG_DONTWAIT | MSG_NOSIGNAL); + HANDLE_SEND_ERROR + sentsofar += s; + rem -= s; + } + } + + rdata->writeptr = rdata->pbuffer; + rdata->encaphdr = NULL; + rdata->reccount = 0; + return sentsofar; } static void halt_reader_thread(libtrace_t *trace UNUSED, - libtrace_thread_t *t UNUSED, void *global UNUSED, void *tls) { + libtrace_thread_t *t UNUSED, void *global, + void *tls) +{ - read_thread_data_t *rdata = (read_thread_data_t *)tls; + read_thread_data_t *rdata = (read_thread_data_t *)tls; + struct global_params *gparams = (struct global_params *)global; - if (rdata->writeptr > rdata->pbuffer) { - send_ndag_packet(rdata); - } + if (rdata->writeptr > rdata->pbuffer) { + send_ndag_packet(rdata, gparams); + } - if (rdata->pbuffer) { - free(rdata->pbuffer); - } - if (rdata->target) { - freeaddrinfo(rdata->target); - } - if (rdata->streamfd != -1) { - close(rdata->streamfd); - } - free(rdata); + if (rdata->pbuffer) { + free(rdata->pbuffer); + } + if (rdata->target) { + freeaddrinfo(rdata->target); + } + if (rdata->streamfd != -1) { + close(rdata->streamfd); + } + free(rdata); } static uint16_t construct_erf_header(read_thread_data_t *rdata, @@ -354,19 +456,21 @@ static uint16_t construct_erf_header(read_thread_data_t *rdata, } static void tick_reader_thread(libtrace_t *trace UNUSED, - libtrace_thread_t *t UNUSED, void *global UNUSED, void *tls, - uint64_t order) { + libtrace_thread_t *t UNUSED, void *global, + void *tls, uint64_t order) +{ - read_thread_data_t *rdata = (read_thread_data_t *)tls; + read_thread_data_t *rdata = (read_thread_data_t *)tls; + struct global_params *gparams = (struct global_params *)global; - if (rdata->writeptr > rdata->pbuffer && + if (rdata->writeptr > rdata->pbuffer && (order >> 32) >= rdata->lastsend + 3) { - if (send_ndag_packet(rdata) < 0) { - rdata->failed = 1; + if (send_ndag_packet(rdata, gparams) < 0) { + rdata->failed = 1; + } + rdata->lastsend = (order >> 32); } - rdata->lastsend = (order >> 32); - } } static libtrace_packet_t *packet_reader_thread(libtrace_t *trace UNUSED, @@ -379,6 +483,7 @@ static libtrace_packet_t *packet_reader_thread(libtrace_t *trace UNUSED, uint32_t rem; void *l2; uint64_t erfts; + int r; if (rdata->failed) { trace_interrupt(); @@ -389,36 +494,6 @@ static libtrace_packet_t *packet_reader_thread(libtrace_t *trace UNUSED, return packet; } - if (rdata->streamfd == -1) { - int fd; - uint8_t block; - - if (rdata->livesource) { - block = 0; - } else { - block = 1; - } - fd = create_stream_socket(rdata->streamport, - gparams->clientaddr, &(rdata->target), block); - - if (fd == 0) { - return packet; - } - if (fd == -1) { - fprintf(stderr, "traceucast: failed to create TCP socket for reader thread %d\n", rdata->threadid); - trace_interrupt(); - return packet; - - } else if (rdata->target == NULL) { - fprintf(stderr, "traceucast: failed to get addrinfo for reader socket %d\n", rdata->threadid); - close(rdata->streamfd); - rdata->streamfd = -1; - trace_interrupt(); - return packet; - } - rdata->streamfd = fd; - } - /* first, check if there is going to be space in the buffer for this * packet + an ERF header */ l2 = trace_get_layer2(packet, <ype, &rem); @@ -432,16 +507,34 @@ static libtrace_packet_t *packet_reader_thread(libtrace_t *trace UNUSED, */ if (rdata->writeptr > rdata->pbuffer + sizeof(ndag_common_t) + sizeof(ndag_encap_t)) { - - if (send_ndag_packet(rdata) < 0) { - rdata->failed = 1; - return packet; - } + if ((r = send_ndag_packet(rdata, gparams)) < 0) { + rdata->failed = 1; + close(rdata->streamfd); + rdata->streamfd = -1; + return packet; + } else if (r == 0) { + return packet; + } rdata->lastsend = (erfts >> 32); } } - /* append this packet to the buffer (truncate if necessary) */ + /* extend the buffer size if we happen to be working with very large + * packets + */ + while (rem + dag_record_size + sizeof(ndag_encap_t) + sizeof(ndag_common_t) + > rdata->bufsize) { + int writeoff = rdata->writeptr - rdata->pbuffer; + int encapoff = ((uint8_t *)rdata->encaphdr) - rdata->pbuffer; + + rdata->pbuffer = realloc(rdata->pbuffer, + rdata->bufsize + MAX_PACKET_SIZE); + rdata->bufsize += MAX_PACKET_SIZE; + rdata->writeptr = rdata->pbuffer + writeoff; + rdata->encaphdr = rdata->pbuffer + encapoff; + } + + /* append this packet to the buffer */ /* if the buffer is empty, put on a common and encap header on the * front, before adding any packets */ @@ -471,10 +564,11 @@ static libtrace_packet_t *packet_reader_thread(libtrace_t *trace UNUSED, /* if the buffer is close to full, just send the buffer anyway */ if (MAX_PACKET_SIZE - (rdata->writeptr - rdata->pbuffer) - (dag_record_size + 2) < 64) { - if (send_ndag_packet(rdata) < 0) { - rdata->failed = 1; - } - rdata->lastsend = (erfts >> 32); + if ((r = send_ndag_packet(rdata, gparams)) < 0) { + rdata->failed = 1; + } else if (r != 0) { + rdata->lastsend = (erfts >> 32); + } } return packet; @@ -574,22 +668,11 @@ static uint32_t form_beacon(char **buffer, struct beacon_params *bparams) { static void *beaconer_thread(void *tdata) { struct beacon_params *bparams = (struct beacon_params *)tdata; - int sock; + int sock = -1; char *beaconpacket = NULL; uint32_t beaconsize; struct addrinfo *targetinfo = NULL; - sock = create_stream_socket(bparams->beaconport, - bparams->gparams->clientaddr, &targetinfo, 1); - - if (sock == -1) { - fprintf(stderr, "traceucast: failed to create TCP socket for beaconer thread\n"); - halted = 1; - } else if (targetinfo == NULL) { - fprintf(stderr, "traceucast: failed to get addrinfo for beaconer socket\n"); - halted = 1; - } - beaconsize = form_beacon(&beaconpacket, bparams); if (beaconsize <= 0 || beaconpacket == NULL) { @@ -597,11 +680,45 @@ static void *beaconer_thread(void *tdata) { } while (!halted) { - if (send(sock, beaconpacket, beaconsize, 0) != beaconsize) { - fprintf(stderr, "traceucast: failed to send a beacon packet: %s\n", - strerror(errno)); - break; - } + if (sock == -1) { + sock = create_stream_socket(bparams->beaconport, + bparams->gparams->clientaddr, + &targetinfo, 1); + } + + if (sock == 0) { + sleep(1); + continue; + } + + if (sock == -1) { + if (errno != ECONNREFUSED) { + fprintf(stderr, "traceucast: failed to create TCP " + "socket for beacon thread: %s\n", + strerror(errno)); + halted = 1; + break; + } else { + sleep(1); + continue; + } + } else if (targetinfo == NULL) { + fprintf(stderr, "traceucast: failed to get addrinfo for " + "beaconer socket\n"); + halted = 1; + break; + } + + if (send(sock, beaconpacket, beaconsize, MSG_NOSIGNAL) != + beaconsize) { + fprintf(stderr, + "traceucast: failed to send a beacon packet: %s\n", + strerror(errno)); + close(sock); + sock = -1; + usleep(200000); + continue; + } usleep(1000 * bparams->frequency); } @@ -611,8 +728,8 @@ static void *beaconer_thread(void *tdata) { if (targetinfo) { free(targetinfo); } - if (sock >= 0) { - close(sock); + if (sock > 0) { + close(sock); } pthread_exit(NULL);