Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "datalake: use serde parquet writer as a default" #24272

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion src/v/datalake/translation/partition_translator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "cluster/archival/types.h"
#include "cluster/partition.h"
#include "datalake/batching_parquet_writer.h"
#include "datalake/coordinator/frontend.h"
#include "datalake/coordinator/translated_offset_range.h"
#include "datalake/data_writer_interface.h"
Expand Down Expand Up @@ -80,6 +81,8 @@ ss::futurize_t<FuncRet> retry_with_backoff(

// 500ms jitter constant for translation.
// NOTE(review): its use site is outside the visible hunks — confirm how it is
// applied.
static constexpr std::chrono::milliseconds translation_jitter{500};
// 5 second timeout constant.
// NOTE(review): the operation it bounds is not visible in this view.
constexpr ::model::timeout_clock::duration wait_timeout = 5s;
// Row group limits handed to the batching parquet writer factory. Both are set
// to the largest representable size_t value.
// NOTE(review): presumably this means row groups are never split on row count
// or byte size — confirm against the writer implementation.
constexpr size_t max_rows_per_row_group = std::numeric_limits<size_t>::max();
constexpr size_t max_bytes_per_row_group = std::numeric_limits<size_t>::max();

partition_translator::~partition_translator() = default;
partition_translator::partition_translator(
Expand Down Expand Up @@ -174,6 +177,28 @@ partition_translator::max_offset_for_translation() const {
return kafka::prev_offset(model::offset_cast(lso.value()));
}

namespace {
/**
 * Picks the parquet writer factory used when translating a range.
 *
 * The selection is controlled by the '__REDPANDA_USE_SERDE_PARQUET_WRITER'
 * environment variable. If the variable is present in the environment
 * (its value is not inspected) the serde parquet writer factory is returned.
 * Otherwise the batching parquet writer factory is returned, constructed with
 * max_rows_per_row_group and max_bytes_per_row_group.
 *
 * TODO: remove this once serde parquet writer is ready to be used as
 * default
 */
ss::shared_ptr<parquet_ostream_factory> get_parquet_writer_factory() {
    const bool serde_writer_requested
      = std::getenv("__REDPANDA_USE_SERDE_PARQUET_WRITER") != nullptr;

    // Default path: the variable is absent, so use the batching writer.
    if (!serde_writer_requested) {
        return ss::make_shared<batching_parquet_writer_factory>(
          max_rows_per_row_group,   // max entries per single parquet row group
          max_bytes_per_row_group); // max bytes per single parquet row group
    }

    vlog(datalake_log.info, "Using serde parquet writer");
    return ss::make_shared<serde_parquet_writer_factory>();
}
} // namespace

ss::future<std::optional<coordinator::translated_offset_range>>
partition_translator::do_translation_for_range(
retry_chain_node& parent,
Expand All @@ -184,7 +209,7 @@ partition_translator::do_translation_for_range(
auto writer_factory = std::make_unique<local_parquet_file_writer_factory>(
local_path{_writer_scratch_space}, // storage temp files are written to
fmt::format("{}", begin_offset), // file prefix
ss::make_shared<serde_parquet_writer_factory>());
get_parquet_writer_factory());

auto task = translation_task{
**_cloud_io, *_schema_mgr, *_type_resolver, *_record_translator};
Expand Down