Skip to content

Commit

Permalink
Add ignore broadcast tables when unregister storage unit (#28911)
Browse files Browse the repository at this point in the history
* Add ignore broadcast tables when unregister storage unit

* Fix getInUsedStorageUnitNameAndRulesMap error for RuleMetaData

* Optimize KernelDistSQLStatementVisitor

* Optimize RuleMetaData
  • Loading branch information
jiangML authored Nov 1, 2023
1 parent 1e1096c commit 90e7d18
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -106,9 +107,9 @@ public Map<String, Collection<Class<? extends ShardingSphereRule>>> getInUsedSto
Map<String, Collection<Class<? extends ShardingSphereRule>>> result = new LinkedHashMap<>();
for (ShardingSphereRule each : rules) {
if (each instanceof DataSourceContainedRule) {
result.putAll(getInUsedStorageUnitNameAndRulesMap(each, getInUsedStorageUnitNames((DataSourceContainedRule) each)));
mergeInUsedStorageUnitNameAndRules(result, getInUsedStorageUnitNameAndRulesMap(each, getInUsedStorageUnitNames((DataSourceContainedRule) each)));
} else if (each instanceof DataNodeContainedRule) {
result.putAll(getInUsedStorageUnitNameAndRulesMap(each, getInUsedStorageUnitNames((DataNodeContainedRule) each)));
mergeInUsedStorageUnitNameAndRules(result, getInUsedStorageUnitNameAndRulesMap(each, getInUsedStorageUnitNames((DataNodeContainedRule) each)));
}
}
return result;
Expand All @@ -132,4 +133,19 @@ private Collection<String> getInUsedStorageUnitNames(final DataSourceContainedRu
private Collection<String> getInUsedStorageUnitNames(final DataNodeContainedRule rule) {
return rule.getAllDataNodes().values().stream().flatMap(each -> each.stream().map(DataNode::getDataSourceName).collect(Collectors.toSet()).stream()).collect(Collectors.toSet());
}

private void mergeInUsedStorageUnitNameAndRules(final Map<String, Collection<Class<? extends ShardingSphereRule>>> storageUnitNameAndRules,
final Map<String, Collection<Class<? extends ShardingSphereRule>>> toBeMergedStorageUnitNameAndRules) {
for (Entry<String, Collection<Class<? extends ShardingSphereRule>>> entry : toBeMergedStorageUnitNameAndRules.entrySet()) {
if (storageUnitNameAndRules.containsKey(entry.getKey())) {
for (Class<? extends ShardingSphereRule> each : entry.getValue()) {
if (!storageUnitNameAndRules.get(entry.getKey()).contains(each)) {
storageUnitNameAndRules.get(entry.getKey()).add(each);
}
}
} else {
storageUnitNameAndRules.put(entry.getKey(), entry.getValue());
}
}
}
}
4 changes: 4 additions & 0 deletions parser/distsql/engine/src/main/antlr4/imports/Keyword.g4
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,7 @@ CLUSTER
LOCK_STRATEGY
: L O C K UL_ S T R A T E G Y
;

BROADCAST
: B R O A D C A S T
;
8 changes: 5 additions & 3 deletions parser/distsql/engine/src/main/antlr4/imports/RDLStatement.g4
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ alterStorageUnit
;

unregisterStorageUnit
: UNREGISTER STORAGE UNIT ifExists? storageUnitName (COMMA_ storageUnitName)* ignoreSingleTables?
: UNREGISTER STORAGE UNIT ifExists? storageUnitName (COMMA_ storageUnitName)* ignoreTables?
;

storageUnitDefinition
Expand Down Expand Up @@ -67,8 +67,10 @@ password
: STRING_
;

ignoreSingleTables
: IGNORE SINGLE TABLES
ignoreTables
: IGNORE (SINGLE COMMA_ BROADCAST | BROADCAST COMMA_ SINGLE) TABLES # ignoreSingleAndBroadcastTables
| IGNORE SINGLE TABLES # ignoreSingleTables
| IGNORE BROADCAST TABLES # ignoreBroadcastTables
;

ifExists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import org.apache.shardingsphere.distsql.parser.autogen.KernelDistSQLStatementParser.ExportMetaDataContext;
import org.apache.shardingsphere.distsql.parser.autogen.KernelDistSQLStatementParser.ExportStorageNodesContext;
import org.apache.shardingsphere.distsql.parser.autogen.KernelDistSQLStatementParser.FromSegmentContext;
import org.apache.shardingsphere.distsql.parser.autogen.KernelDistSQLStatementParser.IgnoreBroadcastTablesContext;
import org.apache.shardingsphere.distsql.parser.autogen.KernelDistSQLStatementParser.IgnoreSingleAndBroadcastTablesContext;
import org.apache.shardingsphere.distsql.parser.autogen.KernelDistSQLStatementParser.IgnoreSingleTablesContext;
import org.apache.shardingsphere.distsql.parser.autogen.KernelDistSQLStatementParser.ImportDatabaseConfigurationContext;
import org.apache.shardingsphere.distsql.parser.autogen.KernelDistSQLStatementParser.ImportMetaDataContext;
import org.apache.shardingsphere.distsql.parser.autogen.KernelDistSQLStatementParser.InstanceIdContext;
Expand Down Expand Up @@ -206,9 +209,11 @@ private Properties getProperties(final PropertiesDefinitionContext ctx) {

@Override
public ASTNode visitUnregisterStorageUnit(final UnregisterStorageUnitContext ctx) {
boolean ignoreSingleTables = null != ctx.ignoreSingleTables();
boolean ignoreSingleTables = ctx.ignoreTables() instanceof IgnoreSingleAndBroadcastTablesContext || ctx.ignoreTables() instanceof IgnoreSingleTablesContext;
boolean ignoreBroadcastTables = ctx.ignoreTables() instanceof IgnoreSingleAndBroadcastTablesContext || ctx.ignoreTables() instanceof IgnoreBroadcastTablesContext;
return new UnregisterStorageUnitStatement(null != ctx.ifExists(),
ctx.storageUnitName().stream().map(ParseTree::getText).map(each -> new IdentifierValue(each).getValue()).collect(Collectors.toList()), ignoreSingleTables);
ctx.storageUnitName().stream().map(ParseTree::getText).map(each -> new IdentifierValue(each).getValue()).collect(Collectors.toList()),
ignoreSingleTables, ignoreBroadcastTables);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ public final class UnregisterStorageUnitStatement extends StorageUnitDefinitionS

private final boolean ignoreSingleTables;

public UnregisterStorageUnitStatement(final Collection<String> storageUnitNames, final boolean ignoreSingleTables) {
this(false, storageUnitNames, ignoreSingleTables);
private final boolean ignoreBroadcastTables;

public UnregisterStorageUnitStatement(final Collection<String> storageUnitNames, final boolean ignoreSingleTables, final boolean ignoreBroadcastTables) {
this(false, storageUnitNames, ignoreSingleTables, ignoreBroadcastTables);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.storage.unit;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.broadcast.rule.BroadcastRule;
import org.apache.shardingsphere.distsql.handler.exception.storageunit.InvalidStorageUnitsException;
import org.apache.shardingsphere.distsql.handler.exception.storageunit.MissingRequiredStorageUnitsException;
import org.apache.shardingsphere.distsql.handler.exception.storageunit.StorageUnitInUsedException;
Expand All @@ -37,6 +38,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -82,19 +84,33 @@ private void checkInUsed(final String databaseName, final UnregisterStorageUnitS
Collection<String> inUsedStorageUnitNames = inUsedStorageUnits.keySet();
inUsedStorageUnitNames.retainAll(sqlStatement.getStorageUnitNames());
if (!inUsedStorageUnitNames.isEmpty()) {
if (sqlStatement.isIgnoreSingleTables()) {
checkInUsedIgnoreSingleTables(new HashSet<>(inUsedStorageUnitNames), inUsedStorageUnits);
Collection<Class<? extends ShardingSphereRule>> ignoreShardingSphereRules = getIgnoreShardingSphereRules(sqlStatement);
if (!ignoreShardingSphereRules.isEmpty()) {
checkInUsedIgnoreTables(new HashSet<>(inUsedStorageUnitNames), inUsedStorageUnits, ignoreShardingSphereRules);
} else {
String firstResource = inUsedStorageUnitNames.iterator().next();
throw new StorageUnitInUsedException(firstResource, inUsedStorageUnits.get(firstResource));
}
}
}

private void checkInUsedIgnoreSingleTables(final Collection<String> inUsedResourceNames, final Map<String, Collection<Class<? extends ShardingSphereRule>>> inUsedStorageUnits) {
private Collection<Class<? extends ShardingSphereRule>> getIgnoreShardingSphereRules(final UnregisterStorageUnitStatement sqlStatement) {
Collection<Class<? extends ShardingSphereRule>> result = new LinkedList<>();
if (sqlStatement.isIgnoreSingleTables()) {
result.add(SingleRule.class);
}
if (sqlStatement.isIgnoreBroadcastTables()) {
result.add(BroadcastRule.class);
}
return result;
}

private void checkInUsedIgnoreTables(final Collection<String> inUsedResourceNames,
final Map<String, Collection<Class<? extends ShardingSphereRule>>> inUsedStorageUnits,
final Collection<Class<? extends ShardingSphereRule>> ignoreShardingSphereRules) {
for (String each : inUsedResourceNames) {
Collection<Class<? extends ShardingSphereRule>> inUsedRules = inUsedStorageUnits.get(each);
inUsedRules.remove(SingleRule.class);
ignoreShardingSphereRules.forEach(inUsedRules::remove);
ShardingSpherePreconditions.checkState(inUsedRules.isEmpty(), () -> new StorageUnitInUsedException(each, inUsedRules));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,15 @@ void assertExecute() throws SQLException {
when(database.getResourceMetaData()).thenReturn(resourceMetaData);
when(database.getRuleMetaData().getInUsedStorageUnitNameAndRulesMap()).thenReturn(Collections.emptyMap());
when(contextManager.getMetaDataContexts().getMetaData().getDatabase("foo_db")).thenReturn(database);
UnregisterStorageUnitStatement unregisterStorageUnitStatement = new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), false);
UnregisterStorageUnitStatement unregisterStorageUnitStatement = new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), false, false);
assertThat(handler.execute("foo_db", unregisterStorageUnitStatement), instanceOf(UpdateResponseHeader.class));
verify(modeContextManager).unregisterStorageUnits("foo_db", unregisterStorageUnitStatement.getStorageUnitNames());
}

@Test
void assertStorageUnitNameNotExistedExecute() {
when(ProxyContext.getInstance().getDatabase("foo_db").getResourceMetaData().getStorageUnits()).thenReturn(Collections.emptyMap());
assertThrows(MissingRequiredStorageUnitsException.class, () -> handler.execute("foo_db", new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), false)));
assertThrows(MissingRequiredStorageUnitsException.class, () -> handler.execute("foo_db", new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), false, false)));
}

