Skip to content

Commit

Permalink
Revert "[feature](executor) using fe version to set instance_num (apa…
Browse files Browse the repository at this point in the history
…che#22047)"

This reverts commit 28b714c.
  • Loading branch information
HappenLee committed Aug 4, 2023
1 parent bae363d commit b9a9ae5
Show file tree
Hide file tree
Showing 5 changed files with 2 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ set enable_pipeline_engine = true;
#### parallel_pipeline_task_num

`parallel_pipeline_task_num` represents the concurrency of pipeline tasks of a query. Default value is `0` (e.g. half number of CPU cores). Users can adjust this value according to their own workloads.
If the user upgrades from a lower version, the default value will be the parallel_fragment_exec_instance_num before the upgrade.

```
set parallel_pipeline_task_num = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ set enable_pipeline_engine = true;
#### parallel_pipeline_task_num

`parallel_pipeline_task_num`代表了 SQL 查询进行查询并发的 Pipeline Task 数目。Doris默认的配置为`0`,即CPU核数的一半。用户也可以实际根据自己的实际情况进行调整。
如果用户从较低的版本升级过来,则默认值为升级前的`parallel_fragment_exec_instance_num`

```
set parallel_pipeline_task_num = 0;
Expand Down
75 changes: 1 addition & 74 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@
import org.apache.doris.common.ConfigBase;
import org.apache.doris.common.ConfigException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.EnvUtils;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
Expand All @@ -108,7 +107,6 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.Daemon;
Expand Down Expand Up @@ -297,13 +295,10 @@ public class Env {
public static final String CLIENT_NODE_HOST_KEY = "CLIENT_NODE_HOST";
public static final String CLIENT_NODE_PORT_KEY = "CLIENT_NODE_PORT";

private static final String VERSION_DIR = "/VERSION";
private String latestFeVersion;
private String previousFeVersion;
private String metaDir;
private String bdbDir;
private String imageDir;
private String versionDir;

private MetaContext metaContext;
private long epoch = 0;

Expand Down Expand Up @@ -868,7 +863,6 @@ public void initialize(String[] args) throws Exception {
this.metaDir = Config.meta_dir;
this.bdbDir = this.metaDir + BDB_DIR;
this.imageDir = this.metaDir + IMAGE_DIR;
this.versionDir = EnvUtils.getDorisHome() + VERSION_DIR;

// 0. get local node and helper node info
getSelfHostPort();
Expand All @@ -888,21 +882,12 @@ public void initialize(String[] args) throws Exception {
bdbDir.mkdirs();
}
}

File imageDir = new File(this.imageDir);

if (!imageDir.exists()) {
imageDir.mkdirs();
}

File verDir = new File(this.versionDir);

if (!verDir.exists()) {
verDir.mkdirs();
}

// init plugin manager
initVersionInfo();
pluginMgr.init();
auditEventProcessor.start();

Expand Down Expand Up @@ -5456,64 +5441,6 @@ private static void addTableComment(Table table, StringBuilder sb) {
}
}

public void writeVersionFile(String version, int seq) {
String versionName = versionDir + "/" + version + "-commitid-" + seq + "-version";
File versionFile = new File(versionName);
try {
versionFile.createNewFile();
} catch (Exception e) {
LOG.error(e.toString());
}
}

public boolean isMajorVersionUpgrade() {
if (previousFeVersion == null) {
// There are two possible scenarios when there is no 'previousFeVersion':
// If 'image' is empty, it indicates a completely new FE.
// If 'image' is not empty, it means an upgrade from a lower version.
File imageDir = new File(this.imageDir);
File[] files = imageDir.listFiles();
if (files == null || files.length == 0) {
return false;
}
return true;
}
return previousFeVersion.charAt(0) != latestFeVersion.charAt(0);
}

private void initVersionInfo() {
latestFeVersion = Version.DORIS_BUILD_VERSION_MAJOR + "_" + Version.DORIS_BUILD_VERSION_MINOR + "_"
+ Version.DORIS_BUILD_VERSION_PATCH;
File folder = new File(versionDir);
File[] files = folder.listFiles();
int previousSeq = 0;
if (files != null) {
// Every part meaning (2_0_0-commitid-1-version)
// [version] - [commitid] - [seq]
// 'VersionFile' can be transformed like this.
// 2_0_0-commitid-1-version -> 2_1_0-commitid-2-version ->
// 2_3_0-commitid-3-version -> 2_0_0-commitid-4-version
// You can observe the process of FE upgrades through these files.
for (File file : files) {
String[] splitArr = file.getName().split("-");
String version = splitArr[0];
int seq = Integer.parseInt(splitArr[2]);
if (seq > previousSeq) {
previousSeq = seq;
previousFeVersion = version;
}
}
}
if (previousFeVersion == null) {
writeVersionFile(latestFeVersion, 1);
} else if (!previousFeVersion.equals(latestFeVersion)) {
writeVersionFile(latestFeVersion, previousSeq + 1);
}
if (isMajorVersionUpgrade()) {
ConnectContext.isMajorVersionUpgrade = true;
}
}

public int getFollowerCount() {
int count = 0;
for (Frontend fe : frontends.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
public class ConnectContext {
private static final Logger LOG = LogManager.getLogger(ConnectContext.class);
protected static ThreadLocal<ConnectContext> threadLocalInfo = new ThreadLocal<>();
public static boolean isMajorVersionUpgrade = false;

private static final String SSL_PROTOCOL = "TLS";

// set this id before analyze
Expand Down Expand Up @@ -263,10 +263,6 @@ public ConnectContext(StreamConnection connection) {
mysqlChannel = new DummyMysqlChannel();
}
sessionVariable = VariableMgr.newSessionVariable();
if (isMajorVersionUpgrade) {
VariableMgr.setGlobalPipelineTask(sessionVariable.parallelExecInstanceNum);
sessionVariable = VariableMgr.newSessionVariable();
}
command = MysqlCommand.COM_SLEEP;
if (Config.use_fuzzy_session_variable) {
sessionVariable.initFuzzyModeVariables();
Expand Down
19 changes: 0 additions & 19 deletions fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -367,25 +367,6 @@ private static void setGlobalVarAndWriteEditLog(VarContext ctx, String name, Str
}
}

public static void setGlobalPipelineTask(int instance) {
wlock.lock();
try {
String name = "parallel_pipeline_task_num";
String value = instance + "";
VarContext ctx = ctxByVarName.get(name);
try {
setValue(ctx.getObj(), ctx.getField(), value);
} catch (DdlException e) {
LOG.error(e.toString());
}
// write edit log
GlobalVarPersistInfo info = new GlobalVarPersistInfo(defaultSessionVariable, Lists.newArrayList(name));
Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info);
} finally {
wlock.unlock();
}
}

public static void setLowerCaseTableNames(int mode) throws DdlException {
VarContext ctx = ctxByVarName.get(GlobalVariable.LOWER_CASE_TABLE_NAMES);
setGlobalVarAndWriteEditLog(ctx, GlobalVariable.LOWER_CASE_TABLE_NAMES, "" + mode);
Expand Down

0 comments on commit b9a9ae5

Please sign in to comment.