Skip to content

Commit

Permalink
Merge branch 'ADBDEV-6520' into ADBDEV-6443
Browse files Browse the repository at this point in the history
  • Loading branch information
RekGRpth committed Oct 30, 2024
2 parents 7c53b17 + f2e66f2 commit a5f451b
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 88 deletions.
33 changes: 16 additions & 17 deletions src/diskquota.c
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ disk_quota_launcher_main(Datum main_arg)
static void
create_monitor_db_table(void)
{
SPI_state state;
int state = 0;
const char *sql;

/*
Expand Down Expand Up @@ -986,7 +986,7 @@ create_monitor_db_table(void)
*/
PG_TRY();
{
SPI_connect_wrapper(&state);
state = SPI_connect_wrapper();

/* debug_query_string need to be set for SPI_execute utility functions. */
debug_query_string = sql;
Expand All @@ -1005,13 +1005,13 @@ create_monitor_db_table(void)
HOLD_INTERRUPTS();
EmitErrorReport();
FlushErrorState();
state.do_commit = false;
state |= is_abort;
debug_query_string = NULL;
/* Now we can allow interrupts again */
RESUME_INTERRUPTS();
}
PG_END_TRY();
SPI_finish_wrapper(&state);
SPI_finish_wrapper(state);

debug_query_string = NULL;
}
Expand All @@ -1023,18 +1023,17 @@ create_monitor_db_table(void)
static void
init_database_list(void)
{
SPI_state state;
TupleDesc tupdesc;
int num = 0;
int ret;
int i;
int state = SPI_connect_wrapper();

/*
* Don't catch errors in start_workers_from_dblist. Since this is the
* startup worker for diskquota launcher. If error happens, we just let
* launcher exits.
*/
SPI_connect_wrapper(&state);

ret = SPI_execute("select dbid from diskquota_namespace.database_list;", true, 0);
if (ret != SPI_OK_SELECT)
Expand Down Expand Up @@ -1104,7 +1103,7 @@ init_database_list(void)
update_monitor_db_mpp(dbEntry->dbid, ADD_DB_TO_MONITOR, LAUNCHER_SCHEMA);
}
}
SPI_finish_wrapper(&state);
SPI_finish_wrapper(state);
/* TODO: clean invalid database */
if (num_db > diskquota_max_workers) DiskquotaLauncherShmem->isDynamicWorker = true;
}
Expand Down Expand Up @@ -1153,8 +1152,8 @@ process_extension_ddl_message()
static void
do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_extension_ddl_message)
{
SPI_state state;
int old_num_db = num_db;
int state = 0;
int old_num_db = num_db;

/*
* Cache Errors during SPI functions, for example a segment may be down
Expand All @@ -1163,7 +1162,7 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_
*/
PG_TRY();
{
SPI_connect_wrapper(&state);
state = SPI_connect_wrapper();

switch (local_extension_ddl_message.cmd)
{
Expand All @@ -1190,22 +1189,22 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_
HOLD_INTERRUPTS();
EmitErrorReport();
FlushErrorState();
state.do_commit = false;
num_db = old_num_db;
state |= is_abort;
num_db = old_num_db;
RESUME_INTERRUPTS();
}
PG_END_TRY();

SPI_finish_wrapper(&state);
SPI_finish_wrapper(state);

/* update something in memory after transaction committed */
if (state.do_commit)
if (!(state & is_abort))
{
PG_TRY();
{
/* update_monitor_db_mpp runs sql to distribute dbid to segments */
Oid dbid = local_extension_ddl_message.dbid;
SPI_connect_wrapper(&state);
state = SPI_connect_wrapper();
switch (local_extension_ddl_message.cmd)
{
case CMD_CREATE_EXTENSION:
Expand Down Expand Up @@ -1234,12 +1233,12 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_
HOLD_INTERRUPTS();
EmitErrorReport();
FlushErrorState();
state.do_commit = false;
state |= is_abort;
RESUME_INTERRUPTS();
}
PG_END_TRY();

SPI_finish_wrapper(&state);
SPI_finish_wrapper(state);
}
DisconnectAndDestroyAllGangs(false);
}
Expand Down
16 changes: 8 additions & 8 deletions src/diskquota.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ extern int diskquota_worker_timeout;

#define DatumGetArrayTypePwrapper(X) ((X) ? DatumGetArrayTypeP(X) : NULL)

typedef struct
enum
{
bool is_connected;
bool is_active_snapshot_pushed;
bool do_commit;
bool is_under_transaction;
} SPI_state;
is_connected = 1 << 0,
is_active_snapshot_pushed = 1 << 1,
is_abort = 1 << 2,
is_under_transaction = 1 << 3,
};

typedef enum
{
Expand Down Expand Up @@ -329,7 +329,7 @@ extern HTAB *DiskquotaShmemInitHash(const char *name, long init_size, long max_s
extern void refresh_monitored_dbid_cache(void);
extern HASHACTION check_hash_fullness(HTAB *hashp, int max_size, const char *warning_message,
TimestampTz *last_overflow_report);
void SPI_connect_wrapper(SPI_state *state);
void SPI_finish_wrapper(const SPI_state *state);
int SPI_connect_wrapper(void);
void SPI_finish_wrapper(int state);
Datum SPI_getbinval_wrapper(HeapTuple tuple, TupleDesc tupdesc, const char *fname, bool allow_null, Oid typeid);
#endif
49 changes: 24 additions & 25 deletions src/diskquota_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -1236,12 +1236,8 @@ set_per_segment_quota(PG_FUNCTION_ARGS)
int
worker_spi_get_extension_version(int *major, int *minor)
{
SPI_state state;
int ret;

SPI_connect_wrapper(&state);

ret = SPI_execute("select extversion from pg_extension where extname = 'diskquota'", true, 0);
int state = SPI_connect_wrapper();
int ret = SPI_execute("select extversion from pg_extension where extname = 'diskquota'", true, 0);

if (SPI_processed == 0)
{
Expand Down Expand Up @@ -1284,7 +1280,7 @@ worker_spi_get_extension_version(int *major, int *minor)
ret = 0;

out:
SPI_finish_wrapper(&state);
SPI_finish_wrapper(state);

return ret;
}
Expand Down Expand Up @@ -1647,51 +1643,54 @@ check_hash_fullness(HTAB *hashp, int max_size, const char *warning_message, Time
return HASH_FIND;
}

void
SPI_connect_wrapper(SPI_state *state)
int
SPI_connect_wrapper(void)
{
int rc;

state->is_connected = false;
state->is_active_snapshot_pushed = false;
state->is_under_transaction = false;
state->do_commit = true;
int state = 0;

SetCurrentStatementStartTimestamp();

if (!IsTransactionState())
{
StartTransactionCommand();
state->is_under_transaction = true;
state |= is_under_transaction;
}

if (!SPI_context())
{
if ((rc = SPI_connect()) != SPI_OK_CONNECT)
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] SPI_connect failed"),
errdetail("%s", SPI_result_code_string(rc))));
state->is_connected = true;
state |= is_connected;
}
if (state->is_under_transaction)

if (state & is_under_transaction)
{
PushActiveSnapshot(GetTransactionSnapshot());
state->is_active_snapshot_pushed = true;
state |= is_active_snapshot_pushed;
}

return state;
}

void
SPI_finish_wrapper(const SPI_state *state)
SPI_finish_wrapper(int state)
{
int rc;

if (state->is_connected && (rc = SPI_finish()) != SPI_OK_FINISH)
if ((state & is_connected) && (rc = SPI_finish()) != SPI_OK_FINISH)
ereport(WARNING, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] SPI_finish failed"),
errdetail("%s", SPI_result_code_string(rc))));
if (state->is_active_snapshot_pushed) PopActiveSnapshot();
if (state->is_under_transaction)

