Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to parse Annex B stream in FU-A #71

Closed
wants to merge 28 commits into from

Conversation

thatdevsherry
Copy link

Summary

Add ability to break apart an Annex B stream sent in a FU-A. Fixes #68

Details

My V380 cam would send a FU-A after establishing RTSP connection. The FU-A was not conformant to spec.

FU-A (start) => [ sps header - sps - boundary - pps header - pps - boundary - idr header - idr ] # Annex B stream
FU-A (...) => [ sps header - idr ]
FU-A (end) => [ sps header - idr ]

Notice how all the frags have the same header (as they should be), but the start has an Annex B stream, meaning the last NAL picked from that packet is an IDR. This means the last NAL saved from first packet will be an IDR, but the next fragment will have... the header for SPS, but data for IDR, which is wrong.

It appears that the camera only does this for FU-A that has SPS & PPS. FU-As & single NAL units for other NAL types are conformant to spec.

I have only modified this logic for the FU-A flow. We can however use the start_rtp_nal, append_rtp_nal and end_rtp_nal to handle all NAL types.

Camera details

Name: V380 (It's a generic V380 outdoor camera)
Firmware: HwV380E31_WF8_PTZ_WIFI_20201218 (I had asked them for a firmware update file to enable RTSP support)

Copy link
Owner

@scottlamb scottlamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a bunch of comments, but this is a great first pass! Thanks for digging in; it's really nice to get contributions like this.

src/codec/h264.rs Outdated Show resolved Hide resolved
nal_parser: NalParser,
}

/// Contains logic that converts RTP payload into NALs.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a little more explanation here? This sentence could describe Depacketizer also. NalParser in particular does stuff that would be surprising to someone who just read the RFC, so it's worth some careful explanation. You mention the Annex B splitting in the break-apart_nals docstring, but I think I'd move it up to this level so folks know that's the struct's whole responsibility, and how sequences of calls are supposed to look. Maybe something like:

/// Parses Annex B sequences from RTP NAL data.
///
/// RFC 6184 describes how NAL units should be represented by RTP packets. `Depacketizer` should map each
/// such NAL unit to 1 call to `NalParser::start_rtp_nal`, then 1 or more calls to `NalParser::append_rtp_nal`,
/// then 1 call to `NalParser::end_rtp_nal`.
///
/// If the camera complies with the RFC, this will add exactly one NAL unit. However, what some cameras
/// call a single NAL unit is actually a sequence of multiple NAL units with Annex B separators. `NalParser`
/// looks for these separators and adds multiple NAL units as needed.
/// See [scottlamb/retina#68](https://github.com/scottlamb/retina/issues/68).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should single NAL units only be handled by a single function e.g. start_rtp_nal? Even if it has an Annex B, start_rtp_nal will handle it.

# FU-A

FU-A (start) => start_rtp_nal # break Annex B
FU-A (...) => append_rtp_nal
FU-A (end) => end_rtp_nal

# Single NAL unit

Packet => start_rtp_nal # break Annex B

Also, since you mentioned that we might get cases where Annex B boundaries don't line up inside a single FU-A, and the boundary can get cut off b/w packets, we'd need to run Annex B reader in the append_rtp_nal as well as end_rtp_nal as well.

FU-A (start) => start_rtp_nal # break Annex B
FU-A (...) => append_rtp_nal  # break Annex B
FU-A (end) => end_rtp_nal     # break Annex B

Do I understand this correctly?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, yeah, that's a great point. I think you've written each of these to match exactly one RTP packet, right? When we were writing in the notion, I was imagining a different split:

  • start_rtp_nal just says "the RTP layer says to start a NAL with this header" with none of the data.
  • append_rtp_nal appends some data
  • end_rtp_nal just ends the current NAL

so

# FU-A

FU-A (start) => start_rtp_nal ; append_rtp_nal
FU-A (...) => append_rtp_nal
FU-A (end) => append_rtp_nal ; end_rtp_nal

# Single NAL unit

Packet => start_rtp_nal ; append_rtp_nal ; end_rtp_nal

