Merge branch 'master' into metadata-lock
yiguolei authored Sep 18, 2023
2 parents 9e4c0c1 + ae0b58f commit d3d235a
Showing 206 changed files with 5,766 additions and 2,350 deletions.
21 changes: 21 additions & 0 deletions sonar-project.properties
@@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

sonar.host.url=https://sonarcloud.io
sonar.projectKey=apache_incubator-doris
sonar.organization=apache
8 changes: 2 additions & 6 deletions .github/workflows/sonarcloud.yml
@@ -103,7 +103,7 @@ jobs:
MAVEN_OPTS: -Xmx4g
run: |
cd fe
mvn --batch-mode verify sonar:sonar -DskipTests -Dsonar.host.url=https://sonarcloud.io -Dsonar.organization=apache -Dsonar.projectKey=apache_incubator-doris -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
mvn --batch-mode verify sonar:sonar -DskipTests -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
sonar-cloud-cpp:
name: "SonarCloud on cpp"
runs-on: ubuntu-22.04
@@ -154,9 +154,5 @@ jobs:
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
run: sonar-scanner \
-Dsonar.cfamily.compile-commands=be/build_Release/compile_commands.json \
-Dsonar.organization=apache \
-Dsonar.projectKey=apache_incubator-doris \
-Dsonar.sources=be
run: sonar-scanner -Dsonar.cfamily.compile-commands=be/build_Release/compile_commands.json

2 changes: 1 addition & 1 deletion be/src/apache-orc
6 changes: 3 additions & 3 deletions be/src/io/cache/block/cached_remote_file_reader.cpp
@@ -211,7 +211,7 @@ Status CachedRemoteFileReader::_read_from_cache(size_t offset, Slice result, siz
break;
}
if (segment_state != FileBlock::State::DOWNLOADING) {
return Status::IOError(
return Status::InternalError(
"File Cache State is {}, the cache downloader encounters an error, "
"please "
"retry it",
@@ -220,7 +220,7 @@ Status CachedRemoteFileReader::_read_from_cache(size_t offset, Slice result, siz
} while (++wait_time < MAX_WAIT_TIME);
}
    if (UNLIKELY(wait_time == MAX_WAIT_TIME)) {
return Status::IOError("Waiting too long for the download to complete");
return Status::InternalError("Waiting too long for the download to complete");
}
size_t file_offset = current_offset - left;
{
@@ -244,7 +244,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
DCHECK(!closed());
DCHECK(io_ctx);
if (offset > size()) {
return Status::IOError(
return Status::InvalidArgument(
fmt::format("offset exceeds file size(offset: {), file size: {}, path: {})", offset,
size(), path().native()));
}
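The error-code changes above reclassify a stuck or failed cache download as InternalError (a server-side condition) and an out-of-range read offset as InvalidArgument (a caller mistake), reserving IOError for genuine I/O failures. A minimal standalone sketch of the bounded-wait loop those hunks guard, with hypothetical names (poll, max_wait_seconds) rather than the real Doris API:

#include <chrono>
#include <functional>
#include <thread>

enum class BlockState { DOWNLOADING, DOWNLOADED, ERROR };

// Poll until the block leaves DOWNLOADING or the budget is exhausted.
// Returns false on timeout, which the caller maps to InternalError.
bool wait_for_download(const std::function<BlockState()>& poll, int max_wait_seconds) {
    int wait_time = 0;
    do {
        if (poll() != BlockState::DOWNLOADING) {
            return true; // finished or failed; caller inspects the final state
        }
        std::this_thread::sleep_for(std::chrono::seconds(1));
    } while (++wait_time < max_wait_seconds);
    return false; // waited too long for the download to complete
}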
105 changes: 57 additions & 48 deletions be/src/olap/compaction.cpp
@@ -417,56 +417,65 @@ Status Compaction::do_compaction_impl(int64_t permits) {
auto src_segment_num = src_seg_to_id_map.size();
auto dest_segment_num = dest_segment_num_rows.size();

// src index files
// format: rowsetId_segmentId
std::vector<std::string> src_index_files(src_segment_num);
for (auto m : src_seg_to_id_map) {
std::pair<RowsetId, uint32_t> p = m.first;
src_index_files[m.second] = p.first.to_string() + "_" + std::to_string(p.second);
}

// dest index files
// format: rowsetId_segmentId
std::vector<std::string> dest_index_files(dest_segment_num);
for (int i = 0; i < dest_segment_num; ++i) {
auto prefix = dest_rowset_id.to_string() + "_" + std::to_string(i);
dest_index_files[i] = prefix;
}
if (dest_segment_num > 0) {
// src index files
// format: rowsetId_segmentId
std::vector<std::string> src_index_files(src_segment_num);
for (auto m : src_seg_to_id_map) {
std::pair<RowsetId, uint32_t> p = m.first;
src_index_files[m.second] = p.first.to_string() + "_" + std::to_string(p.second);
}

// create index_writer to compaction indexes
auto& fs = _output_rowset->rowset_meta()->fs();
auto& tablet_path = _tablet->tablet_path();

DCHECK(dest_index_files.size() > 0);
// we choose the first destination segment name as the temporary index writer path
// Used to distinguish between different index compaction
auto index_writer_path = tablet_path + "/" + dest_index_files[0];
LOG(INFO) << "start index compaction"
<< ". tablet=" << _tablet->full_name()
<< ", source index size=" << src_segment_num
<< ", destination index size=" << dest_segment_num << ".";
std::for_each(
ctx.skip_inverted_index.cbegin(), ctx.skip_inverted_index.cend(),
[&src_segment_num, &dest_segment_num, &index_writer_path, &src_index_files,
&dest_index_files, &fs, &tablet_path, &trans_vec, &dest_segment_num_rows,
this](int32_t column_uniq_id) {
auto st = compact_column(
_cur_tablet_schema->get_inverted_index(column_uniq_id)->index_id(),
src_segment_num, dest_segment_num, src_index_files, dest_index_files,
fs, index_writer_path, tablet_path, trans_vec, dest_segment_num_rows);
if (!st.ok()) {
LOG(ERROR) << "failed to do index compaction"
<< ". tablet=" << _tablet->full_name()
<< ". column uniq id=" << column_uniq_id << ". index_id= "
<< _cur_tablet_schema->get_inverted_index(column_uniq_id)
->index_id();
}
});
// dest index files
// format: rowsetId_segmentId
std::vector<std::string> dest_index_files(dest_segment_num);
for (int i = 0; i < dest_segment_num; ++i) {
auto prefix = dest_rowset_id.to_string() + "_" + std::to_string(i);
dest_index_files[i] = prefix;
}

LOG(INFO) << "succeed to do index compaction"
<< ". tablet=" << _tablet->full_name() << ", input row number=" << _input_row_num
<< ", output row number=" << _output_rowset->num_rows()
<< ". elapsed time=" << inverted_watch.get_elapse_second() << "s.";
// create index_writer to compaction indexes
auto& fs = _output_rowset->rowset_meta()->fs();
auto& tablet_path = _tablet->tablet_path();

// we choose the first destination segment name as the temporary index writer path
// Used to distinguish between different index compaction
auto index_writer_path = tablet_path + "/" + dest_index_files[0];
LOG(INFO) << "start index compaction"
<< ". tablet=" << _tablet->full_name()
<< ", source index size=" << src_segment_num
<< ", destination index size=" << dest_segment_num << ".";
std::for_each(
ctx.skip_inverted_index.cbegin(), ctx.skip_inverted_index.cend(),
[&src_segment_num, &dest_segment_num, &index_writer_path, &src_index_files,
&dest_index_files, &fs, &tablet_path, &trans_vec, &dest_segment_num_rows,
this](int32_t column_uniq_id) {
auto st = compact_column(
_cur_tablet_schema->get_inverted_index(column_uniq_id)->index_id(),
src_segment_num, dest_segment_num, src_index_files,
dest_index_files, fs, index_writer_path, tablet_path, trans_vec,
dest_segment_num_rows);
if (!st.ok()) {
LOG(ERROR) << "failed to do index compaction"
<< ". tablet=" << _tablet->full_name()
<< ". column uniq id=" << column_uniq_id << ". index_id= "
<< _cur_tablet_schema->get_inverted_index(column_uniq_id)
->index_id();
}
});

LOG(INFO) << "succeed to do index compaction"
<< ". tablet=" << _tablet->full_name()
<< ", input row number=" << _input_row_num
<< ", output row number=" << _output_rowset->num_rows()
<< ". elapsed time=" << inverted_watch.get_elapse_second() << "s.";
} else {
LOG(INFO) << "skip doing index compaction due to no output segments"
<< ". tablet=" << _tablet->full_name()
<< ", input row number=" << _input_row_num
<< ", output row number=" << _output_rowset->num_rows()
<< ". elapsed time=" << inverted_watch.get_elapse_second() << "s.";
}
}

// 4. modify rowsets in memory
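The restructuring above wraps the entire index-compaction step in if (dest_segment_num > 0): the old code read dest_index_files[0] unconditionally, which is undefined behavior when compaction produces no output segments. A condensed standalone sketch of the guard, in a hypothetical free-function form (the real code lives inside Compaction::do_compaction_impl):

#include <string>
#include <vector>

void maybe_compact_indexes(const std::string& tablet_path,
                           const std::string& dest_rowset_id,
                           size_t dest_segment_num) {
    if (dest_segment_num == 0) {
        return; // nothing to index; the old code would have read past the end
    }
    std::vector<std::string> dest_index_files(dest_segment_num);
    for (size_t i = 0; i < dest_segment_num; ++i) {
        // index file names use the "rowsetId_segmentId" format
        dest_index_files[i] = dest_rowset_id + "_" + std::to_string(i);
    }
    // safe now: the vector is guaranteed non-empty
    std::string index_writer_path = tablet_path + "/" + dest_index_files[0];
    // ... run per-column index compaction using index_writer_path ...
}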
2 changes: 1 addition & 1 deletion be/src/olap/storage_engine.cpp
@@ -449,7 +449,7 @@ std::vector<DataDir*> StorageEngine::get_stores_for_create_tablet(

int tablet_num;

bool operator<(const DirInfo& other) {
bool operator<(const DirInfo& other) const {
if (available_level != other.available_level) {
return available_level < other.available_level;
}
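Adding const to DirInfo::operator< makes the comparison callable on const-qualified objects, which algorithms that take const references (std::min/std::max, ordered-container keys) require; a non-const member operator fails to compile there. An illustrative reduction, not the full DirInfo, with an assumed tie-breaker:

#include <algorithm>

struct DirInfo {
    int available_level = 0;
    int tablet_num = 0;

    // const-qualified: callable on const DirInfo objects
    bool operator<(const DirInfo& other) const {
        if (available_level != other.available_level) {
            return available_level < other.available_level;
        }
        return tablet_num < other.tablet_num; // assumed tie-breaker
    }
};

int main() {
    const DirInfo a{1, 5}, b{0, 9};
    // std::min compares through const references, so this
    // only compiles with a const-qualified operator<
    const DirInfo& best = std::min(a, b);
    return best.available_level;
}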
5 changes: 3 additions & 2 deletions be/src/olap/tablet.cpp
@@ -1639,9 +1639,10 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info,
}

if (tablet_state() == TABLET_RUNNING) {
if (has_version_cross || is_io_error_too_times()) {
if (has_version_cross || is_io_error_too_times() || !data_dir()->is_used()) {
LOG(INFO) << "report " << full_name() << " as bad, version_cross=" << has_version_cross
<< ", ioe times=" << get_io_error_times();
<< ", ioe times=" << get_io_error_times() << ", data_dir used "
<< data_dir()->is_used();
tablet_info->__set_used(false);
}

8 changes: 7 additions & 1 deletion be/src/olap/tablet.h
@@ -533,7 +533,13 @@ class Tablet : public BaseTablet {
void gc_binlogs(int64_t version);
Status ingest_binlog_metas(RowsetBinlogMetasPB* metas_pb);

inline void increase_io_error_times() { ++_io_error_times; }
inline void report_error(const Status& st) {
if (st.is<ErrorCode::IO_ERROR>()) {
++_io_error_times;
} else if (st.is<ErrorCode::CORRUPTION>()) {
_io_error_times = config::max_tablet_io_errors + 1;
}
}

inline int64_t get_io_error_times() const { return _io_error_times; }

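report_error generalizes the old increase_io_error_times: transient IO_ERRORs still accumulate toward config::max_tablet_io_errors, while a CORRUPTION status jumps the counter past the limit so the tablet is reported bad on the next report cycle (see the build_tablet_report_info change above). A standalone sketch of the policy with hypothetical stand-in types:

#include <atomic>
#include <cstdint>

enum class ErrorCode { OK, IO_ERROR, CORRUPTION };

constexpr int64_t kMaxTabletIoErrors = 5; // stands in for config::max_tablet_io_errors

struct TabletHealth {
    std::atomic<int64_t> io_error_times{0};

    void report_error(ErrorCode code) {
        if (code == ErrorCode::IO_ERROR) {
            ++io_error_times; // transient: count toward the threshold
        } else if (code == ErrorCode::CORRUPTION) {
            // corruption is not retryable: jump straight past the threshold,
            // which trips the check whether the real comparison is > or >=
            io_error_times = kMaxTabletIoErrors + 1;
        }
    }

    bool is_io_error_too_times() const { return io_error_times > kMaxTabletIoErrors; }
};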
8 changes: 4 additions & 4 deletions be/src/olap/utils.cpp
@@ -427,14 +427,14 @@ Status read_write_test_file(const std::string& test_file_path) {
if (access(test_file_path.c_str(), F_OK) == 0) {
if (remove(test_file_path.c_str()) != 0) {
char errmsg[64];
return Status::Error<IO_ERROR>("fail to access test file. path={}, errno={}, err={}",
test_file_path, errno, strerror_r(errno, errmsg, 64));
return Status::IOError("fail to access test file. path={}, errno={}, err={}",
test_file_path, errno, strerror_r(errno, errmsg, 64));
}
} else {
if (errno != ENOENT) {
char errmsg[64];
return Status::Error<IO_ERROR>("fail to access test file. path={}, errno={}, err={}",
test_file_path, errno, strerror_r(errno, errmsg, 64));
return Status::IOError("fail to access test file. path={}, errno={}, err={}",
test_file_path, errno, strerror_r(errno, errmsg, 64));
}
}

3 changes: 3 additions & 0 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -66,6 +66,7 @@ Status AggSinkLocalState<DependencyType, Derived>::init(RuntimeState* state,
LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(Base::profile()->total_time_counter());
SCOPED_TIMER(Base::_open_timer);
_agg_data = Base::_shared_state->agg_data.get();
_agg_arena_pool = Base::_shared_state->agg_arena_pool.get();
auto& p = Base::_parent->template cast<typename Derived::Parent>();
@@ -160,6 +161,7 @@ Status AggSinkLocalState<DependencyType, Derived>::init(RuntimeState* state,
template <typename DependencyType, typename Derived>
Status AggSinkLocalState<DependencyType, Derived>::open(RuntimeState* state) {
SCOPED_TIMER(Base::profile()->total_time_counter());
SCOPED_TIMER(Base::_open_timer);
RETURN_IF_ERROR(Base::open(state));
_agg_data = Base::_shared_state->agg_data.get();
// move _create_agg_status to open not in during prepare,
@@ -927,6 +929,7 @@ Status AggSinkOperatorX<LocalStateType>::sink(doris::RuntimeState* state,
template <typename DependencyType, typename Derived>
Status AggSinkLocalState<DependencyType, Derived>::close(RuntimeState* state) {
SCOPED_TIMER(Base::profile()->total_time_counter());
SCOPED_TIMER(Base::_close_timer);
if (Base::_closed) {
return Status::OK();
}
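The added SCOPED_TIMER(Base::_open_timer) and SCOPED_TIMER(Base::_close_timer) lines attribute time spent in init/open/close to per-phase profile counters on top of the existing total_time_counter. SCOPED_TIMER is an RAII construct; a minimal sketch of the idea, illustrative only (Doris's RuntimeProfile counters differ in detail):

#include <chrono>
#include <cstdint>

class ScopedTimer {
public:
    explicit ScopedTimer(int64_t* counter_ns)
            : _counter_ns(counter_ns), _start(std::chrono::steady_clock::now()) {}
    ~ScopedTimer() {
        // on scope exit, add the elapsed wall time to the target counter
        auto elapsed = std::chrono::steady_clock::now() - _start;
        *_counter_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
    }

private:
    int64_t* _counter_ns;
    std::chrono::steady_clock::time_point _start;
};

// Two-level concatenation so __LINE__ expands before pasting,
// giving each timer in a scope a unique variable name.
#define ST_CONCAT_INNER(a, b) a##b
#define ST_CONCAT(a, b) ST_CONCAT_INNER(a, b)
#define SCOPED_TIMER_SKETCH(counter) ScopedTimer ST_CONCAT(_scoped_timer_, __LINE__)(&(counter))

// usage: accumulates the enclosing scope's wall time into open_time_ns
// int64_t open_time_ns = 0;
// void open() { SCOPED_TIMER_SKETCH(open_time_ns); /* ... */ }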