@Test
Expand All @@ -132,7 +132,7 @@ void assertStorageUnitNameInUseExecute() {
when(database.getResourceMetaData()).thenReturn(resourceMetaData);
when(contextManager.getMetaDataContexts().getMetaData().getDatabase("foo_db")).thenReturn(database);
assertThrows(StorageUnitInUsedException.class,
() -> handler.execute("foo_db", new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), false)));
() -> handler.execute("foo_db", new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), false, false)));
}

@Test
Expand All @@ -146,7 +146,7 @@ void assertStorageUnitNameInUseWithoutIgnoreSingleTables() {
when(resourceMetaData.getStorageUnits()).thenReturn(Collections.singletonMap("foo_ds", storageUnit));
when(database.getResourceMetaData()).thenReturn(resourceMetaData);
when(contextManager.getMetaDataContexts().getMetaData().getDatabase("foo_db")).thenReturn(database);
assertThrows(StorageUnitInUsedException.class, () -> handler.execute("foo_db", new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), false)));
assertThrows(StorageUnitInUsedException.class, () -> handler.execute("foo_db", new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), false, false)));
}

@Test
Expand All @@ -160,14 +160,14 @@ void assertStorageUnitNameInUseIgnoreSingleTables() throws SQLException {
when(resourceMetaData.getStorageUnits()).thenReturn(Collections.singletonMap("foo_ds", storageUnit));
when(database.getResourceMetaData()).thenReturn(resourceMetaData);
when(contextManager.getMetaDataContexts().getMetaData().getDatabase("foo_db")).thenReturn(database);
UnregisterStorageUnitStatement unregisterStorageUnitStatement = new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), true);
UnregisterStorageUnitStatement unregisterStorageUnitStatement = new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), true, false);
assertThat(handler.execute("foo_db", unregisterStorageUnitStatement), instanceOf(UpdateResponseHeader.class));
verify(modeContextManager).unregisterStorageUnits("foo_db", unregisterStorageUnitStatement.getStorageUnitNames());
}

