diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 6404fe8009e377..4c8528abd16b26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -865,6 +865,8 @@ public static LoadJob read(DataInput in) throws IOException { job = new SparkLoadJob(); } else if (type == EtlJobType.INSERT) { job = new InsertLoadJob(); + } else if (type == EtlJobType.MINI) { + job = new MiniLoadJob(); } else { throw new IOException("Unknown load type: " + type.name()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 73bc6f6054af09..278e23bba53657 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -51,7 +51,6 @@ import org.apache.doris.qe.OriginStatement; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.DatabaseTransactionMgr; -import org.apache.doris.transaction.TransactionState; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; @@ -815,7 +814,8 @@ public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId public void write(DataOutput out) throws IOException { long currentTimeMs = System.currentTimeMillis(); List loadJobs = - idToLoadJob.values().stream().filter(t -> !t.isExpired(currentTimeMs)).collect(Collectors.toList()); + idToLoadJob.values().stream().filter(t -> !t.isExpired(currentTimeMs)) + .filter(t -> !(t instanceof MiniLoadJob)).collect(Collectors.toList()); out.writeInt(loadJobs.size()); for (LoadJob loadJob : loadJobs) { @@ -836,25 +836,9 @@ public void readFields(DataInput in) throws IOException { } if (loadJob.getJobType() == EtlJobType.MINI) { - // This is a bug fix. the mini load job should not with state LOADING. - if (loadJob.getState() == JobState.LOADING) { - LOG.warn("skip mini load job {} in db {} with LOADING state", loadJob.getId(), loadJob.getDbId()); - continue; - } - - if (loadJob.getState() == JobState.PENDING) { - // bad case. When a mini load job is created and then FE restart. - // the job will be in PENDING state forever. - // This is a temp solution to remove these jobs. - // And the mini load job should be deprecated in Doris v1.1 - TransactionState state = Env.getCurrentEnv().getGlobalTransactionMgr() - .getTransactionState(loadJob.getDbId(), loadJob.getTransactionId()); - if (state == null) { - LOG.warn("skip mini load job {} in db {} with PENDING state and with txn: {}", loadJob.getId(), - loadJob.getDbId(), loadJob.getTransactionId()); - continue; - } - } + LOG.warn("skip mini load job {} in db {} as it is no longer supported", loadJob.getId(), + loadJob.getDbId()); + continue; } idToLoadJob.put(loadJob.getId(), loadJob); Map> map = dbIdToLabelToLoadJobs.get(loadJob.getDbId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java new file mode 100644 index 00000000000000..514a38ccb51bb6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.doris.catalog.AuthorizationInfo; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.io.Text; +import org.apache.doris.load.EtlJobType; +import org.apache.doris.transaction.TransactionState; + +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Set; + +@Deprecated +public class MiniLoadJob extends LoadJob { + private static final Logger LOG = LogManager.getLogger(MiniLoadJob.class); + + private String tableName; + + private long tableId; + + public MiniLoadJob() { + super(EtlJobType.MINI); + } + + @Override + public Set getTableNamesForShow() { + return Sets.newHashSet(tableName); + } + + @Override + public Set getTableNames() throws MetaNotFoundException { + return Sets.newHashSet(tableName); + } + + @Override + public void beginTxn() { + } + + @Override + protected void replayTxnAttachment(TransactionState txnState) { + updateLoadingStatue(txnState); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + Text.writeString(out, tableName); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + tableName = Text.readString(in); + } + + public AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException { + Database database = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId); + return new AuthorizationInfo(database.getFullName(), getTableNames()); + } + + private void updateLoadingStatue(TransactionState txnState) { + MiniLoadTxnCommitAttachment miniLoadTxnCommitAttachment = + (MiniLoadTxnCommitAttachment) txnState.getTxnCommitAttachment(); + if (miniLoadTxnCommitAttachment == null) { + // aborted txn may not has attachment + LOG.info("no miniLoadTxnCommitAttachment, txn id: {} status: {}", txnState.getTransactionId(), + txnState.getTransactionStatus()); + return; + } + loadingStatus.replaceCounter(DPP_ABNORMAL_ALL, String.valueOf(miniLoadTxnCommitAttachment.getFilteredRows())); + loadingStatus.replaceCounter(DPP_NORMAL_ALL, String.valueOf(miniLoadTxnCommitAttachment.getLoadedRows())); + if (miniLoadTxnCommitAttachment.getErrorLogUrl() != null) { + loadingStatus.setTrackingUrl(miniLoadTxnCommitAttachment.getErrorLogUrl()); + } + } +}