Remove YamlJdbcConfiguration (#28971)
* Refactor StandardPipelineDataSourceConfiguration

* Remove YamlJdbcConfiguration
terrymanu authored Nov 7, 2023
1 parent d49bc14 commit 6bf5fcc
Showing 7 changed files with 45 additions and 142 deletions.
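The net effect for callers: the JDBC url, username, and password are no longer wrapped in a YamlJdbcConfiguration but are exposed directly on StandardPipelineDataSourceConfiguration. A minimal before/after sketch, not part of the commit; the demo class name and connection values are illustrative assumptions:

import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;

// Hypothetical demo class, only to show the getter change introduced by this commit.
public final class PipelineDataSourceConfigSketch {
    
    public static void main(final String[] args) {
        StandardPipelineDataSourceConfiguration config =
                new StandardPipelineDataSourceConfiguration("jdbc:mysql://127.0.0.1:3306/ds_0", "root", "root");
        // Before: values were reached through the removed wrapper, e.g. config.getJdbcConfig().getUrl().
        // After: plain getters on the configuration itself.
        String url = config.getUrl();
        String username = config.getUsername();
        String password = config.getPassword();
        DatabaseType databaseType = config.getDatabaseType();
        System.out.printf("url=%s, username=%s, password=%s, databaseType=%s%n", url, username, password, databaseType);
    }
}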
@@ -20,14 +20,13 @@
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration;
import org.apache.shardingsphere.data.pipeline.spi.JdbcQueryPropertiesExtension;
import org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrlAppender;
import org.apache.shardingsphere.infra.database.core.connector.url.StandardJdbcUrlParser;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;

@@ -54,10 +53,16 @@ public final class StandardPipelineDataSourceConfiguration implements PipelineDa
private final DataSourcePoolProperties dataSourcePoolProps;

@Getter
private final YamlJdbcConfiguration jdbcConfig;
private final DatabaseType databaseType;

@Getter
private final String url;

@Getter
private final DatabaseType databaseType;
private final String username;

@Getter
private final String password;

@SuppressWarnings("unchecked")
public StandardPipelineDataSourceConfiguration(final String param) {
@@ -68,21 +73,23 @@ public StandardPipelineDataSourceConfiguration(final Map<String, Object> poolPro
this(YamlEngine.marshal(poolProps), new HashMap<>(poolProps));
}

private StandardPipelineDataSourceConfiguration(final String param, final Map<String, Object> yamlConfig) {
private StandardPipelineDataSourceConfiguration(final String param, final Map<String, Object> poolProps) {
parameter = param;
for (String each : Arrays.asList("minPoolSize", "minimumIdle")) {
yamlConfig.put(each, "1");
poolProps.put(each, "1");
}
if (yamlConfig.containsKey("jdbcUrl")) {
yamlConfig.put("url", yamlConfig.get("jdbcUrl"));
yamlConfig.remove("jdbcUrl");
if (poolProps.containsKey("jdbcUrl")) {
poolProps.put("url", poolProps.get("jdbcUrl"));
poolProps.remove("jdbcUrl");
}
yamlConfig.remove(DATA_SOURCE_CLASS_NAME);
jdbcConfig = YamlEngine.unmarshal(YamlEngine.marshal(yamlConfig), YamlJdbcConfiguration.class, true);
databaseType = DatabaseTypeFactory.get(jdbcConfig.getUrl());
yamlConfig.put(DATA_SOURCE_CLASS_NAME, "com.zaxxer.hikari.HikariDataSource");
appendJdbcQueryProperties(databaseType, yamlConfig);
dataSourcePoolProps = new YamlDataSourceConfigurationSwapper().swapToDataSourcePoolProperties(yamlConfig);
poolProps.remove(DATA_SOURCE_CLASS_NAME);
databaseType = DatabaseTypeFactory.get(String.valueOf(poolProps.get("url")));
poolProps.put(DATA_SOURCE_CLASS_NAME, "com.zaxxer.hikari.HikariDataSource");
appendJdbcQueryProperties(databaseType, poolProps);
username = String.valueOf(poolProps.get("username"));
password = String.valueOf(poolProps.get("password"));
url = String.valueOf(poolProps.get("url"));
dataSourcePoolProps = new YamlDataSourceConfigurationSwapper().swapToDataSourcePoolProperties(poolProps);
}
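
A minimal usage sketch for the Map-based constructor above, not part of the diff; the property values are assumptions. HikariCP-style "jdbcUrl" is renamed to "url", minPoolSize/minimumIdle are pinned to "1", and the database type plus url/username/password now come straight from the map instead of a YamlJdbcConfiguration:

Map<String, Object> poolProps = new HashMap<>();
poolProps.put("jdbcUrl", "jdbc:mysql://127.0.0.1:3306/ds_0");   // normalized to "url" by the constructor
poolProps.put("username", "root");
poolProps.put("password", "root");
StandardPipelineDataSourceConfiguration config = new StandardPipelineDataSourceConfiguration(poolProps);
// config.getUrl() returns the normalized URL (with any dialect-specific query
// properties appended); getUsername()/getPassword() echo the map entries.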

public StandardPipelineDataSourceConfiguration(final String jdbcUrl, final String username, final String password) {
@@ -98,17 +105,16 @@ private static Map<String, Object> wrapParameter(final String jdbcUrl, final Str
return result;
}

private void appendJdbcQueryProperties(final DatabaseType databaseType, final Map<String, Object> yamlConfig) {
private void appendJdbcQueryProperties(final DatabaseType databaseType, final Map<String, Object> poolProps) {
Optional<JdbcQueryPropertiesExtension> extension = DatabaseTypedSPILoader.findService(JdbcQueryPropertiesExtension.class, databaseType);
if (!extension.isPresent()) {
return;
}
String jdbcUrl = jdbcConfig.getUrl();
String jdbcUrl = String.valueOf(poolProps.get("url"));
Properties queryProps = new StandardJdbcUrlParser().parseQueryProperties(jdbcUrl.contains("?") ? jdbcUrl.substring(jdbcUrl.indexOf("?") + 1) : "");
extension.get().extendQueryProperties(queryProps);
String url = new JdbcUrlAppender().appendQueryProperties(jdbcUrl, queryProps);
jdbcConfig.setUrl(url);
yamlConfig.put("url", url);
poolProps.put("url", url);
}

@Override

This file was deleted.

@@ -17,7 +17,6 @@

package org.apache.shardingsphere.data.pipeline.api.type;

import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.junit.jupiter.api.Test;
@@ -78,14 +77,10 @@ private void assertGetConfig(final StandardPipelineDataSourceConfiguration actua
assertThat(actual.getType(), is(StandardPipelineDataSourceConfiguration.TYPE));
DataSourcePoolProperties props = (DataSourcePoolProperties) actual.getDataSourceConfiguration();
assertThat(props.getPoolClassName(), is("com.zaxxer.hikari.HikariDataSource"));
assertGetJdbcConfig(actual.getJdbcConfig());
assertDataSourcePoolProperties(props);
}

private void assertGetJdbcConfig(final YamlJdbcConfiguration actual) {
assertThat(actual.getUrl(), is(JDBC_URL));
assertThat(actual.getUsername(), is(USERNAME));
assertThat(actual.getPassword(), is(PASSWORD));
assertDataSourcePoolProperties(props);
}

private void assertDataSourcePoolProperties(final DataSourcePoolProperties props) {

This file was deleted.

@@ -20,8 +20,13 @@
import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration;
import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
@@ -30,12 +35,6 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
@@ -86,11 +85,12 @@ public MySQLIncrementalDumper(final IncrementalDumperContext dumperContext, fina
this.binlogPosition = (BinlogPosition) binlogPosition;
this.channel = channel;
this.metaDataLoader = metaDataLoader;
YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()).getJdbcConfig();
StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig();
ConnectionPropertiesParser parser = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, TypedSPILoader.getService(DatabaseType.class, "MySQL"));
ConnectionProperties connectionProps = parser.parse(jdbcConfig.getUrl(), null, null);
ConnectInfo connectInfo = new ConnectInfo(generateServerId(), connectionProps.getHostname(), connectionProps.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword());
log.info("incremental dump, jdbcUrl={}, serverId={}, hostname={}, port={}", jdbcConfig.getUrl(), connectInfo.getServerId(), connectInfo.getHost(), connectInfo.getPort());
ConnectionProperties connectionProps = parser.parse(pipelineDataSourceConfig.getUrl(), null, null);
ConnectInfo connectInfo = new ConnectInfo(
generateServerId(), connectionProps.getHostname(), connectionProps.getPort(), pipelineDataSourceConfig.getUsername(), pipelineDataSourceConfig.getPassword());
log.info("incremental dump, jdbcUrl={}, serverId={}, hostname={}, port={}", pipelineDataSourceConfig.getUrl(), connectInfo.getServerId(), connectInfo.getHost(), connectInfo.getPort());
client = new MySQLClient(connectInfo, dumperContext.isDecodeWithTX());
catalog = connectionProps.getCatalog();
}
@@ -19,7 +19,6 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
import org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrl;
import org.apache.shardingsphere.infra.database.core.connector.url.StandardJdbcUrlParser;
@@ -52,18 +51,17 @@ public final class OpenGaussLogicalReplication {
*/
public Connection createConnection(final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
Properties props = new Properties();
YamlJdbcConfiguration jdbcConfig = pipelineDataSourceConfig.getJdbcConfig();
PGProperty.USER.set(props, jdbcConfig.getUsername());
PGProperty.PASSWORD.set(props, jdbcConfig.getPassword());
PGProperty.USER.set(props, pipelineDataSourceConfig.getUsername());
PGProperty.PASSWORD.set(props, pipelineDataSourceConfig.getPassword());
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
PGProperty.REPLICATION.set(props, "database");
PGProperty.PREFER_QUERY_MODE.set(props, "simple");
try {
return DriverManager.getConnection(jdbcConfig.getUrl(), props);
return DriverManager.getConnection(pipelineDataSourceConfig.getUrl(), props);
} catch (final SQLException ex) {
if (failedBecauseOfNonHAPort(ex)) {
log.info("Failed to connect to openGauss caused by: {} - {}. Try connecting to HA port.", ex.getSQLState(), ex.getMessage());
return tryConnectingToHAPort(jdbcConfig.getUrl(), props);
return tryConnectingToHAPort(pipelineDataSourceConfig.getUrl(), props);
}
throw ex;
}
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;

import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.yaml.YamlJdbcConfiguration;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
@@ -44,13 +43,12 @@ public final class PostgreSQLLogicalReplication {
*/
public Connection createConnection(final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
Properties props = new Properties();
YamlJdbcConfiguration jdbcConfig = pipelineDataSourceConfig.getJdbcConfig();
PGProperty.USER.set(props, jdbcConfig.getUsername());
PGProperty.PASSWORD.set(props, jdbcConfig.getPassword());
PGProperty.USER.set(props, pipelineDataSourceConfig.getUsername());
PGProperty.PASSWORD.set(props, pipelineDataSourceConfig.getPassword());
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.6");
PGProperty.REPLICATION.set(props, "database");
PGProperty.PREFER_QUERY_MODE.set(props, "simple");
return DriverManager.getConnection(jdbcConfig.getUrl(), props);
return DriverManager.getConnection(pipelineDataSourceConfig.getUrl(), props);
}
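
A hedged usage sketch for createConnection above, not part of the diff; the connection values and the no-arg instantiation of PostgreSQLLogicalReplication are assumptions. The caller only passes the pipeline data source configuration, which now exposes url, username, and password itself:

StandardPipelineDataSourceConfiguration pipelineDataSourceConfig =
        new StandardPipelineDataSourceConfiguration("jdbc:postgresql://127.0.0.1:5432/ds_0", "postgres", "postgres");
try (Connection connection = new PostgreSQLLogicalReplication().createConnection(pipelineDataSourceConfig)) {
    // Opened in logical replication mode (REPLICATION=database, PREFER_QUERY_MODE=simple)
    // with the credentials taken from the configuration.
}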

/**
