Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add record_multiplexer microbenchmarks #24155

Merged
merged 6 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/v/datalake/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ redpanda_test_cc_library(
implementation_deps = [
"//src/v/container:chunked_hash_map",
"//src/v/schema:registry",
"//src/v/serde/avro/tests:data_generator",
"//src/v/utils:vint",
"@avro",
"@protobuf",
],
include_prefix = "datalake/tests",
visibility = ["//visibility:public"],
Expand All @@ -112,6 +113,8 @@ redpanda_test_cc_library(
"//src/v/container:fragmented_vector",
"//src/v/model",
"//src/v/pandaproxy",
"//src/v/serde/avro/tests:data_generator",
"//src/v/serde/protobuf/tests:data_generator",
"//src/v/storage:record_batch_builder",
"//src/v/utils:named_type",
"@seastar",
Expand All @@ -125,9 +128,11 @@ redpanda_test_cc_library(
],
include_prefix = "datalake/tests",
deps = [
"//src/v/datalake:serde_parquet_writer",
"//src/v/datalake:writer",
"//src/v/iceberg:datatypes",
"//src/v/iceberg:values",
"//src/v/utils:null_output_stream",
"@seastar",
],
)
Expand Down
19 changes: 19 additions & 0 deletions src/v/datalake/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ v_cc_library(
DEPS
Avro::avro
Seastar::seastar
protobuf::libprotobuf
v::avro_test_utils
v::protobuf_test_utils
v::schema
v::schema_test_fixture
v::storage
Expand Down Expand Up @@ -202,3 +204,20 @@ rp_test(
LABELS datalake
ARGS "-- -c 1"
)

rp_test(
BENCHMARK_TEST
BINARY_NAME record_multiplexer
SOURCES record_multiplexer_bench.cc
LIBRARIES
Seastar::seastar_perf_testing
Boost::unit_test_framework
v::cloud_io_utils
v::application
v::datalake_test_utils
v::iceberg_test_utils
v::schema
v::s3_imposter
ARGS "-c 1 --duration=1 --runs=1 --memory=4G"
LABELS datalake
)
4 changes: 2 additions & 2 deletions src/v/datalake/tests/datalake_avro_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -509,8 +509,8 @@ prepare_avro_test(std::string_view schema) {
// Convert to iceberg schema
auto iceberg_struct_res = datalake::type_to_iceberg(valid_schema.root());
// Generate random generic datum
generator_state state{0};
avro::GenericDatum datum = generate_datum(valid_schema.root(), state, 10);
avro_generator gen({});
avro::GenericDatum datum = gen.generate_datum(valid_schema.root());

// Serialize using avro library
auto buffer = serialize_with_avro(datum, valid_schema);
Expand Down
116 changes: 112 additions & 4 deletions src/v/datalake/tests/record_generator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,24 @@
*/
#include "datalake/tests/record_generator.h"

#include "pandaproxy/schema_registry/protobuf.h"
#include "pandaproxy/schema_registry/types.h"
#include "schema/registry.h"
#include "serde/avro/tests/data_generator.h"
#include "storage/record_batch_builder.h"
#include "utils/vint.h"

#include <seastar/core/temporary_buffer.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/util/variant_utils.hh>

#include <avro/Encoder.hh>
#include <avro/Generic.hh>
#include <avro/Specific.hh>
#include <avro/Stream.hh>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/descriptor_database.h>
#include <google/protobuf/dynamic_message.h>
#include <google/protobuf/text_format.h>

namespace datalake::tests {

Expand All @@ -41,11 +49,111 @@ record_generator::register_avro_schema(
co_return std::nullopt;
}

ss::future<checked<std::nullopt_t, record_generator::error>>
record_generator::register_protobuf_schema(
std::string_view name, std::string_view schema) {
using namespace pandaproxy::schema_registry;
auto id = co_await ss::coroutine::as_future(
_sr->create_schema(unparsed_schema{
subject{"foo"},
unparsed_schema_definition{schema, schema_type::protobuf}}));
if (id.failed()) {
co_return error{fmt::format(
"Error creating schema {}: {}", name, id.get_exception())};
}
auto [_, added] = _id_by_name.emplace(name, id.get());
if (!added) {
co_return error{fmt::format("Failed to add schema {} to map", name)};
}
co_return std::nullopt;
}

iobuf encode_protobuf_message_index(const std::vector<int32_t>& message_index) {
Copy link
Contributor Author

@ballard26 ballard26 Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any existing serializer for this message index format anywhere in our code-base? There is a de-serializer; get_proto_offsets in src/v/datalake/schema_registry.h. Happy to move this serializer to a more general location if there is any use for it outside of the record generator.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closest thing is

iobuf encode_pb_offsets(const std::vector<int32_t>& offsets) {
auto cnt_bytes = vint::to_bytes(offsets.size());
iobuf buf;
buf.append(cnt_bytes.data(), cnt_bytes.size());
for (auto o : offsets) {
auto bytes = vint::to_bytes(o);
buf.append(bytes.data(), bytes.size());
}
return buf;
}

I don't have strong feelings about code placement, I think leaving it in the record generate seems reasonable

iobuf ret;
if (message_index.size() == 1 && message_index[0] == 0) {
ret.append("\0", 1);
return ret;
}

std::array<uint8_t, vint::max_length> bytes{0};
size_t res_size = vint::serialize(message_index.size(), &bytes[0]);
ret.append(&bytes[0], res_size);

for (const auto& o : message_index) {
size_t res_size = vint::serialize(o, &bytes[0]);
ret.append(&bytes[0], res_size);
}

return ret;
}

ss::future<checked<std::nullopt_t, record_generator::error>>
record_generator::add_random_protobuf_record(
storage::record_batch_builder& b,
std::string_view name,
const std::vector<int32_t>& message_index,
std::optional<iobuf> key,
testing::protobuf_generator_config config) {
using namespace pandaproxy::schema_registry;
auto it = _id_by_name.find(name);
if (it == _id_by_name.end()) {
co_return error{fmt::format("Schema {} is missing", name)};
}
auto schema_id = it->second;
auto schema_def = co_await _sr->get_valid_schema(schema_id);
if (!schema_def) {
co_return error{
fmt::format("Unable to find schema def for id: {}", schema_id)};
}
if (schema_def->type() != schema_type::protobuf) {
co_return error{fmt::format(
"Schema {} has wrong type: {}", name, schema_def->type())};
}

auto protobuf_def = schema_def
->visit(ss::make_visitor(
[](const avro_schema_definition&)
-> std::optional<protobuf_schema_definition> {
return std::nullopt;
},
[](const protobuf_schema_definition& pb_def)
-> std::optional<protobuf_schema_definition> {
return {pb_def};
},
[](const json_schema_definition&)
-> std::optional<protobuf_schema_definition> {
return std::nullopt;
}))
.value();
auto md_res = pandaproxy::schema_registry::descriptor(
protobuf_def, message_index);
if (md_res.has_error()) {
co_return error{fmt::format(
"Wasn't able to get descriptor for protobuf def with id: {}",
schema_id)};
}

iobuf val;
val.append("\0", 1);
int32_t encoded_id = ss::cpu_to_be(schema_id());
val.append((const uint8_t*)(&encoded_id), 4);

testing::protobuf_generator pb_gen(config);
auto msg = pb_gen.generate_protobuf_message(&md_res.value().get());

val.append(encode_protobuf_message_index(message_index));
val.append(iobuf::from(msg->SerializeAsString()));

b.add_raw_kv(std::move(key), std::move(val));
co_return std::nullopt;
}

ss::future<checked<std::nullopt_t, record_generator::error>>
record_generator::add_random_avro_record(
storage::record_batch_builder& b,
std::string_view name,
std::optional<iobuf> key) {
std::optional<iobuf> key,
testing::avro_generator_config config) {
using namespace pandaproxy::schema_registry;
auto it = _id_by_name.find(name);
if (it == _id_by_name.end()) {
Expand Down Expand Up @@ -83,8 +191,8 @@ record_generator::add_random_avro_record(
co_return error{
fmt::format("Schema {} didn't resolve Avro node", name)};
}
testing::generator_state gs;
auto datum = generate_datum(node_ptr, gs, 10);
testing::avro_generator gen(config);
auto datum = gen.generate_datum(node_ptr);
std::unique_ptr<avro::OutputStream> out = avro::memoryOutputStream();
avro::EncoderPtr e = avro::binaryEncoder();
e->init(*out);
Expand Down
18 changes: 16 additions & 2 deletions src/v/datalake/tests/record_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
#include "base/seastarx.h"
#include "bytes/iobuf.h"
#include "container/chunked_hash_map.h"
#include "model/record.h"
#include "model/timestamp.h"
#include "pandaproxy/schema_registry/types.h"
#include "serde/avro/tests/data_generator.h"
#include "serde/protobuf/tests/data_generator.h"
#include "storage/record_batch_builder.h"
#include "utils/named_type.h"

Expand All @@ -36,11 +37,24 @@ class record_generator {
ss::future<checked<std::nullopt_t, error>>
register_avro_schema(std::string_view name, std::string_view schema);

// Registers the given schema with the given name.
ss::future<checked<std::nullopt_t, error>>
register_protobuf_schema(std::string_view name, std::string_view schema);

// Adds a record of the given schema to the builder.
ss::future<checked<std::nullopt_t, error>> add_random_avro_record(
storage::record_batch_builder&,
std::string_view schema_name,
std::optional<iobuf> key);
std::optional<iobuf> key,
testing::avro_generator_config config = {});

// Adds a record of the given schema to the builder.
ss::future<checked<std::nullopt_t, error>> add_random_protobuf_record(
storage::record_batch_builder&,
std::string_view schema_name,
const std::vector<int32_t>& message_index,
std::optional<iobuf> key,
testing::protobuf_generator_config config = {});

private:
chunked_hash_map<std::string_view, pandaproxy::schema_registry::schema_id>
Expand Down
Loading
Loading