Skip to content

Commit

Permalink
ADBDEV-3685 Error handling for disqkuota worker startup stage (#20)
Browse files Browse the repository at this point in the history
During diskquota worker's first run the initial set of active tables
with their sizes is being loaded from diskquota.table_size table in
order to warm up diskquota rejectmap and other shared memory objects.
If an error occurs during this initialization process, the error will be
ignored in PG_CATCH() block. Because of that local_active_table_stat_map
will not be filled properly. And at the next loop iteration tables, that
are not in acitive table list will be marked as irrelevant and to be deleted
both from table_size_map and table_size table in flush_to_table_size function.
In case when the inital set of active tables is huge (thousands of tables),
this error ignorance could lead to the formation of a too long
delete statement, which the SPI executor won't be able to process due to
memory limits. And this case can lead to worker's segmentation fault or
other errorneous behaviour of whole extension.

This commit proposes the handling of the initialization errors, which
occur during worker's first run. In the DiskquotaDBEntry structure the
bool variable "corrupted" is added in order to indicate, that the
worker wasn't able to initialize itself on given database. And
DiskquotaDBEntry also is now passed to refresh_disk_quota_model function
from worker main loop, because one need to change the state of dbEntry.
The state is changed when the refresh_disk_quota_usage function catches
an error, which occured during the initialization step, in PG_CATCH()
block. And after the error is catched, the "corrupted" flag is set in
given dbEntry, and then the error is rethrown. This leads to worker
process termination. The launcher will not be able to start it again,
because added flag is set in the database structure, and this flag is
being checked inside the disk_quota_launcher_main function. The flag
can be reseted by calling resetBackgroundWorkerCorruption function,
which is currently called in SIGHUP handler.
  • Loading branch information
bimboterminator1 authored Jun 29, 2023
1 parent be945ba commit 3b06e37
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 15 deletions.
34 changes: 26 additions & 8 deletions src/diskquota.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ static void vacuum_db_entry(DiskquotaDBEntry *db);
static void init_bgworker_handles(void);
static BackgroundWorkerHandle *get_bgworker_handle(uint32 worker_id);
static void free_bgworker_handle(uint32 worker_id);
static void resetBackgroundWorkerCorruption(void);
#if GP_VERSION_NUM < 70000
/* WaitForBackgroundWorkerShutdown is copied from gpdb7 */
static BgwHandleStatus WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle);
Expand Down Expand Up @@ -525,7 +526,7 @@ disk_quota_worker_main(Datum main_arg)
if (!diskquota_is_paused())
{
/* Refresh quota model with init mode */
refresh_disk_quota_model(!MyWorkerInfo->dbEntry->inited);
refresh_disk_quota_model(MyWorkerInfo->dbEntry);
MyWorkerInfo->dbEntry->inited = true;
is_gang_destroyed = false;
}
Expand Down Expand Up @@ -762,6 +763,7 @@ disk_quota_launcher_main(Datum main_arg)
{
elog(DEBUG1, "[diskquota] got sighup");
got_sighup = false;
resetBackgroundWorkerCorruption();
ProcessConfigFile(PGC_SIGHUP);
}

Expand All @@ -787,11 +789,12 @@ disk_quota_launcher_main(Datum main_arg)
* When curDB->in_use is false means dbEtnry has been romoved
* When curDB->dbid doesn't equtal curDBId, it means the slot has
* been used by another db
*
* When curDB->corrupted is true means worker couldn't initialize
* the extension in the first run.
* For the above conditions, we just skip this loop and try to fetch
* next db to run.
*/
if (curDB == NULL || !curDB->in_use || curDB->dbid != curDBId)
if (curDB == NULL || !curDB->in_use || curDB->dbid != curDBId || curDB->corrupted)
{
advance_one_db = true;
continue;
Expand Down Expand Up @@ -1796,7 +1799,9 @@ next_db(DiskquotaDBEntry *curDB)
if (nextSlot >= MAX_NUM_MONITORED_DB) nextSlot = 0;
DiskquotaDBEntry *dbEntry = &DiskquotaLauncherShmem->dbArray[nextSlot];
nextSlot++;
if (!dbEntry->in_use || dbEntry->workerId != INVALID_WORKER_ID || dbEntry->dbid == InvalidOid) continue;
if (!dbEntry->in_use || dbEntry->workerId != INVALID_WORKER_ID || dbEntry->dbid == InvalidOid ||
dbEntry->corrupted)
continue;
/* TODO: should release the invalid db related things */
if (!is_valid_dbid(dbEntry->dbid)) continue;
result = dbEntry;
Expand Down Expand Up @@ -1860,10 +1865,11 @@ static void
vacuum_db_entry(DiskquotaDBEntry *db)
{
if (db == NULL) return;
db->dbid = InvalidOid;
db->inited = false;
db->workerId = INVALID_WORKER_ID;
db->in_use = false;
db->dbid = InvalidOid;
db->inited = false;
db->workerId = INVALID_WORKER_ID;
db->in_use = false;
db->corrupted = false;
}

static void
Expand Down Expand Up @@ -1898,6 +1904,18 @@ free_bgworker_handle(uint32 worker_id)
}
}

