Skip to content

Commit

Permalink
Use chrono steady clock for frame timing (#173)
Browse files Browse the repository at this point in the history
* Use chrono steady_clock for frame timing

* Fix formatting for humble

* Use chrono in png snapshot streamer

* Don't use std::optional
  • Loading branch information
bjsowa authored Nov 20, 2024
1 parent 27d0d0a commit 965ed73
Show file tree
Hide file tree
Showing 14 changed files with 82 additions and 61 deletions.
9 changes: 5 additions & 4 deletions include/web_video_server/image_streamer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#pragma once

#include <chrono>
#include <memory>
#include <string>

Expand Down Expand Up @@ -64,7 +65,7 @@ class ImageStreamer
/**
* Restreams the last received image frame if older than max_age.
*/
virtual void restreamFrame(double max_age) = 0;
virtual void restreamFrame(std::chrono::duration<double> max_age) = 0;

std::string getTopic()
{
Expand Down Expand Up @@ -94,8 +95,8 @@ class ImageTransportImageStreamer : public ImageStreamer

protected:
virtual cv::Mat decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & msg);
virtual void sendImage(const cv::Mat &, const rclcpp::Time & time) = 0;
virtual void restreamFrame(double max_age);
virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time) = 0;
virtual void restreamFrame(std::chrono::duration<double> max_age);
virtual void initialize(const cv::Mat &);

image_transport::Subscriber image_sub_;
Expand All @@ -105,7 +106,7 @@ class ImageTransportImageStreamer : public ImageStreamer
std::string default_transport_;
std::string qos_profile_name_;

rclcpp::Time last_frame;
std::chrono::steady_clock::time_point last_frame_;
cv::Mat output_size_image;
std::mutex send_mutex_;

Expand Down
4 changes: 2 additions & 2 deletions include/web_video_server/jpeg_streamers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class MjpegStreamer : public ImageTransportImageStreamer
~MjpegStreamer();

protected:
virtual void sendImage(const cv::Mat &, const rclcpp::Time & time);
virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time);

private:
MultipartStream stream_;
Expand All @@ -78,7 +78,7 @@ class JpegSnapshotStreamer : public ImageTransportImageStreamer
~JpegSnapshotStreamer();

protected:
virtual void sendImage(const cv::Mat &, const rclcpp::Time & time);
virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time);

private:
int quality_;
Expand Down
6 changes: 4 additions & 2 deletions include/web_video_server/libav_streamer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ extern "C"
#include <libavutil/imgutils.h>
}

#include <chrono>
#include <memory>
#include <string>

Expand All @@ -66,7 +67,7 @@ class LibavStreamer : public ImageTransportImageStreamer

protected:
virtual void initializeEncoder();
virtual void sendImage(const cv::Mat &, const rclcpp::Time & time);
virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time);
virtual void initialize(const cv::Mat &);
AVFormatContext * format_context_;
const AVCodec * codec_;
Expand All @@ -78,8 +79,9 @@ class LibavStreamer : public ImageTransportImageStreamer
private:
AVFrame * frame_;
struct SwsContext * sws_context_;
rclcpp::Time first_image_timestamp_;
std::mutex encode_mutex_;
bool first_image_received_;
std::chrono::steady_clock::time_point first_image_time_;

std::string format_name_;
std::string codec_name_;
Expand Down
15 changes: 8 additions & 7 deletions include/web_video_server/multipart_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,34 +43,35 @@ namespace web_video_server

struct PendingFooter
{
rclcpp::Time timestamp;
std::chrono::steady_clock::time_point timestamp;
std::weak_ptr<std::string> contents;
};

