Skip to content

Commit

Permalink
Return both DEL and SET requests in ConsumerStateTable pops() when
Browse files Browse the repository at this point in the history
both types of requests are queued.

Before this change, it will only return the SET request. The DEL request
is lost which can cause issues in some scenarios.

Bug: 304818797
Sanity-Workflow: https://fusion2.corp.google.com/4ae50929-4d85-3bd3-8f65-5cd8a5690761
Change-Id: I0e87185468cd12cbf978941c458393437e7f79d4
  • Loading branch information
mint570 committed Nov 14, 2023
1 parent 0cbceed commit 75d6b1d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 22 deletions.
5 changes: 5 additions & 0 deletions common/consumer_state_table_pops.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ for i = 1, n do
end
-- Push the new set of field/value for this key in table
local fieldvalues = redis.call('HGETALL', stateprefix..tablename..key)
if num == 1 and next(fieldvalues) then
-- If we have DEL request and SET request, we will return both requests
-- DEL request will come first before the SET request
table.insert(ret, {key, {}})
end
table.insert(ret, {key, fieldvalues})
for i = 1, #fieldvalues, 2 do
redis.call('HSET', tablename..key, fieldvalues[i], fieldvalues[i + 1])
Expand Down
16 changes: 10 additions & 6 deletions tests/redis_piped_state_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,16 @@ TEST(ConsumerStateTable, async_set_del_set)
{
int ret = cs.select(&selectcs);
EXPECT_EQ(ret, Select::OBJECT);
KeyOpFieldsValuesTuple kco;
c.pop(kco);
EXPECT_EQ(kfvKey(kco), key);
EXPECT_EQ(kfvOp(kco), "SET");

auto fvs = kfvFieldsValues(kco);
std::deque<KeyOpFieldsValuesTuple> vkco;
c.pops(vkco);
ASSERT_EQ(vkco.size(), 2);
EXPECT_EQ(kfvKey(vkco[0]), key);
EXPECT_EQ(kfvOp(vkco[0]), "DEL");
auto fvs = kfvFieldsValues(vkco[0]);
EXPECT_EQ(fvs.size(), 0);
EXPECT_EQ(kfvKey(vkco[1]), key);
EXPECT_EQ(kfvOp(vkco[1]), "SET");
fvs = kfvFieldsValues(vkco[1]);
EXPECT_EQ(fvs.size(), (unsigned int)maxNumOfFields);

map<string, string> mm;
Expand Down
36 changes: 20 additions & 16 deletions tests/redis_state_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,16 @@ TEST(ConsumerStateTable, set_del_set)
{
int ret = cs.select(&selectcs);
EXPECT_EQ(ret, Select::OBJECT);
KeyOpFieldsValuesTuple kco;
c.pop(kco);
EXPECT_EQ(kfvKey(kco), key);
EXPECT_EQ(kfvOp(kco), "SET");

auto fvs = kfvFieldsValues(kco);
std::deque<KeyOpFieldsValuesTuple> vkco;
c.pops(vkco);
ASSERT_EQ(vkco.size(), 2);
EXPECT_EQ(kfvKey(vkco[0]), key);
EXPECT_EQ(kfvOp(vkco[0]), "DEL");
auto fvs = kfvFieldsValues(vkco[0]);
EXPECT_EQ(fvs.size(), 0);
EXPECT_EQ(kfvKey(vkco[1]), key);
EXPECT_EQ(kfvOp(vkco[1]), "SET");
fvs = kfvFieldsValues(vkco[1]);
EXPECT_EQ(fvs.size(), (unsigned int)maxNumOfFields);

map<string, string> mm;
Expand Down Expand Up @@ -428,8 +432,6 @@ TEST(ConsumerStateTable, set_pop_del_set_pop_get)
}
}

/* Del operation and second set operation will be merged */

/* Del operation */
p.del(key);

Expand All @@ -448,14 +450,16 @@ TEST(ConsumerStateTable, set_pop_del_set_pop_get)
{
int ret = cs.select(&selectcs);
EXPECT_EQ(ret, Select::OBJECT);
KeyOpFieldsValuesTuple kco;
c.pop(kco);
EXPECT_EQ(kfvKey(kco), key);
EXPECT_EQ(kfvOp(kco), "SET");

auto fvs = kfvFieldsValues(kco);

/* size of fvs should be maxNumOfFields, no "field 1" left from first set*/
std::deque<KeyOpFieldsValuesTuple> vkco;
c.pops(vkco);
ASSERT_EQ(vkco.size(), 2);
EXPECT_EQ(kfvKey(vkco[0]), key);
EXPECT_EQ(kfvOp(vkco[0]), "DEL");
auto fvs = kfvFieldsValues(vkco[0]);
EXPECT_EQ(fvs.size(), 0);
EXPECT_EQ(kfvKey(vkco[1]), key);
EXPECT_EQ(kfvOp(vkco[1]), "SET");
fvs = kfvFieldsValues(vkco[1]);
EXPECT_EQ(fvs.size(), (unsigned int)maxNumOfFields);

map<string, string> mm;
Expand Down

0 comments on commit 75d6b1d

Please sign in to comment.