unify
RekGRpth committed Oct 17, 2024
1 parent 000e18a commit 181c0eb
Showing 1 changed file with 23 additions and 83 deletions.
106 changes: 23 additions & 83 deletions src/gp_activetable.c
@@ -88,9 +88,8 @@ static void object_access_hook_QuotaStmt(ObjectAccessType access, Oid classId, O
static HTAB *get_active_tables_stats(ArrayType *array);
static HTAB *get_active_tables_oid(void);
static HTAB *pull_active_list_from_seg(void);
static void pull_active_table_size_from_seg(HTAB *local_table_stats_map, char *active_oid_array);
static StringInfoData convert_map_to_string(HTAB *active_list);
static void load_table_size(HTAB *local_table_stats_map);
static void load_table_size(HTAB *local_table_stats_map, const char *sql, bool is_init);
static void report_active_table_helper(const RelFileNodeBackend *relFileNode);
static void remove_from_active_table_map(const RelFileNodeBackend *relFileNode);
static void report_relation_cache_helper(Oid relid);
@@ -383,7 +382,7 @@ gp_fetch_active_tables(bool is_init)

if (is_init)
{
load_table_size(local_table_stats_map);
load_table_size(local_table_stats_map, "select tableid, array_agg(size order by segid) size from diskquota.table_size group by 1 order by 1", is_init);
}
else
{
@@ -395,10 +394,17 @@ gp_fetch_active_tables(bool is_init)
(errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] active_old_list = %s", active_oid_list.data)));

/* step 2: fetch active table sizes based on active oids */
pull_active_table_size_from_seg(local_table_stats_map, active_oid_list.data);

StringInfoData sql_command;
initStringInfo(&sql_command);
appendStringInfo(&sql_command, "with s as (select (diskquota.diskquota_fetch_table_stat(1, '%s'::oid[])).* from gp_dist_random('gp_id')) select \"TABLE_OID\", array_agg(\"TABLE_SIZE\" order by \"GP_SEGMENT_ID\") \"TABLE_SIZE\" from s group by 1 order by 1",
active_oid_list.data);

load_table_size(local_table_stats_map, sql_command.data, is_init);

hash_destroy(local_active_table_oid_maps);
pfree(active_oid_list.data);
pfree(sql_command.data);
}
return local_table_stats_map;
}
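
Both branches of gp_fetch_active_tables() now hand load_table_size() a query whose rows have the same shape: a table oid plus a bigint[] of sizes in segment order. The following is a minimal illustrative sketch (not part of the commit) of the index mapping the new loop applies; it assumes diskquota.table_size keeps the cluster-wide total under segid = -1, so that row sorts first in the is_init query.

#include "postgres.h"

/*
 * Illustrative sketch only: how the ordered size array from either query
 * is expected to land in tablesize[].
 */
static void
fill_tablesize_sketch(int64 *tablesize, const Datum *sizes, int nelems, bool is_init)
{
	if (is_init)
	{
		/* sizes[0] is assumed to be the segid = -1 cluster-wide total */
		for (int j = 0; j < nelems; j++)
			tablesize[j] = DatumGetInt64(sizes[j]);
	}
	else
	{
		/* per-segment sizes only; recompute the total into index 0 */
		tablesize[0] = 0;
		for (int j = 0; j < nelems; j++)
		{
			tablesize[j + 1] = DatumGetInt64(sizes[j]);
			tablesize[0] += DatumGetInt64(sizes[j]);
		}
	}
}
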
@@ -944,15 +950,14 @@ get_active_tables_oid(void)
* and other shared memory will be warmed up by table_size table.
*/
static void
load_table_size(HTAB *local_table_stats_map)
load_table_size(HTAB *local_table_stats_map, const char *sql, bool is_init)
{
TupleDesc tupdesc;
int i;
bool found;
ActiveTableEntryCombined *quota_entry;
SPIPlanPtr plan;
Portal portal;
char *sql = "select tableid, array_agg(size order by segid) size from diskquota.table_size group by 1 order by 1";

if ((plan = SPI_prepare(sql, 0, NULL)) == NULL)
ereport(ERROR, (errmsg("[diskquota] SPI_prepare(\"%s\") failed", sql)));
@@ -993,6 +998,11 @@ load_table_size(HTAB *local_table_stats_map)
get_database_name(MyDatabaseId))));
}

int16 typlen;
bool typbyval;
char typalign;
get_typlenbyvalalign(INT8OID, &typlen, &typbyval, &typalign);

while (SPI_processed > 0)
{
/* push the table oid and size into local_table_stats_map */
@@ -1015,15 +1025,14 @@
quota_entry = (ActiveTableEntryCombined *)hash_search(local_table_stats_map, &reloid, HASH_ENTER, &found);
quota_entry->reloid = reloid;

/*
select typlen, typbyval, typalign from pg_type where typname = 'int8';
typlen | typbyval | typalign
--------+----------+----------
8 | t | d
*/
deconstruct_array(DatumGetArrayTypeP(dat), INT8OID, sizeof(int64), true, 'd', &sizes, NULL, &nelems);
deconstruct_array(DatumGetArrayTypeP(dat), INT8OID, typlen, typbyval, typalign, &sizes, NULL, &nelems);
for (int j = 0; j < nelems; j++)
quota_entry->tablesize[j] = DatumGetInt64(sizes[j]);
{
quota_entry->tablesize[j + (is_init ? 0 : 1)] = DatumGetInt64(sizes[j]);
/* tablesize for index 0 is the sum of tablesize of master and all segments */
if (!is_init)
quota_entry->tablesize[0] = (j > 0 ? quota_entry->tablesize[0] : 0) + DatumGetInt64(sizes[j]);
}
pfree(sizes);
}
SPI_freetuptable(SPI_tuptable);
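
The old call hard-coded the int8 element layout (length 8, pass-by-value, 'd' alignment); the commit looks it up once with get_typlenbyvalalign() instead. Below is a standalone sketch of that standard PostgreSQL pattern; the helper name is illustrative and not part of the commit.

#include "postgres.h"
#include "catalog/pg_type.h"
#include "utils/array.h"
#include "utils/lsyscache.h"

/* Sum an int8[] datum, fetching the element-type layout from the catalog. */
static int64
sum_int8_array_sketch(Datum arraydatum)
{
	int16	typlen;
	bool	typbyval;
	char	typalign;
	Datum  *elems;
	int		nelems;
	int64	total = 0;

	get_typlenbyvalalign(INT8OID, &typlen, &typbyval, &typalign);
	deconstruct_array(DatumGetArrayTypeP(arraydatum), INT8OID,
	                  typlen, typbyval, typalign, &elems, NULL, &nelems);

	for (int j = 0; j < nelems; j++)
		total += DatumGetInt64(elems[j]);

	pfree(elems);
	return total;
}
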
@@ -1132,72 +1141,3 @@

return local_active_table_oid_map;
}

/*
* Get the sizes of active tables from all the segments.
* When loading data, only a subset of the segments may do the actual
* loading, so the same table may be active on some segments while not
* active on others. We do not store the per-segment table size on the
* master (to save memory), so when recalculating a table's size we need
* to sum its size across all of the segments.
*/
static void
pull_active_table_size_from_seg(HTAB *local_table_stats_map, char *active_oid_array)
{
CdbPgResults cdb_pgresults = {NULL, 0};
StringInfoData sql_command;
int i;
int j;

initStringInfo(&sql_command);
appendStringInfo(&sql_command, "select * from diskquota.diskquota_fetch_table_stat(1, '%s'::oid[])",
active_oid_array);
CdbDispatchCommand(sql_command.data, DF_NONE, &cdb_pgresults);
pfree(sql_command.data);

SEGCOUNT = cdb_pgresults.numResults;
if (SEGCOUNT <= 0)
{
ereport(ERROR, (errmsg("[diskquota] there is no active segment, SEGCOUNT is %d", SEGCOUNT)));
}

/* sum table size from each segment into local_table_stats_map */
for (i = 0; i < cdb_pgresults.numResults; i++)
{
Size tableSize;
bool found;
Oid reloid;
int segId;
ActiveTableEntryCombined *entry;

PGresult *pgresult = cdb_pgresults.pg_results[i];

if (PQresultStatus(pgresult) != PGRES_TUPLES_OK)
{
cdbdisp_clearCdbPgResults(&cdb_pgresults);
ereport(ERROR, (errmsg("[diskquota] fetching active tables, encounter unexpected result from segment: %d",
PQresultStatus(pgresult))));
}

for (j = 0; j < PQntuples(pgresult); j++)
{
reloid = atooid(PQgetvalue(pgresult, j, 0));
tableSize = (Size)atoll(PQgetvalue(pgresult, j, 1));
entry = (ActiveTableEntryCombined *)hash_search(local_table_stats_map, &reloid, HASH_ENTER, &found);

/* for diskquota extension version 1.0, pgresult doesn't contain segid */
if (PQnfields(pgresult) == 3)
{
/* get the segid, tablesize for each table */
segId = atoi(PQgetvalue(pgresult, j, 2));
entry->tablesize[segId + 1] = tableSize;
}

/* tablesize for index 0 is the sum of tablesize of master and all segments */
entry->tablesize[0] = (found ? entry->tablesize[0] : 0) + tableSize;
}
}
cdbdisp_clearCdbPgResults(&cdb_pgresults);
return;
}
