Skip to content

Commit

Permalink
fix: properly seriailize meta buffer in SendStringArrInternal (dragon…
Browse files Browse the repository at this point in the history
…flydb#3455)

Fixes dragonflydb#3449 that was introduced by dragonflydb#3425

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Aug 6, 2024
1 parent e482eef commit 420046a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 20 deletions.
45 changes: 26 additions & 19 deletions src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -593,47 +593,54 @@ void RedisReplyBuilder::SendStringArrInternal(
serialize_len(type_char[0], header_len);
unsigned vec_indx = 0;
string_view src;

#define FLUSH_IOVEC() \
do { \
Send(vec.data(), vec_indx); \
if (ec_) \
return; \
vec_indx = 0; \
next = meta.data(); \
} while (false)

for (unsigned i = 0; i < size; ++i) {
DCHECK_LT(vec_indx, vec_cap);

src = producer(i);
serialize_len('$', src.size());

// copy data either by referencing via an iovec or copying inline into meta buf.
constexpr size_t kSSOLen = 32;
if (src.size() > kSSOLen) {
if (vec_indx + 1 >= vec_cap) {
Send(vec.data(), vec_indx);
if (ec_)
return;
vec_indx = 0;
next = meta.data();
}

// reference metadata blob before referencing another vector.
DCHECK_GT(next - start, 0);
vec[vec_indx++] = IoVec(string_view{start, size_t(next - start)});
start = next;
if (vec_indx >= vec_cap) {
FLUSH_IOVEC();
}

DCHECK_LT(vec_indx, vec.size());
vec[vec_indx++] = IoVec(src);
if (vec_indx >= vec_cap) {
FLUSH_IOVEC();
}
start = next;
} else if (src.size() > 0) {
// NOTE!: this is not just optimization. producer may returns a string_piece that will
// be overriden for the next call, so we must do this for correctness.
memcpy(next, src.data(), src.size());
next += src.size();
}

constexpr ptrdiff_t kMargin = kSSOLen + 3 /*$\r\n*/ + 2 /*length*/ + 2 /* \r\n*/; // metadata
// how much buffer we need to perform the next iteration.
constexpr ptrdiff_t kMargin = kSSOLen + 3 /* $\r\n */ + 2 /*length*/ + 2 /* \r\n */;

// Keep at least kMargin bytes for a small string as well as its length.
if (vec_indx >= vec.size() || ((meta.end() - next) <= kMargin)) {
if (kMargin >= meta.end() - next) {
// Flush the iovec array.
DVLOG(2) << "i=" << i << "meta size=" << next - meta.data();
Send(vec.data(), vec_indx);
if (ec_)
return;

vec_indx = 0;
start = meta.data();
next = start;
vec[vec_indx++] = IoVec(string_view{start, size_t(next - start)});
FLUSH_IOVEC();
start = next;
}
*next++ = '\r';
*next++ = '\n';
Expand Down
14 changes: 13 additions & 1 deletion src/facade/reply_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class RedisReplyBuilderTest : public testing::Test {
}

bool IsError() const {
return args.size() == 1 && args[0].type == RespExpr::ERROR;
return result != RedisParser::OK || (args.size() == 1 && args[0].type == RespExpr::ERROR);
}

bool IsOk() const {
Expand Down Expand Up @@ -951,6 +951,18 @@ TEST_F(RedisReplyBuilderTest, VerbatimString) {
ASSERT_EQ(TakePayload(), "$16\r\nA simple string!\r\n") << "Resp3 VerbatimString TXT failed.";
}

TEST_F(RedisReplyBuilderTest, Issue3449) {
vector<string> records;
for (unsigned i = 0; i < 10'000; ++i) {
records.push_back(absl::StrCat(i));
}
builder_->SendStringArr(records);
ASSERT_TRUE(NoErrors());
ParsingResults parse_result = Parse();
ASSERT_FALSE(parse_result.IsError());
EXPECT_EQ(10000, parse_result.args.size());
}

static void BM_FormatDouble(benchmark::State& state) {
vector<double> values;
char buf[64];
Expand Down

0 comments on commit 420046a

Please sign in to comment.