-
Notifications
You must be signed in to change notification settings - Fork 46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Respect UDP maximum packet size when sending batched points #185
Changes from all commits
7d7ce20
1cf6890
8838ee0
371a58d
9a07eab
865c8af
72585fc
5f741d0
238b6a0
bfb35ae
071bb61
306e05d
598988d
312e383
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -103,12 +103,19 @@ namespace influxdb::internal | |
|
||
std::unique_ptr<Transport> withUdpTransport(const http::url& uri) | ||
{ | ||
return std::make_unique<transports::UDP>(uri.host, uri.port); | ||
static constexpr std::uint16_t INFLUXDB_UDP_PORT{8089}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see a port as something a caller has to provide (like the host address) and not use any assumptions – unless there's a really, really good reason to do so. (same for tcp below) |
||
const std::uint16_t port{ | ||
http::url::PORT_NOT_SET == uri.port ? INFLUXDB_UDP_PORT : static_cast<std::uint16_t>(uri.port)}; | ||
return std::make_unique<transports::UDP>(uri.host, port); | ||
} | ||
|
||
std::unique_ptr<Transport> withTcpTransport(const http::url& uri) | ||
{ | ||
return std::make_unique<transports::TCP>(uri.host, uri.port); | ||
// Default QuestDB TCP port (TCP support was added for this purpose) | ||
static constexpr std::uint16_t QUESTDB_TCP_PORT{9009}; | ||
const std::uint16_t port{ | ||
http::url::PORT_NOT_SET == uri.port ? QUESTDB_TCP_PORT : static_cast<std::uint16_t>(uri.port)}; | ||
return std::make_unique<transports::TCP>(uri.host, port); | ||
} | ||
|
||
std::unique_ptr<Transport> withUnixSocketTransport(const http::url& uri) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,8 @@ | |
#include "HTTP.h" | ||
#include "InfluxDBException.h" | ||
|
||
#include <limits> | ||
|
||
namespace influxdb::transports | ||
{ | ||
namespace | ||
|
@@ -107,6 +109,11 @@ namespace influxdb::transports | |
checkResponse(response); | ||
} | ||
|
||
std::size_t HTTP::getMaxMessageSize() const | ||
{ | ||
return (std::numeric_limits<std::size_t>::max)(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of handling a huge number, we could simple use |
||
} | ||
|
||
void HTTP::setProxy(const Proxy& proxy) | ||
{ | ||
session.SetProxies(cpr::Proxies{{"http", proxy.getProxy()}, {"https", proxy.getProxy()}}); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,12 +29,91 @@ | |
#include "InfluxDBException.h" | ||
#include "LineProtocol.h" | ||
#include "BoostSupport.h" | ||
#include <iostream> | ||
#include <memory> | ||
#include <string> | ||
|
||
namespace influxdb | ||
{ | ||
namespace | ||
{ | ||
/// Group the points into the largest possible line-protocol messages that can be sent using the transport. | ||
template <typename PointContainer> | ||
void TransmitBatch(std::unique_ptr<Transport>& transport, const std::string& globalTags, PointContainer&& points) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the function doesn't change the Also, please use (both same for the function below) |
||
{ | ||
LineProtocol formatter{globalTags}; | ||
std::string lineProtocol; | ||
bool appendNewLine{false}; | ||
bool messageSizeExceeded{false}; | ||
|
||
const auto maxMessageSize{transport->getMaxMessageSize()}; | ||
for (const auto& point : points) | ||
{ | ||
auto formattedPoint{formatter.format(point)}; | ||
auto GetRequiredSize{[&appendNewLine](const std::string& fp) -> std::size_t | ||
{ | ||
// Have to recalculate because the point may fit in a new message if | ||
// it doesn't have a preceding newline. | ||
return appendNewLine ? 1 + fp.size() : fp.size(); | ||
}}; | ||
|
||
while (maxMessageSize < lineProtocol.size() + GetRequiredSize(formattedPoint)) | ||
{ | ||
// Appending the current point would exceed the maximum message size. | ||
// Flush the existing points and try again. | ||
if (!lineProtocol.empty()) | ||
{ | ||
// If there is some existing content perhaps the current point will | ||
// fit in a new message. | ||
transport->send(std::move(lineProtocol)); | ||
lineProtocol.clear(); | ||
appendNewLine = false; | ||
} | ||
else | ||
{ | ||
// Message is empty, so the current point is too large to be sent using this transport. | ||
// Rather than throwing all the points away, we'll skip the current point and continue | ||
// then raise an exception at the end. | ||
messageSizeExceeded = true; | ||
formattedPoint.clear(); | ||
break; | ||
} | ||
} | ||
|
||
if (!formattedPoint.empty()) | ||
{ | ||
if (appendNewLine) | ||
{ | ||
lineProtocol += '\n'; | ||
} | ||
lineProtocol += formattedPoint; | ||
appendNewLine = true; | ||
} | ||
} | ||
|
||
// Send the last batch of points | ||
if (!lineProtocol.empty()) | ||
{ | ||
transport->send(std::move(lineProtocol)); | ||
} | ||
|
||
// If any points were too large to be sent using this transport, throw an exception. | ||
if (messageSizeExceeded) | ||
{ | ||
throw InfluxDBException{"One or more points exceeded the transport's maximum transmission size"}; | ||
} | ||
} | ||
|
||
void TransmitPoint(std::unique_ptr<Transport>& transport, const std::string& globalTags, Point&& point) | ||
{ | ||
LineProtocol formatter{globalTags}; | ||
std::string formattedPoint{formatter.format(point)}; | ||
if (formattedPoint.size() > transport->getMaxMessageSize()) | ||
{ | ||
throw InfluxDBException{"Point is too large to be sent using this transport"}; | ||
} | ||
transport->send(std::move(formattedPoint)); | ||
} | ||
} | ||
|
||
InfluxDB::InfluxDB(std::unique_ptr<Transport> transport) | ||
: mPointBatch{}, | ||
|
@@ -69,25 +148,13 @@ namespace influxdb | |
{ | ||
if (mIsBatchingActivated && !mPointBatch.empty()) | ||
{ | ||
transmit(joinLineProtocolBatch()); | ||
// Make sure that mPointBatch is cleared even if an exception is thrown during transmission. | ||
auto transmissionBatch{std::move(mPointBatch)}; | ||
mPointBatch.clear(); | ||
TransmitBatch(mTransport, mGlobalTags, std::move(transmissionBatch)); | ||
} | ||
} | ||
|
||
std::string InfluxDB::joinLineProtocolBatch() const | ||
{ | ||
std::string joinedBatch; | ||
|
||
LineProtocol formatter{mGlobalTags}; | ||
for (const auto& point : mPointBatch) | ||
{ | ||
joinedBatch += formatter.format(point) + "\n"; | ||
} | ||
|
||
joinedBatch.erase(std::prev(joinedBatch.end())); | ||
return joinedBatch; | ||
} | ||
|
||
|
||
void InfluxDB::addGlobalTag(std::string_view name, std::string_view value) | ||
{ | ||
|
@@ -100,11 +167,6 @@ namespace influxdb | |
mGlobalTags += LineProtocol::EscapeStringElement(LineProtocol::ElementType::TagValue, value); | ||
} | ||
|
||
void InfluxDB::transmit(std::string&& point) | ||
{ | ||
mTransport->send(std::move(point)); | ||
} | ||
|
||
void InfluxDB::write(Point&& point) | ||
{ | ||
if (mIsBatchingActivated) | ||
|
@@ -113,8 +175,7 @@ namespace influxdb | |
} | ||
else | ||
{ | ||
LineProtocol formatter{mGlobalTags}; | ||
transmit(formatter.format(point)); | ||
TransmitPoint(mTransport, mGlobalTags, std::move(point)); | ||
} | ||
} | ||
|
||
|
@@ -129,16 +190,7 @@ namespace influxdb | |
} | ||
else | ||
{ | ||
std::string lineProtocol; | ||
LineProtocol formatter{mGlobalTags}; | ||
|
||
for (const auto& point : points) | ||
{ | ||
lineProtocol += formatter.format(point) + "\n"; | ||
} | ||
|
||
lineProtocol.erase(std::prev(lineProtocol.end())); | ||
transmit(std::move(lineProtocol)); | ||
TransmitBatch(mTransport, mGlobalTags, points); | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,18 +27,38 @@ | |
|
||
#include "UDP.h" | ||
#include "InfluxDBException.h" | ||
#include <algorithm> | ||
#include <limits> | ||
#include <string> | ||
|
||
namespace influxdb::transports | ||
{ | ||
namespace | ||
{ | ||
std::size_t GetSocketSendBufferSize(const boost::asio::ip::udp::socket& socket) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor; |
||
{ | ||
boost::asio::ip::udp::socket::send_buffer_size sendBufferSizeOption; | ||
socket.get_option(sendBufferSizeOption); | ||
int sendBufferSize{sendBufferSizeOption.value()}; | ||
return (sendBufferSize >= 0 ? static_cast<std::size_t>(sendBufferSize) : 0U); | ||
} | ||
} // namespace | ||
|
||
|
||
UDP::UDP(const std::string& hostname, int port) | ||
UDP::UDP(const std::string& hostname, std::uint16_t port) | ||
: mSocket(mIoService, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 0)) | ||
{ | ||
boost::asio::ip::udp::resolver resolver(mIoService); | ||
boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), hostname, std::to_string(port)); | ||
boost::asio::ip::udp::resolver::iterator resolverInerator = resolver.resolve(query); | ||
mEndpoint = *resolverInerator; | ||
try | ||
{ | ||
// "A successful call to this function is guaranteed to return a non-empty range." | ||
// https://www.boost.org/doc/libs/1_81_0/doc/html/boost_asio/reference/ip__basic_resolver/resolve/overload7.html | ||
mEndpoint = *(resolver.resolve(boost::asio::ip::udp::v4(), hostname, std::to_string(port))); | ||
} | ||
catch (const boost::system::system_error& e) | ||
{ | ||
throw InfluxDBException(e.what()); | ||
} | ||
} | ||
|
||
void UDP::send(std::string&& message) | ||
|
@@ -53,4 +73,21 @@ namespace influxdb::transports | |
} | ||
} | ||
|
||
std::size_t UDP::getMaxMessageSize() const | ||
{ | ||
// UDP header has a 16-bit length field | ||
static constexpr std::size_t maxLengthValue{(std::numeric_limits<std::uint16_t>::max)()}; | ||
static constexpr std::size_t udpHeaderSize{8}; | ||
// Currently only IPv4 is supported | ||
static constexpr std::size_t ipv4HeaderSize{20}; | ||
// Max UDP data size for IPv4 is 65535 - 8 - 20 = 65507 | ||
static constexpr std::size_t maxUDPDataSize{maxLengthValue - udpHeaderSize - ipv4HeaderSize}; | ||
|
||
// MacOS has a default UDP send buffer size which is smaller than maxUDPDataSize | ||
// this can be changed by setting the sysctl net.inet.udp.maxdgram or setting the | ||
// SO_SNDBUF option on a per socket basis. For our purposes we can just use the | ||
// smaller of maxUDPDataSize and the send buffer size for the socket. | ||
return std::min(maxUDPDataSize, GetSocketSendBufferSize(mSocket)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about other systems? Is the behaviour consistent across OS types? |
||
} | ||
|
||
} // namespace influxdb::transports |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the switch from
apt-get
toapt
? Did you experience any connection issues?