Refactor diskquota load_table_size
RekGRpth committed Oct 18, 2024
1 parent 24546b2 commit 047e174
Showing 2 changed files with 50 additions and 181 deletions.
176 changes: 23 additions & 153 deletions src/gp_activetable.c
@@ -87,10 +87,8 @@ static void object_access_hook_QuotaStmt(ObjectAccessType access, Oid classId, O

static HTAB *get_active_tables_stats(ArrayType *array);
static HTAB *get_active_tables_oid(void);
static HTAB *pull_active_list_from_seg(void);
static void pull_active_table_size_from_seg(HTAB *local_table_stats_map, char *active_oid_array);
static StringInfoData convert_map_to_string(HTAB *active_list);
static void load_table_size(HTAB *local_table_stats_map);
static char *pull_active_list_from_seg(void);
static void load_table_size(HTAB *local_table_stats_map, bool is_init);
static void report_active_table_helper(const RelFileNodeBackend *relFileNode);
static void remove_from_active_table_map(const RelFileNodeBackend *relFileNode);
static void report_relation_cache_helper(Oid relid);
@@ -368,8 +366,6 @@ gp_fetch_active_tables(bool is_init)
{
HTAB *local_table_stats_map = NULL;
HASHCTL ctl;
HTAB *local_active_table_oid_maps;
StringInfoData active_oid_list;

Assert(Gp_role == GP_ROLE_DISPATCH);

@@ -381,25 +377,8 @@ gp_fetch_active_tables(bool is_init)
local_table_stats_map = diskquota_hash_create("local active table map with relfilenode info", 1024, &ctl,
HASH_ELEM | HASH_CONTEXT, DISKQUOTA_OID_HASH);

if (is_init)
{
load_table_size(local_table_stats_map);
}
else
{
/* step 1: fetch active oids from all the segments */
local_active_table_oid_maps = pull_active_list_from_seg();
active_oid_list = convert_map_to_string(local_active_table_oid_maps);

ereport(DEBUG1,
(errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] active_old_list = %s", active_oid_list.data)));
load_table_size(local_table_stats_map, is_init);

/* step 2: fetch active table sizes based on active oids */
pull_active_table_size_from_seg(local_table_stats_map, active_oid_list.data);

hash_destroy(local_active_table_oid_maps);
pfree(active_oid_list.data);
}
return local_table_stats_map;
}

@@ -944,7 +923,7 @@ get_active_tables_oid(void)
* and other shared memory will be warmed up by table_size table.
*/
static void
load_table_size(HTAB *local_table_stats_map)
load_table_size(HTAB *local_table_stats_map, bool is_init)
{
TupleDesc tupdesc;
int i;
@@ -954,11 +933,17 @@
Portal portal;
char *sql = "select tableid, size, segid from diskquota.table_size";

if (!is_init)
sql = pull_active_list_from_seg();

if ((plan = SPI_prepare(sql, 0, NULL)) == NULL)
ereport(ERROR, (errmsg("[diskquota] SPI_prepare(\"%s\") failed", sql)));
if ((portal = SPI_cursor_open(NULL, plan, NULL, NULL, true)) == NULL)
ereport(ERROR, (errmsg("[diskquota] SPI_cursor_open(\"%s\") failed", sql)));

if (!is_init)
pfree(sql);

SPI_cursor_fetch(portal, true, 10000);

if (SPI_tuptable == NULL)
@@ -1020,6 +1005,9 @@
quota_entry = (ActiveTableEntryCombined *)hash_search(local_table_stats_map, &reloid, HASH_ENTER, &found);
quota_entry->reloid = reloid;
quota_entry->tablesize[segid + 1] = size;
/* tablesize for index 0 is the sum of tablesize of master and all segments */
if (!is_init)
quota_entry->tablesize[0] = (found ? quota_entry->tablesize[0] : 0) + size;
}
SPI_freetuptable(SPI_tuptable);
SPI_cursor_fetch(portal, true, 10000);
@@ -1030,64 +1018,23 @@
SPI_freeplan(plan);
}

/*
* Convert a hash map with oids into a string array
* This function is used to prepare the second array parameter
* of function diskquota_fetch_table_stat.
*/
static StringInfoData
convert_map_to_string(HTAB *local_active_table_oid_maps)
{
HASH_SEQ_STATUS iter;
StringInfoData buffer;
DiskQuotaActiveTableEntry *entry;
uint32 count = 0;
uint32 nitems = hash_get_num_entries(local_active_table_oid_maps);

initStringInfo(&buffer);
appendStringInfo(&buffer, "{");

hash_seq_init(&iter, local_active_table_oid_maps);

while ((entry = (DiskQuotaActiveTableEntry *)hash_seq_search(&iter)) != NULL)
{
count++;
if (count != nitems)
{
appendStringInfo(&buffer, "%d,", entry->reloid);
}
else
{
appendStringInfo(&buffer, "%d", entry->reloid);
}
}
appendStringInfo(&buffer, "}");

return buffer;
}

/*
* Get active table size from all the segments based on
* active table oid list.
* Function diskquota_fetch_table_stat is called to calculate
* the table size on the fly.
*/
static HTAB *
static char *
pull_active_list_from_seg(void)
{
CdbPgResults cdb_pgresults = {NULL, 0};
int i, j;
char *sql = NULL;
HTAB *local_active_table_oid_map = NULL;
HASHCTL ctl;
DiskQuotaActiveTableEntry *entry;
StringInfoData buffer;
uint32 count = 0;

memset(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(Oid);
ctl.entrysize = sizeof(DiskQuotaActiveTableEntry);
ctl.hcxt = CurrentMemoryContext;
local_active_table_oid_map = diskquota_hash_create("local active table map with relfilenode info", 1024, &ctl,
HASH_ELEM | HASH_CONTEXT, DISKQUOTA_OID_HASH);
initStringInfo(&buffer);
appendStringInfo(&buffer, "select (diskquota.diskquota_fetch_table_stat(1, '{");

/* first get all oid of tables which are active table on any segment */
sql = "select * from diskquota.diskquota_fetch_table_stat(0, '{}'::oid[])";
@@ -1096,9 +1043,6 @@ pull_active_list_from_seg(void)
CdbDispatchCommand(sql, DF_NONE, &cdb_pgresults);
for (i = 0; i < cdb_pgresults.numResults; i++)
{
Oid reloid;
bool found;

PGresult *pgresult = cdb_pgresults.pg_results[i];

if (PQresultStatus(pgresult) != PGRES_TUPLES_OK)
@@ -1111,88 +1055,14 @@
/* push the active table oid into local_active_table_oid_map */
for (j = 0; j < PQntuples(pgresult); j++)
{
reloid = atooid(PQgetvalue(pgresult, j, 0));

entry = (DiskQuotaActiveTableEntry *)hash_search(local_active_table_oid_map, &reloid, HASH_ENTER, &found);

if (!found)
{
entry->reloid = reloid;
entry->tablesize = 0;
entry->segid = -1;
}
if (count++ > 0)
appendStringInfoString(&buffer, ",");
appendStringInfoString(&buffer, PQgetvalue(pgresult, j, 0));
}
}
cdbdisp_clearCdbPgResults(&cdb_pgresults);

return local_active_table_oid_map;
}

/*
* Get active table list from all the segments.
* Since when loading data, there is case where only subset for
* segment doing the real loading. As a result, the same table
* maybe active on some segments while not active on others. We
* haven't store the table size for each segment on master(to save
* memory), so when re-calculate the table size, we need to sum the
* table size on all of the segments.
*/
static void
pull_active_table_size_from_seg(HTAB *local_table_stats_map, char *active_oid_array)
{
CdbPgResults cdb_pgresults = {NULL, 0};
StringInfoData sql_command;
int i;
int j;

initStringInfo(&sql_command);
appendStringInfo(&sql_command, "select * from diskquota.diskquota_fetch_table_stat(1, '%s'::oid[])",
active_oid_array);
CdbDispatchCommand(sql_command.data, DF_NONE, &cdb_pgresults);
pfree(sql_command.data);

SEGCOUNT = cdb_pgresults.numResults;
if (SEGCOUNT <= 0)
{
ereport(ERROR, (errmsg("[diskquota] there is no active segment, SEGCOUNT is %d", SEGCOUNT)));
}

/* sum table size from each segment into local_table_stats_map */
for (i = 0; i < cdb_pgresults.numResults; i++)
{
Size tableSize;
bool found;
Oid reloid;
int segId;
ActiveTableEntryCombined *entry;
appendStringInfo(&buffer, "}'::oid[])).* from gp_dist_random('gp_id')");

PGresult *pgresult = cdb_pgresults.pg_results[i];

if (PQresultStatus(pgresult) != PGRES_TUPLES_OK)
{
cdbdisp_clearCdbPgResults(&cdb_pgresults);
ereport(ERROR, (errmsg("[diskquota] fetching active tables, encounter unexpected result from segment: %d",
PQresultStatus(pgresult))));
}

for (j = 0; j < PQntuples(pgresult); j++)
{
reloid = atooid(PQgetvalue(pgresult, j, 0));
tableSize = (Size)atoll(PQgetvalue(pgresult, j, 1));
entry = (ActiveTableEntryCombined *)hash_search(local_table_stats_map, &reloid, HASH_ENTER, &found);

/* for diskquota extension version is 1.0, pgresult doesn't contain segid */
if (PQnfields(pgresult) == 3)
{
/* get the segid, tablesize for each table */
segId = atoi(PQgetvalue(pgresult, j, 2));
entry->tablesize[segId + 1] = tableSize;
}

/* tablesize for index 0 is the sum of tablesize of master and all segments */
entry->tablesize[0] = (found ? entry->tablesize[0] : 0) + tableSize;
}
}
cdbdisp_clearCdbPgResults(&cdb_pgresults);
return;
return buffer.data;
}
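The refactored load_table_size runs one of two statements through SPI: when is_init is true it still scans the persisted diskquota.table_size table, and on the regular refresh path it executes the string returned by pull_active_list_from_seg, which first collects the active oids from all segments (diskquota_fetch_table_stat mode 0) and then hands back a mode-1 query over gp_dist_random('gp_id') that computes those tables' sizes on every segment. A sketch of the two query shapes, with 16385 and 16402 standing in as placeholder oids:

    select tableid, size, segid from diskquota.table_size
    select (diskquota.diskquota_fetch_table_stat(1, '{16385,16402}'::oid[])).* from gp_dist_random('gp_id')

Both result sets feed the same parsing loop; on the refresh path the entry at tablesize[0] is additionally accumulated as the sum across the master and all segments.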
55 changes: 27 additions & 28 deletions src/quotamodel.c
@@ -220,10 +220,10 @@ static void transfer_table_for_quota(int64 totalsize, QuotaType type, Oid *old_k

/* functions to refresh disk quota model*/
static void refresh_disk_quota_usage(bool is_init);
static void calculate_table_disk_usage(bool is_init, HTAB *local_active_table_stat_map);
static StringInfoData calculate_table_disk_usage(bool is_init);
static void flush_to_table_size(void);
static bool flush_local_reject_map(void);
static void dispatch_rejectmap(HTAB *local_active_table_stat_map);
static void dispatch_rejectmap(StringInfoData active_oids);
static bool load_quotas(void);
static void do_load_quotas(void);

@@ -803,7 +803,7 @@ refresh_disk_quota_usage(bool is_init)
bool connected = false;
bool pushed_active_snap = false;
bool ret = true;
HTAB *local_active_table_stat_map = NULL;
StringInfoData active_oids;

StartTransactionCommand();

@@ -826,14 +826,11 @@
* initialization stage all the tables are active. later loop, only the
* tables whose disk size changed will be treated as active
*
* local_active_table_stat_map only contains the active tables which belong
* active_oids only contains the active tables which belong
* to the current database.
*/
local_active_table_stat_map = gp_fetch_active_tables(is_init);
bool hasActiveTable = (hash_get_num_entries(local_active_table_stat_map) != 0);
/* TODO: if we can skip the following steps when there is no active table */
/* recalculate the disk usage of table, schema and role */
calculate_table_disk_usage(is_init, local_active_table_stat_map);
active_oids = calculate_table_disk_usage(is_init);
bool hasActiveTable = (active_oids.len > 0);
/* refresh quota_info_map */
refresh_quota_info_map();
/* flush local table_size_map to user table table_size */
@@ -847,8 +844,8 @@
* not empty the rejectmap should be dispatched to segments.
*/
if (is_init || (diskquota_hardlimit && (reject_map_changed || hasActiveTable)))
dispatch_rejectmap(local_active_table_stat_map);
hash_destroy(local_active_table_stat_map);
dispatch_rejectmap(active_oids);
pfree(active_oids.data);
}
PG_CATCH();
{
@@ -909,8 +906,8 @@ merge_uncommitted_table_to_oidlist(List *oidlist)
* size from table table_size
*/

static void
calculate_table_disk_usage(bool is_init, HTAB *local_active_table_stat_map)
static StringInfoData
calculate_table_disk_usage(bool is_init)
{
bool table_size_map_found;
bool active_tbl_found;
@@ -927,6 +924,8 @@ calculate_table_disk_usage(bool is_init, HTAB *local_active_table_stat_map)

initStringInfo(&delete_statement);

HTAB *local_active_table_stat_map = gp_fetch_active_tables(is_init);

/*
* unset is_exist flag for tsentry in table_size_map this is used to
* detect tables which have been dropped.
@@ -1147,6 +1146,20 @@ calculate_table_disk_usage(bool is_init, HTAB *local_active_table_stat_map)
}
}
}

HASH_SEQ_STATUS hash_seq;
StringInfoData active_oids;
int count = 0;
int num_entries = hash_get_num_entries(local_active_table_stat_map);
initStringInfo(&active_oids);
hash_seq_init(&hash_seq, local_active_table_stat_map);
while ((active_table_entry = hash_seq_search(&hash_seq)) != NULL)
{
appendStringInfo(&active_oids, "%d", active_table_entry->reloid);

if (++count != num_entries) appendStringInfo(&active_oids, ",");
}
return active_oids;
}

static void
@@ -1342,19 +1355,16 @@ flush_local_reject_map(void)
* Dispatch rejectmap to segment servers.
*/
static void
dispatch_rejectmap(HTAB *local_active_table_stat_map)
dispatch_rejectmap(StringInfoData active_oids)
{
HASH_SEQ_STATUS hash_seq;
GlobalRejectMapEntry *rejectmap_entry;
ActiveTableEntryCombined *active_table_entry;
int num_entries, count = 0;
CdbPgResults cdb_pgresults = {NULL, 0};
StringInfoData rows;
StringInfoData active_oids;
StringInfoData sql;

initStringInfo(&rows);
initStringInfo(&active_oids);
initStringInfo(&sql);

LWLockAcquire(diskquota_locks.reject_map_lock, LW_SHARED);
@@ -1370,16 +1380,6 @@
}
LWLockRelease(diskquota_locks.reject_map_lock);

count = 0;
num_entries = hash_get_num_entries(local_active_table_stat_map);
hash_seq_init(&hash_seq, local_active_table_stat_map);
while ((active_table_entry = hash_seq_search(&hash_seq)) != NULL)
{
appendStringInfo(&active_oids, "%d", active_table_entry->reloid);

if (++count != num_entries) appendStringInfo(&active_oids, ",");
}

appendStringInfo(&sql,
"select diskquota.refresh_rejectmap("
"ARRAY[%s]::diskquota.rejectmap_entry[], "
@@ -1388,7 +1388,6 @@
CdbDispatchCommand(sql.data, DF_NONE, &cdb_pgresults);

pfree(rows.data);
pfree(active_oids.data);
pfree(sql.data);
cdbdisp_clearCdbPgResults(&cdb_pgresults);
}