@Test
void assertExecuteWithIfExists() throws SQLException {
UnregisterStorageUnitStatement unregisterStorageUnitStatement = new UnregisterStorageUnitStatement(true, Collections.singleton("foo_ds"), true);
UnregisterStorageUnitStatement unregisterStorageUnitStatement = new UnregisterStorageUnitStatement(true, Collections.singleton("foo_ds"), true, false);
assertThat(handler.execute("foo_db", unregisterStorageUnitStatement), instanceOf(UpdateResponseHeader.class));
verify(modeContextManager).unregisterStorageUnits("foo_db", unregisterStorageUnitStatement.getStorageUnitNames());
}
Expand All @@ -177,7 +177,7 @@ void assertStorageUnitNameInUseWithIfExists() {
when(database.getRuleMetaData()).thenReturn(new RuleMetaData(Collections.singleton(shadowRule)));
when(shadowRule.getDataSourceMapper()).thenReturn(Collections.singletonMap("", Collections.singleton("foo_ds")));
when(contextManager.getMetaDataContexts().getMetaData().getDatabase("foo_db")).thenReturn(database);
UnregisterStorageUnitStatement unregisterStorageUnitStatement = new UnregisterStorageUnitStatement(true, Collections.singleton("foo_ds"), true);
UnregisterStorageUnitStatement unregisterStorageUnitStatement = new UnregisterStorageUnitStatement(true, Collections.singleton("foo_ds"), true, false);
assertThrows(DistSQLException.class, () -> handler.execute("foo_db", unregisterStorageUnitStatement));
}
}

0 comments on commit 90e7d18

Please sign in to comment.