Skip to content

Commit

Permalink
Merge branch 'master' into patch-4
Browse files Browse the repository at this point in the history
  • Loading branch information
freemandealer authored Nov 13, 2024
2 parents b8949d3 + d6476dd commit ec7c00d
Show file tree
Hide file tree
Showing 38 changed files with 1,250 additions and 279 deletions.
45 changes: 44 additions & 1 deletion cloud/src/meta-service/meta_service_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

#include <brpc/controller.h>
#include <gen_cpp/cloud.pb.h>
#include <openssl/md5.h>

#include <iomanip>
#include <memory>
#include <string>
#include <string_view>
Expand All @@ -29,12 +31,26 @@
#include "common/logging.h"
#include "common/stopwatch.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/keys.h"
#include "meta-service/txn_kv.h"
#include "meta-service/txn_kv_error.h"
#include "resource-manager/resource_manager.h"

namespace doris::cloud {
// Returns the lowercase hex MD5 digest of `str`.
// Used to mask secret keys ("sk") in RPC debug logs (see finish_rpc), so the
// plaintext secret never reaches the log — this is log masking, not encryption.
// NOTE(review): the low-level MD5_* API is deprecated in OpenSSL 3.0; consider
// migrating to EVP_Digest(..., EVP_md5(), ...) when the toolchain allows.
inline std::string md5(const std::string& str) {
    unsigned char digest[MD5_DIGEST_LENGTH];
    MD5_CTX context;
    MD5_Init(&context);
    MD5_Update(&context, str.c_str(), str.length());
    MD5_Final(digest, &context);

    // Direct nibble-to-hex encoding: avoids std::ostringstream (which needed
    // <sstream>, not included here) and per-byte stream manipulators.
    static constexpr char kHexDigits[] = "0123456789abcdef";
    std::string hex;
    hex.reserve(MD5_DIGEST_LENGTH * 2);
    for (unsigned char byte : digest) {
        hex.push_back(kHexDigits[byte >> 4]);
        hex.push_back(kHexDigits[byte & 0x0F]);
    }
    return hex;
}

template <class Request>
void begin_rpc(std::string_view func_name, brpc::Controller* ctrl, const Request* req) {
Expand Down Expand Up @@ -101,7 +117,34 @@ void finish_rpc(std::string_view func_name, brpc::Controller* ctrl, Response* re
LOG(INFO) << "finish " << func_name << " from " << ctrl->remote_side()
<< " status=" << res->status().ShortDebugString()
<< " delete_bitmap_size=" << res->segment_delete_bitmaps_size();

} else if constexpr (std::is_same_v<Response, GetObjStoreInfoResponse> ||
std::is_same_v<Response, GetStageResponse>) {
std::string debug_string = res->DebugString();
// Start position for searching "sk" fields
size_t pos = 0;
// Iterate through the string and find all occurrences of "sk: "
while ((pos = debug_string.find("sk: ", pos)) != std::string::npos) {
// Find the start and end of the "sk" value (assumed to be within quotes)
// Start after the quote
size_t sk_value_start = debug_string.find('\"', pos) + 1;
// End at the next quote
size_t sk_value_end = debug_string.find('\"', sk_value_start);

// Extract the "sk" value
std::string sk_value =
debug_string.substr(sk_value_start, sk_value_end - sk_value_start);
// Encrypt the "sk" value with MD5
std::string encrypted_sk = "md5: " + md5(sk_value);

// Replace the original "sk" value with the encrypted MD5 value
debug_string.replace(sk_value_start, sk_value_end - sk_value_start, encrypted_sk);

// Move the position to the end of the current "sk" field and continue searching
pos = sk_value_end;
}
TEST_SYNC_POINT_CALLBACK("sk_finish_rpc", &debug_string);
LOG(INFO) << "finish " << func_name << " from " << ctrl->remote_side()
<< " response=" << debug_string;
} else {
LOG(INFO) << "finish " << func_name << " from " << ctrl->remote_side()
<< " response=" << res->ShortDebugString();
Expand Down
4 changes: 4 additions & 0 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ void MetaServiceImpl::get_obj_store_info(google::protobuf::RpcController* contro
GetObjStoreInfoResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(get_obj_store_info);
TEST_SYNC_POINT_CALLBACK("obj-store-info_sk_response", &response);
TEST_SYNC_POINT_RETURN_WITH_VOID("obj-store-info_sk_response_return");
// Prepare data
std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
if (cloud_unique_id.empty()) {
Expand Down Expand Up @@ -2600,6 +2602,8 @@ void MetaServiceImpl::get_stage(google::protobuf::RpcController* controller,
const GetStageRequest* request, GetStageResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(get_stage);
TEST_SYNC_POINT_CALLBACK("stage_sk_response", &response);
TEST_SYNC_POINT_RETURN_WITH_VOID("stage_sk_response_return");
std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
if (cloud_unique_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
Expand Down
79 changes: 79 additions & 0 deletions cloud/test/meta_service_http_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1456,4 +1456,83 @@ TEST(MetaServiceHttpTest, TxnLazyCommit) {
}
}

// Verifies that finish_rpc masks the "sk" (secret key) field when logging a
// GetStageResponse: the plaintext secret must not appear in the log string,
// only its "md5: <hex>" digest.
TEST(MetaServiceHttpTest, get_stage_response_sk) {
    auto sp = SyncPoint::get_instance();
    sp->enable_processing();
    // RAII guard: disable sync-point processing when the test scope exits.
    std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
                                                          [&](...) { sp->disable_processing(); });

    // Canned response carrying a known plaintext secret key.
    GetStageResponse res;
    auto* stage = res.add_stage();
    stage->mutable_obj_info()->set_ak("stage-ak");
    stage->mutable_obj_info()->set_sk("stage-sk");
    // Inject the canned response at the "stage_sk_response" sync point inside
    // get_stage ...
    auto foo = [res](auto args) { (*(try_any_cast<GetStageResponse**>(args[0])))->CopyFrom(res); };
    sp->set_call_back("stage_sk_response", foo);
    // ... and make the paired RETURN_WITH_VOID point fire, so get_stage exits
    // right after the injection without touching storage.
    sp->set_call_back("stage_sk_response_return",
                      [](auto&& args) { *try_any_cast<bool*>(args.back()) = true; });

    auto rate_limiter = std::make_shared<cloud::RateLimiter>();

    // txn_kv / resource_mgr are null — presumably unused on this
    // short-circuited path; TODO confirm against RPC_PREPROCESS.
    auto ms = std::make_unique<cloud::MetaServiceImpl>(nullptr, nullptr, rate_limiter);

    // Inspect the log string built by finish_rpc: plaintext "stage-sk" must be
    // gone, replaced by the md5 digest of "stage-sk".
    auto bar = [](auto args) {
        std::cout << *try_any_cast<std::string*>(args[0]);

        EXPECT_TRUE((*try_any_cast<std::string*>(args[0])).find("stage-sk") == std::string::npos);
        EXPECT_TRUE((*try_any_cast<std::string*>(args[0]))
                            .find("md5: f497d053066fa4b7d3b1f6564597d233") != std::string::npos);
    };
    sp->set_call_back("sk_finish_rpc", bar);

    GetStageResponse res1;
    GetStageRequest req1;
    brpc::Controller cntl;
    ms->get_stage(&cntl, &req1, &res1, nullptr);
}

// Verifies that finish_rpc masks every "sk" occurrence when logging a
// GetObjStoreInfoResponse — both the top-level obj_info secret and the one
// nested inside storage_vault must be replaced by their md5 digests.
TEST(MetaServiceHttpTest, get_obj_store_info_response_sk) {
    auto sp = SyncPoint::get_instance();
    sp->enable_processing();
    // RAII guard: disable sync-point processing when the test scope exits.
    std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
                                                          [&](...) { sp->disable_processing(); });

    // Canned response with two distinct secret keys in different places.
    GetObjStoreInfoResponse res;
    auto* obj_info = res.add_obj_info();
    obj_info->set_ak("obj-store-info-ak1");
    obj_info->set_sk("obj-store-info-sk1");
    obj_info = res.add_storage_vault()->mutable_obj_info();
    obj_info->set_ak("obj-store-info-ak2");
    obj_info->set_sk("obj-store-info-sk2");
    // Inject the canned response at the "obj-store-info_sk_response" sync
    // point inside get_obj_store_info ...
    auto foo = [res](auto args) {
        (*(try_any_cast<GetObjStoreInfoResponse**>(args[0])))->CopyFrom(res);
    };
    sp->set_call_back("obj-store-info_sk_response", foo);
    // ... and make the paired RETURN_WITH_VOID point fire, so the RPC exits
    // right after the injection without touching storage.
    sp->set_call_back("obj-store-info_sk_response_return",
                      [](auto&& args) { *try_any_cast<bool*>(args.back()) = true; });

    auto rate_limiter = std::make_shared<cloud::RateLimiter>();

    // txn_kv / resource_mgr are null — presumably unused on this
    // short-circuited path; TODO confirm against RPC_PREPROCESS.
    auto ms = std::make_unique<cloud::MetaServiceImpl>(nullptr, nullptr, rate_limiter);

    // Inspect the log string built by finish_rpc: neither plaintext secret may
    // appear; both md5 digests must.
    auto bar = [](auto args) {
        std::cout << *try_any_cast<std::string*>(args[0]);

        EXPECT_TRUE((*try_any_cast<std::string*>(args[0])).find("obj-store-info-sk1") ==
                    std::string::npos);
        EXPECT_TRUE((*try_any_cast<std::string*>(args[0]))
                            .find("md5: 35d5a637fd9d45a28207a888b751efc4") != std::string::npos);

        EXPECT_TRUE((*try_any_cast<std::string*>(args[0])).find("obj-store-info-sk2") ==
                    std::string::npos);
        EXPECT_TRUE((*try_any_cast<std::string*>(args[0]))
                            .find("md5: 01d7473ae201a2ecdf1f7c064eb81a95") != std::string::npos);
    };
    sp->set_call_back("sk_finish_rpc", bar);

    GetObjStoreInfoResponse res1;
    GetObjStoreInfoRequest req1;
    brpc::Controller cntl;
    ms->get_obj_store_info(&cntl, &req1, &res1, nullptr);
}

} // namespace doris::cloud
3 changes: 1 addition & 2 deletions common/cpp/sync_point.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ void SyncPoint::Data::disable_processing() {

// Returns true iff a callback has been registered for `point`.
// The unreachable second lookup (diff residue against marked_thread_id_) has
// been removed; the page header ("1 addition & 2 deletions") shows the commit
// intent is the callbacks_ check, so that is the statement kept live here.
bool SyncPoint::Data::has_point(const std::string& point) {
    std::unique_lock lock(mutex_);
    return callbacks_.find(point) != callbacks_.end();
}

bool SyncPoint::Data::get_enable() {
Expand Down
6 changes: 6 additions & 0 deletions fe/fe-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ under the License.
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>${commons-collections.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>doris-fe-common</finalName>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3249,4 +3249,10 @@ public static int metaServiceRpcRetryTimes() {
"For testing purposes, all queries are forcibly forwarded to the master to verify"
+ "the behavior of forwarding queries."})
public static boolean force_forward_all_queries = false;

@ConfField(description = {"用于禁用某些SQL,配置项为AST的class simple name列表(例如CreateRepositoryStmt,"
+ "CreatePolicyCommand),用逗号间隔开",
"For disabling certain SQL queries, the configuration item is a list of simple class names of AST"
+ "(for example CreateRepositoryStmt, CreatePolicyCommand), separated by commas."})
public static String block_sql_ast_names = "";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

package org.apache.doris.common.security.authentication;

import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public abstract class AuthenticationConfig {
private static final Logger LOG = LogManager.getLogger(AuthenticationConfig.class);
public static String HADOOP_USER_NAME = "hadoop.username";
public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
Expand All @@ -42,6 +46,10 @@ public static AuthenticationConfig getKerberosConfig(Configuration conf) {
return AuthenticationConfig.getKerberosConfig(conf, HADOOP_KERBEROS_PRINCIPAL, HADOOP_KERBEROS_KEYTAB);
}

/**
 * Builds a simple (username-based) authentication config from the given Hadoop
 * configuration. Delegates to {@link #createSimpleAuthenticationConfig}, which
 * reads only {@code hadoop.username}; kerberos settings are not consulted.
 *
 * @param conf hadoop configuration
 * @return a SimpleAuthenticationConfig populated from {@code conf}
 */
public static AuthenticationConfig getSimpleAuthenticationConfig(Configuration conf) {
    return AuthenticationConfig.createSimpleAuthenticationConfig(conf);
}

/**
* get kerberos config from hadoop conf
* @param conf config
Expand All @@ -54,17 +62,35 @@ public static AuthenticationConfig getKerberosConfig(Configuration conf,
String krbKeytabKey) {
String authentication = conf.get(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, null);
if (AuthType.KERBEROS.getDesc().equals(authentication)) {
KerberosAuthenticationConfig krbConfig = new KerberosAuthenticationConfig();
krbConfig.setKerberosPrincipal(conf.get(krbPrincipalKey));
krbConfig.setKerberosKeytab(conf.get(krbKeytabKey));
krbConfig.setConf(conf);
krbConfig.setPrintDebugLog(Boolean.parseBoolean(conf.get(DORIS_KRB5_DEBUG, "false")));
return krbConfig;
} else {
// AuthType.SIMPLE
SimpleAuthenticationConfig simpleAuthenticationConfig = new SimpleAuthenticationConfig();
simpleAuthenticationConfig.setUsername(conf.get(HADOOP_USER_NAME));
return simpleAuthenticationConfig;
String principalKey = conf.get(krbPrincipalKey);
String keytabKey = conf.get(krbKeytabKey);
if (!Strings.isNullOrEmpty(principalKey) && !Strings.isNullOrEmpty(keytabKey)) {
KerberosAuthenticationConfig krbConfig = new KerberosAuthenticationConfig();
krbConfig.setKerberosPrincipal(principalKey);
krbConfig.setKerberosKeytab(keytabKey);
krbConfig.setConf(conf);
krbConfig.setPrintDebugLog(Boolean.parseBoolean(conf.get(DORIS_KRB5_DEBUG, "false")));
return krbConfig;
} else {
// Due to some historical reasons, `core-size.xml` may be stored in path:`fe/conf`,
// but this file may only contain `hadoop.security.authentication configuration`,
// and no krbPrincipalKey and krbKeytabKey,
// which will cause kerberos initialization failure.
// Now:
// if kerberos is needed, the relevant configuration can be written in the catalog properties,
// if kerberos is not needed, to prevent the influence of historical reasons,
// the following simple authentication method needs to be used.
LOG.warn("{} or {} is null or empty, fallback to simple authentication",
krbPrincipalKey, krbKeytabKey);
}
}
return createSimpleAuthenticationConfig(conf);
}

/**
 * Creates an AuthType.SIMPLE config whose username comes from the
 * {@code hadoop.username} entry of the supplied configuration (may be null).
 */
private static AuthenticationConfig createSimpleAuthenticationConfig(Configuration conf) {
    SimpleAuthenticationConfig simpleConfig = new SimpleAuthenticationConfig();
    simpleConfig.setUsername(conf.get(HADOOP_USER_NAME));
    return simpleConfig;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.security.authentication;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.junit.Assert;
import org.junit.Test;

public class AuthenticationTest {

    @Test
    public void testAuthConf() {
        Configuration conf = new Configuration();

        // No authentication settings at all -> simple auth is returned.
        AuthenticationConfig noAuthSettings = AuthenticationConfig.getKerberosConfig(conf);
        Assert.assertEquals(SimpleAuthenticationConfig.class, noAuthSettings.getClass());

        conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");

        // "kerberos" is requested but principal/keytab are missing
        // -> falls back to simple auth instead of a broken kerberos config.
        AuthenticationConfig missingKrbDetails = AuthenticationConfig.getKerberosConfig(conf);
        Assert.assertEquals(SimpleAuthenticationConfig.class, missingKrbDetails.getClass());

        conf.set(AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, "principal");
        conf.set(AuthenticationConfig.HADOOP_KERBEROS_KEYTAB, "keytab");

        // Principal and keytab both present -> a kerberos config is produced.
        AuthenticationConfig fullKrbSettings = AuthenticationConfig.getKerberosConfig(conf);
        Assert.assertEquals(KerberosAuthenticationConfig.class, fullKrbSettings.getClass());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.ClearAlterTask;
import org.apache.doris.task.UpdateTabletMetaInfoTask;
import org.apache.doris.thrift.TInvertedIndexFileStorageFormat;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TTaskType;
Expand Down Expand Up @@ -2730,12 +2731,15 @@ private boolean processAddIndex(CreateIndexClause alterClause, OlapTable olapTab
+ " ) already exist.");
}
}

boolean disableInvertedIndexV1ForVariant = olapTable.getInvertedIndexFileStorageFormat()
== TInvertedIndexFileStorageFormat.V1 && ConnectContext.get().getSessionVariable()
.getDisableInvertedIndexV1ForVaraint();
for (String col : indexDef.getColumns()) {
Column column = olapTable.getColumn(col);
if (column != null) {
indexDef.checkColumn(column, olapTable.getKeysType(),
olapTable.getTableProperty().getEnableUniqueKeyMergeOnWrite());
olapTable.getTableProperty().getEnableUniqueKeyMergeOnWrite(),
disableInvertedIndexV1ForVariant);
indexDef.getColumnUniqueIds().add(column.getUniqueId());
} else {
throw new DdlException("index column does not exist in table. invalid column: " + col);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.ExprRewriteRule;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.thrift.TInvertedIndexFileStorageFormat;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -598,7 +599,9 @@ public void analyze(Analyzer analyzer) throws UserException {
if (CollectionUtils.isNotEmpty(indexDefs)) {
Set<String> distinct = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
Set<Pair<IndexType, List<String>>> distinctCol = new HashSet<>();

boolean disableInvertedIndexV1ForVariant = PropertyAnalyzer.analyzeInvertedIndexFileStorageFormat(
new HashMap<>(properties)) == TInvertedIndexFileStorageFormat.V1
&& ConnectContext.get().getSessionVariable().getDisableInvertedIndexV1ForVaraint();
for (IndexDef indexDef : indexDefs) {
indexDef.analyze();
if (!engineName.equalsIgnoreCase(DEFAULT_ENGINE_NAME)) {
Expand All @@ -608,7 +611,8 @@ public void analyze(Analyzer analyzer) throws UserException {
boolean found = false;
for (Column column : columns) {
if (column.getName().equalsIgnoreCase(indexColName)) {
indexDef.checkColumn(column, getKeysDesc().getKeysType(), enableUniqueKeyMergeOnWrite);
indexDef.checkColumn(column, getKeysDesc().getKeysType(),
enableUniqueKeyMergeOnWrite, disableInvertedIndexV1ForVariant);
found = true;
break;
}
Expand Down
Loading

0 comments on commit ec7c00d

Please sign in to comment.