Skip to content

Commit

Permalink
Use short-lived SPI contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
RekGRpth committed Oct 28, 2024
1 parent 24546b2 commit 1701482
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 122 deletions.
78 changes: 16 additions & 62 deletions src/diskquota.c
Original file line number Diff line number Diff line change
Expand Up @@ -981,30 +981,19 @@ create_monitor_db_table(void)
".diskquota_active_table_type AS '$libdir/" DISKQUOTA_BINARY_NAME
".so', 'diskquota_fetch_table_stat' LANGUAGE C VOLATILE;";

StartTransactionCommand();

/*
* Cache Errors during SPI functions, for example a segment may be down
* and current SPI execute will fail. diskquota launcher process should
* tolerate this kind of errors.
*/
PG_TRY();
{
int ret_code = SPI_connect();
if (ret_code != SPI_OK_CONNECT)
{
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("[diskquota launcher] unable to connect to execute internal query. return code: %d.",
ret_code)));
}
connected = true;
PushActiveSnapshot(GetTransactionSnapshot());
pushed_active_snap = true;
SPI_connect_my(&connected, &pushed_active_snap, &ret);

/* debug_query_string need to be set for SPI_execute utility functions. */
debug_query_string = sql;

ret_code = SPI_execute(sql, false, 0);
int ret_code = SPI_execute(sql, false, 0);
if (ret_code != SPI_OK_UTILITY)
{
int saved_errno = errno;
Expand All @@ -1024,12 +1013,7 @@ create_monitor_db_table(void)
RESUME_INTERRUPTS();
}
PG_END_TRY();
if (connected) SPI_finish();
if (pushed_active_snap) PopActiveSnapshot();
if (ret)
CommitTransactionCommand();
else
AbortCurrentTransaction();
SPI_finish_my(connected, pushed_active_snap, ret);

debug_query_string = NULL;
}
Expand All @@ -1042,7 +1026,10 @@ static void
init_database_list(void)
{
TupleDesc tupdesc;
int num = 0;
bool connected = false;
bool pushed_active_snap = false;
bool commit = true;
int num = 0;
int ret;
int i;

Expand All @@ -1051,16 +1038,8 @@ init_database_list(void)
* startup worker for diskquota launcher. If error happens, we just let
* launcher exits.
*/
StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot());
SPI_connect_my(&connected, &pushed_active_snap, &commit);

ret = SPI_connect();
if (ret != SPI_OK_CONNECT)
{
int saved_errno = errno;
ereport(ERROR, (errmsg("[diskquota launcher] SPI connect error, reason: %s, return code: %d.",
strerror(saved_errno), ret)));
}
ret = SPI_execute("select dbid from diskquota_namespace.database_list;", true, 0);
if (ret != SPI_OK_SELECT)
{
Expand Down Expand Up @@ -1129,9 +1108,7 @@ init_database_list(void)
update_monitor_db_mpp(dbEntry->dbid, ADD_DB_TO_MONITOR, LAUNCHER_SCHEMA);
}
}
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
SPI_finish_my(connected, pushed_active_snap, commit);
/* TODO: clean invalid database */
if (num_db > diskquota_max_workers) DiskquotaLauncherShmem->isDynamicWorker = true;
}
Expand Down Expand Up @@ -1185,24 +1162,14 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_
bool pushed_active_snap = false;
bool ret = true;

StartTransactionCommand();

/*
* Cache Errors during SPI functions, for example a segment may be down
* and current SPI execute will fail. diskquota launcher process should
* tolerate this kind of errors.
*/
PG_TRY();
{
int ret_code = SPI_connect();
if (ret_code != SPI_OK_CONNECT)
{
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("unable to connect to execute internal query. return code: %d.", ret_code)));
}
connected = true;
PushActiveSnapshot(GetTransactionSnapshot());
pushed_active_snap = true;
SPI_connect_my(&connected, &pushed_active_snap, &ret);

switch (local_extension_ddl_message.cmd)
{
Expand Down Expand Up @@ -1235,28 +1202,16 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_
}
PG_END_TRY();