static void
resetBackgroundWorkerCorruption(void)
{
LWLockAcquire(diskquota_locks.dblist_lock, LW_EXCLUSIVE);
for (int i = 0; i < MAX_NUM_MONITORED_DB; i++)
{
DiskquotaDBEntry *dbEntry = &DiskquotaLauncherShmem->dbArray[i];
if (dbEntry->corrupted) dbEntry->corrupted = false;
}
LWLockRelease(diskquota_locks.dblist_lock);
}

#if GP_VERSION_NUM < 70000
static BgwHandleStatus
WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
Expand Down
7 changes: 4 additions & 3 deletions src/diskquota.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,9 @@ struct DiskquotaDBEntry
TimestampTz last_run_time;
int16 cost; // ms

bool inited; // this entry is inited, will set to true after the worker finish the frist run.
bool in_use; // this slot is in using. AKA dbid != 0
bool inited; // this entry is inited, will set to true after the worker finish the frist run.
bool in_use; // this slot is in using. AKA dbid != 0
bool corrupted; // consider this entry as invalid to start the worker on
};

typedef enum MonitorDBStatus
Expand Down Expand Up @@ -249,7 +250,7 @@ extern void invalidate_database_rejectmap(Oid dbid);
/* quota model interface*/
extern void init_disk_quota_shmem(void);
extern void init_disk_quota_model(uint32 id);
extern void refresh_disk_quota_model(bool force);
extern void refresh_disk_quota_model(DiskquotaDBEntry *dbEntry);
extern bool check_diskquota_state_is_ready(void);
extern bool quota_check_common(Oid reloid, RelFileNode *relfilenode);

Expand Down
1 change: 1 addition & 0 deletions src/gp_activetable.c
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ gp_fetch_active_tables(bool is_init)

if (is_init)
{
SIMPLE_FAULT_INJECTOR("diskquota_worker_initialization");
load_table_size(local_table_stats_map);
}
else
Expand Down
19 changes: 15 additions & 4 deletions src/quotamodel.c
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ static void clear_all_quota_maps(void);
static void transfer_table_for_quota(int64 totalsize, QuotaType type, Oid *old_keys, Oid *new_keys, int16 segid);

