Skip to content

Commit

Permalink
feat: add func last_value()
Browse files Browse the repository at this point in the history
  • Loading branch information
wy1433 committed Apr 24, 2024
1 parent 7565895 commit 2003ee3
Show file tree
Hide file tree
Showing 24 changed files with 277 additions and 11 deletions.
10 changes: 10 additions & 0 deletions include/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -1387,6 +1387,16 @@ inline bool float_equal(double value, double compare, double epsilon = 1e-9) {
return std::fabs(value - compare) < epsilon;
}

inline std::string redis_encode(std::string str) {
//\n<length>\n<data>
std::string tmp;
tmp += "\n";
tmp += std::to_string(str.length());
tmp += "\n";
tmp += str;
return tmp;
}

//set double buffer
template<typename T>
using DoubleBufferSet = butil::DoublyBufferedData<std::unordered_set<T>>;
Expand Down
2 changes: 2 additions & 0 deletions include/exec/dml_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class DMLNode : public ExecNode {
return _table_info;
}


protected:
int init_schema_info(RuntimeState* state);
void add_delete_conditon_fields();
Expand Down Expand Up @@ -128,6 +129,7 @@ class DMLNode : public ExecNode {
int64_t _ttl_timestamp_us = 0; //ttl写入时间,0表示无ttl
bool _ddl_need_write = false;
int64_t _ddl_index_id = -1;
ExprNode* _last_value_expr = nullptr; // not own it
};
}

Expand Down
9 changes: 9 additions & 0 deletions include/exec/filter_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ class FilterNode : public ExecNode {
e->reset(state);
}
}
ExprNode* get_last_value() {
for (auto conjunct : _conjuncts) {
auto last_value_expr = conjunct->get_last_value();
if (last_value_expr != nullptr) {
return last_value_expr;
}
}
return nullptr;
}

private:
bool need_copy(MemRow* row);
Expand Down
2 changes: 1 addition & 1 deletion include/exec/insert_manager_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class InsertManagerNode : public DmlManagerNode {
}