if (connected) SPI_finish();
if (pushed_active_snap) PopActiveSnapshot();
if (ret)
CommitTransactionCommand();
else
AbortCurrentTransaction();
SPI_finish_my(connected, pushed_active_snap, ret);

/* update something in memory after transaction committed */
if (ret)
{
PG_TRY();
{
/* update_monitor_db_mpp runs sql to distribute dbid to segments */
StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot());
pushed_active_snap = true;
Oid dbid = local_extension_ddl_message.dbid;
int ret_code = SPI_connect();
if (ret_code != SPI_OK_CONNECT)
{
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("unable to connect to execute internal query. return code: %d.", ret_code)));
}
Oid dbid = local_extension_ddl_message.dbid;
SPI_connect_my(&connected, &pushed_active_snap, &ret);
switch (local_extension_ddl_message.cmd)
{
case CMD_CREATE_EXTENSION:
Expand All @@ -1278,9 +1233,6 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_
local_extension_ddl_message.cmd)));
break;
}
SPI_finish();
if (pushed_active_snap) PopActiveSnapshot();
CommitTransactionCommand();
}
PG_CATCH();
{
Expand All @@ -1291,6 +1243,8 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_
RESUME_INTERRUPTS();
}
PG_END_TRY();

SPI_finish_my(connected, pushed_active_snap, ret);
}
DisconnectAndDestroyAllGangs(false);
}
Expand Down
2 changes: 2 additions & 0 deletions src/diskquota.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,4 +319,6 @@ extern HTAB *DiskquotaShmemInitHash(const char *name, long init_size, long max_s
extern void refresh_monitored_dbid_cache(void);
extern HASHACTION check_hash_fullness(HTAB *hashp, int max_size, const char *warning_message,
TimestampTz *last_overflow_report);
void SPI_connect_my(bool *connected, bool *pushed_active_snap, bool *ret);
void SPI_finish_my(bool connected, bool pushed_active_snap, bool ret);
#endif
43 changes: 36 additions & 7 deletions src/diskquota_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -1235,10 +1235,12 @@ set_per_segment_quota(PG_FUNCTION_ARGS)
int
worker_spi_get_extension_version(int *major, int *minor)
{
StartTransactionCommand();
int ret = SPI_connect();
Assert(ret = SPI_OK_CONNECT);
PushActiveSnapshot(GetTransactionSnapshot());
bool connected = false;
bool pushed_active_snap = false;
bool commit = true;
int ret;

SPI_connect_my(&connected, &pushed_active_snap, &commit);

ret = SPI_execute("select extversion from pg_extension where extname = 'diskquota'", true, 0);

Expand Down Expand Up @@ -1283,9 +1285,7 @@ worker_spi_get_extension_version(int *major, int *minor)
ret = 0;

out:
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
SPI_finish_my(connected, pushed_active_snap, commit);

return ret;
}
Expand Down Expand Up @@ -1709,3 +1709,32 @@ check_hash_fullness(HTAB *hashp, int max_size, const char *warning_message, Time

return HASH_FIND;
}

void
SPI_connect_my(bool *connected, bool *pushed_active_snap, bool *ret)
{
int rc;
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
if ((rc = SPI_connect()) != SPI_OK_CONNECT)
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] SPI_connect failed"),
errdetail("%s", SPI_result_code_string(rc))));
*connected = true;
PushActiveSnapshot(GetTransactionSnapshot());
*pushed_active_snap = true;
*ret = true;
}

void
SPI_finish_my(bool connected, bool pushed_active_snap, bool ret)
{
int rc;
if (pushed_active_snap) PopActiveSnapshot();
if (connected && (rc = SPI_finish()) != SPI_OK_FINISH)
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] SPI_finish failed"),
errdetail("%s", SPI_result_code_string(rc))));
if (ret)
CommitTransactionCommand();
else
AbortCurrentTransaction();
}
61 changes: 8 additions & 53 deletions src/quotamodel.c
Original file line number Diff line number Diff line change
Expand Up @@ -678,24 +678,15 @@ check_diskquota_state_is_ready()
bool pushed_active_snap = false;
bool ret = true;

