Skip to content

Commit

Permalink
added tagged_stream_bufferer block
Browse files Browse the repository at this point in the history
  • Loading branch information
swarnavaghosh04 committed Aug 30, 2023
1 parent 0ebbe99 commit 21a8a81
Show file tree
Hide file tree
Showing 13 changed files with 545 additions and 4 deletions.
3 changes: 2 additions & 1 deletion grc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ install(FILES
UTAT_HERON_header_format_esttc.block.yml
UTAT_HERON_esttc_framer.block.yml
UTAT_HERON_esttc_deframer.block.yml
UTAT_HERON_tagged_stream_tail_tagger.block.yml DESTINATION share/gnuradio/grc/blocks
UTAT_HERON_tagged_stream_tail_tagger.block.yml
UTAT_HERON_tagged_stream_bufferer.block.yml DESTINATION share/gnuradio/grc/blocks
)
48 changes: 48 additions & 0 deletions grc/UTAT_HERON_tagged_stream_bufferer.block.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
id: UTAT_HERON_tagged_stream_bufferer
label: Tagged Stream Bufferer
category: '[UTAT]'

templates:
imports: from gnuradio import UTAT_HERON
make: UTAT_HERON.tagged_stream_bufferer(${buffer_size}, ${length_tag_key}, ${tail_tag_key})

# Make one 'parameters' list entry for every parameter you want settable from the GUI.
# Keys include:
# * id (makes the value accessible as keyname, e.g. in the make entry)
# * label (label shown in the GUI)
# * dtype (e.g. int, float, complex, byte, short, xxx_vector, ...)
# * default
parameters:
- id: buffer_size
label: Biffer Size (Symbols)
dtype: int
default:
- id: length_tag_key
label: Length Tag Key
dtype: string
default: packet_len
- id: tail_tag_key
label: Tail Tag Key
dtype: string
default: 'trailer'

# Make one 'inputs' list entry per input and one 'outputs' list entry per output.
# Keys include:
# * label (an identifier for the GUI)
# * domain (optional - stream or message. Default is stream)
# * dtype (e.g. int, float, complex, byte, short, xxx_vector, ...)
# * vlen (optional - data stream vector length. Default is 1)
# * optional (optional - set to 1 for optional inputs. Default is 0)
inputs:
- label: in
domain: stream
dtype: complex

outputs:
- label: out
domain: stream
dtype: complex

# 'file_format' specifies the version of the GRC yml format used in the file
# and should usually not be changed.
file_format: 1
3 changes: 2 additions & 1 deletion include/gnuradio/UTAT_HERON/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ install(FILES
header_format_esttc.h
esttc_framer.h
esttc_deframer.h
tagged_stream_tail_tagger.h DESTINATION include/gnuradio/UTAT_HERON
tagged_stream_tail_tagger.h
tagged_stream_bufferer.h DESTINATION include/gnuradio/UTAT_HERON
)
44 changes: 44 additions & 0 deletions include/gnuradio/UTAT_HERON/tagged_stream_bufferer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/* -*- c++ -*- */
/*
* Copyright 2023 University of Toronto Aerospace Team.
*
* SPDX-License-Identifier: GPL-3.0-or-later
*/

#ifndef INCLUDED_UTAT_HERON_TAGGED_STREAM_BUFFERER_H
#define INCLUDED_UTAT_HERON_TAGGED_STREAM_BUFFERER_H

#include <gnuradio/UTAT_HERON/api.h>
#include <gnuradio/block.h>

namespace gr {
namespace UTAT_HERON {

/*!
* \brief <+description of block+>
* \ingroup UTAT_HERON
*
*/
class UTAT_HERON_API tagged_stream_bufferer : virtual public gr::block
{
public:
typedef std::shared_ptr<tagged_stream_bufferer> sptr;

/*!
* \brief Return a shared_ptr to a new instance of UTAT_HERON::tagged_stream_bufferer.
*
* To avoid accidental use of raw pointers, UTAT_HERON::tagged_stream_bufferer's
* constructor is in a private implementation
* class. UTAT_HERON::tagged_stream_bufferer::make is the public interface for
* creating new instances.
*/
static sptr make(
std::size_t buffer_size,
const std::string& packet_len_tag = "packet_len",
const std::string& trailer_tag = "trailer");
};

} // namespace UTAT_HERON
} // namespace gr

