Skip to content

Commit

Permalink
Destination Snowflake: Adapting to new connector interfaces (#38658)
Browse files Browse the repository at this point in the history
  • Loading branch information
gisripa authored Jun 3, 2024
1 parent 75b9d06 commit f4c4c34
Show file tree
Hide file tree
Showing 32 changed files with 1,498 additions and 1,533 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.35.13'
cdkVersionRequired = '0.35.14'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
Expand All @@ -15,12 +15,6 @@ java {
}
}

compileKotlin {
compilerOptions {
allWarningsAsErrors = false
}
}

application {
mainClass = 'io.airbyte.integrations.destination.snowflake.SnowflakeDestinationKt'
// enable when profiling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.8.4
dockerImageTag: 3.9.0
dockerRepository: airbyte/destination-snowflake
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
githubIssueLabel: destination-snowflake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import com.zaxxer.hikari.HikariDataSource
import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.db.jdbc.JdbcUtils
import io.airbyte.commons.exceptions.ConfigErrorException
import io.airbyte.commons.json.Jsons.deserialize
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
import java.io.IOException
import java.io.PrintWriter
import java.net.URI
Expand All @@ -23,12 +25,13 @@ import java.util.*
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors
import javax.sql.DataSource
import net.snowflake.client.jdbc.SnowflakeSQLException
import org.slf4j.Logger
import org.slf4j.LoggerFactory

/** SnowflakeDatabase contains helpers to create connections to and run queries on Snowflake. */
object SnowflakeDatabase {
private val LOGGER: Logger = LoggerFactory.getLogger(SnowflakeDatabase::class.java)
object SnowflakeDatabaseUtils {
private val LOGGER: Logger = LoggerFactory.getLogger(SnowflakeDatabaseUtils::class.java)
private const val PAUSE_BETWEEN_TOKEN_REFRESH_MIN =
7 // snowflake access token TTL is 10min and can't be modified

Expand All @@ -42,14 +45,20 @@ object SnowflakeDatabase {
.version(HttpClient.Version.HTTP_2)
.connectTimeout(Duration.ofSeconds(10))
.build()
const val PRIVATE_KEY_FILE_NAME: String = "rsa_key.p8"
const val PRIVATE_KEY_FIELD_NAME: String = "private_key"
const val PRIVATE_KEY_PASSWORD: String = "private_key_password"
private const val PRIVATE_KEY_FILE_NAME: String = "rsa_key.p8"
private const val PRIVATE_KEY_FIELD_NAME: String = "private_key"
private const val PRIVATE_KEY_PASSWORD: String = "private_key_password"
private const val CONNECTION_STRING_IDENTIFIER_KEY = "application"
private const val CONNECTION_STRING_IDENTIFIER_VAL = "Airbyte_Connector"

// This is an unfortunately fragile way to capture the errors, but Snowflake doesn't
// provide a more specific permission exception error code
private const val NO_PRIVILEGES_ERROR_MESSAGE = "but current role has no privileges on it"
private const val IP_NOT_IN_WHITE_LIST_ERR_MSG = "not allowed to access Snowflake"

@JvmStatic
fun createDataSource(config: JsonNode, airbyteEnvironment: String?): HikariDataSource {

val dataSource = HikariDataSource()

val jdbcUrl =
Expand Down Expand Up @@ -243,4 +252,45 @@ object SnowflakeDatabase {
}
}
}

fun checkForKnownConfigExceptions(e: Exception?): Optional<ConfigErrorException> {
if (e is SnowflakeSQLException && e.message!!.contains(NO_PRIVILEGES_ERROR_MESSAGE)) {
return Optional.of(
ConfigErrorException(
"Encountered Error with Snowflake Configuration: Current role does not have permissions on the target schema please verify your privileges",
e
)
)
}
if (e is SnowflakeSQLException && e.message!!.contains(IP_NOT_IN_WHITE_LIST_ERR_MSG)) {
return Optional.of(
ConfigErrorException(
"""
Snowflake has blocked access from Airbyte IP address. Please make sure that your Snowflake user account's
network policy allows access from all Airbyte IP addresses. See this page for the list of Airbyte IPs:
https://docs.airbyte.com/cloud/getting-started-with-airbyte-cloud#allowlist-ip-addresses and this page
for documentation on Snowflake network policies: https://docs.snowflake.com/en/user-guide/network-policies
""".trimIndent(),
e
)
)
}
return Optional.empty()
}

fun toSqlTypeName(airbyteProtocolType: AirbyteProtocolType): String {
return when (airbyteProtocolType) {
AirbyteProtocolType.STRING -> "TEXT"
AirbyteProtocolType.NUMBER -> "FLOAT"
AirbyteProtocolType.INTEGER -> "NUMBER"
AirbyteProtocolType.BOOLEAN -> "BOOLEAN"
AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE -> "TIMESTAMP_TZ"
AirbyteProtocolType.TIMESTAMP_WITHOUT_TIMEZONE -> "TIMESTAMP_NTZ"
AirbyteProtocolType.TIME_WITH_TIMEZONE -> "TEXT"
AirbyteProtocolType.TIME_WITHOUT_TIMEZONE -> "TIME"
AirbyteProtocolType.DATE -> "DATE"
AirbyteProtocolType.UNKNOWN -> "VARIANT"
}
}
}
Loading

0 comments on commit f4c4c34

Please sign in to comment.