with append_rtp_nal doing the breaking of Annex B, so it happens regardless of what kind of packet its embedded in, without having to repeat that logic between start_rtp_nal and append_rtp_nal.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, I got a glimpse of this when looking at the sample test case you quoted here 😄 . This is really nice, we can leave Annex B parsing to append_rtp_nal, and it's easier to test.

The responsibilities of these functions would then be:

start_rtp_nal => create a new `Nal` only, no processing whatsoever
append_rtp_nal => update `Nal::len`, `Nal::next_piece_idx`, `Nal::pieces` & create new `Nal`s on boundary
end_rtp_nal => umm whadoido? append_rtp_nal would've already updated the `Nal::len` and `Nal::next_piece_idx` of final `Nal` correctly

What would be the purpose of end_rtp_nal? How would we represent ending current NAL? Maybe we could use it to reset the states (did_find_boundary, seen_one_zero etc)?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point; now that you mention it, I'm not sure we need end_rtp_nal.

I do want to make sure we handle the case where an RTP packet ends with a zero. Until we get the next packet, we don't know for sure if that's part of a NAL or if it's part of an Annex B boundary. If append_rtp_nal sort of optimistically assumes it's part of a NAL and then the following one walks that back if it finds the rest of the boundary, then maybe end_rtp_nal isn't necessary (and resetting state can happen at the beginning of the next start_rtp_nal). If append_rtp_nal works a different way, maybe there's something for end_rtp_nal to do.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, I'm steering towards letting append_rtp_nal handle boundaries b/w packets since it's already doing all the splitting work.

Judging by the time end_rtp_nal would be called, it would be doing anything but reading the RTP payload.

However, will try thinking if there's a way to use end_rtp_nal. Not having end and just start & append feels off :p

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we instead of having separate start_rtp_nal & append_rtp_nal, we have a single NalParser::push(rtp_payload: Bytes) that handles processing the payload into NalParser::nals and NalParser::pieces?

It would act as a single API that internally manages nals and pieces, and would also handle splitting Annex B streams (and edge case if boundaries span b/w packets).

What do you think?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main reason I suggested having a separate start_rtp_nal is the header byte of FU-A. It's not written verbatim at the start of the (first) NAL of the FU-A start packet, but instead is bitmasked together from two bytes (here). So you'd end up doing:

parser.push(Bytes::copy_from_slice(&[nal_header_byte])?;
parser.push(rest)?;

which involves an extra allocation to construct that Bytes, just to have it taken apart to add to self.nals anyway. Being able to pass in the h264_reader::nal::NalHeader to fill into Nal::hdr directly avoids that.

src/codec/h264.rs Outdated Show resolved Hide resolved
src/codec/h264.rs Show resolved Hide resolved

/// Adds a piece to `self.pieces`. Returns the total length of pieces after adding,
/// erroring if it becomes absurdly large.
fn add_piece(&mut self, piece: Bytes) -> Result<u32, String> {
Copy link
Owner

@scottlamb scottlamb Oct 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that some stuff in Depacketizer is still calling this directly. Can it all go through e.g. append_rtp_nal so that we're consistent in looking for the Annex B boundaries in all RTP packets? and keep add_piece as an internal thing to NalParser?

edit: oh, yeah, I see you wrote:

I have only modified this logic for the FU-A flow. We can however use the start_rtp_nal, append_rtp_nal and end_rtp_nal to handle all NAL types.

yeah, I think we should just use them for everything consistently.

.unwrap(),
);
assert!(push_result.is_err());
}
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to have tests of NalParser by itself. That'd help explain its responsibility and isolate bugs to one struct or the other.

One test in particular seems valuable: two smaller calls to append_rtp_data should produce the same result as a longer one, even if they're in the middle of a run of zeros. This would help us ensure if the camera makes a long Annex B sequence and then breaks it apart into FU-As, it will parse correctly regardless of whether the FU-A boundaries line up nicely with the Annex B boundaries or not. I think we should be able to do something roughly like this and get it to pass:

#[test]
fn test_small_appends() {
    init_logging();
    let full_input = b"first nal\x00\x00\x01\x67second nal starts\x00\x00second nal continues";

    // Parse all at once.
    let mut all_at_once_p = NalParser::new();
    all_at_once_p.start_rtp_nal(...).unwrap();
    all_at_once_p.append_rtp_data(Bytes::from_static(full_input)).unwrap();
    all_at_once_p.end_rtp_nal(...).unwrap();
    assert_eq!(all_at_once_p.nals.size(), 2);
    // ...verify each nal looks right...

    // Parse in two pieces, at every possible boundary. Should get the same result.
    for i in 1..full_input.len() {
        let mut split_p = NalParser::new();
        split_p.start_rtp_nal(...).unwrap();
        let parts = full_input.split_at(i);
        split_p.append_rtp_data(Bytes::from_static(parts[0])).unwrap();
        split_p.append_rtp_data(Bytes::from_static(parts[1])).unwrap();
        split_p.end_rtp_nal(...).unwrap();
        assert_eq!(&all_at_once_p, &split_p); // add an `Eq` impl for this, or in some way test they get the same result
    }

    // Parse a byte at a time. Should get the same result.
    let mut byte_at_a_time_p = NalParser::new();
    byte_at_a_time_p.start_rtp_nal(...).unwrap();
    for i in full_input.len() {
        byte_at_a_time_p.append_rtp_data(Bytes::from_static(&parts[i])).unwrap();
    }
    byte_at_a_time_p.end_rtp_nal(...).unwrap();
    assert_eq!(&all_at_once_p, &byte_at_a_time_p); // likewise.
}

It may be tricky to get it to pass. In particular, if an append_rtp_data call ends right after a zero, I think it needs to put it in the buffer, and then a following call to append_rtp_data needs to take away that previous zero if it turned out to be part of a boundary! And likewise if there are two zeros (maybe in separate pieces) and then something other than a 0, 1, 2, or 3. That's not straightforward, so I'd find it really helpful to test this without the added complexity of bringing Depacketizer's own responsibilities into it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's how I understand this:

let rtp_payloads: Vec<Bytes> = vec!(b"previous-nal\x00\x00", b"\x00new-nal"); // code isn't valid but just for understanding

w/o handling boundaries b/w packets, we'd get:

pieces: [ [b"previous-nal\x00\x00"], [b"\x00new-nal"] ]

what we should be forming:

pieces: [ [b"previous-nal"], [b"new-nal"] ]

To handle this, we can do something like:

append_rtp_nal(...) {
  // if first index is 0x00, check if last `piece` ends with two 0x00. if so, remove them and also this pkt's 1st index i.e. 0x00
  // if first two indexes are 0x00, again check if last `piece` ends with 0x00. If so, remove them and this pkt's first two indexes
}

However, this would fail for byte-by-byte, because, as per my understanding, pieces would end up like this when we reach the third boundary:

pieces: [ .., [b"n"], [b"a"], [b"l"], [b"\x00], [b"\x00]] // each append_rtp_call adds the given data to pieces

If we look at it from real-world usage, there shouldn't be a case where three 0x00 of a boundary are split b/w three pieces, rather the possible cases can be:

// first `0x00` of boundary ends and remaining two `0x00` are at the start of next packet
pkts = [ [b"...\x00"], [b"\x00\x00.."] ]