#endif /* INCLUDED_UTAT_HERON_TAGGED_STREAM_BUFFERER_H */
3 changes: 2 additions & 1 deletion lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ list(APPEND UTAT_HERON_sources
header_format_esttc.cc
esttc_framer_impl.cc
esttc_deframer_impl.cc
tagged_stream_tail_tagger_impl.cc )
tagged_stream_tail_tagger_impl.cc
tagged_stream_bufferer_impl.cc )

set(DEBUG_FILE "${CMAKE_BINARY_DIR}/debug_log.txt" CACHE FILEPATH
"Log debug to this file when CMAKE_BULD_TYPE is Debug or RelWithDebInfo")
Expand Down
260 changes: 260 additions & 0 deletions lib/tagged_stream_bufferer_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
/* -*- c++ -*- */
/*
* Copyright 2023 University of Toronto Aerospace Team.
*
* SPDX-License-Identifier: GPL-3.0-or-later
*/

#include "tagged_stream_bufferer_impl.h"
#include <gnuradio/io_signature.h>

namespace gr {
namespace UTAT_HERON {

using input_type = gr_complex;
using output_type = gr_complex;

tagged_stream_bufferer::sptr tagged_stream_bufferer::make(
std::size_t buffer_size,
const std::string& packet_len_tag,
const std::string& trailer_tag)
{
return gnuradio::make_block_sptr<tagged_stream_bufferer_impl>(buffer_size, packet_len_tag, trailer_tag);
}


/*
* The private constructor
*/
tagged_stream_bufferer_impl::tagged_stream_bufferer_impl(std::size_t buffer_size, const std::string& packet_len_tag, const std::string& trailer_tag)
: gr::block("tagged_stream_bufferer",
gr::io_signature::make(1,1,sizeof(input_type)),
gr::io_signature::make(1,1,sizeof(output_type))),
d_state(STATE_BLOCK),
d_packet_len(0),
d_remaining_padding(0),
d_buffer_size(buffer_size),
d_lengthtag(pmt::mp(packet_len_tag)),
d_tailtag(pmt::mp(trailer_tag))
{
set_tag_propagation_policy(TPP_DONT);
}

/*
* Our virtual destructor.
*/
tagged_stream_bufferer_impl::~tagged_stream_bufferer_impl() {}

void tagged_stream_bufferer_impl::forecast(int noutput_items,
gr_vector_int& ninput_items_required)
{
ninput_items_required[0] = noutput_items;
}

int tagged_stream_bufferer_impl::general_work(int noutput_items,
gr_vector_int& ninput_items,
gr_vector_const_void_star& input_items,
gr_vector_void_star& output_items)
{
auto in = static_cast<const input_type*>(input_items[0]);
auto out = static_cast<output_type*>(output_items[0]);
auto n_in = static_cast<int>(ninput_items[0]);

int nconsumed = 0;
int nproduced = 0;

auto in_to_out = [&](int n){
memcpy(out+nproduced, in+nconsumed, n*sizeof(input_type));
nconsumed += n;
nproduced += n;
};

auto in_to_out_packet = [&](int n){
in_to_out(n);
d_packet_len += n;
};

auto can_output = [&](int n){
return noutput_items - nproduced - n >= 0;
};

auto can_consume = [&](int n){
return n_in - nconsumed - n >= 0;
};

auto consume_drop_to_floor = [&](int n){
nconsumed += n;
};

auto can_move_in_to_out = [&](int n){
return can_consume(n) && can_output(n);
};

auto remaining_output = [&](){
return noutput_items - nproduced;
};

auto remaining_input = [&](){
return n_in - nconsumed;
};

auto remaining_move = [&](){
return std::min(remaining_input(), remaining_output());
};

auto finish = [&](){
consume_each(nconsumed);
return nproduced;
};

auto tag_offset_to_last_output_item = [&](gr::tag_t& tag){
tag.offset = nitems_written(0) + nproduced - 1;
};

auto output_packet_first_symbol = [&](gr::tag_t& tag){
d_packet_len = 0;
in_to_out_packet(1);
tag_offset_to_last_output_item(tag);
add_item_tag(0, tag);
d_state = STATE_PACKET;
};

auto output_padding = [&](int len){
for(int i = 0; i < len; i++)
out[nproduced++] = output_type();
d_remaining_padding -= len;
};

// auto handle_state_block = [&](int len){
// if(can_consume(len)){
// consume_drop_to_floor(len);
// if(remaining_output() != 0)
// return true;
// }else
// consume_drop_to_floor(remaining_input());
// return false;
// };


auto handle_state_block_until_packet = [&](int len){
if(can_consume(len)){
consume_drop_to_floor(len);
if(remaining_output() != 0){
d_state = STATE_PACKET_HEADER;
return true;
}
}else consume_drop_to_floor(remaining_input());
d_state = STATE_BLOCK;
return false;
};

auto handle_state_packet_header = [&](gr::tag_t& tag){
if(can_move_in_to_out(1)){
output_packet_first_symbol(tag);
d_state = STATE_PACKET;
if(remaining_output() != 0)
return true;
}
return false;
};

// auto handle_state_packet = [&](int len){
// if(can_move_in_to_out(len)){
// in_to_out_packet(len);
// if(remaining_output() != 0)
// return true;
// }else
// in_to_out_packet(remaining_move());
// return false;
// };

auto handle_state_packet_until_trailer = [&](int len){
if(can_move_in_to_out(len)){
in_to_out_packet(len);
if(remaining_output() != 0){
d_state = STATE_PADDING;
d_remaining_padding = d_buffer_size - d_packet_len;
if(d_remaining_padding <= 0)
d_state = STATE_BLOCK;
return true;
}
}else in_to_out_packet(remaining_move());
d_state = STATE_PACKET;
return false;
};

auto handle_state_padding = [&](){
if(can_output(d_remaining_padding)){
output_padding(d_remaining_padding);
d_state = STATE_BLOCK;
if(remaining_output() != 0)
return true;
}else output_padding(remaining_output());
return false;
};

bool keep_going;

switch(d_state){
case STATE_BLOCK:
case STATE_PACKET:
break;
case STATE_PADDING:
keep_going = handle_state_padding();
if(!keep_going) return finish();
break;
case STATE_BLOCK_UNTIL_PACKET:
case STATE_PACKET_HEADER:
case STATE_PACKET_UNTIL_TRAILER:
throw std::runtime_error("something went terribly wrong");
}

std::vector<tag_t> tags;
get_tags_in_range(tags, 0, nitems_read(0)+nconsumed, nitems_read(0)+n_in);

for (size_t i = 0; i < tags.size(); i++){
auto tag = tags[i];
int len = tag.offset - nitems_read(0) - nconsumed;

if (pmt::eqv(tag.key, d_lengthtag) && d_state == STATE_BLOCK){

d_state = STATE_BLOCK_UNTIL_PACKET;

keep_going = handle_state_block_until_packet(len);
if(!keep_going) return finish();

keep_going = handle_state_packet_header(tag);
if(!keep_going) return finish();

}else if(pmt::eqv(tag.key, d_tailtag) && d_state == STATE_PACKET){

d_state = STATE_PACKET_UNTIL_TRAILER;

keep_going = handle_state_packet_until_trailer(len);
if(!keep_going) return finish();

keep_going = handle_state_padding();
if(!keep_going) return finish();

}
}

switch(d_state){
case STATE_BLOCK:
consume_drop_to_floor(remaining_input());
break;
case STATE_PACKET:
in_to_out_packet(remaining_move());
break;
case STATE_BLOCK_UNTIL_PACKET:
case STATE_PACKET_HEADER:
case STATE_PACKET_UNTIL_TRAILER:
case STATE_PADDING:
throw std::runtime_error("something went terribly wrong");
}

return finish();
}

} /* namespace UTAT_HERON */
} /* namespace gr */
Loading

0 comments on commit 21a8a81

Please sign in to comment.