Skip to content

Commit

Permalink
[feature](mtmv)add Job and task tvf (apache#27967)
Browse files Browse the repository at this point in the history
add:
select * from jobs("type"="mv");
select * from tasks("type"="mv");
select * from jobs("type"="insert");
select * from tasks("type"="insert");

add check priv for mv_infos("database"="xxx");

change JobType MTMV==>MV
  • Loading branch information
zddr authored Dec 5, 2023
1 parent 02512cd commit 6074cdd
Show file tree
Hide file tree
Showing 30 changed files with 838 additions and 40 deletions.
44 changes: 44 additions & 0 deletions be/src/vec/exec/scan/vmeta_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,12 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
case TMetadataType::MATERIALIZED_VIEWS:
RETURN_IF_ERROR(_build_materialized_views_metadata_request(meta_scan_range, &request));
break;
case TMetadataType::JOBS:
RETURN_IF_ERROR(_build_jobs_metadata_request(meta_scan_range, &request));
break;
case TMetadataType::TASKS:
RETURN_IF_ERROR(_build_tasks_metadata_request(meta_scan_range, &request));
break;
case TMetadataType::QUERIES:
RETURN_IF_ERROR(_build_queries_metadata_request(meta_scan_range, &request));
break;
Expand Down Expand Up @@ -399,6 +405,44 @@ Status VMetaScanner::_build_materialized_views_metadata_request(
return Status::OK();
}

Status VMetaScanner::_build_jobs_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "VMetaScanner::_build_jobs_metadata_request";
if (!meta_scan_range.__isset.jobs_params) {
return Status::InternalError("Can not find TJobsMetadataParams from meta_scan_range.");
}

// create request
request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);

// create TMetadataTableRequestParams
TMetadataTableRequestParams metadata_table_params;
metadata_table_params.__set_metadata_type(TMetadataType::JOBS);
metadata_table_params.__set_jobs_metadata_params(meta_scan_range.jobs_params);

request->__set_metada_table_params(metadata_table_params);
return Status::OK();
}

Status VMetaScanner::_build_tasks_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "VMetaScanner::_build_tasks_metadata_request";
if (!meta_scan_range.__isset.tasks_params) {
return Status::InternalError("Can not find TTasksMetadataParams from meta_scan_range.");
}

// create request
request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);

// create TMetadataTableRequestParams
TMetadataTableRequestParams metadata_table_params;
metadata_table_params.__set_metadata_type(TMetadataType::TASKS);
metadata_table_params.__set_tasks_metadata_params(meta_scan_range.tasks_params);

request->__set_metada_table_params(metadata_table_params);
return Status::OK();
}

Status VMetaScanner::_build_queries_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "VMetaScanner::_build_queries_metadata_request";
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/scan/vmeta_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ class VMetaScanner : public VScanner {
TFetchSchemaTableDataRequest* request);
Status _build_materialized_views_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_jobs_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_tasks_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_queries_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
bool _meta_eos;
Expand Down
6 changes: 3 additions & 3 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -2602,11 +2602,11 @@ show_job_stmt ::=
:}
| KW_SHOW KW_MTMV KW_JOBS
{:
RESULT = new ShowJobStmt(null,org.apache.doris.job.common.JobType.MTMV);
RESULT = new ShowJobStmt(null,org.apache.doris.job.common.JobType.MV);
:}
| KW_SHOW KW_MTMV KW_JOB KW_FOR job_label:jobLabel
{:
RESULT = new ShowJobStmt(jobLabel,org.apache.doris.job.common.JobType.MTMV);
RESULT = new ShowJobStmt(jobLabel,org.apache.doris.job.common.JobType.MV);
:}
| KW_SHOW KW_JOB KW_FOR job_label:jobLabel
{:
Expand All @@ -2618,7 +2618,7 @@ show_job_stmt ::=
:}
| KW_SHOW KW_MTMV KW_JOB KW_TASKS KW_FOR job_label:jobLabel
{:
RESULT = new ShowJobTaskStmt(jobLabel,org.apache.doris.job.common.JobType.MTMV);
RESULT = new ShowJobTaskStmt(jobLabel,org.apache.doris.job.common.JobType.MV);
:}
;
pause_job_stmt ::=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import org.apache.doris.nereids.trees.expressions.functions.table.GroupCommit;
import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
import org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
import org.apache.doris.nereids.trees.expressions.functions.table.Queries;
import org.apache.doris.nereids.trees.expressions.functions.table.S3;
import org.apache.doris.nereids.trees.expressions.functions.table.Tasks;
import org.apache.doris.nereids.trees.expressions.functions.table.WorkloadGroups;

