diff --git a/src/diskquota.c b/src/diskquota.c index cc25b70a..ff9101d8 100644 --- a/src/diskquota.c +++ b/src/diskquota.c @@ -981,8 +981,6 @@ create_monitor_db_table(void) ".diskquota_active_table_type AS '$libdir/" DISKQUOTA_BINARY_NAME ".so', 'diskquota_fetch_table_stat' LANGUAGE C VOLATILE;"; - StartTransactionCommand(); - /* * Cache Errors during SPI functions, for example a segment may be down * and current SPI execute will fail. diskquota launcher process should @@ -990,21 +988,12 @@ create_monitor_db_table(void) */ PG_TRY(); { - int ret_code = SPI_connect(); - if (ret_code != SPI_OK_CONNECT) - { - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("[diskquota launcher] unable to connect to execute internal query. return code: %d.", - ret_code))); - } - connected = true; - PushActiveSnapshot(GetTransactionSnapshot()); - pushed_active_snap = true; + SPI_connect_my(&connected, &pushed_active_snap, &ret); /* debug_query_string need to be set for SPI_execute utility functions. */ debug_query_string = sql; - ret_code = SPI_execute(sql, false, 0); + int ret_code = SPI_execute(sql, false, 0); if (ret_code != SPI_OK_UTILITY) { int saved_errno = errno; @@ -1024,12 +1013,7 @@ create_monitor_db_table(void) RESUME_INTERRUPTS(); } PG_END_TRY(); - if (connected) SPI_finish(); - if (pushed_active_snap) PopActiveSnapshot(); - if (ret) - CommitTransactionCommand(); - else - AbortCurrentTransaction(); + SPI_finish_my(connected, pushed_active_snap, ret); debug_query_string = NULL; } @@ -1042,7 +1026,10 @@ static void init_database_list(void) { TupleDesc tupdesc; - int num = 0; + bool connected = false; + bool pushed_active_snap = false; + bool commit = true; + int num = 0; int ret; int i; @@ -1051,16 +1038,8 @@ init_database_list(void) * startup worker for diskquota launcher. If error happens, we just let * launcher exits. */ - StartTransactionCommand(); - PushActiveSnapshot(GetTransactionSnapshot()); + SPI_connect_my(&connected, &pushed_active_snap, &commit); - ret = SPI_connect(); - if (ret != SPI_OK_CONNECT) - { - int saved_errno = errno; - ereport(ERROR, (errmsg("[diskquota launcher] SPI connect error, reason: %s, return code: %d.", - strerror(saved_errno), ret))); - } ret = SPI_execute("select dbid from diskquota_namespace.database_list;", true, 0); if (ret != SPI_OK_SELECT) { @@ -1129,9 +1108,7 @@ init_database_list(void) update_monitor_db_mpp(dbEntry->dbid, ADD_DB_TO_MONITOR, LAUNCHER_SCHEMA); } } - SPI_finish(); - PopActiveSnapshot(); - CommitTransactionCommand(); + SPI_finish_my(connected, pushed_active_snap, commit); /* TODO: clean invalid database */ if (num_db > diskquota_max_workers) DiskquotaLauncherShmem->isDynamicWorker = true; } @@ -1185,8 +1162,6 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_ bool pushed_active_snap = false; bool ret = true; - StartTransactionCommand(); - /* * Cache Errors during SPI functions, for example a segment may be down * and current SPI execute will fail. diskquota launcher process should @@ -1194,15 +1169,7 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_ */ PG_TRY(); { - int ret_code = SPI_connect(); - if (ret_code != SPI_OK_CONNECT) - { - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("unable to connect to execute internal query. return code: %d.", ret_code))); - } - connected = true; - PushActiveSnapshot(GetTransactionSnapshot()); - pushed_active_snap = true; + SPI_connect_my(&connected, &pushed_active_snap, &ret); switch (local_extension_ddl_message.cmd) { @@ -1235,28 +1202,16 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_ } PG_END_TRY(); - if (connected) SPI_finish(); - if (pushed_active_snap) PopActiveSnapshot(); - if (ret) - CommitTransactionCommand(); - else - AbortCurrentTransaction(); + SPI_finish_my(connected, pushed_active_snap, ret); + /* update something in memory after transaction committed */ if (ret) { PG_TRY(); { /* update_monitor_db_mpp runs sql to distribute dbid to segments */ - StartTransactionCommand(); - PushActiveSnapshot(GetTransactionSnapshot()); - pushed_active_snap = true; - Oid dbid = local_extension_ddl_message.dbid; - int ret_code = SPI_connect(); - if (ret_code != SPI_OK_CONNECT) - { - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("unable to connect to execute internal query. return code: %d.", ret_code))); - } + Oid dbid = local_extension_ddl_message.dbid; + SPI_connect_my(&connected, &pushed_active_snap, &ret); switch (local_extension_ddl_message.cmd) { case CMD_CREATE_EXTENSION: @@ -1278,9 +1233,6 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_ local_extension_ddl_message.cmd))); break; } - SPI_finish(); - if (pushed_active_snap) PopActiveSnapshot(); - CommitTransactionCommand(); } PG_CATCH(); { @@ -1291,6 +1243,8 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_ RESUME_INTERRUPTS(); } PG_END_TRY(); + + SPI_finish_my(connected, pushed_active_snap, ret); } DisconnectAndDestroyAllGangs(false); } diff --git a/src/diskquota.h b/src/diskquota.h index 7c2bbb15..284ac6ee 100644 --- a/src/diskquota.h +++ b/src/diskquota.h @@ -319,4 +319,6 @@ extern HTAB *DiskquotaShmemInitHash(const char *name, long init_size, long max_s extern void refresh_monitored_dbid_cache(void); extern HASHACTION check_hash_fullness(HTAB *hashp, int max_size, const char *warning_message, TimestampTz *last_overflow_report); +void SPI_connect_my(bool *connected, bool *pushed_active_snap, bool *ret); +void SPI_finish_my(bool connected, bool pushed_active_snap, bool ret); #endif diff --git a/src/diskquota_utility.c b/src/diskquota_utility.c index f306cef8..a325237a 100644 --- a/src/diskquota_utility.c +++ b/src/diskquota_utility.c @@ -1235,10 +1235,12 @@ set_per_segment_quota(PG_FUNCTION_ARGS) int worker_spi_get_extension_version(int *major, int *minor) { - StartTransactionCommand(); - int ret = SPI_connect(); - Assert(ret = SPI_OK_CONNECT); - PushActiveSnapshot(GetTransactionSnapshot()); + bool connected = false; + bool pushed_active_snap = false; + bool commit = true; + int ret; + + SPI_connect_my(&connected, &pushed_active_snap, &commit); ret = SPI_execute("select extversion from pg_extension where extname = 'diskquota'", true, 0); @@ -1283,9 +1285,7 @@ worker_spi_get_extension_version(int *major, int *minor) ret = 0; out: - SPI_finish(); - PopActiveSnapshot(); - CommitTransactionCommand(); + SPI_finish_my(connected, pushed_active_snap, commit); return ret; } @@ -1709,3 +1709,32 @@ check_hash_fullness(HTAB *hashp, int max_size, const char *warning_message, Time return HASH_FIND; } + +void +SPI_connect_my(bool *connected, bool *pushed_active_snap, bool *ret) +{ + int rc; + SetCurrentStatementStartTimestamp(); + StartTransactionCommand(); + if ((rc = SPI_connect()) != SPI_OK_CONNECT) + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] SPI_connect failed"), + errdetail("%s", SPI_result_code_string(rc)))); + *connected = true; + PushActiveSnapshot(GetTransactionSnapshot()); + *pushed_active_snap = true; + *ret = true; +} + +void +SPI_finish_my(bool connected, bool pushed_active_snap, bool ret) +{ + int rc; + if (pushed_active_snap) PopActiveSnapshot(); + if (connected && (rc = SPI_finish()) != SPI_OK_FINISH) + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] SPI_finish failed"), + errdetail("%s", SPI_result_code_string(rc)))); + if (ret) + CommitTransactionCommand(); + else + AbortCurrentTransaction(); +} diff --git a/src/quotamodel.c b/src/quotamodel.c index d4dc8a90..6fb34cd5 100644 --- a/src/quotamodel.c +++ b/src/quotamodel.c @@ -678,8 +678,6 @@ check_diskquota_state_is_ready() bool pushed_active_snap = false; bool ret = true; - StartTransactionCommand(); - /* * Cache Errors during SPI functions, for example a segment may be down * and current SPI execute will fail. diskquota worker process should @@ -687,15 +685,8 @@ check_diskquota_state_is_ready() */ PG_TRY(); { - if (SPI_OK_CONNECT != SPI_connect()) - { - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] unable to connect to execute SPI query"))); - } - connected = true; - PushActiveSnapshot(GetTransactionSnapshot()); - pushed_active_snap = true; - is_ready = do_check_diskquota_state_is_ready(); + SPI_connect_my(&connected, &pushed_active_snap, &ret); + is_ready = do_check_diskquota_state_is_ready(); } PG_CATCH(); { @@ -708,12 +699,7 @@ check_diskquota_state_is_ready() RESUME_INTERRUPTS(); } PG_END_TRY(); - if (connected) SPI_finish(); - if (pushed_active_snap) PopActiveSnapshot(); - if (ret) - CommitTransactionCommand(); - else - AbortCurrentTransaction(); + SPI_finish_my(connected, pushed_active_snap, ret); return is_ready; } @@ -805,8 +791,6 @@ refresh_disk_quota_usage(bool is_init) bool ret = true; HTAB *local_active_table_stat_map = NULL; - StartTransactionCommand(); - /* * Cache Errors during SPI functions, for example a segment may be down * and current SPI execute will fail. diskquota worker process should @@ -814,14 +798,7 @@ refresh_disk_quota_usage(bool is_init) */ PG_TRY(); { - if (SPI_OK_CONNECT != SPI_connect()) - { - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] unable to connect to execute SPI query"))); - } - connected = true; - PushActiveSnapshot(GetTransactionSnapshot()); - pushed_active_snap = true; + SPI_connect_my(&connected, &pushed_active_snap, &ret); /* * initialization stage all the tables are active. later loop, only the * tables whose disk size changed will be treated as active @@ -861,13 +838,7 @@ refresh_disk_quota_usage(bool is_init) RESUME_INTERRUPTS(); } PG_END_TRY(); - if (connected) SPI_finish(); - if (pushed_active_snap) PopActiveSnapshot(); - if (ret) - CommitTransactionCommand(); - else - AbortCurrentTransaction(); - + SPI_finish_my(connected, pushed_active_snap, ret); return; } @@ -1229,7 +1200,7 @@ flush_to_table_size(void) } } /* update the table size by delete+insert in table table_size */ - else if (TableSizeEntryGetFlushFlag(tsentry, i)) + else if (TableSizeEntryGetFlushFlag(tsentry, i)) // { appendStringInfo(&delete_statement, "%s(%u,%d)", (delete_entries_num == 0) ? " " : ", ", tsentry->key.reloid, i); @@ -1417,8 +1388,6 @@ load_quotas(void) bool pushed_active_snap = false; bool ret = true; - StartTransactionCommand(); - /* * Cache Errors during SPI functions, for example a segment may be down * and current SPI execute will fail. diskquota worker process should @@ -1426,15 +1395,7 @@ load_quotas(void) */ PG_TRY(); { - int ret_code = SPI_connect(); - if (ret_code != SPI_OK_CONNECT) - { - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("[diskquota] unable to connect to execute SPI query, return code: %d", ret_code))); - } - connected = true; - PushActiveSnapshot(GetTransactionSnapshot()); - pushed_active_snap = true; + SPI_connect_my(&connected, &pushed_active_snap, &ret); do_load_quotas(); } PG_CATCH(); @@ -1448,13 +1409,7 @@ load_quotas(void) RESUME_INTERRUPTS(); } PG_END_TRY(); - if (connected) SPI_finish(); - if (pushed_active_snap) PopActiveSnapshot(); - if (ret) - CommitTransactionCommand(); - else - AbortCurrentTransaction(); - + SPI_finish_my(connected, pushed_active_snap, ret); return ret; }