Skip to content

Commit

Permalink
feat: completed view snapshot + code refactoring for class ViewSnapsh…
Browse files Browse the repository at this point in the history
…otGeneratorDatabricks
  • Loading branch information
filipelautert committed Oct 3, 2024
1 parent ead0685 commit 7d8ae2f
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@
import liquibase.diff.output.changelog.core.MissingViewChangeGenerator;
import liquibase.ext.databricks.change.createView.CreateViewChangeDatabricks;
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.ext.databricks.structure.core.ViewDatabricks;
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.View;

/**
* Custom implementation of {@link MissingViewChangeGenerator} for Databricks.
*/
public class MissingViewChangeGeneratorDatabricks extends MissingViewChangeGenerator {

@Override
public int getPriority(Class<? extends DatabaseObject> objectType, Database database) {
if (database instanceof DatabricksDatabase && ViewDatabricks.class.isAssignableFrom(objectType)) {
if (database instanceof DatabricksDatabase && View.class.isAssignableFrom(objectType)) {
return PRIORITY_DATABASE;
} else {
return PRIORITY_NONE;
Expand All @@ -28,7 +31,11 @@ public Change[] fixMissing(DatabaseObject missingObject, DiffOutputControl contr
if (changes == null || changes.length == 0) {
return changes;
}
changes[0] = getCreateViewChangeDatabricks(missingObject.getAttribute("tblProperties", String.class), changes);
return changes;
}

private CreateViewChangeDatabricks getCreateViewChangeDatabricks(String tblProperties, Change[] changes) {
CreateViewChange temp = (CreateViewChange) changes[0];
CreateViewChangeDatabricks createViewChangeDatabricks = new CreateViewChangeDatabricks();
createViewChangeDatabricks.setViewName(temp.getViewName());
Expand All @@ -41,10 +48,8 @@ public Change[] fixMissing(DatabaseObject missingObject, DiffOutputControl contr
createViewChangeDatabricks.setPath(temp.getPath());
createViewChangeDatabricks.setRelativeToChangelogFile(temp.getRelativeToChangelogFile());
createViewChangeDatabricks.setEncoding(temp.getEncoding());
createViewChangeDatabricks.setTblProperties(((ViewDatabricks)missingObject).getTblProperties());
changes[0] = createViewChangeDatabricks;

return changes;
createViewChangeDatabricks.setTblProperties(tblProperties);
return createViewChangeDatabricks;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,28 @@
import liquibase.Scope;
import liquibase.database.AbstractJdbcDatabase;
import liquibase.database.Database;
import liquibase.database.DatabaseConnection;
import liquibase.exception.DatabaseException;
import liquibase.executor.ExecutorService;
import liquibase.ext.databricks.database.DatabricksConnection;
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.ext.databricks.structure.core.ViewDatabricks;
import liquibase.snapshot.DatabaseSnapshot;
import liquibase.snapshot.jvm.ViewSnapshotGenerator;
import liquibase.statement.core.RawSqlStatement;
import liquibase.statement.core.RawParameterizedSqlStatement;
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.Schema;
import liquibase.structure.core.View;
import org.apache.commons.lang3.StringUtils;

import java.sql.ResultSet;
import java.util.List;
import java.util.Map;

/**
* Overrides ViewSnapshotGenerator for Databricks views contemplating the tblProperties field
*/
public class ViewSnapshotGeneratorDatabricks extends ViewSnapshotGenerator {


@Override
public int getPriority(Class<? extends DatabaseObject> objectType, Database database) {
if (database instanceof DatabricksDatabase) {
return super.getPriority(objectType, database) + PRIORITY_DATABASE;
} else {
return PRIORITY_NONE;
}
return database instanceof DatabricksDatabase ? PRIORITY_DATABASE : PRIORITY_NONE;
}

@Override
Expand All @@ -41,75 +35,57 @@ protected DatabaseObject snapshotObject(DatabaseObject example, DatabaseSnapshot
} else {
Database database = snapshot.getDatabase();
Schema schema = example.getSchema();
DatabaseConnection connection = database.getConnection();

String query = String.format("SELECT view_definition FROM %s.%s.VIEWS WHERE table_name='%s' AND table_schema='%s' AND table_catalog='%s';",
schema.getCatalogName(), database.getSystemSchema(), example.getName(), schema.getName(), schema.getCatalogName());

// DEBUG
//System.out.println("Snapshot Database Connection URL : " + database.getConnection().getURL());
//System.out.println("Snapshot Database Connection Class : " + database.getConnection().getClass().getName());

String query = String.format("SELECT view_definition FROM %s.%s.VIEWS WHERE table_name=? AND table_schema=? AND table_catalog=?",
schema.getCatalogName(), database.getSystemSchema());

List<Map<String, ?>> viewsMetadataRs = Scope.getCurrentScope().getSingleton(ExecutorService.class)
.getExecutor("jdbc", database).queryForList(new RawSqlStatement(query));

// New Code, likely superfluous, was used for testing
/// This should use our existing DatabaseConnection url processing
String rawViewDefinition = null;

try (ResultSet viewMetadataResultSet = ((DatabricksConnection) connection).createStatement().executeQuery(query)) {
//System.out.println("Raw Result VIEW " + viewMetadataResultSet);

viewMetadataResultSet.next();
rawViewDefinition = viewMetadataResultSet.getString(1);


} catch (Exception e) {
Scope.getCurrentScope().getLog(getClass()).info("Error getting View Definiton via existing context, going to pull from URL", e);
}

/// Old Code
.getExecutor("jdbc", database).queryForList(new RawParameterizedSqlStatement(query, example.getName(), schema.getName(), schema.getCatalogName()));

if (viewsMetadataRs.isEmpty()) {
return null;
} else {

Map<String, ?> row = viewsMetadataRs.get(0);
String rawViewName = example.getName();
String viewName = this.cleanNameFromDatabase(example.getName(), database);
String rawSchemaName = schema.getName();
String rawCatalogName = schema.getCatalogName();


ViewDatabricks view = new ViewDatabricks();
view.setName(this.cleanNameFromDatabase(rawViewName, database));
CatalogAndSchema schemaFromJdbcInfo = ((AbstractJdbcDatabase) database).getSchemaFromJdbcInfo(rawCatalogName, rawSchemaName);
view.setSchema(new Schema(schemaFromJdbcInfo.getCatalogName(), schemaFromJdbcInfo.getSchemaName()));
View view = (View) new View()
.setName(viewName)
.setSchema(new Schema(schemaFromJdbcInfo.getCatalogName(), schemaFromJdbcInfo.getSchemaName()))
.setAttribute("tblProperties", this.getTblProperties(database, viewName));
view.setDefinition(getViewDefinition(viewsMetadataRs));

String definition = rawViewDefinition;

if (definition == null || definition.isEmpty()) {
definition = (String) row.get("view_definition");

}

int length = definition.length();
if (length > 0 && definition.charAt(length - 1) == 0) {
definition = definition.substring(0, length - 1);
}
return view;
}
}
}

definition = StringUtils.trimToNull(definition);
if (definition == null) {
definition = "[CANNOT READ VIEW DEFINITION]";
}
private String getViewDefinition(List<Map<String, ?>> viewsMetadataRs) {
Map<String, ?> row = viewsMetadataRs.get(0);
String definition = (String) row.get("VIEW_DEFINITION");

view.setDefinition(definition);
int length = definition.length();
if (length > 0 && definition.charAt(length - 1) == 0) {
definition = definition.substring(0, length - 1);
}

view.setTblProperties("IMPLEMENT IT HERE");
definition = StringUtils.trimToNull(definition);
if (definition == null) {
definition = "[CANNOT READ VIEW DEFINITION]";
}
return definition;
}

return view;
}
private String getTblProperties(Database database, String viewName) throws DatabaseException {
String query = String.format("SHOW TBLPROPERTIES %s.%s.%s;", database.getDefaultCatalogName(), database.getDefaultSchemaName(), viewName);
List<Map<String, ?>> tablePropertiesResponse = Scope.getCurrentScope().getSingleton(ExecutorService.class)
.getExecutor("jdbc", database).queryForList(new RawParameterizedSqlStatement(query));

}
StringBuilder csvString = new StringBuilder();
tablePropertiesResponse.forEach(tableProperty ->
csvString.append("'").append(tableProperty.get("KEY")).append("'='").append(tableProperty.get("VALUE")).append("', ")
);
return csvString.toString().replaceAll(", $", "");
}
}

This file was deleted.

This file was deleted.

0 comments on commit 7d8ae2f

Please sign in to comment.