Skip to content

Commit

Permalink
Upgrade to liquibase 4.28
Browse files Browse the repository at this point in the history
Small bug fixes for liquibase 4.28
-- Parse url and add arrow settings for snapshots as well and raw commands
-- Add binary data type
  • Loading branch information
CodyAustinDavis committed May 27, 2024
1 parent aad85d3 commit cc91cda
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 18 deletions.
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<liquibase.version>4.27.0</liquibase.version>
<liquibase.version>4.28.0</liquibase.version>
<sonar.organization>liquibase</sonar.organization>
<sonar.projectKey>${sonar.organization}_${project.artifactId}</sonar.projectKey>
<sonar.projectName>${project.name}</sonar.projectName>
Expand Down Expand Up @@ -119,6 +119,12 @@
<version>${dependency.spock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>databricks-jdbc</artifactId>
<version>2.6.38</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.databricks.client.jdbc.jdbc42.S42Connection;
import com.databricks.client.spark.core.SparkJDBCConnection;
import liquibase.Scope;
import liquibase.database.DatabaseConnection;
import liquibase.database.jvm.JdbcConnection;
import liquibase.exception.DatabaseException;
import liquibase.exception.UnexpectedLiquibaseException;
Expand Down Expand Up @@ -55,12 +56,18 @@ public Connection getUnderlyingConnection() {
public void open(String url, Driver driverObject, Properties driverProperties) throws DatabaseException {

driverProperties.setProperty("UserAgentEntry", "Liquibase");
driverProperties.setProperty("EnableArrow", "0");
// Set UserAgent to specify to Databricks that liquibase is the tool running these commands
// Set EnableArrow because the arrow results break everything. And the JDBC release notes say to just disable it.
//String updatedUrl = url + "UserAgentEntry=Liquibase;EnableArrow=0";
// This is done in getConnectionUrl()

this.openConn(url, driverObject, driverProperties);
// Ensure there's a terminating semicolon for consistent parsing
if (!url.endsWith(";")) {
url += ";";
}

String updatedUrl = url + "UserAgentEntry=Liquibase;EnableArrow=0";

this.openConn(updatedUrl, driverObject, driverProperties);
}

public void openConn(String url, Driver driverObject, Properties driverProperties) throws DatabaseException {
Expand Down Expand Up @@ -96,25 +103,34 @@ public void setAutoCommit(boolean autoCommit) throws DatabaseException {
}

protected static String getUrlParamValue(String url, String paramName, String defaultValue) {

//System.out.println("PARSE URL - url" + url);

if (url == null) {
return null;
}

// Get catalog of connection and schema of connection
// Ensure there's a terminating semicolon for consistent parsing
if (!url.endsWith(";")) {
url += ";";
}
// Remove spaces and split by semicolon
String[] uriArgs = url.replace(" ", "").split(";");

// System.out.println("PARSE URL - url args" + uriArgs.toString());

// Use Java Streams to find the parameter value
Optional<String> paramString = Arrays.stream(uriArgs)
.filter(x -> x.startsWith(paramName + "="))
.findFirst();

// Return the parameter value if found, otherwise return the default value
if (!paramString.isPresent()) {
return defaultValue;
}
String[] defaultParamsArr = paramString.get().split("=");
return defaultParamsArr[1];
return defaultParamsArr.length > 1 ? defaultParamsArr[1] : defaultValue; // Check to avoid index out of bound
}



@Override
public String getDatabaseProductVersion() throws DatabaseException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import liquibase.Scope;
import liquibase.database.AbstractJdbcDatabase;
import liquibase.database.Database;
import liquibase.database.DatabaseConnection;
import liquibase.database.jvm.JdbcConnection;
import liquibase.exception.DatabaseException;
Expand All @@ -11,6 +12,7 @@
import liquibase.structure.core.Catalog;
import liquibase.structure.core.Schema;
import java.math.BigInteger;
import java.sql.Connection;
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -206,6 +208,7 @@ protected String getConnectionSchemaName() {
}

try {

String foundSchema = parseUrlForSchema(connection.getURL());
Scope.getCurrentScope().getLog(getClass()).info("SCHEMA IDENTIFIED: " + foundSchema);

Expand Down Expand Up @@ -338,8 +341,6 @@ private Set<String> getDatabricksReservedWords() {
));
}



@Override
public void setConnection(DatabaseConnection conn) {
DatabaseConnection dbConn;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package liquibase.ext.databricks.datatype;

import liquibase.change.core.LoadDataChange;
import liquibase.database.Database;
import liquibase.datatype.DataTypeInfo;
import liquibase.datatype.DatabaseDataType;
import liquibase.datatype.LiquibaseDataType;
import liquibase.datatype.core.BlobType;
import liquibase.ext.databricks.database.DatabricksDatabase;


public class BinaryDataTypeDatabricks extends BlobType {


@Override
public DatabaseDataType toDatabaseDataType(Database database) {

if (database instanceof DatabricksDatabase) {
return new DatabaseDataType("BINARY");
}

return super.toDatabaseDataType(database);
}

@Override
public int getPriority() {
return DatabricksDatabase.DATABRICKS_PRIORITY_DATABASE;
}


@Override
public boolean supports(Database database) {
return database instanceof DatabricksDatabase;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import liquibase.Scope;
import liquibase.database.AbstractJdbcDatabase;
import liquibase.database.Database;
import liquibase.database.DatabaseConnection;
import liquibase.exception.DatabaseException;
import liquibase.executor.ExecutorService;
import liquibase.ext.databricks.database.DatabricksConnection;
import liquibase.snapshot.DatabaseSnapshot;
import liquibase.snapshot.jvm.ViewSnapshotGenerator;
import liquibase.statement.core.RawSqlStatement;
Expand All @@ -14,10 +16,12 @@
import liquibase.structure.core.View;
import liquibase.util.StringUtil;

import java.sql.ResultSet;
import java.util.List;
import java.util.Map;

import liquibase.ext.databricks.database.DatabricksDatabase;

public class ViewSnapshotGeneratorDatabricks extends ViewSnapshotGenerator {


Expand All @@ -37,31 +41,57 @@ protected DatabaseObject snapshotObject(DatabaseObject example, DatabaseSnapshot
} else {
Database database = snapshot.getDatabase();
Schema schema = example.getSchema();
DatabaseConnection connection = database.getConnection();

CatalogAndSchema catalogAndSchema = (new CatalogAndSchema(schema.getCatalogName(), schema.getName())).customize(database);
String jdbcSchemaName = database.correctObjectName(((AbstractJdbcDatabase) database).getJdbcSchemaName(catalogAndSchema), Schema.class);
String query = String.format("SELECT view_definition FROM %s.%s.VIEWS WHERE table_name='%s' AND table_schema='%s' AND table_catalog='%s';",
schema.getCatalogName(), database.getSystemSchema(), example.getName(), schema.getName(), schema.getCatalogName());

// DEBUG
//System.out.println("Snapshot Database Connection URL : " + database.getConnection().getURL());
//System.out.println("Snapshot Database Connection Class : " + database.getConnection().getClass().getName());


List<Map<String, ?>> viewsMetadataRs = Scope.getCurrentScope().getSingleton(ExecutorService.class)
.getExecutor("jdbc", database).queryForList(new RawSqlStatement(query));

// New Code, likely superfluous, was used for testing
/// This should use our existing DatabaseConnection url processing
String rawViewDefinition = null;

try (ResultSet viewMetadataResultSet = ((DatabricksConnection) connection).createStatement().executeQuery(query)) {
//System.out.println("Raw Result VIEW " + viewMetadataResultSet);

viewMetadataResultSet.next();
rawViewDefinition = viewMetadataResultSet.getString(1);


} catch (Exception e) {
Scope.getCurrentScope().getLog(getClass()).info("Error getting View Definiton via existing context, going to pull from URL", e);
}

/// Old Code

if (viewsMetadataRs.isEmpty()) {
return null;
} else {

Map<String, ?> row = viewsMetadataRs.get(0);
String rawViewName = example.getName();
String rawSchemaName = schema.getName();
String rawCatalogName = schema.getCatalogName();


View view = (new View()).setName(this.cleanNameFromDatabase(rawViewName, database));
CatalogAndSchema schemaFromJdbcInfo = ((AbstractJdbcDatabase) database).getSchemaFromJdbcInfo(rawCatalogName, rawSchemaName);
view.setSchema(new Schema(schemaFromJdbcInfo.getCatalogName(), schemaFromJdbcInfo.getSchemaName()));

String definition = (String) row.get("VIEW_DEFINITION");
if (definition.startsWith("FULL_DEFINITION: ")) {
definition = definition.replaceFirst("^FULL_DEFINITION: ", "");
view.setContainsFullDefinition(true);
String definition = rawViewDefinition;

if (definition.isEmpty()) {
definition = (String) row.get("view_definition");

}

int length = definition.length();
Expand All @@ -81,5 +111,4 @@ protected DatabaseObject snapshotObject(DatabaseObject example, DatabaseSnapshot

}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ liquibase.ext.databricks.datatype.BooleanDatatypeDatabricks
liquibase.ext.databricks.datatype.FloatDatatypeDatabricks
liquibase.ext.databricks.datatype.DoubleDatatypeDatabricks
liquibase.ext.databricks.datatype.TinyintDatatypeDatabricks
liquibase.ext.databricks.datatype.SmallintDatatypeDatabricks
liquibase.ext.databricks.datatype.SmallintDatatypeDatabricks
liquibase.ext.databricks.datatype.BinaryDataTypeDatabricks

0 comments on commit cc91cda

Please sign in to comment.