From 95876f4bfffc7f2994ac39b701459a2e462e5a0a Mon Sep 17 00:00:00 2001 From: Vasily Evseenko Date: Thu, 12 Oct 2017 14:21:32 +0300 Subject: [PATCH] Simplify aggregator logic Remove seq from data packets Add compatibility for different endianess on RX/TX --- rx.cpp | 117 ++++++++++++++++++++++++---------------------- rx.hpp | 29 ++++++------ tx.cpp | 28 +++++------ tx.hpp | 1 - wifibroadcast.hpp | 41 ++++++++++++---- 5 files changed, 120 insertions(+), 96 deletions(-) diff --git a/rx.cpp b/rx.cpp index 70cf98e8..6a20678d 100644 --- a/rx.cpp +++ b/rx.cpp @@ -44,7 +44,7 @@ extern "C" #include "rx.hpp" -Receiver::Receiver(const char *wlan, int radio_port, Aggregator *agg) : agg(agg) +Receiver::Receiver(const char *wlan, int radio_port, BaseAggregator *agg) : agg(agg) { char errbuf[PCAP_ERRBUF_SIZE]; @@ -189,10 +189,11 @@ void Receiver::loop_iter(void) } -LocalAggregator::LocalAggregator(const string &client_addr, int client_port, int k, int n, const string &keypair) : fec_k(k), fec_n(n), seq(0), rx_ring_front(0), rx_ring_alloc(0), proc_ring_last(0) +Aggregator::Aggregator(const string &client_addr, int client_port, int k, int n, const string &keypair) : fec_k(k), fec_n(n), seq(0), rx_ring_front(0), rx_ring_alloc(0), last_known_block((uint64_t)-1) { - sockfd = open_udp_socket(client_addr, client_port); + sockfd = open_udp_socket_for_tx(client_addr, client_port); fec_p = fec_new(fec_k, fec_n); + memset(session_key, '\0', sizeof(session_key)); for(int ring_idx = 0; ring_idx < RX_RING_SIZE; ring_idx++) { @@ -208,11 +209,6 @@ LocalAggregator::LocalAggregator(const string &client_addr, int client_port, int memset(rx_ring[ring_idx].fragment_map, '\0', fec_n * sizeof(uint8_t)); } - for(int ring_idx = 0; ring_idx < PROC_RING_SIZE; ring_idx++) - { - proc_ring[ring_idx] = -1; - } - FILE *fp; if((fp = fopen(keypair.c_str(), "r")) == NULL) { @@ -224,7 +220,7 @@ LocalAggregator::LocalAggregator(const string &client_addr, int client_port, int } -LocalAggregator::~LocalAggregator() +Aggregator::~Aggregator() { for(int ring_idx = 0; ring_idx < RX_RING_SIZE; ring_idx++) @@ -240,31 +236,24 @@ LocalAggregator::~LocalAggregator() } -RemoteAggregator::RemoteAggregator(const string &client_addr, int client_port) +Forwarder::Forwarder(const string &client_addr, int client_port) { - sockfd = open_udp_socket(client_addr, client_port); + sockfd = open_udp_socket_for_tx(client_addr, client_port); } -void RemoteAggregator::process_packet(const uint8_t *buf, size_t size) +void Forwarder::process_packet(const uint8_t *buf, size_t size) { send(sockfd, buf, size, 0); } -RemoteAggregator::~RemoteAggregator() +Forwarder::~Forwarder() { close(sockfd); } - -void LocalAggregator::add_processed_block(int block_idx) -{ - proc_ring[proc_ring_last] = block_idx; - proc_ring_last = modN(proc_ring_last + 1, PROC_RING_SIZE); -} - -int LocalAggregator::rx_ring_push(void) +int Aggregator::rx_ring_push(void) { if(rx_ring_alloc < RX_RING_SIZE) { @@ -276,15 +265,14 @@ int LocalAggregator::rx_ring_push(void) // override existing data int idx = rx_ring_front; - fprintf(stderr, "override block %d with %d fragments\n", rx_ring[idx].block_idx, rx_ring[idx].has_fragments); + fprintf(stderr, "override block 0x%Lx with %d fragments\n", (long long unsigned int)(rx_ring[idx].block_idx), rx_ring[idx].has_fragments); - add_processed_block(rx_ring[idx].block_idx); rx_ring_front = modN(rx_ring_front + 1, RX_RING_SIZE); return idx; } -int LocalAggregator::get_block_ring_idx(int block_idx) +int Aggregator::get_block_ring_idx(uint64_t block_idx) { // check if block already added for(int i = rx_ring_front, c = rx_ring_alloc; c > 0; i = modN(i + 1, RX_RING_SIZE), c--) @@ -293,28 +281,21 @@ int LocalAggregator::get_block_ring_idx(int block_idx) } // check if block was already processed - for(int i=0; i < PROC_RING_SIZE; i++) - { - if (proc_ring[i] == block_idx) return -1; - } - - int new_blocks; - if (rx_ring_alloc > 0) + if (last_known_block != (uint64_t)-1 && block_idx <= last_known_block) { - new_blocks = modN(block_idx - rx_ring[modN(rx_ring_front + rx_ring_alloc - 1, RX_RING_SIZE)].block_idx, 256); - }else - { - int last_proc_block = proc_ring[modN(proc_ring_last - 1, PROC_RING_SIZE)]; - new_blocks = modN(last_proc_block >=0 ? block_idx - last_proc_block : 1, 256); + return -1; } + int new_blocks = (int)min(last_known_block != (uint64_t)-1 ? block_idx - last_known_block : 1, (uint64_t)RX_RING_SIZE); assert (new_blocks > 0); + last_known_block = block_idx; int ring_idx = -1; + for(int i = 0; i < new_blocks; i++) { ring_idx = rx_ring_push(); - rx_ring[ring_idx].block_idx = modN(block_idx - new_blocks + i + 1, 256); + rx_ring[ring_idx].block_idx = block_idx + i + 1 - new_blocks; rx_ring[ring_idx].send_fragment_idx = 0; rx_ring[ring_idx].has_fragments = 0; memset(rx_ring[ring_idx].fragment_map, '\0', fec_n * sizeof(uint8_t)); @@ -323,8 +304,10 @@ int LocalAggregator::get_block_ring_idx(int block_idx) } -void LocalAggregator::process_packet(const uint8_t *buf, size_t size) +void Aggregator::process_packet(const uint8_t *buf, size_t size) { + uint8_t new_session_key[sizeof(session_key)]; + if(size == 0) return; if (size > MAX_FORWARDER_PACKET_SIZE) { @@ -349,11 +332,31 @@ void LocalAggregator::process_packet(const uint8_t *buf, size_t size) return; } - if(crypto_box_open_easy(session_key, - ((wsession_key_t*)buf)->session_key, sizeof(wsession_key_t::session_key), ((wsession_key_t*)buf)->nonce, + if(crypto_box_open_easy(new_session_key, + ((wsession_key_t*)buf)->session_key_data, sizeof(wsession_key_t::session_key_data), + ((wsession_key_t*)buf)->session_key_nonce, tx_publickey, rx_secretkey) != 0) { fprintf(stderr, "unable to decrypt session key\n"); + return; + } + + if (memcmp(session_key, new_session_key, sizeof(session_key)) != 0) + { + fprintf(stderr, "New session detected\n"); + memcpy(session_key, new_session_key, sizeof(session_key)); + + rx_ring_front = 0; + rx_ring_alloc = 0; + last_known_block = (uint64_t)-1; + seq = 0; + for(int ring_idx = 0; ring_idx < RX_RING_SIZE; ring_idx++) + { + rx_ring[ring_idx].block_idx = 0; + rx_ring[ring_idx].send_fragment_idx = 0; + rx_ring[ring_idx].has_fragments = 0; + memset(rx_ring[ring_idx].fragment_map, '\0', fec_n * sizeof(uint8_t)); + } } return; @@ -362,7 +365,6 @@ void LocalAggregator::process_packet(const uint8_t *buf, size_t size) return; } - // Truncate block_idx to 8 bit uint8_t decrypted[MAX_FEC_PAYLOAD]; long long unsigned int decrypted_len; wblock_hdr_t *block_hdr = (wblock_hdr_t*)buf; @@ -374,14 +376,14 @@ void LocalAggregator::process_packet(const uint8_t *buf, size_t size) sizeof(wblock_hdr_t), (uint8_t*)(&(block_hdr->nonce)), session_key) != 0) { - fprintf(stderr, "unable to decrypt packet #0x%Lx\n", (long long unsigned int)(block_hdr->nonce)); + fprintf(stderr, "unable to decrypt packet #0x%Lx\n", (long long unsigned int)(be64toh(block_hdr->nonce))); return; } assert(decrypted_len <= MAX_FEC_PAYLOAD); - uint8_t block_idx = (uint8_t)((block_hdr->nonce >> 8) & 0xff); - uint8_t fragment_idx = (uint8_t)(block_hdr->nonce & 0xff); + uint64_t block_idx = be64toh(block_hdr->nonce) >> 8; + uint8_t fragment_idx = (uint8_t)(be64toh(block_hdr->nonce) & 0xff); if (fragment_idx >= fec_n) { @@ -391,6 +393,8 @@ void LocalAggregator::process_packet(const uint8_t *buf, size_t size) int ring_idx = get_block_ring_idx(block_idx); + //printf("got 0x%lx %d, ring_idx=%d\n", block_idx, fragment_idx, ring_idx); + //ignore already processed blocks if (ring_idx < 0) return; @@ -432,7 +436,6 @@ void LocalAggregator::process_packet(const uint8_t *buf, size_t size) int nrm = modN(ring_idx - rx_ring_front, RX_RING_SIZE); for(int i=0; i <= nrm; i++) { - add_processed_block(rx_ring[rx_ring_front].block_idx); rx_ring_front = modN(rx_ring_front + 1, RX_RING_SIZE); rx_ring_alloc -= 1; } @@ -440,27 +443,29 @@ void LocalAggregator::process_packet(const uint8_t *buf, size_t size) } } -void LocalAggregator::send_packet(int ring_idx, int fragment_idx) +void Aggregator::send_packet(int ring_idx, int fragment_idx) { wpacket_hdr_t* packet_hdr = (wpacket_hdr_t*)(rx_ring[ring_idx].fragments[fragment_idx]); uint8_t *payload = (rx_ring[ring_idx].fragments[fragment_idx]) + sizeof(wpacket_hdr_t); + uint16_t packet_size = be16toh(packet_hdr->packet_size); + uint32_t packet_seq = rx_ring[ring_idx].block_idx * fec_k + fragment_idx; - if (packet_hdr->seq > seq + 1) + if (packet_seq > seq + 1) { - fprintf(stderr, "%u packets lost\n", packet_hdr->seq - seq - 1); + fprintf(stderr, "%u packets lost\n", packet_seq - seq - 1); } - seq = packet_hdr->seq; + seq = packet_seq; - if(packet_hdr->packet_size > MAX_PAYLOAD_SIZE) + if(packet_size > MAX_PAYLOAD_SIZE) { fprintf(stderr, "corrupted packet %u\n", seq); }else{ - send(sockfd, payload, packet_hdr->packet_size, 0); + send(sockfd, payload, packet_size, 0); } } -void LocalAggregator::apply_fec(int ring_idx) +void Aggregator::apply_fec(int ring_idx) { unsigned index[fec_k]; uint8_t *in_blocks[fec_k]; @@ -549,11 +554,11 @@ int main(int argc, char* const *argv) struct pollfd fds[MAX_RX_INTERFACES]; Receiver* rx[MAX_RX_INTERFACES]; - shared_ptr agg; + shared_ptr agg; if(rx_mode == LOCAL){ - agg = shared_ptr(new LocalAggregator(client_addr, client_port, k, n, keypair)); + agg = shared_ptr(new Aggregator(client_addr, client_port, k, n, keypair)); }else{ - agg = shared_ptr(new RemoteAggregator(client_addr, client_port)); + agg = shared_ptr(new Forwarder(client_addr, client_port)); } memset(fds, '\0', sizeof(fds)); @@ -591,7 +596,7 @@ int main(int argc, char* const *argv) uint8_t buf[MAX_FORWARDER_PACKET_SIZE]; int fd = open_udp_socket_for_rx(srv_port); - LocalAggregator agg(client_addr, client_port, k, n, keypair); + Aggregator agg(client_addr, client_port, k, n, keypair); for(;;) { diff --git a/rx.hpp b/rx.hpp index e5a2c738..66e4bf24 100644 --- a/rx.hpp +++ b/rx.hpp @@ -24,13 +24,13 @@ typedef enum { AGGREGATOR } rx_mode_t; -class Aggregator +class BaseAggregator { public: virtual void process_packet(const uint8_t *buf, size_t size) = 0; protected: - int open_udp_socket(const string &client_addr, int client_port) + int open_udp_socket_for_tx(const string &client_addr, int client_port) { struct sockaddr_in saddr; int fd = socket(AF_INET, SOCK_DGRAM, 0); @@ -50,11 +50,11 @@ class Aggregator }; -class RemoteAggregator : public Aggregator +class Forwarder : public BaseAggregator { public: - RemoteAggregator(const string &client_addr, int client_port); - ~RemoteAggregator(); + Forwarder(const string &client_addr, int client_port); + ~Forwarder(); virtual void process_packet(const uint8_t *buf, size_t size); private: @@ -63,7 +63,7 @@ class RemoteAggregator : public Aggregator typedef struct { - uint8_t block_idx; + uint64_t block_idx; uint8_t** fragments; uint8_t *fragment_map; uint8_t send_fragment_idx; @@ -72,24 +72,22 @@ typedef struct { #define RX_RING_SIZE 40 -#define PROC_RING_SIZE 40 static inline int modN(int x, int base) { return (base + (x % base)) % base; } -class LocalAggregator : public Aggregator +class Aggregator : public BaseAggregator { public: - LocalAggregator(const string &client_addr, int client_port, int k, int n, const string &keypair); - ~LocalAggregator(); + Aggregator(const string &client_addr, int client_port, int k, int n, const string &keypair); + ~Aggregator(); virtual void process_packet(const uint8_t *buf, size_t size); private: void send_packet(int ring_idx, int fragment_idx); void apply_fec(int ring_idx); - int get_block_ring_idx(int block_idx); - void add_processed_block(int block_idx); + int get_block_ring_idx(uint64_t block_idx); int rx_ring_push(void); fec_t* fec_p; int fec_k; // RS number of primary fragments in block @@ -99,8 +97,7 @@ class LocalAggregator : public Aggregator rx_ring_item_t rx_ring[RX_RING_SIZE]; int rx_ring_front; // current packet int rx_ring_alloc; // number of allocated entries - int proc_ring[PROC_RING_SIZE]; - int proc_ring_last; // index to add processed packet + uint64_t last_known_block; //id of last known block // rx->tx keypair uint8_t rx_secretkey[crypto_box_SECRETKEYBYTES]; @@ -111,12 +108,12 @@ class LocalAggregator : public Aggregator class Receiver { public: - Receiver(const char* wlan, int port, Aggregator* agg); + Receiver(const char* wlan, int port, BaseAggregator* agg); ~Receiver(); void loop_iter(void); int getfd(void){ return fd; } private: - Aggregator *agg; + BaseAggregator *agg; int fd; pcap_t *ppcap; }; diff --git a/tx.cpp b/tx.cpp index 72216555..d2c79ce4 100644 --- a/tx.cpp +++ b/tx.cpp @@ -40,7 +40,7 @@ extern "C" #include "tx.hpp" Transmitter::Transmitter(int k, int n, const string &keypair): fec_k(k), fec_n(n), block_idx(0), - fragment_idx(0), seq(0), + fragment_idx(0), max_packet_size(0) { fec_p = fec_new(fec_k, fec_n); @@ -62,9 +62,9 @@ Transmitter::Transmitter(int k, int n, const string &keypair): fec_k(k), fec_n( randombytes_buf(session_key, sizeof(session_key)); session_key_packet.packet_type = WFB_PACKET_KEY; - randombytes_buf(session_key_packet.nonce, sizeof(session_key_packet.nonce)); - if (crypto_box_easy(session_key_packet.session_key, session_key, sizeof(session_key), - session_key_packet.nonce, rx_publickey, tx_secretkey) != 0) + randombytes_buf(session_key_packet.session_key_nonce, sizeof(session_key_packet.session_key_nonce)); + if (crypto_box_easy(session_key_packet.session_key_data, session_key, sizeof(session_key), + session_key_packet.session_key_nonce, rx_publickey, tx_secretkey) != 0) { throw runtime_error("Unable to make session key!"); } @@ -146,7 +146,7 @@ void Transmitter::send_block_fragment(size_t packet_size) assert(packet_size <= MAX_FEC_PAYLOAD); block_hdr->packet_type = WFB_PACKET_DATA; - block_hdr->nonce = ((block_idx & BLOCK_IDX_MASK) << 8) + fragment_idx; + block_hdr->nonce = htobe64(((block_idx & BLOCK_IDX_MASK) << 8) + fragment_idx); // encrypted payload crypto_aead_chacha20poly1305_encrypt(ciphertext + sizeof(wblock_hdr_t), &ciphertext_len, @@ -159,6 +159,7 @@ void Transmitter::send_block_fragment(size_t packet_size) void Transmitter::send_session_key(void) { + //fprintf(stderr, "Announce session key\n"); inject_packet((uint8_t*)&session_key_packet, sizeof(session_key_packet)); } @@ -167,8 +168,7 @@ void Transmitter::send_packet(const uint8_t *buf, size_t size) wpacket_hdr_t packet_hdr; assert(size <= MAX_PAYLOAD_SIZE); - packet_hdr.seq = seq++; - packet_hdr.packet_size = size; + packet_hdr.packet_size = htobe16(size); memset(block[fragment_idx], '\0', MAX_FEC_PAYLOAD); memcpy(block[fragment_idx], &packet_hdr, sizeof(packet_hdr)); memcpy(block[fragment_idx] + sizeof(packet_hdr), buf, size); @@ -196,7 +196,7 @@ uint64_t get_system_time(void) // in milliseconds return te.tv_sec * 1000LL + te.tv_usec / 1000; } -void normal_rx(Transmitter *t, int fd) +void video_source(Transmitter *t, int fd) { uint8_t buf[MAX_PAYLOAD_SIZE]; uint64_t session_key_announce_ts = 0; @@ -215,7 +215,7 @@ void normal_rx(Transmitter *t, int fd) } } -void mavlink_rx(Transmitter *t, int fd, int agg_latency) +void mavlink_source(Transmitter *t, int fd, int agg_latency) { struct pollfd fds[1]; @@ -345,15 +345,17 @@ int main(int argc, char * const *argv) try { int fd = open_udp_socket_for_rx(udp_port); +#ifdef DEBUG_TX + shared_ptrt = shared_ptr(new UdpTransmitter(k, n, keypair, "127.0.0.1", 5601)); +#else shared_ptrt = shared_ptr(new PcapTransmitter(k, n, keypair, radio_port, argv[optind])); - //shared_ptrt = shared_ptr(new UdpTransmitter(k, n, keypair, "127.0.0.1", 5601)); - +#endif if (mavlink_mode) { - mavlink_rx(t.get(), fd, mavlink_agg_latency); + mavlink_source(t.get(), fd, mavlink_agg_latency); }else { - normal_rx(t.get(), fd); + video_source(t.get(), fd); } }catch(runtime_error &e) { diff --git a/tx.hpp b/tx.hpp index e913e524..1f8ecce2 100644 --- a/tx.hpp +++ b/tx.hpp @@ -41,7 +41,6 @@ class Transmitter int fec_n; // RS total number of fragments in block uint64_t block_idx; //block_idx << 8 + fragment_idx = nonce (64bit) uint8_t fragment_idx; - uint32_t seq; uint8_t** block; size_t max_packet_size; diff --git a/wifibroadcast.hpp b/wifibroadcast.hpp index dbbd996f..941afb7b 100644 --- a/wifibroadcast.hpp +++ b/wifibroadcast.hpp @@ -33,6 +33,7 @@ #include #include #include +#include #define MAX_PACKET_SIZE 1510 #define MAX_RX_INTERFACES 8 @@ -89,8 +90,19 @@ static uint8_t ieee80211_header[] = { 0x00, 0x00, // seq num << 4 + fragment num }; +/* + Wifibroadcast protocol: + + radiotap_header + ieee_80211_header + wblock_hdr_t { packet_type, nonce = (block_idx << 8 + fragment_idx) } + wpacket_hdr_t { packet_size } # + data # + +-- encrypted + + */ -// nounce: 56bit block_idx + 8bit fragment_idx +// nonce: 56bit block_idx + 8bit fragment_idx #define BLOCK_IDX_MASK ((1LU << 56) - 1) @@ -99,21 +111,30 @@ static uint8_t ieee80211_header[] = { #define SESSION_KEY_ANNOUNCE_MSEC 1000 + +// Network packet headers. All numbers are in network (big endian) format +// Encrypted packets can be either session key or data packet. + +// Session key packet + typedef struct { uint8_t packet_type; - uint64_t nonce; -} __attribute__ ((packed)) wblock_hdr_t; + uint8_t session_key_nonce[crypto_box_NONCEBYTES]; // random data + uint8_t session_key_data[crypto_aead_chacha20poly1305_KEYBYTES + crypto_box_MACBYTES]; // encrypted session key +} __attribute__ ((packed)) wsession_key_t; -typedef struct { - uint32_t seq; - uint16_t packet_size; -} __attribute__ ((packed)) wpacket_hdr_t; +// Data packet. Embed FEC-encoded data typedef struct { uint8_t packet_type; - uint8_t nonce[crypto_box_NONCEBYTES]; - uint8_t session_key[crypto_aead_chacha20poly1305_KEYBYTES + crypto_box_MACBYTES]; // encrypted session key -} __attribute__ ((packed)) wsession_key_t; + uint64_t nonce; // big endian, nonce = block_idx << 8 + fragment_idx +} __attribute__ ((packed)) wblock_hdr_t; + +// Plain data packet after FEC decode + +typedef struct { + uint16_t packet_size; // big endian +} __attribute__ ((packed)) wpacket_hdr_t; #define MAX_PAYLOAD_SIZE (MAX_PACKET_SIZE - sizeof(radiotap_header) - sizeof(ieee80211_header) - sizeof(wblock_hdr_t) - crypto_aead_chacha20poly1305_ABYTES - sizeof(wpacket_hdr_t)) #define MAX_FEC_PAYLOAD (MAX_PACKET_SIZE - sizeof(radiotap_header) - sizeof(ieee80211_header) - sizeof(wblock_hdr_t) - crypto_aead_chacha20poly1305_ABYTES)