Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement L0 protocol version 2 #31

Draft
wants to merge 6 commits into
base: dstn-beamid
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions L0_L1_packet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
// ints and IEEE 754 floats.
//
// The packet header size in bytes is currently
// 24 + 2*nbeam + 2*nfreq_coarse + 8*nbeam*nfreq
// 32 + 2*nbeam + 2*nfreq_coarse + 8*nbeam*nfreq_coarse
//
// For full CHIME, we expect to use (nbeam, nfreq_coarse, nupfreq, ntsamp) = (8, 4, 16, 16)
// which gives a 312-byte header and an 8192-byte data segment.
Expand All @@ -53,7 +53,7 @@
//

struct L0_L1_header {
// This file describes protocol version 1.
// This file describes protocol version 2.
// A 32-bit protocol number is overkill, but means that all fields below are aligned on their
// "natural" boundaries (i.e. fields with size Nbytes have byte offsets which are multiples of Nbytes)
uint32_t protocol_version;
Expand All @@ -68,6 +68,9 @@ struct L0_L1_header {
// The duration in seconds is dt = (2.56e-6 * fpga_counts_per_sample)
uint16_t fpga_counts_per_sample;

// This is the time in nanoseconds since Unix epoch of the first FPGA sample (fpga_count=0)
uint64_t fpga_frame0_ns;

// This is the time index (in FPGA counts) of the first time sample in the packet.
// The packet sender is responsible for "unwrapping" the 32-bit FPGA timestamp to 64 bits.
// Must be divisible by fpga_counts_per_sample.
Expand Down Expand Up @@ -96,8 +99,8 @@ struct L0_L1_header {
// comprising a coarse frequency, and are the same for each of the 16 time samples in
// a packet. Thus the scale and offset arrays have shape (nbeam, nfreq_coarse).

float32 scale[nbeam * nfreq_coarse]; // byte offset (24 + 2*nbeam + 2*nfreq_coarse)
float32 offset[nbeam * nfreq_coarse]; // byte offset (24 + 2*nbeam + 2*nfreq_coarse + 2*nbeam*nfreq_coarse)
float32 scale[nbeam * nfreq_coarse]; // byte offset (32 + 2*nbeam + 2*nfreq_coarse)
float32 offset[nbeam * nfreq_coarse]; // byte offset (32 + 2*nbeam + 2*nfreq_coarse + 4*nbeam*nfreq_coarse)

// The uncompressed data array has shape (nbeam, nfreq_coarse, nupfreq, ntsamp),
// ordered so that the fastest changing index is the time dimension, i.e. each
Expand Down
7 changes: 6 additions & 1 deletion assembled_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ assembled_chunk::assembled_chunk(const assembled_chunk::initializer &ini_params)
binning(ini_params.binning),
stream_id(ini_params.stream_id),
ichunk(ini_params.ichunk),
frame0_nano(ini_params.frame0_nano),
frame0_nano(0),
nt_coarse(_nt_c(nt_per_packet)),
nscales(constants::nfreq_coarse_tot * nt_coarse),
ndata(constants::nfreq_coarse_tot * nupfreq * constants::nt_per_assembled_chunk),
Expand Down Expand Up @@ -326,6 +326,11 @@ void assembled_chunk::add_packet(const intensity_packet &packet)
// Offset relative to beginning of packet
uint64_t t0 = packet_t0 - isample;

// Pull ctime (in nanoseconds) corresponding to FPGAcount 0
// from the packet if we don't have it already.
if (frame0_nano == 0)
frame0_nano = packet.fpga_frame0_ns;

// The runtime checks in intensity_network_stream::_process_packet() should
// ensure that the following checks are redundant. I decided to include the
// redundant checks here in the "generic" assembled_chunk::add_packet(), but
Expand Down
2 changes: 1 addition & 1 deletion assembled_chunk_msgpack.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,12 @@ struct convert<std::shared_ptr<ch_frb_io::assembled_chunk> > {
ini_params.fpga_counts_per_sample = fpga_counts_per_sample;
ini_params.binning = binning;
ini_params.ichunk = ichunk;
ini_params.frame0_nano = frame0_nano;

if (version == 2)
ini_params.nrfifreq = arr[18].as<int>();

ch = ch_frb_io::assembled_chunk::make(ini_params);
ch->frame0_nano = frame0_nano;

if (ch->nt_coarse != nt_coarse)
throw std::runtime_error("ch_frb_io: assembled_chunk msgpack nt_coarse mismatch");
Expand Down
6 changes: 0 additions & 6 deletions assembled_chunk_ringbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ assembled_chunk_ringbuf::assembled_chunk_ringbuf(const intensity_network_stream:
ini_params(ini_params_),
stream_id(stream_id_),
beam_id(beam_id_),
frame0_nano(0),
output_devices(ini_params.output_devices)
{
if ((beam_id < 0) || (beam_id > constants::max_allowed_beam_id))
Expand Down Expand Up @@ -66,10 +65,6 @@ assembled_chunk_ringbuf::assembled_chunk_ringbuf(const intensity_network_stream:
this->_check_invariants();
}

// Stores 'f0' in this ring buffer's 'frame0_nano' member, i.e. the nanosecond
// time value corresponding to FPGA count zero.
void assembled_chunk_ringbuf::set_frame0(uint64_t f0) {
frame0_nano = f0;
}

void assembled_chunk_ringbuf::print_state()
{
guard_t lock(mutx);
Expand Down Expand Up @@ -705,7 +700,6 @@ std::unique_ptr<assembled_chunk> assembled_chunk_ringbuf::_make_assembled_chunk(
chunk_params.nrfifreq = this->ini_params.nrfifreq;
chunk_params.nt_per_packet = this->ini_params.nt_per_packet;
chunk_params.fpga_counts_per_sample = this->ini_params.fpga_counts_per_sample;
chunk_params.frame0_nano = this->frame0_nano;
chunk_params.force_reference = this->ini_params.force_reference_kernels;
chunk_params.force_fast = this->ini_params.force_fast_kernels;
chunk_params.stream_id = this->stream_id;
Expand Down
5 changes: 5 additions & 0 deletions avx2_kernels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,11 @@ void fast_assembled_chunk::add_packet(const intensity_packet &packet)
// Offset relative to beginning of packet
uint64_t t0 = packet.fpga_count / uint64_t(fpga_counts_per_sample) - isample;

// Pull ctime (in nanoseconds) corresponding to FPGAcount 0
// from the packet if we don't have it already.
if (frame0_nano == 0)
frame0_nano = packet.fpga_frame0_ns;

for (int f = 0; f < packet.nfreq_coarse; f++) {
int coarse_freq_id = packet.coarse_freq_ids[f];

Expand Down
17 changes: 1 addition & 16 deletions ch_frb_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,6 @@ class intensity_network_stream : noncopyable {
// treated as assembler misses.
int nt_align = 0;

// If 'frame0_url' is a nonempty string, then assembler thread will retrieve frame0 info by "curling" the URL.
std::string frame0_url = "";
int frame0_timeout = 3000;

// If ipaddr="0.0.0.0", then network thread will listen on all interfaces.
std::string ipaddr = "0.0.0.0";
int udp_port = constants::default_udp_port;
Expand Down Expand Up @@ -448,8 +444,6 @@ class intensity_network_stream : noncopyable {
// the given beam id.
// Raises runtime_error if the first packet has not been received yet.
uint64_t get_first_fpgacount();

uint64_t get_frame0_nano();

void add_first_packet_listener(first_packet_listener f);

Expand Down Expand Up @@ -578,9 +572,6 @@ class intensity_network_stream : noncopyable {
std::atomic<uint64_t> assembler_thread_waiting_usec;
std::atomic<uint64_t> assembler_thread_working_usec;

// Initialized to zero by constructor, set to nonzero value by assembler thread when first packet is received.
std::atomic<uint64_t> frame0_nano; // nanosecond time() value for FPGA count zero.

char _pad1b[constants::cache_line_size];

// Used only by the network thread (not protected by lock)
Expand Down Expand Up @@ -673,9 +664,6 @@ class intensity_network_stream : noncopyable {
// Private methods called by the assembler thread.
void _assembler_thread_body();
void _assembler_thread_exit();
// initializes 'frame0_nano' by curling 'frame0_url', called when first packet is received.
// NOTE that one must call curl_global_init() before, and curl_global_cleanup() after; in chime-frb-l1 we do this in the top-level main() method.
void _fetch_frame0();
};


Expand Down Expand Up @@ -731,9 +719,6 @@ class assembled_chunk : noncopyable {
bool force_reference = false;
bool force_fast = false;

// "ctime" in nanoseconds of FPGA count zero
uint64_t frame0_nano = 0;

// If a memory slab has been preallocated from a pool, these pointers should be set.
// Otherwise, both pointers should be empty, and the assembled_chunk constructor will allocate.
std::shared_ptr<memory_slab_pool> pool;
Expand All @@ -750,7 +735,7 @@ class assembled_chunk : noncopyable {
const int stream_id = 0;
const uint64_t ichunk = 0;
// "ctime" in nanoseconds of FPGA count zero
const uint64_t frame0_nano = 0;
uint64_t frame0_nano = 0;

// Derived parameters.
const int nt_coarse = 0; // equal to (constants::nt_per_assembled_chunk / nt_per_packet)
Expand Down
13 changes: 6 additions & 7 deletions ch_frb_io_internals.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ std::string ip_to_string(const sockaddr_in &addr);


struct intensity_packet {
// "Header fields". These 24 bytes should have the same ordering and byte count as the
// "on-wire" packet, since we use memcpy(..., 24) to initialize them from the raw packet data.
// "Header fields". These 32 bytes should have the same ordering and byte count as the
// "on-wire" packet, since we use memcpy(..., intensity_fixed_header_length) to initialize them from the raw packet data.
uint32_t protocol_version;
int16_t data_nbytes;
uint16_t fpga_counts_per_sample;
uint64_t fpga_frame0_ns;
uint64_t fpga_count;
uint16_t nbeams;
uint16_t nfreq_coarse;
Expand All @@ -82,10 +83,12 @@ struct intensity_packet {
float *offsets; // 2D array of shape (nbeam, nfreq_coarse)
uint8_t *data; // array of shape (nbeam, nfreq_coarse, nupfreq, ntsamp)

/// Length of "header fields"
static const int intensity_fixed_header_length = 32;

// Returns the packet header size in bytes for the given beam and coarse-frequency
// counts: the fixed-length leading fields, plus 2 bytes per beam, 2 bytes per
// coarse frequency, and 8 bytes per (beam, coarse frequency) pair.
// NOTE(review): two return statements appear here (the literal-24 form and the
// intensity_fixed_header_length form); only the first is reachable as written --
// confirm that only the intensity_fixed_header_length form is meant to remain.
static inline int header_size(int nbeams, int nfreq_coarse)
{
return 24 + 2*nbeams + 2*nfreq_coarse + 8*nbeams*nfreq_coarse;
return intensity_fixed_header_length + 2*nbeams + 2*nfreq_coarse + 8*nbeams*nfreq_coarse;
}

static inline int packet_size(int nbeams, int nfreq_coarse, int nupfreq, int nt_per_packet)
Expand Down Expand Up @@ -296,8 +299,6 @@ class assembled_chunk_ringbuf : noncopyable,
// Moves any remaining active chunks into the ring buffer, sets 'doneflag', initializes 'final_fpga'.
void end_stream(int64_t *event_counts);

void set_frame0(uint64_t frame0_nano);

// Debugging: inject the given chunk
bool inject_assembled_chunk(assembled_chunk* chunk);

Expand Down Expand Up @@ -356,8 +357,6 @@ class assembled_chunk_ringbuf : noncopyable,
const int stream_id; // only used in assembled_chunk::format_filename().
const int beam_id;

uint64_t frame0_nano; // nanosecond time() value for FPGA count zero

output_device_pool output_devices;

// Helper function called in assembler thread, to add a new assembled_chunk to the ring buffer.
Expand Down
4 changes: 2 additions & 2 deletions intensity_network_ostream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ void intensity_network_ostream::_encode_chunk(const float *intensity, int istrid
intensity_packet packet;

// Some intensity_packet fields are packet-independent; these are initialized here.
packet.protocol_version = 1;
packet.protocol_version = 2;
packet.data_nbytes = nbeams * nfreq_coarse_per_packet * nupfreq * nt_per_packet;
packet.fpga_counts_per_sample = fpga_counts_per_sample;
packet.nbeams = nbeams;
Expand Down Expand Up @@ -463,7 +463,7 @@ void intensity_network_ostream::_send_end_of_stream_packets()
// reaches the other side, but we'll make a best effort by sending 5 packets separated by 0.1 sec.

for (int ipacket = 0; ipacket < 5; ipacket++) {
vector<uint8_t> packet(24, uint8_t(0));
vector<uint8_t> packet(intensity_packet::intensity_fixed_header_length, uint8_t(0));
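*((uint32_t *) &packet[0]) = uint32_t(1); // protocol number -- NOTE(review): still 1 here, while _encode_chunk() now sends protocol_version = 2; confirm this is intended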

ssize_t n = send(this->sockfd, &packet[0], packet.size(), 0);
Expand Down
Loading