From e99858f0520d6657920bb57faa4b1e0d3fa0b382 Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Sun, 13 Oct 2024 15:44:57 +0200 Subject: [PATCH 01/10] schema_registry/avro: move collect_schema to util.h no change in behavior, will be used for external references for json schema --- src/v/pandaproxy/BUILD | 1 + .../pandaproxy/schema_registry/CMakeLists.txt | 1 + src/v/pandaproxy/schema_registry/avro.cc | 44 +------------------ src/v/pandaproxy/schema_registry/util.cc | 33 ++++++++++++++ src/v/pandaproxy/schema_registry/util.h | 33 ++++++++++++++ 5 files changed, 69 insertions(+), 43 deletions(-) create mode 100644 src/v/pandaproxy/schema_registry/util.cc diff --git a/src/v/pandaproxy/BUILD b/src/v/pandaproxy/BUILD index 65eeaa5002b2..797a9837e724 100644 --- a/src/v/pandaproxy/BUILD +++ b/src/v/pandaproxy/BUILD @@ -152,6 +152,7 @@ redpanda_cc_library( "schema_registry/sharded_store.cc", "schema_registry/types.cc", "schema_registry/validation.cc", + "schema_registry/util.cc", "server.cc", ], hdrs = [ diff --git a/src/v/pandaproxy/schema_registry/CMakeLists.txt b/src/v/pandaproxy/schema_registry/CMakeLists.txt index f926b33ec520..e0faa0b9276f 100644 --- a/src/v/pandaproxy/schema_registry/CMakeLists.txt +++ b/src/v/pandaproxy/schema_registry/CMakeLists.txt @@ -25,6 +25,7 @@ v_cc_library( json.cc protobuf.cc validation.cc + util.cc ${schema_registry_file} DEPS v::pandaproxy_common diff --git a/src/v/pandaproxy/schema_registry/avro.cc b/src/v/pandaproxy/schema_registry/avro.cc index 9b31f3629bc5..5e0910b6a76d 100644 --- a/src/v/pandaproxy/schema_registry/avro.cc +++ b/src/v/pandaproxy/schema_registry/avro.cc @@ -23,6 +23,7 @@ #include "pandaproxy/schema_registry/schema_getter.h" #include "pandaproxy/schema_registry/sharded_store.h" #include "pandaproxy/schema_registry/types.h" +#include "pandaproxy/schema_registry/util.h" #include "strings/string_switch.h" #include @@ -518,49 +519,6 @@ ss::sstring avro_schema_definition::name() const { return _impl.root()->name().fullname(); }; -class collected_schema { -public: - bool contains(const ss::sstring& name) const { - return _names.contains(name); - } - bool insert(ss::sstring name, canonical_schema_definition def) { - bool inserted = _names.insert(std::move(name)).second; - if (inserted) { - _schemas.push_back(std::move(def).raw()); - } - return inserted; - } - canonical_schema_definition::raw_string flatten() && { - iobuf out; - for (auto& s : _schemas) { - out.append(std::move(s)); - out.append("\n", 1); - } - return canonical_schema_definition::raw_string{std::move(out)}; - } - -private: - absl::flat_hash_set _names; - std::vector _schemas; -}; - -ss::future collect_schema( - schema_getter& store, - collected_schema collected, - ss::sstring name, - canonical_schema schema) { - for (const auto& ref : schema.def().refs()) { - if (!collected.contains(ref.name)) { - auto ss = co_await store.get_subject_schema( - ref.sub, ref.version, include_deleted::no); - collected = co_await collect_schema( - store, std::move(collected), ref.name, std::move(ss.schema)); - } - } - collected.insert(std::move(name), std::move(schema).def()); - co_return std::move(collected); -} - ss::future make_avro_schema_definition(schema_getter& store, canonical_schema schema) { std::optional ex; diff --git a/src/v/pandaproxy/schema_registry/util.cc b/src/v/pandaproxy/schema_registry/util.cc new file mode 100644 index 000000000000..b7293b9424c6 --- /dev/null +++ b/src/v/pandaproxy/schema_registry/util.cc @@ -0,0 +1,33 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "pandaproxy/schema_registry/util.h" + +namespace pandaproxy::schema_registry { + +ss::future collect_schema( + schema_getter& store, + collected_schema collected, + ss::sstring name, + canonical_schema schema) { + for (const auto& ref : schema.def().refs()) { + if (!collected.contains(ref.name)) { + auto ss = co_await store.get_subject_schema( + ref.sub, ref.version, include_deleted::no); + collected = co_await collect_schema( + store, std::move(collected), ref.name, std::move(ss.schema)); + } + } + collected.insert(std::move(name), std::move(schema).def()); + + co_return std::move(collected); +} +} // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/util.h b/src/v/pandaproxy/schema_registry/util.h index 9955b6cbfe6f..c6d93c8a21b1 100644 --- a/src/v/pandaproxy/schema_registry/util.h +++ b/src/v/pandaproxy/schema_registry/util.h @@ -17,8 +17,10 @@ #include "json/document.h" #include "json/writer.h" #include "pandaproxy/schema_registry/errors.h" +#include "pandaproxy/schema_registry/schema_getter.h" #include "pandaproxy/schema_registry/types.h" +#include #include #include #include @@ -58,5 +60,36 @@ ss::sstring to_string(named_type def) { iobuf_parser p{std::move(def)}; return p.read_string(p.bytes_left()); } +class collected_schema { +public: + bool contains(const ss::sstring& name) const { + return _names.contains(name); + } + bool insert(ss::sstring name, canonical_schema_definition def) { + bool inserted = _names.insert(std::move(name)).second; + if (inserted) { + _schemas.push_back(std::move(def).raw()); + } + return inserted; + } + canonical_schema_definition::raw_string flatten() && { + iobuf out; + for (auto& s : _schemas) { + out.append(std::move(s)); + out.append("\n", 1); + } + return canonical_schema_definition::raw_string{std::move(out)}; + } + +private: + absl::flat_hash_set _names; + std::vector _schemas; +}; + +ss::future collect_schema( + schema_getter& store, + collected_schema collected, + ss::sstring name, + canonical_schema schema); } // namespace pandaproxy::schema_registry From ba1307f7aaa99e972e220d7a5cb6a9cc50659408 Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Sun, 13 Oct 2024 15:52:29 +0200 Subject: [PATCH 02/10] schema_registry/util: collect_schema overlad, collected_schema::get collect_schema overload accepts a list of references directly and does not save the input schema in the collected_schema list colleted_schema::get extranct the map of reference_name -> reference_iobuf to implement this, a bit of refactoring is applied to the class --- src/v/pandaproxy/schema_registry/util.cc | 18 ++++++++++-- src/v/pandaproxy/schema_registry/util.h | 35 ++++++++++++++++-------- 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/util.cc b/src/v/pandaproxy/schema_registry/util.cc index b7293b9424c6..721aebeb9507 100644 --- a/src/v/pandaproxy/schema_registry/util.cc +++ b/src/v/pandaproxy/schema_registry/util.cc @@ -11,14 +11,15 @@ #include "pandaproxy/schema_registry/util.h" +#include "pandaproxy/schema_registry/schema_getter.h" + namespace pandaproxy::schema_registry { ss::future collect_schema( schema_getter& store, collected_schema collected, - ss::sstring name, - canonical_schema schema) { - for (const auto& ref : schema.def().refs()) { + canonical_schema_definition::references refs) { + for (const auto& ref : refs) { if (!collected.contains(ref.name)) { auto ss = co_await store.get_subject_schema( ref.sub, ref.version, include_deleted::no); @@ -26,6 +27,17 @@ ss::future collect_schema( store, std::move(collected), ref.name, std::move(ss.schema)); } } + + co_return std::move(collected); +} + +ss::future collect_schema( + schema_getter& store, + collected_schema collected, + ss::sstring name, + canonical_schema schema) { + collected = co_await collect_schema( + store, std::move(collected), schema.def().refs()); collected.insert(std::move(name), std::move(schema).def()); co_return std::move(collected); diff --git a/src/v/pandaproxy/schema_registry/util.h b/src/v/pandaproxy/schema_registry/util.h index c6d93c8a21b1..cde00179d90d 100644 --- a/src/v/pandaproxy/schema_registry/util.h +++ b/src/v/pandaproxy/schema_registry/util.h @@ -19,8 +19,9 @@ #include "pandaproxy/schema_registry/errors.h" #include "pandaproxy/schema_registry/schema_getter.h" #include "pandaproxy/schema_registry/types.h" +#include "utils/absl_sstring_hash.h" -#include +#include #include #include #include @@ -62,34 +63,44 @@ ss::sstring to_string(named_type def) { } class collected_schema { public: - bool contains(const ss::sstring& name) const { - return _names.contains(name); + using map_t = absl::flat_hash_map< + ss::sstring, + canonical_schema_definition::raw_string, + sstring_hash, + sstring_eq>; + + bool contains(std::string_view name) const { + return _schemas.contains(name); } + bool insert(ss::sstring name, canonical_schema_definition def) { - bool inserted = _names.insert(std::move(name)).second; - if (inserted) { - _schemas.push_back(std::move(def).raw()); - } - return inserted; + return _schemas.emplace(std::move(name), std::move(def).raw()).second; } + canonical_schema_definition::raw_string flatten() && { iobuf out; - for (auto& s : _schemas) { + for (auto& [_, s] : _schemas) { out.append(std::move(s)); out.append("\n", 1); } return canonical_schema_definition::raw_string{std::move(out)}; } + map_t get() && { return std::exchange(_schemas, {}); } + private: - absl::flat_hash_set _names; - std::vector _schemas; + map_t _schemas; }; ss::future collect_schema( schema_getter& store, collected_schema collected, - ss::sstring name, + ss::sstring opt_name, canonical_schema schema); +ss::future collect_schema( + schema_getter& store, + collected_schema collected, + canonical_schema_definition::references refs); + } // namespace pandaproxy::schema_registry From 7d164b47b87d255bafc882b46687d5addc3a53e3 Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Sun, 13 Oct 2024 16:08:04 +0200 Subject: [PATCH 03/10] schema_registry/json: move to_json_pointer, conversion jsoncons->rapidjson --- src/v/pandaproxy/schema_registry/json.cc | 36 ++++++++++++------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/json.cc b/src/v/pandaproxy/schema_registry/json.cc index 3ba5a56430ea..716f1404a469 100644 --- a/src/v/pandaproxy/schema_registry/json.cc +++ b/src/v/pandaproxy/schema_registry/json.cc @@ -124,6 +124,24 @@ json_id_uri to_json_id_uri(const jsoncons::uri& uri) { .string()}; } +// helper to parse a json pointer with rapidjson. throws if there is an error +// parsing it +json::Pointer to_json_pointer(std::string_view sv) { + auto candidate = json::Pointer{sv.data(), sv.size()}; + if (auto ec = candidate.GetParseErrorCode(); + ec != rapidjson::kPointerParseErrorNone) { + throw as_exception(error_info{ + error_code::schema_invalid, + fmt::format( + "invalid fragment '{}' error {} at {}", + sv, + ec, + candidate.GetParseErrorOffset())}); + } + + return candidate; +} + struct document_context { json::Document doc; json_schema_dialect dialect; @@ -742,24 +760,6 @@ merge_references(std::span references_objects) { return res; } -// helper to parse a json pointer with rapidjson. throws if there is an error -// parsing it -json::Pointer to_json_pointer(std::string_view sv) { - auto candidate = json::Pointer{sv.data(), sv.size()}; - if (auto ec = candidate.GetParseErrorCode(); - ec != rapidjson::kPointerParseErrorNone) { - throw as_exception(error_info{ - error_code::schema_invalid, - fmt::format( - "invalid fragment '{}' error {} at {}", - sv, - ec, - candidate.GetParseErrorOffset())}); - } - - return candidate; -} - // helper to resolve a pointer in a json object. throws if the object can't be // retrieved const json::Value& From 12b52b2dd178a78f65a243e90b91b315b0eeda25 Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Sun, 13 Oct 2024 16:25:31 +0200 Subject: [PATCH 04/10] schema_registry/json: prepare document_context to store external schemas no behavior change in this commit add a map external_schema_name -> json::Document, change the mapped_type of schemas_index to an union of local_ptr (json pointer to the root schema) and external_ptr (ptr to a named external schema in .external_schemas map) the type is templated to support rapidjson and jsoncons, this will be used later --- src/v/pandaproxy/schema_registry/json.cc | 93 ++++++++++++++++++------ 1 file changed, 69 insertions(+), 24 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/json.cc b/src/v/pandaproxy/schema_registry/json.cc index 716f1404a469..c070ea73316d 100644 --- a/src/v/pandaproxy/schema_registry/json.cc +++ b/src/v/pandaproxy/schema_registry/json.cc @@ -111,12 +111,6 @@ constexpr std::optional from_uri(std::string_view uri) { // info, etc) using json_id_uri = named_type; -// mapping of $id to jsonpointer to the parent object -// it contains at least the root $id for doc. If the root $id is not -// present, then the default value "" is used -using id_to_schema_pointer = absl:: - flat_hash_map>; - json_id_uri to_json_id_uri(const jsoncons::uri& uri) { // ensure that only scheme, host and path are used return json_id_uri{ @@ -142,12 +136,47 @@ json::Pointer to_json_pointer(std::string_view sv) { return candidate; } -struct document_context { - json::Document doc; +// document_context contains the json document, the dialect, an index to resolve +// $ref and the external schemas +template +struct document_context_base { + struct external_ptr { + json_id_uri external_schema_name; + JPtrT ptr; + }; + + struct local_ptr { + JPtrT ptr; + json_schema_dialect dialect; + }; + + struct external_document_ctx { + JDocT doc; + json_schema_dialect dialect; + }; + + using schemas_index_t + = absl::flat_hash_map>; + using local_schemas_index_t = absl::flat_hash_map; + using external_schemas_map_t + = absl::flat_hash_map; + + // root schema + JDocT doc; json_schema_dialect dialect; - id_to_schema_pointer bundled_schemas; + // mapping of ($id|external_schema) -> (local_ptr|external_ptr). + // local_ptr resolves against `doc`, external_ptr resolves against one value + // in `external_schema`. it contains at least the root $id for doc. If the + // root $id is not present, then the default value "" is used + schemas_index_t schemas_index; + // store for the external schemas ) + external_schemas_map_t external_schemas; }; +using document_context = document_context_base; +using document_context_jsoncons + = document_context_base; + // Passed into is_superset_* methods where the path and the generated verbose // incompatibilities don't matter, only whether they are compatible or not static const std::filesystem::path ignored_path = ""; @@ -289,13 +318,14 @@ class schema_context { json_schema_dialect dialect() const { return _schema.ctx.dialect; } const json::Value& doc() const { return _schema.ctx.doc; } - const id_to_schema_pointer::mapped_type* + const document_context::local_ptr* find_bundled(const json_id_uri id) const { - auto it = _schema.ctx.bundled_schemas.find(id); - if (it == _schema.ctx.bundled_schemas.end()) { + auto it = _schema.ctx.schemas_index.find(id); + if (it == _schema.ctx.schemas_index.end()) { return nullptr; } - return &(it->second); + + return std::get_if(&it->second); } int remaining_ref_units() const { return _ref_units; } @@ -406,7 +436,8 @@ try_validate_json_schema(const jsoncons::ojson& schema) { } // forward declaration -result collect_bundled_schema_and_fix_refs( +result +collect_bundled_schema_and_fix_refs( jsoncons::ojson& doc, json_schema_dialect dialect); result parse_json(iobuf buf) { @@ -474,6 +505,10 @@ result parse_json(iobuf buf) { return bundled_schemas_map.as_failure(); } + auto schemas_index = document_context::schemas_index_t{ + std::move_iterator(bundled_schemas_map.assume_value().begin()), + std::move_iterator(bundled_schemas_map.assume_value().end())}; + // to use rapidjson we need to serialized schema again // We take a copy of the jsoncons schema here because it has the fixed-up // references that we want to use for compatibility checks @@ -494,10 +529,11 @@ result parse_json(iobuf buf) { rapidjson_schema.GetErrorOffset())}; } - return { - std::move(rapidjson_schema), - dialect, - std::move(bundled_schemas_map).assume_value()}; + return document_context{ + .doc = std::move(rapidjson_schema), + .dialect = dialect, + .schemas_index = std::move(schemas_index), + }; } /// is_superset section @@ -2206,7 +2242,7 @@ void sort(json::Value& val) { } void collect_bundled_schemas_and_fix_refs( - id_to_schema_pointer& bundled_schemas, + document_context::local_schemas_index_t& bundled_schemas, jsoncons::uri base_uri, jsoncons::jsonpointer::json_pointer this_obj_ptr, jsoncons::ojson& this_obj, @@ -2300,7 +2336,10 @@ void collect_bundled_schemas_and_fix_refs( dialect = maybe_new_dialect.value(); bundled_schemas.insert_or_assign( to_json_id_uri(base_uri), - std::pair{json::Pointer{this_obj_ptr.to_string()}, dialect}); + document_context::local_ptr{ + .ptr = json::Pointer{this_obj_ptr.to_string()}, + .dialect = dialect, + }); } if (auto ref_it = this_obj.find("$ref"); @@ -2333,7 +2372,8 @@ void collect_bundled_schemas_and_fix_refs( } } -result collect_bundled_schema_and_fix_refs( +result +collect_bundled_schema_and_fix_refs( jsoncons::ojson& doc, json_schema_dialect dialect) { // entry point to collect all bundled schemas // fetch the root id, if it exists @@ -2355,8 +2395,13 @@ result collect_bundled_schema_and_fix_refs( }(); // insert the root schema as a bundled schema - auto bundled_schemas = id_to_schema_pointer{ - {root_id, std::pair{json::Pointer{}, dialect}}}; + auto bundled_schemas = document_context::local_schemas_index_t{ + {root_id, + document_context::local_ptr{ + .ptr = json::Pointer{}, + .dialect = dialect, + }}, + }; if (doc.is_object()) { // note: current implementation is overly strict and reject any bundled @@ -2372,7 +2417,7 @@ result collect_bundled_schema_and_fix_refs( } } - return bundled_schemas; + return std::move(bundled_schemas); } } // namespace From d3486a814476ba82d41019937e7243521d89cc5e Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Sun, 13 Oct 2024 16:36:12 +0200 Subject: [PATCH 05/10] schema_registry/json: add the support to resolve external references this is done with schema_context::resolve_reference. the function will get an uri and return a json::Value::ConstObject or throw. the search is done with schema_index, a local_ptr will be resolved against the root schema and an external_ptr will resolve against one of the external_schemas. note that there is no behavior change yet, because the external_schemas map is not populated yet --- src/v/pandaproxy/schema_registry/json.cc | 120 +++++++++++++---------- 1 file changed, 68 insertions(+), 52 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/json.cc b/src/v/pandaproxy/schema_registry/json.cc index c070ea73316d..be2ae7965403 100644 --- a/src/v/pandaproxy/schema_registry/json.cc +++ b/src/v/pandaproxy/schema_registry/json.cc @@ -310,6 +310,25 @@ struct pjp { } }; +// helper to resolve a pointer in a json object. throws if the object can't be +// retrieved +const json::Value& +resolve_pointer(const json::Pointer& p, const json::Value& root) { + auto unresolved_token = size_t{0}; + auto* value = p.Get(root, &unresolved_token); + if (value == nullptr) { + throw as_exception(error_info{ + error_code::schema_invalid, + fmt::format( + "object not found for pointer '{}' unresolved token at index " + "{}", + pjp{p}, + unresolved_token)}); + } + + return *value; +} + class schema_context { public: explicit schema_context(const json_schema_definition::impl& schema) @@ -318,14 +337,47 @@ class schema_context { json_schema_dialect dialect() const { return _schema.ctx.dialect; } const json::Value& doc() const { return _schema.ctx.doc; } - const document_context::local_ptr* - find_bundled(const json_id_uri id) const { - auto it = _schema.ctx.schemas_index.find(id); + // resolves a reference to a json object. throws if the reference can't be + // resolved. supports local, bundled and external references + std::pair + resolve_reference(jsoncons::uri uri) const { + // split uri into schema id and fragment + auto id_uri = to_json_id_uri(uri); + auto fragment_p = to_json_pointer(uri.fragment()); + + // try to find the referenced schema, + auto it = _schema.ctx.schemas_index.find(id_uri); if (it == _schema.ctx.schemas_index.end()) { - return nullptr; + throw as_exception(error_info{ + error_code::schema_invalid, + fmt::format("schema pointer not found for uri '{}'", id_uri)}); } + // step 1: get the schema object + const auto& [doc, ptr, dialect] = ss::visit( + it->second, + [&](const document_context::local_ptr& lp) { + // bundled schema, return the root doc and the ptr + return std::tie(_schema.ctx.doc, lp.ptr, lp.dialect); + }, + [&](const document_context::external_ptr& ep) { + // external schema, get the external doc and return the ptr into + // it + auto external_it = _schema.ctx.external_schemas.find( + ep.external_schema_name); + if (external_it == _schema.ctx.external_schemas.end()) { + throw as_exception(error_info{ + error_code::schema_invalid, + fmt::format( + "external schema pointer not found for uri '{}'", + id_uri)}); + } + return std::tie( + external_it->second.doc, ep.ptr, external_it->second.dialect); + }); - return std::get_if(&it->second); + // step 2: get the referenced object inside the schema + const auto& schema = resolve_pointer(ptr, doc); + return {resolve_pointer(fragment_p, schema).GetObject(), dialect}; } int remaining_ref_units() const { return _ref_units; } @@ -796,25 +848,6 @@ merge_references(std::span references_objects) { return res; } -// helper to resolve a pointer in a json object. throws if the object can't be -// retrieved -const json::Value& -resolve_pointer(const json::Pointer& p, const json::Value& root) { - auto unresolved_token = size_t{0}; - auto* value = p.Get(root, &unresolved_token); - if (value == nullptr) { - throw as_exception(error_info{ - error_code::schema_invalid, - fmt::format( - "object not found for pointer '{}' unresolved token at index " - "{}", - pjp{p}, - unresolved_token)}); - } - - return *value; -} - // iteratively resolve a reference, following the $ref field until the end or // the max_allowed_depth is reached. throws if the max depth is reached or if // the reference can't be resolved @@ -825,13 +858,7 @@ resolve_reference(schema_context& ctx, const json::Value& candidate) { return candidate.GetObject(); } - auto get_uri_fragment = [](std::string uri_s) { - // split into host and fragment - auto uri = jsoncons::uri{uri_s}; - return std::pair{to_json_id_uri(uri), to_json_pointer(uri.fragment())}; - }; - - auto [id_uri, fragment_p] = get_uri_fragment(ref_it->value.GetString()); + auto ref_uri = jsoncons::uri{ref_it->value.GetString()}; // store the reference chains here, to merge them later, start with base auto references_objects = absl::InlinedVector{ @@ -839,29 +866,16 @@ resolve_reference(schema_context& ctx, const json::Value& candidate) { // resolve the reference: while (ctx.consume_ref_units() > 0) { - // try to find the bundled schema, get a pointer to it - auto* lookup_p = ctx.find_bundled(id_uri); - if (lookup_p == nullptr) { - // TODO use a better error code - throw as_exception(error_info{ - error_code::schema_invalid, - fmt::format("schema pointer not found for uri '{}'", id_uri)}); - } - const auto& [schema_pointer, dialect] = *lookup_p; - - // step 1: get the schema object - const auto& schema = resolve_pointer(schema_pointer, ctx.doc()); - // step 2: get the referenced object inside the schema - const auto& referenced_obj = resolve_pointer(fragment_p, schema); - // step 2.5: store referenced_obj for merging later - references_objects.push_back(referenced_obj.GetObject()); + // step 1 get the referenced schema + auto [referenced_obj, dialect] = ctx.resolve_reference(ref_uri); + // step 2: store referenced_obj for merging later + references_objects.push_back(referenced_obj); // step 3: check if the referenced object has a $ref field, and if so // resolve it if (auto next_ref_it = referenced_obj.FindMember("$ref"); next_ref_it != referenced_obj.MemberEnd()) { - std::tie(id_uri, fragment_p) = get_uri_fragment( - next_ref_it->value.GetString()); + ref_uri = jsoncons::uri{next_ref_it->value.GetString()}; } else { // if this is the final target, return it. @@ -871,14 +885,16 @@ resolve_reference(schema_context& ctx, const json::Value& candidate) { if (dialect != ctx.dialect()) { throw as_exception(error_info{ error_code::schema_invalid, - fmt::format("schema dialect mismatch for uri '{}'", id_uri)}); + fmt::format( + "schema dialect mismatch for uri '{}'", ref_uri.string())}); } return merge_references(references_objects); } } - throw std::runtime_error(fmt::format( - "max traversals reached for uri {} '{}'", id_uri, pjp{fragment_p})); + + throw std::runtime_error( + fmt::format("max traversals reached for uri '{}'", ref_uri.string())); } // helper to convert a boolean to a schema, and to traverse $refs From 06c46bd04f5172de31f8830f58ee3169e7f4d8c7 Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Sun, 13 Oct 2024 16:48:15 +0200 Subject: [PATCH 06/10] schema_registry/json: convert parse_json to coroutine no behavior change --- src/v/pandaproxy/schema_registry/json.cc | 80 +++++++++++------------- 1 file changed, 35 insertions(+), 45 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/json.cc b/src/v/pandaproxy/schema_registry/json.cc index be2ae7965403..46792e474937 100644 --- a/src/v/pandaproxy/schema_registry/json.cc +++ b/src/v/pandaproxy/schema_registry/json.cc @@ -136,6 +136,24 @@ json::Pointer to_json_pointer(std::string_view sv) { return candidate; } +// helper to convert a jsoncons::ojson to a rapidjson::Document +json::Document to_json_document(const jsoncons::ojson& oj) { + // serialize the input in a iobuf and parse it again + auto iobuf_os = iobuf_ostream{}; + oj.dump(iobuf_os.ostream()); + auto schema_stream = json::chunked_input_stream{std::move(iobuf_os).buf()}; + auto json = json::Document{}; + if (json.ParseStream(schema_stream).HasParseError()) { + throw as_exception(error_info{ + error_code::schema_invalid, + fmt::format( + "Malformed json: {} at offset {}", + rapidjson::GetParseError_En(json.GetParseError()), + json.GetErrorOffset())}); + } + return json; +} + // document_context contains the json document, the dialect, an index to resolve // $ref and the external schemas template @@ -492,7 +510,7 @@ result collect_bundled_schema_and_fix_refs( jsoncons::ojson& doc, json_schema_dialect dialect); -result parse_json(iobuf buf) { +ss::future parse_json(iobuf buf) { // parse string in json document, check it's a valid json iobuf_istream is{buf.share(0, buf.size_bytes())}; @@ -502,13 +520,13 @@ result parse_json(iobuf buf) { reader.read(ec); if (ec || !decoder.is_valid()) { // not a valid json document, return error - return error_info{ + throw as_exception(error_info{ error_code::schema_invalid, fmt::format( "Malformed json schema: {} at line {} column {}", ec ? ec.message() : "Invalid document", reader.line(), - reader.column())}; + reader.column())}); } auto schema = decoder.get_result(); @@ -529,11 +547,11 @@ result parse_json(iobuf buf) { it->value().is_string() == false || !maybe_dialect.has_value()) { // if present, "$schema" have to be a string, and it has to be // one the implemented dialects. If not, return an error - return error_info{ + throw as_exception(error_info{ error_code::schema_invalid, fmt::format( "Unsupported json schema dialect: '{}'", - jsoncons::print(it->value()))}; + jsoncons::print(it->value()))}); } } } @@ -541,48 +559,21 @@ result parse_json(iobuf buf) { // We use jsoncons for validating the schema against the metaschema as // currently rapidjson doesn't support validating schemas newer than // draft 5. - auto validation_res = maybe_dialect.has_value() - ? validate_json_schema( - maybe_dialect.value(), schema) - : try_validate_json_schema(schema); - if (validation_res.has_error()) { - return validation_res.as_failure(); - } - auto dialect = validation_res.assume_value(); + auto dialect + = maybe_dialect.has_value() + ? validate_json_schema(maybe_dialect.value(), schema).value() + : try_validate_json_schema(schema).value(); // this function will resolve al local ref against their respective baseuri. - auto bundled_schemas_map = collect_bundled_schema_and_fix_refs( - schema, dialect); - if (bundled_schemas_map.has_error()) { - return bundled_schemas_map.as_failure(); - } + auto bundled_schemas_map + = collect_bundled_schema_and_fix_refs(schema, dialect).value(); auto schemas_index = document_context::schemas_index_t{ - std::move_iterator(bundled_schemas_map.assume_value().begin()), - std::move_iterator(bundled_schemas_map.assume_value().end())}; - - // to use rapidjson we need to serialized schema again - // We take a copy of the jsoncons schema here because it has the fixed-up - // references that we want to use for compatibility checks - auto iobuf_os = iobuf_ostream{}; - schema.dump(iobuf_os.ostream()); - - auto schema_stream = json::chunked_input_stream{std::move(iobuf_os).buf()}; - auto rapidjson_schema = json::Document{}; - if (rapidjson_schema.ParseStream(schema_stream).HasParseError()) { - // not a valid json document, return error - // this is unlikely to happen, since we already parsed this stream with - // jsoncons, but the possibility of a bug exists - return error_info{ - error_code::schema_invalid, - fmt::format( - "Malformed json schema: {} at offset {}", - rapidjson::GetParseError_En(rapidjson_schema.GetParseError()), - rapidjson_schema.GetErrorOffset())}; - } + std::move_iterator(bundled_schemas_map.begin()), + std::move_iterator(bundled_schemas_map.end())}; - return document_context{ - .doc = std::move(rapidjson_schema), + co_return document_context{ + .doc = to_json_document(schema), .dialect = dialect, .schemas_index = std::move(schemas_index), }; @@ -2440,8 +2431,7 @@ collect_bundled_schema_and_fix_refs( ss::future make_json_schema_definition(schema_getter&, canonical_schema schema) { - auto doc - = parse_json(schema.def().shared_raw()()).value(); // throws on error + auto doc = co_await parse_json(schema.def().shared_raw()()); std::string_view name = schema.sub()(); auto refs = std::move(schema).def().refs(); co_return json_schema_definition{ @@ -2454,7 +2444,7 @@ ss::future make_canonical_json_schema( auto [sub, unparsed] = std::move(unparsed_schema).destructure(); auto [def, type, refs] = std::move(unparsed).destructure(); - auto ctx = parse_json(std::move(def)).value(); // throws on error + auto ctx = co_await parse_json(std::move(def)); if (norm) { sort(ctx.doc); std::sort(refs.begin(), refs.end()); From 91956cb7a5a094b5d1a956965b72ea573ccf08b6 Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Sun, 13 Oct 2024 16:59:09 +0200 Subject: [PATCH 07/10] schema_registry/json: move parse_json to jsoncons-only this is done to reduce conversions when parse_json will call itself to handle external references. the name change to parse_jsoncons, and a new parse_json calls the former and applies the translation to rapidjson of the result. collect_bundled_schema_and_fix_ref has to change too. no behavior change yet --- src/v/pandaproxy/schema_registry/json.cc | 77 ++++++++++++++++++++---- 1 file changed, 65 insertions(+), 12 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/json.cc b/src/v/pandaproxy/schema_registry/json.cc index 46792e474937..8c66c3934eb9 100644 --- a/src/v/pandaproxy/schema_registry/json.cc +++ b/src/v/pandaproxy/schema_registry/json.cc @@ -136,6 +136,10 @@ json::Pointer to_json_pointer(std::string_view sv) { return candidate; } +json::Pointer to_json_pointer(const jsoncons::jsonpointer::json_pointer& jp) { + return to_json_pointer(jp.to_string()); +} + // helper to convert a jsoncons::ojson to a rapidjson::Document json::Document to_json_document(const jsoncons::ojson& oj) { // serialize the input in a iobuf and parse it again @@ -506,11 +510,11 @@ try_validate_json_schema(const jsoncons::ojson& schema) { } // forward declaration -result +result collect_bundled_schema_and_fix_refs( jsoncons::ojson& doc, json_schema_dialect dialect); -ss::future parse_json(iobuf buf) { +ss::future parse_jsoncons(iobuf buf) { // parse string in json document, check it's a valid json iobuf_istream is{buf.share(0, buf.size_bytes())}; @@ -568,17 +572,66 @@ ss::future parse_json(iobuf buf) { auto bundled_schemas_map = collect_bundled_schema_and_fix_refs(schema, dialect).value(); - auto schemas_index = document_context::schemas_index_t{ + auto schemas_index = document_context_jsoncons::schemas_index_t{ std::move_iterator(bundled_schemas_map.begin()), std::move_iterator(bundled_schemas_map.end())}; - co_return document_context{ - .doc = to_json_document(schema), + co_return document_context_jsoncons{ + .doc = std::move(schema), .dialect = dialect, .schemas_index = std::move(schemas_index), }; } +// wrapper for parse_jsoncons that perform the conversion from jsoncons::ojson +// to rapidjson::Document +ss::future parse_json(iobuf buf) { + // we are parsing the root so we don't have a default_id + auto doc_ctx = co_await parse_jsoncons(std::move(buf)); + + // convert external_ptr and local_ptr to rapidjson::Pointer + constexpr static auto to_json_ctx_ptr = + [](const document_context_jsoncons::schemas_index_t::mapped_type& v) { + return ss::visit( + v, + [](const document_context_jsoncons::local_ptr& lp) + -> document_context::schemas_index_t::mapped_type { + return document_context::local_ptr{ + .ptr = to_json_pointer(lp.ptr), .dialect = lp.dialect}; + }, + [](const document_context_jsoncons::external_ptr& ep) + -> document_context::schemas_index_t::mapped_type { + return document_context::external_ptr{ + .external_schema_name = ep.external_schema_name, + .ptr = to_json_pointer(ep.ptr)}; + }); + }; + + // convert index to rapidjson + auto index_view = doc_ctx.schemas_index + | std::views::transform([](auto& p) { + return std::pair{ + p.first, to_json_ctx_ptr(p.second)}; + }) + | std::views::common; + // convert external_schemas to rapidjson + auto external_view = doc_ctx.external_schemas + | std::views::transform([](auto& p) { + return std::pair{ + p.first, + document_context::external_document_ctx{ + .doc = to_json_document(p.second.doc), + .dialect = p.second.dialect}}; + }) + | std::views::common; + co_return document_context{ + .doc = to_json_document(doc_ctx.doc), + .dialect = doc_ctx.dialect, + .schemas_index = {index_view.begin(), index_view.end()}, + .external_schemas = {external_view.begin(), external_view.end()}, + }; +} + /// is_superset section // a schema O is a superset of another schema N if every schema that is valid @@ -2249,7 +2302,7 @@ void sort(json::Value& val) { } void collect_bundled_schemas_and_fix_refs( - document_context::local_schemas_index_t& bundled_schemas, + document_context_jsoncons::local_schemas_index_t& bundled_schemas, jsoncons::uri base_uri, jsoncons::jsonpointer::json_pointer this_obj_ptr, jsoncons::ojson& this_obj, @@ -2343,8 +2396,8 @@ void collect_bundled_schemas_and_fix_refs( dialect = maybe_new_dialect.value(); bundled_schemas.insert_or_assign( to_json_id_uri(base_uri), - document_context::local_ptr{ - .ptr = json::Pointer{this_obj_ptr.to_string()}, + document_context_jsoncons::local_ptr{ + .ptr = this_obj_ptr, .dialect = dialect, }); } @@ -2379,7 +2432,7 @@ void collect_bundled_schemas_and_fix_refs( } } -result +result collect_bundled_schema_and_fix_refs( jsoncons::ojson& doc, json_schema_dialect dialect) { // entry point to collect all bundled schemas @@ -2402,10 +2455,10 @@ collect_bundled_schema_and_fix_refs( }(); // insert the root schema as a bundled schema - auto bundled_schemas = document_context::local_schemas_index_t{ + auto bundled_schemas = document_context_jsoncons::local_schemas_index_t{ {root_id, - document_context::local_ptr{ - .ptr = json::Pointer{}, + document_context_jsoncons::local_ptr{ + .ptr = {}, .dialect = dialect, }}, }; From 511f12ec611bc20de2c4d9d1e13d9664b9c1f5dc Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Sun, 13 Oct 2024 17:13:53 +0200 Subject: [PATCH 08/10] schema_registry/json: parse_json extract external schemas this commit adds support for external schemas: parse_json gets an (optional) id for the root schema, and a list of external schemas name to build into the result. each external schema il collected, parse_jsoncons invoked on it, and it's list of bundles schemas is added to the root schemas_index. each bundled schema has to have an unique id (due to the current resolve_reference implementation) and the external_schema.$id has to be unique (if it's not present, the exernal schema name will be used as $id) --- src/v/pandaproxy/schema_registry/json.cc | 145 ++++++++++++++++++++--- 1 file changed, 129 insertions(+), 16 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/json.cc b/src/v/pandaproxy/schema_registry/json.cc index 8c66c3934eb9..f9fdf6849ee7 100644 --- a/src/v/pandaproxy/schema_registry/json.cc +++ b/src/v/pandaproxy/schema_registry/json.cc @@ -20,8 +20,10 @@ #include "pandaproxy/schema_registry/compatibility.h" #include "pandaproxy/schema_registry/error.h" #include "pandaproxy/schema_registry/errors.h" +#include "pandaproxy/schema_registry/schema_getter.h" #include "pandaproxy/schema_registry/sharded_store.h" #include "pandaproxy/schema_registry/types.h" +#include "pandaproxy/schema_registry/util.h" #include #include @@ -512,9 +514,30 @@ try_validate_json_schema(const jsoncons::ojson& schema) { // forward declaration result collect_bundled_schema_and_fix_refs( - jsoncons::ojson& doc, json_schema_dialect dialect); - -ss::future parse_jsoncons(iobuf buf) { + jsoncons::ojson& doc, + json_schema_dialect dialect, + const std::optional& default_id); + +// parse a iobuf into a valid json schema and supporting maps to resolve $ref to +// json objects. +// 1. buf is parsed as a json, then it's validated to ensure it's a valid json +// schema. +// 2. all the local $ref and $id are made absolute, and all the bundled schemas +// are validated and collected. 2.a if the root schema does not have an $id, +// then default_id is used as the root $id. this is done for external schemas, +// to ensure that relative refs inside them are resolved correctly. +// 3. all the external refs are resolved and collected. +// 3.a for each external ref, the schema is recursively parsed (ref name is used +// as a default_id), it's bundled schema are transformed and stored in the +// index. then the external schema is stored in the external_schemas map. The +// result is a the json object for the input, the json objects for all the +// direct and indirect external schemas, and the maps to perform $ref +// resolution, as performed in the function resolve_reference(). +ss::future parse_jsoncons( + schema_getter& store, + iobuf buf, + std::optional default_id, + canonical_schema_definition::references refs) { // parse string in json document, check it's a valid json iobuf_istream is{buf.share(0, buf.size_bytes())}; @@ -568,26 +591,108 @@ ss::future parse_jsoncons(iobuf buf) { ? validate_json_schema(maybe_dialect.value(), schema).value() : try_validate_json_schema(schema).value(); - // this function will resolve al local ref against their respective baseuri. - auto bundled_schemas_map - = collect_bundled_schema_and_fix_refs(schema, dialect).value(); - + // this function will resolve al local ref against their respective + // baseuri. if we are currently in the process of parsing a external + // schema, default_id will have a value and will be used as a baseuri if + // $id is not set. this is done to ensure that relative refs inside the + // external schema do not clash with relative refs in the root schema, + // if it does not have a base_uri. + auto bundled_schemas_map = collect_bundled_schema_and_fix_refs( + schema, dialect, default_id) + .value(); auto schemas_index = document_context_jsoncons::schemas_index_t{ std::move_iterator(bundled_schemas_map.begin()), std::move_iterator(bundled_schemas_map.end())}; + // now the schema is a valid schema, the $ref are absolute, and the bundled + // schemas are collected in schemas_index. proceed to parse the external + // refs + + // collect the iobufs for each external schema. all recursive external refs + // are collected, there will be no duplicated names + auto external_refs = (co_await collect_schema(store, {}, refs)).get(); + + auto external_schemas = document_context_jsoncons::external_schemas_map_t{}; + for (auto& [ref_name, def] : external_refs) { + // recursive call to parse the external schema. it will have no external + // refs but it will have a default_id and all the relative refs will be + // resolved against it + auto ref_document = co_await parse_jsoncons( + store, std::move(def), ref_name, {}); + + auto ref_name_as_uri = to_json_id_uri({ref_name.c_str()}); + for (auto& [uri, subschema_ptr] : ref_document.schemas_index) { + auto* local_subschema_ptr + = std::get_if( + &subschema_ptr); + if (!local_subschema_ptr) { + // this is not expected: we didn't provide any + // external refs to parse_jsoncons, so the only possible value + // is a local_ptr type + throw as_exception(error_info{ + error_code::schema_invalid, + fmt::format( + "External schema '{}' contains a reference to another " + "external schema '{}'", + ref_name, + uri)}); + } + // convert and save local_ptr to an external_ptr + if (!schemas_index + .emplace( + uri, + document_context_jsoncons::external_ptr{ + .external_schema_name = ref_name_as_uri, + .ptr = std::move(local_subschema_ptr->ptr), + }) + .second) { + // collect_schema should have ensured that there are no + // duplicated external schemas, so this means that the root + // schema already contains an index entry for a bundled schema + // inside the current external schema + throw as_exception(error_info{ + error_code::schema_invalid, + fmt::format( + "Non-unique bundled schema id '{}' in external schema '{}'", + uri, + ref_name)}); + } + } + // all bundled schemas are saved in the index. add an entry for the + // external schema itself, to resolve `"$ref": "ref_name_as_uri"` note: + // this key-value already exist if the external schema has no $id + schemas_index.emplace( + ref_name_as_uri, + document_context_jsoncons::external_ptr{ + .external_schema_name = ref_name_as_uri, + .ptr = {}, + }); + // store the external schema + external_schemas.emplace( + ref_name_as_uri, + document_context_jsoncons::external_document_ctx{ + .doc = std::move(ref_document.doc), + .dialect = ref_document.dialect, + }); + } + co_return document_context_jsoncons{ .doc = std::move(schema), .dialect = dialect, .schemas_index = std::move(schemas_index), + .external_schemas = std::move(external_schemas), }; } // wrapper for parse_jsoncons that perform the conversion from jsoncons::ojson // to rapidjson::Document -ss::future parse_json(iobuf buf) { +ss::future parse_json( + schema_getter& store, + iobuf buf, + canonical_schema_definition::references refs = {}) { // we are parsing the root so we don't have a default_id - auto doc_ctx = co_await parse_jsoncons(std::move(buf)); + auto doc_ctx = co_await parse_jsoncons( + store, std::move(buf), std::nullopt, std::move(refs)); // convert external_ptr and local_ptr to rapidjson::Pointer constexpr static auto to_json_ctx_ptr = @@ -2432,22 +2537,29 @@ void collect_bundled_schemas_and_fix_refs( } } +// scan the `doc` with root `dialect`, collect all bundled schemas and ensure +// that all the $ref are absolute uris. `default_id` is used as the root id if +// the schema does not have an explicit $id. this is useful for external +// schemas, to ensure that their local refs do not conflict with the root +// schema. result collect_bundled_schema_and_fix_refs( - jsoncons::ojson& doc, json_schema_dialect dialect) { + jsoncons::ojson& doc, + json_schema_dialect dialect, + const std::optional& default_id) { // entry point to collect all bundled schemas // fetch the root id, if it exists auto root_id = [&] { if (!doc.is_object()) { // might be the case for "true" or "false" schemas - return json_id_uri{""}; + return json_id_uri{default_id.value_or("").c_str()}; } auto id_it = doc.find( dialect == json_schema_dialect::draft4 ? "id" : "$id"); if (id_it == doc.object_range().end()) { - // no explicit id, use the empty string - return json_id_uri{""}; + // no explicit id, use the default_id + return json_id_uri{default_id.value_or("").c_str()}; } // $id is set in the schema, use it as the root id @@ -2483,8 +2595,9 @@ collect_bundled_schema_and_fix_refs( } // namespace ss::future -make_json_schema_definition(schema_getter&, canonical_schema schema) { - auto doc = co_await parse_json(schema.def().shared_raw()()); +make_json_schema_definition(schema_getter& store, canonical_schema schema) { + auto doc = co_await parse_json( + store, schema.def().shared_raw()(), schema.def().refs()); std::string_view name = schema.sub()(); auto refs = std::move(schema).def().refs(); co_return json_schema_definition{ @@ -2497,7 +2610,7 @@ ss::future make_canonical_json_schema( auto [sub, unparsed] = std::move(unparsed_schema).destructure(); auto [def, type, refs] = std::move(unparsed).destructure(); - auto ctx = co_await parse_json(std::move(def)); + auto ctx = co_await parse_json(store, std::move(def)); if (norm) { sort(ctx.doc); std::sort(refs.begin(), refs.end()); From 57ff37c0c0c46a01ee86d86ab66bac62e33ddf0d Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Sun, 13 Oct 2024 17:24:47 +0200 Subject: [PATCH 09/10] schema_registry/json collect_bundled_schemas_and_fix_refs improvement - if we are visiting the root, skip validation as it's already validated externally - if we are visiting a bundled schema, ensure its $id is unique --- src/v/pandaproxy/schema_registry/json.cc | 43 ++++++++++++++++-------- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/json.cc b/src/v/pandaproxy/schema_registry/json.cc index f9fdf6849ee7..69a9622ba90e 100644 --- a/src/v/pandaproxy/schema_registry/json.cc +++ b/src/v/pandaproxy/schema_registry/json.cc @@ -2484,14 +2484,18 @@ void collect_bundled_schemas_and_fix_refs( to_uri(maybe_new_dialect.value())))); } - // run validation since we are not a guaranteed to be in proper schema - if (auto validation = validate_json_schema( - maybe_new_dialect.value(), this_obj); - validation.has_error()) { - // stop exploring this branch, the schema is invalid - throw as_exception(invalid_schema(fmt::format( - "bundled schema is invalid. {}", - validation.assume_error().message()))); + // only for non-root objects, since root is already validated: + if (!this_obj_ptr.empty()) { + // run validation since we are not a guaranteed to be in proper + // schema + if (auto validation = validate_json_schema( + maybe_new_dialect.value(), this_obj); + validation.has_error()) { + // stop exploring this branch, the schema is invalid + throw as_exception(invalid_schema(fmt::format( + "bundled schema is invalid. {}", + validation.assume_error().message()))); + } } // base uri keyword agrees with the dialect, it's a validated schema, we @@ -2499,12 +2503,23 @@ void collect_bundled_schemas_and_fix_refs( // (run resolve because it could be relative to the parent schema). base_uri = jsoncons::uri{id_it->value().as_string()}.resolve(base_uri); dialect = maybe_new_dialect.value(); - bundled_schemas.insert_or_assign( - to_json_id_uri(base_uri), - document_context_jsoncons::local_ptr{ - .ptr = this_obj_ptr, - .dialect = dialect, - }); + auto inserted = bundled_schemas + .emplace( + to_json_id_uri(base_uri), + document_context_jsoncons::local_ptr{ + .ptr = this_obj_ptr, + .dialect = dialect, + }) + .second; + if (!this_obj_ptr.empty() && !inserted) { + // last check, only for bundled schemas: ensure the $id is unique + // for this object the root id might already be in the map, but it's + // just an artifact + throw as_exception(invalid_schema(fmt::format( + "bundled schema with duplicate id '{}' at '{}'", + base_uri.string(), + this_obj_ptr.string()))); + } } if (auto ref_it = this_obj.find("$ref"); From 117abf9ddd3f72b6e950fb0d175121f84f8491c2 Mon Sep 17 00:00:00 2001 From: Andrea Barbadoro Date: Tue, 15 Oct 2024 16:17:32 +0200 Subject: [PATCH 10/10] schema_registry/json: remaning work this commit is a wip an unlikely to be finished. the idea is to add $ref validation at parse time, and at the same time handle cleanily various edge cases of external schemas to do so: 1. external refs are changed to https://schema-registry.com/extrnal_ref_name this is to make it work like a bundled ref, in resolve_ref note: this does not handle well refs/with/slashes in the middle. they look like a valid uri path. try to add a test with them and see what breaks or not 2. at collect_bundled_schemas_and_fix_ref, first do a tree traversal for id, and save refs in an output parameter unresolved_ref. after the first run, check every ref: if it's not a bundled/local ref, or an external ref, throw. bundled/local ref are made absolute, external refs are encoded as (1) a new parameter controls what to do if a ref is unknown. for the root schema, an unknown ref should raise an error. for an external schema, assume that it's external, because parse_jsocons is recursive and gets invoked for each external schema to extract their bundled schemas index. it is assumed that external schemas are already good (no unresolved refs) because they are already accepted, so we can limit the recursion to 2 levels (root, and then one for each direct/indirect external schema) 3. schema_context::resolve_reference does not need to handle external schemas anymore, since they appear like bundled schemas (the uri is either in the index or not). that code can be simplified --- src/v/pandaproxy/schema_registry/json.cc | 237 +++++++++++--- .../schema_registry/test/test_json_schema.cc | 292 +++++++++++++++++- 2 files changed, 485 insertions(+), 44 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/json.cc b/src/v/pandaproxy/schema_registry/json.cc index 69a9622ba90e..9e3728cd1fc9 100644 --- a/src/v/pandaproxy/schema_registry/json.cc +++ b/src/v/pandaproxy/schema_registry/json.cc @@ -120,6 +120,16 @@ json_id_uri to_json_id_uri(const jsoncons::uri& uri) { .string()}; } +// use ext_name as the path of a uri with a predefined host. this integrates +// neatly when using this as a base uri for relative refs. example: base_uri +// https://schema-registry.com/an.external.schema + $ref: +// another.external.schema -> +// https://schema-registry.com/another.external.schema +jsoncons::uri uri_for_external_name(std::string_view ext_name) { + return jsoncons::uri{ + "https", "", "schema-registry.com", "", ext_name, "", ""}; +} + // helper to parse a json pointer with rapidjson. throws if there is an error // parsing it json::Pointer to_json_pointer(std::string_view sv) { @@ -353,6 +363,22 @@ resolve_pointer(const json::Pointer& p, const json::Value& root) { return *value; } +// Extracts the last token of a uri path and returns it as a json_id_uri. +// Example: https://example.com/some/path -> path. +// This is useful to check if the input is an external reference, because +// collect_bundled_schemas_and_fix_refs eagerly resolves refs to the base_uri of +// the current schema, so external refs end up looking like refs to a bundled +// schema. +std::optional +extract_external_schema_name(const jsoncons::uri& maybe_external) { + auto path = jsoncons::jsonpointer::json_pointer{maybe_external.path()}; + if (path.empty()) { + return std::nullopt; + } + + return to_json_id_uri(uri_for_external_name(*(--path.end()))); +} + class schema_context { public: explicit schema_context(const json_schema_definition::impl& schema) @@ -364,21 +390,41 @@ class schema_context { // resolves a reference to a json object. throws if the reference can't be // resolved. supports local, bundled and external references std::pair - resolve_reference(jsoncons::uri uri) const { - // split uri into schema id and fragment - auto id_uri = to_json_id_uri(uri); - auto fragment_p = to_json_pointer(uri.fragment()); - - // try to find the referenced schema, - auto it = _schema.ctx.schemas_index.find(id_uri); - if (it == _schema.ctx.schemas_index.end()) { + resolve_reference(const jsoncons::uri& uri) const { + fmt::print( + "--uri: {} h:{} p:{} f:{}\n", + uri.string(), + uri.host(), + uri.path(), + uri.fragment()); + // try to find the referenced schema, either a bundled or an external + auto [id_uri, reference_mapping] = [&] { + // try to process uri as a bundled schema + auto bundled_uri = to_json_id_uri(uri); + auto it = _schema.ctx.schemas_index.find(bundled_uri); + if (it != _schema.ctx.schemas_index.end()) { + return std::pair{bundled_uri, it->second}; + } + // try to process uri as an external schema + auto maybe_external_uri = extract_external_schema_name(uri); + if (maybe_external_uri.has_value()) { + auto ext_it = _schema.ctx.schemas_index.find( + maybe_external_uri.value()); + if (ext_it != _schema.ctx.schemas_index.end()) { + return std::pair{ + maybe_external_uri.value(), ext_it->second}; + } + } + throw as_exception(error_info{ error_code::schema_invalid, - fmt::format("schema pointer not found for uri '{}'", id_uri)}); - } + fmt::format( + "schema pointer not found for uri '{}'", uri.string())}); + }(); + // step 1: get the schema object const auto& [doc, ptr, dialect] = ss::visit( - it->second, + reference_mapping, [&](const document_context::local_ptr& lp) { // bundled schema, return the root doc and the ptr return std::tie(_schema.ctx.doc, lp.ptr, lp.dialect); @@ -401,7 +447,9 @@ class schema_context { // step 2: get the referenced object inside the schema const auto& schema = resolve_pointer(ptr, doc); - return {resolve_pointer(fragment_p, schema).GetObject(), dialect}; + return { + resolve_pointer(to_json_pointer(uri.fragment()), schema).GetObject(), + dialect}; } int remaining_ref_units() const { return _ref_units; } @@ -512,11 +560,16 @@ try_validate_json_schema(const jsoncons::ojson& schema) { } // forward declaration +using assume_unknown_ref_is_external + = ss::bool_class; + result -collect_bundled_schema_and_fix_refs( +collect_bundled_schemas_and_fix_refs( jsoncons::ojson& doc, json_schema_dialect dialect, - const std::optional& default_id); + const canonical_schema_definition::references& refs, + const std::optional& default_id, + assume_unknown_ref_is_external assume_is_external); // parse a iobuf into a valid json schema and supporting maps to resolve $ref to // json objects. @@ -536,8 +589,9 @@ collect_bundled_schema_and_fix_refs( ss::future parse_jsoncons( schema_getter& store, iobuf buf, - std::optional default_id, - canonical_schema_definition::references refs) { + std::optional default_id, + canonical_schema_definition::references refs, + assume_unknown_ref_is_external unknown_ref_action) { // parse string in json document, check it's a valid json iobuf_istream is{buf.share(0, buf.size_bytes())}; @@ -597,9 +651,19 @@ ss::future parse_jsoncons( // $id is not set. this is done to ensure that relative refs inside the // external schema do not clash with relative refs in the root schema, // if it does not have a base_uri. - auto bundled_schemas_map = collect_bundled_schema_and_fix_refs( - schema, dialect, default_id) - .value(); + auto bundled_schemas_map + = document_context_jsoncons::local_schemas_index_t{}; + try { + bundled_schemas_map + = collect_bundled_schemas_and_fix_refs( + schema, dialect, refs, default_id, unknown_ref_action) + .value(); + } catch (...) { + fmt::print( + "Error in collect_bundled_schemas_and_fix_refs\n{}\n", + jsoncons::pretty_print(schema)); + throw; + } auto schemas_index = document_context_jsoncons::schemas_index_t{ std::move_iterator(bundled_schemas_map.begin()), std::move_iterator(bundled_schemas_map.end())}; @@ -611,16 +675,21 @@ ss::future parse_jsoncons( // collect the iobufs for each external schema. all recursive external refs // are collected, there will be no duplicated names auto external_refs = (co_await collect_schema(store, {}, refs)).get(); - auto external_schemas = document_context_jsoncons::external_schemas_map_t{}; for (auto& [ref_name, def] : external_refs) { + // build an uri with ref_name as the host + auto ref_name_as_uri = uri_for_external_name(ref_name); + auto ref_name_as_id_uri = json_id_uri{ref_name_as_uri.string()}; // recursive call to parse the external schema. it will have no external // refs but it will have a default_id and all the relative refs will be // resolved against it auto ref_document = co_await parse_jsoncons( - store, std::move(def), ref_name, {}); + store, + std::move(def), + ref_name_as_uri, + {}, + assume_unknown_ref_is_external::yes); - auto ref_name_as_uri = to_json_id_uri({ref_name.c_str()}); for (auto& [uri, subschema_ptr] : ref_document.schemas_index) { auto* local_subschema_ptr = std::get_if( @@ -642,7 +711,7 @@ ss::future parse_jsoncons( .emplace( uri, document_context_jsoncons::external_ptr{ - .external_schema_name = ref_name_as_uri, + .external_schema_name = ref_name_as_id_uri, .ptr = std::move(local_subschema_ptr->ptr), }) .second) { @@ -662,14 +731,14 @@ ss::future parse_jsoncons( // external schema itself, to resolve `"$ref": "ref_name_as_uri"` note: // this key-value already exist if the external schema has no $id schemas_index.emplace( - ref_name_as_uri, + ref_name_as_id_uri, document_context_jsoncons::external_ptr{ - .external_schema_name = ref_name_as_uri, + .external_schema_name = ref_name_as_id_uri, .ptr = {}, }); // store the external schema external_schemas.emplace( - ref_name_as_uri, + ref_name_as_id_uri, document_context_jsoncons::external_document_ctx{ .doc = std::move(ref_document.doc), .dialect = ref_document.dialect, @@ -692,7 +761,11 @@ ss::future parse_json( canonical_schema_definition::references refs = {}) { // we are parsing the root so we don't have a default_id auto doc_ctx = co_await parse_jsoncons( - store, std::move(buf), std::nullopt, std::move(refs)); + store, + std::move(buf), + std::nullopt, + std::move(refs), + assume_unknown_ref_is_external::no); // convert external_ptr and local_ptr to rapidjson::Pointer constexpr static auto to_json_ctx_ptr = @@ -2306,6 +2379,7 @@ json_compatibility_result is_superset( return res; } + fmt::print("---path: {}\n", p); auto older = get_schema(ctx.older, older_schema); auto newer = get_schema(ctx.newer, newer_schema); @@ -2406,8 +2480,74 @@ void sort(json::Value& val) { } } +struct unresolved_ref { + jsoncons::uri base_uri; + jsoncons::ojson& reference_value; +}; + +result statically_fix_ref( + unresolved_ref& uref, + const jsoncons::ojson& schema, + const document_context_jsoncons::local_schemas_index_t& bundled_schemas, + const canonical_schema_definition::references& external_refs, + assume_unknown_ref_is_external assume_external) { + auto& [base_uri, reference_value] = uref; + // try to solve it against a bundled schema + auto resolved_uri = jsoncons::uri{reference_value.as_string()}.resolve( + base_uri); + auto uri_id = to_json_id_uri(resolved_uri); + auto uri_fragment = resolved_uri.fragment(); + auto bundled_schema_it = bundled_schemas.find(uri_id); + if (bundled_schema_it != bundled_schemas.end()) { + // the reference matches a bundled schema. verify that it points to an + // existing object + auto ec = std::error_code{}; + auto target = jsoncons::jsonpointer::get( + schema, bundled_schema_it->second.ptr, ec); + if (ec) { + // points to nothing: error + return invalid_schema(fmt::format( + "internal reference '{}' points to nothing", + resolved_uri.string())); + } + // points to something: check that's a schema + if (!(target.is_bool() || target.is_object())) { + // does not point to a schema: error + return invalid_schema(fmt::format( + "internal reference '{}' points to the non-schema '{}'", + resolved_uri.string(), + jsoncons::print(target))); + } + // points to a valid schema: fix the ref to ensure that it's in absolute + // form + reference_value = resolved_uri.string(); + return outcome::success(); + } + + // could not solve it against a bundled schema, try to solve it against an + // external schema + if ( + assume_external + || std::ranges::find( + external_refs, + reference_value.as_string_view(), + &schema_reference::name) + != external_refs.end()) { + // the reference matches an external schema. transform the ref to an + // absolute uri for external schemas + reference_value + = uri_for_external_name(reference_value.as_string()).string(); + return outcome::success(); + } + + // could not solve it against an external schema either, error + return invalid_schema(fmt::format( + "reference '{}' points to nothing", reference_value.as_string())); +} + void collect_bundled_schemas_and_fix_refs( document_context_jsoncons::local_schemas_index_t& bundled_schemas, + std::vector& unresolved_refs, jsoncons::uri base_uri, jsoncons::jsonpointer::json_pointer this_obj_ptr, jsoncons::ojson& this_obj, @@ -2524,16 +2664,18 @@ void collect_bundled_schemas_and_fix_refs( if (auto ref_it = this_obj.find("$ref"); ref_it != this_obj.object_range().end()) { - // ensure refs are absolute uris - ref_it->value() = jsoncons::uri{ref_it->value().as_string()} - .resolve(base_uri) - .string(); + unresolved_refs.emplace_back(base_uri, ref_it->value()); } // lambda to recursively scan the object for more bundled schemas and $refs auto collect_and_fix = [&](const auto& key, auto& value) { collect_bundled_schemas_and_fix_refs( - bundled_schemas, base_uri, this_obj_ptr / key, value, dialect); + bundled_schemas, + unresolved_refs, + base_uri, + this_obj_ptr / key, + value, + dialect); }; // recursively scan the object for more bundled schemas and $refs @@ -2558,32 +2700,34 @@ void collect_bundled_schemas_and_fix_refs( // schemas, to ensure that their local refs do not conflict with the root // schema. result -collect_bundled_schema_and_fix_refs( +collect_bundled_schemas_and_fix_refs( jsoncons::ojson& doc, json_schema_dialect dialect, - const std::optional& default_id) { + const canonical_schema_definition::references& external_refs, + const std::optional& default_id, + assume_unknown_ref_is_external unkown_ref_action) { // entry point to collect all bundled schemas // fetch the root id, if it exists auto root_id = [&] { if (!doc.is_object()) { // might be the case for "true" or "false" schemas - return json_id_uri{default_id.value_or("").c_str()}; + return default_id.value_or(jsoncons::uri{}); } auto id_it = doc.find( dialect == json_schema_dialect::draft4 ? "id" : "$id"); if (id_it == doc.object_range().end()) { // no explicit id, use the default_id - return json_id_uri{default_id.value_or("").c_str()}; + return default_id.value_or(jsoncons::uri{}); } // $id is set in the schema, use it as the root id - return to_json_id_uri(jsoncons::uri{id_it->value().as_string()}); + return jsoncons::uri{id_it->value().as_string()}; }(); // insert the root schema as a bundled schema auto bundled_schemas = document_context_jsoncons::local_schemas_index_t{ - {root_id, + {json_id_uri{root_id.string()}, document_context_jsoncons::local_ptr{ .ptr = {}, .dialect = dialect, @@ -2591,17 +2735,30 @@ collect_bundled_schema_and_fix_refs( }; if (doc.is_object()) { + // extract the refs to fix them after we collect all the bundled schemas + auto unresolved_refs = std::vector{}; // note: current implementation is overly strict and reject any bundled // schema that is deemed invalid. this could be relaxed if the invalid // schema is not actually accessed by a $ref, but it requires to scan // the document in two passes. try { collect_bundled_schemas_and_fix_refs( - bundled_schemas, jsoncons::uri{}, {}, doc, dialect); + bundled_schemas, unresolved_refs, root_id, {}, doc, dialect); } catch (const exception& e) { return error_info( static_cast(e.code().value()), e.message()); } + + // try to resolve the refs against the bundled schemas or the external + // refs, return an error if it fails. the function modifies the ref in + // place, if it is to a bundled schema + for (auto& uref : unresolved_refs) { + if (auto r = statically_fix_ref( + uref, doc, bundled_schemas, external_refs, unkown_ref_action); + r.has_error()) { + return r.error(); + } + } } return std::move(bundled_schemas); @@ -2625,7 +2782,7 @@ ss::future make_canonical_json_schema( auto [sub, unparsed] = std::move(unparsed_schema).destructure(); auto [def, type, refs] = std::move(unparsed).destructure(); - auto ctx = co_await parse_json(store, std::move(def)); + auto ctx = co_await parse_json(store, std::move(def), refs); if (norm) { sort(ctx.doc); std::sort(refs.begin(), refs.end()); diff --git a/src/v/pandaproxy/schema_registry/test/test_json_schema.cc b/src/v/pandaproxy/schema_registry/test/test_json_schema.cc index af2d64925905..44588c66584b 100644 --- a/src/v/pandaproxy/schema_registry/test/test_json_schema.cc +++ b/src/v/pandaproxy/schema_registry/test/test_json_schema.cc @@ -24,6 +24,7 @@ #include #include #include +#include #include namespace pp = pandaproxy; @@ -199,6 +200,38 @@ static const auto error_test_cases = std::to_array({ pps::error_info{ pps::error_code::schema_invalid, R"(bundled schema is invalid. Invalid json schema: '{"$comment":"schema is invalid","$id":"https://example.com/mismatch_id","type":"potato"}'. Error: '/type: Must be valid against at least one schema, but found no matching schemas')"}}, + error_test_case{ + R"( +{ + "$comment": "bundled schema has an id that clashes with the root id", + "$id": "https://example.com/clashing_id", + "$defs": { + "clashing": { + "$id": "/clashing_id" + } + } +} +)", + pps::error_info{ + pps::error_code::schema_invalid, + R"(bundled schema with duplicate id 'https://example.com/clashing_id' at '/$defs/clashing')"}}, + error_test_case{ + R"( +{ + "$comment": "multiple bundled schema with the same id", + "$defs": { + "a": { + "$id": "https://example.com/clashing_id" + }, + "b": { + "$id": "https://example.com/clashing_id" + } + } +} +)", + pps::error_info{ + pps::error_code::schema_invalid, + R"(bundled schema with duplicate id 'https://example.com/clashing_id' at '/$defs/b')"}}, }); SEASTAR_THREAD_TEST_CASE(test_make_invalid_json_schema) { for (const auto& data : error_test_cases) { @@ -2356,7 +2389,8 @@ SEASTAR_THREAD_TEST_CASE(test_refs_fixing) { // local ref "giga": {"$ref": "#/properties/mega"}, // absolute ref to bundled schema - "mega": {"$ref": "https://example.com/schemas/customer#/properties/local"} + "mega": {"$ref": "https://example.com/schemas/customer#/properties/local"}, + "external": {"$ref": "an.external.schema.json"} }, "$defs": { "bundled": { @@ -2379,7 +2413,10 @@ SEASTAR_THREAD_TEST_CASE(test_refs_fixing) { "another_schema_frag": { "$ref": "/schemas/address#/name" }, // recursive ref - "recursive": { "$ref": "#/properties/recursive" } + "recursive": { "$ref": "#/properties/recursive" }, + + // external ref + "external": { "$ref": "an.external.schema.json#/$defs/name" } }, "$defs": { "bundled": { @@ -2427,7 +2464,7 @@ SEASTAR_THREAD_TEST_CASE(test_refs_fixing) { auto expected_schema = R"({ "properties": { "giga": {"$ref": "#/properties/mega"}, - "mega": {"$ref": "https://example.com/schemas/customer#/properties/local"} + "mega": {"$ref": "https://example.com/schemas/customer#/properties/local"}, }, "$defs": { "bundled": { @@ -2438,7 +2475,10 @@ SEASTAR_THREAD_TEST_CASE(test_refs_fixing) { "another_schema": { "$ref": "https://example.com/schemas/address" }, "another_schemas_absolute": { "$ref": "https://example.com/schemas/address#/name" }, "another_schema_frag": { "$ref": "https://example.com/schemas/address#/name" }, - "recursive": { "$ref": "https://example.com/schemas/customer#/properties/recursive" } + "recursive": { "$ref": "https://example.com/schemas/customer#/properties/recursive" }, + // external refs are eagerly resolved against the base uri, + // but they will be resolved against the external ref name at runtime + "external": { "$ref": "https://example.com/schemas/an.external.schema.json#/$defs/name" } }, "$defs": { "bundled": { @@ -2475,3 +2515,247 @@ SEASTAR_THREAD_TEST_CASE(test_refs_fixing) { jsoncons::pretty_print(jpatch))); } } + +SEASTAR_THREAD_TEST_CASE(test_external_ref_resolution_jsoncons) { + using namespace jsoncons::literals; + auto input_subject = pps::subject{"test"}; + auto external_subject = pps::subject{"external"}; + auto input_schema = pps::canonical_schema_definition{ + R"( +{ + "type": "object", + "properties": { + "local_ref": { "$ref": "#/$defs/a_property" }, + "bundled_ref": { "$ref": "https://example.com/schemas/negative_num" }, + "external_ref": { "$ref": "an.external.schema.json" } + }, + "$defs": { + "a_property": { + "type": "number", + "exclusiveMinimum": 0 + }, + "this_is_a_bundled_schema": { + "$id": "https://example.com/schemas/negative_num", + "type": "number", + "exclusiveMaximum": 0 + } + } +} +)", + pps::schema_type::json, + {{"an.external.schema.json", external_subject, pps::schema_version{2}}}}; + + auto resolved = jsoncons::uri{"#/$defs_a_property"}.resolve( + jsoncons::uri{"https", "", "an.external.schema.json", "", "", "", ""}); + + fmt::print( + "\nresolved '{}', host '{}', path '{}'\n", + resolved.string(), + resolved.host(), + resolved.path()); + + auto uri = jsoncons::uri{"/an.external.schema.json"}; + fmt::print( + "\nuri '{}', host '{}', path '{}', fragment '{}'\n", + uri.string(), + uri.host(), + uri.path(), + uri.fragment()); + + auto path = jsoncons::jsonpointer::json_pointer{uri.path()}; + fmt::print( + "\n adjusted '{}'\n", + jsoncons::uri{"https", "", *(--path.end()), "", "", "", ""}.string()); + + auto an_external_schema_json = pps::canonical_schema_definition{ + R"( +{ + // note: no explicit $id, will disambiguate with the external ref name + "type": "object", + "properties": { + "local_ref": { "$ref": "#/$defs/a_property" }, + "bundled_ref": { "$ref": "https://example.com/schemas/abool" }, + "external_ref": { "$ref": "another.external.schema.json" } + }, + "$defs": { + "a_property": { + // this has the same json pointer as the root schema but does not clash with it + "type": "string" + }, + "this_is_a_bundled_schema": { + "$id": "https://example.com/schemas/abool", + "type": "boolean" + } + } +} +)", + pps::schema_type::json, + {{"another.external.schema.json", + external_subject, + pps::schema_version{1}}}}; + + fmt::print("--line{}\n", __LINE__); + auto another_external_schema_json = pps::canonical_schema_definition{ + R"( +{ + // no explicit $id + "type": "array" +} +)", + pps::schema_type::json}; + + fmt::print("--line{}\n", __LINE__); + auto equivalent_schema = pps::canonical_schema_definition{ + R"( +{ + "type": "object", + "properties": { + "local_ref": { + "type": "number", + "exclusiveMinimum": 0 + }, + "bundled_ref": { + "type": "number", + "exclusiveMaximum": 0 + }, + "external_ref": { + "type": "object", + "properties": { + "local_ref": { + "type": "string" + }, + "bundled_ref": { + "type": "boolean" + }, + "external_ref": { + "type": "array" + } + } + } + } +} +)", + pps::schema_type::json}; + + fmt::print("--line{}\n", __LINE__); + auto f = store_fixture{}; + auto& store = f.store; + + auto first_ref = pps::canonical_schema{ + external_subject, another_external_schema_json.share()}; + store + .upsert( + pps::seq_marker{ + std::nullopt, + std::nullopt, + pps::schema_version{1}, + pps::seq_marker_key_type::schema}, + first_ref.share(), + pps::schema_id{1}, + pps::schema_version{1}, + pps::is_deleted::no) + .get(); + fmt::print("--line{}\n", __LINE__); + auto second_ref = pps::canonical_schema{ + external_subject, an_external_schema_json.share()}; + store + .upsert( + pps::seq_marker{ + std::nullopt, + std::nullopt, + pps::schema_version{2}, + pps::seq_marker_key_type::schema}, + second_ref.share(), + pps::schema_id{2}, + pps::schema_version{2}, + pps::is_deleted::no) + .get(); + fmt::print("--line{}\n", __LINE__); + + auto root_def = pps::canonical_schema{input_subject, input_schema.share()}; + auto json_schema_def + = pps::make_json_schema_definition(store, root_def.share()).get(); + + fmt::print("--line{}\n", __LINE__); + BOOST_REQUIRE( + pps::check_compatible(json_schema_def, json_schema_def, pps::verbose::no) + .is_compat); + auto equivalent_def = pps::canonical_schema{ + input_subject, equivalent_schema.share()}; + auto equivalent_schema_def + = pps::make_json_schema_definition(store, equivalent_def.share()).get(); + + fmt::print("--line{}\n", __LINE__); + auto res = pps::check_compatible( + json_schema_def, equivalent_schema_def, pps::verbose::yes); + BOOST_REQUIRE_MESSAGE( + res.is_compat, fmt::format("{}", fmt::join(res.messages, ", "))); + fmt::print("--line{}\n", __LINE__); +} + +BOOST_AUTO_TEST_CASE(test_jsoncons_uri_external_refs) { + constexpr static auto external_names = std::to_array( + {"simplename", + " space\tcase ", + "dot.case", + "snake_case", + "kebab-case", + "CamelCase", + "slash/case", + "/startingwithslash", + "/startingwithslash/andmore", + // R"(percent%case)", // check that this is not supported by + // confluent + "question?mark?case"}); + + auto candidates = [](std::string_view path) { + auto jp = jsoncons::jsonpointer::json_pointer{path}; + auto res = std::vector{}; + for (auto it = jp.begin(); it != jp.end(); ++it) { + res.push_back(fmt::format("{}", fmt::join(it, jp.end(), "/"))); + } + return res; + }; + + for (auto en : external_names) { + auto uri = jsoncons::uri{ + "https", "", "schema-registry.com", "", en, "", ""}; + BOOST_TEST_CONTEXT(fmt::format( + "en: '{}', uri '{}', host '{}', path '{}', fragment '{}'", + en, + uri.string(), + uri.host(), + uri.path(), + uri.fragment())) { + if (!en.starts_with('/')) { + // en is not an external reference name, really can't be + BOOST_CHECK_EQUAL(uri.host(), "schema-registry.com"); + BOOST_CHECK_EQUAL(uri.path().substr(1), en); + } + + for (auto f : external_names) { + if (f.starts_with('/')) { + // f is not an external reference name, really can't be + continue; + } + auto new_uri = jsoncons::uri{std::string{f}}.resolve(uri); + BOOST_TEST_CONTEXT(fmt::format( + "f '{}', new uri '{}', host '{}', path '{}', fragment '{}', " + "candidates '{}'", + f, + new_uri.string(), + new_uri.host(), + new_uri.path(), + new_uri.fragment(), + fmt::join(candidates(new_uri.path()), ", "))) { + BOOST_CHECK_EQUAL(new_uri.host(), "schema-registry.com"); + BOOST_CHECK_MESSAGE( + std::ranges::any_of( + candidates(new_uri.path()), + [&](std::string_view in) { return in == f; }), + f); + } + } + } + } +}