Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into master-log-debug
Browse files Browse the repository at this point in the history
# Conflicts:
#	fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
  • Loading branch information
CalvinKirs committed Feb 19, 2024
2 parents 1eac98f + 6e4a2f5 commit 65d564a
Show file tree
Hide file tree
Showing 83 changed files with 829 additions and 567 deletions.
8 changes: 8 additions & 0 deletions be/src/io/fs/broker_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ Status BrokerFileWriter::finalize() {
return Status::OK();
}

// Idempotent open: performs the broker open RPC at most once; subsequent
// calls are no-ops once _opened has been set.
Status BrokerFileWriter::open() {
    if (_opened) {
        return Status::OK();
    }
    // Delegate the actual TBrokerOpenWriterRequest round-trip to _open().
    RETURN_IF_ERROR(_open());
    _opened = true;
    return Status::OK();
}

Status BrokerFileWriter::_open() {
TBrokerOpenWriterRequest request;

Expand Down
1 change: 1 addition & 0 deletions be/src/io/fs/broker_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class BrokerFileWriter : public FileWriter {
int64_t start_offset, FileSystemSPtr fs);
virtual ~BrokerFileWriter();

Status open() override;
Status close() override;
Status appendv(const Slice* data, size_t data_cnt) override;
Status finalize() override;
Expand Down
15 changes: 8 additions & 7 deletions be/src/io/fs/err_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,16 @@ Status s3fs_error(const Aws::S3::S3Error& err, std::string_view msg) {
using namespace Aws::Http;
switch (err.GetResponseCode()) {
case HttpResponseCode::NOT_FOUND:
return Status::Error<NOT_FOUND, false>("{}: {} {}", msg, err.GetExceptionName(),
err.GetMessage());
return Status::Error<NOT_FOUND, false>("{}: {} {} type={}", msg, err.GetExceptionName(),
err.GetMessage(), err.GetErrorType());
case HttpResponseCode::FORBIDDEN:
return Status::Error<PERMISSION_DENIED, false>("{}: {} {}", msg, err.GetExceptionName(),
err.GetMessage());
return Status::Error<PERMISSION_DENIED, false>("{}: {} {} type={}", msg,
err.GetExceptionName(), err.GetMessage(),
err.GetErrorType());
default:
return Status::Error<doris::INTERNAL_ERROR, false>("{}: {} {} code={}", msg,
err.GetExceptionName(), err.GetMessage(),
err.GetResponseCode());
return Status::Error<doris::INTERNAL_ERROR, false>(
"{}: {} {} code={} type={}", msg, err.GetExceptionName(), err.GetMessage(),
err.GetResponseCode(), err.GetErrorType());
}
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/io/fs/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class FileWriter {

DISALLOW_COPY_AND_ASSIGN(FileWriter);

// Open the file for writing.
virtual Status open() { return Status::OK(); }

// Normal close. Wait for all data to persist before returning.
virtual Status close() = 0;

Expand Down
8 changes: 8 additions & 0 deletions be/src/io/fs/hdfs_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ Status HdfsFileWriter::finalize() {
return Status::OK();
}

// Lazily open the HDFS file for writing. The underlying _open() is invoked
// only on the first call; once it succeeds, _opened latches and later calls
// return OK immediately.
Status HdfsFileWriter::open() {
    if (_opened) {
        return Status::OK();
    }
    RETURN_IF_ERROR(_open());
    _opened = true;
    return Status::OK();
}

Status HdfsFileWriter::_open() {
_path = convert_path(_path, _hdfs_fs->_fs_name);
std::string hdfs_dir = _path.parent_path().string();
Expand Down
1 change: 1 addition & 0 deletions be/src/io/fs/hdfs_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class HdfsFileWriter : public FileWriter {
HdfsFileWriter(Path file, FileSystemSPtr fs);
~HdfsFileWriter();

Status open() override;
Status close() override;
Status appendv(const Slice* data, size_t data_cnt) override;
Status finalize() override;
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/fs/s3_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "io/fs/err_utils.h"
#include "io/fs/s3_common.h"
#include "util/bvar_helper.h"
#include "util/doris_metrics.h"
#include "util/s3_util.h"

Expand Down Expand Up @@ -96,8 +97,8 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
if (!client) {
return Status::InternalError("init s3 client error");
}
SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
auto outcome = client->GetObject(request);
s3_bvar::s3_get_total << 1;
if (!outcome.IsSuccess()) {
return s3fs_error(outcome.GetError(),
fmt::format("failed to read from {}", _path.native()));
Expand Down
32 changes: 21 additions & 11 deletions be/src/io/fs/s3_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
#include "io/fs/remote_file_system.h"
#include "io/fs/s3_file_reader.h"
#include "io/fs/s3_file_writer.h"
#include "util/bvar_helper.h"
#include "util/s3_uri.h"
#include "util/s3_util.h"

Expand Down Expand Up @@ -166,8 +167,8 @@ Status S3FileSystem::delete_file_impl(const Path& file) {
GET_KEY(key, file);
request.WithBucket(_s3_conf.bucket).WithKey(key);

SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
auto outcome = client->DeleteObject(request);
s3_bvar::s3_delete_total << 1;
if (outcome.IsSuccess() ||
outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
return Status::OK();
Expand All @@ -190,8 +191,11 @@ Status S3FileSystem::delete_directory_impl(const Path& dir) {
delete_request.SetBucket(_s3_conf.bucket);
bool is_trucated = false;
do {
auto outcome = client->ListObjectsV2(request);
s3_bvar::s3_list_total << 1;
Aws::S3::Model::ListObjectsV2Outcome outcome;
{
SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
outcome = client->ListObjectsV2(request);
}
if (!outcome.IsSuccess()) {
return s3fs_error(
outcome.GetError(),
Expand All @@ -207,8 +211,8 @@ Status S3FileSystem::delete_directory_impl(const Path& dir) {
Aws::S3::Model::Delete del;
del.WithObjects(std::move(objects)).SetQuiet(true);
delete_request.SetDelete(std::move(del));
SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
auto delete_outcome = client->DeleteObjects(delete_request);
s3_bvar::s3_delete_total << 1;
if (!delete_outcome.IsSuccess()) {
return s3fs_error(delete_outcome.GetError(),
fmt::format("failed to delete dir {}", full_path(prefix)));
Expand Down Expand Up @@ -249,8 +253,8 @@ Status S3FileSystem::batch_delete_impl(const std::vector<Path>& remote_files) {
}
del.WithObjects(std::move(objects)).SetQuiet(true);
delete_request.SetDelete(std::move(del));
SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
auto delete_outcome = client->DeleteObjects(delete_request);
s3_bvar::s3_delete_total << 1;
if (UNLIKELY(!delete_outcome.IsSuccess())) {
return s3fs_error(
delete_outcome.GetError(),
Expand All @@ -276,8 +280,8 @@ Status S3FileSystem::exists_impl(const Path& path, bool* res) const {
Aws::S3::Model::HeadObjectRequest request;
request.WithBucket(_s3_conf.bucket).WithKey(key);

SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency);
auto outcome = client->HeadObject(request);
s3_bvar::s3_head_total << 1;
if (outcome.IsSuccess()) {
*res = true;
} else if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
Expand All @@ -297,8 +301,8 @@ Status S3FileSystem::file_size_impl(const Path& file, int64_t* file_size) const
GET_KEY(key, file);
request.WithBucket(_s3_conf.bucket).WithKey(key);

SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency);
auto outcome = client->HeadObject(request);
s3_bvar::s3_head_total << 1;
if (!outcome.IsSuccess()) {
return s3fs_error(outcome.GetError(),
fmt::format("failed to get file size {}", full_path(key)));
Expand All @@ -324,8 +328,11 @@ Status S3FileSystem::list_impl(const Path& dir, bool only_file, std::vector<File
request.WithBucket(_s3_conf.bucket).WithPrefix(prefix);
bool is_trucated = false;
do {
auto outcome = client->ListObjectsV2(request);
s3_bvar::s3_list_total << 1;
Aws::S3::Model::ListObjectsV2Outcome outcome;
{
SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
outcome = client->ListObjectsV2(request);
}
if (!outcome.IsSuccess()) {
return s3fs_error(outcome.GetError(),
fmt::format("failed to list {}", full_path(prefix)));
Expand Down Expand Up @@ -425,8 +432,11 @@ Status S3FileSystem::download_impl(const Path& remote_file, const Path& local_fi
GET_KEY(key, remote_file);
Aws::S3::Model::GetObjectRequest request;
request.WithBucket(_s3_conf.bucket).WithKey(key);
Aws::S3::Model::GetObjectOutcome response = _client->GetObject(request);
s3_bvar::s3_get_total << 1;
Aws::S3::Model::GetObjectOutcome response;
{
SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
response = _client->GetObject(request);
}
if (response.IsSuccess()) {
Aws::OFStream local_file_s;
local_file_s.open(local_file, std::ios::out | std::ios::binary);
Expand Down
71 changes: 47 additions & 24 deletions be/src/io/fs/s3_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,16 @@
#include "io/fs/path.h"
#include "io/fs/s3_file_bufferpool.h"
#include "io/fs/s3_file_system.h"
#include "util/bvar_helper.h"
#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "util/s3_util.h"

namespace Aws {
namespace S3 {
namespace Model {
namespace Aws::S3::Model {
class DeleteObjectRequest;
} // namespace Model
} // namespace S3
} // namespace Aws
} // namespace Aws::S3::Model

using Aws::S3::Model::AbortMultipartUploadRequest;
using Aws::S3::Model::CompletedPart;
Expand All @@ -74,8 +71,7 @@ using Aws::S3::Model::CreateMultipartUploadRequest;
using Aws::S3::Model::UploadPartRequest;
using Aws::S3::Model::UploadPartOutcome;

namespace doris {
namespace io {
namespace doris::io {
using namespace Aws::S3::Model;
using Aws::S3::S3Client;

Expand Down Expand Up @@ -126,8 +122,8 @@ Status S3FileWriter::_create_multi_upload_request() {
_bucket, _path.native(), _upload_id);
});

SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
auto outcome = _client->CreateMultipartUpload(create_request);
s3_bvar::s3_multi_part_upload_total << 1;

if (outcome.IsSuccess()) {
_upload_id = outcome.GetResult().GetUploadId();
Expand Down Expand Up @@ -175,8 +171,8 @@ Status S3FileWriter::_abort() {
_wait_until_finish("Abort");
AbortMultipartUploadRequest request;
request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
auto outcome = _client->AbortMultipartUpload(request);
s3_bvar::s3_multi_part_upload_total << 1;
if (outcome.IsSuccess() ||
outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_UPLOAD ||
outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
Expand All @@ -202,17 +198,44 @@ Status S3FileWriter::close() {
return _st;
}
VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
// it might be one file less than 5MB, we do upload here
if (_pending_buf != nullptr) {
if (_upload_id.empty()) {

if (_upload_id.empty()) {
if (_pending_buf != nullptr) {
// it might be one file less than 5MB, we do upload here
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
} else {
// if there is no pending buffer, we need to create an empty file
auto builder = FileBufferBuilder();
builder.set_type(BufferType::UPLOAD)
.set_upload_callback([this](UploadFileBuffer& buf) { _put_object(buf); })
.set_sync_after_complete_task([this](Status s) {
bool ret = false;
if (!s.ok()) [[unlikely]] {
VLOG_NOTICE << "failed at key: " << _key
<< ", status: " << s.to_string();
std::unique_lock<std::mutex> _lck {_completed_lock};
_failed = true;
ret = true;
this->_st = std::move(s);
}
// After the signal, there is a scenario where the previous invocation of _wait_until_finish
// returns to the caller, and subsequently, the S3 file writer is destructed.
// This means that accessing _failed afterwards would result in a heap use after free vulnerability.
_countdown_event.signal();
return ret;
})
.set_is_cancelled([this]() { return _failed.load(); });
RETURN_IF_ERROR(builder.build(&_pending_buf));
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
}
_countdown_event.add_count();
RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
_pending_buf = nullptr;
}
_countdown_event.add_count();
RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
_pending_buf = nullptr;

DBUG_EXECUTE_IF("s3_file_writer::close", {
RETURN_IF_ERROR(_complete());
return Status::InternalError("failed to close s3 file writer");
Expand Down Expand Up @@ -324,10 +347,11 @@ void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) {
upload_request.SetContentLength(buf.get_size());
upload_request.SetContentType("application/octet-stream");

auto upload_part_callable = _client->UploadPartCallable(upload_request);
s3_bvar::s3_multi_part_upload_total << 1;

UploadPartOutcome upload_part_outcome = upload_part_callable.get();
UploadPartOutcome upload_part_outcome;
{
SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
upload_part_outcome = _client->UploadPart(upload_request);
}
DBUG_EXECUTE_IF("s3_file_writer::_upload_one_part", {
if (part_num > 1) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
Expand Down Expand Up @@ -413,8 +437,8 @@ Status S3FileWriter::_complete() {
LOG_WARNING(s.to_string());
return s;
});
SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
auto complete_outcome = _client->CompleteMultipartUpload(complete_request);
s3_bvar::s3_multi_part_upload_total << 1;

if (!complete_outcome.IsSuccess()) {
_st = s3fs_error(complete_outcome.GetError(),
Expand Down Expand Up @@ -464,8 +488,8 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) {
LOG(WARNING) << _st;
return;
});
SCOPED_BVAR_LATENCY(s3_bvar::s3_put_latency);
auto response = _client->PutObject(request);
s3_bvar::s3_put_total << 1;
if (!response.IsSuccess()) {
_st = s3fs_error(response.GetError(), fmt::format("failed to put object {}, upload_id={}",
_path.native(), _upload_id));
Expand All @@ -477,5 +501,4 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) {
s3_file_created_total << 1;
}

} // namespace io
} // namespace doris
} // namespace doris::io
20 changes: 10 additions & 10 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1526,21 +1526,21 @@ Status SegmentIterator::_vec_init_lazy_materialization() {
std::set<ColumnId> short_cir_pred_col_id_set; // using set for distinct cid
std::set<ColumnId> vec_pred_col_id_set;

for (auto predicate : _col_predicates) {
for (auto* predicate : _col_predicates) {
auto cid = predicate->column_id();
_is_pred_column[cid] = true;
pred_column_ids.insert(cid);

// check pred using short eval or vec eval
if (_can_evaluated_by_vectorized(predicate)) {
vec_pred_col_id_set.insert(predicate->column_id());
vec_pred_col_id_set.insert(cid);
_pre_eval_block_predicate.push_back(predicate);
} else {
short_cir_pred_col_id_set.insert(cid);
_short_cir_eval_predicate.push_back(predicate);
if (predicate->is_filter()) {
_filter_info_id.push_back(predicate);
}
}
if (predicate->is_filter()) {
_filter_info_id.push_back(predicate);
}
}

Expand Down Expand Up @@ -1959,17 +1959,17 @@ uint16_t SegmentIterator::_evaluate_vectorization_predicate(uint16_t* sel_rowid_
bool ret_flags[original_size];
DCHECK(!_pre_eval_block_predicate.empty());
bool is_first = true;
for (int i = 0; i < _pre_eval_block_predicate.size(); i++) {
if (_pre_eval_block_predicate[i]->always_true()) {
for (auto& pred : _pre_eval_block_predicate) {
if (pred->always_true()) {
continue;
}
auto column_id = _pre_eval_block_predicate[i]->column_id();
auto column_id = pred->column_id();
auto& column = _current_return_columns[column_id];
if (is_first) {
_pre_eval_block_predicate[i]->evaluate_vec(*column, original_size, ret_flags);
pred->evaluate_vec(*column, original_size, ret_flags);
is_first = false;
} else {
_pre_eval_block_predicate[i]->evaluate_and_vec(*column, original_size, ret_flags);
pred->evaluate_and_vec(*column, original_size, ret_flags);
}
}

Expand Down
Loading

0 comments on commit 65d564a

Please sign in to comment.