+ Set<TEndPoint> allEndPoint = new HashSet<>();
+ for (TRegionReplicaSet replicaSet : replicaSets) {
+ for (TDataNodeLocation dataNodeLocation : replicaSet.getDataNodeLocations()) {
+ allEndPoint.add(dataNodeLocation.getInternalEndPoint());
+ }
+ }
+
+ for (TEndPoint endPoint : allEndPoint) {
+ try (SetThreadName threadName =
+ new SetThreadName(
+ LoadTsFileScheduler.class.getName() + "-" + loadCommandReq.commandType)) {
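+ // Each endpoint gets the command exactly once: the local node is invoked directly,
+ // every other node over the internal RPC service.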
+ if (isDispatchedToLocal(endPoint)) {
+ dispatchLocally(loadCommandReq);
+ } else {
+ dispatchRemote(loadCommandReq, endPoint);
+ }
+ } catch (FragmentInstanceDispatchException e) {
+ return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
+ } catch (Throwable t) {
+ logger.error("cannot dispatch LoadCommand for load operation", t);
+ return immediateFuture(
+ new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())));
+ }
+ }
+ return immediateFuture(new FragInstanceDispatchResult(true));
+ }
+
+ private void dispatchRemote(TLoadCommandReq loadCommandReq, TEndPoint endPoint)
+ throws FragmentInstanceDispatchException {
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(endPoint)) {
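+ // The client is borrowed from a pool; try-with-resources hands it back when the call completes.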
+ TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
+ if (!loadResp.isAccepted()) {
+ logger.error(loadResp.message);
+ throw new FragmentInstanceDispatchException(loadResp.status);
+ }
+ } catch (IOException | TException e) {
+ logger.error("can't connect to node {}", endPoint, e);
+ TSStatus status = new TSStatus();
+ status.setCode(TSStatusCode.SYNC_CONNECTION_EXCEPTION.getStatusCode());
+ status.setMessage("can't connect to node " + endPoint);
+ throw new FragmentInstanceDispatchException(status);
+ }
+ }
+
+ private void dispatchLocally(TLoadCommandReq loadCommandReq)
+ throws FragmentInstanceDispatchException {
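+ // Local dispatch skips RPC and executes the command directly against this node's storage engine.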
+ TSStatus resultStatus =
+ StorageEngineV2.getInstance()
+ .executeLoadCommand(
+ LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
+ loadCommandReq.uuid);
+ if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
+ throw new FragmentInstanceDispatchException(resultStatus);
+ }
+ }
+
+ @Override
+ public void abort() {}
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
new file mode 100644
index 000000000000..cad9b8b29764
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.scheduler.load;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
+import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.load.LoadSingleTsFileNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.db.mpp.plan.scheduler.FragInstanceDispatchResult;
+import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import io.airlift.units.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * {@link LoadTsFileScheduler} is used for scheduling {@link LoadSingleTsFileNode} and {@link
+ * LoadTsFilePieceNode}, because these two nodes need two phases to finish the transfer.
+ *
+ * <p>For more details, please check: ...
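+ *
+ * <p>Phase one streams {@link LoadTsFilePieceNode}s to every replica set the file touches; phase
+ * two broadcasts {@link LoadCommand#EXECUTE} (or {@link LoadCommand#ROLLBACK} on failure) so all
+ * replicas commit or discard the staged pieces together.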
+ */
+public class LoadTsFileScheduler implements IScheduler {
+ private static final Logger logger = LoggerFactory.getLogger(LoadTsFileScheduler.class);
+
+ private final MPPQueryContext queryContext;
+ private QueryStateMachine stateMachine;
+ private LoadTsFileDispatcherImpl dispatcher;
+ private List<LoadSingleTsFileNode> tsFileNodeList;
+ private PlanFragmentId fragmentId;
+
+ private Set<TRegionReplicaSet> allReplicaSets;
+
+ public LoadTsFileScheduler(
+ DistributedQueryPlan distributedQueryPlan,
+ MPPQueryContext queryContext,
+ QueryStateMachine stateMachine,
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+ this.queryContext = queryContext;
+ this.stateMachine = stateMachine;
+ this.tsFileNodeList = new ArrayList<>();
+ this.fragmentId = distributedQueryPlan.getRootSubPlan().getPlanFragment().getId();
+ this.dispatcher = new LoadTsFileDispatcherImpl(internalServiceClientManager);
+ this.allReplicaSets = new HashSet<>();
+
+ for (FragmentInstance fragmentInstance : distributedQueryPlan.getInstances()) {
+ tsFileNodeList.add((LoadSingleTsFileNode) fragmentInstance.getFragment().getPlanNodeTree());
+ }
+ }
+
+ @Override
+ public void start() {
+ stateMachine.transitionToRunning();
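+ // Files that need no decoding are loaded locally as a whole; all others go through the
+ // two-phase piece protocol under a fresh UUID so replicas can tie pieces to the final command.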
+ for (LoadSingleTsFileNode node : tsFileNodeList) {
+ if (!node.needDecodeTsFile()) {
+ boolean isLoadLocallySuccess = loadLocally(node);
+ if (!isLoadLocallySuccess) {
+ return;
+ }
+ continue;
+ }
+
+ String uuid = UUID.randomUUID().toString();
+ dispatcher.setUuid(uuid);
+ allReplicaSets.clear();
+
+ boolean isFirstPhaseSuccess = firstPhase(node);
+ boolean isSecondPhaseSuccess = secondPhase(isFirstPhaseSuccess, uuid);
+ if (!isFirstPhaseSuccess || !isSecondPhaseSuccess) {
+ return;
+ }
+ }
+ stateMachine.transitionToFinished();
+ }
+
+ private boolean firstPhase(LoadSingleTsFileNode node) {
+ if (!dispatchOneTsFile(node)) {
+ logger.error(
+ String.format("Dispatch Single TsFile Node error, LoadSingleTsFileNode %s.", node));
+ return false;
+ }
+ return true;
+ }
+
+ private boolean dispatchOneTsFile(LoadSingleTsFileNode node) {
+ for (Map.Entry<TRegionReplicaSet, List<LoadTsFilePieceNode>> entry :
+ node.getReplicaSet2Pieces().entrySet()) {
+ allReplicaSets.add(entry.getKey());
+ for (LoadTsFilePieceNode pieceNode : entry.getValue()) {
+ FragmentInstance instance =
+ new FragmentInstance(
+ new PlanFragment(fragmentId, pieceNode),
+ fragmentId.genFragmentInstanceId(),
+ null,
+ queryContext.getQueryType(),
+ queryContext.getTimeOut());
+ instance.setDataRegionAndHost(entry.getKey());
+ Future<FragInstanceDispatchResult> dispatchResultFuture =
+ dispatcher.dispatch(Collections.singletonList(instance));
+
+ try {
+ FragInstanceDispatchResult result = dispatchResultFuture.get();
+ if (!result.isSuccessful()) {
+ // TODO: retry.
+ logger.error(
+ String.format(
+ "Dispatch one piece to ReplicaSet %s error, result status code %s.",
+ entry.getKey(),
+ TSStatusCode.representOf(result.getFailureStatus().getCode()).name()));
+ logger.error(
+ String.format("Result status message %s.", result.getFailureStatus().getMessage()));
+ if (result.getFailureStatus().getSubStatus() != null) {
+ for (TSStatus status : result.getFailureStatus().getSubStatus()) {
+ logger.error(
+ String.format(
+ "Sub status code %s.", TSStatusCode.representOf(status.getCode()).name()));
+ logger.error(String.format("Sub status message %s.", status.getMessage()));
+ }
+ }
+ logger.error(String.format("Dispatch piece node:%n%s", pieceNode));
+ stateMachine.transitionToFailed(result.getFailureStatus()); // TODO: record more status
+ return false;
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ logger.warn("Interrupt or Execution error.", e);
+ stateMachine.transitionToFailed(e);
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private boolean secondPhase(boolean isFirstPhaseSuccess, String uuid) {
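+ // Phase two: tell every replica set touched in phase one to commit (EXECUTE) or roll back
+ // the pieces staged under this uuid.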
+ TLoadCommandReq loadCommandReq =
+ new TLoadCommandReq(
+ (isFirstPhaseSuccess ? LoadCommand.EXECUTE : LoadCommand.ROLLBACK).ordinal(), uuid);
+ Future<FragInstanceDispatchResult> dispatchResultFuture =
+ dispatcher.dispatchCommand(loadCommandReq, allReplicaSets);
+
+ try {
+ FragInstanceDispatchResult result = dispatchResultFuture.get();
+ if (!result.isSuccessful()) {
+ // TODO: retry.
+ logger.error(
+ String.format("Dispatch LoadCommand error to replicaSets %s error.", allReplicaSets));
+ logger.error(String.format("Result status code %s.", result.getFailureStatus().getCode()));
+ logger.error(
+ String.format("Result status message %s.", result.getFailureStatus().getMessage()));
+ stateMachine.transitionToFailed(result.getFailureStatus());
+ return false;
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ logger.warn("Interrupt or Execution error.", e);
+ stateMachine.transitionToFailed(e);
+ return false;
+ }
+ return true;
+ }
+
+ private boolean loadLocally(LoadSingleTsFileNode node) {
+ try {
+ FragmentInstance instance =
+ new FragmentInstance(
+ new PlanFragment(fragmentId, node),
+ fragmentId.genFragmentInstanceId(),
+ null,
+ queryContext.getQueryType(),
+ queryContext.getTimeOut());
+ instance.setDataRegionAndHost(node.getLocalRegionReplicaSet());
+ dispatcher.dispatchLocally(instance);
+ } catch (FragmentInstanceDispatchException e) {
+ logger.error("Dispatch LoadCommand error to local error.");
+ logger.error(String.format("Result status code %s.", e.getFailureStatus().getCode()));
+ logger.error(String.format("Result status message %s.", e.getFailureStatus().getMessage()));
+ stateMachine.transitionToFailed(e.getFailureStatus());
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void stop() {}
+
+ @Override
+ public Duration getTotalCpuTime() {
+ return null;
+ }
+
+ @Override
+ public FragmentInfo getFragmentInfo() {
+ return null;
+ }
+
+ @Override
+ public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
+
+ @Override
+ public void cancelFragment(PlanFragmentId planFragmentId) {}
+
+ public enum LoadCommand {
+ EXECUTE,
+ ROLLBACK
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index de9ad46b779b..fba7acdc9ad5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -26,6 +26,7 @@
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement;
@@ -201,6 +202,10 @@ public R visitInsertTablet(InsertTabletStatement insertTabletStatement, C contex
return visitStatement(insertTabletStatement, context);
}
+ public R visitLoadFile(LoadTsFileStatement loadTsFileStatement, C context) {
+ return visitStatement(loadTsFileStatement, context);
+ }
+
/** Data Control Language (DCL) */
public R visitAuthor(AuthorStatement authorStatement, C context) {
return visitStatement(authorStatement, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java
new file mode 100644
index 000000000000..4fb39cad1a71
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/LoadTsFileStatement.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.crud;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.utils.FilePathUtils;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class LoadTsFileStatement extends Statement {
+ private File file;
+ private boolean autoCreateSchema;
+ private int sgLevel;
+ private boolean verifySchema;
+
+ private List<File> tsFiles;
+
+ public LoadTsFileStatement(String filePath) {
+ this.file = new File(filePath);
+ this.autoCreateSchema = true;
+ this.sgLevel = IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
+ this.verifySchema = true;
+
+ tsFiles = new ArrayList<>();
+ if (file.isFile()) {
+ tsFiles.add(file);
+ } else {
+ findAllTsFile(file);
+ }
+ sortTsFiles(tsFiles);
+ }
+
+ private void findAllTsFile(File file) {
+ File[] nowFiles = file.listFiles();
+ if (nowFiles == null) { // listFiles() returns null for unreadable or non-directory paths
+ return;
+ }
+ for (File nowFile : nowFiles) {
+ if (nowFile.getName().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+ tsFiles.add(nowFile);
+ } else if (nowFile.isDirectory()) {
+ findAllTsFile(nowFile);
+ }
+ }
+ }
+
+ private void sortTsFiles(List<File> files) {
+ Map<File, Long> file2Timestamp = new HashMap<>();
+ Map<File, Long> file2Version = new HashMap<>();
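+ // TsFile names appear to follow "{timestamp}-{version}-...", so fields [0] and [1] give the
+ // creation timestamp and version used for ordering.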
+ for (File file : files) {
+ String[] splitStrings = file.getName().split(FilePathUtils.FILE_NAME_SEPARATOR);
+ file2Timestamp.put(file, Long.parseLong(splitStrings[0]));
+ file2Version.put(file, Long.parseLong(splitStrings[1]));
+ }
+
+ Collections.sort(
+ files,
+ (o1, o2) -> {
+ // use Long.compare to avoid int overflow when casting a long difference
+ int timestampCmp = Long.compare(file2Timestamp.get(o1), file2Timestamp.get(o2));
+ if (timestampCmp != 0) {
+ return timestampCmp;
+ }
+ return Long.compare(file2Version.get(o1), file2Version.get(o2));
+ });
+ }
+
+ public void setAutoCreateSchema(boolean autoCreateSchema) {
+ this.autoCreateSchema = autoCreateSchema;
+ }
+
+ public void setSgLevel(int sgLevel) {
+ this.sgLevel = sgLevel;
+ }
+
+ public void setVerifySchema(boolean verifySchema) {
+ this.verifySchema = verifySchema;
+ }
+
+ public List getTsFiles() {
+ return tsFiles;
+ }
+
+ @Override
+ public List<? extends PartialPath> getPaths() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitLoadFile(this, context);
+ }
+
+ @Override
+ public String toString() {
+ return "LoadTsFileStatement{"
+ + "file="
+ + file
+ + ", autoCreateSchema="
+ + autoCreateSchema
+ + ", sgLevel="
+ + sgLevel
+ + ", verifySchema="
+ + verifySchema
+ + ", tsFiles Size="
+ + tsFiles.size()
+ + '}';
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index bc487251b274..a63b68427416 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -67,10 +67,12 @@
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.ConstructSchemaBlackListNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
+import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
import org.apache.iotdb.db.service.DataNode;
import org.apache.iotdb.db.service.RegionMigrateService;
import org.apache.iotdb.db.service.metrics.MetricService;
@@ -105,6 +107,8 @@
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
@@ -115,6 +119,7 @@
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TactiveTriggerInstanceReq;
@@ -267,6 +272,44 @@ public TSchemaFetchResponse fetchSchema(TSchemaFetchRequest req) {
throw new UnsupportedOperationException();
}
+ @Override
+ public TLoadResp sendTsFilePieceNode(TTsFilePieceReq req) throws TException {
+ LOGGER.info(String.format("Receive load node from uuid %s.", req.uuid));
+
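+ // Deserialize the piece node and hand it to the data region identified by the consensus group.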
+ ConsensusGroupId groupId =
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
+ LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) PlanNodeType.deserialize(req.body);
+ if (pieceNode == null) {
+ return createTLoadResp(new TSStatus(TSStatusCode.NODE_DESERIALIZE_ERROR.getStatusCode()));
+ }
+
+ TSStatus resultStatus =
+ StorageEngineV2.getInstance()
+ .writeLoadTsFileNode((DataRegionId) groupId, pieceNode, req.uuid);
+
+ return createTLoadResp(resultStatus);
+ }
+
+ @Override
+ public TLoadResp sendLoadCommand(TLoadCommandReq req) throws TException {
+
+ TSStatus resultStatus =
+ StorageEngineV2.getInstance()
+ .executeLoadCommand(
+ LoadTsFileScheduler.LoadCommand.values()[req.commandType], req.uuid);
+ return createTLoadResp(resultStatus);
+ }
+
+ private TLoadResp createTLoadResp(TSStatus resultStatus) {
+ boolean isAccepted = RpcUtils.SUCCESS_STATUS.equals(resultStatus);
+ TLoadResp loadResp = new TLoadResp(isAccepted);
+ if (!isAccepted) {
+ loadResp.setMessage(resultStatus.getMessage());
+ loadResp.setStatus(resultStatus);
+ }
+ return loadResp;
+ }
+
@Override
public TSStatus createSchemaRegion(TCreateSchemaRegionReq req) {
return regionManager.createSchemaRegion(req.getRegionReplicaSet(), req.getStorageGroup());
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 4c11f4b5df7b..8e714f4d1acd 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -91,6 +91,7 @@ public enum TSStatusCode {
QUERY_ID_NOT_EXIST(414),
SNAPSHOT_DIR_NOT_LEGAL(415),
SEMANTIC_ERROR(416),
+ TSFILE_RUNTIME_ERROR(417),
UNSUPPORTED_INDEX_FUNC_ERROR(421),
UNSUPPORTED_INDEX_TYPE_ERROR(422),
@@ -131,6 +132,7 @@ public enum TSStatusCode {
CREATE_REGION_ERROR(711),
DELETE_REGION_ERROR(712),
CACHE_UPDATE_FAIL(713),
+ NODE_DESERIALIZE_ERROR(714),
// configuration
CONFIG_ERROR(800),
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index db8ebf20fb95..0f303294c61c 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -216,6 +216,23 @@ struct TUpdateTemplateReq{
2: required binary templateInfo
}
+struct TTsFilePieceReq{
+ 1: required binary body
+ 2: required string uuid
+ 3: required common.TConsensusGroupId consensusGroupId
+}
+
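+// commandType carries the ordinal of LoadTsFileScheduler.LoadCommand (0 = EXECUTE, 1 = ROLLBACK).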
+struct TLoadCommandReq{
+ 1: required i32 commandType
+ 2: required string uuid
+}
+
+struct TLoadResp{
+ 1: required bool accepted
+ 2: optional string message
+ 3: optional common.TSStatus status
+}
+
struct TConstructSchemaBlackListReq{
1: required list<common.TConsensusGroupId> schemaRegionIdList
2: required binary pathPatternTree
@@ -274,6 +291,10 @@ service IDataNodeRPCService {
TSchemaFetchResponse fetchSchema(TSchemaFetchRequest req)
+ TLoadResp sendTsFilePieceNode(TTsFilePieceReq req);
+
+ TLoadResp sendLoadCommand(TLoadCommandReq req);
+
// -----------------------------------For Config Node-----------------------------------------------
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
index 05b32e81f40f..24a441870221 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java
@@ -121,7 +121,8 @@ public ChunkHeader(
/** the exact serialized size of chunk header */
public static int getSerializedSize(String measurementID, int dataSize) {
- int measurementIdLength = measurementID.getBytes(TSFileConfig.STRING_CHARSET).length;
+ int measurementIdLength =
+ measurementID == null ? 0 : measurementID.getBytes(TSFileConfig.STRING_CHARSET).length;
return Byte.BYTES // chunkType
+ ReadWriteForEncodingUtils.varIntSize(measurementIdLength) // measurementID length
+ measurementIdLength // measurementID
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 452ecb938176..2c1020c1bd03 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -1075,7 +1075,7 @@ private ChunkHeader readChunkHeader(long position, int chunkHeaderSize) throws I
* @param position the offset of the chunk data
* @return the pages of this chunk
*/
- private ByteBuffer readChunk(long position, int dataSize) throws IOException {
+ public ByteBuffer readChunk(long position, int dataSize) throws IOException {
try {
return readData(position, dataSize);
} catch (Throwable t) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
index a1fc973d600b..2737317be36f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/AlignedChunkWriterImpl.java
@@ -21,6 +21,8 @@
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
import org.apache.iotdb.tsfile.encoding.encoder.TSEncodingBuilder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -32,6 +34,7 @@
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@@ -125,6 +128,30 @@ public void write(long time, Binary value, boolean isNull) {
valueChunkWriterList.get(valueIndex++).write(time, value, isNull);
}
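+
+ // These overloads take an explicit valueIndex so a caller can write one value column directly
+ // instead of relying on the writer's internal rolling index.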
+ public void write(long time, int value, boolean isNull, int valueIndex) {
+ valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+ }
+
+ public void write(long time, long value, boolean isNull, int valueIndex) {
+ valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+ }
+
+ public void write(long time, boolean value, boolean isNull, int valueIndex) {
+ valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+ }
+
+ public void write(long time, float value, boolean isNull, int valueIndex) {
+ valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+ }
+
+ public void write(long time, double value, boolean isNull, int valueIndex) {
+ valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+ }
+
+ public void write(long time, Binary value, boolean isNull, int valueIndex) {
+ valueChunkWriterList.get(valueIndex).write(time, value, isNull);
+ }
+
public void write(long time, TsPrimitiveType[] points) {
valueIndex = 0;
for (TsPrimitiveType point : points) {
@@ -270,6 +297,16 @@ private void writePageToPageBuffer() {
}
}
+ public void writePageHeaderAndDataIntoTimeBuff(ByteBuffer data, PageHeader header)
+ throws PageException {
+ timeChunkWriter.writePageHeaderAndDataIntoBuff(data, header);
+ }
+
+ public void writePageHeaderAndDataIntoValueBuff(
+ ByteBuffer data, PageHeader header, int valueIndex) throws PageException {
+ valueChunkWriterList.get(valueIndex).writePageHeaderAndDataIntoBuff(data, header);
+ }
+
@Override
public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
timeChunkWriter.writeToFileWriter(tsfileWriter);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
index 59a1fd465b6a..dc2f2d15f889 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/TimeChunkWriter.java
@@ -22,6 +22,7 @@
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.compress.ICompressor;
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -30,6 +31,7 @@
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.TimeStatistics;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.write.page.TimePageWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
@@ -37,6 +39,9 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
public class TimeChunkWriter {
@@ -168,6 +173,53 @@ public void writePageToPageBuffer() {
}
}
+ public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader header)
+ throws PageException {
+ // write the page header to pageBuffer
+ try {
+ logger.debug(
+ "start to flush a page header into buffer, buffer position {} ", pageBuffer.size());
+ // serialize pageHeader see writePageToPageBuffer method
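+ // A chunk holding a single page stores no per-page statistics (the chunk statistics already
+ // cover it), so the first page's statistics are held back and only spliced into the buffer
+ // once a second page arrives.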
+ if (numOfPages == 0) { // record the firstPageStatistics
+ this.firstPageStatistics = header.getStatistics();
+ this.sizeWithoutStatistic +=
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+ this.sizeWithoutStatistic +=
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+ } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
+ byte[] b = pageBuffer.toByteArray();
+ pageBuffer.reset();
+ pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+ firstPageStatistics.serialize(pageBuffer);
+ pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+ header.getStatistics().serialize(pageBuffer);
+ firstPageStatistics = null;
+ } else {
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+ header.getStatistics().serialize(pageBuffer);
+ }
+ logger.debug(
+ "finish to flush a page header {} of time page into buffer, buffer position {} ",
+ header,
+ pageBuffer.size());
+
+ statistics.mergeStatistics(header.getStatistics());
+
+ } catch (IOException e) {
+ throw new PageException("IO Exception in writeDataPageHeader,ignore this page", e);
+ }
+ numOfPages++;
+ // write page content to temp PBAOS
+ try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
+ channel.write(data);
+ } catch (IOException e) {
+ throw new PageException(e);
+ }
+ }
+
public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
sealCurrentPage();
writeAllPagesOfChunkToTsFile(tsfileWriter);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
index 8b75269388f6..98d67a695444 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ValueChunkWriter.java
@@ -22,6 +22,7 @@
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.compress.ICompressor;
import org.apache.iotdb.tsfile.encoding.encoder.Encoder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -30,6 +31,7 @@
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
import org.apache.iotdb.tsfile.write.page.ValuePageWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
@@ -38,6 +40,9 @@
import java.io.IOException;
import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
public class ValueChunkWriter {
@@ -187,6 +192,57 @@ public void writePageToPageBuffer() {
}
}
+ public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader header)
+ throws PageException {
+ // write the page header to pageBuffer
+ try {
+ logger.debug(
+ "start to flush a page header into buffer, buffer position {} ", pageBuffer.size());
+ // serialize pageHeader see writePageToPageBuffer method
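+ // Same single-page optimization as TimeChunkWriter, except a value page header may carry no
+ // statistics at all, hence the null guards around firstPageStatistics.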
+ if (numOfPages == 0) { // record the firstPageStatistics
+ if (header.getStatistics() != null) {
+ this.firstPageStatistics = header.getStatistics();
+ }
+ this.sizeWithoutStatistic +=
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+ this.sizeWithoutStatistic +=
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+ } else if (numOfPages == 1) { // put the firstPageStatistics into pageBuffer
+ if (firstPageStatistics != null) {
+ byte[] b = pageBuffer.toByteArray();
+ pageBuffer.reset();
+ pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+ firstPageStatistics.serialize(pageBuffer);
+ pageBuffer.write(b, this.sizeWithoutStatistic, b.length - this.sizeWithoutStatistic);
+ }
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+ header.getStatistics().serialize(pageBuffer);
+ firstPageStatistics = null;
+ } else {
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(), pageBuffer);
+ ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(), pageBuffer);
+ header.getStatistics().serialize(pageBuffer);
+ }
+ logger.debug(
+ "finish to flush a page header {} of time page into buffer, buffer position {} ",
+ header,
+ pageBuffer.size());
+
+ statistics.mergeStatistics(header.getStatistics());
+
+ } catch (IOException e) {
+ throw new PageException("IO Exception in writeDataPageHeader,ignore this page", e);
+ }
+ numOfPages++;
+ // write page content to temp PBAOS
+ try (WritableByteChannel channel = Channels.newChannel(pageBuffer)) {
+ channel.write(data);
+ } catch (IOException e) {
+ throw new PageException(e);
+ }
+ }
+
public void writeToFileWriter(TsFileIOWriter tsfileWriter) throws IOException {
sealCurrentPage();
writeAllPagesOfChunkToTsFile(tsfileWriter);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 2f865f297f08..22a68bdfd761 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -234,6 +234,19 @@ public void writeChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws IOExcept
}
}
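+ /** Write a prebuilt chunk verbatim: serialize its header, copy its page data, and register the chunk metadata. */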
+ public void writeChunk(Chunk chunk) throws IOException {
+ ChunkHeader chunkHeader = chunk.getHeader();
+ currentChunkMetadata =
+ new ChunkMetadata(
+ chunkHeader.getMeasurementID(),
+ chunkHeader.getDataType(),
+ out.getPosition(),
+ chunk.getChunkStatistic());
+ chunkHeader.serializeTo(out.wrapAsStream());
+ out.write(chunk.getData());
+ endCurrentChunk();
+ }
+
/** end chunk and write some log. */
public void endCurrentChunk() {
chunkMetadataList.add(currentChunkMetadata);