From 855d38825fa226995c4f39a9b0005ea40396ac35 Mon Sep 17 00:00:00 2001 From: jean-lopes Date: Wed, 13 Jul 2022 11:24:59 -0300 Subject: [PATCH] changed foreign key constraints validation --- periods--1.2.sql | 155 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 113 insertions(+), 42 deletions(-) diff --git a/periods--1.2.sql b/periods--1.2.sql index 33f5f41..50b2b24 100644 --- a/periods--1.2.sql +++ b/periods--1.2.sql @@ -2106,23 +2106,51 @@ $function$ DECLARE foreign_key_info record; column_name name; - has_nulls boolean; - uk_column_names text[]; - uk_column_values text[]; + uk_column_names_arr text[]; + uk_column_values_arr text[]; + uk_column_names text; + uk_column_values text; fk_column_names text; + min_uk_start_value text; + max_uk_end_value text; violation boolean; - still_matches boolean; - QSQL CONSTANT text := - 'SELECT EXISTS ( ' - ' SELECT FROM %1$I.%2$I AS t ' - ' WHERE ROW(%3$s) = ROW(%6$s) ' - ' AND t.%4$I <= %7$L ' - ' AND t.%5$I >= %8$L ' - '%9$s' + QSQL_UK_MINMAX CONSTANT text := + 'SELECT MIN(%5$I), MAX(%6$I) ' + ' FROM %1$I.%2$I as t ' + ' WHERE ROW(%3$s) = ROW(%4$s)'; + + QSQL_FK_EXISTS CONSTANT text := + 'SELECT EXISTS( ' + ' SELECT FROM %1$I.%2$I as t ' + ' WHERE ROW(%3$s) = ROW(%4$s)' + ')'; + + QSQL_FK_OUT_OF_UK_MINMAX_RANGE CONSTANT text := + 'SELECT EXISTS( ' + ' SELECT ' + ' FROM %1$I.%2$I as t ' + ' WHERE ROW(%3$s) = ROW(%4$s) ' + ' AND NOT periods.contains(%5$L, %6$L, %7$I, %8$I) ' ')'; + QSQL_FK_CONTAINS_UK_HOLES CONSTANT text := + 'SELECT EXISTS( ' + ' WITH holes AS ( ' + ' SELECT %6$I AS "s", next_s AS "e" ' + ' FROM (SELECT %6$I, LEAD(%5$I, 1) OVER (ORDER BY %5$I) "next_s" ' + ' FROM %1$I.%2$I ' + ' WHERE ROW(%3$s) = ROW(%4$s)) t ' + ' WHERE (t.next_s IS NOT NULL AND t.next_s <> %6$I) ' + ' ) ' + ' SELECT FROM %7$I.%8$I t' + ' WHERE ROW(%9$s) = ROW(%4$s)' + ' AND EXISTS(SELECT ' + ' FROM holes h ' + ' WHERE PERIODS.CONTAINS(%10$I, %11$I, h.s, h.e)) ' + ')'; BEGIN + -- gets metadata about the periods, foreign-keys and unique-keys SELECT fc.oid AS fk_table_oid, fn.nspname AS fk_schema_name, fc.relname AS fk_table_name, @@ -2130,7 +2158,6 @@ BEGIN fp.period_name AS fk_period_name, fp.start_column_name AS fk_start_column_name, fp.end_column_name AS fk_end_column_name, - uc.oid AS uk_table_oid, un.nspname AS uk_schema_name, uc.relname AS uk_table_name, @@ -2138,7 +2165,6 @@ BEGIN up.period_name AS uk_period_name, up.start_column_name AS uk_start_column_name, up.end_column_name AS uk_end_column_name, - fk.match_type, fk.update_action, fk.delete_action @@ -2166,40 +2192,87 @@ BEGIN */ RETURN true; END IF; - uk_column_names := uk_column_names || ('t.' || quote_ident(column_name)); - uk_column_values := uk_column_values || quote_literal(row_data->>column_name); + uk_column_names_arr := uk_column_names_arr || ('t.' || quote_ident(column_name)); + uk_column_values_arr := uk_column_values_arr || quote_literal(row_data->>column_name); END LOOP; - IF is_update AND foreign_key_info.update_action = 'NO ACTION' THEN - EXECUTE format(QSQL, foreign_key_info.uk_schema_name, - foreign_key_info.uk_table_name, - array_to_string(uk_column_names, ', '), - foreign_key_info.uk_start_column_name, - foreign_key_info.uk_end_column_name, - array_to_string(uk_column_values, ', '), - row_data->>foreign_key_info.uk_start_column_name, - row_data->>foreign_key_info.uk_end_column_name, - 'FOR KEY SHARE') - INTO still_matches; - - IF still_matches THEN - RETURN true; - END IF; - END IF; + uk_column_names := array_to_string(uk_column_names_arr, ', '); + uk_column_values := array_to_string(uk_column_values_arr, ', '); + + -- query the range that the uk currently spans: + -- time: 1 2 3 4 5 6 | in ranges + -- uk: *** ******* | [1,2), [3,6) + -- in this case would return [1,6). + EXECUTE format(QSQL_UK_MINMAX, + foreign_key_info.uk_schema_name, + foreign_key_info.uk_table_name, + uk_column_names, + uk_column_values, + foreign_key_info.uk_start_column_name, + foreign_key_info.uk_end_column_name) + INTO min_uk_start_value, max_uk_end_value; SELECT string_agg('t.' || quote_ident(u.c), ', ' ORDER BY u.ordinality) INTO fk_column_names FROM unnest(foreign_key_info.fk_column_names) WITH ORDINALITY AS u (c, ordinality); - EXECUTE format(QSQL, foreign_key_info.fk_schema_name, - foreign_key_info.fk_table_name, - fk_column_names, - foreign_key_info.fk_start_column_name, - foreign_key_info.fk_end_column_name, - array_to_string(uk_column_values, ', '), - row_data->>foreign_key_info.uk_start_column_name, - row_data->>foreign_key_info.uk_end_column_name, - '') + -- min|max can be null if `uk` has no record for the specified filter, + -- which means that should not exists any `fk` record for the same filter. + IF min_uk_start_value IS NULL OR max_uk_end_value IS NULL THEN + EXECUTE format(QSQL_FK_EXISTS, foreign_key_info.fk_schema_name, + foreign_key_info.fk_table_name, + fk_column_names, + uk_column_values) + INTO violation; + + IF violation THEN + RAISE EXCEPTION 'update or delete on table "%" violates foreign key constraint "%" on table "%"', + foreign_key_info.uk_table_oid::regclass, + foreign_key_name, + foreign_key_info.fk_table_oid::regclass; + END IF; + END IF; + + -- check if any fk record is not contained in the range `[min(uk.start), max(uk.end)).` + -- time: 1 2 3 4 5 6 + -- uk: ******* + -- fk: ^-----^ + -- any fk record with start or end outside the ^ markers are considered a violation. + EXECUTE format(QSQL_FK_OUT_OF_UK_MINMAX_RANGE, foreign_key_info.fk_schema_name, + foreign_key_info.fk_table_name, + fk_column_names, + uk_column_values, + min_uk_start_value, + max_uk_end_value, + foreign_key_info.uk_start_column_name, + foreign_key_info.uk_end_column_name) + INTO violation; + + IF violation THEN + RAISE EXCEPTION 'update or delete on table "%" violates foreign key constraint "%" on table "%"', + foreign_key_info.uk_table_oid::regclass, + foreign_key_name, + foreign_key_info.fk_table_oid::regclass; + END IF; + + -- check if any fk record contains any hole periods from uk + -- time: 1 2 3 4 5 6 | in ranges + -- uk: *** *** *** | [1,2), [3,4), [5,6) + -- fk: ^-----^ | [1,4) + -- in this case, we have the following holes in `uk`: [2,3), [4,5) + -- if we have any `fk` record that contains such holes (such as [1,4)), + -- this mean that this record is referencing the `uk` for a period that it not existed. + EXECUTE format(QSQL_FK_CONTAINS_UK_HOLES, foreign_key_info.uk_schema_name, + foreign_key_info.uk_table_name, + replace(uk_column_names, 't.', ''), + uk_column_values, + foreign_key_info.uk_start_column_name, + foreign_key_info.uk_end_column_name, + foreign_key_info.fk_schema_name, + foreign_key_info.fk_table_name, + fk_column_names, + foreign_key_info.fk_start_column_name, + foreign_key_info.fk_end_column_name) INTO violation; IF violation THEN @@ -2359,7 +2432,6 @@ BEGIN END; $function$; - CREATE FUNCTION periods.add_system_versioning( table_class regclass, history_table_name name DEFAULT NULL, @@ -3694,4 +3766,3 @@ AS $function$ SELECT sv1 = ev2; $function$; -