diff --git a/src/diskquota.c b/src/diskquota.c index d5630700..4f0fe43c 100644 --- a/src/diskquota.c +++ b/src/diskquota.c @@ -142,6 +142,7 @@ static void vacuum_db_entry(DiskquotaDBEntry *db); static void init_bgworker_handles(void); static BackgroundWorkerHandle *get_bgworker_handle(uint32 worker_id); static void free_bgworker_handle(uint32 worker_id); +static void resetBackgroundWorkerCorruption(void); #if GP_VERSION_NUM < 70000 /* WaitForBackgroundWorkerShutdown is copied from gpdb7 */ static BgwHandleStatus WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle); @@ -525,7 +526,7 @@ disk_quota_worker_main(Datum main_arg) if (!diskquota_is_paused()) { /* Refresh quota model with init mode */ - refresh_disk_quota_model(!MyWorkerInfo->dbEntry->inited); + refresh_disk_quota_model(MyWorkerInfo->dbEntry); MyWorkerInfo->dbEntry->inited = true; is_gang_destroyed = false; } @@ -762,6 +763,7 @@ disk_quota_launcher_main(Datum main_arg) { elog(DEBUG1, "[diskquota] got sighup"); got_sighup = false; + resetBackgroundWorkerCorruption(); ProcessConfigFile(PGC_SIGHUP); } @@ -787,11 +789,12 @@ disk_quota_launcher_main(Datum main_arg) * When curDB->in_use is false means dbEtnry has been romoved * When curDB->dbid doesn't equtal curDBId, it means the slot has * been used by another db - * + * When curDB->corrupted is true, it means the worker couldn't initialize + * the extension in its first run. * For the above conditions, we just skip this loop and try to fetch * next db to run. 
*/ - if (curDB == NULL || !curDB->in_use || curDB->dbid != curDBId) + if (curDB == NULL || !curDB->in_use || curDB->dbid != curDBId || curDB->corrupted) { advance_one_db = true; continue; @@ -1796,7 +1799,9 @@ next_db(DiskquotaDBEntry *curDB) if (nextSlot >= MAX_NUM_MONITORED_DB) nextSlot = 0; DiskquotaDBEntry *dbEntry = &DiskquotaLauncherShmem->dbArray[nextSlot]; nextSlot++; - if (!dbEntry->in_use || dbEntry->workerId != INVALID_WORKER_ID || dbEntry->dbid == InvalidOid) continue; + if (!dbEntry->in_use || dbEntry->workerId != INVALID_WORKER_ID || dbEntry->dbid == InvalidOid || + dbEntry->corrupted) + continue; /* TODO: should release the invalid db related things */ if (!is_valid_dbid(dbEntry->dbid)) continue; result = dbEntry; @@ -1860,10 +1865,11 @@ static void vacuum_db_entry(DiskquotaDBEntry *db) { if (db == NULL) return; - db->dbid = InvalidOid; - db->inited = false; - db->workerId = INVALID_WORKER_ID; - db->in_use = false; + db->dbid = InvalidOid; + db->inited = false; + db->workerId = INVALID_WORKER_ID; + db->in_use = false; + db->corrupted = false; } static void @@ -1898,6 +1904,18 @@ free_bgworker_handle(uint32 worker_id) } } +static void +resetBackgroundWorkerCorruption(void) +{ + LWLockAcquire(diskquota_locks.dblist_lock, LW_EXCLUSIVE); + for (int i = 0; i < MAX_NUM_MONITORED_DB; i++) + { + DiskquotaDBEntry *dbEntry = &DiskquotaLauncherShmem->dbArray[i]; + if (dbEntry->corrupted) dbEntry->corrupted = false; + } + LWLockRelease(diskquota_locks.dblist_lock); +} + #if GP_VERSION_NUM < 70000 static BgwHandleStatus WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle) diff --git a/src/diskquota.h b/src/diskquota.h index f044773b..58a00f46 100644 --- a/src/diskquota.h +++ b/src/diskquota.h @@ -215,8 +215,9 @@ struct DiskquotaDBEntry TimestampTz last_run_time; int16 cost; // ms - bool inited; // this entry is inited, will set to true after the worker finish the frist run. - bool in_use; // this slot is in using. 
AKA dbid != 0 + bool inited; // this entry is inited; set to true after the worker finishes its first run. + bool in_use; // this slot is in use. AKA dbid != 0 + bool corrupted; // the worker failed its first run; the launcher skips this entry until the flag is reset }; typedef enum MonitorDBStatus @@ -249,7 +250,7 @@ extern void invalidate_database_rejectmap(Oid dbid); /* quota model interface*/ extern void init_disk_quota_shmem(void); extern void init_disk_quota_model(uint32 id); -extern void refresh_disk_quota_model(bool force); +extern void refresh_disk_quota_model(DiskquotaDBEntry *dbEntry); extern bool check_diskquota_state_is_ready(void); extern bool quota_check_common(Oid reloid, RelFileNode *relfilenode); diff --git a/src/gp_activetable.c b/src/gp_activetable.c index cf3178b3..cbf6e7b6 100644 --- a/src/gp_activetable.c +++ b/src/gp_activetable.c @@ -378,6 +378,7 @@ gp_fetch_active_tables(bool is_init) if (is_init) { + SIMPLE_FAULT_INJECTOR("diskquota_worker_initialization"); load_table_size(local_table_stats_map); } else diff --git a/src/quotamodel.c b/src/quotamodel.c index 6b8507b3..a0f01dbd 100644 --- a/src/quotamodel.c +++ b/src/quotamodel.c @@ -227,7 +227,7 @@ static void clear_all_quota_maps(void); static void transfer_table_for_quota(int64 totalsize, QuotaType type, Oid *old_keys, Oid *new_keys, int16 segid); /* functions to refresh disk quota model*/ -static void refresh_disk_quota_usage(bool is_init); +static void refresh_disk_quota_usage(DiskquotaDBEntry *dbEntry); static void calculate_table_disk_usage(bool is_init, HTAB *local_active_table_stat_map); static void flush_to_table_size(void); static bool flush_local_reject_map(void); @@ -761,8 +761,10 @@ do_check_diskquota_state_is_ready(void) * recalculate the changed disk usage. 
*/ void -refresh_disk_quota_model(bool is_init) +refresh_disk_quota_model(DiskquotaDBEntry *dbEntry) { + bool is_init = !dbEntry->inited; + SEGCOUNT = getgpsegmentCount(); if (SEGCOUNT <= 0) { @@ -773,7 +775,7 @@ refresh_disk_quota_model(bool is_init) /* skip refresh model when load_quotas failed */ if (load_quotas()) { - refresh_disk_quota_usage(is_init); + refresh_disk_quota_usage(dbEntry); } if (is_init) ereport(LOG, (errmsg("[diskquota] initialize quota model finished"))); } @@ -785,11 +787,12 @@ refresh_disk_quota_model(bool is_init) * process is constructing quota model. */ static void -refresh_disk_quota_usage(bool is_init) +refresh_disk_quota_usage(DiskquotaDBEntry *dbEntry) { bool connected = false; bool pushed_active_snap = false; bool ret = true; + bool is_init = !dbEntry->inited; HTAB *local_active_table_stat_map = NULL; StartTransactionCommand(); @@ -841,6 +844,14 @@ refresh_disk_quota_usage(bool is_init) } PG_CATCH(); { + /* If the first (initialization) run failed: mark the entry as corrupted so the launcher skips it, then re-throw. */ + if (is_init) + { + LWLockAcquire(diskquota_locks.dblist_lock, LW_EXCLUSIVE); + dbEntry->corrupted = true; + LWLockRelease(diskquota_locks.dblist_lock); + PG_RE_THROW(); + } /* Prevents interrupts while cleaning up */ HOLD_INTERRUPTS(); EmitErrorReport(); diff --git a/tests/isolation2/expected/test_worker_init_failure.out b/tests/isolation2/expected/test_worker_init_failure.out new file mode 100644 index 00000000..75e6b6e0 --- /dev/null +++ b/tests/isolation2/expected/test_worker_init_failure.out @@ -0,0 +1,46 @@ +-- +-- Tests for error handling when the worker catches the error during +-- its first run. 
+-- + +-- Function checking whether worker on given db is up +CREATE or REPLACE LANGUAGE plpython2u; +CREATE +CREATE or REPLACE FUNCTION check_worker_presence(dbname text, wait_time int) RETURNS boolean AS $$ import psutil import time worker_name = 'bgworker: [diskquota] ' + dbname time.sleep(wait_time) for proc in psutil.process_iter(): try: if 'postgres' in proc.name().lower(): for val in proc.cmdline(): if worker_name in val: return True except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): pass return False $$ LANGUAGE plpython2u EXECUTE ON MASTER; +CREATE + +-- Test diskquota behavior when an error occurs during the worker's first run. +-- The error leads to process termination. And launcher won't start it again +-- until extension reload or SIGHUP signal. +CREATE EXTENSION diskquota; +CREATE +SELECT check_worker_presence(current_database(), 0); + check_worker_presence +----------------------- + t +(1 row) +SELECT gp_inject_fault('diskquota_worker_initialization', 'error', dbid) FROM gp_segment_configuration WHERE role='p' AND content=-1; + gp_inject_fault +----------------- + Success: +(1 row) +SELECT diskquota.init_table_size_table(); + init_table_size_table +----------------------- + +(1 row) +SELECT check_worker_presence(current_database(), current_setting('diskquota.worker_timeout')::int / 2); + check_worker_presence +----------------------- + f +(1 row) +-- Reload configuration and check that worker is up again +!\retcode gpstop -u; +(exited with code 0) +SELECT check_worker_presence(current_database(), current_setting('diskquota.worker_timeout')::int / 2); + check_worker_presence +----------------------- + t +(1 row) +DROP EXTENSION diskquota; +DROP diff --git a/tests/isolation2/isolation2_schedule b/tests/isolation2/isolation2_schedule index 090c5cc5..c61f3d97 100644 --- a/tests/isolation2/isolation2_schedule +++ b/tests/isolation2/isolation2_schedule @@ -11,4 +11,5 @@ test: test_per_segment_config test: test_relation_cache test: 
test_ereport_from_seg test: test_drop_extension +test: test_worker_init_failure test: reset_config diff --git a/tests/isolation2/sql/test_worker_init_failure.sql b/tests/isolation2/sql/test_worker_init_failure.sql new file mode 100644 index 00000000..4e48908a --- /dev/null +++ b/tests/isolation2/sql/test_worker_init_failure.sql @@ -0,0 +1,40 @@ +-- +-- Tests for error handling when the worker catches the error during +-- its first run. +-- + +-- Function checking whether worker on given db is up +CREATE or REPLACE LANGUAGE plpython2u; +CREATE or REPLACE FUNCTION check_worker_presence(dbname text, wait_time int) + RETURNS boolean +AS $$ + import psutil + import time + worker_name = 'bgworker: [diskquota] ' + dbname + time.sleep(wait_time) + for proc in psutil.process_iter(): + try: + if 'postgres' in proc.name().lower(): + for val in proc.cmdline(): + if worker_name in val: + return True + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + pass + return False +$$ LANGUAGE plpython2u EXECUTE ON MASTER; + +-- Test diskquota behavior when an error occurs during the worker's first run. +-- The error leads to process termination. And launcher won't start it again +-- until extension reload or SIGHUP signal. +CREATE EXTENSION diskquota; +SELECT check_worker_presence(current_database(), 0); +SELECT gp_inject_fault('diskquota_worker_initialization', 'error', dbid) + FROM gp_segment_configuration WHERE role='p' AND content=-1; +SELECT diskquota.init_table_size_table(); +SELECT check_worker_presence(current_database(), + current_setting('diskquota.worker_timeout')::int / 2); +-- Reload configuration and check that worker is up again +!\retcode gpstop -u; +SELECT check_worker_presence(current_database(), + current_setting('diskquota.worker_timeout')::int / 2); +DROP EXTENSION diskquota;