Skip to content

Commit

Permalink
Add support for creating partitioned tables
Browse files Browse the repository at this point in the history
Add support and tests for creating partitioned tables
  • Loading branch information
CodyAustinDavis committed Sep 23, 2023
1 parent 39e7fdf commit 990c4d4
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,32 @@
import liquibase.change.DatabaseChange;
import liquibase.change.DatabaseChangeProperty;
import liquibase.change.core.CreateTableChange;
import liquibase.database.Database;
import liquibase.exception.ValidationErrors;
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.statement.core.CreateTableStatement;
import liquibase.util.ObjectUtil;


@DatabaseChange(name = "createTable", description = "Create Table", priority = ChangeMetaData.PRIORITY_DATABASE +500)
public class CreateTableChangeDatabricks extends CreateTableChange {

private String tableFormat;
private String tableLocation;

private String clusterColumns;
private String partitionColumns;


/**
 * Validates this change against the given database.
 *
 * <p>Runs the inherited validation first and then adds an error when both
 * {@code partitionColumns} and {@code clusterColumns} have been supplied,
 * because only one of the two may be set on a table.
 *
 * @param database the database the change is validated against
 * @return the inherited validation errors plus any error added here
 */
@Override
public ValidationErrors validate(Database database) {
    ValidationErrors validationErrors = new ValidationErrors();
    validationErrors.addAll(super.validate(database));

    // Logical (short-circuit) AND is the idiomatic operator for boolean conditions;
    // the bitwise '&' previously used here evaluated both operands unconditionally.
    if (partitionColumns != null && clusterColumns != null) {
        validationErrors.addError("Databricks does not support CLUSTER columns AND PARTITION BY columns, please pick one. And do not supply the other");
    }
    return validationErrors;
}

/**
 * Returns the table format configured on this change.
 *
 * @return the table format, or {@code null} if none was set
 */
@DatabaseChangeProperty
public String getTableFormat() {
    return this.tableFormat;
}
Expand All @@ -30,13 +46,18 @@ public String getClusterColumns() {
return clusterColumns;
}

/**
 * Returns the partition columns exactly as supplied to this change.
 *
 * @return the raw partition column string, or {@code null} if none was set
 */
@DatabaseChangeProperty
public String getPartitionColumns() {
    return this.partitionColumns;
}

/**
 * Sets the table location for this change.
 *
 * @param tableLocation the location value to store; may be {@code null}
 */
public void setTableLocation(String tableLocation) {
    this.tableLocation = tableLocation;
}

/**
 * Stores the cluster columns for this change.
 *
 * @param clusterColumns the raw cluster column string; may be {@code null}
 */
// NOTE(review): the property annotation sits on the setter here; confirm whether
// Liquibase reads it from setters or only from the matching getter.
@DatabaseChangeProperty
public void setClusterColumns(String clusterColumns) {
    this.clusterColumns = clusterColumns;
}

/**
 * Stores the partition columns for this change.
 *
 * @param partitionColumns the raw partition column string; may be {@code null}
 */
@DatabaseChangeProperty
public void setPartitionColumns(String partitionColumns) {
    this.partitionColumns = partitionColumns;
}

@Override
protected CreateTableStatement generateCreateTableStatement() {
Expand All @@ -46,6 +67,7 @@ protected CreateTableStatement generateCreateTableStatement() {
ctas.setTableFormat(this.getTableFormat());
ctas.setTableLocation(this.getTableLocation());
ctas.setClusterColumns(this.getClusterColumns());
ctas.setPartitionColumns(this.getPartitionColumns());

return ctas;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public class CreateTableStatementDatabricks extends CreateTableStatement {

private ArrayList<String> clusterColumns;

private ArrayList<String> partitionColumns;


public CreateTableStatementDatabricks(String catalogName, String schemaName, String tableName) {
super(catalogName, schemaName, tableName);
Expand All @@ -30,6 +32,21 @@ public ArrayList<String> getClusterColumns () {
return clusterColumns;
}

/**
 * Returns the partition column names held by this statement.
 *
 * @return the stored partition column list (the internal list itself, not a copy)
 */
public ArrayList<String> getPartitionColumns () {
    return this.partitionColumns;
}


/**
 * Parses a comma-separated list of column names and stores it as this
 * statement's partition columns.
 *
 * <p>Each entry is trimmed of surrounding whitespace and empty entries are
 * dropped, so {@code null}, {@code ""} and {@code "  "} all result in an empty
 * list rather than a list containing a single empty name.
 *
 * @param partitionColumns comma-separated column names; may be {@code null}
 */
public void setPartitionColumns (String partitionColumns) {
    ArrayList<String> parsedColumns = new ArrayList<>();
    if (partitionColumns != null) {
        for (String rawColumn : partitionColumns.split(",")) {
            String columnName = rawColumn.trim();
            // Skip blanks so callers checking isEmpty()/size() never see a
            // phantom "" column produced by empty or whitespace-only input.
            if (!columnName.isEmpty()) {
                parsedColumns.add(columnName);
            }
        }
    }
    this.partitionColumns = parsedColumns;
}



public void setClusterColumns (String clusterColumns) {
if (clusterColumns == null) {
this.clusterColumns = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package liquibase.ext.databricks.sqlgenerator;


import liquibase.exception.ValidationErrors;
import liquibase.ext.databricks.change.createTable.CreateTableStatementDatabricks;
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.sqlgenerator.core.CreateTableGenerator;
Expand Down Expand Up @@ -28,6 +29,15 @@ public boolean supports(CreateTableStatement statement, Database database) {
return super.supports(statement, database) && (database instanceof DatabricksDatabase);
}

/**
 * Checks that a Databricks create-table statement does not request both
 * partition columns and cluster columns.
 *
 * <p>A column list that is {@code null} is treated the same as an empty one.
 *
 * <p>NOTE(review): this signature takes {@code CreateTableStatementDatabricks},
 * so it looks like an overload rather than an override of the base generator's
 * {@code validate}; confirm that Liquibase actually invokes it.
 *
 * @param createStatement the statement whose column options are checked
 * @param database the target database (not used by this check)
 * @param sqlGeneratorChain the generator chain (not used by this check)
 * @return validation errors, containing one error when both options are set
 */
public ValidationErrors validate(CreateTableStatementDatabricks createStatement, Database database, SqlGeneratorChain sqlGeneratorChain) {
    ValidationErrors validationErrors = new ValidationErrors();

    ArrayList<String> partitionCols = createStatement.getPartitionColumns();
    ArrayList<String> clusterCols = createStatement.getClusterColumns();

    // Guard against lists that were never initialised through their setters.
    boolean hasPartitionCols = partitionCols != null && !partitionCols.isEmpty();
    boolean hasClusterCols = clusterCols != null && !clusterCols.isEmpty();

    if (hasPartitionCols && hasClusterCols) {
        validationErrors.addError("WARNING! Databricks does not support creating tables with both PARTITION and CLUSTER columns, please supply only one option.");
    }
    return validationErrors;
}


@Override
public Sql[] generateSql(CreateTableStatement statement, Database database, SqlGeneratorChain sqlGeneratorChain) {

Expand All @@ -49,8 +59,11 @@ public Sql[] generateSql(CreateTableStatement statement, Database database, SqlG
}

ArrayList<String> clusterCols = thisStatement.getClusterColumns();
ArrayList<String> partitionCols = thisStatement.getPartitionColumns();


// If there are any cluster columns, add the CLUSTER BY clause.
// Partition columns are only considered when there are no cluster columns; the two clauses are never emitted together.
if (clusterCols.size() >= 1 ) {

finalsql += " CLUSTER BY (";
Expand All @@ -67,6 +80,21 @@ public Sql[] generateSql(CreateTableStatement statement, Database database, SqlG
finalsql += ")";
}
}
} else if (partitionCols.size() >=1) {
finalsql += " PARTITIONED BY (";

int val = 0;
while (partitionCols.size() > val) {
finalsql += partitionCols.get(val);

val +=1;
if (partitionCols.size() > val) {
finalsql += ", ";
}
else {
finalsql += ")";
}
}
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-latest.xsd http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd">

<!-- Creates test_table_partitioned with three columns using the extension
createTable change; the partitionColumns attribute names partition_column
as the single partition column. -->
<changeSet id="1" author="codydavis">
<ext:createTable tableName="test_table_partitioned" partitionColumns = "partition_column">
<column name="test_id" type="int">
<constraints primaryKey="true" nullable="false"/>
</column>
<column name="test_column" type="varchar(50)">
<constraints nullable="false"/>
</column>
<column name="partition_column" type="string">
<constraints nullable="false"/>
</column>
</ext:createTable>
</changeSet>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"snapshot": {
"objects": {
"liquibase.structure.core.Table": [
{
"table": {
"name": "test_table_partitioned"
}
}
],
"liquibase.structure.core.Column": [
{
"column": {
"name": "test_id"
}
},
{
"column": {
"name": "test_column"
}
},
{
"column": {
"name": "partition_column"
}
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE TABLE main.liquibase_harness_test_ds.test_table_partitioned (test_id INT NOT NULL, test_column VARCHAR(50) NOT NULL, partition_column STRING NOT NULL, CONSTRAINT PK_TEST_TABLE_PARTITIONED PRIMARY KEY (test_id)) USING delta TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name') PARTITIONED BY (partition_column)

0 comments on commit 990c4d4

Please sign in to comment.