if (state & is_active_snapshot_pushed) PopActiveSnapshot();

if (state & is_under_transaction)
{
if (state->do_commit)
CommitTransactionCommand();
else
if (state & is_abort)
AbortCurrentTransaction();
else
CommitTransactionCommand();
}
}

Expand Down
68 changes: 30 additions & 38 deletions src/quotamodel.c
Original file line number Diff line number Diff line change
Expand Up @@ -673,8 +673,8 @@ vacuum_disk_quota_model(uint32 id)
bool
check_diskquota_state_is_ready()
{
SPI_state state;
bool is_ready = false;
int state = 0;
bool is_ready = false;

/*
* Cache Errors during SPI functions, for example a segment may be down
Expand All @@ -683,7 +683,7 @@ check_diskquota_state_is_ready()
*/
PG_TRY();
{
SPI_connect_wrapper(&state);
state = SPI_connect_wrapper();
is_ready = do_check_diskquota_state_is_ready();
}
PG_CATCH();
Expand All @@ -692,12 +692,12 @@ check_diskquota_state_is_ready()
HOLD_INTERRUPTS();
EmitErrorReport();
FlushErrorState();
state.do_commit = false;
state |= is_abort;
/* Now we can allow interrupts again */
RESUME_INTERRUPTS();
}
PG_END_TRY();
SPI_finish_wrapper(&state);
SPI_finish_wrapper(state);
return is_ready;
}

