Skip to content

Commit

Permalink
Merge code to github (#187)
Browse files Browse the repository at this point in the history
* merge to 95787f8be1fd0ff215708fb0f49997b632876586
* Bugs fixed
  • Loading branch information
morningman authored Mar 23, 2018
1 parent b8fa22d commit 5de798f
Show file tree
Hide file tree
Showing 133 changed files with 1,712 additions and 5,587 deletions.
4 changes: 2 additions & 2 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,10 @@ SET(CXX_COMMON_FLAGS "-msse4.2 -Wall -Wno-sign-compare -Wno-deprecated -pthread
SET(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -D__STDC_FORMAT_MACROS")

# Add by zhaochun: use gnu++11 for make_unsigned<__int128>
SET(CMAKE_CXX_FLAGS "-g -O2 -ggdb -Wno-unused-local-typedefs -Wno-strict-aliasing -std=gnu++11 -DPERFORMANCE -D_FILE_OFFSET_BITS=64")
SET(CMAKE_CXX_FLAGS "-g -ggdb -O2 -Wno-unused-local-typedefs -Wno-strict-aliasing -std=gnu++11 -D_FILE_OFFSET_BITS=64")

# use address sanitizer, commented the malloc in ld flags
# SET(CMAKE_CXX_FLAGS "-g -ggdb -Wno-unused-local-typedefs -Wno-strict-aliasing -std=gnu++11 -DPERFORMANCE -fsanitize=address -fno-omit-frame-pointer -DADDRESS_SANITIZER")
# SET(CMAKE_CXX_FLAGS "-g -ggdb -Wno-unused-local-typedefs -Wno-strict-aliasing -std=gnu++11 -fsanitize=address -fno-omit-frame-pointer -DADDRESS_SANITIZER")
SET(CMAKE_CXX_FLAGS "${CXX_COMMON_FLAGS} ${CMAKE_CXX_FLAGS}")

MESSAGE(STATUS "Compiler Flags: ${CMAKE_CXX_FLAGS}")
Expand Down
6 changes: 3 additions & 3 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ AgentServer::AgentServer(ExecEnv* exec_env,

// clean dpp download dir
_command_executor = new CommandExecutor();
vector<OLAPRootPathStat>* root_paths_stat = new vector<OLAPRootPathStat>();
_command_executor->get_all_root_path_stat(root_paths_stat);
for (auto root_path_stat : *root_paths_stat) {
vector<OLAPRootPathStat> root_paths_stat;
_command_executor->get_all_root_path_stat(&root_paths_stat);
for (auto root_path_stat : root_paths_stat) {
try {
string dpp_download_path_str = root_path_stat.root_path + DPP_PREFIX;
boost::filesystem::path dpp_download_path(dpp_download_path_str);
Expand Down
24 changes: 18 additions & 6 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ void TaskWorkerPool::start() {
_callback_function = _report_disk_state_worker_thread_callback;
break;
case TaskWorkerType::REPORT_OLAP_TABLE:
_wait_duration = boost::posix_time::time_duration(0, 0, config::report_disk_state_interval_seconds, 0);
_wait_duration = boost::posix_time::time_duration(0, 0, config::report_olap_table_interval_seconds, 0);
_worker_count = REPORT_OLAP_TABLE_WORKER_COUNT;
_callback_function = _report_olap_table_worker_thread_callback;
break;
Expand Down Expand Up @@ -1525,10 +1525,17 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this)
#ifndef BE_TEST
while (true) {
#endif
if (worker_pool_this->_master_info.network_address.port == 0) {
// port == 0 means not received heartbeat yet
// sleep a short time and try again
OLAP_LOG_INFO("waiting to receive first heartbeat from frontend");
sleep(config::sleep_one_second);
continue;
}

vector<OLAPRootPathStat> root_paths_stat;

OLAPStatus get_all_root_path_stat =
worker_pool_this->_command_executor->get_all_root_path_stat(&root_paths_stat);
worker_pool_this->_command_executor->get_all_root_path_stat(&root_paths_stat);

map<string, TDisk> disks;
for (auto root_path_state : root_paths_stat) {
Expand Down Expand Up @@ -1579,9 +1586,14 @@ void* TaskWorkerPool::_report_olap_table_worker_thread_callback(void* arg_this)
#ifndef BE_TEST
while (true) {
#endif
MasterServerClient client(
worker_pool_this->_master_info,
&_master_service_client_cache);
if (worker_pool_this->_master_info.network_address.port == 0) {
// port == 0 means not received heartbeat yet
// sleep a short time and try again
OLAP_LOG_INFO("waiting to receive first heartbeat from frontend");
sleep(config::sleep_one_second);
continue;
}

request.tablets.clear();

request.__set_report_version(_s_report_version);
Expand Down
14 changes: 13 additions & 1 deletion be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/exec")
# where to put generated binaries
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/exec")

add_library(Exec STATIC
set(EXEC_FILES
aggregation_node.cpp
#pre_aggregation_node.cpp
aggregation_node_ir.cpp
Expand Down Expand Up @@ -84,6 +84,18 @@ add_library(Exec STATIC
broker_writer.cpp
)

if(EXISTS "${BASE_DIR}/src/exec/kudu_util.cpp")
set(EXEC_FILES ${EXEC_FILES}
#kudu_scan_node.cpp
#kudu_scanner.cpp
#kudu_util.cpp
)
endif()

add_library(Exec STATIC
${EXEC_FILES}
)

# TODO: why is this test disabled?
#ADD_BE_TEST(new_olap_scan_node_test)
#ADD_BE_TEST(pre_aggregation_node_test)
Expand Down
48 changes: 24 additions & 24 deletions be/src/exec/aggregation_node.cpp
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
// Modifications copyright (C) 2017, Baidu.com, Inc.
// Copyright 2017 The Apache Software Foundation

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Modifications copyright (C) 2017, Baidu.com, Inc.
// Copyright 2017 The Apache Software Foundation

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/aggregation_node.h"

#include <math.h>
Expand Down Expand Up @@ -299,10 +299,10 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool*
++_num_rows_returned;

if (reached_limit()) {
// avoid calling finalize() duplicately with last tuple
// when _output_iterator don't reach end.
// chenhao added
_output_iterator.next<false>();
// avoid calling finalize() duplicately with last tuple
// when _output_iterator don't reach end.
// chenhao added
_output_iterator.next<false>();
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/broker_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ Status BrokerScanNode::scanner_scan(
TupleRow* row = row_batch->get_row(row_idx);
// scan node is the first tuple of tuple row
row->set_tuple(0, tuple);
memset(tuple, 0, sizeof(_tuple_desc->num_null_bytes()));
memset(tuple, 0, _tuple_desc->num_null_bytes());

// Get from scanner
RETURN_IF_ERROR(scanner->get_next(tuple, tuple_pool, &scanner_eof));
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/csv_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ Status CsvScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos
}

_tuple = reinterpret_cast<Tuple*>(tuple_buffer);
memset(_tuple, 0, sizeof(_tuple_desc->num_null_bytes()));
memset(_tuple, 0, _tuple_desc->num_null_bytes());

// Indicates whether there are more rows to process.
bool csv_eos = false;
Expand Down
33 changes: 19 additions & 14 deletions be/src/exec/mysql_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

#include "mysql_scan_node.h"

#include <boost/foreach.hpp>
#include <sstream>

#include "exec/text_converter.hpp"
#include "gen_cpp/PlanNodes_types.h"
Expand All @@ -35,10 +35,7 @@ MysqlScanNode::MysqlScanNode(ObjectPool* pool, const TPlanNode& tnode,
_tuple_id(tnode.mysql_scan_node.tuple_id),
_columns(tnode.mysql_scan_node.columns),
_filters(tnode.mysql_scan_node.filters),
_tuple_desc(NULL),
_tuple_pool(NULL),
_mysql_scanner(NULL),
_text_converter(NULL) {
_tuple_desc(nullptr) {
}

MysqlScanNode::~MysqlScanNode() {
Expand Down Expand Up @@ -138,9 +135,9 @@ Status MysqlScanNode::write_text_slot(char* value, int value_length,
SlotDescriptor* slot, RuntimeState* state) {
if (!_text_converter->write_slot(slot, _tuple, value, value_length,
true, false, _tuple_pool.get())) {
LOG(WARNING) << "Error converting column "
<< "'" << value << "' TO " << slot->type();
return Status("convert mysql string failed.");
std::stringstream ss;
ss << "fail to convert mysql value '" << value << "' TO " << slot->type();
return Status(ss.str());
}

return Status::OK;
Expand Down Expand Up @@ -205,20 +202,27 @@ Status MysqlScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e
TupleRow* row = row_batch->get_row(row_idx);
// scan node is the first tuple of tuple row
row->set_tuple(0, _tuple);
memset(_tuple, 0, sizeof(_tuple_desc->num_null_bytes()));
memset(_tuple, 0, _tuple_desc->num_null_bytes());
int j = 0;

for (int i = 0; i < _slot_num; ++i) {
auto slot_desc = _tuple_desc->slots()[i];
// because the fe planner filter the non_materialize column
if (!_tuple_desc->slots()[i]->is_materialized()) {
if (!slot_desc->is_materialized()) {
continue;
}

if (NULL == data[j]) {
_tuple->set_null(_tuple_desc->slots()[i]->null_indicator_offset());
if (data[j] == nullptr) {
if (slot_desc->is_nullable()) {
_tuple->set_null(slot_desc->null_indicator_offset());
} else {
std::stringstream ss;
ss << "nonnull column contains NULL. table=" << _table_name
<< ", column=" << slot_desc->col_name();
return Status(ss.str());
}
} else {
RETURN_IF_ERROR(write_text_slot(data[j], length[j],
_tuple_desc->slots()[i], state));
RETURN_IF_ERROR(write_text_slot(data[j], length[j], slot_desc, state));
}

j++;
Expand Down Expand Up @@ -248,6 +252,7 @@ Status MysqlScanNode::close(RuntimeState* state) {
if (memory_used_counter() != NULL) {
COUNTER_UPDATE(memory_used_counter(), _tuple_pool->peak_allocated_bytes());
}
_tuple_pool.reset();

return ExecNode::close(state);
}
Expand Down
10 changes: 5 additions & 5 deletions be/src/exec/mysql_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#ifndef BDG_PALO_BE_SRC_QUERY_EXEC_MYSQL_SCAN_NODE_H
#define BDG_PALO_BE_SRC_QUERY_EXEC_MYSQL_SCAN_NODE_H

#include <boost/scoped_ptr.hpp>
#include <memory>

#include "runtime/descriptors.h"
#include "exec/mysql_scanner.h"
Expand Down Expand Up @@ -80,13 +80,13 @@ class MysqlScanNode : public ScanNode {
// Tuple index in tuple row.
int _slot_num;
// Pool for allocating tuple data, including all varying-length slots.
boost::scoped_ptr<MemPool> _tuple_pool;
std::unique_ptr<MemPool> _tuple_pool;
// Jni helper for scanning an HBase table.
boost::scoped_ptr<MysqlScanner> _mysql_scanner;
std::unique_ptr<MysqlScanner> _mysql_scanner;
// Helper class for converting text to other types;
boost::scoped_ptr<TextConverter> _text_converter;
std::unique_ptr<TextConverter> _text_converter;
// Current tuple.
Tuple* _tuple;
Tuple* _tuple = nullptr;
};

}
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/select_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Status SelectNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos)
*eos = true;
return Status::OK;
}
*eos = false;

// start (or continue) consuming row batches from child
while (true) {
Expand Down
6 changes: 3 additions & 3 deletions be/src/exprs/agg_fn_evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ Status AggFnEvaluator::open(RuntimeState* state, FunctionContext* agg_fn_ctx) {

void AggFnEvaluator::close(RuntimeState* state) {
Expr::close(_input_exprs_ctxs, state);
if (UNLIKELY(_total_mem_consumption > 0)) {
_mem_tracker->release(_total_mem_consumption);
}
}

// Utility to put val into an AnyVal struct
Expand Down Expand Up @@ -449,9 +452,6 @@ void AggFnEvaluator::update_mem_limlits(int len) {
}

AggFnEvaluator::~AggFnEvaluator() {
if (UNLIKELY(_total_mem_consumption > 0)) {
_mem_tracker->release(_total_mem_consumption);
}
}

inline void AggFnEvaluator::update_mem_trackers(bool is_filter, bool is_add_buckets, int len) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/exprs/scalar_fn_call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ Status ScalarFnCall::get_udf(RuntimeState* state, Function** udf) {
}
*udf = codegen->finalize_function(*udf);
if (*udf == NULL) {
return Status("udf verify falied");
return Status("udf verify failed");
// TODO(zc)
// TErrorCode::UDF_VERIFY_FAILED, _fn.scalar_fn.symbol, _fn.hdfs_location);
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/exprs/timestamp_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,6 @@ DateTimeVal TimestampFunctions::str_to_date(
if (str.is_null || format.is_null) {
return DateTimeVal::null();
}
LOG(INFO) << "format is " << std::string((const char*)format.ptr, format.len)
<< "str is " << std::string((const char*)str.ptr, str.len);
DateTimeValue ts_value;
if (!ts_value.from_date_format_str((const char*)format.ptr, format.len,
(const char*)str.ptr, str.len)) {
Expand Down
11 changes: 11 additions & 0 deletions be/src/http/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,14 @@ add_library(Webserver STATIC
)

target_link_libraries(Webserver pthread dl Util)
#ADD_BE_TEST(integer-array-test)
#ADD_BE_TEST(runtime-profile-test)
#ADD_BE_TEST(benchmark-test)
#ADD_BE_TEST(decompress-test)
#ADD_BE_TEST(metrics-test)
#ADD_BE_TEST(debug-util-test)
#ADD_BE_TEST(url-coding-test)
#ADD_BE_TEST(thrift-util-test)
#ADD_BE_TEST(bit-util-test)
#ADD_BE_TEST(rle-test)
##ADD_BE_TEST(perf-counters-test)
2 changes: 1 addition & 1 deletion be/src/http/action/checksum_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ void ChecksumAction::handle(HttpRequest *req, HttpChannel *channel) {

int64_t checksum = do_checksum(tablet_id, version, version_hash, schema_hash, req, channel);
if (checksum == -1L) {
std::string error_msg = std::string("checksum falied");
std::string error_msg = std::string("checksum failed");
HttpResponse response(HttpStatus::INTERNAL_SERVER_ERROR, &error_msg);
channel->send_response(response);
return;
Expand Down
11 changes: 4 additions & 7 deletions be/src/http/action/mini_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,14 +376,11 @@ static bool parse_auth(const std::string& auth, std::string* user,
}
user->assign(decoded_auth.c_str(), pos);
passwd->assign(decoded_auth.c_str() + pos + 1);
std::string::size_type cluster_pos = decoded_auth.find('@');
if (cluster_pos == std::string::npos) {
cluster_pos = pos;
} else {
cluster->assign(decoded_auth.c_str(), cluster_pos + 1, (pos - cluster_pos - 1));
const std::string::size_type cluster_pos = user->find('@');
if (cluster_pos != std::string::npos) {
cluster->assign(user->c_str(), cluster_pos + 1, pos - cluster_pos - 1);
user->assign(user->c_str(), cluster_pos);
}

user->assign(decoded_auth.c_str(), cluster_pos);
return true;
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/http/action/snapshot_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void SnapshotAction::handle(HttpRequest *req, HttpChannel *channel) {
std::string snapshot_path;
int64_t ret = make_snapshot(tablet_id, schema_hash, &snapshot_path);
if (ret != 0L) {
std::string error_msg = std::string("make snapshot falied");
std::string error_msg = std::string("make snapshot failed");
HttpResponse response(HttpStatus::INTERNAL_SERVER_ERROR, &error_msg);
channel->send_response(response);
return;
Expand Down
Loading

0 comments on commit 5de798f

Please sign in to comment.