import com.google.common.collect.ImmutableList;
Expand All @@ -51,6 +54,9 @@ public class BuiltinTableValuedFunctions implements FunctionHelper {
tableValued(Numbers.class, "numbers"),
tableValued(Queries.class, "queries"),
tableValued(S3.class, "s3"),
tableValued(MvInfos.class, "mv_infos"),
tableValued(Jobs.class, "jobs"),
tableValued(Tasks.class, "tasks"),
tableValued(WorkloadGroups.class, "workload_groups")
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;

import com.google.common.collect.ImmutableList;
import com.google.gson.annotations.SerializedName;
Expand Down Expand Up @@ -216,11 +218,30 @@ public List<String> getCommonShowInfo() {
return commonShowInfo;
}

public TRow getCommonTvfInfo() {
TRow trow = new TRow();
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(jobId)));
trow.addToColumnValue(new TCell().setStringVal(jobName));
trow.addToColumnValue(new TCell().setStringVal(createUser.getQualifiedUser()));
trow.addToColumnValue(new TCell().setStringVal(jobConfig.getExecuteType().name()));
trow.addToColumnValue(new TCell().setStringVal(jobConfig.convertRecurringStrategyToString()));
trow.addToColumnValue(new TCell().setStringVal(jobStatus.name()));
trow.addToColumnValue(new TCell().setStringVal(executeSql));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(createTimeMs)));
trow.addToColumnValue(new TCell().setStringVal(comment));
return trow;
}

@Override
public List<String> getShowInfo() {
return getCommonShowInfo();
}

@Override
public TRow getTvfInfo() {
return getCommonTvfInfo();
}

@Override
public ShowResultSetMetaData getJobMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
Expand Down
7 changes: 7 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.thrift.TRow;

import java.util.List;

