diff --git a/common/consumer_state_table_pops.lua b/common/consumer_state_table_pops.lua index b5dcbe7d4..90301eded 100644 --- a/common/consumer_state_table_pops.lua +++ b/common/consumer_state_table_pops.lua @@ -13,6 +13,11 @@ for i = 1, n do end -- Push the new set of field/value for this key in table local fieldvalues = redis.call('HGETALL', stateprefix..tablename..key) + if num == 1 and next(fieldvalues) then + -- If we have DEL request and SET request, we will return both requests + -- DEL request will come first before the SET request + table.insert(ret, {key, {}}) + end table.insert(ret, {key, fieldvalues}) for i = 1, #fieldvalues, 2 do redis.call('HSET', tablename..key, fieldvalues[i], fieldvalues[i + 1]) diff --git a/tests/redis_piped_state_ut.cpp b/tests/redis_piped_state_ut.cpp index ca3291907..760b9f814 100644 --- a/tests/redis_piped_state_ut.cpp +++ b/tests/redis_piped_state_ut.cpp @@ -325,12 +325,16 @@ TEST(ConsumerStateTable, async_set_del_set) { int ret = cs.select(&selectcs); EXPECT_EQ(ret, Select::OBJECT); - KeyOpFieldsValuesTuple kco; - c.pop(kco); - EXPECT_EQ(kfvKey(kco), key); - EXPECT_EQ(kfvOp(kco), "SET"); - - auto fvs = kfvFieldsValues(kco); + std::deque vkco; + c.pops(vkco); + ASSERT_EQ(vkco.size(), 2); + EXPECT_EQ(kfvKey(vkco[0]), key); + EXPECT_EQ(kfvOp(vkco[0]), "DEL"); + auto fvs = kfvFieldsValues(vkco[0]); + EXPECT_EQ(fvs.size(), 0); + EXPECT_EQ(kfvKey(vkco[1]), key); + EXPECT_EQ(kfvOp(vkco[1]), "SET"); + fvs = kfvFieldsValues(vkco[1]); EXPECT_EQ(fvs.size(), (unsigned int)maxNumOfFields); map mm; diff --git a/tests/redis_state_ut.cpp b/tests/redis_state_ut.cpp index 9c0d75125..4898a0653 100644 --- a/tests/redis_state_ut.cpp +++ b/tests/redis_state_ut.cpp @@ -341,12 +341,16 @@ TEST(ConsumerStateTable, set_del_set) { int ret = cs.select(&selectcs); EXPECT_EQ(ret, Select::OBJECT); - KeyOpFieldsValuesTuple kco; - c.pop(kco); - EXPECT_EQ(kfvKey(kco), key); - EXPECT_EQ(kfvOp(kco), "SET"); - - auto fvs = kfvFieldsValues(kco); + std::deque vkco; + c.pops(vkco); + ASSERT_EQ(vkco.size(), 2); + EXPECT_EQ(kfvKey(vkco[0]), key); + EXPECT_EQ(kfvOp(vkco[0]), "DEL"); + auto fvs = kfvFieldsValues(vkco[0]); + EXPECT_EQ(fvs.size(), 0); + EXPECT_EQ(kfvKey(vkco[1]), key); + EXPECT_EQ(kfvOp(vkco[1]), "SET"); + fvs = kfvFieldsValues(vkco[1]); EXPECT_EQ(fvs.size(), (unsigned int)maxNumOfFields); map mm; @@ -428,8 +432,6 @@ TEST(ConsumerStateTable, set_pop_del_set_pop_get) } } - /* Del operation and second set operation will be merged */ - /* Del operation */ p.del(key); @@ -448,14 +450,16 @@ TEST(ConsumerStateTable, set_pop_del_set_pop_get) { int ret = cs.select(&selectcs); EXPECT_EQ(ret, Select::OBJECT); - KeyOpFieldsValuesTuple kco; - c.pop(kco); - EXPECT_EQ(kfvKey(kco), key); - EXPECT_EQ(kfvOp(kco), "SET"); - - auto fvs = kfvFieldsValues(kco); - - /* size of fvs should be maxNumOfFields, no "field 1" left from first set*/ + std::deque vkco; + c.pops(vkco); + ASSERT_EQ(vkco.size(), 2); + EXPECT_EQ(kfvKey(vkco[0]), key); + EXPECT_EQ(kfvOp(vkco[0]), "DEL"); + auto fvs = kfvFieldsValues(vkco[0]); + EXPECT_EQ(fvs.size(), 0); + EXPECT_EQ(kfvKey(vkco[1]), key); + EXPECT_EQ(kfvOp(vkco[1]), "SET"); + fvs = kfvFieldsValues(vkco[1]); EXPECT_EQ(fvs.size(), (unsigned int)maxNumOfFields); map mm;