-
Notifications
You must be signed in to change notification settings - Fork 46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Respect UDP maximum packet size when sending batched points #185
Conversation
…ion for unsuccessful call
…cifying a valid port number
…hey can transmit using their send method
…raise an exception when all other points have been sent. Also, ensure that the connection's point batch is definitely cleared even if there is a transmission error.
… size or the UDP theoretical limit (for MacOS)
…event unwanted macro expansion in MSVC see https://stackoverflow.com/questions/27442885/syntax-error-with-stdnumeric-limitsmax
…try attempts at installing packages This is a workaround for intermittent 404 errors for the Azure apt mirror
@@ -29,7 +29,9 @@ jobs: | |||
run: script/ci_setup.sh | |||
- name: Install Boost | |||
if: ${{ matrix.boost == true }} | |||
run: apt-get install -y libboost-system-dev | |||
run: | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the switch from apt-get
to apt
? Did you experience any connection issues?
@@ -103,12 +103,19 @@ namespace influxdb::internal | |||
|
|||
std::unique_ptr<Transport> withUdpTransport(const http::url& uri) | |||
{ | |||
return std::make_unique<transports::UDP>(uri.host, uri.port); | |||
static constexpr std::uint16_t INFLUXDB_UDP_PORT{8089}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see a port as something a caller has to provide (like the host address) and not use any assumptions – unless there's a really, really good reason to do so.
(same for tcp below)
@@ -107,6 +109,11 @@ namespace influxdb::transports | |||
checkResponse(response); | |||
} | |||
|
|||
std::size_t HTTP::getMaxMessageSize() const | |||
{ | |||
return (std::numeric_limits<std::size_t>::max)(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of handling a huge number, we could simple use 0
(= off) and skip any splitting logic.
{ | ||
/// Group the points into the largest possible line-protocol messages that can be sent using the transport. | ||
template <typename PointContainer> | ||
void TransmitBatch(std::unique_ptr<Transport>& transport, const std::string& globalTags, PointContainer&& points) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the function doesn't change the unique_ptr
itself, pass the transport by const-ref instead.
Also, please use transmitBatch()
(lower case t) naming – I know this isn't used consistent everywhere (yet) :-)
(both same for the function below)
#include <string> | ||
|
||
namespace influxdb::transports | ||
{ | ||
namespace | ||
{ | ||
std::size_t GetSocketSendBufferSize(const boost::asio::ip::udp::socket& socket) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor; getSocketSendBufferSize()
(see naming comment above).
// this can be changed by setting the sysctl net.inet.udp.maxdgram or setting the | ||
// SO_SNDBUF option on a per socket basis. For our purposes we can just use the | ||
// smaller of maxUDPDataSize and the send buffer size for the socket. | ||
return std::min(maxUDPDataSize, GetSocketSendBufferSize(mSocket)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about other systems? Is the behaviour consistent across OS types?
|
||
namespace http | ||
{ | ||
struct url | ||
{ | ||
std::string protocol, user, password, host, path, search, url; | ||
static constexpr int PORT_NOT_SET = -1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(see port related comment above)
std::string p4Line{FormatPoint(p4)}; | ||
// Set transport max message size to accommodate three test points (with newline delimiters) | ||
const std::size_t maxMessageSize{(p4Line.size() * 3) + 2}; | ||
// |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something missing here? :-)
} | ||
} | ||
|
||
TEST_CASE("UDP Transport", "[InfluxDBST]") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is some kind of new test suite, maybe be should extract this to a dedicated source file (to keep each on it's own topic).
Thanks for your PR! I have some comments, but my (personal) overall opinion is, that this adds quite some complexity to everyone just to handle a single (less used?) transport. Instead, I'm tempted to consider deprecating (and finally removing) UDP support, even more as v2 wont support it anymore (questdb did too). |
You're welcome. Thank you for the review comments (which I'm happy to address) and the opportunity to discuss the best approach to this 🙂
You're absolutely right, this definitely adds complexity to the transmission of batched (or vectors of) points. As a slight mitigation it does consolidate two code paths (sending unbatched point vectors and flushing batched points) into a single function. Perhaps we can find middle ground which avoids the complexity for Transports with no transmission limits?
You'd obviously be completely within your rights to do so. Unfortunately, that would be problematic from my perspective. To explain why, it might be helpful to understand the use-case behind the PR. My team are replacing a legacy system which sends large quantities of data to another system's v1 InfluxDB using UDP. We want to perform as few writes as possible (hence the batching) so that Influx is likely to flush related batches of points to the measurements together. This mirrors the behaviour of the system being replaced. During integration testing we noticed the issue with batched points exceeding the UDP packet size limit which would cause data loss as far as the other system was concerned. We don't have the luxury of replacing the receiving system and I don't think this situation will be unique. I would suggest that a combination of inertia and aversion to change most likely means that Influx v1 and the UDP transport will be around for quite some time especially in embedded and industrial markets. It would be great if we can maintain UDP support, I think it may be more widely used than you might think 🙂 |
Thanks for the insight, it helps a lot to understand the situation.
An ideal solution would be to stay within UDP transport implementation. 👍
👍 |
There are two little things that are worth merging independently as they provide profit already: Using uint16 ports (TCP / UDP ctor) and the If you have some minutes spare, could you submit those as a separate PR? |
Some thoughts (not necessarily good though):
|
Thanks for the feedback, very much appreciated. I will get around to doing the separate PRs over the next few days 🙂 |
While playing around with this idea it turned out to be quite promising 👍. I'm going to write a basic PoC so we'll see if it's a feasible solution. |
This closes #184.
Adds a new mandatory member function of the
Transport
base class calledgetMaxMessageSize
. This function must returns the size of the maximum message which may be transmitted using the Transport implementation'ssend
method.UDP
transport this is set to the minimum of the UDP socket's send buffer (for MacOS) or the maximum data length which is practical in the UDP protocol after header sizes are taken into account (65507
).N.B. This may cause packet fragmentation at the IP layer but since UDP makes no delivery guarantees anyway I think this is an acceptable compromise when the user has chosen to batch write points over UDP.
HTTP
,TCP
,UnixSocket
) this is to effectively unlimited (std::numeric_limits<std::size_t>::max()
) to preserve existing behaviour.The
InfluxDB::flushBatch
method is updated to construct the largest possible messages which can be successfully sent given themTransport
in use.If any individual Point is too large to be successfully sent (i.e.: its Line Protocol representation is larger than the maximum message size) it is skipped and the other Points in the batch are sent. An exception is subsequently raised.