class MultipartStream
{
public:
MultipartStream(
std::function<rclcpp::Time()> get_now,
async_web_server_cpp::HttpConnectionPtr & connection,
const std::string & boundry = "boundarydonotcross",
std::size_t max_queue_size = 1);

void sendInitialHeader();
void sendPartHeader(const rclcpp::Time & time, const std::string & type, size_t payload_size);
void sendPartFooter(const rclcpp::Time & time);
void sendPartHeader(
const std::chrono::steady_clock::time_point & time, const std::string & type,
size_t payload_size);
void sendPartFooter(const std::chrono::steady_clock::time_point & time);
void sendPartAndClear(
const rclcpp::Time & time, const std::string & type,
const std::chrono::steady_clock::time_point & time, const std::string & type,
std::vector<unsigned char> & data);
void sendPart(
const rclcpp::Time & time, const std::string & type, const boost::asio::const_buffer & buffer,
const std::chrono::steady_clock::time_point & time, const std::string & type,
const boost::asio::const_buffer & buffer,
async_web_server_cpp::HttpConnection::ResourcePtr resource);

private:
bool isBusy();

private:
std::function<rclcpp::Time()> get_now_;
const std::size_t max_queue_size_;
async_web_server_cpp::HttpConnectionPtr connection_;
std::string boundry_;
Expand Down
4 changes: 2 additions & 2 deletions include/web_video_server/png_streamers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class PngStreamer : public ImageTransportImageStreamer
~PngStreamer();

protected:
virtual void sendImage(const cv::Mat &, const rclcpp::Time & time);
virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time);
virtual cv::Mat decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & msg);

private:
Expand All @@ -78,7 +78,7 @@ class PngSnapshotStreamer : public ImageTransportImageStreamer
~PngSnapshotStreamer();

protected:
virtual void sendImage(const cv::Mat &, const rclcpp::Time & time);
virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point & time);
virtual cv::Mat decodeImage(const sensor_msgs::msg::Image::ConstSharedPtr & msg);

private:
Expand Down
6 changes: 3 additions & 3 deletions include/web_video_server/ros_compressed_streamer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,18 @@ class RosCompressedStreamer : public ImageStreamer
rclcpp::Node::SharedPtr node);
~RosCompressedStreamer();
virtual void start();
virtual void restreamFrame(double max_age);
virtual void restreamFrame(std::chrono::duration<double> max_age);

protected:
virtual void sendImage(
const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg,
const rclcpp::Time & time);
const std::chrono::steady_clock::time_point & time);

private:
void imageCallback(const sensor_msgs::msg::CompressedImage::ConstSharedPtr msg);
MultipartStream stream_;
rclcpp::Subscription<sensor_msgs::msg::CompressedImage>::SharedPtr image_sub_;
rclcpp::Time last_frame;
std::chrono::steady_clock::time_point last_frame_;
sensor_msgs::msg::CompressedImage::ConstSharedPtr last_msg;
std::mutex send_mutex_;
std::string qos_profile_name_;
Expand Down
2 changes: 1 addition & 1 deletion include/web_video_server/web_video_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class WebVideoServer : public rclcpp::Node
const char * begin, const char * end);

private:
void restreamFrames(double max_age);
void restreamFrames(std::chrono::duration<double> max_age);
void cleanup_inactive_streams();

rclcpp::TimerBase::SharedPtr cleanup_timer_;
Expand Down
10 changes: 5 additions & 5 deletions src/image_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,16 @@ void ImageTransportImageStreamer::initialize(const cv::Mat &)
{
}