Expand Down Expand Up @@ -1136,51 +1136,43 @@ calculate_table_disk_usage(StringInfo active_oids, bool is_init)
static void
delete_from_table_size_map(ArrayBuildState *tableids, ArrayBuildState *segids)
{
SPI_state state;
int ret;
Datum tableid = makeArrayResult(tableids, CurrentMemoryContext);
Datum segid = makeArrayResult(segids, CurrentMemoryContext);

SPI_connect_wrapper(&state);
ret = SPI_execute_with_args(
"delete from diskquota.table_size where (tableid, segid) in (select * from unnest($1, $2))", 2,
(Oid[]){OIDARRAYOID, INT2ARRAYOID}, (Datum[]){tableid, segid}, NULL, false, 0);
Datum tableid = makeArrayResult(tableids, CurrentMemoryContext);
Datum segid = makeArrayResult(segids, CurrentMemoryContext);
int state = SPI_connect_wrapper();
int ret = SPI_execute_with_args(
"delete from diskquota.table_size where (tableid, segid) in (select * from unnest($1, $2))", 2,
(Oid[]){OIDARRAYOID, INT2ARRAYOID}, (Datum[]){tableid, segid}, NULL, false, 0);
if (ret != SPI_OK_DELETE)
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("[diskquota] delete_from_table_size_map SPI_execute failed: error code %d", ret)));
SPI_finish_wrapper(&state);

SPI_finish_wrapper(state);
pfree(DatumGetPointer(tableid));
pfree(DatumGetPointer(segid));
}

static void
update_table_size_map(ArrayBuildState *tableids, ArrayBuildState *sizes, ArrayBuildState *segids)
{
SPI_state state;
int ret;
Datum tableid = makeArrayResult(tableids, CurrentMemoryContext);
Datum size = makeArrayResult(sizes, CurrentMemoryContext);
Datum segid = makeArrayResult(segids, CurrentMemoryContext);

SPI_connect_wrapper(&state);
ret = SPI_execute_with_args(
"delete from diskquota.table_size where (tableid, segid) in (select * from unnest($1, $2))", 2,
(Oid[]){OIDARRAYOID, INT2ARRAYOID}, (Datum[]){tableid, segid}, NULL, false, 0);
Datum tableid = makeArrayResult(tableids, CurrentMemoryContext);
Datum size = makeArrayResult(sizes, CurrentMemoryContext);
Datum segid = makeArrayResult(segids, CurrentMemoryContext);
int state = SPI_connect_wrapper();
int ret = SPI_execute_with_args(
"delete from diskquota.table_size where (tableid, segid) in (select * from unnest($1, $2))", 2,
(Oid[]){OIDARRAYOID, INT2ARRAYOID}, (Datum[]){tableid, segid}, NULL, false, 0);
if (ret != SPI_OK_DELETE)
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("[diskquota] delete_from_table_size_map SPI_execute failed: error code %d", ret)));
SPI_finish_wrapper(&state);
SPI_finish_wrapper(state);

SPI_connect_wrapper(&state);
ret = SPI_execute_with_args("insert into diskquota.table_size select * from unnest($1, $2, $3)", 3,
(Oid[]){OIDARRAYOID, INT8ARRAYOID, INT2ARRAYOID}, (Datum[]){tableid, size, segid}, NULL,
false, 0);
state = SPI_connect_wrapper();
ret = SPI_execute_with_args("insert into diskquota.table_size select * from unnest($1, $2, $3)", 3,
(Oid[]){OIDARRAYOID, INT8ARRAYOID, INT2ARRAYOID}, (Datum[]){tableid, size, segid}, NULL,
false, 0);
if (ret != SPI_OK_INSERT)
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("[diskquota] insert_into_table_size_map SPI_execute failed: error code %d", ret)));
SPI_finish_wrapper(&state);

SPI_finish_wrapper(state);
pfree(DatumGetPointer(tableid));
pfree(DatumGetPointer(size));
pfree(DatumGetPointer(segid));
Expand Down Expand Up @@ -1386,7 +1378,7 @@ truncateStringInfo(StringInfo str, int nchars)
static bool
load_quotas(void)
{
SPI_state state;
int state = 0;

/*
* Cache Errors during SPI functions, for example a segment may be down
Expand All @@ -1395,7 +1387,7 @@ load_quotas(void)
*/
PG_TRY();
{
SPI_connect_wrapper(&state);
state = SPI_connect_wrapper();
do_load_quotas();
}
PG_CATCH();
Expand All @@ -1404,13 +1396,13 @@ load_quotas(void)
HOLD_INTERRUPTS();
EmitErrorReport();
FlushErrorState();
state.do_commit = false;
state |= is_abort;
/* Now we can allow interrupts again */
RESUME_INTERRUPTS();
}
PG_END_TRY();
SPI_finish_wrapper(&state);
return state.do_commit;
SPI_finish_wrapper(state);
return !(state & is_abort);
}

/*
Expand Down

0 comments on commit a5f451b

Please sign in to comment.