From 6bf5fcc02027c4e3b9f50ede5daabd4028317015 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Tue, 7 Nov 2023 17:32:25 +0800 Subject: [PATCH] Remove YamlJdbcConfiguration (#28971) * Refactor StandardPipelineDataSourceConfiguration * Remove YamlJdbcConfiguration --- ...andardPipelineDataSourceConfiguration.java | 46 ++++++++------- .../api/yaml/YamlJdbcConfiguration.java | 38 ------------- ...rdPipelineDataSourceConfigurationTest.java | 7 +-- .../yaml/YamlJdbcConfigurationTest.java | 56 ------------------- .../mysql/ingest/MySQLIncrementalDumper.java | 22 ++++---- .../wal/OpenGaussLogicalReplication.java | 10 ++-- .../wal/PostgreSQLLogicalReplication.java | 8 +-- 7 files changed, 45 insertions(+), 142 deletions(-) delete mode 100644 kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/yaml/YamlJdbcConfiguration.java delete mode 100644 kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/config/yaml/YamlJdbcConfigurationTest.java diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java index 2d855af278b3d..3b354bca6143c 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfiguration.java @@ -20,14 +20,13 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration; import org.apache.shardingsphere.data.pipeline.spi.JdbcQueryPropertiesExtension; import org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrlAppender; import org.apache.shardingsphere.infra.database.core.connector.url.StandardJdbcUrlParser; -import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory; +import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; +import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory; import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties; -import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper; @@ -54,10 +53,16 @@ public final class StandardPipelineDataSourceConfiguration implements PipelineDa private final DataSourcePoolProperties dataSourcePoolProps; @Getter - private final YamlJdbcConfiguration jdbcConfig; + private final DatabaseType databaseType; + + @Getter + private final String url; @Getter - private final DatabaseType databaseType; + private final String username; + + @Getter + private final String password; @SuppressWarnings("unchecked") public StandardPipelineDataSourceConfiguration(final String param) { @@ -68,21 +73,23 @@ public StandardPipelineDataSourceConfiguration(final Map poolPro this(YamlEngine.marshal(poolProps), new HashMap<>(poolProps)); } - private StandardPipelineDataSourceConfiguration(final String param, final Map yamlConfig) { + private StandardPipelineDataSourceConfiguration(final String param, final Map poolProps) { parameter = param; for (String each : Arrays.asList("minPoolSize", "minimumIdle")) { - yamlConfig.put(each, "1"); + poolProps.put(each, "1"); } - if (yamlConfig.containsKey("jdbcUrl")) { - yamlConfig.put("url", yamlConfig.get("jdbcUrl")); - yamlConfig.remove("jdbcUrl"); + if (poolProps.containsKey("jdbcUrl")) { + poolProps.put("url", poolProps.get("jdbcUrl")); + poolProps.remove("jdbcUrl"); } - yamlConfig.remove(DATA_SOURCE_CLASS_NAME); - jdbcConfig = YamlEngine.unmarshal(YamlEngine.marshal(yamlConfig), YamlJdbcConfiguration.class, true); - databaseType = DatabaseTypeFactory.get(jdbcConfig.getUrl()); - yamlConfig.put(DATA_SOURCE_CLASS_NAME, "com.zaxxer.hikari.HikariDataSource"); - appendJdbcQueryProperties(databaseType, yamlConfig); - dataSourcePoolProps = new YamlDataSourceConfigurationSwapper().swapToDataSourcePoolProperties(yamlConfig); + poolProps.remove(DATA_SOURCE_CLASS_NAME); + databaseType = DatabaseTypeFactory.get(String.valueOf(poolProps.get("url"))); + poolProps.put(DATA_SOURCE_CLASS_NAME, "com.zaxxer.hikari.HikariDataSource"); + appendJdbcQueryProperties(databaseType, poolProps); + username = String.valueOf(poolProps.get("username")); + password = String.valueOf(poolProps.get("password")); + url = String.valueOf(poolProps.get("url")); + dataSourcePoolProps = new YamlDataSourceConfigurationSwapper().swapToDataSourcePoolProperties(poolProps); } public StandardPipelineDataSourceConfiguration(final String jdbcUrl, final String username, final String password) { @@ -98,17 +105,16 @@ private static Map wrapParameter(final String jdbcUrl, final Str return result; } - private void appendJdbcQueryProperties(final DatabaseType databaseType, final Map yamlConfig) { + private void appendJdbcQueryProperties(final DatabaseType databaseType, final Map poolProps) { Optional extension = DatabaseTypedSPILoader.findService(JdbcQueryPropertiesExtension.class, databaseType); if (!extension.isPresent()) { return; } - String jdbcUrl = jdbcConfig.getUrl(); + String jdbcUrl = String.valueOf(poolProps.get("url")); Properties queryProps = new StandardJdbcUrlParser().parseQueryProperties(jdbcUrl.contains("?") ? jdbcUrl.substring(jdbcUrl.indexOf("?") + 1) : ""); extension.get().extendQueryProperties(queryProps); String url = new JdbcUrlAppender().appendQueryProperties(jdbcUrl, queryProps); - jdbcConfig.setUrl(url); - yamlConfig.put("url", url); + poolProps.put("url", url); } @Override diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/yaml/YamlJdbcConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/yaml/YamlJdbcConfiguration.java deleted file mode 100644 index 42bc16fbdf26e..0000000000000 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/yaml/YamlJdbcConfiguration.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.api.yaml; - -import lombok.Getter; -import lombok.Setter; -import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration; - -/** - * JDBC configuration for YAML. - */ -@Getter -@Setter -public final class YamlJdbcConfiguration implements YamlConfiguration { - - private String dataSourceClassName; - - private String url; - - private String username; - - private String password; -} diff --git a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java index 7d185055e3b34..b5bd9f6aa90d6 100644 --- a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java +++ b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/type/StandardPipelineDataSourceConfigurationTest.java @@ -17,7 +17,6 @@ package org.apache.shardingsphere.data.pipeline.api.type; -import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration; import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties; import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper; import org.junit.jupiter.api.Test; @@ -78,14 +77,10 @@ private void assertGetConfig(final StandardPipelineDataSourceConfiguration actua assertThat(actual.getType(), is(StandardPipelineDataSourceConfiguration.TYPE)); DataSourcePoolProperties props = (DataSourcePoolProperties) actual.getDataSourceConfiguration(); assertThat(props.getPoolClassName(), is("com.zaxxer.hikari.HikariDataSource")); - assertGetJdbcConfig(actual.getJdbcConfig()); - assertDataSourcePoolProperties(props); - } - - private void assertGetJdbcConfig(final YamlJdbcConfiguration actual) { assertThat(actual.getUrl(), is(JDBC_URL)); assertThat(actual.getUsername(), is(USERNAME)); assertThat(actual.getPassword(), is(PASSWORD)); + assertDataSourcePoolProperties(props); } private void assertDataSourcePoolProperties(final DataSourcePoolProperties props) { diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/config/yaml/YamlJdbcConfigurationTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/config/yaml/YamlJdbcConfigurationTest.java deleted file mode 100644 index 57baee6a8ef7c..0000000000000 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/datasource/config/yaml/YamlJdbcConfigurationTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.common.datasource.config.yaml; - -import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration; -import org.apache.shardingsphere.infra.util.yaml.YamlEngine; -import org.junit.jupiter.api.Test; - -import java.util.HashMap; -import java.util.Map; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; - -class YamlJdbcConfigurationTest { - - private static final String JDBC_URL = "jdbc:mysql://127.0.0.1:3306/demo_ds_0?serverTimezone=UTC&useSSL=false"; - - private static final String USERNAME = "root"; - - private static final String PASSWORD = "password"; - - @Test - void assertConstructionWithUrl() { - assertYamlJdbcConfiguration(YamlEngine.unmarshal(YamlEngine.marshal(getDataSourcePoolPropertiesWithUrl()), YamlJdbcConfiguration.class)); - } - - private Map getDataSourcePoolPropertiesWithUrl() { - Map result = new HashMap<>(3, 1F); - result.put("url", JDBC_URL); - result.put("username", USERNAME); - result.put("password", PASSWORD); - return result; - } - - private void assertYamlJdbcConfiguration(final YamlJdbcConfiguration actual) { - assertThat(actual.getUrl(), is(JDBC_URL)); - assertThat(actual.getUsername(), is(USERNAME)); - assertThat(actual.getPassword(), is(PASSWORD)); - } -} diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java index f3d4fd2d37f42..d64fd12236177 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java @@ -20,8 +20,13 @@ import com.google.common.base.Preconditions; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration; import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable; +import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType; +import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName; +import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader; +import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData; +import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData; +import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils; import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumper; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; @@ -30,12 +35,6 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord; import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record; -import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName; -import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader; -import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData; -import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData; -import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType; -import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils; import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition; import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent; import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent; @@ -86,11 +85,12 @@ public MySQLIncrementalDumper(final IncrementalDumperContext dumperContext, fina this.binlogPosition = (BinlogPosition) binlogPosition; this.channel = channel; this.metaDataLoader = metaDataLoader; - YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()).getJdbcConfig(); + StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig(); ConnectionPropertiesParser parser = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, TypedSPILoader.getService(DatabaseType.class, "MySQL")); - ConnectionProperties connectionProps = parser.parse(jdbcConfig.getUrl(), null, null); - ConnectInfo connectInfo = new ConnectInfo(generateServerId(), connectionProps.getHostname(), connectionProps.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword()); - log.info("incremental dump, jdbcUrl={}, serverId={}, hostname={}, port={}", jdbcConfig.getUrl(), connectInfo.getServerId(), connectInfo.getHost(), connectInfo.getPort()); + ConnectionProperties connectionProps = parser.parse(pipelineDataSourceConfig.getUrl(), null, null); + ConnectInfo connectInfo = new ConnectInfo( + generateServerId(), connectionProps.getHostname(), connectionProps.getPort(), pipelineDataSourceConfig.getUsername(), pipelineDataSourceConfig.getPassword()); + log.info("incremental dump, jdbcUrl={}, serverId={}, hostname={}, port={}", pipelineDataSourceConfig.getUrl(), connectInfo.getServerId(), connectInfo.getHost(), connectInfo.getPort()); client = new MySQLClient(connectInfo, dumperContext.isDecodeWithTX()); catalog = connectionProps.getCatalog(); } diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java index ffcb2a8a33389..082134dc0f29a 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java @@ -19,7 +19,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber; import org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrl; import org.apache.shardingsphere.infra.database.core.connector.url.StandardJdbcUrlParser; @@ -52,18 +51,17 @@ public final class OpenGaussLogicalReplication { */ public Connection createConnection(final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException { Properties props = new Properties(); - YamlJdbcConfiguration jdbcConfig = pipelineDataSourceConfig.getJdbcConfig(); - PGProperty.USER.set(props, jdbcConfig.getUsername()); - PGProperty.PASSWORD.set(props, jdbcConfig.getPassword()); + PGProperty.USER.set(props, pipelineDataSourceConfig.getUsername()); + PGProperty.PASSWORD.set(props, pipelineDataSourceConfig.getPassword()); PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4"); PGProperty.REPLICATION.set(props, "database"); PGProperty.PREFER_QUERY_MODE.set(props, "simple"); try { - return DriverManager.getConnection(jdbcConfig.getUrl(), props); + return DriverManager.getConnection(pipelineDataSourceConfig.getUrl(), props); } catch (final SQLException ex) { if (failedBecauseOfNonHAPort(ex)) { log.info("Failed to connect to openGauss caused by: {} - {}. Try connecting to HA port.", ex.getSQLState(), ex.getMessage()); - return tryConnectingToHAPort(jdbcConfig.getUrl(), props); + return tryConnectingToHAPort(pipelineDataSourceConfig.getUrl(), props); } throw ex; } diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplication.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplication.java index 4efa1384423c8..3b8e8142a8172 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplication.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/PostgreSQLLogicalReplication.java @@ -18,7 +18,6 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber; import org.postgresql.PGConnection; import org.postgresql.PGProperty; @@ -44,13 +43,12 @@ public final class PostgreSQLLogicalReplication { */ public Connection createConnection(final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException { Properties props = new Properties(); - YamlJdbcConfiguration jdbcConfig = pipelineDataSourceConfig.getJdbcConfig(); - PGProperty.USER.set(props, jdbcConfig.getUsername()); - PGProperty.PASSWORD.set(props, jdbcConfig.getPassword()); + PGProperty.USER.set(props, pipelineDataSourceConfig.getUsername()); + PGProperty.PASSWORD.set(props, pipelineDataSourceConfig.getPassword()); PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.6"); PGProperty.REPLICATION.set(props, "database"); PGProperty.PREFER_QUERY_MODE.set(props, "simple"); - return DriverManager.getConnection(jdbcConfig.getUrl(), props); + return DriverManager.getConnection(pipelineDataSourceConfig.getUrl(), props); } /**