From 4fcf985141fe89db23be1d8e06b87c11cef3ce1f Mon Sep 17 00:00:00 2001
From: Yufei-YAO <2672855186@qq.com>
Date: Fri, 8 Nov 2024 21:29:23 +0800
Subject: [PATCH] [Feature](stream-load)(config) implement stream load via http stream (#36773)

1. Add a new BE config `enable_streamload_by_httpstream`
2. Implement stream load via http stream
---
 be/src/common/config.cpp           |   2 +
 be/src/common/config.h             |   2 +
 be/src/http/action/stream_load.cpp | 545 ++++++++++++++++++++++++++++-
 be/src/http/action/stream_load.h   |   9 +
 4 files changed, 557 insertions(+), 1 deletion(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 31170b731f4e750..79ca3bf39560e8f 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -545,6 +545,8 @@ DEFINE_mInt32(slave_replica_writer_rpc_timeout_sec, "60");
 // Whether to enable stream load record function, the default is false.
 // False: disable stream load record
 DEFINE_mBool(enable_stream_load_record, "false");
+// Whether to perform stream load via http stream, the default is false.
+DEFINE_mBool(enable_streamload_by_httpstream, "false");
 // batch size of stream load record reported to FE
 DEFINE_mInt32(stream_load_record_batch_size, "50");
 // expire time of stream load record in rocksdb.
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 585c4dc45ccef9c..80fc143208d2e18 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -592,6 +592,8 @@ DECLARE_mInt32(slave_replica_writer_rpc_timeout_sec);
 // Whether to enable stream load record function, the default is false.
 // False: disable stream load record
 DECLARE_mBool(enable_stream_load_record);
+// Whether to perform stream load via http stream, the default is false.
+DECLARE_mBool(enable_streamload_by_httpstream);
 // batch size of stream load record reported to FE
 DECLARE_mInt32(stream_load_record_batch_size);
 // expire time of stream load record in rocksdb.
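With `enable_streamload_by_httpstream` set to true in be.conf, the handlers added below
rewrite an ordinary stream load request into an `http_stream` table-valued-function query
and plan it through the FE. A sketch of the intended usage (host, port, database, table,
and file names are placeholders):

    curl --location-trusted -u root: -H "format: csv" -H "column_separator: ," \
        -T data.csv http://127.0.0.1:8040/api/testdb/testtbl/_stream_load

For a request like this, `_construct_sql_from_req` (added in stream_load.cpp below)
produces roughly:

    INSERT INTO testdb.testtbl SELECT * FROM http_stream("format" = "csv","column_separator" = ",")
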
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 60c9f659fbc4ebd..ebe2435bfc7f179 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -104,6 +104,18 @@ void StreamLoadAction::handle(HttpRequest* req) {
         return;
     }
+    if (config::enable_streamload_by_httpstream) {
+        if (ctx->sql_str.empty()) {
+            auto st = _construct_sql_from_req(req, ctx);
+            if (!st.ok()) {
+                ctx->status = st;
+                return;
+            }
+        }
+        _httpstream_handle(req, ctx);
+        return;
+    }
+
     // status already set to fail
     if (ctx->status.ok()) {
         ctx->status = _handle(ctx);
@@ -190,8 +202,19 @@ Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
 int StreamLoadAction::on_header(HttpRequest* req) {
     streaming_load_current_processing->increment(1);
-    std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
+    std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
+
+    if (config::enable_streamload_by_httpstream) {
+        if (ctx->sql_str.empty()) {
+            auto st = _construct_sql_from_req(req, ctx);
+            if (!st.ok()) {
+                ctx->status = st;
+                return -1;
+            }
+        }
+        return _httpstream_on_header(req, ctx);
+    }
+
     req->set_handler_ctx(ctx);
 
     ctx->load_type = TLoadType::MANUL_LOAD;
@@ -354,6 +377,18 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
     if (ctx == nullptr || !ctx->status.ok()) {
         return;
     }
+    if (config::enable_streamload_by_httpstream) {
+        // set req sql
+        if (ctx->sql_str.empty()) {
+            auto st = _construct_sql_from_req(req, ctx);
+            if (!st.ok()) {
+                ctx->status = st;
+                return;
+            }
+        }
+        _httpstream_on_chunk_data(req, ctx);
+        return;
+    }
 
     struct evhttp_request* ev_req = req->get_evhttp_request();
     auto evbuf = evhttp_request_get_input_buffer(ev_req);
@@ -873,4 +908,512 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
     return Status::OK();
 }
 
+void StreamLoadAction::_httpstream_handle(HttpRequest* req,
+                                          std::shared_ptr<StreamLoadContext> ctx) {
+    // status already set to fail
+    if (ctx->status.ok()) {
+        ctx->status = _http_stream_handle(req, ctx);
+        if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+            LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
+                         << ", errmsg=" << ctx->status;
+        }
+    }
+    ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
+
+    if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+        if (ctx->body_sink != nullptr) {
+            ctx->body_sink->cancel(ctx->status.to_string());
+        }
+    }
+
+    if (!ctx->status.ok()) {
+        auto str = std::string(ctx->to_json());
+        // add new line at end
+        str = str + '\n';
+        HttpChannel::send_reply(req, str);
+        return;
+    }
+    auto str = std::string(ctx->to_json());
+    // add new line at end
+    str = str + '\n';
+    HttpChannel::send_reply(req, str);
+    if (config::enable_stream_load_record) {
+        str = ctx->prepare_stream_load_record(str);
+        _save_stream_load_record(ctx, str);
+    }
+    // update statistics
+    streaming_load_requests_total->increment(1);
+    streaming_load_duration_ms->increment(ctx->load_cost_millis);
+}
+
+Status StreamLoadAction::_http_stream_handle(HttpRequest* http_req,
+                                             std::shared_ptr<StreamLoadContext> ctx) {
+    if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
+        LOG(WARNING) << "received body size doesn't match body bytes, body_bytes="
+                     << ctx->body_bytes << ", receive_bytes=" << ctx->receive_bytes
+                     << ", id=" << ctx->id;
+        return Status::Error<ErrorCode::NETWORK_ERROR>(
+                "received body size doesn't match body bytes");
+    }
+    RETURN_IF_ERROR(ctx->body_sink->finish());
+
+    // wait stream load finish
+    RETURN_IF_ERROR(ctx->future.get());
+
+    if (ctx->group_commit) {
+        LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
+        return Status::OK();
+    }
+
+    if (ctx->two_phase_commit) {
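+        // Two-phase commit: only pre-commit the transaction here; the client is
+        // expected to commit or abort it in a second phase (e.g. via the
+        // _stream_load_2pc endpoint).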
+        int64_t pre_commit_start_time = MonotonicNanos();
+        RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
+        ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
+    } else {
+        // if the put succeeded, we need to commit this load
+        int64_t commit_and_publish_start_time = MonotonicNanos();
+        RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
+        ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
+    }
+    return Status::OK();
+}
+
+int StreamLoadAction::_httpstream_on_header(HttpRequest* req,
+                                            std::shared_ptr<StreamLoadContext> ctx) {
+    req->set_handler_ctx(ctx);
+
+    ctx->load_type = TLoadType::MANUL_LOAD;
+    ctx->load_src_type = TLoadSourceType::RAW;
+    ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
+    Status st = _handle_group_commit(req, ctx);
+
+    LOG(INFO) << "new incoming streaming load request." << ctx->brief()
+              << " sql : " << ctx->sql_str << ", group_commit=" << ctx->group_commit;
+    if (st.ok()) {
+        st = _http_stream_on_header(req, ctx);
+    }
+    if (!st.ok()) {
+        ctx->status = std::move(st);
+        if (ctx->body_sink != nullptr) {
+            ctx->body_sink->cancel(ctx->status.to_string());
+        }
+        auto str = ctx->to_json();
+        // add new line at end
+        str = str + '\n';
+        HttpChannel::send_reply(req, str);
+        if (config::enable_stream_load_record) {
+            str = ctx->prepare_stream_load_record(str);
+            _save_stream_load_record(ctx, str);
+        }
+        return -1;
+    }
+    return 0;
+}
+
+Status StreamLoadAction::_http_stream_on_header(HttpRequest* http_req,
+                                                std::shared_ptr<StreamLoadContext> ctx) {
+    // auth information
+    if (!parse_basic_auth(*http_req, &ctx->auth)) {
+        LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
+        return Status::NotAuthorized("no valid Basic authorization");
+    }
+
+    // TODO(zs): need to request FE to obtain information such as format
+    // check content length
+    ctx->body_bytes = 0;
+    size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
+    if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
+        try {
+            ctx->body_bytes = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
+        } catch (const std::exception& e) {
+            return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}",
+                                           http_req->header(HttpHeaders::CONTENT_LENGTH),
+                                           e.what());
+        }
+        // csv max body size
+        if (ctx->body_bytes > csv_max_body_bytes) {
+            LOG(WARNING) << "body exceed max size." << ctx->brief();
+            return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
+                    "body size {} exceed BE's conf `streaming_load_max_mb` {}. increase it if you "
+                    "are sure this load is reasonable",
+                    ctx->body_bytes, csv_max_body_bytes);
+        }
+    }
+
+    auto pipe = std::make_shared<io::StreamLoadPipe>(
+            io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
+            ctx->body_bytes /* total_length */);
+    ctx->body_sink = pipe;
+    ctx->pipe = pipe;
+
+    RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx));
+
+    // Here, transactions are set from fe's NativeInsertStmt.
+    // TODO(zs): how to support two_phase_commit
+
+    return Status::OK();
+}
+
+void StreamLoadAction::_httpstream_on_chunk_data(HttpRequest* req,
+                                                 std::shared_ptr<StreamLoadContext> ctx) {
+    if (!req->header(HTTP_WAL_ID_KY).empty()) {
+        ctx->wal_id = std::stoll(req->header(HTTP_WAL_ID_KY));
+    }
+    struct evhttp_request* ev_req = req->get_evhttp_request();
+    auto evbuf = evhttp_request_get_input_buffer(ev_req);
+
+    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
+
+    int64_t start_read_data_time = MonotonicNanos();
+    Status st = ctx->allocate_schema_buffer();
+    if (!st.ok()) {
+        ctx->status = st;
+        return;
+    }
+    while (evbuffer_get_length(evbuf) > 0) {
+        ByteBufferPtr bb;
+        st = ByteBuffer::allocate(128 * 1024, &bb);
+        if (!st.ok()) {
+            ctx->status = st;
+            return;
+        }
+        auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
+        bb->pos = remove_bytes;
+        bb->flip();
+        st = ctx->body_sink->append(bb);
+        // schema_buffer caches up to `stream_tvf_buffer_size` (1MB by default) of leading
+        // data for parsing column information; decide whether this chunk still needs caching
+        if (ctx->is_read_schema) {
+            if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) {
+                ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
+            } else {
+                LOG(INFO) << "use a portion of data to request fe to obtain column information";
+                ctx->is_read_schema = false;
+                ctx->status = _httpstream_process_put(req, ctx);
+            }
+        }
+        if (!st.ok()) {
+            LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
+            ctx->status = st;
+            return;
+        }
+        ctx->receive_bytes += remove_bytes;
+    }
+    // if the whole body has been read without reaching 1MB, request the plan here
+    if (ctx->is_read_schema) {
+        LOG(INFO) << "all data has been read without reaching 1M, request the plan now";
+        ctx->is_read_schema = false;
+        ctx->status = _httpstream_process_put(req, ctx);
+    }
+    ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time);
+}
+
+Status StreamLoadAction::_httpstream_process_put(HttpRequest* http_req,
+                                                 std::shared_ptr<StreamLoadContext> ctx) {
+    TStreamLoadPutRequest request;
+    if (http_req != nullptr) {
+        request.__set_load_sql(ctx->sql_str);
+        if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) {
+            bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true");
+            request.__set_memtable_on_sink_node(value);
+        }
+    } else {
+        request.__set_token(ctx->auth.token);
+        request.__set_load_sql(ctx->sql_str);
+        ctx->auth.token = "";
+    }
+    set_request_auth(&request, ctx->auth);
+    request.__set_loadId(ctx->id.to_thrift());
+    request.__set_label(ctx->label);
+    if (ctx->group_commit) {
+        if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
+            request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
+        } else {
+            // used for wait_internal_group_commit_finish
+            request.__set_group_commit_mode("sync_mode");
+        }
+    }
+    if (_exec_env->master_info()->__isset.backend_id) {
+        request.__set_backend_id(_exec_env->master_info()->backend_id);
+    } else {
+        LOG(WARNING) << "_exec_env->master_info not set backend_id";
+    }
+
+    // plan this load
+    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    int64_t stream_load_put_start_time = MonotonicNanos();
+    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
+            master_addr.hostname, master_addr.port,
+            [&request, ctx](FrontendServiceConnection& client) {
+                client->streamLoadPut(ctx->put_result, request);
+            }));
+    ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time;
+    Status plan_status(Status::create(ctx->put_result.status));
+    if (!plan_status.ok()) {
+        LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief();
+        return plan_status;
+    }
+    if (config::is_cloud_mode() && ctx->two_phase_commit && ctx->is_mow_table()) {
+        return Status::NotSupported("http stream 2pc is unsupported for mow table");
+    }
+    ctx->db = ctx->put_result.pipeline_params.db_name;
+    ctx->table = ctx->put_result.pipeline_params.table_name;
+    ctx->txn_id = ctx->put_result.pipeline_params.txn_conf.txn_id;
+    ctx->label = ctx->put_result.pipeline_params.import_label;
+    ctx->put_result.pipeline_params.__set_wal_id(ctx->wal_id);
+    if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
+        // FIXME find a way to avoid chunked stream load write large WALs
+        size_t content_length = 0;
+        if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
+            try {
+                content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
+            } catch (const std::exception& e) {
+                return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}",
+                                               http_req->header(HttpHeaders::CONTENT_LENGTH),
+                                               e.what());
+            }
+            // compressed input may expand after decompression, so reserve extra WAL space
+            if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
+                ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
+                ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
+                ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
+                content_length *= 3;
+            }
+        }
+        ctx->put_result.pipeline_params.__set_content_length(content_length);
+    }
+
+    return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
+}
+
+std::string escapeString(const std::string& str) {
+    std::stringstream ss;
+    for (char ch : str) {
+        switch (ch) {
+        case '\'':
+            ss << "\\\'";
+            break;
+        case '\"':
+            ss << "\\\"";
+            break;
+        case '\\':
+            ss << "\\\\";
+            break;
+        case '\a':
+            ss << "\\a";
+            break;
+        case '\b':
+            ss << "\\b";
+            break;
+        case '\f':
+            ss << "\\f";
+            break;
+        case '\n':
+            ss << "\\n";
+            break;
+        case '\r':
+            ss << "\\r";
+            break;
+        case '\t':
+            ss << "\\t";
+            break;
+        case '\v':
+            ss << "\\v";
+            break;
+        default:
+            ss << ch;
+        }
+    }
+    return ss.str();
+}
+
+Status StreamLoadAction::_construct_sql_from_req(HttpRequest* req,
+                                                 std::shared_ptr<StreamLoadContext> ctx) {
+    std::string db_name;
+    std::string table_name;
+    url_decode(req->param(HTTP_DB_KEY), &db_name);
+    url_decode(req->param(HTTP_TABLE_KEY), &table_name);
+
+    std::string& sql = ctx->sql_str;
+
+    sql = "INSERT INTO ";
+    sql += db_name + "." + table_name + " ";
+    sql += "SELECT * ";
+    sql += "FROM http_stream(";
+
+    if (!req->header(HTTP_ENCLOSE).empty()) {
+        const auto& enclose_str = req->header(HTTP_ENCLOSE);
+        if (enclose_str.length() != 1) {
+            return Status::InvalidArgument("enclose must be single-char, actually is {}",
+                                           enclose_str);
+        }
+    }
+    if (!req->header(HTTP_ESCAPE).empty()) {
+        const auto& escape_str = req->header(HTTP_ESCAPE);
+        if (escape_str.length() != 1) {
+            return Status::InvalidArgument("escape must be single-char, actually is {}",
+                                           escape_str);
+        }
+    }
+    if (!req->header(HTTP_PARTITIONS).empty() && !req->header(HTTP_TEMP_PARTITIONS).empty()) {
+        return Status::InvalidArgument("Can not specify both partitions and temporary partitions");
+    }
+
+    if (!req->header(HTTP_SEND_BATCH_PARALLELISM).empty()) {
+        try {
+            // validate only; the value is forwarded as a property below
+            std::stoi(req->header(HTTP_SEND_BATCH_PARALLELISM));
+        } catch (const std::invalid_argument& e) {
+            return Status::InvalidArgument("send_batch_parallelism must be an integer, {}",
+                                           e.what());
+        } catch (const std::out_of_range& e) {
+            return Status::InvalidArgument("send_batch_parallelism out of range, {}", e.what());
+        }
+    }
+
+    TMergeType::type merge_type = TMergeType::APPEND;
+    StringCaseMap<TMergeType::type> merge_type_map = {{"APPEND", TMergeType::APPEND},
+                                                      {"DELETE", TMergeType::DELETE},
+                                                      {"MERGE", TMergeType::MERGE}};
+    if (!req->header(HTTP_MERGE_TYPE).empty()) {
+        std::string merge_type_str = req->header(HTTP_MERGE_TYPE);
+        auto iter = merge_type_map.find(merge_type_str);
+        if (iter != merge_type_map.end()) {
+            merge_type = iter->second;
+        } else {
+            return Status::InvalidArgument("Invalid merge type {}", merge_type_str);
+        }
+        if (merge_type == TMergeType::MERGE && req->header(HTTP_DELETE_CONDITION).empty()) {
+            return Status::InvalidArgument("Expected DELETE ON clause when merge type is MERGE.");
+        } else if (merge_type != TMergeType::MERGE &&
+                   !req->header(HTTP_DELETE_CONDITION).empty()) {
+            return Status::InvalidArgument(
+                    "Not support DELETE ON clause when merge type is not MERGE.");
+        }
+    }
+
+    if (!req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) {
+        static const StringCaseMap<TUniqueKeyUpdateMode::type> unique_key_update_mode_map = {
+                {"UPSERT", TUniqueKeyUpdateMode::UPSERT},
+                {"UPDATE_FIXED_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS},
+                {"UPDATE_FLEXIBLE_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}};
+        std::string unique_key_update_mode_str = req->header(HTTP_UNIQUE_KEY_UPDATE_MODE);
+        auto iter = unique_key_update_mode_map.find(unique_key_update_mode_str);
+        if (iter != unique_key_update_mode_map.end()) {
+            TUniqueKeyUpdateMode::type unique_key_update_mode = iter->second;
+            if (unique_key_update_mode == TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS) {
+                // check constraints when flexible partial update is enabled
+                if (ctx->format != TFileFormatType::FORMAT_JSON) {
+                    return Status::InvalidArgument(
+                            "flexible partial update only support json format as input file "
+                            "currently");
+                }
+                if (!req->header(HTTP_FUZZY_PARSE).empty() &&
+                    iequal(req->header(HTTP_FUZZY_PARSE), "true")) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when 'fuzzy_parse' is enabled");
+                }
+                if (!req->header(HTTP_COLUMNS).empty()) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when 'columns' is specified");
+                }
+                if (!req->header(HTTP_JSONPATHS).empty()) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when 'jsonpaths' is specified");
+                }
+                if (!req->header(HTTP_HIDDEN_COLUMNS).empty()) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when 'hidden_columns' is "
+                            "specified");
+                }
+                if (!req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when "
+                            "'function_column.sequence_col' is specified");
+                }
+                if (!req->header(HTTP_MERGE_TYPE).empty()) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when "
+                            "'merge_type' is specified");
+                }
+                if (!req->header(HTTP_WHERE).empty()) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when "
+                            "'where' is specified");
+                }
+            }
+        } else {
+            return Status::InvalidArgument(
+                    "Invalid unique_key_update_mode {}, must be one of 'UPSERT', "
+                    "'UPDATE_FIXED_COLUMNS' or 'UPDATE_FLEXIBLE_COLUMNS'",
+                    unique_key_update_mode_str);
+        }
+    }
+
+    if (!req->header(HTTP_FORMAT_KEY).empty()) {
+        sql += "\"" + HTTP_FORMAT_KEY + "\" = \"" + escapeString(req->header(HTTP_FORMAT_KEY)) +
+               "\"";
+    } else {
+        sql += "\"" + HTTP_FORMAT_KEY + "\" = \"" + "CSV" + "\"";
+    }
+
+#define __ADD_IF_EXIST(PROPERTY_KEY)                                                             \
+    if (!req->header(PROPERTY_KEY).empty()) {                                                    \
+        sql += ",";                                                                              \
+        sql += "\"" + PROPERTY_KEY + "\" = \"" + escapeString(req->header(PROPERTY_KEY)) + "\""; \
+    }
+    __ADD_IF_EXIST(HTTP_COLUMNS)
+    __ADD_IF_EXIST(HTTP_COLUMN_SEPARATOR)
+    __ADD_IF_EXIST(HTTP_LINE_DELIMITER)
+    __ADD_IF_EXIST(HTTP_ENCLOSE)
+    __ADD_IF_EXIST(HTTP_ESCAPE)
+    __ADD_IF_EXIST(HTTP_PARTITIONS)
+    __ADD_IF_EXIST(HTTP_TEMP_PARTITIONS)
+    __ADD_IF_EXIST(HTTP_NEGATIVE)
+    __ADD_IF_EXIST(HTTP_STRICT_MODE)
+    __ADD_IF_EXIST(HTTP_TIME_ZONE)
+    __ADD_IF_EXIST(HTTP_EXEC_MEM_LIMIT)
+    __ADD_IF_EXIST(HTTP_JSONPATHS)
+    __ADD_IF_EXIST(HTTP_JSONROOT)
+    __ADD_IF_EXIST(HTTP_STRIP_OUTER_ARRAY)
+    __ADD_IF_EXIST(HTTP_NUM_AS_STRING)
+    __ADD_IF_EXIST(HTTP_FUZZY_PARSE)
+    __ADD_IF_EXIST(HTTP_READ_JSON_BY_LINE)
+    __ADD_IF_EXIST(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL)
+    __ADD_IF_EXIST(HTTP_SEND_BATCH_PARALLELISM)
+    __ADD_IF_EXIST(HTTP_LOAD_TO_SINGLE_TABLET)
+    __ADD_IF_EXIST(HTTP_MERGE_TYPE)
+    __ADD_IF_EXIST(HTTP_DELETE_CONDITION)
+    __ADD_IF_EXIST(HTTP_MAX_FILTER_RATIO)
+    __ADD_IF_EXIST(HTTP_HIDDEN_COLUMNS)
+    __ADD_IF_EXIST(HTTP_TRIM_DOUBLE_QUOTES)
+    __ADD_IF_EXIST(HTTP_SKIP_LINES)
+    __ADD_IF_EXIST(HTTP_ENABLE_PROFILE)
+    __ADD_IF_EXIST(HTTP_UNIQUE_KEY_UPDATE_MODE)
+    __ADD_IF_EXIST(HTTP_MEMTABLE_ON_SINKNODE)
+    __ADD_IF_EXIST(HTTP_LOAD_STREAM_PER_NODE)
+    __ADD_IF_EXIST(HTTP_GROUP_COMMIT)
+    __ADD_IF_EXIST(HTTP_CLOUD_CLUSTER)
+#undef __ADD_IF_EXIST
+
+    sql += ") ";
+
+    if (!req->header(HTTP_WHERE).empty()) {
+        sql += "WHERE " + escapeString(req->header(HTTP_WHERE));
+    }
+
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h
index d1de89c93970184..985b073459c552d 100644
--- a/be/src/http/action/stream_load.h
+++ b/be/src/http/action/stream_load.h
@@ -52,6 +52,15 @@ class StreamLoadAction : public HttpHandler {
     void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, const std::string& str);
     Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
 
+    void _httpstream_handle(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx);
+    int _httpstream_on_header(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx);
+    void _httpstream_on_chunk_data(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx);
+    Status _httpstream_process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
+    Status _http_stream_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
+    Status _http_stream_handle(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx);
+
+    Status _construct_sql_from_req(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx);
+
 private:
     ExecEnv* _exec_env;
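
The new code paths wire into the existing HttpHandler callbacks roughly as follows
(method names as declared above):

    on_header(req)     -> _httpstream_on_header   -> _http_stream_on_header
                          (parse auth, check content length, create the StreamLoadPipe)
    on_chunk_data(req) -> _httpstream_on_chunk_data
                          (append chunks to the pipe; the first ~1MB is buffered, then
                           _httpstream_process_put plans the load via FE streamLoadPut)
    handle(req)        -> _httpstream_handle      -> _http_stream_handle
                          (finish the pipe, wait for the fragment to finish, then
                           commit or pre-commit the transaction)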