From dc66f0ebbadb7be5cad5405acc64c1776b6d6537 Mon Sep 17 00:00:00 2001 From: Rob Savoye Date: Tue, 13 Feb 2024 12:15:58 -0700 Subject: [PATCH] fix: More refactoring to use asyncpg --- tm_admin/users/users.py | 67 +++++++++++++++-------------------------- 1 file changed, 25 insertions(+), 42 deletions(-) diff --git a/tm_admin/users/users.py b/tm_admin/users/users.py index b33ef265..8bbdb158 100755 --- a/tm_admin/users/users.py +++ b/tm_admin/users/users.py @@ -262,66 +262,53 @@ async def mergeLicenses(self, # # result = await pg.execute(sql) timer.stop() - async def mergeTeam(self, + async def mergeTeams(self, inpg: PostgresClient, ): table = 'team_members' - # FIXME: this shouldn't be hardcoded! log.info(f"Merging team members table...") timer = Timer(text="merging team members table took {seconds:.0f}s") timer.start() - sql = f"SELECT row_to_json({table}) as row FROM {table}" + sql = f"SELECT * FROM {table} ORDER BY user_id" #print(sql) result = await inpg.execute(sql) - pbar = tqdm(result) + pbar = tqdm.tqdm(result) for record in pbar: - func = record[0]['function'] + func = record['function'] tmfunc = Teammemberfunctions(func) - sql = f"UPDATE {self.table} SET team_members.team={record[0]['team_id']}, team_members.active={record[0]['active']}, team_members.function='{tmfunc.name}' WHERE id={record[0]['user_id']}" - print(f"{sql};") - result = await self.pg.execute(sql) + sql = f"UPDATE {self.table} SET team_members.team={record['team_id']}, team_members.active={record['active']}, team_members.function='{tmfunc.name}' WHERE id={record['user_id']}" + print(sql) + result = await inpg.execute(sql) timer.stop() return True - async def mergeFavorites(self): + async def mergeFavorites(self, + inpg: PostgresClient, + ): table = 'project_favorites' log.info(f"Merging favorites table...") # FIXME: this shouldn't be hardcoded! timer = Timer(text="merging favorites table took {seconds:.0f}s") timer.start() - pg = PostgresClient() - await pg.connect('localhost/tm4') - sql = f"SELECT u.user_id,ARRAY(SELECT ARRAY(SELECT c.project_id FROM {table} c WHERE c.user_id = u.user_id)) AS user_id FROM {table} u;" + sql = f"SELECT u.user_id,(SELECT ARRAY(SELECT c.project_id FROM {table} c WHERE c.user_id = u.user_id)) AS projects FROM {table} u;" #sql = f"SELECT row_to_json({table}) as row FROM {table} ORDER BY user_id" - # print(sql) - try: - result = pg.execute(sql) - except: - log.error(f"Couldn't execute query! {sql}") + print(sql) + result = await inpg.execute(sql) entries = len(result) log.debug(f"There are {entries} entries in {table}") chunk = round(entries / cores) + pbar = tqdm.tqdm(result) - tmpg = list() - for i in range(0, cores + 1): - # FIXME: this shouldn't be hardcoded - tmpg.append(PostgresClient('localhost/tm_admin')) - - index = 0 - with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor: - # futures = list() - block = 0 - while block <= entries: - #log.debug(f"Dispatching Block %d:%d" % (block, block + chunk)) - #favoritesThread(result, tmpg[0]) - executor.submit(favoritesThread, result[block : block + chunk], tmpg[index]) - # futures.append(result) - block += chunk - index += 1 - executor.shutdown() + # This table has a small amount of data, so threading would be overkill. + for record in pbar: + uid = record.get('user_id') + array = record.get('projects') + sql = f" UPDATE users SET favorite_projects = ARRAY{array} WHERE id={uid}" + #print(sql) + result = await self.pg.execute(sql) timer.stop() return True @@ -405,18 +392,14 @@ async def mergeAuxTables(self, inpg = PostgresClient() await inpg.connect(inuri) - # if self.mergeTeam(): - # log.info("UserDB.mergeTeams worked!") + # await self.mergeTeams(inpg) + # log.info("UserDB.mergeTeams worked!") - # if self.mergeFavorites(): - # log.info("UserDB.mergeFavorites worked!") + await self.mergeFavorites(inpg) - # These may take a long time to complete - # await self.mergeInterests(inpg) - # log.info("UserDB.mergeInterests worked!") + await self.mergeInterests(inpg) result = await self.mergeLicenses(inpg) - log.info("UserDB.mergeLicenses worked!") async def main(): """This main function lets this class be run standalone by a bash script."""