Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-3182] Schema Registry json external references #24125

Open
wants to merge 10 commits into
base: dev
Choose a base branch
from
77 changes: 65 additions & 12 deletions src/v/pandaproxy/schema_registry/json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ json::Pointer to_json_pointer(std::string_view sv) {
return candidate;
}

json::Pointer to_json_pointer(const jsoncons::jsonpointer::json_pointer& jp) {
return to_json_pointer(jp.to_string());
}

// helper to convert a jsoncons::ojson to a rapidjson::Document
json::Document to_json_document(const jsoncons::ojson& oj) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd name this something like to_rapidjson or to_rpjson_document to signal in the name that this is converting between different library representations

// serialize the input in a iobuf and parse it again
Expand Down Expand Up @@ -506,11 +510,11 @@ try_validate_json_schema(const jsoncons::ojson& schema) {
}

// forward declaration
result<document_context::local_schemas_index_t>
result<document_context_jsoncons::local_schemas_index_t>
collect_bundled_schema_and_fix_refs(
jsoncons::ojson& doc, json_schema_dialect dialect);

ss::future<document_context> parse_json(iobuf buf) {
ss::future<document_context_jsoncons> parse_jsoncons(iobuf buf) {
// parse string in json document, check it's a valid json
iobuf_istream is{buf.share(0, buf.size_bytes())};

Expand Down Expand Up @@ -568,17 +572,66 @@ ss::future<document_context> parse_json(iobuf buf) {
auto bundled_schemas_map
= collect_bundled_schema_and_fix_refs(schema, dialect).value();

auto schemas_index = document_context::schemas_index_t{
auto schemas_index = document_context_jsoncons::schemas_index_t{
std::move_iterator(bundled_schemas_map.begin()),
std::move_iterator(bundled_schemas_map.end())};

co_return document_context{
.doc = to_json_document(schema),
co_return document_context_jsoncons{
.doc = std::move(schema),
.dialect = dialect,
.schemas_index = std::move(schemas_index),
};
}

// wrapper for parse_jsoncons that perform the conversion from jsoncons::ojson
// to rapidjson::Document
ss::future<document_context> parse_json(iobuf buf) {
// we are parsing the root so we don't have a default_id
auto doc_ctx = co_await parse_jsoncons(std::move(buf));

// convert external_ptr and local_ptr to rapidjson::Pointer
constexpr static auto to_json_ctx_ptr =
[](const document_context_jsoncons::schemas_index_t::mapped_type& v) {
return ss::visit(
v,
[](const document_context_jsoncons::local_ptr& lp)
-> document_context::schemas_index_t::mapped_type {
return document_context::local_ptr{
.ptr = to_json_pointer(lp.ptr), .dialect = lp.dialect};
},
[](const document_context_jsoncons::external_ptr& ep)
-> document_context::schemas_index_t::mapped_type {
return document_context::external_ptr{
.external_schema_name = ep.external_schema_name,
.ptr = to_json_pointer(ep.ptr)};
});
};

// convert index to rapidjson
auto index_view = doc_ctx.schemas_index
| std::views::transform([](auto& p) {
return std::pair{
p.first, to_json_ctx_ptr(p.second)};
})
| std::views::common;
// convert external_schemas to rapidjson
auto external_view = doc_ctx.external_schemas
| std::views::transform([](auto& p) {
return std::pair{
p.first,
document_context::external_document_ctx{
.doc = to_json_document(p.second.doc),
.dialect = p.second.dialect}};
})
| std::views::common;
Comment on lines +794 to +804
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: std::views::common isn't required in either of these places, as the underlying range models a std::ranges::common_range.

co_return document_context{
.doc = to_json_document(doc_ctx.doc),
.dialect = doc_ctx.dialect,
.schemas_index = {index_view.begin(), index_view.end()},
.external_schemas = {external_view.begin(), external_view.end()},
};
}

/// is_superset section

// a schema O is a superset of another schema N if every schema that is valid
Expand Down Expand Up @@ -2249,7 +2302,7 @@ void sort(json::Value& val) {
}

void collect_bundled_schemas_and_fix_refs(
document_context::local_schemas_index_t& bundled_schemas,
document_context_jsoncons::local_schemas_index_t& bundled_schemas,
jsoncons::uri base_uri,
jsoncons::jsonpointer::json_pointer this_obj_ptr,
jsoncons::ojson& this_obj,
Expand Down Expand Up @@ -2343,8 +2396,8 @@ void collect_bundled_schemas_and_fix_refs(
dialect = maybe_new_dialect.value();
bundled_schemas.insert_or_assign(
to_json_id_uri(base_uri),
document_context::local_ptr{
.ptr = json::Pointer{this_obj_ptr.to_string()},
document_context_jsoncons::local_ptr{
.ptr = this_obj_ptr,
.dialect = dialect,
});
}
Expand Down Expand Up @@ -2379,7 +2432,7 @@ void collect_bundled_schemas_and_fix_refs(
}
}

result<document_context::local_schemas_index_t>
result<document_context_jsoncons::local_schemas_index_t>
collect_bundled_schema_and_fix_refs(
jsoncons::ojson& doc, json_schema_dialect dialect) {
// entry point to collect all bundled schemas
Expand All @@ -2402,10 +2455,10 @@ collect_bundled_schema_and_fix_refs(
}();

// insert the root schema as a bundled schema
auto bundled_schemas = document_context::local_schemas_index_t{
auto bundled_schemas = document_context_jsoncons::local_schemas_index_t{
{root_id,
document_context::local_ptr{
.ptr = json::Pointer{},
document_context_jsoncons::local_ptr{
.ptr = {},
.dialect = dialect,
}},
};
Expand Down