diff --git a/L0_L1_packet.hpp b/L0_L1_packet.hpp index 6e2d1cb..ecafe08 100644 --- a/L0_L1_packet.hpp +++ b/L0_L1_packet.hpp @@ -43,7 +43,7 @@ // ints and IEEE 754 floats. // // The packet header size in bytes is currently -// 24 + 2*nbeam + 2*nfreq_coarse + 8*nbeam*nfreq +// 32 + 2*nbeam + 2*nfreq_coarse + 8*nbeam*nfreq // // For full CHIME, we expect to use (nbeam, nfreq_coarse, nupfreq, ntsamp) = (8, 4, 16, 16) // which gives a 304-byte header and an 8192-byte data segment. @@ -53,7 +53,7 @@ // struct L0_L1_header { - // This file describes protocol version 1. + // This file describes protocol version 2. // A 32-bit protocol number is overkill, but means that all fields below are aligned on their // "natural" boundaries (i.e. fields with size Nbytes have byte offsets which are multiples of Nbytes) uint32_t protocol_version; @@ -68,6 +68,9 @@ struct L0_L1_header { // The duration in seconds is dt = (2.56e-6 * fpga_counts_per_sample) uint16_t fpga_counts_per_sample; + // This is the time in nanoseconds since Unix epoch of the first FPGA sample (fpga_count=0) + uint64_t fpga_frame0_ns; + // This is the time index (in FPGA counts) of the first time sample in the packet. // The packet sender is responsible for "unwrapping" the 32-bit FPGA timestamp to 64 bits. // Must be divisible by fpga_counts_per_sample. @@ -96,8 +99,8 @@ struct L0_L1_header { // comprising a coarse frequency, and are the same for each of the 16 time samples in // a packet. Thus the scale of offset arrays have shape (nbeam, nfreq_coarse). - float32 scale[nbeam * nfreq_coarse]; // byte offset (24 + 2*nbeam + 2*nfreq_coarse) - float32 offset[nbeam * nfreq_coarse]; // byte offset (24 + 2*nbeam + 2*nfreq_coarse + 2*nbeam*nfreq_coarse) + float32 scale[nbeam * nfreq_coarse]; // byte offset (32 + 2*nbeam + 2*nfreq_coarse) + float32 offset[nbeam * nfreq_coarse]; // byte offset (32 + 2*nbeam + 2*nfreq_coarse + 2*nbeam*nfreq_coarse) // The uncompressed data array has shape (nbeam, nfreq_coarse, nupfreq, ntsamp), // ordered so that the fastest changing index is the time dimension, i.e. each diff --git a/assembled_chunk.cpp b/assembled_chunk.cpp index c33df13..e24b4ed 100644 --- a/assembled_chunk.cpp +++ b/assembled_chunk.cpp @@ -102,7 +102,7 @@ assembled_chunk::assembled_chunk(const assembled_chunk::initializer &ini_params) binning(ini_params.binning), stream_id(ini_params.stream_id), ichunk(ini_params.ichunk), - frame0_nano(ini_params.frame0_nano), + frame0_nano(0), nt_coarse(_nt_c(nt_per_packet)), nscales(constants::nfreq_coarse_tot * nt_coarse), ndata(constants::nfreq_coarse_tot * nupfreq * constants::nt_per_assembled_chunk), @@ -326,6 +326,11 @@ void assembled_chunk::add_packet(const intensity_packet &packet) // Offset relative to beginning of packet uint64_t t0 = packet_t0 - isample; + // Pull ctime (in nanoseconds) corresponding to FPGAcount 0 + // from the packet if we don't have it already. + if (frame0_nano == 0) + frame0_nano = packet.fpga_frame0_ns; + // The runtime checks in intensity_network_stream::_process_packet() should // ensure that the following checks are redundant. I decided to include the // redundant checks here in the "generic" assembled_chunk::add_packet(), but diff --git a/assembled_chunk_msgpack.hpp b/assembled_chunk_msgpack.hpp index 5cb7670..1d7e07c 100644 --- a/assembled_chunk_msgpack.hpp +++ b/assembled_chunk_msgpack.hpp @@ -184,12 +184,12 @@ struct convert > { ini_params.fpga_counts_per_sample = fpga_counts_per_sample; ini_params.binning = binning; ini_params.ichunk = ichunk; - ini_params.frame0_nano = frame0_nano; if (version == 2) ini_params.nrfifreq = arr[18].as(); ch = ch_frb_io::assembled_chunk::make(ini_params); + ch->frame0_nano = frame0_nano; if (ch->nt_coarse != nt_coarse) throw std::runtime_error("ch_frb_io: assembled_chunk msgpack nt_coarse mismatch"); diff --git a/assembled_chunk_ringbuf.cpp b/assembled_chunk_ringbuf.cpp index 9c4620c..e9afeab 100644 --- a/assembled_chunk_ringbuf.cpp +++ b/assembled_chunk_ringbuf.cpp @@ -22,7 +22,6 @@ assembled_chunk_ringbuf::assembled_chunk_ringbuf(const intensity_network_stream: ini_params(ini_params_), stream_id(stream_id_), beam_id(beam_id_), - frame0_nano(0), output_devices(ini_params.output_devices) { if ((beam_id < 0) || (beam_id > constants::max_allowed_beam_id)) @@ -66,10 +65,6 @@ assembled_chunk_ringbuf::assembled_chunk_ringbuf(const intensity_network_stream: this->_check_invariants(); } -void assembled_chunk_ringbuf::set_frame0(uint64_t f0) { - frame0_nano = f0; -} - void assembled_chunk_ringbuf::print_state() { guard_t lock(mutx); @@ -705,7 +700,6 @@ std::unique_ptr assembled_chunk_ringbuf::_make_assembled_chunk( chunk_params.nrfifreq = this->ini_params.nrfifreq; chunk_params.nt_per_packet = this->ini_params.nt_per_packet; chunk_params.fpga_counts_per_sample = this->ini_params.fpga_counts_per_sample; - chunk_params.frame0_nano = this->frame0_nano; chunk_params.force_reference = this->ini_params.force_reference_kernels; chunk_params.force_fast = this->ini_params.force_fast_kernels; chunk_params.stream_id = this->stream_id; diff --git a/avx2_kernels.cpp b/avx2_kernels.cpp index 96822a3..23e3996 100644 --- a/avx2_kernels.cpp +++ b/avx2_kernels.cpp @@ -924,6 +924,11 @@ void fast_assembled_chunk::add_packet(const intensity_packet &packet) // Offset relative to beginning of packet uint64_t t0 = packet.fpga_count / uint64_t(fpga_counts_per_sample) - isample; + // Pull ctime (in nanoseconds) corresponding to FPGAcount 0 + // from the packet if we don't have it already. + if (frame0_nano == 0) + frame0_nano = packet.fpga_frame0_ns; + for (int f = 0; f < packet.nfreq_coarse; f++) { int coarse_freq_id = packet.coarse_freq_ids[f]; diff --git a/ch_frb_io.hpp b/ch_frb_io.hpp index 87bb8ca..649dd96 100644 --- a/ch_frb_io.hpp +++ b/ch_frb_io.hpp @@ -342,10 +342,6 @@ class intensity_network_stream : noncopyable { // treated as assembler misses. int nt_align = 0; - // If 'frame0_url' is a nonempty string, then assembler thread will retrieve frame0 info by "curling" the URL. - std::string frame0_url = ""; - int frame0_timeout = 3000; - // If ipaddr="0.0.0.0", then network thread will listen on all interfaces. std::string ipaddr = "0.0.0.0"; int udp_port = constants::default_udp_port; @@ -448,8 +444,6 @@ class intensity_network_stream : noncopyable { // the given beam id. // Raises runtime_error if the first packet has not been received yet. uint64_t get_first_fpgacount(); - - uint64_t get_frame0_nano(); void add_first_packet_listener(first_packet_listener f); @@ -578,9 +572,6 @@ class intensity_network_stream : noncopyable { std::atomic assembler_thread_waiting_usec; std::atomic assembler_thread_working_usec; - // Initialized to zero by constructor, set to nonzero value by assembler thread when first packet is received. - std::atomic frame0_nano; // nanosecond time() value for fgpacount zero. - char _pad1b[constants::cache_line_size]; // Used only by the network thread (not protected by lock) @@ -673,9 +664,6 @@ class intensity_network_stream : noncopyable { // Private methods called by the assembler thread. void _assembler_thread_body(); void _assembler_thread_exit(); - // initializes 'frame0_nano' by curling 'frame0_url', called when first packet is received. - // NOTE that one must call curl_global_init() before, and curl_global_cleanup() after; in chime-frb-l1 we do this in the top-level main() method. - void _fetch_frame0(); }; @@ -731,9 +719,6 @@ class assembled_chunk : noncopyable { bool force_reference = false; bool force_fast = false; - // "ctime" in nanoseconds of FGPAcount zero - uint64_t frame0_nano = 0; - // If a memory slab has been preallocated from a pool, these pointers should be set. // Otherwise, both pointers should be empty, and the assembled_chunk constructor will allocate. std::shared_ptr pool; @@ -750,7 +735,7 @@ class assembled_chunk : noncopyable { const int stream_id = 0; const uint64_t ichunk = 0; // "ctime" in nanoseconds of FGPAcount zero - const uint64_t frame0_nano = 0; + uint64_t frame0_nano = 0; // Derived parameters. const int nt_coarse = 0; // equal to (constants::nt_per_assembled_chunk / nt_per_packet) diff --git a/ch_frb_io_internals.hpp b/ch_frb_io_internals.hpp index 219866c..a34adaa 100644 --- a/ch_frb_io_internals.hpp +++ b/ch_frb_io_internals.hpp @@ -62,11 +62,12 @@ std::string ip_to_string(const sockaddr_in &addr); struct intensity_packet { - // "Header fields". These 24 bytes should have the same ordering and byte count as the - // "on-wire" packet, since we use memcpy(..., 24) to initialize them from the raw packet data. + // "Header fields". These 32 bytes should have the same ordering and byte count as the + // "on-wire" packet, since we use memcpy(..., intensity_fixed_header_length) to initialize them from the raw packet data. uint32_t protocol_version; int16_t data_nbytes; uint16_t fpga_counts_per_sample; + uint64_t fpga_frame0_ns; uint64_t fpga_count; uint16_t nbeams; uint16_t nfreq_coarse; @@ -82,10 +83,12 @@ struct intensity_packet { float *offsets; // 2D array of shape (nbeam, nfreq_coarse) uint8_t *data; // array of shape (nbeam, nfreq_coarse, nupfreq, ntsamp) + /// Length of "header fields" + static const int intensity_fixed_header_length = 32; static inline int header_size(int nbeams, int nfreq_coarse) { - return 24 + 2*nbeams + 2*nfreq_coarse + 8*nbeams*nfreq_coarse; + return intensity_fixed_header_length + 2*nbeams + 2*nfreq_coarse + 8*nbeams*nfreq_coarse; } static inline int packet_size(int nbeams, int nfreq_coarse, int nupfreq, int nt_per_packet) @@ -296,8 +299,6 @@ class assembled_chunk_ringbuf : noncopyable, // Moves any remaining active chunks into the ring buffer, sets 'doneflag', initializes 'final_fpga'. void end_stream(int64_t *event_counts); - void set_frame0(uint64_t frame0_nano); - // Debugging: inject the given chunk bool inject_assembled_chunk(assembled_chunk* chunk); @@ -356,8 +357,6 @@ class assembled_chunk_ringbuf : noncopyable, const int stream_id; // only used in assembled_chunk::format_filename(). const int beam_id; - uint64_t frame0_nano; // nanosecond time() value for fgpacount zero - output_device_pool output_devices; // Helper function called in assembler thread, to add a new assembled_chunk to the ring buffer. diff --git a/intensity_network_ostream.cpp b/intensity_network_ostream.cpp index dd6f57c..944a812 100644 --- a/intensity_network_ostream.cpp +++ b/intensity_network_ostream.cpp @@ -254,7 +254,7 @@ void intensity_network_ostream::_encode_chunk(const float *intensity, int istrid intensity_packet packet; // Some intensity_packet fields are packet-independent; these are initialized here. - packet.protocol_version = 1; + packet.protocol_version = 2; packet.data_nbytes = nbeams * nfreq_coarse_per_packet * nupfreq * nt_per_packet; packet.fpga_counts_per_sample = fpga_counts_per_sample; packet.nbeams = nbeams; @@ -463,7 +463,7 @@ void intensity_network_ostream::_send_end_of_stream_packets() // reaches the other side, but we'll make a best effort by sending 5 packets separated by 0.1 sec. for (int ipacket = 0; ipacket < 5; ipacket++) { - vector packet(24, uint8_t(0)); + vector packet(intensity_packet::intensity_fixed_header_length, uint8_t(0)); *((uint32_t *) &packet[0]) = uint32_t(1); // protocol number ssize_t n = send(this->sockfd, &packet[0], packet.size(), 0); diff --git a/intensity_network_stream.cpp b/intensity_network_stream.cpp index 367247a..2b34904 100644 --- a/intensity_network_stream.cpp +++ b/intensity_network_stream.cpp @@ -13,7 +13,6 @@ #include #include -#include #include #include "ch_frb_io_internals.hpp" @@ -63,7 +62,6 @@ intensity_network_stream::intensity_network_stream(const initializer &ini_params network_thread_working_usec(0), assembler_thread_waiting_usec(0), assembler_thread_working_usec(0), - frame0_nano(0), stream_priority(0), stream_chunks_written(0), stream_bytes_written(0), @@ -158,10 +156,6 @@ uint64_t intensity_network_stream::get_first_fpgacount() { return first_fpgacount; } -uint64_t intensity_network_stream::get_frame0_nano() { - return frame0_nano; -} - shared_ptr intensity_network_stream::_assembler_for_beam(int beam_id) { auto it = beam_to_assembler.find(beam_id); if (it == beam_to_assembler.end()) { @@ -283,7 +277,6 @@ void intensity_network_stream::reset_stream() { this->beam_ids.clear(); this->beam_to_assembler.clear(); this->first_fpgacount = 0; - this->frame0_nano = 0; { ulock_t slock(this->stream_lock); this->stream_beam_ids.clear(); @@ -308,7 +301,6 @@ void intensity_network_stream::reset_stream() { /// shut down assembler thread /// reset first_fpgacount /// clear beam_to_assembmler - /// reset frame0_nano ? /// reset event counts?? // shut down streaming // shut down forking @@ -587,7 +579,6 @@ intensity_network_stream::get_statistics() { m["nupfreq"] = ini_params.nupfreq; m["nt_per_packet"] = ini_params.nt_per_packet; m["fpga_counts_per_sample"] = ini_params.fpga_counts_per_sample; - m["frame0_nano"] = frame0_nano; m["fpga_count"] = 0; // XXX FIXME XXX m["network_thread_waiting_usec"] = network_thread_waiting_usec; m["network_thread_working_usec"] = network_thread_working_usec; @@ -834,7 +825,7 @@ void intensity_network_stream::_network_thread_body() for (;;) { int packet_nbytes = ::recv(sockfd, packet_data, maxsize, MSG_PEEK | MSG_DONTWAIT); chlog("Flushing end-of-stream packets: peeked at a packet with " << packet_nbytes << " bytes."); - if (packet_nbytes == 24) { + if (packet_nbytes == intensity_packet::intensity_fixed_header_length) { packet_nbytes = ::recv(sockfd, packet_data, maxsize, 0); chlog("dumped a packet with " << packet_nbytes << " bytes"); } else @@ -948,8 +939,8 @@ void intensity_network_stream::_network_thread_one_stream() { event_subcounts[event_type::byte_received] += packet_nbytes; event_subcounts[event_type::packet_received]++; - // If we receive a special "short" packet (length 24), it indicates end-of-stream. - if (_unlikely(packet_nbytes == 24)) { + // If we receive a special "short" packet (length intensity_packet::intensity_fixed_header_length), it indicates end-of-stream. + if (_unlikely(packet_nbytes == intensity_packet::intensity_fixed_header_length)) { event_subcounts[event_type::packet_end_of_stream]++; if (ini_params.accept_end_of_stream_packets) { chlog("i_n_s: Received end-of-stream packet."); @@ -1144,7 +1135,7 @@ void intensity_network_stream::start_forking_packets(int beam, int destbeam, con void intensity_network_stream::stop_forking_packets(int beam, int destbeam, const struct sockaddr_in& dest) { // end-of-stream packet - vector packet(24, uint8_t(0)); + vector packet(intensity_packet::intensity_fixed_header_length, uint8_t(0)); *((uint32_t *) &packet[0]) = uint32_t(1); // protocol number unique_lock ulock(forking_mutex); @@ -1342,12 +1333,6 @@ void intensity_network_stream::_assembler_thread_body() continue; } - if (this->ini_params.frame0_url.size()) { - // After we receive our first packet, we will go fetch the frame0_ctime - // via curl. This is usually fast, so we'll do it in blocking mode. - chlog("Retrieving frame0_ctime from " << this->ini_params.frame0_url); - _fetch_frame0(); // raises runtime_error on failure - } chlog("Received first packet. Beams:" << packet.nbeams); for (int i=0; i constants::max_allowed_beam_id)) throw runtime_error("ch_frb_io: bad beam_id received in first packet"); auto assembler = make_shared(ini_params, beam, ini_params.stream_id); - assembler->set_frame0(frame0_nano); assemblers.push_back(assembler); beam_ids.push_back(beam); beam_to_assembler[beam] = assembler; @@ -1628,66 +1612,4 @@ void intensity_network_stream::_assembler_thread_exit() #endif } - -class CurlStringHolder { -public: - string thestring; -}; - -static size_t -CurlWriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp) -{ - size_t realsize = size * nmemb; - CurlStringHolder* h = (CurlStringHolder*)userp; - h->thestring += string((char*)contents, realsize); - return realsize; -} - -void intensity_network_stream::_fetch_frame0() { - if (ini_params.frame0_url.size() == 0) { - chlog("No 'frame0_url' set; skipping."); - return; - } - CURL *curl_handle; - CURLcode res; - CurlStringHolder holder; - // init the curl session - curl_handle = curl_easy_init(); - // specify URL to get - curl_easy_setopt(curl_handle, CURLOPT_URL, ini_params.frame0_url.c_str()); - // set timeout - curl_easy_setopt(curl_handle, CURLOPT_TIMEOUT_MS, ini_params.frame0_timeout); - // set received-data callback - curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, - CurlWriteMemoryCallback); - curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)(&holder)); - // curl! - chlog("Fetching frame0_time from " << ini_params.frame0_url); - res = curl_easy_perform(curl_handle); - if (res != CURLE_OK) - throw runtime_error("ch_frb_io: fetch_frame0 failed: " + string(curl_easy_strerror(res))); - curl_easy_cleanup(curl_handle); - - string frame0_txt = holder.thestring; - //chlog("Received frame0 text: " << frame0_txt); - Json::Reader frame0_reader; - Json::Value frame0_json; - if (!frame0_reader.parse(frame0_txt, frame0_json)) - throw runtime_error("ch_frb_io: failed to parse 'frame0' string: '" + frame0_txt + "'"); - - //chlog("Parsed: " << frame0_json); - if (!frame0_json.isObject()) - throw runtime_error("ch_frb_io: 'frame0' was not a JSON 'Object' as expected"); - - string key = "frame0_nano"; - if (!frame0_json.isMember(key)) - throw runtime_error("ch_frb_io: 'frame0' did not contain key '" + key + "'"); - - const Json::Value v = frame0_json[key]; - if (!v.isIntegral()) - throw runtime_error("ch_frb_io: expected 'frame0[frame0_nano]' to be integral."); - frame0_nano = v.asUInt64(); - chlog("Found frame0_nano: " << frame0_nano); -} - } // namespace ch_frb_io diff --git a/intensity_packet.cpp b/intensity_packet.cpp index 2dd3c1e..9389580 100644 --- a/intensity_packet.cpp +++ b/intensity_packet.cpp @@ -20,7 +20,7 @@ namespace ch_frb_io { // Does a bunch of sanity checks and returns 'true' if packet is good, 'false' if bad. // // Explicitly, the following checks are performed: -// - protocol version == 1 +// - protocol version == 2 // - dimensions (nbeams, nfreq_coarse, nupfreq, ntsamp) are not large enough to lead to integer overflows // - packet and data byte counts are correct // - coarse_freq_ids are in range @@ -30,8 +30,8 @@ namespace ch_frb_io { bool intensity_packet::decode(const uint8_t *src, int src_nbytes) { - if (_unlikely(src_nbytes < 24)) { - chlog("packet nbytes < 24"); + if (_unlikely(src_nbytes < intensity_fixed_header_length)) { + chlog("packet nbytes < " << intensity_packet::intensity_fixed_header_length); return false; } if (_unlikely(src_nbytes > constants::max_input_udp_packet_size)) { @@ -39,9 +39,9 @@ bool intensity_packet::decode(const uint8_t *src, int src_nbytes) return false; } - memcpy(this, src, 24); + memcpy(this, src, intensity_fixed_header_length); - if (_unlikely(protocol_version != 1)) { + if (_unlikely(protocol_version != 2)) { chlog("packet protocol version bad"); return false; } @@ -67,7 +67,7 @@ bool intensity_packet::decode(const uint8_t *src, int src_nbytes) uint64_t n4 = uint64_t(ntsamp); // Expected header, data size - uint64_t nh = 24 + 2*n1 + 2*n2 + 8*n1*n2; + uint64_t nh = intensity_fixed_header_length + 2*n1 + 2*n2 + 8*n1*n2; uint64_t nd = n1 * n2 * n3 * n4; if (_unlikely(uint64_t(src_nbytes) != nh+nd)) { @@ -79,10 +79,10 @@ bool intensity_packet::decode(const uint8_t *src, int src_nbytes) return false; } - this->beam_ids = (uint16_t *) (src + 24); - this->coarse_freq_ids = (uint16_t *) (src + 24 + 2*n1); - this->scales = (float *) (src + 24 + 2*n1 + 2*n2); - this->offsets = (float *) (src + 24 + 2*n1 + 2*n2 + 4*n1*n2); + this->beam_ids = (uint16_t *) (src + intensity_fixed_header_length); + this->coarse_freq_ids = (uint16_t *) (src + intensity_fixed_header_length + 2*n1); + this->scales = (float *) (src + intensity_fixed_header_length + 2*n1 + 2*n2); + this->offsets = (float *) (src + intensity_fixed_header_length + 2*n1 + 2*n2 + 4*n1*n2); this->data = (uint8_t *) (src + nh); for (int i = 0; i < nfreq_coarse; i++) @@ -100,15 +100,15 @@ int intensity_packet::set_pointers(uint8_t *dst) { int nu = this->nupfreq; int nt = this->ntsamp; - memcpy(dst, this, 24); + memcpy(dst, this, intensity_fixed_header_length); - this->beam_ids = (uint16_t *)(dst + 24); - this->coarse_freq_ids = (uint16_t *)(dst + 24 + 2*nb); - this->scales = (float *) (dst + 24 + 2*nb + 2*nf); - this->offsets = (float *) (dst + 24 + 2*nb + 2*nf + 4*nb*nf); - this->data = dst + 24 + 2*nb + 2*nf + 8*nb*nf; + this->beam_ids = (uint16_t *)(dst + intensity_fixed_header_length); + this->coarse_freq_ids = (uint16_t *)(dst + intensity_fixed_header_length + 2*nb); + this->scales = (float *) (dst + intensity_fixed_header_length + 2*nb + 2*nf); + this->offsets = (float *) (dst + intensity_fixed_header_length + 2*nb + 2*nf + 4*nb*nf); + this->data = dst + intensity_fixed_header_length + 2*nb + 2*nf + 8*nb*nf; - return 24 + 2*nb + 2*nf + 8*nb*nf + nb*nf*nu*nt; + return intensity_fixed_header_length + 2*nb + 2*nf + 8*nb*nf + nb*nf*nu*nt; } // Encodes a floating-point array of intensities into raw packet data, before sending packet. @@ -122,15 +122,15 @@ int intensity_packet::encode(uint8_t *dst, const float *intensity, int beam_istr int nt = this->ntsamp; // similar but not exactly the same as set_pointers()... - memcpy(dst, this, 24); - memcpy(dst + 24, this->beam_ids, 2*nb); - memcpy(dst + 24 + 2*nb, this->coarse_freq_ids, 2*nf); + memcpy(dst, this, intensity_fixed_header_length); + memcpy(dst + intensity_fixed_header_length, this->beam_ids, 2*nb); + memcpy(dst + intensity_fixed_header_length + 2*nb, this->coarse_freq_ids, 2*nf); - this->scales = (float *) (dst + 24 + 2*nb + 2*nf); - this->offsets = (float *) (dst + 24 + 2*nb + 2*nf + 4*nb*nf); - this->data = dst + 24 + 2*nb + 2*nf + 8*nb*nf; + this->scales = (float *) (dst + intensity_fixed_header_length + 2*nb + 2*nf); + this->offsets = (float *) (dst + intensity_fixed_header_length + 2*nb + 2*nf + 4*nb*nf); + this->data = dst + intensity_fixed_header_length + 2*nb + 2*nf + 8*nb*nf; - int nbytes = 24 + 2*nb + 2*nf + 8*nb*nf + nb*nf*nu*nt; + int nbytes = intensity_fixed_header_length + 2*nb + 2*nf + 8*nb*nf + nb*nf*nu*nt; for (int b = 0; b < nb; b++) { for (int f = 0; f < nf; f++) { @@ -214,7 +214,7 @@ bool intensity_packet::contains_coarse_freq_id(int id) const void test_packet_offsets(std::mt19937 &rng) { cerr << "test_packet_offsets()..."; - vector buf(24, 0); + vector buf(intensity_packet::intensity_fixed_header_length, 0); for (int iouter = 0; iouter < 1000; iouter++) { intensity_packet p; @@ -223,6 +223,7 @@ void test_packet_offsets(std::mt19937 &rng) uint32_t protocol_version = std::uniform_int_distribution()(rng); int16_t data_nbytes = std::uniform_int_distribution()(rng); uint16_t fpga_counts_per_sample = std::uniform_int_distribution()(rng); + uint64_t fpga_frame0_ns = std::uniform_int_distribution()(rng); uint64_t fpga_count = std::uniform_int_distribution()(rng); uint16_t nbeams = std::uniform_int_distribution()(rng); uint16_t nfreq_coarse = std::uniform_int_distribution()(rng); @@ -232,17 +233,19 @@ void test_packet_offsets(std::mt19937 &rng) *((uint32_t *) &buf[0]) = protocol_version; *((int16_t *) &buf[4]) = data_nbytes; *((uint16_t *) &buf[6]) = fpga_counts_per_sample; - *((uint64_t *) &buf[8]) = fpga_count; - *((uint16_t *) &buf[16]) = nbeams; - *((uint16_t *) &buf[18]) = nfreq_coarse; - *((uint16_t *) &buf[20]) = nupfreq; - *((uint16_t *) &buf[22]) = ntsamp; + *((uint64_t *) &buf[8]) = fpga_frame0_ns; + *((uint64_t *) &buf[16]) = fpga_count; + *((uint16_t *) &buf[24]) = nbeams; + *((uint16_t *) &buf[26]) = nfreq_coarse; + *((uint16_t *) &buf[28]) = nupfreq; + *((uint16_t *) &buf[32]) = ntsamp; - memcpy(&p, &buf[0], 24); + memcpy(&p, &buf[0], intensity_packet::intensity_fixed_header_length); assert(p.protocol_version == protocol_version); assert(p.data_nbytes == data_nbytes); assert(p.fpga_counts_per_sample == fpga_counts_per_sample); + assert(p.fpga_frame0_ns == fpga_frame0_ns); assert(p.fpga_count == fpga_count); assert(p.nbeams == nbeams); assert(p.nfreq_coarse == nfreq_coarse); diff --git a/site/Makefile.local.frb-compute-0 b/site/Makefile.local.frb-compute-0 index 913b13b..903c0cd 100644 --- a/site/Makefile.local.frb-compute-0 +++ b/site/Makefile.local.frb-compute-0 @@ -21,9 +21,6 @@ ZMQ_LFLAGS ?= $(shell pkg-config --libs libzmq) JSON_CFLAGS ?= $(shell pkg-config --cflags jsoncpp) JSON_LFLAGS ?= $(shell pkg-config --libs jsoncpp) -CURL_CFLAGS ?= $(shell pkg-config --cflags libcurl) -CURL_LFLAGS ?= $(shell pkg-config --libs libcurl) - # # C++ command line # Must support c++11 @@ -47,7 +44,7 @@ ifeq ($(COVERAGE), yes) OPT_FLAGS += -fprofile-arcs -ftest-coverage endif -CPP := $(CXX) -std=c++11 -pthread -fPIC -Wall $(OPT_FLAGS) -march=native -ffast-math -I. -I$(INCDIR) $(HDF5_CFLAGS) $(MSGPACK_CFLAGS) $(ZMQ_CFLAGS) $(JSON_CFLAGS) $(CURL_CFLAGS) +CPP := $(CXX) -std=c++11 -pthread -fPIC -Wall $(OPT_FLAGS) -march=native -ffast-math -I. -I$(INCDIR) $(HDF5_CFLAGS) $(MSGPACK_CFLAGS) $(ZMQ_CFLAGS) $(JSON_CFLAGS) -CPP_LFLAGS := -L. -L$(LIBDIR) $(HDF5_LFLAGS) $(MSGPACK_LFLAGS) $(ZMQ_LFLAGS) $(JSON_LFLAGS) $(CURL_LFLAGS) +CPP_LFLAGS := -L. -L$(LIBDIR) $(HDF5_LFLAGS) $(MSGPACK_LFLAGS) $(ZMQ_LFLAGS) $(JSON_LFLAGS) diff --git a/test-network-streams.cpp b/test-network-streams.cpp index 09f70fc..b0fa6b0 100644 --- a/test-network-streams.cpp +++ b/test-network-streams.cpp @@ -98,7 +98,7 @@ unit_test_instance::unit_test_instance(std::mt19937 &rng, int irun, int nrun, do // Now assign nfreq_coarse_per_packet, subject to packet size constraints. // The constants "c0" and "c1" are defined so that the packet size is c0 + c1 * nfreq_coarse_per_packet. - int c0 = 24 + 2*nbeams; + int c0 = intensity_packet::intensity_fixed_header_length + 2*nbeams; int c1 = 2 + 8*nbeams + nbeams*nupfreq*nt_per_packet; this->nfreq_coarse_per_packet = (ch_frb_io::constants::max_output_udp_packet_size - c0) / c1; diff --git a/time-kernels.cpp b/time-kernels.cpp index 1780030..4bcd0b9 100644 --- a/time-kernels.cpp +++ b/time-kernels.cpp @@ -65,6 +65,7 @@ static double time_assemble(std::mt19937 &rng) const int nupfreq = 16; const int nt_per_packet = 16; const int fpga_counts_per_sample = 384; + const uint64_t fpga_frame0_ns = 1234; const int nchunks = 2; const int niter = 5; @@ -95,20 +96,21 @@ static double time_assemble(std::mt19937 &rng) intensity_packet &p = packets[ichunk*npackets_per_chunk + ipacket]; uint8_t *d = &packet_data[(ichunk*npackets_per_chunk + ipacket) * packet_size]; - p.protocol_version = 1; + p.protocol_version = 2; p.data_nbytes = 4 * nupfreq * nt_per_packet; p.fpga_counts_per_sample = fpga_counts_per_sample; + p.fpga_frame0_ns = fpga_frame0_ns; p.fpga_count = it_c * nt_per_packet * fpga_counts_per_sample; p.nbeams = 1; p.nfreq_coarse = 4; p.nupfreq = nupfreq; p.ntsamp = nt_per_packet; - p.beam_ids = reinterpret_cast (d + 24); - p.coarse_freq_ids = reinterpret_cast (d + 26); - p.scales = reinterpret_cast (d + 34); - p.offsets = reinterpret_cast (d + 50); - p.data = d + 66; + p.beam_ids = reinterpret_cast (d + intensity_packet::intensity_fixed_header_length); + p.coarse_freq_ids = reinterpret_cast (d + intensity_packet::intensity_fixed_header_length + 2); + p.scales = reinterpret_cast (d + intensity_packet::intensity_fixed_header_length + 10); + p.offsets = reinterpret_cast (d + intensity_packet::intensity_fixed_header_length + 26); + p.data = d + intensity_packet::intensity_fixed_header_length + 42; p.beam_ids[0] = 0;