From 75d6b1de0d4cce64ff2a5c0ba6e38122d598b645 Mon Sep 17 00:00:00 2001 From: Runming Wu Date: Thu, 9 Nov 2023 17:28:32 -0800 Subject: [PATCH] Return both DEL and SET requests in ConsumerStateTable pops() when both types of requests are queued. Before this change, it will only return the SET request. The DEL request is lost which can cause issues in some scenarios. Bug: 304818797 Sanity-Workflow: https://fusion2.corp.google.com/4ae50929-4d85-3bd3-8f65-5cd8a5690761 Change-Id: I0e87185468cd12cbf978941c458393437e7f79d4 --- common/consumer_state_table_pops.lua | 5 ++++ tests/redis_piped_state_ut.cpp | 16 ++++++++----- tests/redis_state_ut.cpp | 36 +++++++++++++++------------- 3 files changed, 35 insertions(+), 22 deletions(-) diff --git a/common/consumer_state_table_pops.lua b/common/consumer_state_table_pops.lua index b5dcbe7d..90301ede 100644 --- a/common/consumer_state_table_pops.lua +++ b/common/consumer_state_table_pops.lua @@ -13,6 +13,11 @@ for i = 1, n do end -- Push the new set of field/value for this key in table local fieldvalues = redis.call('HGETALL', stateprefix..tablename..key) + if num == 1 and next(fieldvalues) then + -- If we have DEL request and SET request, we will return both requests + -- DEL request will come first before the SET request + table.insert(ret, {key, {}}) + end table.insert(ret, {key, fieldvalues}) for i = 1, #fieldvalues, 2 do redis.call('HSET', tablename..key, fieldvalues[i], fieldvalues[i + 1]) diff --git a/tests/redis_piped_state_ut.cpp b/tests/redis_piped_state_ut.cpp index ca329190..760b9f81 100644 --- a/tests/redis_piped_state_ut.cpp +++ b/tests/redis_piped_state_ut.cpp @@ -325,12 +325,16 @@ TEST(ConsumerStateTable, async_set_del_set) { int ret = cs.select(&selectcs); EXPECT_EQ(ret, Select::OBJECT); - KeyOpFieldsValuesTuple kco; - c.pop(kco); - EXPECT_EQ(kfvKey(kco), key); - EXPECT_EQ(kfvOp(kco), "SET"); - - auto fvs = kfvFieldsValues(kco); + std::deque vkco; + c.pops(vkco); + ASSERT_EQ(vkco.size(), 2); + EXPECT_EQ(kfvKey(vkco[0]), key); + EXPECT_EQ(kfvOp(vkco[0]), "DEL"); + auto fvs = kfvFieldsValues(vkco[0]); + EXPECT_EQ(fvs.size(), 0); + EXPECT_EQ(kfvKey(vkco[1]), key); + EXPECT_EQ(kfvOp(vkco[1]), "SET"); + fvs = kfvFieldsValues(vkco[1]); EXPECT_EQ(fvs.size(), (unsigned int)maxNumOfFields); map mm; diff --git a/tests/redis_state_ut.cpp b/tests/redis_state_ut.cpp index 9c0d7512..4898a065 100644 --- a/tests/redis_state_ut.cpp +++ b/tests/redis_state_ut.cpp @@ -341,12 +341,16 @@ TEST(ConsumerStateTable, set_del_set) { int ret = cs.select(&selectcs); EXPECT_EQ(ret, Select::OBJECT); - KeyOpFieldsValuesTuple kco; - c.pop(kco); - EXPECT_EQ(kfvKey(kco), key); - EXPECT_EQ(kfvOp(kco), "SET"); - - auto fvs = kfvFieldsValues(kco); + std::deque vkco; + c.pops(vkco); + ASSERT_EQ(vkco.size(), 2); + EXPECT_EQ(kfvKey(vkco[0]), key); + EXPECT_EQ(kfvOp(vkco[0]), "DEL"); + auto fvs = kfvFieldsValues(vkco[0]); + EXPECT_EQ(fvs.size(), 0); + EXPECT_EQ(kfvKey(vkco[1]), key); + EXPECT_EQ(kfvOp(vkco[1]), "SET"); + fvs = kfvFieldsValues(vkco[1]); EXPECT_EQ(fvs.size(), (unsigned int)maxNumOfFields); map mm; @@ -428,8 +432,6 @@ TEST(ConsumerStateTable, set_pop_del_set_pop_get) } } - /* Del operation and second set operation will be merged */ - /* Del operation */ p.del(key); @@ -448,14 +450,16 @@ TEST(ConsumerStateTable, set_pop_del_set_pop_get) { int ret = cs.select(&selectcs); EXPECT_EQ(ret, Select::OBJECT); - KeyOpFieldsValuesTuple kco; - c.pop(kco); - EXPECT_EQ(kfvKey(kco), key); - EXPECT_EQ(kfvOp(kco), "SET"); - - auto fvs = kfvFieldsValues(kco); - - /* size of fvs should be maxNumOfFields, no "field 1" left from first set*/ + std::deque vkco; + c.pops(vkco); + ASSERT_EQ(vkco.size(), 2); + EXPECT_EQ(kfvKey(vkco[0]), key); + EXPECT_EQ(kfvOp(vkco[0]), "DEL"); + auto fvs = kfvFieldsValues(vkco[0]); + EXPECT_EQ(fvs.size(), 0); + EXPECT_EQ(kfvKey(vkco[1]), key); + EXPECT_EQ(kfvOp(vkco[1]), "SET"); + fvs = kfvFieldsValues(vkco[1]); EXPECT_EQ(fvs.size(), (unsigned int)maxNumOfFields); map mm;