Skip to content

Commit

Permalink
[Fix](Job)Replaying logs should not modify the original information o…
Browse files Browse the repository at this point in the history
…f the job (apache#40474)

## Proposed changes
```
        JobExecutionConfiguration jobConfig = new JobExecutionConfiguration();
        jobConfig.setExecuteType(JobExecuteType.INSTANT);
        setJobConfig(jobConfig);
```
- Replaying logs should not modify the original information of the job
- Use the new optimizer to check whether the executed statement is legal

(cherry picked from commit de90051)
  • Loading branch information
CalvinKirs committed Sep 13, 2024
1 parent 873f70c commit 4bc5efa
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.HashSet;

/**
* syntax:
* CREATE
Expand Down Expand Up @@ -91,12 +91,6 @@ public class CreateJobStmt extends DdlStmt {
// exclude job name prefix, which is used by inner job
private static final String excludeJobNamePrefix = "inner_";

private static final ImmutableSet<Class<? extends DdlStmt>> supportStmtSuperClass
= new ImmutableSet.Builder<Class<? extends DdlStmt>>().add(InsertStmt.class)
.build();

private static final HashSet<String> supportStmtClassNamesCache = new HashSet<>(16);

public CreateJobStmt(LabelName labelName, JobExecuteType executeType, String onceJobStartTimestamp,
Long interval, String intervalTimeUnit,
String startsTimeStamp, String endsTimeStamp, String comment, StatementBase doStmt) {
Expand All @@ -118,7 +112,6 @@ public void analyze(Analyzer analyzer) throws UserException {
labelName.analyze(analyzer);
String dbName = labelName.getDbName();
Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
analyzerSqlStmt();
// check its insert stmt,currently only support insert stmt
//todo when support other stmt,need to check stmt type and generate jobInstance
JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
Expand Down Expand Up @@ -164,6 +157,7 @@ public void analyze(Analyzer analyzer) throws UserException {
jobExecutionConfiguration.setTimerDefinition(timerDefinition);
String originStmt = getOrigStmt().originStmt;
String executeSql = parseExecuteSql(originStmt, jobName, comment);
analyzerSqlStmt(executeSql);
// create job use label name as its job name
InsertJob job = new InsertJob(jobName,
JobStatus.RUNNING,
Expand Down Expand Up @@ -191,22 +185,20 @@ protected static void checkAuth() throws AnalysisException {
}
}

private void checkStmtSupport() throws AnalysisException {
if (supportStmtClassNamesCache.contains(doStmt.getClass().getSimpleName())) {
return;
}
for (Class<? extends DdlStmt> clazz : supportStmtSuperClass) {
if (clazz.isAssignableFrom(doStmt.getClass())) {
supportStmtClassNamesCache.add(doStmt.getClass().getSimpleName());
return;
private void analyzerSqlStmt(String sql) throws UserException {
NereidsParser parser = new NereidsParser();
LogicalPlan logicalPlan = parser.parseSingle(sql);
if (logicalPlan instanceof InsertIntoTableCommand) {
InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) logicalPlan;
try {
insertIntoTableCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor());
} catch (Exception e) {
throw new AnalysisException(e.getMessage());
}
}
throw new AnalysisException("Not support " + doStmt.getClass().getSimpleName() + " type in job");
}

private void analyzerSqlStmt() throws UserException {
checkStmtSupport();
doStmt.analyze(analyzer);
} else {
throw new AnalysisException("Not support this sql : " + sql);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.job.base.AbstractJob;
Expand Down Expand Up @@ -647,23 +645,8 @@ public void onUnRegister() throws JobException {

@Override
public void onReplayCreate() throws JobException {
JobExecutionConfiguration jobConfig = new JobExecutionConfiguration();
jobConfig.setExecuteType(JobExecuteType.INSTANT);
setJobConfig(jobConfig);
onRegister();
checkJobParams();
log.info(new LogBuilder(LogKey.LOAD_JOB, getJobId()).add("msg", "replay create load job").build());
}

@Override
public void onReplayEnd(AbstractJob<?, Map<Object, Object>> replayJob) throws JobException {
if (!(replayJob instanceof InsertJob)) {
return;
}
InsertJob insertJob = (InsertJob) replayJob;
unprotectReadEndOperation(insertJob);
log.info(new LogBuilder(LogKey.LOAD_JOB,
insertJob.getJobId()).add("operation", insertJob).add("msg", "replay end load job").build());
super.onReplayCreate();
}

public int getProgress() {
Expand Down
7 changes: 7 additions & 0 deletions regression-test/data/job_p0/job_meta/job_query_test.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select1 --
JOB_ONETIME ONE_TIME AT 2052-03-18 00:00:00 insert into t_test_BASE_inSert_job (timestamp, type, user_id) values ('2023-03-18','1','12213');

-- !select2 --
JOB_RECURRING RECURRING EVERY 1 HOUR STARTS 2052-03-18 00:00:00 insert into t_test_BASE_inSert_job (timestamp, type, user_id) values ('2023-03-18','1','12213');

28 changes: 28 additions & 0 deletions regression-test/suites/job_p0/job_meta/job_query_test.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite('job_query_test', 'p0,restart_fe') {
def oneTimeJobName = "JOB_ONETIME"
def recurringJobName = "JOB_RECURRING"
qt_select1 """
select name, ExecuteType,RecurringStrategy,ExecuteSql from jobs("type" = "insert") where name = '${oneTimeJobName}'
"""
qt_select2 """
select name, ExecuteType,RecurringStrategy,ExecuteSql from jobs("type" = "insert") where name = '${recurringJobName}'
"""


}
50 changes: 50 additions & 0 deletions regression-test/suites/job_p0/job_meta/load.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite('load', 'p0,restart_fe') {
def tableName = "t_test_BASE_inSert_job"
def oneTimeJobName = "JOB_ONETIME"
def recurringJobName = "JOB_RECURRING"
sql """drop table if exists `${tableName}` force"""
sql """
DROP JOB IF EXISTS where jobname = '${oneTimeJobName}'
"""
sql """
DROP JOB IF EXISTS where jobname = '${recurringJobName}'
"""
sql """
CREATE TABLE IF NOT EXISTS `${tableName}`
(
`timestamp` DATE NOT NULL COMMENT "['0000-01-01', '9999-12-31']",
`type` TINYINT NOT NULL COMMENT "[-128, 127]",
`user_id` BIGINT COMMENT "[-9223372036854775808, 9223372036854775807]"
)
DUPLICATE KEY(`timestamp`, `type`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""
sql """
CREATE JOB ${recurringJobName} ON SCHEDULE every 1 HOUR STARTS '2052-03-18 00:00:00' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""

sql """
CREATE JOB ${oneTimeJobName} ON SCHEDULE AT '2052-03-18 00:00:00' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
"""

}
2 changes: 1 addition & 1 deletion regression-test/suites/job_p0/test_base_insert_job.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ suite("test_base_insert_job") {
CREATE JOB ${jobName} ON SCHEDULE at current_timestamp comment 'test' DO update ${tableName} set type=2 where type=1;
"""
} catch (Exception e) {
assert e.getMessage().contains("Not support UpdateStmt type in job")
assert e.getMessage().contains("Not support this sql")
}
// assert start time greater than current time
try {
Expand Down

0 comments on commit 4bc5efa

Please sign in to comment.