private:
void update_record(const SmartRecord& record, const SmartRecord& origin_record);
void update_record(const SmartRecord& record, const SmartRecord& origin_record, RuntimeState* state = nullptr);
int64_t _table_id = -1;
int32_t _tuple_id = -1;
int32_t _values_tuple_id = -1;
Expand Down
1 change: 1 addition & 0 deletions include/exec/lock_primary_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class LockPrimaryNode : public DMLNode {
std::vector<int64_t> _affected_index_ids;
std::vector<ExprNode*> _conjuncts;
bool _conjuncts_need_destory = false;
ExprNode* _last_value_expr = nullptr; // not own it
};

}
Expand Down
62 changes: 61 additions & 1 deletion include/expr/expr_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,72 @@ class ExprNode {
}
virtual ExprNode* get_last_insert_id() {
for (auto c : _children) {
if (c->get_last_insert_id() != nullptr) {
auto expr = c->get_last_insert_id();
if (expr != nullptr) {
return c;
}
}
return nullptr;
}
virtual ExprNode* get_last_value() {
for (auto c : _children) {
auto expr = c->get_last_value();
if (expr != nullptr) {
return expr;
}
}
return nullptr;
}
virtual bool is_valid_int_cast(MemRow* row) {
if (_node_type == pb::SLOT_REF ||
_node_type == pb::STRING_LITERAL) {
auto v = get_value(row);
if (v.type == pb::STRING) {
char* end = nullptr;
strtoll(v.str_val.c_str(), &end, 10);
if (strlen(end) > 0) {
return false;
}
if (errno == ERANGE) {
errno = 0;
return false;
}
}
return true;
}
for (auto c : _children) {
if (!c->is_valid_int_cast(row)) {
return false;
}
}
return true;
}

virtual bool is_valid_double_cast(MemRow* row) {
if (_node_type == pb::SLOT_REF ||
_node_type == pb::STRING_LITERAL) {
auto v = get_value(row);
if (v.type == pb::STRING) {
char* end = nullptr;
strtod(v.str_val.c_str(), &end);
if (strlen(end) > 0) {
return false;
}
if (errno == ERANGE) {
errno = 0;
return false;
}
}
return true;
}
for (auto c : _children) {
if (!c->is_valid_double_cast(row)) {
return false;
}
}
return true;
}

bool is_row_expr() {
return _node_type == pb::ROW_EXPR;
}
Expand Down
1 change: 1 addition & 0 deletions include/expr/internal_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ ExprValue tdigest_location(const std::vector<ExprValue>& input);
// other
ExprValue version(const std::vector<ExprValue>& input);
ExprValue last_insert_id(const std::vector<ExprValue>& input);
ExprValue last_value(const std::vector<ExprValue>& input);
ExprValue find_in_set(const std::vector<ExprValue>& input);
//transfer (latitude A, longitude A), (latitude B, longitude B) to distance of A to B (m)
ExprValue point_distance(const std::vector<ExprValue>& input);
Expand Down
48 changes: 48 additions & 0 deletions include/expr/scalar_fn_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,54 @@ class ScalarFnCall : public ExprNode {
}
return ExprNode::get_last_insert_id();
}
ExprNode* get_last_value() {
if (_fn.name() == "last_value") {
return this;
}
return ExprNode::get_last_value();
}
virtual bool is_valid_int_cast(MemRow* row) {
if (_fn.fn_op() == parser::FT_ADD || _fn.fn_op() == parser::FT_MINUS) {
if (_children.size() != 2 || !_children[0]->is_valid_int_cast(row) || !_children[1]->is_valid_int_cast(row)) {
return false;
}
auto left = children(0)->get_value(row).cast_to(pb::INT64)._u.int64_val;
auto right = children(1)->get_value(row).cast_to(pb::INT64)._u.int64_val;
auto s = get_value(row).cast_to(pb::INT64)._u.int64_val;
if (_fn.fn_op() == parser::FT_MINUS) {
right = -right;
}
if (left >= 0 && right >= 0 && s < 0) {
return false; // 上溢
}
if (left < 0 && right < 0 && s >= 0) {
return false; // 下溢
}
return true;
}
return ExprNode::is_valid_int_cast(row);
}
virtual bool is_valid_double_cast(MemRow* row) {
if (_fn.fn_op() == parser::FT_ADD || _fn.fn_op() == parser::FT_MINUS) {
if (_children.size() != 2 || !_children[0]->is_valid_double_cast(row) || !_children[1]->is_valid_double_cast(row)) {
return false;
}
auto left = children(0)->get_value(row).cast_to(pb::DOUBLE)._u.double_val;
auto right = children(1)->get_value(row).cast_to(pb::DOUBLE)._u.double_val;
auto s = get_value(row).cast_to(pb::DOUBLE)._u.double_val;
if (_fn.fn_op() == parser::FT_MINUS) {
right = -right;
}
if (left >= 0 && right >= 0 && s < 0) {
return false; // 上溢
}
if (left < 0 && right < 0 && s >= 0) {
return false; // 下溢
}
return true;
}
return ExprNode::is_valid_int_cast(row);
}
private:
ExprValue multi_eq_value(MemRow* row) {
for (size_t i = 0; i < children(0)->children_size(); i++) {
Expand Down
1 change: 1 addition & 0 deletions include/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ class RuntimeState {
ExplainType explain_type = EXPLAIN_NULL;
std::shared_ptr<CMsketch> cmsketch = nullptr;
int64_t last_insert_id = INT64_MIN; //存储baikalStore last_insert_id(expr)更新的字段
std::string last_value = "";
pb::StoreRes* response = nullptr;

bool need_statistics = true; // 用于动态超时的时间统计,如果请求的实例非NORMAL或着返回backup的结果,则不记入统计
Expand Down
3 changes: 2 additions & 1 deletion include/session/network_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ struct NetworkSocket {
int is_auth_result_send_partly; // Auth result is sended partly,
// need to go on sending.
int64_t last_insert_id;
// Socket status.
std::string last_value = "";
// string status.
std::string current_db; // Current use database.
int charset_num; // Client charset number.
std::string charset_name; // Client charset name.
Expand Down
1 change: 1 addition & 0 deletions proto/store.interface.proto
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ message ExtraReq {
message ExtraRes {
repeated RegionIndexs infos = 1;
optional RegionOfflineBinlogInfo offline_binlog_info = 2;
optional string last_value = 3;
};

message StoreReq {
Expand Down
25 changes: 25 additions & 0 deletions src/exec/delete_manager_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "network_socket.h"
#include "query_context.h"
#include "binlog_context.h"
#include "filter_node.h"

namespace baikaldb {
int DeleteManagerNode::open(RuntimeState* state) {
Expand Down Expand Up @@ -69,6 +70,27 @@ int DeleteManagerNode::open_global_delete(RuntimeState* state) {
DB_WARNING("select manager node fail");
return ret;
}
ExprNode* last_value_expr = nullptr; // not own it
FilterNode* where_filter_node = static_cast<FilterNode*>(
select_manager_or_limit_node->get_node(pb::WHERE_FILTER_NODE));
if (where_filter_node != nullptr) {
last_value_expr = where_filter_node->get_last_value();
}
FilterNode* table_filter_node = static_cast<FilterNode*>(
select_manager_or_limit_node->get_node(pb::TABLE_FILTER_NODE));
if (table_filter_node != nullptr) {
last_value_expr = table_filter_node->get_last_value();
}
if (last_value_expr != nullptr) {
ret = last_value_expr->open();
ON_SCOPE_EXIT(([last_value_expr]() {
last_value_expr->close();
}));
if (ret < 0) {
DB_WARNING("expr open fail, log_id:%lu ret:%d", state->log_id(), ret);
return ret;
}
}
_tuple_id = state->tuple_descs()[0].tuple_id();
SmartRecord record_template = SchemaFactory::get_instance()->new_record(*_table_info);
bool eos = false;
Expand All @@ -87,6 +109,9 @@ int DeleteManagerNode::open_global_delete(RuntimeState* state) {
for (auto slot : _primary_slots) {
record->set_value(record->get_field_by_tag(slot.field_id()), row->get_value(_tuple_id, slot.slot_id()));
}
if (last_value_expr != nullptr) {
state->client_conn()->last_value += redis_encode(last_value_expr->get_value(row).get_string());
}
scan_records.push_back(record);
}
} while (!eos);
Expand Down
36 changes: 35 additions & 1 deletion src/exec/dml_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "runtime_state.h"
#include "dml_node.h"
#include "filter_node.h"

namespace baikaldb {

Expand Down Expand Up @@ -207,6 +208,14 @@ int DMLNode::init_schema_info(RuntimeState* state) {
} else {
_affected_indexes = _all_indexes;
}
FilterNode* where_filter_node = static_cast<FilterNode*>(get_node(pb::WHERE_FILTER_NODE));
if (where_filter_node != nullptr) {
_last_value_expr = where_filter_node->get_last_value();
}
FilterNode* table_filter_node = static_cast<FilterNode*>(get_node(pb::TABLE_FILTER_NODE));
if (table_filter_node != nullptr) {
_last_value_expr = table_filter_node->get_last_value();
}
return 0;
}

Expand Down Expand Up @@ -378,7 +387,8 @@ int DMLNode::insert_row(RuntimeState* state, SmartRecord record, bool is_update)
}
return ret;
} else if (_is_replace) {
ret = delete_row(state, old_record, nullptr);
std::unique_ptr<MemRow> row = state->mem_row_desc()->fetch_mem_row();
ret = delete_row(state, old_record, row.get());
if (ret < 0) {
DB_WARNING_STATE(state, "remove fail, index:%ld ,ret:%d", info.id, ret);
return -1;
Expand Down Expand Up @@ -648,6 +658,10 @@ int DMLNode::delete_row(RuntimeState* state, SmartRecord record, MemRow* row) {
DB_WARNING_STATE(state, "lock table:%ld failed", _table_id);
return -1;
}
if (_last_value_expr != nullptr) {
state->last_value += redis_encode(_last_value_expr->get_value(row).get_string());
}

if (!satisfy_condition_again(state, row)) {
DB_WARNING_STATE(state, "condition changed when delete record:%s", record->debug_string().c_str());
// UndoGetForUpdate(pk_str)?
Expand Down Expand Up @@ -730,6 +744,26 @@ int DMLNode::update_row(RuntimeState* state, SmartRecord record, MemRow* row) {
if (last_insert_id_expr != nullptr) {
state->last_insert_id = last_insert_id_expr->get_value(row).get_numberic<int64_t>();
}
auto last_value_expr = expr->get_last_value();
if (last_value_expr != nullptr) {
// 类型检查
if (last_value_expr->children_size() == 2 && last_value_expr->children(1)->is_literal()) {
std::string frt = last_value_expr->children(1)->get_value(nullptr).get_string();
bool is_valid = true;
if (frt == "%d") {
is_valid = last_value_expr->children(0)->is_valid_int_cast(row);
} else if (frt == "%f") {
is_valid = last_value_expr->children(0)->is_valid_double_cast(row);
}
if (!is_valid) {
state->error_code = ER_ILLEGAL_VALUE_FOR_TYPE;
state->error_msg << "ERR value is not an integer or out of range";
DB_WARNING_STATE(state, "ERR value is not an integer or out of range");
return -1;
}
}
state->last_value += redis_encode(last_value_expr->get_value(row).get_string());
}
}
ret = insert_row(state, record, true);
if (ret < 0) {
Expand Down
5 changes: 5 additions & 0 deletions src/exec/fetcher_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,11 @@ ErrorType OnRPCDone::handle_response(const std::string& remote_side) {
if (_response.has_last_insert_id()) {
_client_conn->last_insert_id = _response.last_insert_id();
}
if (_response.has_extra_res()) {
if (_response.extra_res().has_last_value()) {
_client_conn->last_value = _response.extra_res().last_value();
}
}
if (_op_type != pb::OP_SELECT && _op_type != pb::OP_SELECT_FOR_UPDATE && _op_type != pb::OP_ROLLBACK && _op_type != pb::OP_COMMIT) {
_fetcher_store->affected_rows += _response.affected_rows();
_client_conn->txn_affected_rows += _response.affected_rows();
Expand Down
1 change: 1 addition & 0 deletions src/exec/filter_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,7 @@ int FilterNode::get_next(RuntimeState* state, RowBatch* batch, bool* eos) {
}
}
std::unique_ptr<MemRow>& row = _child_row_batch.get_row();

if (_is_explain || need_copy(row.get())) {
batch->move_row(std::move(row));
++_num_rows_returned;
Expand Down
Loading

0 comments on commit 2003ee3

Please sign in to comment.