-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: introduce DuckDB table assurance
- Loading branch information
Showing
12 changed files
with
788 additions
and
254 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
import * as ws from "../../../lib/universal/whitespace.ts"; | ||
import * as tmpl from "../../emit/mod.ts"; | ||
|
||
// Generates SQL text for table-content assurance checks; each rule's findings are | ||
// written to the ingest_issue table through the insert* helper methods below. | ||
// NOTE(review): every string argument is spliced into the SQL as-is (no quoting or | ||
// escaping is visible here), so callers presumably must pass trusted values only. | ||
export class AssuranceRules<Context extends tmpl.SqlEmitContext> { | ||
// govn.SQL is the tagged-template function the rule methods use to build their | ||
// SqlTextSupplier results. | ||
constructor( | ||
readonly govn: { | ||
readonly SQL: ReturnType<typeof tmpl.SQL<Context>>; | ||
}, | ||
) { | ||
} | ||
|
||
// Returns an INSERT INTO ingest_issue ... SELECT statement whose rows come from `from`. | ||
// typeText is wrapped in single quotes as a SQL string literal; messageSql and | ||
// remediationSql are embedded as SQL expressions (remediation falls back to NULL | ||
// when it is omitted). The text is passed through ws.unindentWhitespace. | ||
insertIssue( | ||
from: string, | ||
typeText: string, | ||
messageSql: string, | ||
remediationSql?: string, | ||
) { | ||
return ws.unindentWhitespace( | ||
`INSERT INTO ingest_issue (issue_type, issue_message, remediation) | ||
SELECT '${typeText}', | ||
${messageSql}, | ||
${remediationSql ?? "NULL"} | ||
FROM ${from}`, | ||
); | ||
} | ||
|
||
// Same as insertIssue but also fills issue_row, issue_column and invalid_value from | ||
// the rowNumSql, columnNameSql and valueSql expressions. | ||
insertRowValueIssue( | ||
from: string, | ||
typeText: string, | ||
rowNumSql: string, | ||
columnNameSql: string, | ||
valueSql: string, | ||
messageSql: string, | ||
remediationSql?: string, | ||
) { | ||
return ws.unindentWhitespace( | ||
`INSERT INTO ingest_issue (issue_type, issue_row, issue_column, invalid_value, issue_message, remediation) | ||
SELECT '${typeText}', | ||
${rowNumSql}, | ||
${columnNameSql}, | ||
${valueSql}, | ||
${messageSql}, | ||
${remediationSql ?? "NULL"} | ||
FROM ${from}`, | ||
); | ||
} | ||
|
||
// Builds a statement that records one 'Missing Column' issue for every name in | ||
// requiredColNames that is not found among upper(trim(column_name)) of | ||
// information_schema.columns for tableName. | ||
// NOTE(review): catalog names are upper-cased but the required names are compared | ||
// as given, so lower-case entries look like they would always be reported as | ||
// missing -- confirm the intended casing. | ||
// NOTE(review): an empty requiredColNames leaves the VALUES list empty, which is | ||
// presumably not valid SQL -- TODO confirm callers never pass []. | ||
requiredColumnNamesInTable( | ||
tableName: string, | ||
requiredColNames: string[], | ||
): tmpl.SqlTextSupplier<Context> { | ||
// CTE name; also passed to insertIssue as the FROM source of the INSERT. | ||
const cteName = "required_column_names_in_src"; | ||
// deno-fmt-ignore | ||
return this.govn.SQL` | ||
WITH ${cteName} AS ( | ||
SELECT column_name | ||
FROM (VALUES ${requiredColNames.map(cn => `('${cn}')`).join(', ')}) AS required(column_name) | ||
WHERE required.column_name NOT IN ( | ||
SELECT upper(trim(column_name)) | ||
FROM information_schema.columns | ||
WHERE table_name = '${tableName}') | ||
) | ||
${this.insertIssue(cteName, | ||
'Missing Column', | ||
`'Required column ' || column_name || ' is missing in the CSV file.'`, | ||
`'Ensure the CSV contains the column "' || column_name || '"'` | ||
)}`; | ||
} | ||
|
||
// Builds a statement that records a 'Data Type Mismatch' issue for each row of | ||
// tableName whose columnName value is non-NULL and does not match | ||
// '^[+-]?[0-9]+$' (an optional sign followed by digits only). | ||
// The generated SQL reads src_file_row_number from tableName for issue_row, so | ||
// that column must exist in the table. | ||
intValueInAllTableRows( | ||
tableName: string, | ||
columnName: string, | ||
): tmpl.SqlTextSupplier<Context> { | ||
// CTE name; also passed to insertRowValueIssue as the FROM source of the INSERT. | ||
const cteName = "numeric_value_in_all_rows"; | ||
// deno-fmt-ignore | ||
return this.govn.SQL` | ||
WITH ${cteName} AS ( | ||
SELECT '${columnName}' AS issue_column, | ||
${columnName} AS invalid_value, | ||
src_file_row_number AS issue_row | ||
FROM ${tableName} | ||
WHERE ${columnName} IS NOT NULL | ||
AND ${columnName} NOT SIMILAR TO '^[+-]?[0-9]+$' | ||
) | ||
${this.insertRowValueIssue(cteName, | ||
'Data Type Mismatch', | ||
'issue_row', | ||
'issue_column', | ||
'invalid_value', | ||
`'Non-integer value "' || invalid_value || '" found in ' || issue_column`, | ||
`'Convert non-integer values to INTEGER'` | ||
)}`; | ||
} | ||
|
||
// Builds a statement that records a 'Format Mismatch' issue for each row of | ||
// tableName whose columnName value is non-NULL and does not match the ".com" | ||
// e-mail pattern in the template below. Like intValueInAllTableRows, the SQL | ||
// reads src_file_row_number from tableName. | ||
// NOTE(review): `\.` sits inside a template literal, where its cooked value is a | ||
// plain "."; if the SQL tag uses cooked strings the emitted pattern accepts any | ||
// character before "com" -- TODO confirm whether `\\.` was intended. | ||
dotComEmailValueInAllTableRows( | ||
tableName: string, | ||
columnName: string, | ||
): tmpl.SqlTextSupplier<Context> { | ||
// CTE name; also passed to insertRowValueIssue as the FROM source of the INSERT. | ||
const cteName = "proper_dot_com_email_address_in_all_rows"; | ||
// deno-fmt-ignore | ||
return this.govn.SQL` | ||
WITH ${cteName} AS ( | ||
SELECT '${columnName}' AS issue_column, | ||
${columnName} AS invalid_value, | ||
src_file_row_number AS issue_row | ||
FROM ${tableName} | ||
WHERE ${columnName} IS NOT NULL | ||
AND ${columnName} NOT SIMILAR TO '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.com$' | ||
) | ||
${this.insertRowValueIssue(cteName, | ||
'Format Mismatch', | ||
'issue_row', | ||
'issue_column', | ||
'invalid_value', | ||
`'Invalid email format "' || invalid_value || '" in ' || issue_column`, | ||
`'Correct the email format'` | ||
)}`; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
column1,column2,column3,column4,column5,column6,column7,column8,column9 | ||
Valid,[email protected],123,10,50,Unique1,Mandatory,ABC-1234,Yes | ||
"Invalid (Email, Integer): Invalid email in column2 and non-integer in column4",invalid-email,123,abc,200,Unique2,Mandatory,ABC-1234,No | ||
"Invalid (Range, Mandatory): Value out of range in column5 and missing mandatory value in column7",[email protected],123,20,5,Unique3,,ABC-1235,Yes | ||
"Invalid (Pattern): Pattern mismatch in column8",[email protected],123,30,150,Unique4,Mandatory,ABC-ABCD,Yes | ||
"Invalid (List): Value not in allowed list in column9",[email protected],123,40,60,Unique5,Mandatory,XYZ-1234,No | ||
"Duplicate (Unique): Duplicate value in column6",[email protected],123,50,70,Unique1,Mandatory,ABC-1234,Maybe | ||
"Invalid (Negative Integer): Negative integer in column4",invalid-email,123,-10,80,Unique6,Mandatory,ABC-1234,Yes | ||
Valid,[email protected],123,60,90,Unique7,,ABC-1234,No | ||
Valid,[email protected],123,70,100,Unique8,Mandatory,ABC-1234,Yes | ||
"Invalid (Range): Value out of range in column5",[email protected],123,80,110,Unique9,Mandatory,XYZ-1234,Maybe | ||
Valid,[email protected],123,90,120,Unique10,Mandatory,ABC-1234,Yes | ||
"Invalid (List): Value not in allowed list in column9",invalid-email,123,100,130,Unique11,Mandatory,ABC-1234,No |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
CREATE TABLE ingest_session ( | ||
ingest_session_id VARCHAR NOT NULL, | ||
ingest_src VARCHAR NOT NULL, | ||
ingest_table_name VARCHAR NOT NULL, | ||
); | ||
|
||
CREATE TABLE ingest_issue ( | ||
session_id VARCHAR NOT NULL, | ||
issue_row INT, | ||
issue_type VARCHAR NOT NULL, | ||
issue_column VARCHAR, | ||
invalid_value VARCHAR, | ||
issue_message VARCHAR NOT NULL, | ||
remediation VARCHAR | ||
); | ||
|
||
INSERT INTO ingest_session (ingest_session_id, ingest_src, ingest_table_name) | ||
VALUES (uuid(), 'assurance_test-fixture-fail.csv', 'synthetic_csv_fail'); | ||
|
||
CREATE TEMPORARY TABLE synthetic_csv_fail AS | ||
SELECT *, row_number() OVER () as src_file_row_number, (SELECT ingest_session_id from ingest_session LIMIT 1) as ingest_session_id | ||
FROM read_csv_auto('assurance_test-fixture-fail.csv', header=true); | ||
|
||
WITH required_column_names_in_src AS ( | ||
SELECT column_name | ||
FROM (VALUES ('column1'), ('column2'), ('column3'), ('column4'), ('column5'), ('column6'), ('column7'), ('column8'), ('column9')) AS required(column_name) | ||
WHERE required.column_name NOT IN ( | ||
SELECT upper(trim(column_name)) | ||
FROM information_schema.columns | ||
WHERE table_name = 'synthetic_csv_fail') | ||
) | ||
INSERT INTO ingest_issue (session_id, issue_type, issue_message, remediation) | ||
SELECT (SELECT ingest_session_id FROM ingest_session LIMIT 1), | ||
'Missing Column', | ||
'Required column ' || column_name || ' is missing in the CSV file.', | ||
'Ensure the CSV contains the column "' || column_name || '"' | ||
FROM required_column_names_in_src; | ||
|
||
-- NOTE: If the above does not pass, meaning not all columns with the proper | ||
-- names are present, do not run the queries below because they assume | ||
-- proper names and existence of columns. | ||
|
||
WITH numeric_value_in_all_rows AS ( | ||
SELECT 'column4' AS issue_column, | ||
column4 AS invalid_value, | ||
src_file_row_number AS issue_row | ||
FROM synthetic_csv_fail | ||
WHERE column4 IS NOT NULL | ||
AND column4 NOT SIMILAR TO '^[+-]?[0-9]+$' | ||
) | ||
INSERT INTO ingest_issue (session_id, issue_type, issue_row, issue_column, invalid_value, issue_message, remediation) | ||
SELECT (SELECT ingest_session_id FROM ingest_session LIMIT 1), | ||
'Data Type Mismatch', | ||
issue_row, | ||
issue_column, | ||
invalid_value, | ||
'Non-integer value "' || invalid_value || '" found in ' || issue_column, | ||
'Convert non-integer values to INTEGER' | ||
FROM numeric_value_in_all_rows; | ||
|
||
WITH proper_dot_com_email_address_in_all_rows AS ( | ||
SELECT 'column2' AS issue_column, | ||
column2 AS invalid_value, | ||
src_file_row_number AS issue_row | ||
FROM synthetic_csv_fail | ||
WHERE column2 IS NOT NULL | ||
AND column2 NOT SIMILAR TO '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+.com$' | ||
) | ||
INSERT INTO ingest_issue (session_id, issue_type, issue_row, issue_column, invalid_value, issue_message, remediation) | ||
SELECT (SELECT ingest_session_id FROM ingest_session LIMIT 1), | ||
'Format Mismatch', | ||
issue_row, | ||
issue_column, | ||
invalid_value, | ||
'Invalid email format "' || invalid_value || '" in ' || issue_column, | ||
'Correct the email format' | ||
FROM proper_dot_com_email_address_in_all_rows; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
import { testingAsserts as ta } from "../../deps-test.ts"; | ||
import { path } from "../../deps.ts"; | ||
import * as ws from "../../../lib/universal/whitespace.ts"; | ||
import * as tmpl from "../../emit/mod.ts"; | ||
import * as intr from "./integration.ts"; | ||
import * as mod from "./assurance.ts"; | ||
|
||
// Test variant of AssuranceRules: both insert helpers are redefined so that each | ||
// issue row also receives session_id, read from ingest_session with LIMIT 1. | ||
export class SyntheticAssuranceRules | ||
extends mod.AssuranceRules<tmpl.SqlEmitContext> { | ||
// Forwards govn to the base class unchanged. | ||
constructor( | ||
readonly govn: { | ||
readonly SQL: ReturnType<typeof tmpl.SQL<tmpl.SqlEmitContext>>; | ||
}, | ||
) { | ||
super(govn); | ||
} | ||
|
||
// Replaces the inherited insertIssue: same arguments, but session_id is added as | ||
// the first inserted column. remediationSql falls back to NULL when omitted. | ||
insertIssue( | ||
from: string, | ||
typeText: string, | ||
messageSql: string, | ||
remediationSql?: string, | ||
) { | ||
return ws.unindentWhitespace(` | ||
INSERT INTO ingest_issue (session_id, issue_type, issue_message, remediation) | ||
SELECT (SELECT ingest_session_id FROM ingest_session LIMIT 1), | ||
'${typeText}', | ||
${messageSql}, | ||
${remediationSql ?? "NULL"} | ||
FROM ${from}`); | ||
} | ||
|
||
// Replaces the inherited insertRowValueIssue: same arguments, but session_id is | ||
// added as the first inserted column ahead of the row/column/value fields. | ||
insertRowValueIssue( | ||
from: string, | ||
typeText: string, | ||
rowNumSql: string, | ||
columnNameSql: string, | ||
valueSql: string, | ||
messageSql: string, | ||
remediationSql?: string, | ||
) { | ||
return ws.unindentWhitespace(` | ||
INSERT INTO ingest_issue (session_id, issue_type, issue_row, issue_column, invalid_value, issue_message, remediation) | ||
SELECT (SELECT ingest_session_id FROM ingest_session LIMIT 1), | ||
'${typeText}', | ||
${rowNumSql}, | ||
${columnNameSql}, | ||
${valueSql}, | ||
${messageSql}, | ||
${remediationSql ?? "NULL"} | ||
FROM ${from}`); | ||
} | ||
} | ||
|
||
// Builds the complete DuckDB script (session/issue tables, CSV ingestion and three | ||
// assurance rules) and checks that it equals the stored fixture SQL file. | ||
Deno.test("DuckDB Table Content Assurance", () => { | ||
// Emit context and supplier options for the DuckDB dialect; the rules object is | ||
// given a SQL tag built from the same options. | ||
const ctx = tmpl.typicalSqlEmitContext({ sqlDialect: tmpl.duckDbDialect() }); | ||
const ddlOptions = tmpl.typicalSqlTextSupplierOptions<typeof ctx>(); | ||
const ar = new SyntheticAssuranceRules({ | ||
SQL: tmpl.SQL<typeof ctx>(ddlOptions), | ||
}); | ||
// Table and CSV names are interpolated into the script below. | ||
const tableName = "synthetic_csv_fail"; | ||
const csvSrcFsPath = "assurance_test-fixture-fail.csv"; | ||
|
||
// The script: create ingest_session and ingest_issue, register one session row, | ||
// load the CSV into a temp table with two extra columns, then run the rules. | ||
// deno-fmt-ignore | ||
const ddlDefn = tmpl.SQL<typeof ctx>(ddlOptions)` | ||
CREATE TABLE ingest_session ( | ||
ingest_session_id VARCHAR NOT NULL, | ||
ingest_src VARCHAR NOT NULL, | ||
ingest_table_name VARCHAR NOT NULL, | ||
); | ||
CREATE TABLE ingest_issue ( | ||
session_id VARCHAR NOT NULL, | ||
issue_row INT, | ||
issue_type VARCHAR NOT NULL, | ||
issue_column VARCHAR, | ||
invalid_value VARCHAR, | ||
issue_message VARCHAR NOT NULL, | ||
remediation VARCHAR | ||
); | ||
INSERT INTO ingest_session (ingest_session_id, ingest_src, ingest_table_name) | ||
VALUES (uuid(), '${csvSrcFsPath}', '${tableName}'); | ||
${intr.csvTableIntegration({ | ||
csvSrcFsPath: () => csvSrcFsPath, | ||
tableName, | ||
isTempTable: true, | ||
extraColumnsSql: [ | ||
"row_number() OVER () as src_file_row_number", | ||
"(SELECT ingest_session_id from ingest_session LIMIT 1) as ingest_session_id", | ||
], | ||
})} | ||
${ar.requiredColumnNamesInTable(tableName, | ||
['column1', 'column2', 'column3', | ||
'column4', 'column5', 'column6', | ||
'column7', 'column8', 'column9'])} | ||
-- NOTE: If the above does not pass, meaning not all columns with the proper | ||
-- names are present, do not run the queries below because they assume | ||
-- proper names and existence of columns. | ||
${ar.intValueInAllTableRows(tableName, 'column4')} | ||
${ar.dotComEmailValueInAllTableRows(tableName, 'column2')}`; | ||
|
||
// Exact text comparison against the fixture resolved relative to this module: | ||
// any difference in the generated SQL, whitespace included, fails the test. | ||
ta.assertEquals( | ||
ddlDefn.SQL(ctx), | ||
Deno.readTextFileSync(path.fromFileUrl( | ||
import.meta.resolve("./assurance_test-fixture.duckdb.sql"), | ||
)), | ||
); | ||
}); |
Oops, something went wrong.