diff --git a/.github/workflows/deb-build.yaml b/.github/workflows/deb-build.yaml index be99ecbb..a29ba610 100644 --- a/.github/workflows/deb-build.yaml +++ b/.github/workflows/deb-build.yaml @@ -21,7 +21,6 @@ jobs: - "ubuntu:xenial" - "ubuntu:bionic" - "ubuntu:focal" - - "ubuntu:groovy" steps: - name: Checkout repo diff --git a/README.md b/README.md index 25a7d796..267c1017 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ OpenLI -- open source ETSI-compliant Lawful Intercept software -Version: 1.0.10 +Version: 1.0.11 --------------------------------------------------------------------------- diff --git a/configure.ac b/configure.ac index 46a303c2..6cad08c8 100644 --- a/configure.ac +++ b/configure.ac @@ -1,6 +1,6 @@ # Super primitive configure script -AC_INIT(openli, 1.0.10, salcock@waikato.ac.nz) +AC_INIT(openli, 1.0.11, salcock@waikato.ac.nz) AM_INIT_AUTOMAKE([subdir-objects]) AC_CONFIG_SRCDIR(src/collector/collector.c) diff --git a/debian/changelog b/debian/changelog index 917e831d..d3006684 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,26 @@ +openli (1.0.11-1) unstable; urgency=medium + + * Improved collector encoding performance by saving and reusing + previously-encoded records that have the exact same layout. + * Improved encoding performance at high packet rates by sending encoded + records to the forwarding thread in batches. + * Mediators will now actively avoid splitting a record across multiple send + calls wherever possible. + * Fixed bug where a collector would simply stop forwarding records for an + LIID on to the mediator for no apparent reason, especially when more + encoder threads were being used. + * Fixed bug where encoding jobs would be lost without being seen by the + encoder thread. + * Fixed performance-related issue where an overwhelmed mediator would never + send data to its handovers. + * Fixed crash in mediator after a handover is disconnected for failing to + send a keep alive response. + * BER encoding optimization has been removed. + * Fix issue where collector memory usage would be extremely high when under + load. + + -- Shane Alcock Wed, 21 Jul 2021 10:56:13 +1200 + openli (1.0.10-1) unstable; urgency=medium * Intercepts can now be configured with a start and/or end time (unix diff --git a/debian/compat b/debian/compat deleted file mode 100644 index ec635144..00000000 --- a/debian/compat +++ /dev/null @@ -1 +0,0 @@ -9 diff --git a/debian/control b/debian/control index 1f6d05b7..7baa72e5 100644 --- a/debian/control +++ b/debian/control @@ -2,8 +2,8 @@ Source: openli Section: net Priority: optional Maintainer: Shane Alcock -Build-Depends: debhelper (>= 9), dh-autoreconf, dh-systemd (>=1.5), - libtrace4-dev (>= 4.0.14), libyaml-dev, uthash-dev, libwandder2-dev, +Build-Depends: debhelper-compat (= 12), dh-autoreconf, dh-systemd (>=1.5), + libtrace4-dev (>= 4.0.16), libyaml-dev, uthash-dev, libwandder2-dev, libjudy-dev, libzmq3-dev, libgoogle-perftools-dev, libosip2-dev, libssl1.0-dev (>=1.0.2r) | libssl-dev, librabbitmq-dev, libmicrohttpd-dev, libjson-c-dev, libsqlcipher-dev diff --git a/debian/rules b/debian/rules index 58c7a226..3552fbb4 100755 --- a/debian/rules +++ b/debian/rules @@ -1,7 +1,7 @@ #!/usr/bin/make -f %: - dh $@ --with=systemd --with autoreconf + dh $@ --with autoreconf override_dh_auto_configure: diff --git a/debpkg-setup.sh b/debpkg-setup.sh index 5f1f70ce..d6b1d85b 100755 --- a/debpkg-setup.sh +++ b/debpkg-setup.sh @@ -11,7 +11,7 @@ export SOURCENAME=`echo ${GITHUB_REF##*/} | cut -d '-' -f 1` apt-get update apt-get install -y equivs devscripts dpkg-dev quilt curl apt-transport-https \ apt-utils ssl-cert ca-certificates gnupg lsb-release debhelper git \ - pkg-config + pkg-config sed DISTRO=$(lsb_release -sc) @@ -21,8 +21,23 @@ curl -1sLf 'https://dl.cloudsmith.io/public/wand/libtrace/cfg/setup/bash.deb.sh' curl -1sLf 'https://dl.cloudsmith.io/public/wand/openli/cfg/setup/bash.deb.sh' | bash case ${DISTRO} in - jessie | xenial | stretch ) + xenial ) curl -1sLf 'https://dl.cloudsmith.io/public/wand/dpdk-wand/cfg/setup/bash.deb.sh' | bash + apt-get install -y debhelper dh-systemd -t xenial-backports + sed -i 's/debhelper-compat (= 12)/debhelper (>= 10)/' debian/control + sed -i 's/--with auto/--with=systemd --with auto/' debian/rules + echo "10" > debian/compat + ;; + + stretch ) + curl -1sLf 'https://dl.cloudsmith.io/public/wand/dpdk-wand/cfg/setup/bash.deb.sh' | bash + sed -i 's/debhelper-compat (= 12)/debhelper (>= 10)/' debian/control + sed -i 's/--with auto/--with=systemd --with auto/' debian/rules + echo "10" > debian/compat + ;; + + bionic ) + apt-get install -y debhelper -t bionic-backports ;; esac diff --git a/doc/CollectorDoc.md b/doc/CollectorDoc.md index cd589359..0990fcc6 100644 --- a/doc/CollectorDoc.md +++ b/doc/CollectorDoc.md @@ -155,28 +155,6 @@ A collector only requires a small amount of configuration: a username and password that can be used to authenticate against a local RabbitMQ instance, and a flag to inform the collector that RabbitMQ output is enabled. -### Encoding Methods -OpenLI supports two methods for encoding the intercepted records and meta-data -so that they conform to the ETSI standard. The first method is DER encoding, -whereby all fields are encoded in the most space-efficient way. The advantage -of DER is that there is only one "correct" result for encoding a given input, -so it is less likely to be incompatible with what the receiver is expecting. -However, this encoding method is slower and therefore may limit your -maximum interception capability. - -The second method is BER encoding, which allows us to encode numeric values -using a fixed-width field, e.g. every integer consumes eight bytes, regardless -of whether it is a small number or a large one. This is less space-efficient -and relies on the receiver being prepared for fields that consume more space -than they might need, but is much faster for OpenLI to encode because each -record will have a relatively static structure. - -OpenLI versions prior to 1.0.9 supported DER encoding only, and this is the -default encoding method as it has been well tested in production deployments. -BER has the potential to be much more efficient, but we recommend doing some -specific testing with your local LEAs before switching over to BER as not all -LEA equipment may be able to decode it. - ### Target Identification for VOIP Intercepts By default, OpenLI does NOT trust the "From:" field in SIP packets when it is determining whether a SIP packet has been sent by an intercept target. This @@ -209,8 +187,6 @@ The basic option keys are: * interceptpointid -- set the interception point ID * seqtrackerthreads -- set the number of threads to use for sequence number tracking (defaults to 1). -* encoding -- choose the encoding method to use, either `ber` or - `der` (defaults to `der`). * encoderthreads -- set the number of threads to use for encoding ETSI records (defaults to 2). * forwardingthreads -- set the number of threads to use for forwarding diff --git a/doc/exampleconfigs/collector-example.yaml b/doc/exampleconfigs/collector-example.yaml index a0a7be14..265031fb 100644 --- a/doc/exampleconfigs/collector-example.yaml +++ b/doc/exampleconfigs/collector-example.yaml @@ -28,11 +28,6 @@ encoderthreads: 2 # mediators. You probably don't need to change this. forwardingthreads: 1 -# The encoding method to use for generating ETSI records. DER is the default -# method. BER is an alternative that should be more efficient, but not all -# LEAs may be able to decode it. -encoding: der - # Set this to yes if you want to override the policy of not trusting the # contents of the "From:" field in SIP packets (as this field is not # validated and can be easily spoofed). diff --git a/rpm/openli.spec b/rpm/openli.spec index e05855cd..97ca401e 100644 --- a/rpm/openli.spec +++ b/rpm/openli.spec @@ -1,5 +1,5 @@ Name: openli -Version: 1.0.10 +Version: 1.0.11 Release: 1%{?dist} Summary: Software for performing ETSI-compliant lawful intercept @@ -14,7 +14,7 @@ BuildRequires: bison BuildRequires: doxygen BuildRequires: flex BuildRequires: libyaml-devel -BuildRequires: libtrace4-devel >= 4.0.14 +BuildRequires: libtrace4-devel >= 4.0.16 BuildRequires: Judy-devel BuildRequires: uthash-devel BuildRequires: libwandder2-devel @@ -232,6 +232,9 @@ fi %changelog +* Mon Jul 5 2021 Shane Alcock - 1.0.11-1 +- Updated for 1.0.11 release + * Tue Apr 26 2021 Shane Alcock - 1.0.10-1 - Updated for 1.0.10 release diff --git a/src/Makefile.am b/src/Makefile.am index 82908a8e..84bca262 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -40,7 +40,7 @@ openlicollector_SOURCES=collector/collector.c configparser.c configparser.h \ netcomms.c netcomms.h byteswap.c byteswap.h etsili_core.c \ collector/sipparsing.c collector/sipparsing.h \ collector/jenkinshash.c collector/ipmmcc.c collector/ipmmcc.h \ - collector/ipmmiri.c collector/ipmmiri.h \ + collector/ipmmiri.h \ collector/internetaccess.c collector/internetaccess.h \ collector/ipcc.c collector/ipcc.h \ coreserver.h coreserver.c collector/collector_push_messaging.c \ @@ -55,7 +55,6 @@ openlicollector_SOURCES=collector/collector.c configparser.c configparser.h \ collector/collector_seqtracker.c \ collector/collector_forwarder.c collector/jmirror_parser.c \ collector/jmirror_parser.h openli_tls.c openli_tls.h \ - collector/umtscc.h collector/umtscc.c \ collector/umtsiri.h collector/umtsiri.c \ collector/radius_hasher.c collector/radius_hasher.h \ collector/timed_intercept.c collector/timed_intercept.h \ diff --git a/src/collector/collector.c b/src/collector/collector.c index a81fa2cf..abb019fa 100644 --- a/src/collector/collector.c +++ b/src/collector/collector.c @@ -24,7 +24,7 @@ * */ - +#define _GNU_SOURCE #include #include #include @@ -196,7 +196,7 @@ static void process_tick(libtrace_t *trace, libtrace_thread_t *t, static void init_collocal(colthread_local_t *loc, collector_global_t *glob, int threadid) { - int zero = 0, i; + int zero = 0, i, hwm=1000; libtrace_message_queue_init(&(loc->fromsyncq_ip), sizeof(openli_pushed_t)); libtrace_message_queue_init(&(loc->fromsyncq_voip), @@ -227,18 +227,18 @@ static void init_collocal(colthread_local_t *loc, collector_global_t *glob, snprintf(pubsockname, 128, "inproc://openlipub-%d", i); loc->zmq_pubsocks[i] = zmq_socket(glob->zmq_ctxt, ZMQ_PUSH); - zmq_setsockopt(loc->zmq_pubsocks[i], ZMQ_SNDHWM, &zero, sizeof(zero)); + zmq_setsockopt(loc->zmq_pubsocks[i], ZMQ_SNDHWM, &hwm, sizeof(hwm)); zmq_connect(loc->zmq_pubsocks[i], pubsockname); } loc->fragreass = create_new_ipfrag_reassembler(); loc->tosyncq_ip = zmq_socket(glob->zmq_ctxt, ZMQ_PUSH); - zmq_setsockopt(loc->tosyncq_ip, ZMQ_SNDHWM, &zero, sizeof(zero)); + zmq_setsockopt(loc->tosyncq_ip, ZMQ_SNDHWM, &hwm, sizeof(hwm)); zmq_connect(loc->tosyncq_ip, "inproc://openli-ipsync"); loc->tosyncq_voip = zmq_socket(glob->zmq_ctxt, ZMQ_PUSH); - zmq_setsockopt(loc->tosyncq_voip, ZMQ_SNDHWM, &zero, sizeof(zero)); + zmq_setsockopt(loc->tosyncq_voip, ZMQ_SNDHWM, &hwm, sizeof(hwm)); zmq_connect(loc->tosyncq_voip, "inproc://openli-voipsync"); } @@ -1588,6 +1588,7 @@ int main(int argc, char *argv[]) { collector_global_t *glob = NULL; int i, ret, todaemon; colinput_t *inp, *tmp; + char name[1024]; todaemon = 0; while (1) { @@ -1678,6 +1679,9 @@ int main(int argc, char *argv[]) { sizeof(forwarding_thread_data_t)); for (i = 0; i < glob->forwarding_threads; i++) { + + snprintf(name, 1024, "forwarder-%d", i); + glob->forwarders[i].zmq_ctxt = glob->zmq_ctxt; glob->forwarders[i].forwardid = i; glob->forwarders[i].encoders = glob->encoding_threads; @@ -1692,12 +1696,14 @@ int main(int argc, char *argv[]) { pthread_create(&(glob->forwarders[i].threadid), NULL, start_forwarding_thread, (void *)&(glob->forwarders[i])); + pthread_setname_np(glob->forwarders[i].threadid, name); } glob->seqtrackers = calloc(glob->seqtracker_threads, sizeof(seqtracker_thread_data_t)); for (i = 0; i < glob->seqtracker_threads; i++) { + snprintf(name, 1024, "seqtracker-%d", i); glob->seqtrackers[i].zmq_ctxt = glob->zmq_ctxt; glob->seqtrackers[i].trackerid = i; glob->seqtrackers[i].zmq_pushjobsock = NULL; @@ -1705,16 +1711,15 @@ int main(int argc, char *argv[]) { glob->seqtrackers[i].intercepts = NULL; glob->seqtrackers[i].colident = &(glob->sharedinfo); glob->seqtrackers[i].encoding_method = glob->encoding_method; -#ifdef HAVE_BER_ENCODING - glob->seqtrackers[i].enc_ber = wandder_init_encoder_ber(1000, 512); -#endif pthread_create(&(glob->seqtrackers[i].threadid), NULL, start_seqtracker_thread, (void *)&(glob->seqtrackers[i])); + pthread_setname_np(glob->seqtrackers[i].threadid, name); } glob->encoders = calloc(glob->encoding_threads, sizeof(openli_encoder_t)); for (i = 0; i < glob->encoding_threads; i++) { + snprintf(name, 1024, "encoder-%d", i); glob->encoders[i].zmq_ctxt = glob->zmq_ctxt; glob->encoders[i].zmq_recvjobs = NULL; glob->encoders[i].zmq_pushresults = NULL; @@ -1724,12 +1729,15 @@ int main(int argc, char *argv[]) { glob->encoders[i].shared = &(glob->sharedinfo); glob->encoders[i].encoder = NULL; glob->encoders[i].freegenerics = NULL; + glob->encoders[i].saved_intercept_templates = NULL; + glob->encoders[i].saved_global_templates = NULL; glob->encoders[i].seqtrackers = glob->seqtracker_threads; glob->encoders[i].forwarders = glob->forwarding_threads; pthread_create(&(glob->encoders[i].threadid), NULL, run_encoder_worker, (void *)&(glob->encoders[i])); + pthread_setname_np(glob->encoders[i].threadid, name); } /* Start IP intercept sync thread */ @@ -1739,6 +1747,8 @@ int main(int argc, char *argv[]) { logger(LOG_INFO, "OpenLI: error creating IP sync thread. Exiting."); return 1; } + snprintf(name, 1024, "sync-ip"); + pthread_setname_np(glob->syncip.threadid, name); /* Start VOIP intercept sync thread */ ret = pthread_create(&(glob->syncvoip.threadid), NULL, @@ -1747,6 +1757,8 @@ int main(int argc, char *argv[]) { logger(LOG_INFO, "OpenLI: error creating VOIP sync thread. Exiting."); return 1; } + snprintf(name, 1024, "sync-voip"); + pthread_setname_np(glob->syncvoip.threadid, name); if (pthread_sigmask(SIG_SETMASK, &sig_before, NULL)) { logger(LOG_INFO, "Unable to re-enable signals after starting threads."); diff --git a/src/collector/collector_base.h b/src/collector/collector_base.h index f4da77dd..829f7260 100644 --- a/src/collector/collector_base.h +++ b/src/collector/collector_base.h @@ -41,6 +41,8 @@ #include "export_buffer.h" #include "openli_tls.h" +#define MAX_ENCODED_RESULT_BATCH 50 + typedef struct export_dest { int failmsg; int fd; @@ -167,23 +169,15 @@ typedef struct seqtracker_thread_data { exporter_intercept_state_t *intercepts; removed_intercept_t *removedints; uint8_t encoding_method; -#if HAVE_BER_ENCODING - wandder_encoder_ber_t *enc_ber; -#endif } seqtracker_thread_data_t; -typedef struct stored_result { - openli_encoded_result_t res; - UT_hash_handle hh; -} stored_result_t; - typedef struct intercept_reorderer { char *liid; char *key; uint32_t expectedseqno; - stored_result_t *pending; + Pvoid_t pending; } int_reorderer_t; @@ -235,6 +229,9 @@ typedef struct encoder_state { wandder_encoder_t *encoder; etsili_generic_freelist_t *freegenerics; + Pvoid_t saved_intercept_templates; + Pvoid_t saved_global_templates; + int seqtrackers; int forwarders; uint8_t halted; @@ -242,11 +239,8 @@ typedef struct encoder_state { typedef struct encoder_job { wandder_encode_job_t *preencoded; -#ifdef HAVE_BER_ENCODING - wandder_etsili_top_t *top; - wandder_etsili_child_t *child; -#endif uint32_t seqno; + int64_t cin; char *cinstr; openli_export_recv_t *origreq; char *liid; diff --git a/src/collector/collector_forwarder.c b/src/collector/collector_forwarder.c index b1bcb54c..419bf3c0 100644 --- a/src/collector/collector_forwarder.c +++ b/src/collector/collector_forwarder.c @@ -57,15 +57,7 @@ static inline void free_encoded_result(openli_encoded_result_t *res) { if (res->msgbody) { if (res->msgbody->encoded) { -#ifdef HAVE_BER_ENCODING - if (!res->child) { - //if child exists, then msgbody->encoded is owned by child - //dont free so it can be reused - free(res->msgbody->encoded); - } -#else free(res->msgbody->encoded); -#endif } free(res->msgbody); } @@ -74,11 +66,6 @@ static inline void free_encoded_result(openli_encoded_result_t *res) { free_published_message(res->origreq); } -#ifdef HAVE_BER_ENCODING - if (res->child){ - wandder_free_child(res->child); - } -#endif } static int add_new_destination(forwarding_thread_data_t *fwd, @@ -265,10 +252,11 @@ static void remove_reorderers(forwarding_thread_data_t *fwd, char *liid, Pvoid_t *reorderer_array) { PWord_t jval; + PWord_t pval; uint8_t index[256]; int_reorderer_t *reord; - stored_result_t *stored, *tmp; int err; + Word_t seqindex; index[0] = '\0'; JSLF(jval, *reorderer_array, index); @@ -280,11 +268,19 @@ static void remove_reorderers(forwarding_thread_data_t *fwd, char *liid, continue; } JSLD(err, *reorderer_array, index); - HASH_ITER(hh, reord->pending, stored, tmp) { - HASH_DELETE(hh, reord->pending, stored); - free_encoded_result(&(stored->res)); - free(stored); + + seqindex = 0; + JLF(pval, reord->pending, seqindex); + while (pval) { + openli_encoded_result_t *res; + + res = (openli_encoded_result_t *)(*pval); + free_encoded_result(res); + free(res); + JLN(pval, reord->pending, seqindex); } + JLFA(err, reord->pending); + free(reord->liid); free(reord->key); free(reord); @@ -361,9 +357,11 @@ static inline int enqueue_result(forwarding_thread_data_t *fwd, export_dest_t *med, openli_encoded_result_t *res) { PWord_t jval; + PWord_t pval; int_reorderer_t *reord; Pvoid_t *reorderer; - stored_result_t *stored, *tmp; + openli_encoded_result_t *stored; + int rcint; if (res->origreq->type == OPENLI_EXPORT_IPCC || res->origreq->type == OPENLI_EXPORT_IPMMCC || @@ -399,13 +397,17 @@ static inline int enqueue_result(forwarding_thread_data_t *fwd, } if (res->seqno != reord->expectedseqno) { + openli_encoded_result_t *tosave = calloc(1, + sizeof(openli_encoded_result_t)); + memcpy(tosave, res, sizeof(openli_encoded_result_t)); + + JLI(pval, reord->pending, res->seqno); + if (pval == NULL) { + logger(LOG_INFO, "OpenLI: unable to create stored intercept record due to lack of memory"); + exit(-3); + } - stored = (stored_result_t *)calloc(1, sizeof(stored_result_t)); - memcpy(&(stored->res), res, sizeof(openli_encoded_result_t)); - - HASH_ADD_KEYPTR(hh, reord->pending, &(stored->res.seqno), - sizeof(stored->res.seqno), stored); - + *pval = (Word_t)tosave; return 0; } @@ -419,25 +421,24 @@ static inline int enqueue_result(forwarding_thread_data_t *fwd, reord->expectedseqno = res->seqno + 1; - HASH_ITER(hh, reord->pending, stored, tmp) { - if (stored->res.seqno != reord->expectedseqno) { - break; - } + JLG(pval, reord->pending, reord->expectedseqno); + while (pval != NULL) { + stored = (openli_encoded_result_t *)(*pval); - HASH_DELETE(hh, reord->pending, stored); + JLD(rcint, reord->pending, reord->expectedseqno); - if (append_message_to_buffer(&(med->buffer), &(stored->res), 0) == 0) { + if (append_message_to_buffer(&(med->buffer), stored, 0) == 0) { logger(LOG_INFO, "OpenLI: forced to drop mediator %u because we cannot buffer any more records for it -- please investigate asap!", med->mediatorid); remove_destination(fwd, med); return -1; } - reord->expectedseqno = stored->res.seqno + 1; + reord->expectedseqno = stored->seqno + 1; - free_encoded_result(&(stored->res)); + free_encoded_result(stored); free(stored); - + JLG(pval, reord->pending, reord->expectedseqno); } return 1; @@ -661,11 +662,11 @@ static void connect_export_targets(forwarding_thread_data_t *fwd) { static int drain_incoming_etsi(forwarding_thread_data_t *fwd) { - int x, encoders_over = 0; - openli_encoded_result_t res; + int x, encoders_over = 0, i, msgcnt; + openli_encoded_result_t res[MAX_ENCODED_RESULT_BATCH]; do { - x = zmq_recv(fwd->zmq_pullressock, &res, sizeof(res), + x = zmq_recv(fwd->zmq_pullressock, res, sizeof(res), ZMQ_DONTWAIT); if (x < 0 && errno != EAGAIN) { return -1; @@ -675,20 +676,30 @@ static int drain_incoming_etsi(forwarding_thread_data_t *fwd) { continue; } - if (res.liid == NULL && res.destid == 0) { - logger(LOG_INFO, "encoder %d has ceased encoding", encoders_over); - encoders_over ++; + if (x % sizeof(openli_encoded_result_t) != 0) { + logger(LOG_INFO, "forwarder received odd sized message (%d bytes)?", + x); + return -1; } + msgcnt = x / sizeof(openli_encoded_result_t); + + for (i = 0; i < msgcnt; i++) { - free_encoded_result(&res); + if (res[i].liid == NULL && res[i].destid == 0) { + logger(LOG_INFO, "encoder %d has ceased encoding", encoders_over); + encoders_over ++; + } + + free_encoded_result(&(res[i])); + } } while (encoders_over < fwd->encoders); return 1; } static int receive_incoming_etsi(forwarding_thread_data_t *fwd) { - int x, processed; - openli_encoded_result_t res; + int x, processed, i, msgcnt; + openli_encoded_result_t res[MAX_ENCODED_RESULT_BATCH]; processed = 0; do { @@ -705,10 +716,19 @@ static int receive_incoming_etsi(forwarding_thread_data_t *fwd) { break; } - if (handle_encoded_result(fwd, &res) < 0) { + if (x % sizeof(openli_encoded_result_t) != 0) { + logger(LOG_INFO, "forwarder received odd sized message (%d bytes)?", + x); return -1; } - processed ++; + msgcnt = x / sizeof(openli_encoded_result_t); + + for (i = 0; i < msgcnt; i++) { + if (handle_encoded_result(fwd, &(res[i])) < 0) { + return -1; + } + processed ++; + } } while (x > 0 && processed < 100000); return 1; } diff --git a/src/collector/collector_publish.h b/src/collector/collector_publish.h index 792cd099..011e913d 100644 --- a/src/collector/collector_publish.h +++ b/src/collector/collector_publish.h @@ -80,14 +80,14 @@ typedef struct openli_ipmmiri_job { uint8_t ipsrc[16]; uint8_t ipdest[16]; int ipfamily; -} PACKED openli_ipmmiri_job_t; +} openli_ipmmiri_job_t; typedef struct openli_mobiri_job { char *liid; uint32_t cin; etsili_iri_type_t iritype; etsili_generic_t *customparams; -} PACKED openli_mobiri_job_t; +} openli_mobiri_job_t; typedef struct openli_ipiri_job { char *liid; @@ -105,7 +105,7 @@ typedef struct openli_ipiri_job { etsili_iri_type_t iritype; etsili_generic_t *customparams; -} PACKED openli_ipiri_job_t; +} openli_ipiri_job_t; typedef struct openli_rawip_job { char *liid; @@ -113,7 +113,7 @@ typedef struct openli_rawip_job { uint32_t ipclen; uint32_t seqno; uint32_t cin; -} PACKED openli_rawip_job_t; +} openli_rawip_job_t; enum { OPENLI_IPIRI_STANDARD, @@ -151,7 +151,7 @@ struct openli_export_recv { openli_mobiri_job_t mobiri; openli_rawip_job_t rawip; } data; -} PACKED; +}; int publish_openli_msg(void *pubsock, openli_export_recv_t *msg); void free_published_message(openli_export_recv_t *msg); diff --git a/src/collector/collector_seqtracker.c b/src/collector/collector_seqtracker.c index 214f4c99..eb80b6ba 100644 --- a/src/collector/collector_seqtracker.c +++ b/src/collector/collector_seqtracker.c @@ -120,16 +120,7 @@ static void purge_removedints(seqtracker_thread_data_t *seqdata) { prev->next = rem->next; } -#ifdef HAVE_BER_ENCODING - if(rem->ber_top){ - wandder_free_top(rem->ber_top); - } - else { - etsili_clear_preencoded_fields((wandder_encode_job_t *)rem->preencoded); - } -#else etsili_clear_preencoded_fields((wandder_encode_job_t *)rem->preencoded); -#endif tmp = rem; rem = rem->next; @@ -149,11 +140,6 @@ static inline void remove_preencoded(seqtracker_thread_data_t *seqdata, gettimeofday(&tv, NULL); rem->haltedat = tv.tv_sec; - -#ifdef HAVE_BER_ENCODING - //only one of theses will be non-null so its ok to assign both - rem->ber_top = intstate->top; -#endif rem->preencoded = intstate->preencoded; if (seqdata->removedints == NULL) { @@ -200,9 +186,6 @@ static void track_new_intercept(seqtracker_thread_data_t *seqdata, intstate->details.authcc_len = strlen(cept->authcc); intstate->details.delivcc_len = strlen(cept->delivcc); intstate->cinsequencing = NULL; -#ifdef HAVE_BER_ENCODING - intstate->top = NULL; -#endif HASH_ADD_KEYPTR(hh, seqdata->intercepts, intstate->details.liid, intstate->details.liid_len, intstate); @@ -216,19 +199,11 @@ static void track_new_intercept(seqtracker_thread_data_t *seqdata, intdetails.networkelemid = seqdata->colident->networkelemid; intdetails.intpointid = seqdata->colident->intpointid; - //decide to do BER or DER here if(seqdata->encoding_method == OPENLI_ENCODING_DER){ intstate->preencoded = calloc(OPENLI_PREENCODE_LAST, sizeof(wandder_encode_job_t)); etsili_preencode_static_fields(intstate->preencoded, &intdetails); } - else { - -#ifdef HAVE_BER_ENCODING - intstate->top = wandder_encode_init_top_ber(seqdata->enc_ber, - (wandder_etsili_intercept_details_t *)&intdetails); -#endif - } } static void reconfigure_intercepts(seqtracker_thread_data_t *seqdata) { @@ -343,59 +318,10 @@ static int run_encoding_job(seqtracker_thread_data_t *seqdata, job.preencoded = intstate->preencoded; - -#ifdef HAVE_BER_ENCODING - wandder_etsili_child_t * child = NULL; - if (intstate->top){ - wandder_etsili_top_t * top = intstate->top; - wandder_encoder_ber_t *enc_ber = seqdata->enc_ber; - switch(recvd->type) { - case OPENLI_EXPORT_IPCC: - if (!top->ipcc.buf) { - wandder_init_etsili_ipcc(enc_ber, top); - top->ipcc.flist = wandder_create_etsili_child_freelist(); - } - break; - case OPENLI_EXPORT_IPMMCC: - if (!top->ipmmcc.buf) { - wandder_init_etsili_ipmmcc(enc_ber, top); - top->ipmmcc.flist = wandder_create_etsili_child_freelist(); - } - break; - case OPENLI_EXPORT_IPMMIRI: - if (!top->ipmmiri.buf) { - wandder_init_etsili_ipmmiri(enc_ber, top); - top->ipmmiri.flist = wandder_create_etsili_child_freelist(); - } - break; - case OPENLI_EXPORT_IPIRI: - if (!top->ipiri.buf) { - wandder_init_etsili_ipiri(enc_ber, top); - top->ipiri.flist = wandder_create_etsili_child_freelist(); - } - break; - case OPENLI_EXPORT_UMTSCC: - if (!top->umtscc.buf) { - wandder_init_etsili_umtscc(enc_ber, top); - top->umtscc.flist = wandder_create_etsili_child_freelist(); - } - break; - case OPENLI_EXPORT_UMTSIRI: - if (!top->umtsiri.buf) { - wandder_init_etsili_umtsiri(enc_ber, top); - top->umtsiri.flist = wandder_create_etsili_child_freelist(); - } - break; - default: - logger(LOG_INFO, "OpenLI: Error Unknown encoding type"); - } - job.top = top; - - } -#endif job.origreq = recvd; job.liid = strdup(liid); job.cinstr = strdup(cinseq->cin_string); + job.cin = (int64_t)cin; if (recvd->type == OPENLI_EXPORT_IPMMCC || recvd->type == OPENLI_EXPORT_IPCC || @@ -407,13 +333,18 @@ static int run_encoding_job(seqtracker_thread_data_t *seqdata, cinseq->iri_seqno ++; } - - if (zmq_send(seqdata->zmq_pushjobsock, (char *)&job, - sizeof(openli_encoding_job_t), 0) < 0) { - logger(LOG_INFO, - "Error while pushing encoding job to worker threads: %s", - strerror(errno)); - return -1; + while (1) { + if ((ret = zmq_send(seqdata->zmq_pushjobsock, (char *)&job, + sizeof(openli_encoding_job_t), 0)) < 0) { + if (errno == EAGAIN) { + continue; + } + logger(LOG_INFO, + "Error while pushing encoding job to worker threads: %s", + strerror(errno)); + return -1; + } + break; } return ret; @@ -495,7 +426,7 @@ void *start_seqtracker_thread(void *data) { char sockname[128]; seqtracker_thread_data_t *seqdata = (seqtracker_thread_data_t *)data; openli_export_recv_t *job = NULL; - int x, zero = 0, large=1000000, sndtimeo=1000; + int x, zero = 0, large=1000, sndtimeo=1000; exporter_intercept_state_t *intstate, *tmpexp; seqdata->zmq_recvpublished = zmq_socket(seqdata->zmq_ctxt, ZMQ_PULL); @@ -572,12 +503,6 @@ void *start_seqtracker_thread(void *data) { free_intercept_state(seqdata, intstate); } -#ifdef HAVE_BER_ENCODING - if (seqdata->enc_ber){ - wandder_free_encoder_ber(seqdata->enc_ber); - } -#endif - zmq_close(seqdata->zmq_recvpublished); zmq_close(seqdata->zmq_pushjobsock); pthread_exit(NULL); @@ -589,18 +514,8 @@ void clean_seqtracker(seqtracker_thread_data_t *seqdata) { while (seqdata->removedints) { rem = seqdata->removedints; -#ifdef HAVE_BER_ENCODING - if(rem->ber_top){ //if top exists its the BER encoding that needs to be free'd - wandder_free_top(rem->ber_top); - } - else { - etsili_clear_preencoded_fields((wandder_encode_job_t *)rem->preencoded); - free(rem->preencoded); - } -#else etsili_clear_preencoded_fields((wandder_encode_job_t *)rem->preencoded); free(rem->preencoded); -#endif seqdata->removedints = seqdata->removedints->next; free(rem); diff --git a/src/collector/encoder_worker.c b/src/collector/encoder_worker.c index 5407cc01..4a464bc3 100644 --- a/src/collector/encoder_worker.c +++ b/src/collector/encoder_worker.c @@ -31,14 +31,15 @@ #include "ipmmcc.h" #include "ipcc.h" #include "ipmmiri.h" -#include "umtscc.h" #include "umtsiri.h" #include "collector_base.h" #include "logger.h" +#include "etsili_core.h" +#include "encoder_worker.h" static int init_worker(openli_encoder_t *enc) { int zero = 0, rto = 10; - int hwm = 1000000; + int hwm = 1000; int i; char sockname[128]; @@ -133,10 +134,82 @@ static int init_worker(openli_encoder_t *enc) { } +static void free_encoded_header_templates(Pvoid_t headers) { + PWord_t pval; + Word_t index = 0; + + JLF(pval, headers, index); + while (pval) { + encoded_header_template_t *tplate; + + tplate = (encoded_header_template_t *)(*pval); + if (tplate->header) { + free(tplate->header); + } + free(tplate); + JLN(pval, headers, index); + } +} + +static void free_umtsiri_parameters(etsili_generic_t *params) { + + etsili_generic_t *oldp, *tmp; + + HASH_ITER(hh, params, oldp, tmp) { + HASH_DELETE(hh, params, oldp); + release_etsili_generic(oldp); + } + +} + void destroy_encoder_worker(openli_encoder_t *enc) { - int x, i; + int x, i, rcint; openli_encoding_job_t job; uint32_t drained = 0; + PWord_t pval; + uint8_t index[1000]; + Word_t rcw, indexint; + + index[0] = '\0'; + + JSLF(pval, enc->saved_intercept_templates, index); + while (pval) { + saved_encoding_templates_t *t_set; + + t_set = (saved_encoding_templates_t *)(*pval); + if (t_set->key) { + free(t_set->key); + } + free_encoded_header_templates(t_set->headers); + JLFA(rcint, t_set->headers); + + assert(t_set->ccpayloads == NULL); + assert(t_set->iripayloads == NULL); + free(t_set); + + JSLN(pval, enc->saved_intercept_templates, index); + } + JSLFA(rcw, enc->saved_intercept_templates); + + indexint = 0; + JLF(pval, enc->saved_global_templates, indexint); + while (pval) { + encoded_global_template_t *t; + + t = (encoded_global_template_t *)(*pval); + switch (t->cctype) { + case TEMPLATE_TYPE_IPCC_DIRFROM: + case TEMPLATE_TYPE_IPCC_DIRTO: + case TEMPLATE_TYPE_IPCC_DIROTHER: + if (t->cc_content.cc_wrap) { + free(t->cc_content.cc_wrap); + } + break; + } + free(t); + JLN(pval, enc->saved_global_templates, indexint); + } + JLFA(rcint, enc->saved_global_templates); if (enc->encoder) { free_wandder_encoder(enc->encoder); @@ -210,150 +283,538 @@ static int encode_rawip(openli_encoder_t *enc, openli_encoding_job_t *job, res->ipcontents = job->origreq->data.rawip.ipcontent; res->ipclen = job->origreq->data.rawip.ipclen; res->header.magic = htonl(OPENLI_PROTO_MAGIC); - res->header.bodylen = htons(res->msgbody->len + liidlen + sizeof(uint16_t)); + res->header.bodylen = htons(res->msgbody->len); res->header.intercepttype = htons(OPENLI_PROTO_RAWIP_SYNC); res->header.internalid = 0; - res->isDer = 1; /* Must be set as DER for the forwarder to handle - * correctly */ return 0; } +static inline uint8_t DERIVE_INTEGER_LENGTH(uint64_t x) { + if (x < 128) return 1; + if (x < 32768) return 2; + if (x < 8388608) return 3; + if (x < 2147483648) return 4; + return 5; +} + +static inline uint8_t encode_pspdu_sequence(uint8_t *space, uint8_t space_len, + uint32_t contentsize, char *liid, uint16_t liidlen) { + + uint8_t len_space_req = DERIVE_INTEGER_LENGTH(contentsize); + int i; + uint16_t l; + + if (liidlen > space_len - 8) { + logger(LOG_INFO, + "OpenLI: invalid LIID for PSPDU: %s (%u %u)", liid, liidlen, space_len); + return 0; + } + + l = htons(liidlen); + memcpy(space, &l, sizeof(uint16_t)); + memcpy(space + 2, liid, liidlen); + space += (2 + liidlen); + + *space = (uint8_t)((WANDDER_CLASS_UNIVERSAL_CONSTRUCT << 5) | + WANDDER_TAG_SEQUENCE); + space ++; + + if (len_space_req == 1) { + *space = (uint8_t)contentsize; + return 2 + (2 + liidlen); + } + + if (len_space_req > (space_len - 2)) { + logger(LOG_INFO, + "OpenLI: invalid PSPDU sequence length %u", contentsize); + return 0; + } + + *space = len_space_req | 0x80; + space ++; + + for (i = len_space_req - 1; i >= 0; i--) { + *(space + i) = (contentsize & 0xff); + contentsize = contentsize >> 8; + } + + return len_space_req + 2 + (2 + liidlen); +} + +static int create_encoded_message_body(openli_encoded_result_t *res, + encoded_header_template_t *hdr_tplate, uint8_t *bodycontent, + uint16_t bodylen, char *liid, uint16_t liidlen) { + + uint8_t pspdu[108]; + uint8_t pspdu_len; + /* Create a msgbody by concatenating hdr_tplate and ipcc_tplate, plus + * a preceding pS-PDU sequence with the appropriate length... + */ + pspdu_len = encode_pspdu_sequence(pspdu, sizeof(pspdu), + hdr_tplate->header_len + bodylen, liid, liidlen); + + if (pspdu_len == 0) { + return -1; + } + + res->msgbody = calloc(1, sizeof(wandder_encoded_result_t)); + res->msgbody->encoder = NULL; + res->msgbody->len = pspdu_len + hdr_tplate->header_len + bodylen; + + res->msgbody->encoded = malloc(res->msgbody->len); + res->msgbody->alloced = res->msgbody->len; + res->msgbody->next = NULL; + + memcpy(res->msgbody->encoded, pspdu, pspdu_len); + memcpy(res->msgbody->encoded + pspdu_len, hdr_tplate->header, + hdr_tplate->header_len); + memcpy(res->msgbody->encoded + pspdu_len + hdr_tplate->header_len, + bodycontent, bodylen); + + /* Set the remaining msg->header properties */ + res->header.magic = htonl(OPENLI_PROTO_MAGIC); + res->header.bodylen = htons(res->msgbody->len); + res->header.internalid = 0; + + return 0; +} + +static int encode_templated_ipiri(openli_encoder_t *enc, + openli_encoding_job_t *job, encoded_header_template_t *hdr_tplate, + openli_encoded_result_t *res) { + + /* Doesn't really make sense to template the IPIRI payload itself, since + * the content is quite variable and IPIRIs should be generated + * relatively infrequently. + */ + + wandder_encoded_result_t *body = NULL; + openli_ipiri_job_t *ipirijob; + etsili_iri_type_t iritype; + etsili_generic_t *params = NULL; + + ipirijob = (openli_ipiri_job_t *)&(job->origreq->data.ipiri); + + /* in ipiri.c */ + prepare_ipiri_parameters(enc->freegenerics, ipirijob, &iritype, ¶ms); + + reset_wandder_encoder(enc->encoder); + body = encode_ipiri_body(enc->encoder, job->preencoded, iritype, ¶ms); + + if (body == NULL || body->len == 0 || body->encoded == NULL) { + logger(LOG_INFO, "OpenLI: failed to encode ETSI IPIRI body"); + if (body) { + wandder_release_encoded_result(enc->encoder, body); + free_ipiri_parameters(params); + } + return -1; + } + + if (create_encoded_message_body(res, hdr_tplate, body->encoded, body->len, + job->liid, + job->preencoded[OPENLI_PREENCODE_LIID].vallen) < 0) { + wandder_release_encoded_result(enc->encoder, body); + free_ipiri_parameters(params); + return -1; + } + + res->ipcontents = NULL; + res->ipclen = 0; + res->header.intercepttype = htons(OPENLI_PROTO_ETSI_IRI); + + wandder_release_encoded_result(enc->encoder, body); + free_ipiri_parameters(params); + + /* Success */ + return 1; +} + +static inline encoded_global_template_t *lookup_global_template( + openli_encoder_t *enc, uint32_t key, uint8_t *is_new) { + + PWord_t pval; + encoded_global_template_t *ipcc_tplate = NULL; + + JLG(pval, enc->saved_global_templates, key); + if (pval == NULL) { + ipcc_tplate = calloc(1, sizeof(encoded_global_template_t)); + ipcc_tplate->key = key; + ipcc_tplate->cctype = (key >> 16); + JLI(pval, enc->saved_global_templates, key); + *pval = (Word_t)ipcc_tplate; + *is_new = 1; + } else { + ipcc_tplate = (encoded_global_template_t *)(*pval); + *is_new = 0; + } + + return ipcc_tplate; +} + +static int encode_templated_ipmmcc(openli_encoder_t *enc, + openli_encoding_job_t *job, encoded_header_template_t *hdr_tplate, + openli_encoded_result_t *res) { + + uint32_t key = 0; + encoded_global_template_t *ipmmcc_tplate = NULL; + openli_ipcc_job_t *mmccjob; + uint8_t is_new = 0; + + mmccjob = (openli_ipcc_job_t *)&(job->origreq->data.ipcc); + + /* We only handle IP frames and RTP protocol for IPMM so far... */ + + if (mmccjob->dir == ETSI_DIR_FROM_TARGET) { + key = (TEMPLATE_TYPE_IPMMCC_DIRFROM_IP_RTP << 16) + mmccjob->ipclen; + } else if (mmccjob->dir == ETSI_DIR_TO_TARGET) { + key = (TEMPLATE_TYPE_IPMMCC_DIRTO_IP_RTP << 16) + mmccjob->ipclen; + } else { + key = (TEMPLATE_TYPE_IPMMCC_DIROTHER_IP_RTP << 16) + mmccjob->ipclen; + } + + ipmmcc_tplate = lookup_global_template(enc, key, &is_new); + + if (is_new) { + if (etsili_create_ipmmcc_template(enc->encoder, job->preencoded, + mmccjob->dir, mmccjob->ipcontent, mmccjob->ipclen, + ipmmcc_tplate) < 0) { + return -1; + } + } else { + /* Overwrite the existing MMCCContents field */ + if (etsili_update_ipmmcc_template(ipmmcc_tplate, mmccjob->ipcontent, + mmccjob->ipclen) < 0) { + return -1; + } + } + + if (create_encoded_message_body(res, hdr_tplate, + ipmmcc_tplate->cc_content.cc_wrap, + ipmmcc_tplate->cc_content.cc_wrap_len, + job->liid, + job->preencoded[OPENLI_PREENCODE_LIID].vallen) < 0) { + return -1; + } + + /* No ipcontents in the result, as it is encoded already in cc_content */ + res->ipcontents = NULL; + res->ipclen = 0; + res->header.intercepttype = htons(OPENLI_PROTO_ETSI_CC); + + /* Success */ + return 1; +} + +static int encode_templated_umtsiri(openli_encoder_t *enc, + openli_encoding_job_t *job, encoded_header_template_t *hdr_tplate, + openli_encoded_result_t *res) { + + wandder_encoded_result_t *body = NULL; + openli_mobiri_job_t *irijob = + (openli_mobiri_job_t *)&(job->origreq->data.mobiri); + etsili_generic_t *np = NULL; + char opid[6]; + int opidlen = enc->shared->operatorid_len; + + /* TODO maybe we could find a way to reuse this instead of creating + * every time? + */ + if (opidlen > 5) { + opidlen = 5; + } + + memcpy(opid, enc->shared->operatorid, opidlen); + opid[opidlen] = '\0'; + + np = create_etsili_generic(enc->freegenerics, + UMTSIRI_CONTENTS_OPERATOR_IDENTIFIER, opidlen, + (uint8_t *)opid); + HASH_ADD_KEYPTR(hh, irijob->customparams, + &(np->itemnum), sizeof(np->itemnum), np); + + /* Not worth trying to template the body of UMTS IRIs -- way too + * many variables in here that may or may not change on a semi-regular + * basis. + */ + reset_wandder_encoder(enc->encoder); + + /* Assuming SIP here for now, other protocols can be supported later */ + body = encode_umtsiri_body(enc->encoder, job->preencoded, irijob->iritype, + irijob->customparams); + + + if (body == NULL || body->len == 0 || body->encoded == NULL) { + logger(LOG_INFO, "OpenLI: failed to encode ETSI UMTSIRI body"); + if (body) { + wandder_release_encoded_result(enc->encoder, body); + } + return -1; + } + + if (create_encoded_message_body(res, hdr_tplate, body->encoded, body->len, + job->liid, + job->preencoded[OPENLI_PREENCODE_LIID].vallen) < 0) { + wandder_release_encoded_result(enc->encoder, body); + return -1; + } + + res->ipcontents = NULL; + res->ipclen = 0; + res->header.intercepttype = htons(OPENLI_PROTO_ETSI_IRI); + + wandder_release_encoded_result(enc->encoder, body); + free_umtsiri_parameters(irijob->customparams); + /* Success */ + return 1; +} + +static int encode_templated_ipmmiri(openli_encoder_t *enc, + openli_encoding_job_t *job, encoded_header_template_t *hdr_tplate, + openli_encoded_result_t *res) { + + wandder_encoded_result_t *body = NULL; + openli_ipmmiri_job_t *irijob = + (openli_ipmmiri_job_t *)&(job->origreq->data.ipmmiri); + + /* We could consider templating the body portion of IPMMIRIs if we + * really need the performance -- we'd need to create templates for each + * SIP message size + IP version + IRI type, with saved pointers to the SIP + * content, IRI type, source IP address and dest IP address. + */ + + /* For now though, let's just encode the body each time... */ + + reset_wandder_encoder(enc->encoder); + + /* Assuming SIP here for now, other protocols can be supported later */ + body = encode_sipiri_body(enc->encoder, job->preencoded, irijob->iritype, + irijob->ipsrc, irijob->ipdest, irijob->ipfamily, + irijob->content, irijob->contentlen); + + + if (body == NULL || body->len == 0 || body->encoded == NULL) { + logger(LOG_INFO, "OpenLI: failed to encode ETSI SIP IPMMIRI body"); + if (body) { + wandder_release_encoded_result(enc->encoder, body); + } + return -1; + } + + if (create_encoded_message_body(res, hdr_tplate, body->encoded, body->len, + job->liid, + job->preencoded[OPENLI_PREENCODE_LIID].vallen) < 0) { + wandder_release_encoded_result(enc->encoder, body); + return -1; + } + + res->ipcontents = (uint8_t *)(irijob->content); + res->ipclen = irijob->contentlen; + res->header.intercepttype = htons(OPENLI_PROTO_ETSI_IRI); + + wandder_release_encoded_result(enc->encoder, body); + + /* Success */ + return 1; +} + +static int encode_templated_umtscc(openli_encoder_t *enc, + openli_encoding_job_t *job, encoded_header_template_t *hdr_tplate, + openli_encoded_result_t *res) { + + uint32_t key = 0; + encoded_global_template_t *umtscc_tplate = NULL; + openli_ipcc_job_t *ccjob; + uint8_t is_new = 0; + + ccjob = (openli_ipcc_job_t *)&(job->origreq->data.ipcc); + + if (ccjob->dir == ETSI_DIR_FROM_TARGET) { + key = (TEMPLATE_TYPE_UMTSCC_DIRFROM << 16) + ccjob->ipclen; + } else if (ccjob->dir == ETSI_DIR_TO_TARGET) { + key = (TEMPLATE_TYPE_UMTSCC_DIRTO << 16) + ccjob->ipclen; + } else { + key = (TEMPLATE_TYPE_UMTSCC_DIROTHER << 16) + ccjob->ipclen; + } + + umtscc_tplate = lookup_global_template(enc, key, &is_new); + + if (is_new) { + if (etsili_create_umtscc_template(enc->encoder, job->preencoded, + ccjob->dir, ccjob->ipclen, umtscc_tplate) < 0) { + logger(LOG_INFO, "OpenLI: Failed to create UMTSCC template?"); + return -1; + } + } + /* We have very specific templates for each observed packet size, so + * this will not require updating */ + + if (create_encoded_message_body(res, hdr_tplate, + umtscc_tplate->cc_content.cc_wrap, + umtscc_tplate->cc_content.cc_wrap_len, + job->liid, + job->preencoded[OPENLI_PREENCODE_LIID].vallen) < 0) { + return -1; + } + + /* Set ipcontents in the result */ + res->ipcontents = (uint8_t *)ccjob->ipcontent; + res->ipclen = ccjob->ipclen; + res->header.intercepttype = htons(OPENLI_PROTO_ETSI_CC); + + /* Success */ + return 1; + +} + +static int encode_templated_ipcc(openli_encoder_t *enc, + openli_encoding_job_t *job, encoded_header_template_t *hdr_tplate, + openli_encoded_result_t *res) { + + uint32_t key = 0; + encoded_global_template_t *ipcc_tplate = NULL; + openli_ipcc_job_t *ipccjob; + uint8_t is_new = 0; + + ipccjob = (openli_ipcc_job_t *)&(job->origreq->data.ipcc); + + if (ipccjob->dir == ETSI_DIR_FROM_TARGET) { + key = (TEMPLATE_TYPE_IPCC_DIRFROM << 16) + ipccjob->ipclen; + } else if (ipccjob->dir == ETSI_DIR_TO_TARGET) { + key = (TEMPLATE_TYPE_IPCC_DIRTO << 16) + ipccjob->ipclen; + } else { + key = (TEMPLATE_TYPE_IPCC_DIROTHER << 16) + ipccjob->ipclen; + } + + ipcc_tplate = lookup_global_template(enc, key, &is_new); + + if (is_new) { + if (etsili_create_ipcc_template(enc->encoder, job->preencoded, + ipccjob->dir, ipccjob->ipclen, ipcc_tplate) < 0) { + logger(LOG_INFO, "OpenLI: Failed to create IPCC template?"); + return -1; + } + } + /* We have very specific templates for each observed packet size, so + * this will not require updating */ + + if (create_encoded_message_body(res, hdr_tplate, + ipcc_tplate->cc_content.cc_wrap, + ipcc_tplate->cc_content.cc_wrap_len, + job->liid, + job->preencoded[OPENLI_PREENCODE_LIID].vallen) < 0) { + return -1; + } + + /* Set ipcontents in the result */ + res->ipcontents = (uint8_t *)ipccjob->ipcontent; + res->ipclen = ipccjob->ipclen; + res->header.intercepttype = htons(OPENLI_PROTO_ETSI_CC); + + /* Success */ + return 1; + +} + +static encoded_header_template_t *encode_templated_psheader( + wandder_encoder_t *encoder, saved_encoding_templates_t *t_set, + openli_encoding_job_t *job) { + + uint8_t seqlen, tvsec_len, tvusec_len; + uint32_t key = 0; + PWord_t pval; + encoded_header_template_t *tplate = NULL; + + if (job->origreq->ts.tv_sec == 0) { + gettimeofday(&(job->origreq->ts), NULL); + } + + seqlen = DERIVE_INTEGER_LENGTH(job->seqno); + tvsec_len = DERIVE_INTEGER_LENGTH(job->origreq->ts.tv_sec); + tvusec_len = DERIVE_INTEGER_LENGTH(job->origreq->ts.tv_usec); + + key = (seqlen << 16) + (tvsec_len << 8) + tvusec_len; + + JLI(pval, t_set->headers, key); + if (*pval == 0) { + tplate = calloc(1, sizeof(encoded_header_template_t)); + + if (etsili_create_header_template(encoder, job->preencoded, + (int64_t)job->cin, (int64_t)job->seqno, &(job->origreq->ts), + tplate) < 0) { + free(tplate); + return NULL; + } + + *pval = (Word_t)tplate; + + } else { + tplate = (encoded_header_template_t *)(*pval); + + if (etsili_update_header_template(tplate, (int64_t)job->seqno, + &(job->origreq->ts)) < 0) { + return NULL; + } + } + + return tplate; +} + static int encode_etsi(openli_encoder_t *enc, openli_encoding_job_t *job, openli_encoded_result_t *res) { int ret = -1; - uint8_t isDer = 1; - etsili_generic_t *np; - -#ifdef HAVE_BER_ENCODING - if (job->top != NULL) { - isDer = 0; + char keystr[1000]; + PWord_t pval; + saved_encoding_templates_t *t_set = NULL; + encoded_header_template_t *hdr_tplate = NULL; + + snprintf(keystr, 1000, "%s-%s", job->liid, job->cinstr); + JSLI(pval, enc->saved_intercept_templates, keystr); + if ((*pval)) { + t_set = (saved_encoding_templates_t *)(*pval); + } else { + t_set = calloc(1, sizeof(saved_encoding_templates_t)); + t_set->key = strdup(keystr); + (*pval) = (Word_t)t_set; } -#endif - switch(job->origreq->type) { + hdr_tplate = encode_templated_psheader(enc->encoder, t_set, job); + + switch (job->origreq->type) { case OPENLI_EXPORT_IPCC: - if (isDer){ - ret = encode_ipcc(enc->encoder, job->preencoded, - &(job->origreq->data.ipcc), job->seqno, - &(job->origreq->ts), res); -#ifdef HAVE_BER_ENCODING - }else { - job->child = wandder_create_etsili_child(job->top, &(job->top->ipcc)); - ret = encode_ipcc_ber( - &(job->origreq->data.ipcc), job->seqno, - &(job->origreq->ts), res, job->child, enc->encoder); -#endif - } - break; - case OPENLI_EXPORT_IPIRI: - if(isDer){ - ret = encode_ipiri(enc->encoder, enc->freegenerics, - job->preencoded, - &(job->origreq->data.ipiri), job->seqno, res); -#ifdef HAVE_BER_ENCODING - } - else{ - job->child = wandder_create_etsili_child(job->top, &(job->top->ipiri)); - ret = encode_ipiri_ber( - &(job->origreq->data.ipiri), enc->freegenerics, - job->seqno, &(job->origreq->ts), res, job->child, - enc->encoder); -#endif - } - break; - case OPENLI_EXPORT_IPMMIRI: - if (isDer){ - ret = encode_ipmmiri(enc->encoder, job->preencoded, - &(job->origreq->data.ipmmiri), job->seqno, res, - &(job->origreq->ts)); -#ifdef HAVE_BER_ENCODING - }else { - job->child = wandder_create_etsili_child(job->top, &(job->top->ipmmiri)); - ret = encode_ipmmiri_ber( - &(job->origreq->data.ipmmiri), job->seqno, - &(job->origreq->ts), res, job->child, enc->encoder); -#endif - } + /* IPCC "header" can be templated */ + ret = encode_templated_ipcc(enc, job, hdr_tplate, res); break; case OPENLI_EXPORT_IPMMCC: - if (isDer){ - ret = encode_ipmmcc(enc->encoder, job->preencoded, - &(job->origreq->data.ipcc), job->seqno, - &(job->origreq->ts), res); -#ifdef HAVE_BER_ENCODING - } else { - job->child = wandder_create_etsili_child(job->top, &(job->top->ipmmcc)); - ret = encode_ipmmcc_ber( - &(job->origreq->data.ipcc), job->seqno, - &(job->origreq->ts), res, job->child, enc->encoder); -#endif - } + ret = encode_templated_ipmmcc(enc, job, hdr_tplate, res); break; case OPENLI_EXPORT_UMTSCC: - if (isDer) { - ret = encode_umtscc(enc->encoder, job->preencoded, - &(job->origreq->data.ipcc), job->seqno, - &(job->origreq->ts), res); -#ifdef HAVE_BER_ENCODING - } - else { - job->child = wandder_create_etsili_child(job->top, &(job->top->umtscc)); - ret = encode_umtscc_ber( - &(job->origreq->data.ipcc), job->seqno, - &(job->origreq->ts), res, job->child, enc->encoder); - -#endif - } + ret = encode_templated_umtscc(enc, job, hdr_tplate, res); break; - case OPENLI_EXPORT_UMTSIRI: { - char opid[6]; - int opidlen = enc->shared->operatorid_len; - - if (opidlen > 5) { - opidlen = 5; - } - - memcpy(opid, enc->shared->operatorid, opidlen); - opid[opidlen] = '\0'; - - np = create_etsili_generic(enc->freegenerics, - UMTSIRI_CONTENTS_OPERATOR_IDENTIFIER, opidlen, - (uint8_t *)opid); - HASH_ADD_KEYPTR(hh, job->origreq->data.mobiri.customparams, - &(np->itemnum), sizeof(np->itemnum), np); - - if (isDer) { - ret = encode_umtsiri(enc->encoder, enc->freegenerics, - job->preencoded, &(job->origreq->data.mobiri), - job->seqno, res); -#ifdef HAVE_BER_ENCODING - } - else{ - job->child = wandder_create_etsili_child(job->top, &(job->top->umtsiri)); - ret = encode_umtsiri_ber( - &(job->origreq->data.mobiri), enc->freegenerics, - job->seqno, res, job->child); -#endif - } + case OPENLI_EXPORT_IPIRI: + ret = encode_templated_ipiri(enc, job, hdr_tplate, res); break; - } + case OPENLI_EXPORT_IPMMIRI: + ret = encode_templated_ipmmiri(enc, job, hdr_tplate, res); + break; + case OPENLI_EXPORT_UMTSIRI: + ret = encode_templated_umtsiri(enc, job, hdr_tplate, res); + break; + default: + ret = 0; } - res->isDer = isDer; //encodeing typeto be stored in result - return ret; } - static int process_job(openli_encoder_t *enc, void *socket) { int x; int batch = 0; openli_encoding_job_t job; - openli_encoded_result_t result; + openli_encoded_result_t result[MAX_ENCODED_RESULT_BATCH]; - while (batch < 50) { + while (batch < MAX_ENCODED_RESULT_BATCH) { memset(&job, 0, sizeof(openli_encoding_job_t)); x = zmq_recv(socket, &job, sizeof(openli_encoding_job_t), 0); if (x < 0 && (errno != EAGAIN && errno != EINTR)) { @@ -368,38 +829,53 @@ static int process_job(openli_encoder_t *enc, void *socket) { } if (job.origreq->type == OPENLI_EXPORT_RAW_SYNC) { - encode_rawip(enc, &job, &result); + encode_rawip(enc, &job, &(result[batch])); } else { - if (encode_etsi(enc, &job, &result) < 0) { + if ((x = encode_etsi(enc, &job, &(result[batch]))) <= 0) { /* What do we do in the event of an error? */ - logger(LOG_INFO, - "OpenLI: encoder worker had an error when encoding %d record", - job.origreq->type); + if (x < 0) { + logger(LOG_INFO, + "OpenLI: encoder worker had an error when encoding %d record", + job.origreq->type); + } + if (job.cinstr) { + free(job.cinstr); + } + if (job.liid) { + free(job.liid); + } + if (job.origreq) { + free_published_message(job.origreq); + } continue; } } - result.cinstr = job.cinstr; - result.liid = job.liid; - result.seqno = job.seqno; - result.destid = job.origreq->destid; - result.origreq = job.origreq; - result.encodedby = enc->workerid; + result[batch].cinstr = job.cinstr; + result[batch].liid = job.liid; + result[batch].seqno = job.seqno; + result[batch].destid = job.origreq->destid; + result[batch].origreq = job.origreq; + result[batch].encodedby = enc->workerid; -#ifdef HAVE_BER_ENCODING - result.child = job.child; -#endif + batch++; + } - // FIXME -- hash result based on LIID (and CIN?) - assert(enc->zmq_pushresults[0] != NULL); - if (zmq_send(enc->zmq_pushresults[0], &result, sizeof(result), 0) < 0) { + /* TODO if we have multiple forwarding threads, we will need to + * assign individual results to the forwarder based on its LIID -- + * this will also require multiple result[] arrays (one per forwarder) + * for message batching. + */ + if (batch > 0) { + if (zmq_send(enc->zmq_pushresults[0], result, + batch * sizeof(openli_encoded_result_t), 0) < 0) { logger(LOG_INFO, "OpenLI: error while pushing encoded result back to exporter (worker=%d)", enc->workerid); - break; + return -1; } - batch++; } + return batch; } diff --git a/src/collector/encoder_worker.h b/src/collector/encoder_worker.h index 9413fd3e..1a43b586 100644 --- a/src/collector/encoder_worker.h +++ b/src/collector/encoder_worker.h @@ -30,6 +30,7 @@ #include #include #include +#include #include "collector_publish.h" #include "collector.h" @@ -37,6 +38,30 @@ #include "etsili_core.h" #include "export_shared.h" +enum { + TEMPLATE_TYPE_IPCC_DIRFROM, + TEMPLATE_TYPE_IPCC_DIRTO, + TEMPLATE_TYPE_IPCC_DIROTHER, + + TEMPLATE_TYPE_IPMMCC_DIRFROM_IP_RTP, + TEMPLATE_TYPE_IPMMCC_DIRTO_IP_RTP, + TEMPLATE_TYPE_IPMMCC_DIROTHER_IP_RTP, + + TEMPLATE_TYPE_UMTSCC_DIRFROM, + TEMPLATE_TYPE_UMTSCC_DIRTO, + TEMPLATE_TYPE_UMTSCC_DIROTHER, + +}; + +typedef struct saved_encoding_templates { + + char *key; + Pvoid_t headers; + Pvoid_t ccpayloads; + Pvoid_t iripayloads; + +} saved_encoding_templates_t; + void destroy_encoder_worker(openli_encoder_t *enc); void *run_encoder_worker(void *encstate); diff --git a/src/collector/export_shared.h b/src/collector/export_shared.h index 3a88d8bb..c3a793ca 100644 --- a/src/collector/export_shared.h +++ b/src/collector/export_shared.h @@ -55,9 +55,6 @@ typedef struct intercept_state { cin_seqno_t *cinsequencing; UT_hash_handle hh; wandder_encode_job_t *preencoded; -#ifdef HAVE_BER_ENCODING - wandder_etsili_top_t *top; -#endif } exporter_intercept_state_t; #endif diff --git a/src/collector/ipcc.c b/src/collector/ipcc.c index 537179a5..93afb1a4 100644 --- a/src/collector/ipcc.c +++ b/src/collector/ipcc.c @@ -39,71 +39,6 @@ #include "etsili_core.h" #include "ipcc.h" -int encode_ipcc(wandder_encoder_t *encoder, wandder_encode_job_t *precomputed, - openli_ipcc_job_t *job, uint32_t seqno, struct timeval *tv, - openli_encoded_result_t *msg) { - - uint32_t liidlen = precomputed[OPENLI_PREENCODE_LIID].vallen; - reset_wandder_encoder(encoder); - - memset(msg, 0, sizeof(openli_encoded_result_t)); - - msg->msgbody = encode_etsi_ipcc(encoder, precomputed, - (int64_t)job->cin, (int64_t)seqno, tv, job->ipcontent, - job->ipclen, job->dir); - - msg->ipcontents = (uint8_t *)job->ipcontent; - msg->ipclen = job->ipclen; - - msg->header.magic = htonl(OPENLI_PROTO_MAGIC); - msg->header.bodylen = htons(msg->msgbody->len + liidlen + sizeof(uint16_t)); - msg->header.intercepttype = htons(OPENLI_PROTO_ETSI_CC); - msg->header.internalid = 0; - - return 0; - -} - -#ifdef HAVE_BER_ENCODING - -int encode_ipcc_ber( - openli_ipcc_job_t *job, uint32_t seqno, struct timeval *tv, - openli_encoded_result_t *msg, wandder_etsili_child_t *child, wandder_encoder_t *encoder) { - - uint32_t liidlen = (uint32_t)((size_t)child->owner->preencoded[WANDDER_PREENCODE_LIID_LEN]); - - memset(msg, 0, sizeof(openli_encoded_result_t)); - - wandder_encode_etsi_ipcc_ber( //new way - (int64_t)job->cin, - (int64_t)seqno, - tv, - job->ipcontent, - job->ipclen, - job->dir, - child); - - msg->msgbody = malloc(sizeof(wandder_encoded_result_t)); - - msg->msgbody->encoder = NULL; - msg->msgbody->encoded = child->buf; - msg->msgbody->len = child->len; - msg->msgbody->alloced = child->alloc_len; - msg->msgbody->next = NULL; - - msg->ipcontents = NULL; - msg->ipclen = 0; - - msg->header.magic = htonl(OPENLI_PROTO_MAGIC); - msg->header.bodylen = htons(msg->msgbody->len + liidlen + sizeof(uint16_t)); - msg->header.intercepttype = htons(OPENLI_PROTO_ETSI_CC); - msg->header.internalid = 0; - - return 0; -} -#endif - - static inline static_ipcache_t *find_static_cached(prefix_t *prefix, colthread_local_t *loc) { diff --git a/src/collector/ipcc.h b/src/collector/ipcc.h index 8243a3f0..6f55674a 100644 --- a/src/collector/ipcc.h +++ b/src/collector/ipcc.h @@ -30,20 +30,11 @@ #include #include "collector.h" -int encode_ipcc(wandder_encoder_t *encoder, wandder_encode_job_t *precomputed, - openli_ipcc_job_t *job, uint32_t seqno, struct timeval *tv, - openli_encoded_result_t *msg); int ipv4_comm_contents(libtrace_packet_t *pkt, packet_info_t *pinfo, libtrace_ip_t *ip, uint32_t rem, colthread_local_t *loc); int ipv6_comm_contents(libtrace_packet_t *pkt, packet_info_t *pinfo, libtrace_ip6_t *ip, uint32_t rem, colthread_local_t *loc); -#ifdef HAVE_BER_ENCODING -int encode_ipcc_ber( - openli_ipcc_job_t *job, uint32_t seqno, struct timeval *tv, - openli_encoded_result_t *msg, wandder_etsili_child_t *child, - wandder_encoder_t *encoder); -#endif #endif // vim: set sw=4 tabstop=4 softtabstop=4 expandtab : diff --git a/src/collector/ipiri.c b/src/collector/ipiri.c index 86e8d192..145dc6f4 100644 --- a/src/collector/ipiri.c +++ b/src/collector/ipiri.c @@ -38,7 +38,7 @@ #include "ipiri.h" #include "internetaccess.h" -static void free_ipiri_parameters(etsili_generic_t *params) { +void free_ipiri_parameters(etsili_generic_t *params) { etsili_generic_t *oldp, *tmp; @@ -83,8 +83,7 @@ static inline void add_another_target_identifier(internetaccess_ip_t *nextip, } -static inline void encode_ipiri_shared(wandder_encoder_t *encoder, - etsili_generic_freelist_t *freegenerics, +void prepare_ipiri_parameters(etsili_generic_freelist_t *freegenerics, openli_ipiri_job_t *job, etsili_iri_type_t *iritype_p, etsili_generic_t **params_p) { @@ -260,49 +259,11 @@ static inline void encode_ipiri_shared(wandder_encoder_t *encoder, np); } - reset_wandder_encoder(encoder); - *iritype_p = iritype; *params_p = params; } -int encode_ipiri(wandder_encoder_t *encoder, - etsili_generic_freelist_t *freegenerics, - wandder_encode_job_t *precomputed, - openli_ipiri_job_t *job, uint32_t seqno, - openli_encoded_result_t *res) { - - - etsili_generic_t *params = NULL; - etsili_iri_type_t iritype; - struct timeval tv; - int ret = 0; - uint32_t liidlen = precomputed[OPENLI_PREENCODE_LIID].vallen; - - encode_ipiri_shared(encoder, - freegenerics, - job, - &iritype, - ¶ms); - - gettimeofday(&tv, NULL); - - memset(res, 0, sizeof(openli_encoded_result_t)); - res->msgbody = encode_etsi_ipiri(encoder, precomputed, - (int64_t)(job->cin), (int64_t)seqno, iritype, &tv, ¶ms); - - res->ipcontents = NULL; - res->ipclen = 0; - res->header.magic = htonl(OPENLI_PROTO_MAGIC); - res->header.bodylen = htons(res->msgbody->len + liidlen + sizeof(uint16_t)); - res->header.intercepttype = htons(OPENLI_PROTO_ETSI_IRI); - res->header.internalid = 0; - - free_ipiri_parameters(params); - return ret; -} - int ipiri_create_id_printable(char *idstr, int length, ipiri_id_t *iriid) { if (length <= 0) { @@ -341,61 +302,6 @@ void ipiri_free_id(ipiri_id_t *iriid) { } } -#ifdef HAVE_BER_ENCODING -int encode_ipiri_ber( - openli_ipiri_job_t *job, - etsili_generic_freelist_t *freegenerics, - uint32_t seqno, struct timeval *tv, - openli_encoded_result_t *res, - wandder_etsili_child_t *child, - wandder_encoder_t *encoder) { - - memset(res, 0, sizeof(openli_encoded_result_t)); - - etsili_generic_t *params = NULL; - etsili_iri_type_t iritype; - struct timeval current_tv; - int ret = 0; - uint32_t liidlen = (uint32_t)((size_t)child->owner->preencoded[WANDDER_PREENCODE_LIID_LEN]); - - encode_ipiri_shared(encoder, - freegenerics, - job, - &iritype, - ¶ms); - - gettimeofday(¤t_tv, NULL); - - memset(res, 0, sizeof(openli_encoded_result_t)); - - wandder_encode_etsi_ipiri_ber ( - (int64_t)(job->cin), - (int64_t)seqno, - ¤t_tv, - params, - iritype, - child); - - res->msgbody = malloc(sizeof(wandder_encoded_result_t)); - res->msgbody->encoder = NULL; - res->msgbody->encoded = child->buf; - res->msgbody->len = child->len; - res->msgbody->alloced = child->alloc_len; - res->msgbody->next = NULL; - - res->ipcontents = NULL; - res->ipclen = 0; - - res->header.magic = htonl(OPENLI_PROTO_MAGIC); - res->header.bodylen = htons(res->msgbody->len + liidlen + sizeof(uint16_t)); - res->header.intercepttype = htons(OPENLI_PROTO_ETSI_IRI); - res->header.internalid = 0; - - free_ipiri_parameters(params); - return ret; -} -#endif - static inline void finish_ipiri_job(collector_sync_t *sync, access_session_t *sess, ipintercept_t *ipint, openli_export_recv_t *irimsg) { diff --git a/src/collector/ipiri.h b/src/collector/ipiri.h index 80e135f8..b2983ec3 100644 --- a/src/collector/ipiri.h +++ b/src/collector/ipiri.h @@ -129,26 +129,13 @@ typedef struct ipiri_id { } content; } ipiri_id_t; -int ip_iri(collector_identity_t *info, wandder_encoder_t **encoder, - libtrace_message_queue_t *q, access_session_t *sess, - ipintercept_t *ipint, etsili_iri_type_t iritype, - struct timeval *tv, etsili_generic_t *params); - -int encode_ipiri(wandder_encoder_t *encoder, - etsili_generic_freelist_t *freegenerics, - wandder_encode_job_t *precomputed, - openli_ipiri_job_t *job, uint32_t seqno, - openli_encoded_result_t *res); - -#ifdef HAVE_BER_ENCODING -int encode_ipiri_ber( + +void free_ipiri_parameters(etsili_generic_t *params); + +void prepare_ipiri_parameters(etsili_generic_freelist_t *freegenerics, openli_ipiri_job_t *job, - etsili_generic_freelist_t *freegenerics, - uint32_t seqno, struct timeval *tv, - openli_encoded_result_t *res, - wandder_etsili_child_t *child, - wandder_encoder_t *encoder); -#endif + etsili_iri_type_t *iritype_p, + etsili_generic_t **params_p); /* TODO consider adding free lists to these APIs to avoid excess mallocs */ int ipiri_create_id_printable(char *idstr, int length, ipiri_id_t *ipiriid); diff --git a/src/collector/ipmmcc.c b/src/collector/ipmmcc.c index 1f0431ad..1bad303d 100644 --- a/src/collector/ipmmcc.c +++ b/src/collector/ipmmcc.c @@ -41,75 +41,6 @@ #include "util.h" #include "ipmmcc.h" -int encode_ipmmcc(wandder_encoder_t *encoder, - wandder_encode_job_t *precomputed, openli_ipcc_job_t *job, - uint32_t seqno, struct timeval *tv, openli_encoded_result_t *msg) { - - uint32_t liidlen = precomputed[OPENLI_PREENCODE_LIID].vallen; - reset_wandder_encoder(encoder); - - memset(msg, 0, sizeof(openli_encoded_result_t)); - msg->msgbody = encode_etsi_ipmmcc(encoder, precomputed, - (int64_t)job->cin, (int64_t)seqno, tv, job->ipcontent, - job->ipclen, job->dir); - - /* Unfortunately, the packet body is not the last item in our message so - * we can't easily use our zero-copy shortcut :( */ - msg->ipcontents = NULL; - msg->ipclen = 0; - msg->header.magic = htonl(OPENLI_PROTO_MAGIC); - msg->header.bodylen = htons(msg->msgbody->len + liidlen + sizeof(uint16_t)); - msg->header.intercepttype = htons(OPENLI_PROTO_ETSI_CC); - msg->header.internalid = 0; - return 0; -} - -#ifdef HAVE_BER_ENCODING - -int encode_ipmmcc_ber( - openli_ipcc_job_t *job, uint32_t seqno, struct timeval *tv, - openli_encoded_result_t *msg, wandder_etsili_child_t *child, - wandder_encoder_t *encoder) { - - uint32_t liidlen = (uint32_t)((size_t)child->owner->preencoded[WANDDER_PREENCODE_LIID_LEN]); - - memset(msg, 0, sizeof(openli_encoded_result_t)); - - wandder_encode_etsi_ipmmcc_ber( //new way - (int64_t)job->cin, - (int64_t)seqno, - tv, - job->ipcontent, - job->ipclen, - job->dir, - child); - - msg->msgbody = malloc(sizeof(wandder_encoded_result_t)); - - msg->msgbody->encoder = NULL; - msg->msgbody->encoded = child->buf; - msg->msgbody->len = child->len; - msg->msgbody->alloced = child->alloc_len; - msg->msgbody->next = NULL; - - msg->ipcontents = NULL; - msg->ipclen = 0; - - /* Unfortunately, the packet body is not the last item in our message so - * we can't easily use our zero-copy shortcut :( */ - // msg->ipcontents = NULL; - // msg->ipclen = 0; - - msg->header.magic = htonl(OPENLI_PROTO_MAGIC); - msg->header.bodylen = htons(msg->msgbody->len + liidlen + sizeof(uint16_t)); - msg->header.intercepttype = htons(OPENLI_PROTO_ETSI_CC); - msg->header.internalid = 0; - - return 0; -} - -#endif - static inline uint8_t is_rtp_comfort_noise(libtrace_packet_t *packet) { void *transport, *payload; diff --git a/src/collector/ipmmcc.h b/src/collector/ipmmcc.h index 71700f88..317dce04 100644 --- a/src/collector/ipmmcc.h +++ b/src/collector/ipmmcc.h @@ -30,22 +30,11 @@ #include #include "collector.h" -int encode_ipmmcc(wandder_encoder_t *encoder, - wandder_encode_job_t *precomputed, openli_ipcc_job_t *job, - uint32_t seqno, struct timeval *tv, openli_encoded_result_t *msg); - int ip4mm_comm_contents(libtrace_packet_t *pkt, packet_info_t *pinfo, libtrace_ip_t *ip, uint32_t rem, colthread_local_t *loc); int ip6mm_comm_contents(libtrace_packet_t *pkt, packet_info_t *pinfo, libtrace_ip6_t *ip6, uint32_t rem, colthread_local_t *loc); -#ifdef HAVE_BER_ENCODING -int encode_ipmmcc_ber( - openli_ipcc_job_t *job, uint32_t seqno, struct timeval *tv, - openli_encoded_result_t *msg, wandder_etsili_child_t *child, - wandder_encoder_t *encoder); -#endif - #endif diff --git a/src/collector/ipmmiri.c b/src/collector/ipmmiri.c deleted file mode 100644 index 66dab0ae..00000000 --- a/src/collector/ipmmiri.c +++ /dev/null @@ -1,127 +0,0 @@ -/* - * - * Copyright (c) 2018 The University of Waikato, Hamilton, New Zealand. - * All rights reserved. - * - * This file is part of OpenLI. - * - * This code has been developed by the University of Waikato WAND - * research group. For further information please see http://www.wand.net.nz/ - * - * OpenLI is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * OpenLI is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - * - * - */ - -#include "config.h" - -#include -#include -#include -#include -#include -#include - -#include "logger.h" -#include "collector.h" -#include "intercept.h" -#include "etsili_core.h" -#include "ipmmiri.h" - -int encode_ipmmiri(wandder_encoder_t *encoder, - wandder_encode_job_t *precomputed, openli_ipmmiri_job_t *job, - uint32_t seqno, openli_encoded_result_t *res, struct timeval *ts) { - - uint32_t liidlen = precomputed[OPENLI_PREENCODE_LIID].vallen; - reset_wandder_encoder(encoder); - - memset(res, 0, sizeof(openli_encoded_result_t)); - - if (job->ipmmiri_style == OPENLI_IPMMIRI_SIP) { - if (job->content == NULL) { - logger(LOG_INFO, "OpenLI: trying to create SIP IRI but packet has no SIP payload?"); - return -1; - } - - res->msgbody = encode_etsi_sipiri(encoder, precomputed, - (int64_t)(job->cin), (int64_t)seqno, job->iritype, ts, - job->ipsrc, job->ipdest, job->ipfamily, job->content, - job->contentlen); - res->ipcontents = (uint8_t *)(job->content); - res->ipclen = job->contentlen; - } - /* TODO style == H323 */ - - res->header.magic = htonl(OPENLI_PROTO_MAGIC); - res->header.bodylen = htons(res->msgbody->len + liidlen + sizeof(uint16_t)); - res->header.intercepttype = htons(OPENLI_PROTO_ETSI_IRI); - res->header.internalid = 0; - - return 0; -} - -#ifdef HAVE_BER_ENCODING - -int encode_ipmmiri_ber( - openli_ipmmiri_job_t *job, uint32_t seqno, struct timeval *tv, - openli_encoded_result_t *res, wandder_etsili_child_t *child, - wandder_encoder_t *encoder) { - - uint32_t liidlen = (uint32_t)((size_t)child->owner->preencoded[WANDDER_PREENCODE_LIID_LEN]); - - memset(res, 0, sizeof(openli_encoded_result_t)); - - if (job->ipmmiri_style == OPENLI_IPMMIRI_SIP) { - if (job->content == NULL) { - logger(LOG_INFO, "OpenLI: trying to create SIP IRI but packet has no SIP payload?"); - return -1; - } - - wandder_encode_etsi_ipmmiri_ber( - (int64_t)job->cin, //cin - (int64_t)seqno, //seqno - tv, //tv - job->content, //content - job->contentlen, //contentlen - job->iritype, //iritype - job->ipsrc, - job->ipdest, - job->ipfamily, - child); - - res->msgbody = malloc(sizeof(wandder_encoded_result_t)); - - res->msgbody->encoder = NULL; - res->msgbody->encoded = child->buf; - res->msgbody->len = child->len; - res->msgbody->alloced = child->alloc_len; - res->msgbody->next = NULL; - - res->ipcontents = NULL; - res->ipclen = 0; - - } - /* TODO style == H323 */ - - res->header.magic = htonl(OPENLI_PROTO_MAGIC); - res->header.bodylen = htons(res->msgbody->len + liidlen + sizeof(uint16_t)); - res->header.intercepttype = htons(OPENLI_PROTO_ETSI_IRI); - res->header.internalid = 0; - - return 0; -} - -#endif - -// vim: set sw=4 tabstop=4 softtabstop=4 expandtab : diff --git a/src/collector/ipmmiri.h b/src/collector/ipmmiri.h index 5d0c7335..73d0b722 100644 --- a/src/collector/ipmmiri.h +++ b/src/collector/ipmmiri.h @@ -41,16 +41,4 @@ enum { OPENLI_IPMMIRI_H323, }; -int encode_ipmmiri(wandder_encoder_t *encoder, - wandder_encode_job_t *preencoded, openli_ipmmiri_job_t *job, - uint32_t seqno, - openli_encoded_result_t *res, struct timeval *ts); - -#ifdef HAVE_BER_ENCODING -int encode_ipmmiri_ber( - openli_ipmmiri_job_t *job, uint32_t seqno, struct timeval *tv, - openli_encoded_result_t *res, wandder_etsili_child_t *child, - wandder_encoder_t *encoder); -#endif - // vim: set sw=4 tabstop=4 softtabstop=4 expandtab : diff --git a/src/collector/umtscc.c b/src/collector/umtscc.c deleted file mode 100644 index cb346de5..00000000 --- a/src/collector/umtscc.c +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * Copyright (c) 2018 The University of Waikato, Hamilton, New Zealand. - * All rights reserved. - * - * This file is part of OpenLI. - * - * This code has been developed by the University of Waikato WAND - * research group. For further information please see http://www.wand.net.nz/ - * - * OpenLI is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * OpenLI is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - * - * - */ -#include "config.h" - -#include -#include -#include -#include -#include -#include -#include - -#include "logger.h" -#include "collector.h" -#include "collector_publish.h" -#include "etsili_core.h" -#include "util.h" -#include "umtscc.h" - - -int encode_umtscc(wandder_encoder_t *encoder, - wandder_encode_job_t *precomputed, openli_ipcc_job_t *job, - uint32_t seqno, struct timeval *tv, openli_encoded_result_t *msg) { - - uint32_t liidlen = precomputed[OPENLI_PREENCODE_LIID].vallen; - reset_wandder_encoder(encoder); - - memset(msg, 0, sizeof(openli_encoded_result_t)); - msg->msgbody = encode_etsi_umtscc(encoder, precomputed, (int64_t)job->cin, - (int64_t)seqno, tv, job->ipcontent, job->ipclen, job->dir); - - msg->ipcontents = (uint8_t *)job->ipcontent; - msg->ipclen = job->ipclen; - msg->header.magic = htonl(OPENLI_PROTO_MAGIC); - msg->header.bodylen = htons(msg->msgbody->len + liidlen + sizeof(uint16_t)); - msg->header.intercepttype = htons(OPENLI_PROTO_ETSI_CC); - msg->header.internalid = 0; - return 0; -} - - -#ifdef HAVE_BER_ENCODING - -int encode_umtscc_ber( - openli_ipcc_job_t *job, uint32_t seqno, struct timeval *tv, - openli_encoded_result_t *msg, wandder_etsili_child_t *child, wandder_encoder_t *encoder) { - - uint32_t liidlen = (uint32_t)((size_t)child->owner->preencoded[WANDDER_PREENCODE_LIID_LEN]); - - memset(msg, 0, sizeof(openli_encoded_result_t)); - - wandder_encode_etsi_umtscc_ber ( - job->cin, - (int64_t)seqno, - tv, - job->ipcontent, - job->ipclen, - job->dir, - child); - - msg->msgbody = malloc(sizeof(wandder_encoded_result_t)); - - msg->msgbody->encoder = NULL; - msg->msgbody->encoded = child->buf; - msg->msgbody->len = child->len; - msg->msgbody->alloced = child->alloc_len; - msg->msgbody->next = NULL; - - msg->ipcontents = NULL; - msg->ipclen = 0; - - msg->header.magic = htonl(OPENLI_PROTO_MAGIC); - msg->header.bodylen = htons(msg->msgbody->len + liidlen + sizeof(uint16_t)); - msg->header.intercepttype = htons(OPENLI_PROTO_ETSI_CC); - msg->header.internalid = 0; - - return 0; -} -#endif - -// vim: set sw=4 tabstop=4 softtabstop=4 expandtab : diff --git a/src/collector/umtscc.h b/src/collector/umtscc.h deleted file mode 100644 index 7dff9d30..00000000 --- a/src/collector/umtscc.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Copyright (c) 2018 The University of Waikato, Hamilton, New Zealand. - * All rights reserved. - * - * This file is part of OpenLI. - * - * This code has been developed by the University of Waikato WAND - * research group. For further information please see http://www.wand.net.nz/ - * - * OpenLI is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * OpenLI is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - * - * - */ - -#ifndef OPENLI_UMTSCC_H_ -#define OPENLI_UMTSCC_H_ - -#include "config.h" -#include "collector.h" - -int encode_umtscc(wandder_encoder_t *encoder, wandder_encode_job_t *precomputed, - openli_ipcc_job_t *job, uint32_t seqno, struct timeval *tv, - openli_encoded_result_t *msg); - - -#ifdef HAVE_BER_ENCODING -int encode_umtscc_ber( - openli_ipcc_job_t *job, uint32_t seqno, struct timeval *tv, - openli_encoded_result_t *msg, wandder_etsili_child_t *child, - wandder_encoder_t *encoder); -#endif - -#endif -// vim: set sw=4 tabstop=4 softtabstop=4 expandtab : diff --git a/src/collector/umtsiri.c b/src/collector/umtsiri.c index c73e19ac..fcb6fdb6 100644 --- a/src/collector/umtsiri.c +++ b/src/collector/umtsiri.c @@ -31,87 +31,6 @@ #include "umtsiri.h" #include "etsili_core.h" -static void free_umtsiri_parameters(etsili_generic_t *params) { - - etsili_generic_t *oldp, *tmp; - - HASH_ITER(hh, params, oldp, tmp) { - HASH_DELETE(hh, params, oldp); - release_etsili_generic(oldp); - } - -} - -int encode_umtsiri(wandder_encoder_t *encoder, - etsili_generic_freelist_t *freegenerics, - wandder_encode_job_t *precomputed, - openli_mobiri_job_t *job, uint32_t seqno, - openli_encoded_result_t *res) { - - struct timeval tv; - uint32_t liidlen = precomputed[OPENLI_PREENCODE_LIID].vallen; - - reset_wandder_encoder(encoder); - gettimeofday(&tv, NULL); - - memset(res, 0, sizeof(openli_encoded_result_t)); - res->msgbody = encode_etsi_umtsiri(encoder, precomputed, (int64_t)job->cin, - (int64_t)seqno, job->iritype, &tv, job->customparams); - - res->ipcontents = NULL; - res->ipclen = 0; - res->header.magic = htonl(OPENLI_PROTO_MAGIC); - res->header.bodylen = htons(res->msgbody->len + liidlen + sizeof(uint16_t)); - res->header.intercepttype = htons(OPENLI_PROTO_ETSI_IRI); - res->header.internalid = 0; - - free_umtsiri_parameters(job->customparams); - return 0; -} - -#ifdef HAVE_BER_ENCODING -int encode_umtsiri_ber( - openli_mobiri_job_t *job, - etsili_generic_freelist_t *freegenerics, - uint32_t seqno, - openli_encoded_result_t *res, - wandder_etsili_child_t *child) { - - struct timeval current_tv; - - uint32_t liidlen = (uint32_t)((size_t)child->owner->preencoded[WANDDER_PREENCODE_LIID_LEN]); - - gettimeofday(¤t_tv, NULL); - memset(res, 0, sizeof(openli_encoded_result_t)); - - wandder_encode_etsi_umtsiri_ber( //new way - (int64_t)job->cin, - (int64_t)seqno, - ¤t_tv, - job->customparams, - job->iritype, - child); - - res->msgbody = malloc(sizeof(wandder_encoded_result_t)); - res->msgbody->encoder = NULL; - res->msgbody->encoded = child->buf; - res->msgbody->len = child->len; - res->msgbody->alloced = child->alloc_len; - res->msgbody->next = NULL; - - res->ipcontents = NULL; - res->ipclen = 0; - - res->header.magic = htonl(OPENLI_PROTO_MAGIC); - res->header.bodylen = htons(res->msgbody->len + liidlen + sizeof(uint16_t)); - res->header.intercepttype = htons(OPENLI_PROTO_ETSI_IRI); - res->header.internalid = 0; - - free_umtsiri_parameters(job->customparams); - return 0; -} -#endif - int create_mobiri_job_from_session(collector_sync_t *sync, access_session_t *sess, ipintercept_t *ipint, uint8_t special) { diff --git a/src/collector/umtsiri.h b/src/collector/umtsiri.h index 00d076ea..ce3d452e 100644 --- a/src/collector/umtsiri.h +++ b/src/collector/umtsiri.h @@ -64,21 +64,6 @@ enum { UMTSIRI_EVENT_TYPE_PDPCONTEXT_MODIFICATION = 13, }; -int encode_umtsiri(wandder_encoder_t *encoder, - etsili_generic_freelist_t *freegenerics, - wandder_encode_job_t *precomputed, - openli_mobiri_job_t *job, uint32_t seqno, - openli_encoded_result_t *res); - -#ifdef HAVE_BER_ENCODING -int encode_umtsiri_ber( - openli_mobiri_job_t *job, - etsili_generic_freelist_t *freegenerics, - uint32_t seqno, - openli_encoded_result_t *res, - wandder_etsili_child_t *child); -#endif - int create_mobiri_job_from_session(collector_sync_t *sync, access_session_t *sess, ipintercept_t *ipint, uint8_t special); diff --git a/src/configparser.c b/src/configparser.c index da52ad72..9dc84946 100644 --- a/src/configparser.c +++ b/src/configparser.c @@ -1022,23 +1022,14 @@ static int global_parser(void *arg, yaml_document_t *doc, value->type == YAML_SCALAR_NODE && strcmp((char *)key->data.scalar.value, "encoding") == 0) { -/** Don't let users enable BER just yet, it's still incomplete. - * - * Still TODO: - * BER encoding for IPIRIs, UMTSIRIs and UMTSCCs - */ -#if 1 + /* We're back to only having one encoding method now, but + * allow users to choose "BER" without breaking anything. + */ if (strcasecmp(value->data.scalar.value, "BER") == 0) { -#ifdef HAVE_BER_ENCODING - glob->encoding_method = OPENLI_ENCODING_BER; -#else - logger(LOG_INFO, "BER encoding not supported by your version of libwandder, falling back to DER"); glob->encoding_method = OPENLI_ENCODING_DER; -#endif } else { glob->encoding_method = OPENLI_ENCODING_DER; } -#endif } if (key->type == YAML_SCALAR_NODE && diff --git a/src/etsili_core.c b/src/etsili_core.c index 3b805178..e5cbeb3b 100644 --- a/src/etsili_core.c +++ b/src/etsili_core.c @@ -110,7 +110,7 @@ static inline void encode_hi1_notification_body(wandder_encoder_t *encoder, wandder_encode_endseq(encoder); // End Outermost Sequence } -static inline void encode_umtscc_body(wandder_encoder_t *encoder, +wandder_encoded_result_t *encode_umtscc_body(wandder_encoder_t *encoder, wandder_encode_job_t *precomputed, void *ipcontent, uint32_t iplen, uint8_t dir) { @@ -146,7 +146,8 @@ static inline void encode_umtscc_body(wandder_encoder_t *encoder, wandder_encode_next_preencoded(encoder, jobarray, nextjob); wandder_encode_next(encoder, WANDDER_TAG_IPPACKET, WANDDER_CLASS_CONTEXT_PRIMITIVE, 4, ipcontent, iplen); - END_ENCODED_SEQUENCE(encoder, 5); + END_ENCODED_SEQUENCE(encoder, 4); + return wandder_encode_finish(encoder); } static inline void encode_ipcc_body(wandder_encoder_t *encoder, @@ -279,11 +280,11 @@ static inline void encode_ipmmiri_body(wandder_encoder_t *encoder, encode_ipmmiri_body_common(encoder, precomputed, iritype); wandder_encode_next(encoder, WANDDER_TAG_IPPACKET, WANDDER_CLASS_CONTEXT_PRIMITIVE, 0, ipcontent, iplen); - END_ENCODED_SEQUENCE(encoder, 7); + END_ENCODED_SEQUENCE(encoder, 6); } -static inline void encode_sipiri_body(wandder_encoder_t *encoder, +wandder_encoded_result_t *encode_sipiri_body(wandder_encoder_t *encoder, wandder_encode_job_t *precomputed, etsili_iri_type_t iritype, uint8_t *ipsrc, uint8_t *ipdest, int ipfamily, void *sipcontent, uint32_t siplen) { @@ -317,8 +318,7 @@ static inline void encode_sipiri_body(wandder_encoder_t *encoder, encipdst.ipvalue = calloc(16, sizeof(uint8_t)); memcpy(encipdst.ipvalue, ipdest, 16); } else { - END_ENCODED_SEQUENCE(encoder, 1); // ends outermost sequence - return; + return NULL; } encode_ipmmiri_body_common(encoder, precomputed, iritype); @@ -334,7 +334,9 @@ static inline void encode_sipiri_body(wandder_encoder_t *encoder, END_ENCODED_SEQUENCE(encoder, 1); wandder_encode_next(encoder, WANDDER_TAG_OCTETSTRING, WANDDER_CLASS_CONTEXT_PRIMITIVE, 2, sipcontent, siplen); - END_ENCODED_SEQUENCE(encoder, 8); + END_ENCODED_SEQUENCE(encoder, 7); + + return wandder_encode_finish(encoder); } static inline void encode_ipiri_id(wandder_encoder_t *encoder, @@ -380,7 +382,7 @@ static int sort_etsili_generic(etsili_generic_t *a, etsili_generic_t *b) { return 0; } -static inline void encode_umtsiri_body(wandder_encoder_t *encoder, +wandder_encoded_result_t *encode_umtsiri_body(wandder_encoder_t *encoder, wandder_encode_job_t *precomputed, etsili_iri_type_t iritype, etsili_generic_t *params) { @@ -642,11 +644,12 @@ static inline void encode_umtsiri_body(wandder_encoder_t *encoder, logger(LOG_INFO, "OpenLI: UMTS IRI record may be invalid..."); } - END_ENCODED_SEQUENCE(encoder, 8); + END_ENCODED_SEQUENCE(encoder, 7); + return wandder_encode_finish(encoder); } -static inline void encode_ipiri_body(wandder_encoder_t *encoder, +wandder_encoded_result_t *encode_ipiri_body(wandder_encoder_t *encoder, wandder_encode_job_t *precomputed, etsili_iri_type_t iritype, etsili_generic_t **params) { @@ -727,12 +730,11 @@ static inline void encode_ipiri_body(wandder_encoder_t *encoder, case IPIRI_CONTENTS_STARTTIME: case IPIRI_CONTENTS_ENDTIME: case IPIRI_CONTENTS_EXPECTED_ENDTIME: - if (p->itemlen != sizeof(struct timeval)) { - return; + if (p->itemlen == sizeof(struct timeval)) { + wandder_encode_next(encoder, WANDDER_TAG_GENERALTIME, + WANDDER_CLASS_CONTEXT_PRIMITIVE, p->itemnum, + p->itemptr, p->itemlen); } - wandder_encode_next(encoder, WANDDER_TAG_GENERALTIME, - WANDDER_CLASS_CONTEXT_PRIMITIVE, p->itemnum, - p->itemptr, p->itemlen); break; case IPIRI_CONTENTS_TARGET_NETWORKID: @@ -749,10 +751,11 @@ static inline void encode_ipiri_body(wandder_encoder_t *encoder, } } - END_ENCODED_SEQUENCE(encoder, 7); + END_ENCODED_SEQUENCE(encoder, 6); + return wandder_encode_finish(encoder); } -static inline void encode_ipmmcc_body(wandder_encoder_t *encoder, +void encode_ipmmcc_body(wandder_encoder_t *encoder, wandder_encode_job_t *precomputed, void *ipcontent, uint32_t iplen, uint8_t dir) { @@ -806,7 +809,7 @@ static inline void encode_ipmmcc_body(wandder_encoder_t *encoder, WANDDER_CLASS_CONTEXT_PRIMITIVE, 4, &mmccproto, sizeof(uint32_t)); - END_ENCODED_SEQUENCE(encoder, 6); + END_ENCODED_SEQUENCE(encoder, 5); } static inline void encode_etsili_pshdr_pc(wandder_encoder_t *encoder, @@ -819,19 +822,18 @@ static inline void encode_etsili_pshdr_pc(wandder_encoder_t *encoder, * into separate parameters. */ - wandder_encode_job_t *jobarray[9]; + wandder_encode_job_t *jobarray[8]; - jobarray[0] = &(precomputed[OPENLI_PREENCODE_USEQUENCE]); - jobarray[1] = &(precomputed[OPENLI_PREENCODE_CSEQUENCE_1]); - jobarray[2] = &(precomputed[OPENLI_PREENCODE_PSDOMAINID]); - jobarray[3] = &(precomputed[OPENLI_PREENCODE_LIID]); - jobarray[4] = &(precomputed[OPENLI_PREENCODE_AUTHCC]); - jobarray[5] = &(precomputed[OPENLI_PREENCODE_CSEQUENCE_3]); - jobarray[6] = &(precomputed[OPENLI_PREENCODE_CSEQUENCE_0]); - jobarray[7] = &(precomputed[OPENLI_PREENCODE_OPERATORID]); - jobarray[8] = &(precomputed[OPENLI_PREENCODE_NETWORKELEMID]); + jobarray[0] = &(precomputed[OPENLI_PREENCODE_CSEQUENCE_1]); + jobarray[1] = &(precomputed[OPENLI_PREENCODE_PSDOMAINID]); + jobarray[2] = &(precomputed[OPENLI_PREENCODE_LIID]); + jobarray[3] = &(precomputed[OPENLI_PREENCODE_AUTHCC]); + jobarray[4] = &(precomputed[OPENLI_PREENCODE_CSEQUENCE_3]); + jobarray[5] = &(precomputed[OPENLI_PREENCODE_CSEQUENCE_0]); + jobarray[6] = &(precomputed[OPENLI_PREENCODE_OPERATORID]); + jobarray[7] = &(precomputed[OPENLI_PREENCODE_NETWORKELEMID]); - wandder_encode_next_preencoded(encoder, jobarray, 9); + wandder_encode_next_preencoded(encoder, jobarray, 8); END_ENCODED_SEQUENCE(encoder, 1) wandder_encode_next(encoder, WANDDER_TAG_INTEGER, @@ -948,87 +950,6 @@ static inline void encode_etsili_pshdr(wandder_encoder_t *encoder, } -wandder_encoded_result_t *encode_etsi_ipcc(wandder_encoder_t *encoder, - wandder_encode_job_t *precomputed, int64_t cin, int64_t seqno, - struct timeval *tv, void *ipcontents, uint32_t iplen, uint8_t dir) { - - - encode_etsili_pshdr_pc(encoder, precomputed, cin, seqno, tv); - encode_ipcc_body(encoder, precomputed, ipcontents, iplen, dir); - return wandder_encode_finish(encoder); - -} - -wandder_encoded_result_t *encode_etsi_umtscc(wandder_encoder_t *encoder, - wandder_encode_job_t *precomputed, int64_t cin, int64_t seqno, - struct timeval *tv, void *ipcontents, uint32_t iplen, uint8_t dir) { - - encode_etsili_pshdr_pc(encoder, precomputed, cin, seqno, tv); - encode_umtscc_body(encoder, precomputed, ipcontents, iplen, dir); - return wandder_encode_finish(encoder); -} - -wandder_encoded_result_t *encode_etsi_ipmmcc(wandder_encoder_t *encoder, - wandder_encode_job_t *precomputed, int64_t cin, int64_t seqno, - struct timeval *tv, void *ipcontents, uint32_t iplen, uint8_t dir) { - - encode_etsili_pshdr_pc(encoder, precomputed, cin, seqno, tv); - encode_ipmmcc_body(encoder, precomputed, ipcontents, iplen, dir); - return wandder_encode_finish(encoder); - -} - -wandder_encoded_result_t *encode_etsi_ipmmiri(wandder_encoder_t *encoder, - wandder_encode_job_t *precomputed, int64_t cin, int64_t seqno, - etsili_iri_type_t iritype, struct timeval *tv, void *ipcontents, - uint32_t iplen) { - - encode_etsili_pshdr_pc(encoder, precomputed, cin, seqno, tv); - encode_ipmmiri_body(encoder, precomputed, iritype, ipcontents, iplen); - return wandder_encode_finish(encoder); -} - -wandder_encoded_result_t *encode_etsi_ipiri(wandder_encoder_t *encoder, - wandder_encode_job_t *precomputed, int64_t cin, int64_t seqno, - etsili_iri_type_t iritype, struct timeval *tv, - etsili_generic_t **params) { - - /* Note: params is a double pointer here because we are going to - * use HASH_SRT(), which may change which item should be the "start" - * of the hashed collection. If we want that change to persist back - * to our caller, e.g. to properly release all of the items in the - * collection, we need to pass in a reference to the collection - * to encode_ipiri_body(). - */ - - encode_etsili_pshdr_pc(encoder, precomputed, cin, seqno, tv); - encode_ipiri_body(encoder, precomputed, iritype, params); - return wandder_encode_finish(encoder); - -} - -wandder_encoded_result_t *encode_etsi_umtsiri(wandder_encoder_t *encoder, - wandder_encode_job_t *precomputed, int64_t cin, int64_t seqno, - etsili_iri_type_t iritype, struct timeval *tv, - etsili_generic_t *params) { - - encode_etsili_pshdr_pc(encoder, precomputed, cin, seqno, tv); - encode_umtsiri_body(encoder, precomputed, iritype, params); - return wandder_encode_finish(encoder); -} - -wandder_encoded_result_t *encode_etsi_sipiri(wandder_encoder_t *encoder, - wandder_encode_job_t *precomputed, int64_t cin, int64_t seqno, - etsili_iri_type_t iritype, struct timeval *tv, uint8_t *ipsrc, - uint8_t *ipdest, int ipfamily, void *sipcontents, uint32_t siplen) { - - encode_etsili_pshdr_pc(encoder, precomputed, cin, seqno, tv); - encode_sipiri_body(encoder, precomputed, iritype, ipsrc, ipdest, ipfamily, - sipcontents, siplen); - return wandder_encode_finish(encoder); -} - - wandder_encoded_result_t *encode_etsi_keepalive(wandder_encoder_t *encoder, wandder_etsipshdr_data_t *hdrdata, int64_t seqno) { @@ -1400,4 +1321,310 @@ void etsili_copy_preencoded(wandder_encode_job_t *dest, } } +int etsili_create_header_template(wandder_encoder_t *encoder, + wandder_encode_job_t *precomputed, int64_t cin, int64_t seqno, + struct timeval *tv, encoded_header_template_t *tplate) { + + wandder_encoded_result_t *encres; + wandder_decoder_t *dec; + uint16_t level; + + + if (tplate == NULL) { + logger(LOG_INFO, "OpenLI: called etsili_create_header_template with NULL template?"); + return -1; + } + + if (encoder == NULL) { + logger(LOG_INFO, "OpenLI: called etsili_create_header_template with NULL encoder?"); + return -1; + } + + reset_wandder_encoder(encoder); + + /* Create an encoded header */ + encode_etsili_pshdr_pc(encoder, precomputed, cin, seqno, tv); + encres = wandder_encode_finish(encoder); + + if (encres == NULL || encres->len == 0 || encres->encoded == NULL) { + logger(LOG_INFO, "OpenLI: failed to encode ETSI PS header for template"); + if (encres) { + wandder_release_encoded_result(encoder, encres); + } + return -1; + } + + /* Copy the encoded header to the template */ + tplate->header = malloc(encres->len); + memcpy(tplate->header, encres->encoded, encres->len); + tplate->header_len = encres->len; + + /* Release the encoded result -- the caller will use the templated copy */ + wandder_release_encoded_result(encoder, encres); + + /* Use a decoder to find the locations of the sequence number, timestamp + * seconds and timestamp microseconds + */ + dec = init_wandder_decoder(NULL, tplate->header, tplate->header_len, 0); + if (dec == NULL) { + logger(LOG_INFO, "OpenLI: unable to create decoder for templated ETSI PS header"); + return -1; + } + + if (wandder_decode_next(dec) <= 0) { + logger(LOG_INFO, "OpenLI: cannot decode templated ETSI PS header"); + free_wandder_decoder(dec); + return -1; + } + + if (wandder_decode_sequence_until(dec, 4) == 1) { + tplate->seqno_ptr = wandder_get_itemptr(dec); + tplate->seqno_size = wandder_get_itemlen(dec); + } else { + logger(LOG_INFO, "OpenLI: cannot find sequence number in templated ETSI PS header"); + free_wandder_decoder(dec); + return -1; + } + + level = wandder_get_level(dec); + + while (1) { + int r; + if ((r = wandder_decode_next(dec)) < 0) { + logger(LOG_INFO, "OpenLI: cannot continue decode templated ETSI PS header"); + free_wandder_decoder(dec); + return -1; + } + + if (r == 0) { + break; + } + + if (wandder_get_level(dec) < level) { + break; + } + if (wandder_get_level(dec) > level) { + continue; + } + + if (wandder_get_identifier(dec) != 7) { + continue; + } + + /* Must be at start of microSecondsTimestamp sequence */ + if (wandder_decode_next(dec) <= 0) { + logger(LOG_INFO, "OpenLI: cannot decode timestamp section of templated ETSI PS header"); + free_wandder_decoder(dec); + return -1; + } + + tplate->tssec_ptr = wandder_get_itemptr(dec); + tplate->tssec_size = wandder_get_itemlen(dec); + + if (wandder_decode_next(dec) <= 0) { + logger(LOG_INFO, "OpenLI: cannot decode timestamp section of templated ETSI PS header"); + free_wandder_decoder(dec); + return -1; + } + + tplate->tsusec_ptr = wandder_get_itemptr(dec); + tplate->tsusec_size = wandder_get_itemlen(dec); + break; + } + + /* Return success */ + free_wandder_decoder(dec); + return 0; + +} + +int etsili_update_header_template(encoded_header_template_t *tplate, + int64_t seqno, struct timeval *tv) { + int i; + + /* Assume that we've been provided the right template with sufficient + * space to fit the sequence number and timestamps -- ideally we would + * validate this, but the point of the template is to save CPU cycles + * not waste them on double-checking something that we should get + * right anyway... + */ + + for (i = tplate->seqno_size - 1; i >= 0; i--) { + *(tplate->seqno_ptr + i) = (seqno & 0xff); + seqno = seqno >> 8; + } + + for (i = tplate->tssec_size - 1; i >= 0; i--) { + *(tplate->tssec_ptr + i) = (tv->tv_sec & 0xff); + tv->tv_sec = tv->tv_sec >> 8; + } + + for (i = tplate->tsusec_size - 1; i >= 0; i--) { + *(tplate->tsusec_ptr + i) = (tv->tv_usec & 0xff); + tv->tv_usec = tv->tv_usec >> 8; + } + + return 0; +} + +int etsili_update_ipmmcc_template(encoded_global_template_t *tplate, + uint8_t *ipcontent, uint16_t ipclen) { + + assert(ipclen == tplate->cc_content.content_size); + + memcpy(tplate->cc_content.content_ptr, ipcontent, ipclen); + return 0; +} + +int etsili_create_ipmmcc_template(wandder_encoder_t *encoder, + wandder_encode_job_t *precomputed, uint8_t dir, uint8_t *ipcontent, + uint16_t ipclen, encoded_global_template_t *tplate) { + + wandder_encoded_result_t *encres; + wandder_decoder_t *dec; + + if (tplate == NULL) { + logger(LOG_INFO, "OpenLI: called etsili_create_ipmmcc_template with NULL template?"); + return -1; + } + + if (encoder == NULL) { + logger(LOG_INFO, "OpenLI: called etsili_create_ipmmcc_template with NULL encoder?"); + return -1; + } + + reset_wandder_encoder(encoder); + + encode_ipmmcc_body(encoder, precomputed, ipcontent, ipclen, dir); + encres = wandder_encode_finish(encoder); + + if (encres == NULL || encres->len == 0 || encres->encoded == NULL) { + logger(LOG_INFO, "OpenLI: failed to encode ETSI IPMMCC body for template"); + if (encres) { + wandder_release_encoded_result(encoder, encres); + } + return -1; + } + + /* Copy the encoded header to the template */ + tplate->cc_content.cc_wrap = malloc(encres->len); + memcpy(tplate->cc_content.cc_wrap, encres->encoded, encres->len); + tplate->cc_content.cc_wrap_len = encres->len; + tplate->cc_content.content_size = ipclen; + + /* Find the MMCCContents and save a pointer to the value location so + * we can overwrite it when another intercepted packet can use this + * template. + */ + dec = init_wandder_decoder(NULL, tplate->cc_content.cc_wrap, + tplate->cc_content.cc_wrap_len, 0); + if (dec == NULL) { + logger(LOG_INFO, "OpenLI: unable to create decoder for templated ETSI IPMMCC"); + return -1; + } + + /* TODO add tedious error checking */ + wandder_decode_next(dec); // payload + wandder_decode_next(dec); // ccpayloadsequence + wandder_decode_next(dec); // ccpayload + wandder_decode_sequence_until(dec, 2); // ccContents + wandder_decode_next(dec); // IPMMCC + wandder_decode_next(dec); // IPMMCCObjId + wandder_decode_next(dec); // MMCCContents + + if (wandder_get_identifier(dec) != 1 || wandder_get_itemlen(dec) != ipclen) + { + assert(0); + } + + tplate->cc_content.content_ptr = wandder_get_itemptr(dec); + + /* Release the encoded result -- the caller will use the templated copy */ + wandder_release_encoded_result(encoder, encres); +} + +enum { + CC_TEMPLATE_TYPE_IPCC, + CC_TEMPLATE_TYPE_UMTSCC +}; + +static int etsili_create_generic_cc_template(wandder_encoder_t *encoder, + wandder_encode_job_t *precomputed, uint8_t dir, uint16_t ipclen, + encoded_global_template_t *tplate, int templatetype) { + + wandder_encoded_result_t *encres; + const char *funcname; + + if (templatetype == CC_TEMPLATE_TYPE_IPCC) { + funcname = "etsili_create_ipcc_template"; + } else if (templatetype == CC_TEMPLATE_TYPE_UMTSCC) { + funcname = "etsili_create_umtscc_template"; + } else { + funcname = "etsili_create_generic_cc_template"; + } + + if (tplate == NULL) { + logger(LOG_INFO, "OpenLI: called %s with NULL template?", funcname); + return -1; + } + + if (encoder == NULL) { + logger(LOG_INFO, "OpenLI: called %s with NULL encoder?", funcname); + return -1; + } + + reset_wandder_encoder(encoder); + + if (templatetype == CC_TEMPLATE_TYPE_IPCC) { + /* Create an encoded IPCC body -- NULL should be OK for the IPcontents, + * since it won't be touched by libwandder (we copy it in ourselves + * manually later on). */ + encode_ipcc_body(encoder, precomputed, NULL, ipclen, dir); + } else if (templatetype == CC_TEMPLATE_TYPE_UMTSCC) { + encode_umtscc_body(encoder, precomputed, NULL, ipclen, dir); + } else { + logger(LOG_INFO, "OpenLI: unexpected CC template type: %d", + templatetype); + return -1; + } + encres = wandder_encode_finish(encoder); + + if (encres == NULL || encres->len == 0 || encres->encoded == NULL) { + logger(LOG_INFO, "OpenLI: failed to encode ETSI CC body in %s", + funcname); + if (encres) { + wandder_release_encoded_result(encoder, encres); + } + return -1; + } + + /* Copy the encoded header to the template */ + tplate->cc_content.cc_wrap = malloc(encres->len); + memcpy(tplate->cc_content.cc_wrap, encres->encoded, encres->len); + tplate->cc_content.cc_wrap_len = encres->len; + tplate->cc_content.content_size = ipclen; + tplate->cc_content.content_ptr = NULL; + + /* Release the encoded result -- the caller will use the templated copy */ + wandder_release_encoded_result(encoder, encres); + return 0; +} + +int etsili_create_umtscc_template(wandder_encoder_t *encoder, + wandder_encode_job_t *precomputed, uint8_t dir, uint16_t ipclen, + encoded_global_template_t *tplate) { + + return etsili_create_generic_cc_template(encoder, precomputed, dir, + ipclen, tplate, CC_TEMPLATE_TYPE_UMTSCC); +} + +int etsili_create_ipcc_template(wandder_encoder_t *encoder, + wandder_encode_job_t *precomputed, uint8_t dir, uint16_t ipclen, + encoded_global_template_t *tplate) { + + return etsili_create_generic_cc_template(encoder, precomputed, dir, + ipclen, tplate, CC_TEMPLATE_TYPE_IPCC); + +} + // vim: set sw=4 tabstop=4 softtabstop=4 expandtab : diff --git a/src/etsili_core.h b/src/etsili_core.h index c0ade126..ba715e4f 100644 --- a/src/etsili_core.h +++ b/src/etsili_core.h @@ -164,37 +164,52 @@ typedef struct wandder_etsipshdr_data { } wandder_etsipshdr_data_t; -wandder_encoded_result_t *encode_etsi_ipcc(wandder_encoder_t *encoder, - wandder_encode_job_t *precomputed, int64_t cin, int64_t seqno, - struct timeval *tv, void *ipcontents, uint32_t iplen, uint8_t dir); +typedef struct encoded_header_template { + uint32_t key; + uint8_t *header; + uint16_t header_len; + uint8_t seqno_size; + uint8_t tssec_size; + uint8_t tsusec_size; + uint8_t *seqno_ptr; + uint8_t *tssec_ptr; + uint8_t *tsusec_ptr; -wandder_encoded_result_t *encode_etsi_ipmmcc(wandder_encoder_t *encoder, - wandder_encode_job_t *precomputed, int64_t cin, int64_t seqno, - struct timeval *tv, void *ipcontents, uint32_t iplen, uint8_t dir); +} encoded_header_template_t; -wandder_encoded_result_t *encode_etsi_umtscc(wandder_encoder_t *encoder, - wandder_encode_job_t *precomputed, int64_t cin, int64_t seqno, - struct timeval *tv, void *ipcontents, uint32_t iplen, uint8_t dir); +typedef struct encoded_cc_template { + uint8_t *content_ptr; + uint16_t content_size; -wandder_encoded_result_t *encode_etsi_ipmmiri(wandder_encoder_t *encoder, - wandder_encode_job_t *precomputed, int64_t cin, int64_t seqno, - etsili_iri_type_t iritype, struct timeval *tv, void *ipcontents, - uint32_t iplen); + uint8_t *cc_wrap; + uint16_t cc_wrap_len; -wandder_encoded_result_t *encode_etsi_ipiri(wandder_encoder_t *encoder, - wandder_encode_job_t *precomputed, int64_t cin, int64_t seqno, - etsili_iri_type_t iritype, struct timeval *tv, +} encoded_cc_template_t; + +typedef struct encoded_global_template { + uint32_t key; + uint8_t cctype; + + encoded_cc_template_t cc_content; +} encoded_global_template_t; + + +wandder_encoded_result_t *encode_umtscc_body(wandder_encoder_t *encoder, + wandder_encode_job_t *precomputed, void *ipcontent, uint32_t iplen, + uint8_t dir); + +wandder_encoded_result_t *encode_ipiri_body(wandder_encoder_t *encoder, + wandder_encode_job_t *precomputed, etsili_iri_type_t iritype, etsili_generic_t **params); -wandder_encoded_result_t *encode_etsi_umtsiri(wandder_encoder_t *encoder, - wandder_encode_job_t *precomputed, int64_t cin, int64_t seqno, - etsili_iri_type_t iritype, struct timeval *tv, - etsili_generic_t *params); +wandder_encoded_result_t *encode_sipiri_body(wandder_encoder_t *encoder, + wandder_encode_job_t *precomputed, + etsili_iri_type_t iritype, uint8_t *ipsrc, uint8_t *ipdest, + int ipfamily, void *sipcontent, uint32_t siplen); -wandder_encoded_result_t *encode_etsi_sipiri(wandder_encoder_t *encoder, - wandder_encode_job_t *precomputed, int64_t cin, int64_t seqno, - etsili_iri_type_t iritype, struct timeval *tv, uint8_t *ipsrc, - uint8_t *ipdest, int ipfamily, void *sipcontents, uint32_t siplen); +wandder_encoded_result_t *encode_umtsiri_body(wandder_encoder_t *encoder, + wandder_encode_job_t *precomputed, + etsili_iri_type_t iritype, etsili_generic_t *params); wandder_encoded_result_t *encode_etsi_keepalive(wandder_encoder_t *encoder, wandder_etsipshdr_data_t *hdrdata, int64_t seqno); @@ -220,6 +235,24 @@ void etsili_preencode_static_fields(wandder_encode_job_t *pendarray, void etsili_clear_preencoded_fields(wandder_encode_job_t *pendarray); void etsili_copy_preencoded(wandder_encode_job_t *dest, wandder_encode_job_t *src); + + +int etsili_create_umtscc_template(wandder_encoder_t *encoder, + wandder_encode_job_t *precomputed, uint8_t dir, uint16_t ipclen, + encoded_global_template_t *tplate); +int etsili_update_ipmmcc_template(encoded_global_template_t *tplate, + uint8_t *ipcontent, uint16_t ipclen); +int etsili_create_ipmmcc_template(wandder_encoder_t *encoder, + wandder_encode_job_t *precomputed, uint8_t dir, uint8_t *ipcontent, + uint16_t ipclen, encoded_global_template_t *tplate); +int etsili_create_header_template(wandder_encoder_t *encoder, + wandder_encode_job_t *precomputed, int64_t cin, int64_t seqno, + struct timeval *tv, encoded_header_template_t *tplate); +int etsili_update_header_template(encoded_header_template_t *tplate, + int64_t seqno, struct timeval *tv); +int etsili_create_ipcc_template(wandder_encoder_t *encoder, + wandder_encode_job_t *precomputed, uint8_t dir, uint16_t ipclen, + encoded_global_template_t *tplate); #endif // vim: set sw=4 tabstop=4 softtabstop=4 expandtab : diff --git a/src/export_buffer.c b/src/export_buffer.c index 976e28cf..1d2d4df5 100644 --- a/src/export_buffer.c +++ b/src/export_buffer.c @@ -35,17 +35,23 @@ #define BUFFER_ALLOC_SIZE (1024 * 1024 * 50) #define BUFFER_WARNING_THRESH (1024 * 1024 * 1024) +#define BUF_OFFSET_FREQUENCY (1024 * 256) void init_export_buffer(export_buffer_t *buf) { buf->bufhead = NULL; buf->buftail = NULL; buf->alloced = 0; buf->partialfront = 0; + buf->partialrem = 0; buf->deadfront = 0; buf->nextwarn = BUFFER_WARNING_THRESH; + buf->record_offsets = NULL; + buf->since_last_saved_offset = 0; } void release_export_buffer(export_buffer_t *buf) { + Word_t rc; + J1FA(rc, buf->record_offsets); free(buf->bufhead); } @@ -53,6 +59,49 @@ uint64_t get_buffered_amount(export_buffer_t *buf) { return (buf->buftail - (buf->bufhead + buf->deadfront)); } +void reset_export_buffer(export_buffer_t *buf) { + buf->partialfront = 0; + buf->partialrem = 0; +} + +static inline void dump_buffer_offsets(export_buffer_t *buf) { + + Word_t index = 0; + int rcint; + + J1F(rcint, buf->record_offsets, index); + fprintf(stderr, "Offsets: "); + while(rcint) { + fprintf(stderr, "%lu ", index); + J1N(rcint, buf->record_offsets, index); + } + fprintf(stderr, "\n"); +} + +static inline int slide_buffer(export_buffer_t *buf, uint8_t *start, + uint64_t amount) { + + uint64_t slide = start - buf->bufhead; + Word_t index = 0; + int rcint, x; + + if (amount == 0) { + J1FA(rcint, buf->record_offsets); + return 0; + } + + memmove(buf->bufhead, start, amount); + + J1F(rcint, buf->record_offsets, index); + while (rcint) { + J1U(x, buf->record_offsets, index); + if (index >= slide) { + J1S(x, buf->record_offsets, index - slide); + } + J1N(rcint, buf->record_offsets, index); + } + return 0; +} static inline uint64_t extend_buffer(export_buffer_t *buf) { @@ -61,7 +110,7 @@ static inline uint64_t extend_buffer(export_buffer_t *buf) { uint64_t bufused = buf->buftail - (buf->bufhead + buf->deadfront); if (buf->deadfront > 0) { - memmove(buf->bufhead, buf->bufhead + buf->deadfront, bufused); + slide_buffer(buf, buf->bufhead + buf->deadfront, bufused); } space = (uint8_t *)realloc(buf->bufhead, buf->alloced + BUFFER_ALLOC_SIZE); @@ -95,6 +144,7 @@ uint64_t append_etsipdu_to_buffer(export_buffer_t *buf, uint64_t bufused = buf->buftail - (buf->bufhead); uint64_t spaceleft = buf->alloced - bufused; + int rcint; if (bufused == 0) { buf->partialfront = beensent; @@ -109,6 +159,12 @@ uint64_t append_etsipdu_to_buffer(export_buffer_t *buf, memcpy(buf->buftail, (void *)pdustart, pdulen); + if (buf->since_last_saved_offset + pdulen >= BUF_OFFSET_FREQUENCY) { + J1S(rcint, buf->record_offsets, bufused); + buf->since_last_saved_offset = 0; + } + + buf->since_last_saved_offset += pdulen; buf->buftail += pdulen; return (buf->buftail - buf->bufhead); @@ -120,20 +176,14 @@ uint64_t append_message_to_buffer(export_buffer_t *buf, uint32_t enclen = res->msgbody->len - res->ipclen; uint64_t bufused = buf->buftail - buf->bufhead; uint64_t spaceleft = buf->alloced - bufused; - - int liidlen; - - if (res->liid == NULL) { - return 0; - } - - liidlen = strlen(res->liid); + uint32_t added = 0; + int rcint; if (bufused == 0) { buf->partialfront = beensent; } - while (spaceleft < res->msgbody->len + sizeof(res->header) + liidlen + 2) { + while (spaceleft < res->msgbody->len + sizeof(res->header)) { /* Add some space to the buffer */ spaceleft = extend_buffer(buf); if (spaceleft == 0) { @@ -143,32 +193,24 @@ uint64_t append_message_to_buffer(export_buffer_t *buf, memcpy(buf->buftail, &res->header, sizeof(res->header)); buf->buftail += sizeof(res->header); + added += sizeof(res->header); - if (res->liid) { - uint16_t l = htons(liidlen); - memcpy(buf->buftail, &l, sizeof(uint16_t)); - memcpy(buf->buftail + 2, res->liid, liidlen); - buf->buftail += (liidlen + 2); + if (enclen > 0) { + memcpy(buf->buftail, res->msgbody->encoded, enclen); + buf->buftail += enclen; } - if (res->isDer){ - if (enclen > 0) { - memcpy(buf->buftail, res->msgbody->encoded, enclen); - buf->buftail += enclen; - } - - if (res->ipclen > 0) { - memcpy(buf->buftail, res->ipcontents, res->ipclen); - buf->buftail += res->ipclen; - } - } - else { - memcpy(buf->buftail, res->msgbody->encoded, res->msgbody->len); - buf->buftail += res->msgbody->len; - //BER has the payload already encoded into the result, DER leaves the payload out untill now - //BER has a set of trailing ending octets (number varies by msg type) + if (res->ipclen > 0) { + memcpy(buf->buftail, res->ipcontents, res->ipclen); + buf->buftail += res->ipclen; } + added += res->msgbody->len; + if (buf->since_last_saved_offset + added >= BUF_OFFSET_FREQUENCY) { + J1S(rcint, buf->record_offsets, bufused); + buf->since_last_saved_offset = 0; + } + buf->since_last_saved_offset += added; return (buf->buftail - buf->bufhead); } @@ -217,23 +259,66 @@ int transmit_heartbeat(int fd, SSL *ssl) { return (int)(sizeof(hbeat)); } +static inline void post_transmit(export_buffer_t *buf) { + + uint64_t rem = 0; + uint8_t *newbuf = NULL; + uint64_t resize = 0; + + assert(buf->buftail >= buf->bufhead + buf->deadfront); + rem = (buf->buftail - (buf->bufhead + buf->deadfront)); + + /* Consider shrinking buffer if it is now way too large */ + if (rem < buf->alloced / 2 && buf->alloced > 10 * BUFFER_ALLOC_SIZE) { + + resize = ((rem / BUFFER_ALLOC_SIZE) + 1) * BUFFER_ALLOC_SIZE; + slide_buffer(buf, buf->bufhead + buf->deadfront, + rem); + newbuf = (uint8_t *)realloc(buf->bufhead, resize); + buf->buftail = newbuf + rem; + buf->bufhead = newbuf; + buf->alloced = resize; + buf->deadfront = 0; + } else if (buf->alloced - (buf->buftail - buf->bufhead) < + 0.25 * buf->alloced && buf->deadfront >= 0.25 * buf->alloced) { + slide_buffer(buf, buf->bufhead + buf->deadfront, + rem); + buf->buftail = buf->bufhead + rem; + assert(buf->buftail < buf->bufhead + buf->alloced); + buf->deadfront = 0; + } + + buf->partialfront = 0; + buf->partialrem = 0; +} + int transmit_buffered_records(export_buffer_t *buf, int fd, uint64_t bytelimit, SSL *ssl) { uint64_t sent = 0; - uint64_t rem = 0; uint8_t *bhead = buf->bufhead + buf->deadfront; uint64_t offset = buf->partialfront; - int ret; - - sent = (buf->buftail - (bhead + offset)); - - if (sent > bytelimit) { - sent = bytelimit; + int ret, rcint; + Word_t index = 0; + + if (buf->partialrem > 0) { + sent = buf->partialrem; + } else { + sent = (buf->buftail - (bhead + offset)); + + if (sent > bytelimit) { + index = bytelimit + 1 + buf->deadfront; + J1P(rcint, buf->record_offsets, index); + if (rcint == 0) { + assert(rcint != 0); + return 0; + } + sent = index - buf->deadfront; + } + buf->partialrem = sent; } if (sent != 0) { - if (ssl != NULL) { while (1) { ret = SSL_write(ssl, bhead + offset, (int)sent); @@ -264,38 +349,13 @@ int transmit_buffered_records(export_buffer_t *buf, int fd, } else if (ret < sent) { /* Partial send, move partialfront ahead by whatever we did send. */ buf->partialfront += (uint32_t)ret; + buf->partialrem -= (uint32_t)ret; return ret; } buf->deadfront += ((uint32_t)ret + buf->partialfront); } - assert(buf->buftail >= buf->bufhead + buf->deadfront); - rem = (buf->buftail - (buf->bufhead + buf->deadfront)); - - /* Consider shrinking buffer if it is now way too large */ - if (rem < buf->alloced / 2 && buf->alloced > 10 * BUFFER_ALLOC_SIZE) { - - uint8_t *newbuf = NULL; - uint64_t resize = 0; - resize = ((rem / BUFFER_ALLOC_SIZE) + 1) * BUFFER_ALLOC_SIZE; - - memmove(buf->bufhead, bhead + sent + offset, rem); - newbuf = (uint8_t *)realloc(buf->bufhead, resize); - buf->buftail = newbuf + rem; - buf->bufhead = newbuf; - buf->alloced = resize; - buf->deadfront = 0; - } else if (buf->alloced - (buf->buftail - buf->bufhead) < - 0.25 * buf->alloced && buf->deadfront >= 0.25 * buf->alloced) { - if (rem > 0) { - memmove(buf->bufhead, bhead + sent + offset, rem); - } - buf->buftail = buf->bufhead + rem; - assert(buf->buftail < buf->bufhead + buf->alloced); - buf->deadfront = 0; - } - - buf->partialfront = 0; + post_transmit(buf); return sent; } @@ -307,22 +367,20 @@ int transmit_buffered_records_RMQ(export_buffer_t *buf, uint64_t sent = 0; uint64_t rem = 0; uint8_t *bhead = buf->bufhead + buf->deadfront; - uint64_t offset = buf->partialfront; int ret; ii_header_t *header = NULL; - sent = (buf->buftail - (bhead + offset)); + sent = (buf->buftail - (bhead)); if (sent > bytelimit) { sent = bytelimit; } if (sent != 0) { - amqp_bytes_t message_bytes; amqp_basic_properties_t props; message_bytes.len = sent; - message_bytes.bytes = bhead + offset; + message_bytes.bytes = bhead; props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG; props.delivery_mode = 2; /* persistent mode */ @@ -332,8 +390,8 @@ int transmit_buffered_records_RMQ(export_buffer_t *buf, channel, exchange, routing_key, - 0, - 0, + 0, + 0, &props, message_bytes); @@ -344,36 +402,10 @@ int transmit_buffered_records_RMQ(export_buffer_t *buf, ret = sent; } - buf->deadfront += ((uint32_t)ret + buf->partialfront); + buf->deadfront += ((uint32_t)ret); } - assert(buf->buftail >= buf->bufhead + buf->deadfront); - rem = (buf->buftail - (buf->bufhead + buf->deadfront)); - - /* Consider shrinking buffer if it is now way too large */ - if (rem < buf->alloced / 2 && buf->alloced > 10 * BUFFER_ALLOC_SIZE) { - - uint8_t *newbuf = NULL; - uint64_t resize = 0; - resize = ((rem / BUFFER_ALLOC_SIZE) + 1) * BUFFER_ALLOC_SIZE; - - memmove(buf->bufhead, bhead + sent + offset, rem); - newbuf = (uint8_t *)realloc(buf->bufhead, resize); - buf->buftail = newbuf + rem; - buf->bufhead = newbuf; - buf->alloced = resize; - buf->deadfront = 0; - } else if (buf->alloced - (buf->buftail - buf->bufhead) < - 0.25 * buf->alloced && buf->deadfront >= 0.25 * buf->alloced) { - if (rem > 0) { - memmove(buf->bufhead, bhead + sent + offset, rem); - } - buf->buftail = buf->bufhead + rem; - assert(buf->buftail < buf->bufhead + buf->alloced); - buf->deadfront = 0; - } - - buf->partialfront = 0; + post_transmit(buf); return sent; } diff --git a/src/export_buffer.h b/src/export_buffer.h index 2fd52bf8..103b0efd 100644 --- a/src/export_buffer.h +++ b/src/export_buffer.h @@ -30,6 +30,7 @@ #include "config.h" #include #include +#include #include "netcomms.h" #include "collector/collector_publish.h" @@ -43,11 +44,7 @@ typedef struct encoder_result { char *liid; char *cinstr; uint8_t encodedby; - uint8_t isDer; openli_export_recv_t *origreq; -#ifdef HAVE_BER_ENCODING - wandder_etsili_child_t *child; -#endif } PACKED openli_encoded_result_t; @@ -58,12 +55,17 @@ typedef struct export_buffer { uint32_t deadfront; uint32_t partialfront; + uint32_t partialrem; uint64_t nextwarn; + + Pvoid_t record_offsets; + uint32_t since_last_saved_offset; } export_buffer_t; void init_export_buffer(export_buffer_t *buf); +void reset_export_buffer(export_buffer_t *buf); void release_export_buffer(export_buffer_t *buf); uint64_t get_buffered_amount(export_buffer_t *buf); uint64_t append_message_to_buffer(export_buffer_t *buf, diff --git a/src/mediator/handover.c b/src/mediator/handover.c index 2c7f560f..8ddadff8 100644 --- a/src/mediator/handover.c +++ b/src/mediator/handover.c @@ -78,10 +78,10 @@ int xmit_handover(med_epoll_ev_t *mev) { wandder_release_encoded_result(NULL, ho->ho_state->pending_ka); ho->ho_state->pending_ka = NULL; -/* + /* logger(LOG_INFO, "successfully sent keep alive to %s:%s HI%d", ho->ipstr, ho->portstr, ho->handover_type); -*/ + */ /* Start the timer for the response */ if (start_mediator_timer(ho->aliverespev, ho->ho_state->kawait) == -1) { @@ -133,13 +133,13 @@ int xmit_handover(med_epoll_ev_t *mev) { return 0; } - /* Send some of our buffered records, but no more than 16,000 bytes at + /* Send some of our buffered records, but no more than 1MB at * a time -- we need to go back to our epoll loop to handle other events * rather than getting stuck trying to send massive amounts of data in * one go. */ if ((ret = transmit_buffered_records(&(ho->ho_state->buf), mev->fd, - 16000, NULL)) == -1) { + (1024 * 1024), NULL)) == -1) { return -1; } @@ -247,6 +247,11 @@ void disconnect_handover(handover_t *ho) { ho->ho_state->incoming = NULL; } + /* Reset any connection-specific state in the export buffer for this + * handover. + */ + reset_export_buffer(&(ho->ho_state->buf)); + /* This handover is officially disconnected, so no more logging for it * until / unless it reconnects. */ @@ -293,11 +298,10 @@ int disable_handover_writing(handover_t *ho) { int ret = 0; uint32_t events = EPOLLRDHUP | EPOLLIN ; - pthread_mutex_lock(&(ho->ho_state->ho_mutex)); - - if (ho->ho_state->outenabled) { - ret = modify_mediator_fdevent(ho->outev, events); - } + if (!ho->ho_state->outenabled) { + return 0; + } + ret = modify_mediator_fdevent(ho->outev, events); if (ret == -1) { logger(LOG_INFO, @@ -308,7 +312,6 @@ int disable_handover_writing(handover_t *ho) { ho->ho_state->outenabled = 0; } - pthread_mutex_unlock(&(ho->ho_state->ho_mutex)); return ret; } @@ -324,11 +327,11 @@ int enable_handover_writing(handover_t *ho) { int ret = 0; uint32_t events = EPOLLRDHUP | EPOLLIN | EPOLLOUT; - pthread_mutex_lock(&(ho->ho_state->ho_mutex)); + if (ho->ho_state->outenabled) { + return 0; + } - if (!ho->ho_state->outenabled) { - ret = modify_mediator_fdevent(ho->outev, events); - } + ret = modify_mediator_fdevent(ho->outev, events); if (ret == -1) { logger(LOG_INFO, @@ -338,7 +341,6 @@ int enable_handover_writing(handover_t *ho) { } else { ho->ho_state->outenabled = 1; } - pthread_mutex_unlock(&(ho->ho_state->ho_mutex)); return ret; } diff --git a/src/mediator/mediator.c b/src/mediator/mediator.c index 1c3641a7..2cfa2b1b 100644 --- a/src/mediator/mediator.c +++ b/src/mediator/mediator.c @@ -1012,7 +1012,9 @@ static int receive_liid_mapping(mediator_state_t *state, uint8_t *msgbody, * * @param mev The epoll event for the keep alive response timer * - * @return 0 always. + * @return -1 to force the epoll loop to restart, rather than try to + * continue processing events (in case the handover that we've + * just disconnected is one of the upcoming events). */ static int trigger_ka_failure(med_epoll_ev_t *mev) { handover_t *ho = (handover_t *)(mev->state); @@ -1023,7 +1025,9 @@ static int trigger_ka_failure(med_epoll_ev_t *mev) { } disconnect_handover(ho); - return 0; + + /* Return -1 here to force a fresh call to epoll_wait() */ + return -1; } /** Receives and actions one or more messages received from the provisioner. @@ -1105,6 +1109,8 @@ static int receive_provisioner(mediator_state_t *state, med_epoll_ev_t *mev) { return 0; } +#define MAX_COLL_RECV (10 * 1024 * 1024) + /** Receives and actions a message from a collector, which can include * an encoded ETSI CC or IRI. * @@ -1123,6 +1129,7 @@ static int receive_collector(mediator_state_t *state, med_epoll_ev_t *mev) { openli_proto_msgtype_t msgtype; mediator_pcap_msg_t pcapmsg; uint16_t liidlen; + uint32_t total_recvd = 0; do { if (mev->fdtype == MED_EPOLL_COL_RMQ) { @@ -1142,6 +1149,8 @@ static int receive_collector(mediator_state_t *state, med_epoll_ev_t *mev) { return -1; } + total_recvd += msglen; + switch(msgtype) { case OPENLI_PROTO_DISCONNECT: logger(LOG_INFO, @@ -1226,7 +1235,7 @@ static int receive_collector(mediator_state_t *state, med_epoll_ev_t *mev) { } return -1; } - } while (msgtype != OPENLI_PROTO_NO_MESSAGE); + } while (msgtype != OPENLI_PROTO_NO_MESSAGE && total_recvd < MAX_COLL_RECV); return 0; } @@ -1314,6 +1323,7 @@ static int check_epoll_fd(mediator_state_t *state, struct epoll_event *ev) { } if (ret == -1) { handover_t *ho = (handover_t *)(mev->state); + logger(LOG_INFO, "fail %u", ev->events & EPOLLIN); disconnect_handover(ho); } break;