Skip to content

Commit

Permalink
feat:support information_schema.PARAMETERS query in cluster mod (#24378)
Browse files Browse the repository at this point in the history
  • Loading branch information
lujx98 committed Sep 28, 2023
1 parent a7a55ff commit 02b7b77
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,41 +28,57 @@
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;

import java.util.Optional;
import java.util.Collections;
import java.util.*;
import java.util.Map.Entry;

/**
* ShardingSphere statistics builder for MySQL.
*/

public final class MySQLShardingSphereStatisticsBuilder implements ShardingSphereStatisticsBuilder {


private static final Set<String> CURRENT_SUPPORT = new HashSet<>(Arrays.asList("PARAMETERS"));

private static final String SHARDING_SPHERE = "shardingsphere";

private static final String INFORMATION_SCHEMA = "information_schema";
private static final String CLUSTER_INFORMATION = "cluster_information";

@Override
public ShardingSphereStatistics build(final ShardingSphereMetaData metaData) {
ShardingSphereStatistics result = new ShardingSphereStatistics();
Optional<ShardingSphereSchema> shardingSphereSchema = Optional.ofNullable(metaData.getDatabase(SHARDING_SPHERE)).map(database -> database.getSchema(SHARDING_SPHERE));
if (!shardingSphereSchema.isPresent()) {
return result;
if (shardingSphereSchema.isPresent()) {
ShardingSphereSchemaData schemaData = new ShardingSphereSchemaData();
for (Entry<String, ShardingSphereTable> entry : shardingSphereSchema.get().getTables().entrySet()) {
ShardingSphereTableData tableData = new ShardingSphereTableData(entry.getValue().getName());
if (CLUSTER_INFORMATION.equals(entry.getKey())) {
tableData.getRows().add(new ShardingSphereRowData(Collections.singletonList(ShardingSphereVersion.VERSION)));
}
schemaData.getTableData().put(entry.getKey(), tableData);
}
ShardingSphereDatabaseData databaseData = new ShardingSphereDatabaseData();
databaseData.getSchemaData().put(SHARDING_SPHERE, schemaData);
result.getDatabaseData().put(SHARDING_SPHERE, databaseData);
}
ShardingSphereSchemaData schemaData = new ShardingSphereSchemaData();
for (Entry<String, ShardingSphereTable> entry : shardingSphereSchema.get().getTables().entrySet()) {
ShardingSphereTableData tableData = new ShardingSphereTableData(entry.getValue().getName());
if (CLUSTER_INFORMATION.equals(entry.getKey())) {
tableData.getRows().add(new ShardingSphereRowData(Collections.singletonList(ShardingSphereVersion.VERSION)));

Optional<ShardingSphereSchema> informationSchemaSchema = Optional.ofNullable(metaData.getDatabase("information_schema")).map(database -> database.getSchema("information_schema"));
if (informationSchemaSchema.isPresent()) {
ShardingSphereSchemaData schemaData = new ShardingSphereSchemaData();
for (Entry<String, ShardingSphereTable> entry : informationSchemaSchema.get().getTables().entrySet()) {
if (!CURRENT_SUPPORT.contains(entry.getValue().getName())) {
continue;
}
ShardingSphereTableData tableData = new ShardingSphereTableData(entry.getValue().getName());
schemaData.getTableData().put(entry.getValue().getName(), tableData);
}
schemaData.getTableData().put(entry.getKey(), tableData);
ShardingSphereDatabaseData informationSchemaResult = new ShardingSphereDatabaseData();
informationSchemaResult.getSchemaData().put(INFORMATION_SCHEMA, schemaData);
result.getDatabaseData().put(INFORMATION_SCHEMA, informationSchemaResult);
}
ShardingSphereDatabaseData databaseData = new ShardingSphereDatabaseData();
databaseData.getSchemaData().put(SHARDING_SPHERE, schemaData);
result.getDatabaseData().put(SHARDING_SPHERE, databaseData);

return result;
}

@Override
public String getDatabaseType() {
return "MySQL";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private void collectForTable(final String databaseName, final String schemaName,
log.error("Collect data failed!", ex);
}
tableData.ifPresent(optional -> statistics.getDatabaseData().computeIfAbsent(databaseName.toLowerCase(), key -> new ShardingSphereDatabaseData())
.getSchemaData().computeIfAbsent(schemaName, key -> new ShardingSphereSchemaData()).getTableData().put(table.getName().toLowerCase(), optional));
.getSchemaData().computeIfAbsent(schemaName, key -> new ShardingSphereSchemaData()).getTableData().put(table.getName(), optional));
}

private void compareUpdateAndSendEvent(final ShardingSphereStatistics statistics, final ShardingSphereStatistics changedStatistics,
Expand All @@ -139,7 +139,7 @@ private void compareUpdateAndSendEventForTable(final String databaseName, final
if (tableData.equals(changedTableData)) {
return;
}
statistics.getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().put(changedTableData.getName().toLowerCase(), changedTableData);
statistics.getDatabaseData().get(databaseName).getSchemaData().get(schemaName).getTableData().put(changedTableData.getName(), changedTableData);
ShardingSphereSchemaDataAlteredEvent event = getShardingSphereSchemaDataAlteredEvent(databaseName, schemaName, tableData, changedTableData, table);
contextManager.getInstanceContext().getEventBusContext().post(event);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.apache.shardingsphere.proxy.backend.collector;

import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
import org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector;
import org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereTableDataCollectorUtils;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;

import java.sql.SQLException;
import java.util.*;

/**
* @author sheldon
* @date 2023-09-28
*/
public final class MySQLInformationSchemaParameterTableCollector implements ShardingSphereStatisticsCollector {

private final static String PARAMETER_TABLE_NAME = "PARAMETERS";

private final static String COLLECT_SQL = "select * from information_schema.PARAMETERS";

@Override
public Optional<ShardingSphereTableData> collect(String databaseName, ShardingSphereTable table, Map<String, ShardingSphereDatabase> shardingSphereDatabases) throws SQLException {
Optional<String> databaseWithDatasource = ProxyContext.getInstance().getAllDatabaseNames().stream().filter(MySQLInformationSchemaParameterTableCollector::hasDataSource).findFirst();
if (databaseWithDatasource.isPresent()) {
Collection<ShardingSphereRowData> rows = ShardingSphereTableDataCollectorUtils.collectRowData(shardingSphereDatabases.get(databaseWithDatasource.get()),
table, table.getColumnNames(), COLLECT_SQL);
ShardingSphereTableData result = new ShardingSphereTableData(PARAMETER_TABLE_NAME);
result.getRows().addAll(rows);
return Optional.of(result);
}
return Optional.empty();
}

private static Boolean hasDataSource(final String databaseName) {
return ProxyContext.getInstance().getDatabase(databaseName).containsDataSource();
}

@Override
public Object getType() {
return PARAMETER_TABLE_NAME;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.shardingsphere.proxy.backend.collector.MySQLInformationSchemaParameterTableCollector

0 comments on commit 02b7b77

Please sign in to comment.