diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc index e59aee1b91f..bda85b1bcf0 100644 --- a/src/cluster/slot_migrate.cc +++ b/src/cluster/slot_migrate.cc @@ -692,7 +692,8 @@ StatusOr SlotMigrator::migrateOneKey(const rocksdb::Slice &k // Construct command according to type of the key switch (metadata.Type()) { - case kRedisString: { + case kRedisString: + case kRedisJson: { auto s = migrateSimpleKey(key, metadata, bytes, restore_cmds); if (!s.IsOK()) { return s.Prefixed("failed to migrate simple key"); @@ -738,13 +739,32 @@ StatusOr SlotMigrator::migrateOneKey(const rocksdb::Slice &k Status SlotMigrator::migrateSimpleKey(const rocksdb::Slice &key, const Metadata &metadata, const std::string &bytes, std::string *restore_cmds) { - std::vector command = {"SET", key.ToString(), bytes.substr(Metadata::GetOffsetAfterExpire(bytes[0]))}; - if (metadata.expire > 0) { - command.emplace_back("PXAT"); - command.emplace_back(std::to_string(metadata.expire)); + if (metadata.Type() == kRedisString) { + std::vector command = {"SET", key.ToString(), bytes.substr(Metadata::GetOffsetAfterExpire(bytes[0]))}; + if (metadata.expire > 0) { + command.emplace_back("PXAT"); + command.emplace_back(std::to_string(metadata.expire)); + } + *restore_cmds += redis::ArrayOfBulkStrings(command); + current_pipeline_size_++; + } else if (metadata.Type() == kRedisJson) { + // kRedisJson + JsonValue json_value; + if (auto s = redis::Json::FromString(bytes, &json_value); !s.ok()) { + return {Status::NotOK, s.ToString()}; + } + auto json_bytes = GET_OR_RET(json_value.Dump()); + std::vector command = {"JSON.SET", key.ToString(), "$", std::move(json_bytes)}; + *restore_cmds += redis::ArrayOfBulkStrings(command); + current_pipeline_size_++; + + if (metadata.expire > 0) { + *restore_cmds += redis::ArrayOfBulkStrings({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)}); + current_pipeline_size_++; + } + } else { + return {Status::NotOK, "unsupported simple key type"}; } - *restore_cmds += redis::ArrayOfBulkStrings(command); - current_pipeline_size_++; // Check whether pipeline needs to be sent // TODO(chrisZMF): Resend data if failed to send data diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc index 8ab8f04ee58..4a1b3c85c41 100644 --- a/src/storage/batch_extractor.cc +++ b/src/storage/batch_extractor.cc @@ -69,6 +69,18 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)}; resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args)); } + } else if (metadata.Type() == kRedisJson) { + JsonValue json_value; + s = redis::Json::FromString(value.ToString(), &json_value); + if (!s.ok()) return s; + auto json_bytes = json_value.Dump(); + if (!json_bytes) return rocksdb::Status::Corruption(json_bytes.Msg()); + command_args = {"JSON.SET", user_key, "$", json_bytes.GetValue()}; + resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args)); + if (metadata.expire > 0) { + command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)}; + resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args)); + } } else if (metadata.expire > 0) { auto args = log_data_.GetArguments(); if (args->size() > 0) { diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc index 26864f3ee8b..c0a0471556e 100644 --- a/src/storage/redis_db.cc +++ b/src/storage/redis_db.cc @@ -47,21 +47,10 @@ Database::Database(engine::Storage *storage, std::string ns) metadata_cf_handle_(storage->GetCFHandle(ColumnFamilyID::Metadata)), namespace_(std::move(ns)) {} -// Some data types may support reading multiple types of metadata. -// For example, bitmap supports reading string metadata and bitmap metadata. rocksdb::Status Database::ParseMetadata(RedisTypes types, Slice *bytes, Metadata *metadata) { std::string old_metadata; metadata->Encode(&old_metadata); - bool is_keyspace_hit = false; - ScopeExit se([this, &is_keyspace_hit] { - if (is_keyspace_hit) { - storage_->RecordStat(engine::StatType::KeyspaceHits, 1); - } else { - storage_->RecordStat(engine::StatType::KeyspaceMisses, 1); - } - }); - auto s = metadata->Decode(bytes); // delay InvalidArgument error check after type match check if (!s.ok() && !s.IsInvalidArgument()) return s; @@ -85,7 +74,14 @@ rocksdb::Status Database::ParseMetadata(RedisTypes types, Slice *bytes, Metadata auto _ [[maybe_unused]] = metadata->Decode(old_metadata); return rocksdb::Status::NotFound("no element found"); } - is_keyspace_hit = true; + return s; +} + +// Some data types may support reading multiple types of metadata. +// For example, bitmap supports reading string metadata and bitmap metadata. +rocksdb::Status Database::ParseMetadataWithStats(RedisTypes types, Slice *bytes, Metadata *metadata) { + auto s = ParseMetadata(types, bytes, metadata); + storage_->RecordStat(s.ok() ? engine::StatType::KeyspaceHits : engine::StatType::KeyspaceHits, 1); return s; } @@ -100,7 +96,7 @@ rocksdb::Status Database::GetMetadata(engine::Context &ctx, RedisTypes types, co auto s = GetRawMetadata(ctx, ns_key, raw_value); *rest = *raw_value; if (!s.ok()) return s; - return ParseMetadata(types, rest, metadata); + return ParseMetadataWithStats(types, rest, metadata); } rocksdb::Status Database::GetRawMetadata(engine::Context &ctx, const Slice &ns_key, std::string *bytes) { diff --git a/src/storage/redis_db.h b/src/storage/redis_db.h index 41ed3daeb24..9563b1e1d6a 100644 --- a/src/storage/redis_db.h +++ b/src/storage/redis_db.h @@ -75,7 +75,9 @@ class Database { explicit Database(engine::Storage *storage, std::string ns = ""); /// Parsing metadata with type of `types` from bytes, the metadata is a base class of all metadata. /// When parsing, the bytes will be consumed. - [[nodiscard]] rocksdb::Status ParseMetadata(RedisTypes types, Slice *bytes, Metadata *metadata); + [[nodiscard]] rocksdb::Status ParseMetadataWithStats(RedisTypes types, Slice *bytes, Metadata *metadata); + // ParseMetadata behaves the same as ParseMetadataWithStats, but without recording stats. + [[nodiscard]] static rocksdb::Status ParseMetadata(RedisTypes types, Slice *bytes, Metadata *metadata); /// GetMetadata is a helper function to get metadata from the database. It will read the "raw metadata" /// from underlying storage, and then parse the raw metadata to the specified metadata type. /// diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc index e75deea3644..d7216d7dfbd 100644 --- a/src/types/redis_bitmap.cc +++ b/src/types/redis_bitmap.cc @@ -101,7 +101,7 @@ rocksdb::Status Bitmap::GetMetadata(engine::Context &ctx, const Slice &ns_key, B if (!s.ok()) return s; Slice slice = *raw_value; - return ParseMetadata({kRedisBitmap, kRedisString}, &slice, metadata); + return ParseMetadataWithStats({kRedisBitmap, kRedisString}, &slice, metadata); } rocksdb::Status Bitmap::GetBit(engine::Context &ctx, const Slice &user_key, uint32_t bit_offset, bool *bit) { diff --git a/src/types/redis_json.cc b/src/types/redis_json.cc index ba331ef96ff..b08772195dd 100644 --- a/src/types/redis_json.cc +++ b/src/types/redis_json.cc @@ -630,7 +630,7 @@ std::vector Json::readMulti(engine::Context &ctx, const std::ve if (!statuses[i].ok()) continue; Slice rest(pin_values[i].data(), pin_values[i].size()); JsonMetadata metadata; - statuses[i] = ParseMetadata({kRedisJson}, &rest, &metadata); + statuses[i] = ParseMetadataWithStats({kRedisJson}, &rest, &metadata); if (!statuses[i].ok()) continue; statuses[i] = parse(metadata, rest, &values[i]); @@ -674,4 +674,12 @@ rocksdb::Status Json::Resp(engine::Context &ctx, const std::string &user_key, co return rocksdb::Status::OK(); } +rocksdb::Status Json::FromString(const std::string &value, JsonValue *result) { + Slice rest = value; + JsonMetadata metadata; + auto s = ParseMetadata({kRedisJson}, &rest, &metadata); + if (!s.ok()) return s; + return parse(metadata, rest, result); +} + } // namespace redis diff --git a/src/types/redis_json.h b/src/types/redis_json.h index 54831472dc1..b55da638d0f 100644 --- a/src/types/redis_json.h +++ b/src/types/redis_json.h @@ -82,6 +82,7 @@ class Json : public Database { rocksdb::Status Resp(engine::Context &ctx, const std::string &user_key, const std::string &path, std::vector *results, RESP resp); + static rocksdb::Status FromString(const std::string &value, JsonValue *result); private: rocksdb::Status write(engine::Context &ctx, Slice ns_key, JsonMetadata *metadata, const JsonValue &json_val); diff --git a/src/types/redis_string.cc b/src/types/redis_string.cc index 210fb697b91..7d7f476d878 100644 --- a/src/types/redis_string.cc +++ b/src/types/redis_string.cc @@ -47,7 +47,7 @@ std::vector String::getRawValues(engine::Context &ctx, const st (*raw_values)[i].assign(pin_values[i].data(), pin_values[i].size()); Metadata metadata(kRedisNone, false); Slice slice = (*raw_values)[i]; - auto s = ParseMetadata({kRedisString}, &slice, &metadata); + auto s = ParseMetadataWithStats({kRedisString}, &slice, &metadata); if (!s.ok()) { statuses[i] = s; (*raw_values)[i].clear(); @@ -65,7 +65,7 @@ rocksdb::Status String::getRawValue(engine::Context &ctx, const std::string &ns_ Metadata metadata(kRedisNone, false); Slice slice = *raw_value; - return ParseMetadata({kRedisString}, &slice, &metadata); + return ParseMetadataWithStats({kRedisString}, &slice, &metadata); } rocksdb::Status String::getValueAndExpire(engine::Context &ctx, const std::string &ns_key, std::string *value, diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go b/tests/gocase/integration/slotmigrate/slotmigrate_test.go index d1ddbb3d6a8..e04acfc4fb9 100644 --- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go +++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go @@ -535,7 +535,7 @@ func TestSlotMigrateDataType(t *testing.T) { testSlot += 1 keys := make(map[string]string, 0) - for _, typ := range []string{"string", "expired_string", "list", "hash", "set", "zset", "bitmap", "sortint", "stream"} { + for _, typ := range []string{"string", "expired_string", "list", "hash", "set", "zset", "bitmap", "sortint", "stream", "json"} { keys[typ] = fmt.Sprintf("%s_{%s}", typ, util.SlotTable[testSlot]) require.NoError(t, rdb0.Del(ctx, keys[typ]).Err()) } @@ -590,8 +590,9 @@ func TestSlotMigrateDataType(t *testing.T) { }).Err()) } require.NoError(t, rdb0.Expire(ctx, keys["stream"], 10*time.Second).Err()) + require.NoError(t, rdb0.JSONSet(ctx, keys["json"], "$", `{"a": 1, "b": "hello"}`).Err()) // check source data existence - for _, typ := range []string{"string", "list", "hash", "set", "zset", "bitmap", "sortint", "stream"} { + for _, typ := range []string{"string", "list", "hash", "set", "zset", "bitmap", "sortint", "stream", "json"} { require.EqualValues(t, 1, rdb0.Exists(ctx, keys[typ]).Val()) } // get source data @@ -653,6 +654,8 @@ func TestSlotMigrateDataType(t *testing.T) { require.EqualValues(t, 19, streamInfo.EntriesAdded) require.EqualValues(t, "0-0", streamInfo.MaxDeletedEntryID) require.EqualValues(t, 19, streamInfo.Length) + // type json + require.Equal(t, `{"a":1,"b":"hello"}`, rdb1.JSONGet(ctx, keys["json"]).Val()) // topology is changed on source server for _, typ := range []string{"string", "list", "hash", "set", "zset", "bitmap", "sortint", "stream"} { require.ErrorContains(t, rdb0.Exists(ctx, keys[typ]).Err(), "MOVED") diff --git a/utils/kvrocks2redis/tests/check_consistency.py b/utils/kvrocks2redis/tests/check_consistency.py index 71b8b6b792f..f813154d77a 100644 --- a/utils/kvrocks2redis/tests/check_consistency.py +++ b/utils/kvrocks2redis/tests/check_consistency.py @@ -60,6 +60,11 @@ def _compare_bitmap_data(self, key, pos): dst_data = self.dst_cli.getbit(key, pos) return src_data, dst_data + def _compare_json_data(self, key): + src_data = self.src_cli.json_get(key, '$') + dst_data = self.dst_cli.json_get(key, '$') + return src_data, dst_data + def _compare_data(self, keys : list, data_type): if data_type == "string": return self._compare_string_data(keys[0]) @@ -73,6 +78,8 @@ def _compare_data(self, keys : list, data_type): return self._compare_zset_data(keys[0]) elif data_type == 'bitmap': return self._compare_bitmap_data(keys[0], keys[1]) + elif data_type == 'json': + return self._compare_json_data(keys[0]) elif data_type == 'none': return self.src_cli.type(keys[0]), 'none' else: diff --git a/utils/kvrocks2redis/tests/populate-kvrocks.py b/utils/kvrocks2redis/tests/populate-kvrocks.py index fefae86a0fe..81c71d66df3 100644 --- a/utils/kvrocks2redis/tests/populate-kvrocks.py +++ b/utils/kvrocks2redis/tests/populate-kvrocks.py @@ -31,6 +31,9 @@ [('set', 'foo', 1), True], [('setex', 'foo_ex', 3600, 1), True], ]), + ('json', [ + [('JSON.SET', 'jfoo', '$', '{"a":1,"b":2}'), True], + ]), ('zset', [ [('zadd', 'zfoo', 1, 'a', 2, 'b', 3, 'c'), 3] ]),