From 6074cddcf8a43911fecb0f358b7337db2806024d Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Tue, 5 Dec 2023 15:12:36 +0800 Subject: [PATCH] [feature](mtmv)add Job and task tvf (#27967) add: select * from jobs("type"="mv"); select * from tasks("type"="mv"); select * from jobs("type"="insert"); select * from tasks("type"="insert"); add check priv for mv_infos("database"="xxx"); change JobType MTMV==>MV --- be/src/vec/exec/scan/vmeta_scanner.cpp | 44 ++++++ be/src/vec/exec/scan/vmeta_scanner.h | 4 + fe/fe-core/src/main/cup/sql_parser.cup | 6 +- .../catalog/BuiltinTableValuedFunctions.java | 6 + .../apache/doris/job/base/AbstractJob.java | 21 +++ .../java/org/apache/doris/job/base/Job.java | 7 + .../org/apache/doris/job/common/JobType.java | 2 +- .../job/extensions/insert/InsertJob.java | 23 ++++ .../job/extensions/insert/InsertTask.java | 70 ++++++++++ .../doris/job/extensions/mtmv/MTMVJob.java | 39 +++++- .../doris/job/extensions/mtmv/MTMVTask.java | 43 ++++++ .../manager/TaskDisruptorGroupManager.java | 2 +- .../apache/doris/job/task/AbstractTask.java | 6 + .../java/org/apache/doris/job/task/Task.java | 7 + .../org/apache/doris/mtmv/MTMVJobManager.java | 4 +- .../expressions/functions/table/Jobs.java | 58 ++++++++ .../expressions/functions/table/MvInfos.java | 58 ++++++++ .../expressions/functions/table/Tasks.java | 58 ++++++++ .../visitor/TableValuedFunctionVisitor.java | 15 +++ .../JobsTableValuedFunction.java | 125 ++++++++++++++++++ .../tablefunction/MetadataGenerator.java | 74 ++++++++++- .../MetadataTableValuedFunction.java | 12 +- ...n.java => MvInfosTableValuedFunction.java} | 12 +- .../tablefunction/TableValuedFunctionIf.java | 8 +- .../TasksTableValuedFunction.java | 125 ++++++++++++++++++ gensrc/thrift/FrontendService.thrift | 2 + gensrc/thrift/PlanNodes.thrift | 15 +++ gensrc/thrift/Types.thrift | 2 + .../doris/regression/suite/Suite.groovy | 18 +-- .../suites/mtmv_p0/test_base_mtmv.groovy | 12 +- 30 files changed, 838 insertions(+), 40 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Jobs.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/MvInfos.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Tasks.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java rename fe/fe-core/src/main/java/org/apache/doris/tablefunction/{MaterializedViewsTableValuedFunction.java => MvInfosTableValuedFunction.java} (90%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp index 3a3473fa1643e8..d5706e34786d75 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.cpp +++ b/be/src/vec/exec/scan/vmeta_scanner.cpp @@ -234,6 +234,12 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) { case TMetadataType::MATERIALIZED_VIEWS: RETURN_IF_ERROR(_build_materialized_views_metadata_request(meta_scan_range, &request)); break; + case TMetadataType::JOBS: + RETURN_IF_ERROR(_build_jobs_metadata_request(meta_scan_range, &request)); + break; + case TMetadataType::TASKS: + RETURN_IF_ERROR(_build_tasks_metadata_request(meta_scan_range, &request)); + break; case TMetadataType::QUERIES: RETURN_IF_ERROR(_build_queries_metadata_request(meta_scan_range, &request)); break; @@ -399,6 +405,44 @@ Status VMetaScanner::_build_materialized_views_metadata_request( return Status::OK(); } +Status VMetaScanner::_build_jobs_metadata_request(const TMetaScanRange& meta_scan_range, + TFetchSchemaTableDataRequest* request) { + VLOG_CRITICAL << "VMetaScanner::_build_jobs_metadata_request"; + if (!meta_scan_range.__isset.jobs_params) { + return Status::InternalError("Can not find TJobsMetadataParams from meta_scan_range."); + } + + // create request + request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); + + // create TMetadataTableRequestParams + TMetadataTableRequestParams metadata_table_params; + metadata_table_params.__set_metadata_type(TMetadataType::JOBS); + metadata_table_params.__set_jobs_metadata_params(meta_scan_range.jobs_params); + + request->__set_metada_table_params(metadata_table_params); + return Status::OK(); +} + +Status VMetaScanner::_build_tasks_metadata_request(const TMetaScanRange& meta_scan_range, + TFetchSchemaTableDataRequest* request) { + VLOG_CRITICAL << "VMetaScanner::_build_tasks_metadata_request"; + if (!meta_scan_range.__isset.tasks_params) { + return Status::InternalError("Can not find TTasksMetadataParams from meta_scan_range."); + } + + // create request + request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); + + // create TMetadataTableRequestParams + TMetadataTableRequestParams metadata_table_params; + metadata_table_params.__set_metadata_type(TMetadataType::TASKS); + metadata_table_params.__set_tasks_metadata_params(meta_scan_range.tasks_params); + + request->__set_metada_table_params(metadata_table_params); + return Status::OK(); +} + Status VMetaScanner::_build_queries_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) { VLOG_CRITICAL << "VMetaScanner::_build_queries_metadata_request"; diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h index 66e594231fcb06..7c4a1f2b2deff5 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.h +++ b/be/src/vec/exec/scan/vmeta_scanner.h @@ -83,6 +83,10 @@ class VMetaScanner : public VScanner { TFetchSchemaTableDataRequest* request); Status _build_materialized_views_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request); + Status _build_jobs_metadata_request(const TMetaScanRange& meta_scan_range, + TFetchSchemaTableDataRequest* request); + Status _build_tasks_metadata_request(const TMetaScanRange& meta_scan_range, + TFetchSchemaTableDataRequest* request); Status _build_queries_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request); bool _meta_eos; diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 3a7b5b1b5fc5d4..e28dac717047a2 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -2602,11 +2602,11 @@ show_job_stmt ::= :} | KW_SHOW KW_MTMV KW_JOBS {: - RESULT = new ShowJobStmt(null,org.apache.doris.job.common.JobType.MTMV); + RESULT = new ShowJobStmt(null,org.apache.doris.job.common.JobType.MV); :} | KW_SHOW KW_MTMV KW_JOB KW_FOR job_label:jobLabel {: - RESULT = new ShowJobStmt(jobLabel,org.apache.doris.job.common.JobType.MTMV); + RESULT = new ShowJobStmt(jobLabel,org.apache.doris.job.common.JobType.MV); :} | KW_SHOW KW_JOB KW_FOR job_label:jobLabel {: @@ -2618,7 +2618,7 @@ show_job_stmt ::= :} | KW_SHOW KW_MTMV KW_JOB KW_TASKS KW_FOR job_label:jobLabel {: - RESULT = new ShowJobTaskStmt(jobLabel,org.apache.doris.job.common.JobType.MTMV); + RESULT = new ShowJobTaskStmt(jobLabel,org.apache.doris.job.common.JobType.MV); :} ; pause_job_stmt ::= diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java index 4a24d27ee6bdb1..66b0fb49de6691 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java @@ -24,10 +24,13 @@ import org.apache.doris.nereids.trees.expressions.functions.table.GroupCommit; import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs; import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta; +import org.apache.doris.nereids.trees.expressions.functions.table.Jobs; import org.apache.doris.nereids.trees.expressions.functions.table.Local; +import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos; import org.apache.doris.nereids.trees.expressions.functions.table.Numbers; import org.apache.doris.nereids.trees.expressions.functions.table.Queries; import org.apache.doris.nereids.trees.expressions.functions.table.S3; +import org.apache.doris.nereids.trees.expressions.functions.table.Tasks; import org.apache.doris.nereids.trees.expressions.functions.table.WorkloadGroups; import com.google.common.collect.ImmutableList; @@ -51,6 +54,9 @@ public class BuiltinTableValuedFunctions implements FunctionHelper { tableValued(Numbers.class, "numbers"), tableValued(Queries.class, "queries"), tableValued(S3.class, "s3"), + tableValued(MvInfos.class, "mv_infos"), + tableValued(Jobs.class, "jobs"), + tableValued(Tasks.class, "tasks"), tableValued(WorkloadGroups.class, "workload_groups") ); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 908676df5ba8fb..ca98756f6d3f18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -30,6 +30,8 @@ import org.apache.doris.job.task.AbstractTask; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ShowResultSetMetaData; +import org.apache.doris.thrift.TCell; +import org.apache.doris.thrift.TRow; import com.google.common.collect.ImmutableList; import com.google.gson.annotations.SerializedName; @@ -216,11 +218,30 @@ public List getCommonShowInfo() { return commonShowInfo; } + public TRow getCommonTvfInfo() { + TRow trow = new TRow(); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(jobId))); + trow.addToColumnValue(new TCell().setStringVal(jobName)); + trow.addToColumnValue(new TCell().setStringVal(createUser.getQualifiedUser())); + trow.addToColumnValue(new TCell().setStringVal(jobConfig.getExecuteType().name())); + trow.addToColumnValue(new TCell().setStringVal(jobConfig.convertRecurringStrategyToString())); + trow.addToColumnValue(new TCell().setStringVal(jobStatus.name())); + trow.addToColumnValue(new TCell().setStringVal(executeSql)); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(createTimeMs))); + trow.addToColumnValue(new TCell().setStringVal(comment)); + return trow; + } + @Override public List getShowInfo() { return getCommonShowInfo(); } + @Override + public TRow getTvfInfo() { + return getCommonTvfInfo(); + } + @Override public ShowResultSetMetaData getJobMetaData() { ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java index fef447d160f189..a530ce3b2a0ac2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java @@ -22,6 +22,7 @@ import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; import org.apache.doris.qe.ShowResultSetMetaData; +import org.apache.doris.thrift.TRow; import java.util.List; @@ -116,4 +117,10 @@ public interface Job { * @return List job common show info */ List getShowInfo(); + + /** + * get info for tvf `jobs` + * @return TRow + */ + TRow getTvfInfo(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobType.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobType.java index f815e16cecd80f..1beb4e0a3840f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobType.java @@ -19,5 +19,5 @@ public enum JobType { INSERT, - MTMV + MV } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index eb5e9499e38d4e..619a5c7fded026 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -34,6 +34,8 @@ import org.apache.doris.qe.ShowResultSetMetaData; import org.apache.doris.qe.StmtExecutor; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.gson.annotations.SerializedName; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -51,6 +53,27 @@ @Slf4j public class InsertJob extends AbstractJob { + public static final ImmutableList SCHEMA = ImmutableList.of( + new Column("Id", ScalarType.createStringType()), + new Column("Name", ScalarType.createStringType()), + new Column("Definer", ScalarType.createStringType()), + new Column("ExecuteType", ScalarType.createStringType()), + new Column("RecurringStrategy", ScalarType.createStringType()), + new Column("Status", ScalarType.createStringType()), + new Column("ExecuteSql", ScalarType.createStringType()), + new Column("CreateTime", ScalarType.createStringType()), + new Column("Comment", ScalarType.createStringType())); + + public static final ImmutableMap COLUMN_TO_INDEX; + + static { + ImmutableMap.Builder builder = new ImmutableMap.Builder(); + for (int i = 0; i < SCHEMA.size(); i++) { + builder.put(SCHEMA.get(i).getName().toLowerCase(), i); + } + COLUMN_TO_INDEX = builder.build(); + } + @SerializedName(value = "lp") String labelPrefix; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index 38b3969d7d8c10..eb319f4d5e7dd7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -18,7 +18,9 @@ package org.apache.doris.job.extensions.insert; import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.ScalarType; import org.apache.doris.common.FeConstants; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.exception.JobException; @@ -29,9 +31,13 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TCell; +import org.apache.doris.thrift.TRow; import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import lombok.Getter; import lombok.Setter; @@ -49,6 +55,29 @@ @Slf4j public class InsertTask extends AbstractTask { + public static final ImmutableList SCHEMA = ImmutableList.of( + new Column("TaskId", ScalarType.createStringType()), + new Column("Label", ScalarType.createStringType()), + new Column("Status", ScalarType.createStringType()), + new Column("EtlInfo", ScalarType.createStringType()), + new Column("TaskInfo", ScalarType.createStringType()), + new Column("ErrorMsg", ScalarType.createStringType()), + new Column("CreateTimeMs", ScalarType.createStringType()), + new Column("FinishTimeMs", ScalarType.createStringType()), + new Column("TrackingUrl", ScalarType.createStringType()), + new Column("LoadStatistic", ScalarType.createStringType()), + new Column("User", ScalarType.createStringType())); + + public static final ImmutableMap COLUMN_TO_INDEX; + + static { + ImmutableMap.Builder builder = new ImmutableMap.Builder(); + for (int i = 0; i < SCHEMA.size(); i++) { + builder.put(SCHEMA.get(i).getName().toLowerCase(), i); + } + COLUMN_TO_INDEX = builder.build(); + } + private String labelName; private InsertIntoTableCommand command; @@ -188,4 +217,45 @@ public List getShowInfo() { return jobInfo; } + @Override + public TRow getTvfInfo() { + TRow trow = new TRow(); + if (loadJob == null) { + return trow; + } + + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(loadJob.getId()))); + trow.addToColumnValue(new TCell().setStringVal(loadJob.getLabel())); + trow.addToColumnValue(new TCell().setStringVal(loadJob.getState().name())); + // etl info + String etlInfo = FeConstants.null_string; + if (!loadJob.getLoadingStatus().getCounters().isEmpty()) { + etlInfo = Joiner.on("; ").withKeyValueSeparator("=").join(loadJob.getLoadingStatus().getCounters()); + } + trow.addToColumnValue(new TCell().setStringVal(etlInfo)); + + // task info + String taskInfo = "cluster:" + loadJob.getResourceName() + "; timeout(s):" + loadJob.getTimeout() + + "; max_filter_ratio:" + loadJob.getMaxFilterRatio() + "; priority:" + loadJob.getPriority(); + trow.addToColumnValue(new TCell().setStringVal(taskInfo)); + + // err msg + String errMsg = FeConstants.null_string; + if (loadJob.getFailMsg() != null) { + errMsg = "type:" + loadJob.getFailMsg().getCancelType() + "; msg:" + loadJob.getFailMsg().getMsg(); + } + trow.addToColumnValue(new TCell().setStringVal(errMsg)); + + // create time + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(loadJob.getCreateTimestamp()))); + + // load end time + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(loadJob.getFinishTimestamp()))); + // tracking url + trow.addToColumnValue(new TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl())); + trow.addToColumnValue(new TCell().setStringVal(loadJob.getLoadStatistic().toJson())); + trow.addToColumnValue(new TCell().setStringVal(loadJob.getUserInfo().getQualifiedUser())); + return trow; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java index c08a96ee6eb39f..c00f97cd79f202 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java @@ -32,7 +32,11 @@ import org.apache.doris.job.common.TaskType; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ShowResultSetMetaData; +import org.apache.doris.thrift.TCell; +import org.apache.doris.thrift.TRow; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; import org.apache.commons.collections.CollectionUtils; @@ -56,6 +60,26 @@ public class MTMVJob extends AbstractJob { .addColumn(new Column("CreateTime", ScalarType.createVarchar(20))) .addColumn(new Column("Comment", ScalarType.createVarchar(20))) .build(); + + public static final ImmutableList SCHEMA = ImmutableList.of( + new Column("Id", ScalarType.createStringType()), + new Column("Name", ScalarType.createStringType()), + new Column("ExecuteType", ScalarType.createStringType()), + new Column("RecurringStrategy", ScalarType.createStringType()), + new Column("Status", ScalarType.createStringType()), + new Column("CreateTime", ScalarType.createStringType()), + new Column("Comment", ScalarType.createStringType())); + + public static final ImmutableMap COLUMN_TO_INDEX; + + static { + ImmutableMap.Builder builder = new ImmutableMap.Builder(); + for (int i = 0; i < SCHEMA.size(); i++) { + builder.put(SCHEMA.get(i).getName().toLowerCase(), i); + } + COLUMN_TO_INDEX = builder.build(); + } + private static final ShowResultSetMetaData TASK_META_DATA = ShowResultSetMetaData.builder() .addColumn(new Column("JobId", ScalarType.createVarchar(20))) @@ -111,7 +135,7 @@ public ShowResultSetMetaData getTaskMetaData() { @Override public JobType getJobType() { - return JobType.MTMV; + return JobType.MV; } @Override @@ -139,6 +163,19 @@ public List getShowInfo() { return data; } + @Override + public TRow getTvfInfo() { + TRow trow = new TRow(); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getJobId()))); + trow.addToColumnValue(new TCell().setStringVal(super.getJobName())); + trow.addToColumnValue(new TCell().setStringVal(super.getJobConfig().getExecuteType().name())); + trow.addToColumnValue(new TCell().setStringVal(super.getJobConfig().convertRecurringStrategyToString())); + trow.addToColumnValue(new TCell().setStringVal(super.getJobStatus().name())); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getCreateTimeMs()))); + trow.addToColumnValue(new TCell().setStringVal(super.getComment())); + return trow; + } + private MTMV getMTMV() throws DdlException, MetaNotFoundException { Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId); return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 7ddddc018fb3ee..c1e230f0ed0282 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -18,9 +18,11 @@ package org.apache.doris.job.extensions.mtmv; import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; @@ -36,8 +38,12 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TCell; +import org.apache.doris.thrift.TRow; import org.apache.doris.thrift.TUniqueId; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; @@ -50,6 +56,27 @@ public class MTMVTask extends AbstractTask { private static final Logger LOG = LogManager.getLogger(MTMVTask.class); public static final Long MAX_HISTORY_TASKS_NUM = 100L; + public static final ImmutableList SCHEMA = ImmutableList.of( + new Column("TaskId", ScalarType.createStringType()), + new Column("JobId", ScalarType.createStringType()), + new Column("JobName", ScalarType.createStringType()), + new Column("Status", ScalarType.createStringType()), + new Column("CreateTime", ScalarType.createStringType()), + new Column("StartTime", ScalarType.createStringType()), + new Column("FinishTime", ScalarType.createStringType()), + new Column("DurationMs", ScalarType.createStringType()), + new Column("ExecuteSql", ScalarType.createStringType())); + + public static final ImmutableMap COLUMN_TO_INDEX; + + static { + ImmutableMap.Builder builder = new ImmutableMap.Builder(); + for (int i = 0; i < SCHEMA.size(); i++) { + builder.put(SCHEMA.get(i).getName().toLowerCase(), i); + } + COLUMN_TO_INDEX = builder.build(); + } + @SerializedName(value = "di") private long dbId; @SerializedName(value = "mi") @@ -130,6 +157,22 @@ public List getShowInfo() { return data; } + @Override + public TRow getTvfInfo() { + TRow trow = new TRow(); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getTaskId()))); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getJobId()))); + trow.addToColumnValue(new TCell().setStringVal(super.getJobName())); + trow.addToColumnValue(new TCell().setStringVal(super.getStatus().toString())); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getCreateTimeMs()))); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getStartTimeMs()))); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getFinishTimeMs()))); + trow.addToColumnValue( + new TCell().setStringVal(String.valueOf(super.getFinishTimeMs() - super.getStartTimeMs()))); + trow.addToColumnValue(new TCell().setStringVal(sql)); + return trow; + } + private static String generateSql(MTMV mtmv) { StringBuilder builder = new StringBuilder(); builder.append("INSERT OVERWRITE TABLE "); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java index 97a6f94099ffdb..d07a109fc53f2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java @@ -120,7 +120,7 @@ private void registerMTMVDisruptor() { }; TaskDisruptor mtmvDisruptor = new TaskDisruptor<>(mtmvEventFactory, DISPATCH_MTMV_TASK_QUEUE_SIZE, mtmvTaskThreadFactory, new BlockingWaitStrategy(), insertTaskExecutorHandlers, eventTranslator); - disruptorMap.put(JobType.MTMV, mtmvDisruptor); + disruptorMap.put(JobType.MV, mtmvDisruptor); } public void dispatchTimerJob(AbstractJob job) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index 4580456928f342..efe38b70136a2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -18,6 +18,7 @@ package org.apache.doris.job.task; import org.apache.doris.catalog.Env; +import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.base.Job; import org.apache.doris.job.common.TaskStatus; import org.apache.doris.job.common.TaskType; @@ -122,4 +123,9 @@ public boolean isCancelled() { return status.equals(TaskStatus.CANCEL); } + public String getJobName() { + AbstractJob job = Env.getCurrentEnv().getJobManager().getJob(jobId); + return job == null ? "" : job.getJobName(); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java index 3f61ce60c700f7..b13d22ff665633 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java @@ -18,6 +18,7 @@ package org.apache.doris.job.task; import org.apache.doris.job.exception.JobException; +import org.apache.doris.thrift.TRow; import java.util.List; @@ -72,4 +73,10 @@ public interface Task { * @return List task common show info */ List getShowInfo(); + + /** + * get info for tvf `tasks` + * @return TRow + */ + TRow getTvfInfo(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java index dbd534e47c4dd7..df1e3dbe07b526 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java @@ -118,7 +118,7 @@ private void setScheduleJobConfig(JobExecutionConfiguration jobExecutionConfigur @Override public void dropMTMV(MTMV mtmv) throws DdlException { List jobs = Env.getCurrentEnv().getJobManager() - .queryJobs(JobType.MTMV, mtmv.getJobInfo().getJobName()); + .queryJobs(JobType.MV, mtmv.getJobInfo().getJobName()); if (!CollectionUtils.isEmpty(jobs)) { try { Env.getCurrentEnv().getJobManager() @@ -165,7 +165,7 @@ public void refreshMTMV(RefreshMTMVInfo info) throws DdlException, MetaNotFoundE Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(info.getMvName().getDb()); MTMV mtmv = (MTMV) db.getTableOrMetaException(info.getMvName().getTbl(), TableType.MATERIALIZED_VIEW); List jobs = Env.getCurrentEnv().getJobManager() - .queryJobs(JobType.MTMV, mtmv.getJobInfo().getJobName()); + .queryJobs(JobType.MV, mtmv.getJobInfo().getJobName()); if (CollectionUtils.isEmpty(jobs) || jobs.size() != 1) { throw new DdlException("jobs not normal,should have one job,but job num is: " + jobs.size()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Jobs.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Jobs.java new file mode 100644 index 00000000000000..1f45d40fbdca7b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Jobs.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.JobsTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** + * jobs + */ +public class Jobs extends TableValuedFunction { + public Jobs(Properties properties) { + super("jobs", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map arguments = getTVFProperties().getMap(); + return new JobsTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build JobsTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitJobs(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/MvInfos.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/MvInfos.java new file mode 100644 index 00000000000000..ae346e52b778cb --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/MvInfos.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.MvInfosTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** + * mv_infos + */ +public class MvInfos extends TableValuedFunction { + public MvInfos(Properties properties) { + super("mv_infos", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map arguments = getTVFProperties().getMap(); + return new MvInfosTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build MvInfosTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitMvInfos(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Tasks.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Tasks.java new file mode 100644 index 00000000000000..f5036f86431e9e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Tasks.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.TableValuedFunctionIf; +import org.apache.doris.tablefunction.TasksTableValuedFunction; + +import java.util.Map; + +/** + * tasks + */ +public class Tasks extends TableValuedFunction { + public Tasks(Properties properties) { + super("tasks", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map arguments = getTVFProperties().getMap(); + return new TasksTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build TasksTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitTasks(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index 8d11bd5e0774a6..9967e472694db7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -24,11 +24,14 @@ import org.apache.doris.nereids.trees.expressions.functions.table.GroupCommit; import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs; import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta; +import org.apache.doris.nereids.trees.expressions.functions.table.Jobs; import org.apache.doris.nereids.trees.expressions.functions.table.Local; +import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos; import org.apache.doris.nereids.trees.expressions.functions.table.Numbers; import org.apache.doris.nereids.trees.expressions.functions.table.Queries; import org.apache.doris.nereids.trees.expressions.functions.table.S3; import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction; +import org.apache.doris.nereids.trees.expressions.functions.table.Tasks; import org.apache.doris.nereids.trees.expressions.functions.table.WorkloadGroups; /** TableValuedFunctionVisitor */ @@ -47,6 +50,18 @@ default R visitFrontends(Frontends frontends, C context) { return visitTableValuedFunction(frontends, context); } + default R visitMvInfos(MvInfos mvInfos, C context) { + return visitTableValuedFunction(mvInfos, context); + } + + default R visitJobs(Jobs jobs, C context) { + return visitTableValuedFunction(jobs, context); + } + + default R visitTasks(Tasks tasks, C context) { + return visitTableValuedFunction(tasks, context); + } + default R visitFrontendsDisks(FrontendsDisks frontendsDisks, C context) { return visitTableValuedFunction(frontendsDisks, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java new file mode 100644 index 00000000000000..2a3a698eb9dbbc --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Column; +import org.apache.doris.job.common.JobType; +import org.apache.doris.job.extensions.insert.InsertJob; +import org.apache.doris.job.extensions.mtmv.MTMVJob; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TJobsMetadataParams; +import org.apache.doris.thrift.TMetaScanRange; +import org.apache.doris.thrift.TMetadataTableRequestParams; +import org.apache.doris.thrift.TMetadataType; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; + +import java.util.List; +import java.util.Map; + +/** + * The Implement of table valued function + * jobs("type" = "mv"). + */ +public class JobsTableValuedFunction extends MetadataTableValuedFunction { + public static final String NAME = "jobs"; + private static final String TYPE = "type"; + + private static final ImmutableSet PROPERTIES_SET = ImmutableSet.of(TYPE); + + private final JobType jobType; + + public JobsTableValuedFunction(Map params) throws AnalysisException { + Map validParams = Maps.newHashMap(); + for (String key : params.keySet()) { + if (!PROPERTIES_SET.contains(key.toLowerCase())) { + throw new AnalysisException("'" + key + "' is invalid property"); + } + validParams.put(key.toLowerCase(), params.get(key)); + } + String type = validParams.get(TYPE); + if (type == null) { + throw new AnalysisException("Invalid job metadata query"); + } + JobType jobType = JobType.valueOf(type.toUpperCase()); + if (jobType == null) { + throw new AnalysisException("Invalid job metadata query"); + } + this.jobType = jobType; + UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity(); + if (!userIdentity.isRootUser()) { + throw new AnalysisException("only root user can operate"); + } + } + + public static Integer getColumnIndexFromColumnName(String columnName, TMetadataTableRequestParams params) + throws org.apache.doris.common.AnalysisException { + if (!params.isSetJobsMetadataParams()) { + throw new org.apache.doris.common.AnalysisException("Jobs metadata params is not set."); + } + TJobsMetadataParams jobMetadataParams = params.getJobsMetadataParams(); + String type = jobMetadataParams.getType(); + JobType jobType = JobType.valueOf(type.toUpperCase()); + if (jobType == null) { + throw new AnalysisException("Invalid job metadata query"); + } + if (JobType.MV == jobType) { + return MTMVJob.COLUMN_TO_INDEX.get(columnName.toLowerCase()); + } else if (JobType.INSERT == jobType) { + return InsertJob.COLUMN_TO_INDEX.get(columnName.toLowerCase()); + } else { + throw new AnalysisException("Invalid job type: " + jobType.toString()); + } + } + + @Override + public TMetadataType getMetadataType() { + return TMetadataType.JOBS; + } + + @Override + public TMetaScanRange getMetaScanRange() { + TMetaScanRange metaScanRange = new TMetaScanRange(); + metaScanRange.setMetadataType(TMetadataType.JOBS); + TJobsMetadataParams jobParam = new TJobsMetadataParams(); + jobParam.setType(jobType.name()); + jobParam.setCurrentUserIdent(ConnectContext.get().getCurrentUserIdentity().toThrift()); + metaScanRange.setJobsParams(jobParam); + return metaScanRange; + } + + @Override + public String getTableName() { + return "JobsTableValuedFunction"; + } + + @Override + public List getTableColumns() throws AnalysisException { + if (JobType.MV == jobType) { + return MTMVJob.SCHEMA; + } else if (JobType.INSERT == jobType) { + return InsertJob.SCHEMA; + } else { + throw new AnalysisException("Invalid job type: " + jobType.toString()); + } + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 542ece27fc50f3..aeceacb9d6ee01 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -17,6 +17,7 @@ package org.apache.doris.tablefunction; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.Table; @@ -29,6 +30,9 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.job.common.JobType; +import org.apache.doris.job.task.AbstractTask; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.external.iceberg.IcebergMetadataCache; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryDetail; @@ -42,6 +46,7 @@ import org.apache.doris.thrift.TFetchSchemaTableDataResult; import org.apache.doris.thrift.TIcebergMetadataParams; import org.apache.doris.thrift.TIcebergQueryType; +import org.apache.doris.thrift.TJobsMetadataParams; import org.apache.doris.thrift.TMaterializedViewsMetadataParams; import org.apache.doris.thrift.TMetadataTableRequestParams; import org.apache.doris.thrift.TMetadataType; @@ -50,6 +55,7 @@ import org.apache.doris.thrift.TRow; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.thrift.TTasksMetadataParams; import org.apache.doris.thrift.TUserIdentity; import com.google.common.base.Stopwatch; @@ -100,6 +106,12 @@ public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableData case MATERIALIZED_VIEWS: result = mtmvMetadataResult(params); break; + case JOBS: + result = jobMetadataResult(params); + break; + case TASKS: + result = taskMetadataResult(params); + break; case QUERIES: result = queriesMetadataResult(params, request); break; @@ -107,7 +119,7 @@ public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableData return errorResult("Metadata table params is not set."); } if (result.getStatus().getStatusCode() == TStatusCode.OK) { - filterColumns(result, params.getColumnsName(), params.getMetadataType()); + filterColumns(result, params.getColumnsName(), params.getMetadataType(), params); } return result; } @@ -461,14 +473,14 @@ private static List forwardToOtherFrontends(TFetchS } private static void filterColumns(TFetchSchemaTableDataResult result, - List columnNames, TMetadataType type) throws TException { + List columnNames, TMetadataType type, TMetadataTableRequestParams params) throws TException { List fullColumnsRow = result.getDataBatch(); List filterColumnsRows = Lists.newArrayList(); for (TRow row : fullColumnsRow) { TRow filterRow = new TRow(); try { for (String columnName : columnNames) { - Integer index = MetadataTableValuedFunction.getColumnIndexFromColumnName(type, columnName); + Integer index = MetadataTableValuedFunction.getColumnIndexFromColumnName(type, columnName, params); filterRow.addToColumnValue(row.getColumnValue().get(index)); } } catch (AnalysisException e) { @@ -492,6 +504,8 @@ private static TFetchSchemaTableDataResult mtmvMetadataResult(TMetadataTableRequ TMaterializedViewsMetadataParams mtmvMetadataParams = params.getMaterializedViewsMetadataParams(); String dbName = mtmvMetadataParams.getDatabase(); + TUserIdentity currentUserIdent = mtmvMetadataParams.getCurrentUserIdent(); + UserIdentity userIdentity = UserIdentity.fromThrift(currentUserIdent); List dataBatch = Lists.newArrayList(); TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); List tables; @@ -505,6 +519,12 @@ private static TFetchSchemaTableDataResult mtmvMetadataResult(TMetadataTableRequ for (Table table : tables) { if (table instanceof MTMV) { + if (!Env.getCurrentEnv().getAccessManager() + .checkTblPriv(userIdentity, InternalCatalog.INTERNAL_CATALOG_NAME, + table.getQualifiedDbName(), table.getName(), + PrivPredicate.SHOW)) { + continue; + } MTMV mv = (MTMV) table; TRow trow = new TRow(); trow.addToColumnValue(new TCell().setLongVal(mv.getId())); @@ -524,5 +544,53 @@ private static TFetchSchemaTableDataResult mtmvMetadataResult(TMetadataTableRequ result.setStatus(new TStatus(TStatusCode.OK)); return result; } + + private static TFetchSchemaTableDataResult jobMetadataResult(TMetadataTableRequestParams params) { + if (!params.isSetJobsMetadataParams()) { + return errorResult("Jobs metadata params is not set."); + } + + TJobsMetadataParams jobsMetadataParams = params.getJobsMetadataParams(); + String type = jobsMetadataParams.getType(); + JobType jobType = JobType.valueOf(type); + List dataBatch = Lists.newArrayList(); + TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + + List jobList = Env.getCurrentEnv().getJobManager().queryJobs(jobType); + + for (org.apache.doris.job.base.AbstractJob job : jobList) { + dataBatch.add(job.getTvfInfo()); + } + result.setDataBatch(dataBatch); + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } + + private static TFetchSchemaTableDataResult taskMetadataResult(TMetadataTableRequestParams params) { + if (!params.isSetTasksMetadataParams()) { + return errorResult("Tasks metadata params is not set."); + } + + TTasksMetadataParams tasksMetadataParams = params.getTasksMetadataParams(); + String type = tasksMetadataParams.getType(); + JobType jobType = JobType.valueOf(type); + List dataBatch = Lists.newArrayList(); + TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + + List jobList = Env.getCurrentEnv().getJobManager().queryJobs(jobType); + + for (org.apache.doris.job.base.AbstractJob job : jobList) { + List tasks = job.queryTasks(); + for (AbstractTask task : tasks) { + TRow tvfInfo = task.getTvfInfo(); + if (tvfInfo != null) { + dataBatch.add(tvfInfo); + } + } + } + result.setDataBatch(dataBatch); + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java index 1cea2ce048bf84..d2c3278314efc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java @@ -23,11 +23,13 @@ import org.apache.doris.planner.ScanNode; import org.apache.doris.planner.external.MetadataScanNode; import org.apache.doris.thrift.TMetaScanRange; +import org.apache.doris.thrift.TMetadataTableRequestParams; import org.apache.doris.thrift.TMetadataType; public abstract class MetadataTableValuedFunction extends TableValuedFunctionIf { - public static Integer getColumnIndexFromColumnName(TMetadataType type, String columnName) - throws AnalysisException { + public static Integer getColumnIndexFromColumnName(TMetadataType type, String columnName, + TMetadataTableRequestParams params) + throws AnalysisException { switch (type) { case BACKENDS: return BackendsTableValuedFunction.getColumnIndexFromColumnName(columnName); @@ -42,7 +44,11 @@ public static Integer getColumnIndexFromColumnName(TMetadataType type, String co case CATALOGS: return CatalogsTableValuedFunction.getColumnIndexFromColumnName(columnName); case MATERIALIZED_VIEWS: - return MaterializedViewsTableValuedFunction.getColumnIndexFromColumnName(columnName); + return MvInfosTableValuedFunction.getColumnIndexFromColumnName(columnName); + case JOBS: + return JobsTableValuedFunction.getColumnIndexFromColumnName(columnName, params); + case TASKS: + return TasksTableValuedFunction.getColumnIndexFromColumnName(columnName, params); case QUERIES: return QueriesTableValuedFunction.getColumnIndexFromColumnName(columnName); default: diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MaterializedViewsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java similarity index 90% rename from fe/fe-core/src/main/java/org/apache/doris/tablefunction/MaterializedViewsTableValuedFunction.java rename to fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java index a6384cfd6834ee..1e67d7f69a70c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MaterializedViewsTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.ScalarType; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TMaterializedViewsMetadataParams; import org.apache.doris.thrift.TMetaScanRange; @@ -37,10 +38,10 @@ /** * The Implement of table valued function - * mtmvs("database" = "db1"). + * mv_infos("database" = "db1"). */ -public class MaterializedViewsTableValuedFunction extends MetadataTableValuedFunction { - public static final String NAME = "mtmvs"; +public class MvInfosTableValuedFunction extends MetadataTableValuedFunction { + public static final String NAME = "mv_infos"; private static final String DB = "database"; private static final ImmutableSet PROPERTIES_SET = ImmutableSet.of(DB); @@ -73,7 +74,7 @@ public static Integer getColumnIndexFromColumnName(String columnName) { private final String databaseName; - public MaterializedViewsTableValuedFunction(Map params) throws AnalysisException { + public MvInfosTableValuedFunction(Map params) throws AnalysisException { Map validParams = Maps.newHashMap(); for (String key : params.keySet()) { if (!PROPERTIES_SET.contains(key.toLowerCase())) { @@ -100,13 +101,14 @@ public TMetaScanRange getMetaScanRange() { metaScanRange.setMetadataType(TMetadataType.MATERIALIZED_VIEWS); TMaterializedViewsMetadataParams mtmvParam = new TMaterializedViewsMetadataParams(); mtmvParam.setDatabase(databaseName); + mtmvParam.setCurrentUserIdent(ConnectContext.get().getCurrentUserIdentity().toThrift()); metaScanRange.setMaterializedViewsParams(mtmvParam); return metaScanRange; } @Override public String getTableName() { - return "MaterializedViewsTableValuedFunction"; + return "MvInfosTableValuedFunction"; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index 012f163440f15a..b14a09769cb13d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -66,8 +66,12 @@ public static TableValuedFunctionIf getTableFunction(String funcName, Map PROPERTIES_SET = ImmutableSet.of(TYPE); + + private final JobType jobType; + + public TasksTableValuedFunction(Map params) throws AnalysisException { + Map validParams = Maps.newHashMap(); + for (String key : params.keySet()) { + if (!PROPERTIES_SET.contains(key.toLowerCase())) { + throw new AnalysisException("'" + key + "' is invalid property"); + } + validParams.put(key.toLowerCase(), params.get(key)); + } + String type = validParams.get(TYPE); + if (type == null) { + throw new AnalysisException("Invalid task metadata query"); + } + JobType jobType = JobType.valueOf(type.toUpperCase()); + if (jobType == null) { + throw new AnalysisException("Invalid task metadata query"); + } + this.jobType = jobType; + UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity(); + if (!userIdentity.isRootUser()) { + throw new AnalysisException("only root user can operate"); + } + } + + public static Integer getColumnIndexFromColumnName(String columnName, TMetadataTableRequestParams params) + throws org.apache.doris.common.AnalysisException { + if (!params.isSetTasksMetadataParams()) { + throw new org.apache.doris.common.AnalysisException("Tasks metadata params is not set."); + } + TTasksMetadataParams taskMetadataParams = params.getTasksMetadataParams(); + String type = taskMetadataParams.getType(); + JobType jobType = JobType.valueOf(type.toUpperCase()); + if (jobType == null) { + throw new AnalysisException("Invalid task metadata query"); + } + if (JobType.MV == jobType) { + return MTMVTask.COLUMN_TO_INDEX.get(columnName.toLowerCase()); + } else if (JobType.INSERT == jobType) { + return InsertTask.COLUMN_TO_INDEX.get(columnName.toLowerCase()); + } else { + throw new AnalysisException("Invalid job type: " + jobType.toString()); + } + } + + @Override + public TMetadataType getMetadataType() { + return TMetadataType.TASKS; + } + + @Override + public TMetaScanRange getMetaScanRange() { + TMetaScanRange metaScanRange = new TMetaScanRange(); + metaScanRange.setMetadataType(TMetadataType.TASKS); + TTasksMetadataParams taskParam = new TTasksMetadataParams(); + taskParam.setType(jobType.name()); + taskParam.setCurrentUserIdent(ConnectContext.get().getCurrentUserIdentity().toThrift()); + metaScanRange.setTasksParams(taskParam); + return metaScanRange; + } + + @Override + public String getTableName() { + return "TasksTableValuedFunction"; + } + + @Override + public List getTableColumns() throws AnalysisException { + if (JobType.MV == jobType) { + return MTMVTask.SCHEMA; + } else if (JobType.INSERT == jobType) { + return InsertTask.SCHEMA; + } else { + throw new AnalysisException("Invalid job type: " + jobType.toString()); + } + } +} + diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 19a8437fb53727..c49b5438f75032 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -904,6 +904,8 @@ struct TMetadataTableRequestParams { 6: optional Types.TUserIdentity current_user_ident 7: optional PlanNodes.TQueriesMetadataParams queries_metadata_params 8: optional PlanNodes.TMaterializedViewsMetadataParams materialized_views_metadata_params + 9: optional PlanNodes.TJobsMetadataParams jobs_metadata_params + 10: optional PlanNodes.TTasksMetadataParams tasks_metadata_params } struct TFetchSchemaTableDataRequest { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 09e3f4788d61bb..2a59ddca9fa6f1 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -475,12 +475,25 @@ struct TFrontendsMetadataParams { struct TMaterializedViewsMetadataParams { 1: optional string database + 2: optional Types.TUserIdentity current_user_ident +} + +struct TJobsMetadataParams { + 1: optional string type + 2: optional Types.TUserIdentity current_user_ident +} + +struct TTasksMetadataParams { + 1: optional string type + 2: optional Types.TUserIdentity current_user_ident } struct TQueriesMetadataParams { 1: optional string cluster_name 2: optional bool relay_to_other_fe 3: optional TMaterializedViewsMetadataParams materialized_views_params + 4: optional TJobsMetadataParams jobs_params + 5: optional TTasksMetadataParams tasks_params } struct TMetaScanRange { @@ -490,6 +503,8 @@ struct TMetaScanRange { 4: optional TFrontendsMetadataParams frontends_params 5: optional TQueriesMetadataParams queries_params 6: optional TMaterializedViewsMetadataParams materialized_views_params + 7: optional TJobsMetadataParams jobs_params + 8: optional TTasksMetadataParams tasks_params } // Specification of an individual data range which is held in its entirety diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 92cce3ae27269c..4f101f1177e121 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -699,6 +699,8 @@ enum TMetadataType { CATALOGS, FRONTENDS_DISKS, MATERIALIZED_VIEWS, + JOBS, + TASKS, QUERIES, } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 234c50561b6997..a89edd36bbf2d1 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -842,12 +842,8 @@ class Suite implements GroovyInterceptable { void waitingMTMVTaskFinished(String jobName) { Thread.sleep(2000); - String showTasks = "SHOW MTMV JOB TASKS FOR ${jobName}" - List> showTaskMetaResult = sql_meta(showTasks) - logger.info("showTaskMetaResult: " + showTaskMetaResult.toString()) - int index = showTaskMetaResult.indexOf(['Status', 'CHAR']) - logger.info("index: " + index) - String status = "PENDING" + String showTasks = "select Status from tasks('type'='mv') where JobName = '${jobName}'" + String status = "NULL" List> result long startTime = System.currentTimeMillis() long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min @@ -855,7 +851,7 @@ class Suite implements GroovyInterceptable { result = sql(showTasks) logger.info("result: " + result.toString()) if (!result.isEmpty()) { - status = result.last().get(index) + status = result.last().get(0) } logger.info("The state of ${showTasks} is ${status}") Thread.sleep(1000); @@ -867,18 +863,14 @@ class Suite implements GroovyInterceptable { } String getJobName(String dbName, String mtmvName) { - String showMTMV = "select * from mtmvs('database'='${dbName}') where Name = '${mtmvName}'"; + String showMTMV = "select JobName from mv_infos('database'='${dbName}') where Name = '${mtmvName}'"; logger.info(showMTMV) - List> showTaskMetaResult = sql_meta(showMTMV) - logger.info("showTaskMetaResult: " + showTaskMetaResult.toString()) - int index = showTaskMetaResult.indexOf(['JobName', 'TINYTEXT']) - logger.info("index: " + index) List> result = sql(showMTMV) logger.info("result: " + result.toString()) if (result.isEmpty()) { Assert.fail(); } - return result.last().get(index); + return result.last().get(0); } } diff --git a/regression-test/suites/mtmv_p0/test_base_mtmv.groovy b/regression-test/suites/mtmv_p0/test_base_mtmv.groovy index 62fd1cbbc9ca83..1111fdc0a11e29 100644 --- a/regression-test/suites/mtmv_p0/test_base_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_base_mtmv.groovy @@ -48,18 +48,18 @@ suite("test_base_mtmv") { SELECT * FROM ${tableName}; """ def jobName = getJobName("regression_test_mtmv_p0", mvName); - order_qt_status "select Name,State,RefreshState from mtmvs('database'='${dbName}') where Name='${mvName}'" + order_qt_status "select Name,State,RefreshState from mv_infos('database'='${dbName}') where Name='${mvName}'" sql """ REFRESH MATERIALIZED VIEW ${mvName} """ waitingMTMVTaskFinished(jobName) - order_qt_status "select Name,State,RefreshState from mtmvs('database'='${dbName}') where Name='${mvName}'" + order_qt_status "select Name,State,RefreshState from mv_infos('database'='${dbName}') where Name='${mvName}'" // alter table sql """ alter table ${tableName} add COLUMN new_col INT AFTER username; """ - order_qt_status "select Name,State,RefreshState from mtmvs('database'='${dbName}') where Name='${mvName}'" + order_qt_status "select Name,State,RefreshState from mv_infos('database'='${dbName}') where Name='${mvName}'" sql """ alter table ${tableName} drop COLUMN new_col; """ @@ -67,13 +67,13 @@ suite("test_base_mtmv") { REFRESH MATERIALIZED VIEW ${mvName} """ waitingMTMVTaskFinished(jobName) - order_qt_status "select Name,State,RefreshState from mtmvs('database'='${dbName}') where Name='${mvName}'" + order_qt_status "select Name,State,RefreshState from mv_infos('database'='${dbName}') where Name='${mvName}'" // drop table sql """ drop table ${tableName} """ - order_qt_status "select Name,State,RefreshState from mtmvs('database'='${dbName}') where Name='${mvName}'" + order_qt_status "select Name,State,RefreshState from mv_infos('database'='${dbName}') where Name='${mvName}'" sql """ CREATE TABLE IF NOT EXISTS `${tableName}` ( event_day DATE, @@ -89,7 +89,7 @@ suite("test_base_mtmv") { REFRESH MATERIALIZED VIEW ${mvName} """ waitingMTMVTaskFinished(jobName) - order_qt_status "select Name,State,RefreshState from mtmvs('database'='${dbName}') where Name='${mvName}'" + order_qt_status "select Name,State,RefreshState from mv_infos('database'='${dbName}') where Name='${mvName}'" sql """ DROP MATERIALIZED VIEW ${mvName} """