Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-3182] Schema Registry json external references #24125

Open
wants to merge 10 commits into
base: dev
Choose a base branch
from
145 changes: 129 additions & 16 deletions src/v/pandaproxy/schema_registry/json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#include "pandaproxy/schema_registry/compatibility.h"
#include "pandaproxy/schema_registry/error.h"
#include "pandaproxy/schema_registry/errors.h"
#include "pandaproxy/schema_registry/schema_getter.h"
#include "pandaproxy/schema_registry/sharded_store.h"
#include "pandaproxy/schema_registry/types.h"
#include "pandaproxy/schema_registry/util.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/shared_ptr.hh>
Expand Down Expand Up @@ -512,9 +514,30 @@ try_validate_json_schema(const jsoncons::ojson& schema) {
// forward declaration
result<document_context_jsoncons::local_schemas_index_t>
collect_bundled_schema_and_fix_refs(
jsoncons::ojson& doc, json_schema_dialect dialect);

ss::future<document_context_jsoncons> parse_jsoncons(iobuf buf) {
jsoncons::ojson& doc,
json_schema_dialect dialect,
const std::optional<ss::sstring>& default_id);

// parse a iobuf into a valid json schema and supporting maps to resolve $ref to
// json objects.
// 1. buf is parsed as a json, then it's validated to ensure it's a valid json
// schema.
// 2. all the local $ref and $id are made absolute, and all the bundled schemas
// are validated and collected. 2.a if the root schema does not have an $id,
// then default_id is used as the root $id. this is done for external schemas,
// to ensure that relative refs inside them are resolved correctly.
// 3. all the external refs are resolved and collected.
// 3.a for each external ref, the schema is recursively parsed (ref name is used
// as a default_id), it's bundled schema are transformed and stored in the
// index. then the external schema is stored in the external_schemas map. The
// result is a the json object for the input, the json objects for all the
// direct and indirect external schemas, and the maps to perform $ref
// resolution, as performed in the function resolve_reference().
ss::future<document_context_jsoncons> parse_jsoncons(
schema_getter& store,
iobuf buf,
std::optional<ss::sstring> default_id,
canonical_schema_definition::references refs) {
// parse string in json document, check it's a valid json
iobuf_istream is{buf.share(0, buf.size_bytes())};

Expand Down Expand Up @@ -568,26 +591,108 @@ ss::future<document_context_jsoncons> parse_jsoncons(iobuf buf) {
? validate_json_schema(maybe_dialect.value(), schema).value()
: try_validate_json_schema(schema).value();

// this function will resolve al local ref against their respective baseuri.
auto bundled_schemas_map
= collect_bundled_schema_and_fix_refs(schema, dialect).value();

// this function will resolve al local ref against their respective
// baseuri. if we are currently in the process of parsing a external
// schema, default_id will have a value and will be used as a baseuri if
// $id is not set. this is done to ensure that relative refs inside the
// external schema do not clash with relative refs in the root schema,
// if it does not have a base_uri.
auto bundled_schemas_map = collect_bundled_schema_and_fix_refs(
schema, dialect, default_id)
.value();
auto schemas_index = document_context_jsoncons::schemas_index_t{
std::move_iterator(bundled_schemas_map.begin()),
std::move_iterator(bundled_schemas_map.end())};

// now the schema is a valid schema, the $ref are absolute, and the bundled
// schemas are collected in schemas_index. proceed to parse the external
// refs

// collect the iobufs for each external schema. all recursive external refs
// are collected, there will be no duplicated names
auto external_refs = (co_await collect_schema(store, {}, refs)).get();

auto external_schemas = document_context_jsoncons::external_schemas_map_t{};
for (auto& [ref_name, def] : external_refs) {
// recursive call to parse the external schema. it will have no external
// refs but it will have a default_id and all the relative refs will be
// resolved against it
auto ref_document = co_await parse_jsoncons(
store, std::move(def), ref_name, {});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I reading it correctly that this means that we're not supporting transitive external references (ie. referencing a schema that also references schemas)?


auto ref_name_as_uri = to_json_id_uri({ref_name.c_str()});
for (auto& [uri, subschema_ptr] : ref_document.schemas_index) {
auto* local_subschema_ptr
= std::get_if<document_context_jsoncons::local_ptr>(
&subschema_ptr);
if (!local_subschema_ptr) {
// this is not expected: we didn't provide any
// external refs to parse_jsoncons, so the only possible value
// is a local_ptr type
throw as_exception(error_info{
error_code::schema_invalid,
fmt::format(
"External schema '{}' contains a reference to another "
"external schema '{}'",
ref_name,
uri)});
}
// convert and save local_ptr to an external_ptr
if (!schemas_index
.emplace(
uri,
document_context_jsoncons::external_ptr{
.external_schema_name = ref_name_as_uri,
.ptr = std::move(local_subschema_ptr->ptr),
})
.second) {
// collect_schema should have ensured that there are no
// duplicated external schemas, so this means that the root
// schema already contains an index entry for a bundled schema
// inside the current external schema
throw as_exception(error_info{
error_code::schema_invalid,
fmt::format(
"Non-unique bundled schema id '{}' in external schema '{}'",
uri,
ref_name)});
}
}
// all bundled schemas are saved in the index. add an entry for the
// external schema itself, to resolve `"$ref": "ref_name_as_uri"` note:
// this key-value already exist if the external schema has no $id
schemas_index.emplace(
ref_name_as_uri,
document_context_jsoncons::external_ptr{
.external_schema_name = ref_name_as_uri,
.ptr = {},
});
// store the external schema
external_schemas.emplace(
ref_name_as_uri,
document_context_jsoncons::external_document_ctx{
.doc = std::move(ref_document.doc),
.dialect = ref_document.dialect,
});
}

co_return document_context_jsoncons{
.doc = std::move(schema),
.dialect = dialect,
.schemas_index = std::move(schemas_index),
.external_schemas = std::move(external_schemas),
};
}

// wrapper for parse_jsoncons that perform the conversion from jsoncons::ojson
// to rapidjson::Document
ss::future<document_context> parse_json(iobuf buf) {
ss::future<document_context> parse_json(
schema_getter& store,
iobuf buf,
canonical_schema_definition::references refs = {}) {
// we are parsing the root so we don't have a default_id
auto doc_ctx = co_await parse_jsoncons(std::move(buf));
auto doc_ctx = co_await parse_jsoncons(
store, std::move(buf), std::nullopt, std::move(refs));

// convert external_ptr and local_ptr to rapidjson::Pointer
constexpr static auto to_json_ctx_ptr =
Expand Down Expand Up @@ -2432,22 +2537,29 @@ void collect_bundled_schemas_and_fix_refs(
}
}

// scan the `doc` with root `dialect`, collect all bundled schemas and ensure
// that all the $ref are absolute uris. `default_id` is used as the root id if
// the schema does not have an explicit $id. this is useful for external
// schemas, to ensure that their local refs do not conflict with the root
// schema.
Comment on lines +2699 to +2701
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

result<document_context_jsoncons::local_schemas_index_t>
collect_bundled_schema_and_fix_refs(
jsoncons::ojson& doc, json_schema_dialect dialect) {
jsoncons::ojson& doc,
json_schema_dialect dialect,
const std::optional<ss::sstring>& default_id) {
// entry point to collect all bundled schemas
// fetch the root id, if it exists
auto root_id = [&] {
if (!doc.is_object()) {
// might be the case for "true" or "false" schemas
return json_id_uri{""};
return json_id_uri{default_id.value_or("").c_str()};
}

auto id_it = doc.find(
dialect == json_schema_dialect::draft4 ? "id" : "$id");
if (id_it == doc.object_range().end()) {
// no explicit id, use the empty string
return json_id_uri{""};
// no explicit id, use the default_id
return json_id_uri{default_id.value_or("").c_str()};
}

// $id is set in the schema, use it as the root id
Expand Down Expand Up @@ -2483,8 +2595,9 @@ collect_bundled_schema_and_fix_refs(
} // namespace

ss::future<json_schema_definition>
make_json_schema_definition(schema_getter&, canonical_schema schema) {
auto doc = co_await parse_json(schema.def().shared_raw()());
make_json_schema_definition(schema_getter& store, canonical_schema schema) {
auto doc = co_await parse_json(
store, schema.def().shared_raw()(), schema.def().refs());
std::string_view name = schema.sub()();
auto refs = std::move(schema).def().refs();
co_return json_schema_definition{
Expand All @@ -2497,7 +2610,7 @@ ss::future<canonical_schema> make_canonical_json_schema(
auto [sub, unparsed] = std::move(unparsed_schema).destructure();
auto [def, type, refs] = std::move(unparsed).destructure();

auto ctx = co_await parse_json(std::move(def));
auto ctx = co_await parse_json(store, std::move(def));
if (norm) {
sort(ctx.doc);
std::sort(refs.begin(), refs.end());
Expand Down