From 4bc5efaca01a2140177ff4b569236bfcb7250807 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 12 Sep 2024 09:59:30 +0800 Subject: [PATCH] [Fix](Job)Replaying logs should not modify the original information of the job (#40474) ## Proposed changes ``` JobExecutionConfiguration jobConfig = new JobExecutionConfiguration(); jobConfig.setExecuteType(JobExecuteType.INSTANT); setJobConfig(jobConfig); ``` - Replaying logs should not modify the original information of the job - Use the new optimizer to check whether the executed statement is legal (cherry picked from commit de90051162de7004cf171bbf4d21bd95ff9f3540) --- .../apache/doris/analysis/CreateJobStmt.java | 40 ++++++--------- .../job/extensions/insert/InsertJob.java | 19 +------ .../data/job_p0/job_meta/job_query_test.out | 7 +++ .../job_p0/job_meta/job_query_test.groovy | 28 +++++++++++ .../suites/job_p0/job_meta/load.groovy | 50 +++++++++++++++++++ .../suites/job_p0/test_base_insert_job.groovy | 2 +- 6 files changed, 103 insertions(+), 43 deletions(-) create mode 100644 regression-test/data/job_p0/job_meta/job_query_test.out create mode 100644 regression-test/suites/job_p0/job_meta/job_query_test.groovy create mode 100644 regression-test/suites/job_p0/job_meta/load.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java index 8a8db0a3d1eaea..088e9eb3e870d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java @@ -32,15 +32,15 @@ import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.extensions.insert.InsertJob; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.qe.ConnectContext; -import com.google.common.collect.ImmutableSet; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import java.util.HashSet; - /** * syntax: * CREATE @@ -91,12 +91,6 @@ public class CreateJobStmt extends DdlStmt { // exclude job name prefix, which is used by inner job private static final String excludeJobNamePrefix = "inner_"; - private static final ImmutableSet> supportStmtSuperClass - = new ImmutableSet.Builder>().add(InsertStmt.class) - .build(); - - private static final HashSet supportStmtClassNamesCache = new HashSet<>(16); - public CreateJobStmt(LabelName labelName, JobExecuteType executeType, String onceJobStartTimestamp, Long interval, String intervalTimeUnit, String startsTimeStamp, String endsTimeStamp, String comment, StatementBase doStmt) { @@ -118,7 +112,6 @@ public void analyze(Analyzer analyzer) throws UserException { labelName.analyze(analyzer); String dbName = labelName.getDbName(); Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName); - analyzerSqlStmt(); // check its insert stmt,currently only support insert stmt //todo when support other stmt,need to check stmt type and generate jobInstance JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration(); @@ -164,6 +157,7 @@ public void analyze(Analyzer analyzer) throws UserException { jobExecutionConfiguration.setTimerDefinition(timerDefinition); String originStmt = getOrigStmt().originStmt; String executeSql = parseExecuteSql(originStmt, jobName, comment); + analyzerSqlStmt(executeSql); // create job use label name as its job name InsertJob job = new InsertJob(jobName, JobStatus.RUNNING, @@ -191,22 +185,20 @@ protected static void checkAuth() throws AnalysisException { } } - private void checkStmtSupport() throws AnalysisException { - if (supportStmtClassNamesCache.contains(doStmt.getClass().getSimpleName())) { - return; - } - for (Class clazz : supportStmtSuperClass) { - if (clazz.isAssignableFrom(doStmt.getClass())) { - supportStmtClassNamesCache.add(doStmt.getClass().getSimpleName()); - return; + private void analyzerSqlStmt(String sql) throws UserException { + NereidsParser parser = new NereidsParser(); + LogicalPlan logicalPlan = parser.parseSingle(sql); + if (logicalPlan instanceof InsertIntoTableCommand) { + InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) logicalPlan; + try { + insertIntoTableCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor()); + } catch (Exception e) { + throw new AnalysisException(e.getMessage()); } - } - throw new AnalysisException("Not support " + doStmt.getClass().getSimpleName() + " type in job"); - } - private void analyzerSqlStmt() throws UserException { - checkStmtSupport(); - doStmt.analyze(analyzer); + } else { + throw new AnalysisException("Not support this sql : " + sql); + } } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index 47d52c170b2bd6..43f43ba86997cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -31,8 +31,6 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.io.Text; -import org.apache.doris.common.util.LogBuilder; -import org.apache.doris.common.util.LogKey; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.job.base.AbstractJob; @@ -647,23 +645,8 @@ public void onUnRegister() throws JobException { @Override public void onReplayCreate() throws JobException { - JobExecutionConfiguration jobConfig = new JobExecutionConfiguration(); - jobConfig.setExecuteType(JobExecuteType.INSTANT); - setJobConfig(jobConfig); onRegister(); - checkJobParams(); - log.info(new LogBuilder(LogKey.LOAD_JOB, getJobId()).add("msg", "replay create load job").build()); - } - - @Override - public void onReplayEnd(AbstractJob> replayJob) throws JobException { - if (!(replayJob instanceof InsertJob)) { - return; - } - InsertJob insertJob = (InsertJob) replayJob; - unprotectReadEndOperation(insertJob); - log.info(new LogBuilder(LogKey.LOAD_JOB, - insertJob.getJobId()).add("operation", insertJob).add("msg", "replay end load job").build()); + super.onReplayCreate(); } public int getProgress() { diff --git a/regression-test/data/job_p0/job_meta/job_query_test.out b/regression-test/data/job_p0/job_meta/job_query_test.out new file mode 100644 index 00000000000000..1a2bfe0f9cd995 --- /dev/null +++ b/regression-test/data/job_p0/job_meta/job_query_test.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select1 -- +JOB_ONETIME ONE_TIME AT 2052-03-18 00:00:00 insert into t_test_BASE_inSert_job (timestamp, type, user_id) values ('2023-03-18','1','12213'); + +-- !select2 -- +JOB_RECURRING RECURRING EVERY 1 HOUR STARTS 2052-03-18 00:00:00 insert into t_test_BASE_inSert_job (timestamp, type, user_id) values ('2023-03-18','1','12213'); + diff --git a/regression-test/suites/job_p0/job_meta/job_query_test.groovy b/regression-test/suites/job_p0/job_meta/job_query_test.groovy new file mode 100644 index 00000000000000..3505a8108dd088 --- /dev/null +++ b/regression-test/suites/job_p0/job_meta/job_query_test.groovy @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +suite('job_query_test', 'p0,restart_fe') { + def oneTimeJobName = "JOB_ONETIME" + def recurringJobName = "JOB_RECURRING" + qt_select1 """ + select name, ExecuteType,RecurringStrategy,ExecuteSql from jobs("type" = "insert") where name = '${oneTimeJobName}' + """ + qt_select2 """ + select name, ExecuteType,RecurringStrategy,ExecuteSql from jobs("type" = "insert") where name = '${recurringJobName}' + """ + + +} \ No newline at end of file diff --git a/regression-test/suites/job_p0/job_meta/load.groovy b/regression-test/suites/job_p0/job_meta/load.groovy new file mode 100644 index 00000000000000..bf7b8a1212843f --- /dev/null +++ b/regression-test/suites/job_p0/job_meta/load.groovy @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite('load', 'p0,restart_fe') { + def tableName = "t_test_BASE_inSert_job" + def oneTimeJobName = "JOB_ONETIME" + def recurringJobName = "JOB_RECURRING" + sql """drop table if exists `${tableName}` force""" + sql """ + DROP JOB IF EXISTS where jobname = '${oneTimeJobName}' + """ + sql """ + DROP JOB IF EXISTS where jobname = '${recurringJobName}' + """ + sql """ + CREATE TABLE IF NOT EXISTS `${tableName}` + ( + `timestamp` DATE NOT NULL COMMENT "['0000-01-01', '9999-12-31']", + `type` TINYINT NOT NULL COMMENT "[-128, 127]", + `user_id` BIGINT COMMENT "[-9223372036854775808, 9223372036854775807]" + ) + DUPLICATE KEY(`timestamp`, `type`) + DISTRIBUTED BY HASH(`type`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """ + CREATE JOB ${recurringJobName} ON SCHEDULE every 1 HOUR STARTS '2052-03-18 00:00:00' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); + """ + + sql """ + CREATE JOB ${oneTimeJobName} ON SCHEDULE AT '2052-03-18 00:00:00' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); + """ + +} \ No newline at end of file diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index 51816414d19b25..c3b77336587c43 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -218,7 +218,7 @@ suite("test_base_insert_job") { CREATE JOB ${jobName} ON SCHEDULE at current_timestamp comment 'test' DO update ${tableName} set type=2 where type=1; """ } catch (Exception e) { - assert e.getMessage().contains("Not support UpdateStmt type in job") + assert e.getMessage().contains("Not support this sql") } // assert start time greater than current time try {