Merge branch 'master' into master-fix-job-status
CalvinKirs authored Dec 12, 2024
2 parents 8532b91 + 069a33a commit 69844b5
Showing 31 changed files with 29,498 additions and 19 deletions.
Binary file added aazcp.tar.gz
9 changes: 9 additions & 0 deletions be/src/cloud/cloud_schema_change_job.cpp
@@ -169,6 +169,15 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
    reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
    reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap();
    reader_context.version = Version(0, start_resp.alter_version());
    // If the base schema defines cluster keys, map each cluster key column's
    // unique id to its field index and ask the rowset readers to return rows
    // ordered by those columns, so the new tablet is written in cluster-key order.
    std::vector<uint32_t> cluster_key_idxes;
    if (!_base_tablet_schema->cluster_key_uids().empty()) {
        for (const auto& uid : _base_tablet_schema->cluster_key_uids()) {
            cluster_key_idxes.emplace_back(_base_tablet_schema->field_index(uid));
        }
        reader_context.read_orderby_key_columns = &cluster_key_idxes;
        reader_context.is_unique = false;
        reader_context.sequence_id_idx = -1;
    }

    for (auto& split : rs_splits) {
        RETURN_IF_ERROR(split.rs_reader->init(&reader_context));
2 changes: 2 additions & 0 deletions be/src/olap/memtable.cpp
@@ -34,6 +34,7 @@
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/debug_points.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/aggregate_functions/aggregate_function_reader.h"
@@ -589,6 +590,7 @@ void MemTable::shrink_memtable_by_agg() {
}

bool MemTable::need_flush() const {
    // Debug point: lets a test force an immediate flush regardless of buffer size.
    DBUG_EXECUTE_IF("MemTable.need_flush", { return true; });
    auto max_size = config::write_buffer_size;
    if (_partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
        auto update_columns_size = _num_columns;
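The new MemTable.need_flush debug point lets a test force a flush on every check instead of waiting for write_buffer_size to fill. On a BE started with enable_debug_points = true, debug points are normally toggled through the BE's HTTP API; the sketch below assumes the conventional /api/debug_point/add/<name> endpoint and the default webserver port, both of which are assumptions rather than something this diff shows.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hedged sketch: activate the MemTable.need_flush debug point on one BE so that
// every memtable reports "needs flush". Endpoint path and port are assumptions.
public class EnableNeedFlushDebugPoint {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://127.0.0.1:8040/api/debug_point/add/MemTable.need_flush"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}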
9 changes: 9 additions & 0 deletions be/src/olap/schema_change.cpp
@@ -866,6 +866,7 @@ Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& reques
    for (int i = 0; i < num_cols; ++i) {
        return_columns[i] = i;
    }
    std::vector<uint32_t> cluster_key_idxes;

    DBUG_EXECUTE_IF("SchemaChangeJob::_do_process_alter_tablet.block", DBUG_BLOCK);

@@ -982,6 +983,14 @@ Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& reques
    reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
    reader_context.delete_bitmap = &_base_tablet->tablet_meta()->delete_bitmap();
    reader_context.version = Version(0, end_version);
    // Same cluster-key handling as the cloud path above: read the base rowsets
    // ordered by the cluster key columns rather than as unique-key data.
    if (!_base_tablet_schema->cluster_key_uids().empty()) {
        for (const auto& uid : _base_tablet_schema->cluster_key_uids()) {
            cluster_key_idxes.emplace_back(_base_tablet_schema->field_index(uid));
        }
        reader_context.read_orderby_key_columns = &cluster_key_idxes;
        reader_context.is_unique = false;
        reader_context.sequence_id_idx = -1;
    }
    for (auto& rs_split : rs_splits) {
        res = rs_split.rs_reader->init(&reader_context);
        if (!res) {
15 changes: 15 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1638,6 +1638,15 @@ public class Config extends ConfigBase {
    @ConfField(mutable = true, masterOnly = true)
    public static boolean ignore_backup_not_support_table_type = false;

    /**
     * Whether to ignore temp partitions when backing up, instead of reporting an exception.
     */
    @ConfField(mutable = true, masterOnly = true, description = {
            "是否忽略备份临时分区,不报异常",
            "Whether to ignore temp partitions when backup, and not report exception."
    })
    public static boolean ignore_backup_tmp_partitions = false;

    /**
     * An internal config that controls the update interval of the backup handler. Only used to speed up tests.
     */
@@ -2950,6 +2959,12 @@ public class Config extends ConfigBase {
    })
    public static long auto_analyze_interval_seconds = 86400; // 24 hours.

    // An internal config to control whether to enable the checkpoint.
    //
    // ATTN: it is only used in test environments.
    @ConfField(mutable = true, masterOnly = true)
    public static boolean enable_checkpoint = true;

    //==========================================================================
    // begin of cloud config
    //==========================================================================
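Both new switches are declared mutable and masterOnly, so they can be changed at runtime on the master FE (for example, ADMIN SET FRONTEND CONFIG ("ignore_backup_tmp_partitions" = "true");) instead of requiring an fe.conf edit and a restart. enable_checkpoint defaults to true and, per its comment, is only meant to be switched off in test environments; the doCheckpoint() guard further down honors it.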
@@ -446,14 +446,14 @@ private void backup(Repository repository, Database db, BackupStmt stmt) throws
        OlapTable olapTbl = (OlapTable) tbl;
        tbl.readLock();
        try {
            if (olapTbl.existTempPartitions()) {
            if (!Config.ignore_backup_tmp_partitions && olapTbl.existTempPartitions()) {
                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                        "Do not support backup table " + olapTbl.getName() + " with temp partitions");
            }

            PartitionNames partitionNames = tblRef.getPartitionNames();
            if (partitionNames != null) {
                if (partitionNames.isTemp()) {
                if (!Config.ignore_backup_tmp_partitions && partitionNames.isTemp()) {
                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                            "Do not support backup temp partitions in table " + tblRef.getName());
                }
15 changes: 10 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
Expand Up @@ -597,10 +597,15 @@ private void prepareSnapshotTaskForOlapTableWithoutLock(Database db, OlapTable o
        // check backup table again
        if (backupTableRef.getPartitionNames() != null) {
            for (String partName : backupTableRef.getPartitionNames().getPartitionNames()) {
                Partition partition = olapTable.getPartition(partName);
                Partition partition = olapTable.getPartition(partName, false); // exclude tmp partitions
                if (partition == null) {
                    status = new Status(ErrCode.NOT_FOUND, "partition " + partName
                            + " does not exist in table" + backupTableRef.getName().getTbl());
                    if (olapTable.getPartition(partName, true) != null) {
                        status = new Status(ErrCode.NOT_FOUND, "backup tmp partition " + partName
                                + " in table " + backupTableRef.getName().getTbl() + " is not supported");
                    } else {
                        status = new Status(ErrCode.NOT_FOUND, "partition " + partName
                                + " does not exist in table " + backupTableRef.getName().getTbl());
                    }
                    return;
                }
            }
@@ -609,10 +614,10 @@
        // create snapshot tasks
        List<Partition> partitions = Lists.newArrayList();
        if (backupTableRef.getPartitionNames() == null) {
            partitions.addAll(olapTable.getPartitions());
            partitions.addAll(olapTable.getPartitions()); // no temp partitions in OlapTable.getPartitions()
        } else {
            for (String partName : backupTableRef.getPartitionNames().getPartitionNames()) {
                Partition partition = olapTable.getPartition(partName);
                Partition partition = olapTable.getPartition(partName, false); // exclude tmp partitions
                partitions.add(partition);
            }
        }
@@ -2023,6 +2023,14 @@ public OlapTable selectiveCopy(Collection<String> reservedPartitions, IndexExtSt
            }
        }

        if (isForBackup) {
            // drop all tmp partitions in copied table
            for (Partition partition : copied.tempPartitions.getAllPartitions()) {
                copied.partitionInfo.dropPartition(partition.getId());
            }
            copied.tempPartitions = new TempPartitions();
        }

        if (reservedPartitions == null || reservedPartitions.isEmpty()) {
            // reserve all
            return copied;
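Taken together with the BackupHandler and BackupJob changes above, the backup path now filters temp partitions instead of failing on them: the up-front checks are skipped when Config.ignore_backup_tmp_partitions is set, getPartition(partName, false) refuses to resolve temp partitions, and selectiveCopy() scrubs them from the copied metadata. A self-contained sketch of the resulting policy; class and method names here are illustrative, not Doris APIs:

import java.util.ArrayList;
import java.util.List;

class BackupPartitionFilter {
    // Mirrors Config.ignore_backup_tmp_partitions.
    static boolean ignoreTmpPartitions = true;

    record Partition(String name, boolean isTemp) {}

    // Partitions a backup should snapshot: temp partitions are either rejected
    // (old behavior) or silently dropped (new behavior with the config on).
    static List<Partition> forBackup(List<Partition> all) {
        List<Partition> picked = new ArrayList<>();
        for (Partition p : all) {
            if (p.isTemp()) {
                if (!ignoreTmpPartitions) {
                    throw new IllegalStateException(
                            "Do not support backup temp partition " + p.name());
                }
                continue; // skip it, as selectiveCopy() does for the copied table
            }
            picked.add(p);
        }
        return picked;
    }

    public static void main(String[] args) {
        List<Partition> all = List.of(new Partition("p1", false), new Partition("tp1", true));
        System.out.println(forBackup(all)); // [Partition[name=p1, isTemp=false]]
    }
}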
@@ -534,6 +534,11 @@ public List<HivePartitionWithStatistics> getPartitions() {
        return partitions;
    }

    // Reset the staged partition bookkeeping once it has been consumed,
    // so a later rollback cannot act on it twice.
    public void clear() {
        partitions.clear();
        createdPartitionValues.clear();
    }

    public void addPartition(HivePartitionWithStatistics partition) {
        partitions.add(partition);
    }
@@ -1143,6 +1148,7 @@ private void undoUpdateStatisticsTasks() {
        for (CompletableFuture<?> undoUpdateFuture : undoUpdateFutures.build()) {
            MoreFutures.getFutureValue(undoUpdateFuture);
        }
        updateStatisticsTasks.clear();
    }

    private void undoAddPartitionsTask() {
@@ -1157,6 +1163,7 @@
            LOG.warn("Failed to rollback: add_partition for partition values {}.{}",
                    tableInfo, rollbackFailedPartitions);
        }
        addPartitionsTask.clear();
    }

    private void waitForAsyncFileSystemTaskSuppressThrowable() {
@@ -1169,6 +1176,7 @@
                // ignore
            }
        }
        asyncFileSystemTaskFutures.clear();
    }

    public void prepareInsertExistingTable(SimpleTableInfo tableInfo, TableAndMore tableAndMore) {
@@ -1319,6 +1327,7 @@ private void runDirectoryClearUpTasksForAbort() {
        for (DirectoryCleanUpTask cleanUpTask : directoryCleanUpTasksForAbort) {
            recursiveDeleteItems(cleanUpTask.getPath(), cleanUpTask.isDeleteEmptyDir(), false);
        }
        directoryCleanUpTasksForAbort.clear();
    }

    private void runRenameDirTasksForAbort() {
@@ -1334,6 +1343,7 @@
                }
            }
        }
        renameDirectoryTasksForAbort.clear();
    }

    private void runClearPathsForFinish() {
@@ -1486,6 +1496,7 @@ private void abortMultiUploads() {
                        .build());
            }, fileSystemExecutor));
        }
        uncompletedMpuPendingUploads.clear();
    }

    public void doNothing() {
@@ -1520,6 +1531,7 @@ public void rollback() {
        for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
            MoreFutures.getFutureValue(future, RuntimeException.class);
        }
        asyncFileSystemTaskFutures.clear();
    }

    public void shutdownExecutorService() {
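All of these one-line additions apply the same pattern: each undo/cleanup list clears itself after its tasks run, which makes rollback() safe to call after a failed commit(): nothing that already executed is replayed (the new testCommitWithRollback below exercises exactly this). A minimal sketch of the idea, with hypothetical names:

import java.util.ArrayList;
import java.util.List;

class CleanupTaskList {
    private final List<Runnable> tasks = new ArrayList<>();

    void add(Runnable task) {
        tasks.add(task);
    }

    // Run-and-clear: after the first pass the list is empty, so invoking the
    // rollback path a second time is a harmless no-op instead of a replay.
    void runAll() {
        for (Runnable task : tasks) {
            task.run();
        }
        tasks.clear();
    }

    public static void main(String[] args) {
        CleanupTaskList onAbort = new CleanupTaskList();
        onAbort.add(() -> System.out.println("delete staged dir"));
        onAbort.runAll(); // runs the task once
        onAbort.runAll(); // no-op
    }
}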
@@ -160,7 +160,8 @@ public void dropDb(DropDbStmt stmt) throws DdlException {
                return null;
            });
        } catch (Exception e) {
            throw new DdlException("Failed to drop database: " + stmt.getDbName() + " ,error message is: ", e);
            throw new DdlException(
                    "Failed to drop database: " + stmt.getDbName() + ", error message is: " + e.getMessage(), e);
        }
    }

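With the cause inlined via e.getMessage(), a failed drop now surfaces as, for example, "Failed to drop database: not_exists, error message is: database doesn't exist" instead of a message that trailed off after the colon; the new testDropDB case below asserts precisely that substring.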
@@ -86,6 +86,11 @@ protected void runAfterCatalogReady() {
    // public for unit test, so that we can trigger checkpoint manually.
    // DO NOT call it manually outside the unit test.
    public synchronized void doCheckpoint() throws CheckpointException {
        if (!Config.enable_checkpoint) {
            LOG.warn("checkpoint is disabled. please enable the config 'enable_checkpoint'.");
            return;
        }

        if (!Env.getServingEnv().isHttpReady()) {
            LOG.info("Http server is not ready.");
            return;
Expand Up @@ -36,7 +36,7 @@
/**
 * Common class for SqlBlockRule Commands.
 */
public abstract class SqlBlockRuleCommand extends Command {
public abstract class SqlBlockRuleCommand extends Command implements ForwardWithSync {
    public static final String SQL_PROPERTY = "sql";

    public static final String SQL_HASH_PROPERTY = "sqlHash";
Expand Up @@ -21,7 +21,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.NoForward;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
@@ -68,7 +68,7 @@
   type of routine load:
       KAFKA
 */
public class CreateRoutineLoadCommand extends Command implements NoForward {
public class CreateRoutineLoadCommand extends Command implements ForwardWithSync {
    CreateRoutineLoadInfo createRoutineLoadInfo;

    public CreateRoutineLoadCommand(CreateRoutineLoadInfo createRoutineLoadInfo) {
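Both command classes trade NoForward for ForwardWithSync. In Doris these are marker interfaces for the redirect policy: a NoForward command executes on whichever FE receives it, while a ForwardWithSync command issued on a non-master FE is forwarded to the master and the session waits for the resulting edit log to sync back before returning. Routine-load jobs and SQL block rules live in master-side metadata, so forwarding keeps an immediately following read (say, checking a job's status right after CREATE ROUTINE LOAD) from observing stale state, which fits the branch name master-fix-job-status.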
@@ -675,5 +675,43 @@ public void testRollbackNewPartitionForPartitionedTableWithNewAppendPartition()
        Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
        assertNumRows(3, pa);
    }
}

    @Test
    public void testCommitWithRollback() {
        genQueryID();
        List<THivePartitionUpdate> pus = new ArrayList<>();
        try {
            pus.add(createRandomAppend(null));
            pus.add(createRandomAppend(null));
            pus.add(createRandomAppend(null));
        } catch (Throwable t) {
            Assert.fail();
        }

        mockDoOther(() -> {
            Table table = hmsClient.getTable(dbName, tbWithoutPartition);
            assertNumRows(3, table);
        });

        HMSTransaction hmsTransaction = new HMSTransaction(hmsOps, fileSystemProvider, fileSystemExecutor);
        try {
            hmsTransaction.setHivePartitionUpdates(pus);
            HiveInsertCommandContext ctx = new HiveInsertCommandContext();
            String queryId = DebugUtil.printId(ConnectContext.get().queryId());
            ctx.setQueryId(queryId);
            ctx.setWritePath(getWritePath());
            hmsTransaction.beginInsertTable(ctx);
            hmsTransaction.finishInsertTable(new SimpleTableInfo(dbName, tbWithoutPartition));
            hmsTransaction.commit();
            Assert.fail();
        } catch (Throwable t) {
            Assert.assertTrue(t.getMessage().contains("failed to do nothing"));
        }

        try {
            hmsTransaction.rollback();
        } catch (Throwable t) {
            Assert.fail();
        }
    }
}
@@ -21,6 +21,8 @@
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DbName;
import org.apache.doris.analysis.DropDbStmt;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogFactory;
import org.apache.doris.nereids.parser.NereidsParser;
@@ -201,4 +203,28 @@ public String getTableName() {
        String s = "test_tb_" + UUID.randomUUID();
        return s.replaceAll("-", "");
    }

    @Test
    public void testDropDB() {
        String dbName = "db_to_delete";
        CreateDbStmt createDBStmt = new CreateDbStmt(false, new DbName("iceberg", dbName), new HashMap<>());
        DropDbStmt dropDbStmt = new DropDbStmt(false, new DbName("iceberg", dbName), false);
        DropDbStmt dropDbStmt2 = new DropDbStmt(false, new DbName("iceberg", "not_exists"), false);
        try {
            // create db success
            ops.createDb(createDBStmt);
            // drop db success
            ops.dropDb(dropDbStmt);
        } catch (Throwable t) {
            Assert.fail();
        }

        try {
            ops.dropDb(dropDbStmt2);
            Assert.fail();
        } catch (Throwable t) {
            Assert.assertTrue(t instanceof DdlException);
            Assert.assertTrue(t.getMessage().contains("database doesn't exist"));
        }
    }
}
7 changes: 7 additions & 0 deletions regression-test/conf/regression-conf.groovy
@@ -259,3 +259,10 @@ lakesoulMinioEndpoint="*******"
metaServiceToken = "greedisgood9999"
instanceId = "default_instance_id"
multiClusterInstance = "default_instance_id"

storageProvider = "oss"
cbsS3Ak = "*******"
cbsS3Sk = "*******"
cbsS3Endpoint = "oss-cn-beijing.aliyuncs.com"
cbsS3Bucket = "test-bucket"
cbsS3Prefix = "test-cluster-prefix"