/* functions to refresh disk quota model*/
static void refresh_disk_quota_usage(bool is_init);
static void refresh_disk_quota_usage(DiskquotaDBEntry *dbEntry);
static void calculate_table_disk_usage(bool is_init, HTAB *local_active_table_stat_map);
static void flush_to_table_size(void);
static bool flush_local_reject_map(void);
Expand Down Expand Up @@ -761,8 +761,10 @@ do_check_diskquota_state_is_ready(void)
* recalculate the changed disk usage.
*/
void
refresh_disk_quota_model(bool is_init)
refresh_disk_quota_model(DiskquotaDBEntry *dbEntry)
{
bool is_init = !dbEntry->inited;

SEGCOUNT = getgpsegmentCount();
if (SEGCOUNT <= 0)
{
Expand All @@ -773,7 +775,7 @@ refresh_disk_quota_model(bool is_init)
/* skip refresh model when load_quotas failed */
if (load_quotas())
{
refresh_disk_quota_usage(is_init);
refresh_disk_quota_usage(dbEntry);
}
if (is_init) ereport(LOG, (errmsg("[diskquota] initialize quota model finished")));
}
Expand All @@ -785,11 +787,12 @@ refresh_disk_quota_model(bool is_init)
* process is constructing quota model.
*/
static void
refresh_disk_quota_usage(bool is_init)
refresh_disk_quota_usage(DiskquotaDBEntry *dbEntry)
{
bool connected = false;
bool pushed_active_snap = false;
bool ret = true;
bool is_init = !dbEntry->inited;
HTAB *local_active_table_stat_map = NULL;

StartTransactionCommand();
Expand Down Expand Up @@ -841,6 +844,14 @@ refresh_disk_quota_usage(bool is_init)
}
PG_CATCH();
{
/* Initialization failed. */
if (is_init)
{
LWLockAcquire(diskquota_locks.dblist_lock, LW_EXCLUSIVE);
dbEntry->corrupted = true;
LWLockRelease(diskquota_locks.dblist_lock);
PG_RE_THROW();
}
/* Prevents interrupts while cleaning up */
HOLD_INTERRUPTS();
EmitErrorReport();
Expand Down
46 changes: 46 additions & 0 deletions tests/isolation2/expected/test_worker_init_failure.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
--
-- Tests for error handling when the worker catches the error during
-- its first run.
--

-- Function checking whether worker on given db is up
CREATE or REPLACE LANGUAGE plpython2u;
CREATE
CREATE or REPLACE FUNCTION check_worker_presence(dbname text, wait_time int) RETURNS boolean AS $$ import psutil import time worker_name = 'bgworker: [diskquota] ' + dbname time.sleep(wait_time) for proc in psutil.process_iter(): try: if 'postgres' in proc.name().lower(): for val in proc.cmdline(): if worker_name in val: return True except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): pass return False $$ LANGUAGE plpython2u EXECUTE ON MASTER;
CREATE

-- Test diskquota behavior when an error occurs during the worker's first run.
-- The error leads to process termination. And launcher won't start it again
-- until extension reload or SIGHUP signal.
CREATE EXTENSION diskquota;
CREATE
SELECT check_worker_presence(current_database(), 0);
check_worker_presence
-----------------------
t
(1 row)
SELECT gp_inject_fault('diskquota_worker_initialization', 'error', dbid) FROM gp_segment_configuration WHERE role='p' AND content=-1;
gp_inject_fault
-----------------
Success:
(1 row)
SELECT diskquota.init_table_size_table();
init_table_size_table
-----------------------

(1 row)
SELECT check_worker_presence(current_database(), current_setting('diskquota.worker_timeout')::int / 2);
check_worker_presence
-----------------------
f
(1 row)
-- Reload configuration and check that worker is up again
!\retcode gpstop -u;
(exited with code 0)
SELECT check_worker_presence(current_database(), current_setting('diskquota.worker_timeout')::int / 2);
check_worker_presence
-----------------------
t
(1 row)
DROP EXTENSION diskquota;
DROP
1 change: 1 addition & 0 deletions tests/isolation2/isolation2_schedule
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ test: test_per_segment_config
test: test_relation_cache
test: test_ereport_from_seg
test: test_drop_extension
test: test_worker_init_failure
test: reset_config
40 changes: 40 additions & 0 deletions tests/isolation2/sql/test_worker_init_failure.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
--
-- Tests for error handling when the worker catches the error during
-- its first run.
--

-- Function checking whether worker on given db is up
CREATE or REPLACE LANGUAGE plpython2u;
CREATE or REPLACE FUNCTION check_worker_presence(dbname text, wait_time int)
RETURNS boolean
AS $$
import psutil
import time
worker_name = 'bgworker: [diskquota] ' + dbname
time.sleep(wait_time)
for proc in psutil.process_iter():
try:
if 'postgres' in proc.name().lower():
for val in proc.cmdline():
if worker_name in val:
return True
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return False
$$ LANGUAGE plpython2u EXECUTE ON MASTER;

-- Test diskquota behavior when an error occurs during the worker's first run.
-- The error leads to process termination. And launcher won't start it again
-- until extension reload or SIGHUP signal.
CREATE EXTENSION diskquota;
SELECT check_worker_presence(current_database(), 0);
SELECT gp_inject_fault('diskquota_worker_initialization', 'error', dbid)
FROM gp_segment_configuration WHERE role='p' AND content=-1;
SELECT diskquota.init_table_size_table();
SELECT check_worker_presence(current_database(),
current_setting('diskquota.worker_timeout')::int / 2);
-- Reload configuration and check that worker is up again
!\retcode gpstop -u;
SELECT check_worker_presence(current_database(),
current_setting('diskquota.worker_timeout')::int / 2);
DROP EXTENSION diskquota;

0 comments on commit 3b06e37

Please sign in to comment.