diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java index f07ef2e6d0054..e7473ffc11d71 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshTunnel.java @@ -368,9 +368,7 @@ ClientSession openTunnel(final SshClient client) { remoteServiceHost, remoteServicePort, address.toInetSocketAddress())); return session; } catch (final IOException | GeneralSecurityException e) { - if (e instanceof SshException && e.getMessage() - .toLowerCase(Locale.ROOT) - .contains("failed to get operation result within specified timeout")) { + if (e instanceof SshException && isTimeout(e)) { throw new ConfigErrorException(SSH_TIMEOUT_DISPLAY_MESSAGE, e); } else { throw new RuntimeException(e); @@ -378,6 +376,11 @@ ClientSession openTunnel(final SshClient client) { } } + private boolean isTimeout(final Exception e) { + return e.getMessage().toLowerCase(Locale.ROOT).contains("failed to get operation result within specified timeout") + || e.getMessage().contains("Failed (ConnectException) to execute: Connection refused"); + } + @Override public String toString() { return "SshTunnel{" + diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SshPasswordMySqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SshPasswordMySqlSourceAcceptanceTest.java index 22b57890da22c..f530472e65b65 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SshPasswordMySqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SshPasswordMySqlSourceAcceptanceTest.java @@ -7,13 +7,17 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.features.EnvVariableFeatureFlags; +import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.base.ssh.SshBastionContainer; import io.airbyte.integrations.base.ssh.SshTunnel; import io.airbyte.integrations.source.mysql.MySqlSource; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; +import java.io.IOException; import java.nio.file.Path; import java.util.List; import org.junit.jupiter.api.Test; @@ -43,12 +47,7 @@ public Path getConfigFilePath() { @Test public void sshTimeoutExceptionMarkAsConfigErrorTest() throws Exception { - SshBastionContainer bastion = new SshBastionContainer(); - final Network network = Network.newNetwork(); - // set up env - MySQLContainer db = startTestContainers(bastion, network); - config = bastion.getTunnelConfig(SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH, bastion.getBasicDbConfigBuider(db, List.of("public"))); - bastion.stopAndClose(); + JsonNode config = prepareBastionEnv(false); Source sshWrappedSource = MySqlSource.sshWrappedSource(); Exception exception = assertThrows(ConfigErrorException.class, () -> sshWrappedSource.discover(config)); @@ -58,6 +57,32 @@ public void sshTimeoutExceptionMarkAsConfigErrorTest() throws Exception { assertTrue(actualMessage.contains(expectedMessage)); } + @Test + public void sshConnectionExceptionMarkAsConfigErrorTest() throws Exception { + JsonNode config = prepareBastionEnv(true); + // set fake port + JsonNode fakeConfig = Jsons.clone(config); + ((ObjectNode) fakeConfig.get("tunnel_method")).put("tunnel_port", 1111); + Source sshWrappedSource = MySqlSource.sshWrappedSource(); + Exception exception = assertThrows(ConfigErrorException.class, () -> sshWrappedSource.discover(fakeConfig)); + + String expectedMessage = "Timed out while opening a SSH Tunnel. Please double check the given SSH configurations and try again."; + String actualMessage = exception.getMessage(); + + assertTrue(actualMessage.contains(expectedMessage)); + } + + private JsonNode prepareBastionEnv(boolean isBastionRunning) throws IOException, InterruptedException { + SshBastionContainer bastion = new SshBastionContainer(); + final Network network = Network.newNetwork(); + MySQLContainer db = startTestContainers(bastion, network); + JsonNode config = bastion.getTunnelConfig(SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH, bastion.getBasicDbConfigBuider(db, List.of("public"))); + if(!isBastionRunning) { + bastion.stopAndClose(); + } + return config; + } + private MySQLContainer startTestContainers(SshBastionContainer bastion, Network network) { bastion.initAndStartBastion(network); return initAndStartJdbcContainer(network);