Skip to content

Commit

Permalink
Simplify aggregator logic
Browse files Browse the repository at this point in the history
Remove seq from data packets
Add compatibility for different endianess on RX/TX
  • Loading branch information
svpcom committed Oct 12, 2017
1 parent 28bd483 commit 95876f4
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 96 deletions.
117 changes: 61 additions & 56 deletions rx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ extern "C"
#include "rx.hpp"


Receiver::Receiver(const char *wlan, int radio_port, Aggregator *agg) : agg(agg)
Receiver::Receiver(const char *wlan, int radio_port, BaseAggregator *agg) : agg(agg)
{
char errbuf[PCAP_ERRBUF_SIZE];

Expand Down Expand Up @@ -189,10 +189,11 @@ void Receiver::loop_iter(void)
}


LocalAggregator::LocalAggregator(const string &client_addr, int client_port, int k, int n, const string &keypair) : fec_k(k), fec_n(n), seq(0), rx_ring_front(0), rx_ring_alloc(0), proc_ring_last(0)
Aggregator::Aggregator(const string &client_addr, int client_port, int k, int n, const string &keypair) : fec_k(k), fec_n(n), seq(0), rx_ring_front(0), rx_ring_alloc(0), last_known_block((uint64_t)-1)
{
sockfd = open_udp_socket(client_addr, client_port);
sockfd = open_udp_socket_for_tx(client_addr, client_port);
fec_p = fec_new(fec_k, fec_n);
memset(session_key, '\0', sizeof(session_key));

for(int ring_idx = 0; ring_idx < RX_RING_SIZE; ring_idx++)
{
Expand All @@ -208,11 +209,6 @@ LocalAggregator::LocalAggregator(const string &client_addr, int client_port, int
memset(rx_ring[ring_idx].fragment_map, '\0', fec_n * sizeof(uint8_t));
}

for(int ring_idx = 0; ring_idx < PROC_RING_SIZE; ring_idx++)
{
proc_ring[ring_idx] = -1;
}

FILE *fp;
if((fp = fopen(keypair.c_str(), "r")) == NULL)
{
Expand All @@ -224,7 +220,7 @@ LocalAggregator::LocalAggregator(const string &client_addr, int client_port, int
}


LocalAggregator::~LocalAggregator()
Aggregator::~Aggregator()
{

for(int ring_idx = 0; ring_idx < RX_RING_SIZE; ring_idx++)
Expand All @@ -240,31 +236,24 @@ LocalAggregator::~LocalAggregator()
}


RemoteAggregator::RemoteAggregator(const string &client_addr, int client_port)
Forwarder::Forwarder(const string &client_addr, int client_port)
{
sockfd = open_udp_socket(client_addr, client_port);
sockfd = open_udp_socket_for_tx(client_addr, client_port);
}


void RemoteAggregator::process_packet(const uint8_t *buf, size_t size)
void Forwarder::process_packet(const uint8_t *buf, size_t size)
{
send(sockfd, buf, size, 0);
}


RemoteAggregator::~RemoteAggregator()
Forwarder::~Forwarder()
{
close(sockfd);
}


void LocalAggregator::add_processed_block(int block_idx)
{
proc_ring[proc_ring_last] = block_idx;
proc_ring_last = modN(proc_ring_last + 1, PROC_RING_SIZE);
}

int LocalAggregator::rx_ring_push(void)
int Aggregator::rx_ring_push(void)
{
if(rx_ring_alloc < RX_RING_SIZE)
{
Expand All @@ -276,15 +265,14 @@ int LocalAggregator::rx_ring_push(void)
// override existing data
int idx = rx_ring_front;

fprintf(stderr, "override block %d with %d fragments\n", rx_ring[idx].block_idx, rx_ring[idx].has_fragments);
fprintf(stderr, "override block 0x%Lx with %d fragments\n", (long long unsigned int)(rx_ring[idx].block_idx), rx_ring[idx].has_fragments);

add_processed_block(rx_ring[idx].block_idx);
rx_ring_front = modN(rx_ring_front + 1, RX_RING_SIZE);
return idx;
}


int LocalAggregator::get_block_ring_idx(int block_idx)
int Aggregator::get_block_ring_idx(uint64_t block_idx)
{
// check if block already added
for(int i = rx_ring_front, c = rx_ring_alloc; c > 0; i = modN(i + 1, RX_RING_SIZE), c--)
Expand All @@ -293,28 +281,21 @@ int LocalAggregator::get_block_ring_idx(int block_idx)
}

// check if block was already processed
for(int i=0; i < PROC_RING_SIZE; i++)
{
if (proc_ring[i] == block_idx) return -1;
}

int new_blocks;
if (rx_ring_alloc > 0)
if (last_known_block != (uint64_t)-1 && block_idx <= last_known_block)
{
new_blocks = modN(block_idx - rx_ring[modN(rx_ring_front + rx_ring_alloc - 1, RX_RING_SIZE)].block_idx, 256);
}else
{
int last_proc_block = proc_ring[modN(proc_ring_last - 1, PROC_RING_SIZE)];
new_blocks = modN(last_proc_block >=0 ? block_idx - last_proc_block : 1, 256);
return -1;
}

int new_blocks = (int)min(last_known_block != (uint64_t)-1 ? block_idx - last_known_block : 1, (uint64_t)RX_RING_SIZE);
assert (new_blocks > 0);

last_known_block = block_idx;
int ring_idx = -1;

for(int i = 0; i < new_blocks; i++)
{
ring_idx = rx_ring_push();
rx_ring[ring_idx].block_idx = modN(block_idx - new_blocks + i + 1, 256);
rx_ring[ring_idx].block_idx = block_idx + i + 1 - new_blocks;
rx_ring[ring_idx].send_fragment_idx = 0;
rx_ring[ring_idx].has_fragments = 0;
memset(rx_ring[ring_idx].fragment_map, '\0', fec_n * sizeof(uint8_t));
Expand All @@ -323,8 +304,10 @@ int LocalAggregator::get_block_ring_idx(int block_idx)
}


void LocalAggregator::process_packet(const uint8_t *buf, size_t size)
void Aggregator::process_packet(const uint8_t *buf, size_t size)
{
uint8_t new_session_key[sizeof(session_key)];

if(size == 0) return;
if (size > MAX_FORWARDER_PACKET_SIZE)
{
Expand All @@ -349,11 +332,31 @@ void LocalAggregator::process_packet(const uint8_t *buf, size_t size)
return;
}

if(crypto_box_open_easy(session_key,
((wsession_key_t*)buf)->session_key, sizeof(wsession_key_t::session_key), ((wsession_key_t*)buf)->nonce,
if(crypto_box_open_easy(new_session_key,
((wsession_key_t*)buf)->session_key_data, sizeof(wsession_key_t::session_key_data),
((wsession_key_t*)buf)->session_key_nonce,
tx_publickey, rx_secretkey) != 0)
{
fprintf(stderr, "unable to decrypt session key\n");
return;
}

if (memcmp(session_key, new_session_key, sizeof(session_key)) != 0)
{
fprintf(stderr, "New session detected\n");
memcpy(session_key, new_session_key, sizeof(session_key));

rx_ring_front = 0;
rx_ring_alloc = 0;
last_known_block = (uint64_t)-1;
seq = 0;
for(int ring_idx = 0; ring_idx < RX_RING_SIZE; ring_idx++)
{
rx_ring[ring_idx].block_idx = 0;
rx_ring[ring_idx].send_fragment_idx = 0;
rx_ring[ring_idx].has_fragments = 0;
memset(rx_ring[ring_idx].fragment_map, '\0', fec_n * sizeof(uint8_t));
}
}
return;

Expand All @@ -362,7 +365,6 @@ void LocalAggregator::process_packet(const uint8_t *buf, size_t size)
return;
}

// Truncate block_idx to 8 bit
uint8_t decrypted[MAX_FEC_PAYLOAD];
long long unsigned int decrypted_len;
wblock_hdr_t *block_hdr = (wblock_hdr_t*)buf;
Expand All @@ -374,14 +376,14 @@ void LocalAggregator::process_packet(const uint8_t *buf, size_t size)
sizeof(wblock_hdr_t),
(uint8_t*)(&(block_hdr->nonce)), session_key) != 0)
{
fprintf(stderr, "unable to decrypt packet #0x%Lx\n", (long long unsigned int)(block_hdr->nonce));
fprintf(stderr, "unable to decrypt packet #0x%Lx\n", (long long unsigned int)(be64toh(block_hdr->nonce)));
return;
}

assert(decrypted_len <= MAX_FEC_PAYLOAD);

uint8_t block_idx = (uint8_t)((block_hdr->nonce >> 8) & 0xff);
uint8_t fragment_idx = (uint8_t)(block_hdr->nonce & 0xff);
uint64_t block_idx = be64toh(block_hdr->nonce) >> 8;
uint8_t fragment_idx = (uint8_t)(be64toh(block_hdr->nonce) & 0xff);

if (fragment_idx >= fec_n)
{
Expand All @@ -391,6 +393,8 @@ void LocalAggregator::process_packet(const uint8_t *buf, size_t size)

int ring_idx = get_block_ring_idx(block_idx);

//printf("got 0x%lx %d, ring_idx=%d\n", block_idx, fragment_idx, ring_idx);

//ignore already processed blocks
if (ring_idx < 0) return;

Expand Down Expand Up @@ -432,35 +436,36 @@ void LocalAggregator::process_packet(const uint8_t *buf, size_t size)
int nrm = modN(ring_idx - rx_ring_front, RX_RING_SIZE);
for(int i=0; i <= nrm; i++)
{
add_processed_block(rx_ring[rx_ring_front].block_idx);
rx_ring_front = modN(rx_ring_front + 1, RX_RING_SIZE);
rx_ring_alloc -= 1;
}
assert(rx_ring_alloc >= 0);
}
}

void LocalAggregator::send_packet(int ring_idx, int fragment_idx)
void Aggregator::send_packet(int ring_idx, int fragment_idx)
{
wpacket_hdr_t* packet_hdr = (wpacket_hdr_t*)(rx_ring[ring_idx].fragments[fragment_idx]);
uint8_t *payload = (rx_ring[ring_idx].fragments[fragment_idx]) + sizeof(wpacket_hdr_t);
uint16_t packet_size = be16toh(packet_hdr->packet_size);
uint32_t packet_seq = rx_ring[ring_idx].block_idx * fec_k + fragment_idx;

if (packet_hdr->seq > seq + 1)
if (packet_seq > seq + 1)
{
fprintf(stderr, "%u packets lost\n", packet_hdr->seq - seq - 1);
fprintf(stderr, "%u packets lost\n", packet_seq - seq - 1);
}

seq = packet_hdr->seq;
seq = packet_seq;

if(packet_hdr->packet_size > MAX_PAYLOAD_SIZE)
if(packet_size > MAX_PAYLOAD_SIZE)
{
fprintf(stderr, "corrupted packet %u\n", seq);
}else{
send(sockfd, payload, packet_hdr->packet_size, 0);
send(sockfd, payload, packet_size, 0);
}
}

void LocalAggregator::apply_fec(int ring_idx)
void Aggregator::apply_fec(int ring_idx)
{
unsigned index[fec_k];
uint8_t *in_blocks[fec_k];
Expand Down Expand Up @@ -549,11 +554,11 @@ int main(int argc, char* const *argv)
struct pollfd fds[MAX_RX_INTERFACES];
Receiver* rx[MAX_RX_INTERFACES];

shared_ptr<Aggregator> agg;
shared_ptr<BaseAggregator> agg;
if(rx_mode == LOCAL){
agg = shared_ptr<LocalAggregator>(new LocalAggregator(client_addr, client_port, k, n, keypair));
agg = shared_ptr<Aggregator>(new Aggregator(client_addr, client_port, k, n, keypair));
}else{
agg = shared_ptr<RemoteAggregator>(new RemoteAggregator(client_addr, client_port));
agg = shared_ptr<Forwarder>(new Forwarder(client_addr, client_port));
}

memset(fds, '\0', sizeof(fds));
Expand Down Expand Up @@ -591,7 +596,7 @@ int main(int argc, char* const *argv)

uint8_t buf[MAX_FORWARDER_PACKET_SIZE];
int fd = open_udp_socket_for_rx(srv_port);
LocalAggregator agg(client_addr, client_port, k, n, keypair);
Aggregator agg(client_addr, client_port, k, n, keypair);

for(;;)
{
Expand Down
29 changes: 13 additions & 16 deletions rx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ typedef enum {
AGGREGATOR
} rx_mode_t;

class Aggregator
class BaseAggregator
{
public:
virtual void process_packet(const uint8_t *buf, size_t size) = 0;

protected:
int open_udp_socket(const string &client_addr, int client_port)
int open_udp_socket_for_tx(const string &client_addr, int client_port)
{
struct sockaddr_in saddr;
int fd = socket(AF_INET, SOCK_DGRAM, 0);
Expand All @@ -50,11 +50,11 @@ class Aggregator
};


class RemoteAggregator : public Aggregator
class Forwarder : public BaseAggregator
{
public:
RemoteAggregator(const string &client_addr, int client_port);
~RemoteAggregator();
Forwarder(const string &client_addr, int client_port);
~Forwarder();
virtual void process_packet(const uint8_t *buf, size_t size);

private:
Expand All @@ -63,7 +63,7 @@ class RemoteAggregator : public Aggregator


typedef struct {
uint8_t block_idx;
uint64_t block_idx;
uint8_t** fragments;
uint8_t *fragment_map;
uint8_t send_fragment_idx;
Expand All @@ -72,24 +72,22 @@ typedef struct {


#define RX_RING_SIZE 40
#define PROC_RING_SIZE 40

static inline int modN(int x, int base)
{
return (base + (x % base)) % base;
}

class LocalAggregator : public Aggregator
class Aggregator : public BaseAggregator
{
public:
LocalAggregator(const string &client_addr, int client_port, int k, int n, const string &keypair);
~LocalAggregator();
Aggregator(const string &client_addr, int client_port, int k, int n, const string &keypair);
~Aggregator();
virtual void process_packet(const uint8_t *buf, size_t size);
private:
void send_packet(int ring_idx, int fragment_idx);
void apply_fec(int ring_idx);
int get_block_ring_idx(int block_idx);
void add_processed_block(int block_idx);
int get_block_ring_idx(uint64_t block_idx);
int rx_ring_push(void);
fec_t* fec_p;
int fec_k; // RS number of primary fragments in block
Expand All @@ -99,8 +97,7 @@ class LocalAggregator : public Aggregator
rx_ring_item_t rx_ring[RX_RING_SIZE];
int rx_ring_front; // current packet
int rx_ring_alloc; // number of allocated entries
int proc_ring[PROC_RING_SIZE];
int proc_ring_last; // index to add processed packet
uint64_t last_known_block; //id of last known block

// rx->tx keypair
uint8_t rx_secretkey[crypto_box_SECRETKEYBYTES];
Expand All @@ -111,12 +108,12 @@ class LocalAggregator : public Aggregator
class Receiver
{
public:
Receiver(const char* wlan, int port, Aggregator* agg);
Receiver(const char* wlan, int port, BaseAggregator* agg);
~Receiver();
void loop_iter(void);
int getfd(void){ return fd; }
private:
Aggregator *agg;
BaseAggregator *agg;
int fd;
pcap_t *ppcap;
};
Loading

0 comments on commit 95876f4

Please sign in to comment.