void ImageTransportImageStreamer::restreamFrame(double max_age)
void ImageTransportImageStreamer::restreamFrame(std::chrono::duration<double> max_age)
{
if (inactive_ || !initialized_) {
return;
}
try {
if (last_frame + rclcpp::Duration::from_seconds(max_age) < node_->now() ) {
if (last_frame_ + max_age < std::chrono::steady_clock::now()) {
std::scoped_lock lock(send_mutex_);
// don't update last_frame, it may remain an old value.
sendImage(output_size_image, node_->now());
sendImage(output_size_image, std::chrono::steady_clock::now());
}
} catch (boost::system::system_error & e) {
// happens when client disconnects
Expand Down Expand Up @@ -199,8 +199,8 @@ void ImageTransportImageStreamer::imageCallback(const sensor_msgs::msg::Image::C
initialized_ = true;
}

last_frame = node_->now();
sendImage(output_size_image, msg->header.stamp);
last_frame_ = std::chrono::steady_clock::now();
sendImage(output_size_image, last_frame_);
} catch (cv_bridge::Exception & e) {
auto & clk = *node_->get_clock();
RCLCPP_ERROR_THROTTLE(node_->get_logger(), clk, 40, "cv_bridge exception: %s", e.what());
Expand Down
14 changes: 10 additions & 4 deletions src/jpeg_streamers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ MjpegStreamer::MjpegStreamer(
const async_web_server_cpp::HttpRequest & request,
async_web_server_cpp::HttpConnectionPtr connection, rclcpp::Node::SharedPtr node)
: ImageTransportImageStreamer(request, connection, node),
stream_(std::bind(&rclcpp::Node::now, node), connection)
stream_(connection)
{
quality_ = request.get_query_param_value_or_default<int>("quality", 95);
stream_.sendInitialHeader();
Expand All @@ -50,7 +50,9 @@ MjpegStreamer::~MjpegStreamer()
std::scoped_lock lock(send_mutex_); // protects sendImage.
}

void MjpegStreamer::sendImage(const cv::Mat & img, const rclcpp::Time & time)
void MjpegStreamer::sendImage(
const cv::Mat & img,
const std::chrono::steady_clock::time_point & time)
{
std::vector<int> encode_params;
encode_params.push_back(cv::IMWRITE_JPEG_QUALITY);
Expand Down Expand Up @@ -94,7 +96,9 @@ JpegSnapshotStreamer::~JpegSnapshotStreamer()
std::scoped_lock lock(send_mutex_); // protects sendImage.
}

void JpegSnapshotStreamer::sendImage(const cv::Mat & img, const rclcpp::Time & time)
void JpegSnapshotStreamer::sendImage(
const cv::Mat & img,
const std::chrono::steady_clock::time_point & time)
{
std::vector<int> encode_params;
encode_params.push_back(cv::IMWRITE_JPEG_QUALITY);
Expand All @@ -104,7 +108,9 @@ void JpegSnapshotStreamer::sendImage(const cv::Mat & img, const rclcpp::Time & t
cv::imencode(".jpeg", img, encoded_buffer, encode_params);

char stamp[20];
snprintf(stamp, sizeof(stamp), "%.06lf", time.seconds());
snprintf(
stamp, sizeof(stamp), "%.06lf",
std::chrono::duration_cast<std::chrono::duration<double>>(time.time_since_epoch()).count());
async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok)
.header("Connection", "close")
.header("Server", "web_video_server")
Expand Down
18 changes: 11 additions & 7 deletions src/libav_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ LibavStreamer::LibavStreamer(
const std::string & format_name, const std::string & codec_name,
const std::string & content_type)
: ImageTransportImageStreamer(request, connection, node), format_context_(0), codec_(0),
codec_context_(0), video_stream_(0), frame_(0), sws_context_(0), first_image_timestamp_(0),
format_name_(format_name), codec_name_(codec_name), content_type_(content_type), opt_(0),
io_buffer_(0)
codec_context_(0), video_stream_(0), frame_(0), sws_context_(0),
first_image_received_(false), first_image_time_(), format_name_(format_name),
codec_name_(codec_name), content_type_(content_type), opt_(0), io_buffer_(0)
{
bitrate_ = request.get_query_param_value_or_default<int>("bitrate", 100000);
qmin_ = request.get_query_param_value_or_default<int>("qmin", 10);
Expand Down Expand Up @@ -215,11 +215,14 @@ void LibavStreamer::initializeEncoder()
{
}

void LibavStreamer::sendImage(const cv::Mat & img, const rclcpp::Time & time)
void LibavStreamer::sendImage(
const cv::Mat & img,
const std::chrono::steady_clock::time_point & time)
{
std::scoped_lock lock(encode_mutex_);
if (0 == first_image_timestamp_.nanoseconds()) {
first_image_timestamp_ = time;
if (!first_image_received_) {
first_image_received_ = true;
first_image_time_ = time;
}

AVPixelFormat input_coding_format = AV_PIX_FMT_BGR24;
Expand Down Expand Up @@ -274,7 +277,8 @@ void LibavStreamer::sendImage(const cv::Mat & img, const rclcpp::Time & time)
std::size_t size;
uint8_t * output_buf;

double seconds = (time - first_image_timestamp_).seconds();
double seconds = std::chrono::duration_cast<std::chrono::duration<double>>(
time - first_image_time_).count();
// Encode video at 1/0.95 to minimize delay
pkt->pts = (int64_t)(seconds / av_q2d(video_stream_->time_base) * 0.95);
if (pkt->pts <= 0) {
Expand Down
23 changes: 13 additions & 10 deletions src/multipart_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,10 @@ namespace web_video_server
{

MultipartStream::MultipartStream(
std::function<rclcpp::Time()> get_now,
async_web_server_cpp::HttpConnectionPtr & connection,
const std::string & boundry,
std::size_t max_queue_size)
: get_now_(get_now), connection_(connection), boundry_(boundry), max_queue_size_(max_queue_size)
: connection_(connection), boundry_(boundry), max_queue_size_(max_queue_size)
{}

void MultipartStream::sendInitialHeader()
Expand All @@ -58,11 +57,13 @@ void MultipartStream::sendInitialHeader()
}

void MultipartStream::sendPartHeader(
const rclcpp::Time & time, const std::string & type,
const std::chrono::steady_clock::time_point & time, const std::string & type,
size_t payload_size)
{
char stamp[20];
snprintf(stamp, sizeof(stamp), "%.06lf", time.seconds());
snprintf(
stamp, sizeof(stamp), "%.06lf",
std::chrono::duration_cast<std::chrono::duration<double>>(time.time_since_epoch()).count());
std::shared_ptr<std::vector<async_web_server_cpp::HttpHeader>> headers(
new std::vector<async_web_server_cpp::HttpHeader>());
headers->push_back(async_web_server_cpp::HttpHeader("Content-type", type));
Expand All @@ -73,7 +74,7 @@ void MultipartStream::sendPartHeader(
connection_->write(async_web_server_cpp::HttpReply::to_buffers(*headers), headers);
}

void MultipartStream::sendPartFooter(const rclcpp::Time & time)
void MultipartStream::sendPartFooter(const std::chrono::steady_clock::time_point & time)
{
std::shared_ptr<std::string> str(new std::string("\r\n--" + boundry_ + "\r\n"));
PendingFooter pf;
Expand All @@ -84,7 +85,7 @@ void MultipartStream::sendPartFooter(const rclcpp::Time & time)
}

void MultipartStream::sendPartAndClear(
const rclcpp::Time & time, const std::string & type,
const std::chrono::steady_clock::time_point & time, const std::string & type,
std::vector<unsigned char> & data)
{
if (!isBusy()) {
Expand All @@ -95,7 +96,7 @@ void MultipartStream::sendPartAndClear(
}

void MultipartStream::sendPart(
const rclcpp::Time & time, const std::string & type,
const std::chrono::steady_clock::time_point & time, const std::string & type,
const boost::asio::const_buffer & buffer,
async_web_server_cpp::HttpConnection::ResourcePtr resource)
{
Expand All @@ -108,13 +109,15 @@ void MultipartStream::sendPart(

bool MultipartStream::isBusy()
{
rclcpp::Time currentTime = get_now_();
auto current_time = std::chrono::steady_clock::now();
while (!pending_footers_.empty()) {
if (pending_footers_.front().contents.expired()) {
pending_footers_.pop();
} else {
rclcpp::Time footerTime = pending_footers_.front().timestamp;
if ((currentTime - footerTime).seconds() > 0.5) {
auto footer_time = pending_footers_.front().timestamp;
if (std::chrono::duration_cast<std::chrono::duration<double>>(
(current_time - footer_time)).count() > 0.5)
{
pending_footers_.pop();
} else {
break;
Expand Down
Loading

0 comments on commit 965ed73

Please sign in to comment.