// or first two `0x00` of boundary end and remaining `0x00` is at the start of next packet
pkts = [ [b"..\x00\x00"], [b"\x00..] ]

Is there a gap in my understanding? I can't figure out what would byte-to-byte test give us.

If what I understand is right, and we want to test byte-to-byte, then maybe we can bring back end_rtp_nal and get a flow like this:

start_rtp_nal: creates a new Nal as discussed
append_rtp_nal: will not store directly into `pieces`, but rather in an intermediary state where it can handle boundaries byte-by-byte
end_rtp_nal: move over data from intermediary state into actual `pieces`

Please let me know if I'm missing something.

src/codec/h264.rs Outdated Show resolved Hide resolved
src/codec/h264.rs Outdated Show resolved Hide resolved
@thatdevsherry
Copy link
Author

I added a bunch of comments, but this is a great first pass! Thanks for digging in; it's really nice to get contributions like this.

Thanks for taking the time out to review. I'll take a look at the comments and update the PR.

src/codec/h264.rs Outdated Show resolved Hide resolved
src/codec/h264.rs Outdated Show resolved Hide resolved
@thatdevsherry
Copy link
Author

Should NalParser ignore continuous zeros? Currently benches/client.rs is failing because it creates a long 0x00 bytes data, which break_apart_nals fails to parse because it expects that three consecutive 0x00 are a boundary, and not part of a nal.

benches/client.rs is sending a FU-A (start) with all bytes being 0x00. Isn't this test data incorrect?

@scottlamb
Copy link
Owner

Apologies—I've been quite bogged down. Will look through your comments and updates tomorrow.

@thatdevsherry
Copy link
Author

Apologies—I've been quite bogged down. Will look through your comments and updates tomorrow.

Wishing you good health. Please take your time to recover.

Copy link
Owner

@scottlamb scottlamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies again for the long delay—much longer than I expected!

Should NalParser ignore continuous zeros? Currently benches/client.rs is failing because it creates a long 0x00 bytes data, which break_apart_nals fails to parse because it expects that three consecutive 0x00 are a boundary, and not part of a nal.

Great question. I looked back at the spec and realized something I don't think I was clear about (may have forgotten about entirely). The separator can have extra leading zeros, so if the bytestream has 0x12 0x34 0x00 0x00 0x00 0x00 0x1 0x56 0x78, then the NALs are supposed to be just 0x12 0x34 then just 0x56 0x78. So basically yes, they should be ignored.

The benches/client.rs data is definitely not a valid, just some easy dummy value that worked with the current implementaiton. Feel free to change it to say a long string of 0x04s or whatever instead to avoid tripping on the new logic.

src/codec/h264.rs Outdated Show resolved Hide resolved
let mut did_find_boundary = false;

// Check if current packet starts with either a single or two 0x00 bytes.
// If so, check if previous piece ended with either one or two 0x00 bytes.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw, it might be simpler to keep in NalParser the current count of trailing zeros. Then we know how far back we need to trim if that's necessary, and we can also handle the case in which the two zeros were in previous calls and the 0x03 is the starting byte of the current call (your TODO on line 143).

Copy link
Author

@thatdevsherry thatdevsherry Jan 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion.

If we keep the count of zeros inside NalParser, could something like this make sense?

pkt_1 = [..., 0x00, 0x00]

# count will get incremented to 2

pkt_2 = [0x00, 0x00, 0x00, ...]

# count will get incremented to 5

We can then write a function that would run when we read the first non-boundary character, which would then backtrack and remove zeros, going backwards across packets until the count gets decremented to 0.

current_pkt = [ 0x00, 0x00, 0x00, ... ]
pieces = [ [ ..., 0x00, 0x00 ] ]

# we had read the previous pkt, so it is in `pieces`, current one is still being read so it won't be in `pieces`

# going backwards from current pkt, we'll remove the three zeros, and get

current_pkt = [ ... ]
pieces = [ [ ..., 0x00, 0x00 ] ]
count = 2

# since we've reached the start of current pkt and count isn't 0, we'll move over to the last pkt in `pieces`, and continue decrementing from end.

current_pkt = [ ... ]
pieces = [ [ ... ] ]

count = 0

The zero removing function can do something like this.

1. get the first non-boundary character index inside our current pkt, index is `z`
2. create a reversed array from current_pkt[0..z] and loop over it, let's call this rev_arr_processed
3. pop each value and decrement count
4. if count reaches 0, end by joining `[ rev_arr_processed.rev() .. current_pkt[z..] ]`, else
5. if iteration ends and count is not 0, get the last pkt from `pieces`
6. repeat from step 2, except that when this ends, we have to update the pkt(s) in `pieces`

The algorithm can work for a single pkt as well as across pkts, and will not limit itself to just 3 zero boundaries. If we have a few zeros inside a single pkt:

## Single pkt flow

current_pkt = [ a..., 0x00, 0x00, 0x00, z... ]

# `a...` are non-boundary bytes before boundary, and `z...` are non-boundary bytes after the boundary

# processing overview

rev_arr_processed = [ 0x00, 0x00, 0x00, ...a ]
# after processing
rev_arr_processed = [ ...a ]

# join them back
[ rev_arr_processed.rev() .. current_pkt[z..] ]

result = [ a..., z... ]

Please let me know if my explanation isn't easy to understand, I'll improve it.

Also, please excuse if my understanding has gotten rusty (pun absolutely intended), came back to this project after a while 😄

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've created a prototype in python for the above mentioned algorithm. Thought it's easier to write a PoC first instead of directly coding in Rust.

If I can handle the cases in the PoC, I'll move over the logic to retina and update the PR.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome. I tried adding a test that has the boundary spread across multiple packets. Would you expect this to pass? It doesn't seem to (although I might not be understanding your Python code's contract correctly).

    def test_boundary_spread_across_pkts(self):
        input = [
            [1, 2, 3, 4, 0],
            [0],
            [0],
            [1],
            [5, 6, 7, 8],
        ]
        nal_parser = NalParser()
        for idx, i in enumerate(input):
            nal_parser.process_packet(i)
        output = nal_parser.pieces
        expected = [
            [1, 2, 3, 4],
            [5, 6, 7, 8],
        ]
        self.assertEqual(output, expected)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PoC currently defines the boundary as consecutive 0x00s >= 3.

The above logic would make it that our test should return something like [ [1,2,3,4], [1], [5,6,7,8] ] since it isn't treating [0, 0, 0, 1] as a boundary. Would [0, 0, 0, .. , 1] be a valid boundary? Could you also let me know what other variations we should count as boundaries?

The PoC is pretty dumb and finds a single pkt, single byte [0] weird. The output of our test is:

[[1, 2, 3, 4], None, None, [1], [5, 6, 7, 8]]

which is near what I expected at the start.

I guess we can use the following constraint to properly define the full boundary.

  • consecutive 0x00s >= 2, followed by a number in [0, 1, 2, 3]

Does the above case cover all boundaries? You also mentioned in notion that a delimiter is 0,0,1, and that 3 is used as an escape e.g. 0,0,1 -> 0,0,3,1. So I'm not sure if above constraint would be valid.

create new struct that holds `nals` and `pieces`

struct owned by Depacketizer, makes it so we have RTP -> NAL logic
be in a separate struct
My v380 cam would send an annex b stream in the first FU-A, with wrong
headers in the middle and end frags for the first FU-A

Two cases are being modified:

a) disable FU-A header check b/w frags

add boolean flag to disable checking for the same header b/w FU-A frags
because the cam's first FU-A was like this:

1) FU-A (start) => SPS header - SPS - PPS header - PPS - IDR header -
   IDR
2) FU-A (...) => SPS header - IDR
3) FU-A (end) => SPS header - IDR

The headers all indicate for SPS, which is wrong, as SPS is defined in
annex b stream in the start packet. For this, we need to ignore the
subsequent headers after first packet, and append incoming data into the
last nal that we saved

b) break apart annex b stream

when adding FU-A, pass it through a fn that checks for delimiters (0x00
0x00 0x00) and breaks the NALs apart
move delimiter watcher state from inside fn to in struct
instead of matching the starting header of a fu-a with the last
saved nal, match it with the first header of the starting fu-a

this removes the need to add a flag when an Annex B stream
is sent in FU-As

it will work for both compliant FU-As and for ones with Annex B stream
instead of storing the index of zeros in NalParser, use bools to
simplify tracking, as we don't really need to use the zero indices
start_rtp_nal will now only create a new nal, and append_rtp_nal will be
responsible for appending data as well as splitting Annex B streams
Annex B reader was storing indices of zeros when finding
boundaries.

Remove them from struct and rely handle this state inside function
boundary state is currently only used inside function body.

Removed it from struct because it isn't needed outside the function
if Annex B boundary cuts off b/w packets, explain how it'll be handled
@thatdevsherry
Copy link
Author

Closing in favor of #100

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Exiting because of bad sps
2 participants