StartTransactionCommand();

/*
* Cache Errors during SPI functions, for example a segment may be down
* and current SPI execute will fail. diskquota worker process should
* tolerate this kind of errors and continue to check at the next loop.
*/
PG_TRY();
{
if (SPI_OK_CONNECT != SPI_connect())
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] unable to connect to execute SPI query")));
}
connected = true;
PushActiveSnapshot(GetTransactionSnapshot());
pushed_active_snap = true;
is_ready = do_check_diskquota_state_is_ready();
SPI_connect_my(&connected, &pushed_active_snap, &ret);
is_ready = do_check_diskquota_state_is_ready();
}
PG_CATCH();
{
Expand All @@ -708,12 +699,7 @@ check_diskquota_state_is_ready()
RESUME_INTERRUPTS();
}
PG_END_TRY();
if (connected) SPI_finish();
if (pushed_active_snap) PopActiveSnapshot();
if (ret)
CommitTransactionCommand();
else
AbortCurrentTransaction();
SPI_finish_my(connected, pushed_active_snap, ret);
return is_ready;
}

Expand Down Expand Up @@ -805,23 +791,14 @@ refresh_disk_quota_usage(bool is_init)
bool ret = true;
HTAB *local_active_table_stat_map = NULL;

StartTransactionCommand();

/*
* Cache Errors during SPI functions, for example a segment may be down
* and current SPI execute will fail. diskquota worker process should
* tolerate this kind of errors and continue to check at the next loop.
*/
PG_TRY();
{
if (SPI_OK_CONNECT != SPI_connect())
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] unable to connect to execute SPI query")));
}
connected = true;
PushActiveSnapshot(GetTransactionSnapshot());
pushed_active_snap = true;
SPI_connect_my(&connected, &pushed_active_snap, &ret);
/*
* initialization stage all the tables are active. later loop, only the
* tables whose disk size changed will be treated as active
Expand Down Expand Up @@ -861,13 +838,7 @@ refresh_disk_quota_usage(bool is_init)
RESUME_INTERRUPTS();
}
PG_END_TRY();
if (connected) SPI_finish();
if (pushed_active_snap) PopActiveSnapshot();
if (ret)
CommitTransactionCommand();
else
AbortCurrentTransaction();

SPI_finish_my(connected, pushed_active_snap, ret);
return;
}

Expand Down Expand Up @@ -1229,7 +1200,7 @@ flush_to_table_size(void)
}
}
/* update the table size by delete+insert in table table_size */
else if (TableSizeEntryGetFlushFlag(tsentry, i))
else if (TableSizeEntryGetFlushFlag(tsentry, i)) //
{
appendStringInfo(&delete_statement, "%s(%u,%d)", (delete_entries_num == 0) ? " " : ", ",
tsentry->key.reloid, i);
Expand Down Expand Up @@ -1417,24 +1388,14 @@ load_quotas(void)
bool pushed_active_snap = false;
bool ret = true;

StartTransactionCommand();

/*
* Cache Errors during SPI functions, for example a segment may be down
* and current SPI execute will fail. diskquota worker process should
* tolerate this kind of errors and continue to check at the next loop.
*/
PG_TRY();
{
int ret_code = SPI_connect();
if (ret_code != SPI_OK_CONNECT)
{
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("[diskquota] unable to connect to execute SPI query, return code: %d", ret_code)));
}
connected = true;
PushActiveSnapshot(GetTransactionSnapshot());
pushed_active_snap = true;
SPI_connect_my(&connected, &pushed_active_snap, &ret);
do_load_quotas();
}
PG_CATCH();
Expand All @@ -1448,13 +1409,7 @@ load_quotas(void)
RESUME_INTERRUPTS();
}
PG_END_TRY();
if (connected) SPI_finish();
if (pushed_active_snap) PopActiveSnapshot();
if (ret)
CommitTransactionCommand();
else
AbortCurrentTransaction();

SPI_finish_my(connected, pushed_active_snap, ret);
return ret;
}

Expand Down

0 comments on commit 1701482

Please sign in to comment.