Expand Down Expand Up @@ -116,4 +117,10 @@ public interface Job<T extends AbstractTask> {
* @return List<String> job common show info
*/
List<String> getShowInfo();

/**
* get info for tvf `jobs`
* @return TRow
*/
TRow getTvfInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@

public enum JobType {
INSERT,
MTMV
MV
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.qe.StmtExecutor;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -51,6 +53,27 @@
@Slf4j
public class InsertJob extends AbstractJob<InsertTask> {

public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("Id", ScalarType.createStringType()),
new Column("Name", ScalarType.createStringType()),
new Column("Definer", ScalarType.createStringType()),
new Column("ExecuteType", ScalarType.createStringType()),
new Column("RecurringStrategy", ScalarType.createStringType()),
new Column("Status", ScalarType.createStringType()),
new Column("ExecuteSql", ScalarType.createStringType()),
new Column("CreateTime", ScalarType.createStringType()),
new Column("Comment", ScalarType.createStringType()));

public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;

static {
ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
for (int i = 0; i < SCHEMA.size(); i++) {
builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
}
COLUMN_TO_INDEX = builder.build();
}

@SerializedName(value = "lp")
String labelPrefix;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.doris.job.extensions.insert;

import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.exception.JobException;
Expand All @@ -29,9 +31,13 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -49,6 +55,29 @@
@Slf4j
public class InsertTask extends AbstractTask {

public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("TaskId", ScalarType.createStringType()),
new Column("Label", ScalarType.createStringType()),
new Column("Status", ScalarType.createStringType()),
new Column("EtlInfo", ScalarType.createStringType()),
new Column("TaskInfo", ScalarType.createStringType()),
new Column("ErrorMsg", ScalarType.createStringType()),
new Column("CreateTimeMs", ScalarType.createStringType()),
new Column("FinishTimeMs", ScalarType.createStringType()),
new Column("TrackingUrl", ScalarType.createStringType()),
new Column("LoadStatistic", ScalarType.createStringType()),
new Column("User", ScalarType.createStringType()));

public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;

static {
ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
for (int i = 0; i < SCHEMA.size(); i++) {
builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
}
COLUMN_TO_INDEX = builder.build();
}

private String labelName;

private InsertIntoTableCommand command;
Expand Down Expand Up @@ -188,4 +217,45 @@ public List<String> getShowInfo() {
return jobInfo;
}

@Override
public TRow getTvfInfo() {
TRow trow = new TRow();
if (loadJob == null) {
return trow;
}

trow.addToColumnValue(new TCell().setStringVal(String.valueOf(loadJob.getId())));
trow.addToColumnValue(new TCell().setStringVal(loadJob.getLabel()));
trow.addToColumnValue(new TCell().setStringVal(loadJob.getState().name()));
// etl info
String etlInfo = FeConstants.null_string;
if (!loadJob.getLoadingStatus().getCounters().isEmpty()) {
etlInfo = Joiner.on("; ").withKeyValueSeparator("=").join(loadJob.getLoadingStatus().getCounters());
}
trow.addToColumnValue(new TCell().setStringVal(etlInfo));

// task info
String taskInfo = "cluster:" + loadJob.getResourceName() + "; timeout(s):" + loadJob.getTimeout()
+ "; max_filter_ratio:" + loadJob.getMaxFilterRatio() + "; priority:" + loadJob.getPriority();
trow.addToColumnValue(new TCell().setStringVal(taskInfo));

// err msg
String errMsg = FeConstants.null_string;
if (loadJob.getFailMsg() != null) {
errMsg = "type:" + loadJob.getFailMsg().getCancelType() + "; msg:" + loadJob.getFailMsg().getMsg();
}
trow.addToColumnValue(new TCell().setStringVal(errMsg));

// create time
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(loadJob.getCreateTimestamp())));

// load end time
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(loadJob.getFinishTimestamp())));
// tracking url
trow.addToColumnValue(new TCell().setStringVal(loadJob.getLoadingStatus().getTrackingUrl()));
trow.addToColumnValue(new TCell().setStringVal(loadJob.getLoadStatistic().toJson()));
trow.addToColumnValue(new TCell().setStringVal(loadJob.getUserInfo().getQualifiedUser()));
return trow;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@
import org.apache.doris.job.common.TaskType;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections.CollectionUtils;
Expand All @@ -56,6 +60,26 @@ public class MTMVJob extends AbstractJob<MTMVTask> {
.addColumn(new Column("CreateTime", ScalarType.createVarchar(20)))
.addColumn(new Column("Comment", ScalarType.createVarchar(20)))
.build();

public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("Id", ScalarType.createStringType()),
new Column("Name", ScalarType.createStringType()),
new Column("ExecuteType", ScalarType.createStringType()),
new Column("RecurringStrategy", ScalarType.createStringType()),
new Column("Status", ScalarType.createStringType()),
new Column("CreateTime", ScalarType.createStringType()),
new Column("Comment", ScalarType.createStringType()));

public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;

static {
ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
for (int i = 0; i < SCHEMA.size(); i++) {
builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
}
COLUMN_TO_INDEX = builder.build();
}

private static final ShowResultSetMetaData TASK_META_DATA =
ShowResultSetMetaData.builder()
.addColumn(new Column("JobId", ScalarType.createVarchar(20)))
Expand Down Expand Up @@ -111,7 +135,7 @@ public ShowResultSetMetaData getTaskMetaData() {

@Override
public JobType getJobType() {
return JobType.MTMV;
return JobType.MV;
}

@Override
Expand Down Expand Up @@ -139,6 +163,19 @@ public List<String> getShowInfo() {
return data;
}

@Override
public TRow getTvfInfo() {
TRow trow = new TRow();
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getJobId())));
trow.addToColumnValue(new TCell().setStringVal(super.getJobName()));
trow.addToColumnValue(new TCell().setStringVal(super.getJobConfig().getExecuteType().name()));
trow.addToColumnValue(new TCell().setStringVal(super.getJobConfig().convertRecurringStrategyToString()));
trow.addToColumnValue(new TCell().setStringVal(super.getJobStatus().name()));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(super.getComment()));
return trow;
}

private MTMV getMTMV() throws DdlException, MetaNotFoundException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW);
Expand Down
Loading

0 comments on commit 6074cdd

Please sign in to comment.