DAT-18792: added cluster by support for snapshot and diff table related change types #199

Merged · 21 commits · Oct 21, 2024
Changes from 14 commits
Commits
ed2f55f
wip
KushnirykOleh Sep 26, 2024
74850a8
reverted ColumnSnapshotGeneratorDatabricks, it went to DAT-18790
KushnirykOleh Oct 4, 2024
c758cf3
wip
KushnirykOleh Oct 7, 2024
3e29705
added missingTable generator
KushnirykOleh Oct 8, 2024
be58985
addressed PR comment
KushnirykOleh Oct 10, 2024
392c417
added tblProperties to expectedJson
KushnirykOleh Oct 11, 2024
31efa0e
resolved Sonar issues
KushnirykOleh Oct 11, 2024
d01e359
Merge branch 'main' into DAT-18787
KushnirykOleh Oct 14, 2024
32bd731
DAT-18792: added cluster by support for snapshot and diff table relat…
Oct 15, 2024
fe3dd5c
fixed exception when DBCL is missing
KushnirykOleh Oct 15, 2024
a4747f1
DAT-18792: merged changes from base branch
Oct 15, 2024
eb0557d
DAT-18792: added extra DB call to fetch TBLPROPERIES map in key + val…
Oct 16, 2024
6cb528a
DAT-18792: sonar lint issue fix - updated replace patern from single …
Oct 16, 2024
79043b3
DAT-18792: merged recent main branch changes
Oct 16, 2024
3df12ed
DAT-18792: review changes - updated attribute name to match exactly t…
Oct 17, 2024
2ca4291
DAT-18792: fixed finalsql tblProperies mistransfering
Oct 17, 2024
c878b1a
DAT-18792: merged main branch changes
Oct 17, 2024
d2350bb
DAT-18792: fixing one more misspeling of clusteringColumns tblPropert…
Oct 17, 2024
aeab0cd
DAT-18792: fixing namespace for databrix create table changeType.
Oct 18, 2024
f7ef735
DAT-18792: added failing tblProperties filtering, wrapping of tblProp…
Oct 18, 2024
bcff058
DAT-18792: SerializableFieldNamespace added, need more investigation …
Oct 21, 2024
File: MissingTableChangeGeneratorDatabricks.java
@@ -12,8 +12,9 @@
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.Table;

-public class MissingTableChangeGeneratorDatabricks extends MissingTableChangeGenerator {
+import java.util.regex.Pattern;
+
+public class MissingTableChangeGeneratorDatabricks extends MissingTableChangeGenerator {

@Override
public int getPriority(Class<? extends DatabaseObject> objectType, Database database) {
@@ -34,12 +35,14 @@ public Change[] fixMissing(DatabaseObject missingObject, DiffOutputControl contr
ExtendedTableProperties extendedTableProperties = new ExtendedTableProperties(
missingObject.getAttribute("Location", String.class),
missingObject.getAttribute("tblProperties", String.class));
+String clusterColumns = missingObject.getAttribute("clusterColumns", "");

-changes[0] = getCreateTableChangeDatabricks(extendedTableProperties, changes);
+changes[0] = getCreateTableChangeDatabricks(extendedTableProperties, changes, clusterColumns);
return changes;
}

-private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTableProperties extendedTableProperties, Change[] changes) {
+private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTableProperties extendedTableProperties,
+Change[] changes, String clusterColumns) {
CreateTableChange temp = (CreateTableChange) changes[0];
CreateTableChangeDatabricks createTableChangeDatabricks = new CreateTableChangeDatabricks();
createTableChangeDatabricks.setColumns(temp.getColumns());
@@ -51,6 +54,9 @@ private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTable
createTableChangeDatabricks.setRemarks(temp.getRemarks());
createTableChangeDatabricks.setIfNotExists(temp.getIfNotExists());
createTableChangeDatabricks.setRowDependencies(temp.getRowDependencies());
+if (!clusterColumns.isEmpty()) {
+createTableChangeDatabricks.setClusterColumns(sanitizeClusterColumns(clusterColumns));
+}

createTableChangeDatabricks.setExtendedTableProperties(extendedTableProperties);
return createTableChangeDatabricks;
@@ -60,4 +66,9 @@ private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTable
protected CreateTableChange createCreateTableChange() {
return new CreateTableChangeDatabricks();
}
-}
+
+private String sanitizeClusterColumns(String clusterColumnProperty) {
+Pattern pattern = Pattern.compile("[\\[\\]\\\"]");
+return clusterColumnProperty.replaceAll(pattern.toString(), "");
+}
+}
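
Note: as a quick illustration of the new helper above, the following standalone sketch (not part of the PR; the input value is hypothetical) shows what sanitizeClusterColumns produces from the raw clusterColumns attribute:

import java.util.regex.Pattern;

public class SanitizeClusterColumnsExample {

    // Same regex as in MissingTableChangeGeneratorDatabricks: strips '[', ']' and '"'.
    static String sanitizeClusterColumns(String clusterColumnProperty) {
        Pattern pattern = Pattern.compile("[\\[\\]\\\"]");
        return clusterColumnProperty.replaceAll(pattern.toString(), "");
    }

    public static void main(String[] args) {
        // Hypothetical attribute value in the JSON-array shape reported by the snapshot.
        String raw = "[\"test_id\",\"test_new\"]";
        System.out.println(sanitizeClusterColumns(raw)); // prints: test_id,test_new
    }
}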
File: TableSnapshotGeneratorDatabricks.java
@@ -13,12 +13,13 @@

import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;

public class TableSnapshotGeneratorDatabricks extends TableSnapshotGenerator {

private static final String LOCATION = "Location";
-private static final String TABLE_PROPERTIES = "Table Properties";
private static final String TBL_PROPERTIES = "tblProperties";
+private static final String CLUSTER_COLUMNS = "clusterColumns";
private static final String DETAILED_TABLE_INFORMATION_NODE = "# Detailed Table Information";

@Override
@@ -49,13 +50,32 @@ protected DatabaseObject snapshotObject(DatabaseObject example, DatabaseSnapshot
if (detailedInformationNode && tableProperty.get("COL_NAME").equals(LOCATION)) {
table.setAttribute(LOCATION, tableProperty.get("DATA_TYPE"));
}
-if (detailedInformationNode && tableProperty.get("COL_NAME").equals(TABLE_PROPERTIES)) {
-String tblProperties = (String) tableProperty.get("DATA_TYPE");
-table.setAttribute(TBL_PROPERTIES, tblProperties.substring(1, tblProperties.length() - 1));// remove starting and ending square brackets
-}
}
+Map<String, String> tblProperties = getTblPropertiesMap(database, example.getName());
+if (tblProperties.containsKey(CLUSTER_COLUMNS)) {
+// used remove, as clusterColumns tblProperty is not allowed in create/alter table statements
+table.setAttribute(CLUSTER_COLUMNS, tblProperties.remove(CLUSTER_COLUMNS));
+}
+table.setAttribute(TBL_PROPERTIES, getTblPropertiesString(tblProperties));
}
return table;
}

+private Map<String, String> getTblPropertiesMap(Database database, String table) throws DatabaseException {
+String query = String.format("SHOW TBLPROPERTIES %s.%s.%s;", database.getDefaultCatalogName(), database.getDefaultSchemaName(), table);
+List<Map<String, ?>> tablePropertiesResponse = Scope.getCurrentScope().getSingleton(ExecutorService.class)
+.getExecutor("jdbc", database).queryForList(new RawParameterizedSqlStatement(query));
+return tablePropertiesResponse.stream()
+.collect(Collectors.toMap(mapElement -> (String) mapElement.get("KEY"), mapElement -> (String) mapElement.get("VALUE")));
+}
+
+private String getTblPropertiesString(Map<String, String> propertiesMap) {
+StringBuilder csvString = new StringBuilder();
+propertiesMap.entrySet()
+.stream()
+.sorted(Map.Entry.comparingByKey())
+.forEach(entry -> csvString.append(entry.getKey()).append("=").append(entry.getValue()).append(","));
+return csvString.toString().replaceAll(",$", "");
+}

}
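
Note: a minimal standalone sketch of the new tblProperties handling above (not part of the PR; the KEY/VALUE pairs below are a hypothetical SHOW TBLPROPERTIES result). The clusterColumns entry is pulled out as its own attribute, and the remaining properties are re-serialized as a sorted key=value CSV string, mirroring getTblPropertiesMap and getTblPropertiesString:

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class TblPropertiesExample {

    public static void main(String[] args) {
        // Hypothetical SHOW TBLPROPERTIES rows, already collected into a key/value map.
        Map<String, String> props = new HashMap<>();
        props.put("clusterColumns", "[[\"test_id\"],[\"test_new\"]]");
        props.put("delta.columnMapping.mode", "name");
        props.put("delta.enableDeletionVectors", "true");

        // clusterColumns is removed and kept separately, since it is not allowed
        // in CREATE/ALTER TABLE statements.
        String clusterColumns = props.remove("clusterColumns");

        // The remaining properties become a sorted key=value CSV string.
        String tblProperties = props.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(","));

        System.out.println(clusterColumns); // [["test_id"],["test_new"]]
        System.out.println(tblProperties);  // delta.columnMapping.mode=name,delta.enableDeletionVectors=true
    }
}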
File: CreateTableGeneratorDatabricks.java
@@ -17,6 +17,8 @@

public class CreateTableGeneratorDatabricks extends CreateTableGenerator {

+private static final String CLUSTERING_INFORMATION_TBL_PROPERTY_START = "clusteringColumns=[[";
+

@Override
public int getPriority() {
@@ -49,7 +51,7 @@ public Sql[] generateSql(CreateTableStatement statement, Database database, SqlG
if ((!StringUtils.isEmpty(thisStatement.getTableFormat()))) {
finalsql.append(" USING ").append(thisStatement.getTableFormat());
} else if (thisStatement.getExtendedTableProperties() != null && StringUtils.isNoneEmpty(thisStatement.getExtendedTableProperties().getTblProperties())) {
finalsql.append(" TBLPROPERTIES (").append(thisStatement.getExtendedTableProperties().getTblProperties()).append(")");
finalsql.append(" TBLPROPERTIES (").append(avoidClusterProperties(thisStatement)).append(")");
} else {
finalsql.append(" USING delta TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableDeletionVectors' = true)");
}
@@ -109,4 +111,23 @@ public Sql[] generateSql(CreateTableStatement statement, Database database, SqlG

}

+/**
+ * Because TBLPROPERTIES is passed into the CREATE TABLE statement as a raw string (notably during
+ * changelog generation), it must be sanitized of the 'clusteringColumns' property; otherwise the
+ * generated changelog will fail to execute.
+ * Parsing tblProperties into an actual Map structure would make this approach safer and easier.
+ * @param statement CreateTableStatementDatabricks containing the raw tblProperties string
+ * @return the tblProperties string without the 'clusteringColumns' property if it was present, otherwise the
+ * untouched raw tblProperties string.
+ */
+private String avoidClusterProperties(CreateTableStatementDatabricks statement) {
+String tblProperties = statement.getExtendedTableProperties().getTblProperties();
+if(tblProperties.contains(CLUSTERING_INFORMATION_TBL_PROPERTY_START)) {
+int clusterColumnsStartIndex = tblProperties.indexOf(CLUSTERING_INFORMATION_TBL_PROPERTY_START);
+String replaceString = tblProperties.substring(clusterColumnsStartIndex, tblProperties.indexOf("\"]],", clusterColumnsStartIndex) + 4);
+return tblProperties.replace(replaceString, "");
+}
+return tblProperties;
+}
+
}
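
Note: to make the effect of avoidClusterProperties concrete, here is a small self-contained sketch (not part of the PR; the raw tblProperties string is hypothetical, and the real method reads it from the statement's extended table properties):

public class AvoidClusterPropertiesExample {

    private static final String CLUSTERING_INFORMATION_TBL_PROPERTY_START = "clusteringColumns=[[";

    // Mirrors the substring-based removal used by the generator above.
    static String avoidClusterProperties(String tblProperties) {
        if (tblProperties.contains(CLUSTERING_INFORMATION_TBL_PROPERTY_START)) {
            int start = tblProperties.indexOf(CLUSTERING_INFORMATION_TBL_PROPERTY_START);
            String toRemove = tblProperties.substring(start, tblProperties.indexOf("\"]],", start) + 4);
            return tblProperties.replace(toRemove, "");
        }
        return tblProperties;
    }

    public static void main(String[] args) {
        // Hypothetical raw tblProperties value captured during changelog generation.
        String raw = "clusteringColumns=[[\"test_id\"],[\"test_new\"]],delta.columnMapping.mode=name";
        System.out.println(avoidClusterProperties(raw)); // prints: delta.columnMapping.mode=name
    }
}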
File: changelog test resource (JSON)
@@ -20,9 +20,15 @@
"name": "test_new",
"type": "int"
}
+},
+{
+"column": {
+"name": "test_present_new",
+"type": "int"
+}
+}
],
"clusterColumns": "test_id,test_new"
"clusterColumns": "test_id,test_new,test_present_new"
}
}
],
@@ -48,6 +54,11 @@
"column": {
"name": "test_id"
}
+},
+{
+"column": {
+"name": "test_present_new"
+}
+}
]
}
File: changelog test resource (XML)
@@ -13,7 +13,8 @@
<createTable tableName="test_table_clustered_new" >
<column name="test_id" type="int" />
<column name="test_new" type="int"/>
-<databricks:clusterColumns>test_id,test_new</databricks:clusterColumns>
+<column name="test_present_new" type="int"/>
+<databricks:clusterColumns>test_id,test_new,test_present_new</databricks:clusterColumns>
</createTable>
<rollback>
<!-- The dropTable will drop a full table whether it has clustered columns or not. -->
@@ -23,6 +24,7 @@
<changeSet id="2" author="your.name">
<databricks:alterCluster tableName="test_table_clustered_new">
<databricks:column name="test_id"/>
<databricks:column name="test_present_new"/>
</databricks:alterCluster>
<rollback/>
</changeSet>
File: changelog test resource (YAML)
@@ -12,7 +12,10 @@ databaseChangeLog:
- column:
name: test_new
type: int
-clusterColumns: test_id, test_new
+- column:
+name: test_present_new
+type: int
+clusterColumns: test_id, test_new, test_present_new
rollback:
dropTable:
tableName: test_table_clustered_new
@@ -25,6 +28,8 @@
columns:
- column:
name: test_id
+- column:
+name: test_present_new
rollback:
empty
- changeSet:
File: expected snapshot test resource (JSON)
@@ -1,2 +1,25 @@
{
"snapshot": {
"objects": {
"liquibase.structure.core.Table": [
{
"table": {
"name": "test_table_clustered_new"
}
}
],
"liquibase.structure.core.Column": [
{
"column": {
"name": "test_id"
}
},
{
"column": {
"name": "test_present_new"
}
}
]
}
}
}
File: expected SQL test resource
@@ -1,3 +1,3 @@
-CREATE TABLE main.liquibase_harness_test_ds.test_table_clustered_new (test_id INT, test_new INT) USING delta TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableDeletionVectors' = true) CLUSTER BY (test_id, test_new)
-ALTER TABLE main.liquibase_harness_test_ds.test_table_clustered_new CLUSTER BY (test_id)
+CREATE TABLE main.liquibase_harness_test_ds.test_table_clustered_new (test_id INT, test_new INT, test_present_new INT) USING delta TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name', 'delta.enableDeletionVectors' = true) CLUSTER BY (test_id, test_new, test_present_new)
+ALTER TABLE main.liquibase_harness_test_ds.test_table_clustered_new CLUSTER BY (test_id,test_present_new)
ALTER TABLE main.liquibase_harness_test_ds.test_table_clustered_new DROP COLUMN test_new