Skip to content

Commit

Permalink
Catch exception in the write method of PartitionRegionStateMachine (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
Beyyes authored Sep 18, 2022
1 parent 478640e commit a28329e
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;

/** StateMachine for PartitionRegion */
public class PartitionRegionStateMachine
Expand Down Expand Up @@ -72,14 +71,21 @@ public TSStatus write(IConsensusRequest request) {
if (request instanceof ByteBufferConsensusRequest) {
try {
plan = ConfigPhysicalPlan.Factory.create(request.serializeToByteBuffer());
} catch (IOException e) {
LOGGER.error("Deserialization error for write plan : {}", request, e);
} catch (Throwable e) {
LOGGER.error(
"Deserialization error for write plan, request: {}, bytebuffer: {}",
request,
request.serializeToByteBuffer(),
e);
return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}
} else if (request instanceof ConfigPhysicalPlan) {
plan = (ConfigPhysicalPlan) request;
} else {
LOGGER.error("Unexpected write plan : {}", request);
LOGGER.error(
"Unexpected write plan, request: {}, bytebuffer: {}",
request,
request.serializeToByteBuffer());
return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}
return write(plan);
Expand All @@ -103,7 +109,7 @@ public DataSet read(IConsensusRequest request) {
if (request instanceof ByteBufferConsensusRequest) {
try {
plan = ConfigPhysicalPlan.Factory.create(request.serializeToByteBuffer());
} catch (IOException e) {
} catch (Throwable e) {
LOGGER.error("Deserialization error for write plan : {}", request);
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public class ProcedureManager {
private static final ConfigNodeConfig CONFIG_NODE_CONFIG =
ConfigNodeDescriptor.getInstance().getConf();

private static final int procedureWaitTimeOut = 30;
private static final int procedureWaitRetryTimeout = 250;
private static final int PROCEDURE_WAIT_TIME_OUT = 30;
private static final int PROCEDURE_WAIT_RETRY_TIMEOUT = 250;

private final ConfigManager configManager;
private ProcedureExecutor<ConfigNodeProcedureEnv> executor;
Expand Down Expand Up @@ -236,8 +236,8 @@ private boolean waitingProcedureFinished(List<Long> procedureIds, List<TSStatus>
&& !executor.isFinished(procedureId)
&& TimeUnit.MILLISECONDS.toSeconds(
System.currentTimeMillis() - startTimeForCurrentProcedure)
< procedureWaitTimeOut) {
sleepWithoutInterrupt(procedureWaitRetryTimeout);
< PROCEDURE_WAIT_TIME_OUT) {
sleepWithoutInterrupt(PROCEDURE_WAIT_RETRY_TIMEOUT);
}
Procedure<ConfigNodeProcedureEnv> finishedProcedure =
executor.getResultOrProcedure(procedureId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
public class DeleteStorageGroupProcedure
extends StateMachineProcedure<ConfigNodeProcedureEnv, DeleteStorageGroupState> {
private static final Logger LOG = LoggerFactory.getLogger(DeleteStorageGroupProcedure.class);
private static final int retryThreshold = 5;
private static final int RETRY_THRESHOLD = 5;

private TStorageGroupSchema deleteSgSchema;

Expand Down Expand Up @@ -95,7 +95,7 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, DeleteStorageGroupSt
TSStatus status = env.deleteConfig(deleteSgSchema.getName());
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return Flow.NO_MORE_STATE;
} else if (getCycles() > retryThreshold) {
} else if (getCycles() > RETRY_THRESHOLD) {
setFailure(new ProcedureException("Delete config info id failed"));
}
}
Expand All @@ -108,7 +108,7 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, DeleteStorageGroupSt
deleteSgSchema.getName(),
state,
e);
if (getCycles() > retryThreshold) {
if (getCycles() > RETRY_THRESHOLD) {
setFailure(new ProcedureException("State stuck at " + state));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.impl.CreateRegionGroupsProcedure;
import org.apache.iotdb.confignode.procedure.impl.DeleteStorageGroupProcedure;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
Expand All @@ -89,6 +90,10 @@
import java.util.Map;
import java.util.Set;

import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.DataRegion;
import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.SchemaRegion;
import static org.junit.Assert.assertEquals;

public class ConfigPhysicalPlanSerDeTest {

@Test
Expand Down Expand Up @@ -635,17 +640,46 @@ public void removeConfigNodePlanTest() throws IOException {

@Test
public void updateProcedureTest() throws IOException {
DeleteStorageGroupProcedure procedure = new DeleteStorageGroupProcedure();
TStorageGroupSchema storageGroupSchema = new TStorageGroupSchema();
storageGroupSchema.setName("root.sg");
procedure.setDeleteSgSchema(storageGroupSchema);
UpdateProcedurePlan updateProcedurePlan = new UpdateProcedurePlan();
updateProcedurePlan.setProcedure(procedure);
UpdateProcedurePlan reqNew =
// test procedure equals DeleteStorageGroupProcedure
DeleteStorageGroupProcedure deleteStorageGroupProcedure = new DeleteStorageGroupProcedure();
deleteStorageGroupProcedure.setDeleteSgSchema(new TStorageGroupSchema("root.sg"));
UpdateProcedurePlan updateProcedurePlan0 = new UpdateProcedurePlan();
updateProcedurePlan0.setProcedure(deleteStorageGroupProcedure);
UpdateProcedurePlan updateProcedurePlan1 =
(UpdateProcedurePlan)
ConfigPhysicalPlan.Factory.create(updateProcedurePlan0.serializeToByteBuffer());
Procedure proc = updateProcedurePlan1.getProcedure();
Assert.assertEquals(proc, deleteStorageGroupProcedure);

// test procedure equals CreateRegionGroupsProcedure
TDataNodeLocation dataNodeLocation0 = new TDataNodeLocation();
dataNodeLocation0.setDataNodeId(5);
dataNodeLocation0.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667));
dataNodeLocation0.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
dataNodeLocation0.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 8777));
dataNodeLocation0.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010));
dataNodeLocation0.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010));

TConsensusGroupId schemaRegionGroupId = new TConsensusGroupId(SchemaRegion, 1);
TConsensusGroupId dataRegionGroupId = new TConsensusGroupId(DataRegion, 0);
TRegionReplicaSet schemaRegionSet =
new TRegionReplicaSet(schemaRegionGroupId, Collections.singletonList(dataNodeLocation0));
TRegionReplicaSet dataRegionSet =
new TRegionReplicaSet(dataRegionGroupId, Collections.singletonList(dataNodeLocation0));
Map<TConsensusGroupId, TRegionReplicaSet> failedRegions = new HashMap<>();
failedRegions.put(dataRegionGroupId, dataRegionSet);
failedRegions.put(schemaRegionGroupId, schemaRegionSet);
CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan();
createRegionGroupsPlan.addRegionGroup("root.sg0", dataRegionSet);
createRegionGroupsPlan.addRegionGroup("root.sg1", schemaRegionSet);
CreateRegionGroupsProcedure procedure0 =
new CreateRegionGroupsProcedure(createRegionGroupsPlan, failedRegions);

updateProcedurePlan0.setProcedure(procedure0);
updateProcedurePlan1 =
(UpdateProcedurePlan)
ConfigPhysicalPlan.Factory.create(updateProcedurePlan.serializeToByteBuffer());
Procedure proc = reqNew.getProcedure();
Assert.assertEquals(proc, procedure);
ConfigPhysicalPlan.Factory.create(updateProcedurePlan0.serializeToByteBuffer());
assertEquals(updateProcedurePlan0, updateProcedurePlan1);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.tsfile.utils.PublicBAOS;

import org.junit.Test;
Expand Down Expand Up @@ -103,6 +104,13 @@ public void serializeDeserializeTest() {
procedure1.deserialize(buffer);
assertEquals(procedure0, procedure1);
assertEquals(procedure0.hashCode(), procedure1.hashCode());

CreateRegionGroupsProcedure procedure2 =
(CreateRegionGroupsProcedure)
ProcedureFactory.getInstance()
.create(ByteBuffer.wrap(byteArrayOutputStream.getBuf()));
assertEquals(procedure0, procedure2);
assertEquals(procedure0.hashCode(), procedure2.hashCode());
} catch (IOException e) {
fail();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.procedure.impl;

import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.tsfile.utils.PublicBAOS;

import org.junit.Test;

import java.io.DataOutputStream;
import java.nio.ByteBuffer;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

public class DeleteStorageGroupProcedureTest {

@Test
public void serializeDeserializeTest() {

PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
DeleteStorageGroupProcedure p1 =
new DeleteStorageGroupProcedure(new TStorageGroupSchema("root.sg"));

try {
p1.serialize(outputStream);
ByteBuffer buffer =
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());

DeleteStorageGroupProcedure p2 =
(DeleteStorageGroupProcedure) ProcedureFactory.getInstance().create(buffer);
assertEquals(p1, p2);

} catch (Exception e) {
fail();
}
}
}

0 comments on commit a28329e

Please sign in to comment.