Skip to content
This repository has been archived by the owner on Aug 5, 2024. It is now read-only.

Commit

Permalink
fix: More refactoring to use asyncpg
Browse files Browse the repository at this point in the history
  • Loading branch information
rsavoye committed Feb 13, 2024
1 parent 6f1bf0c commit dc66f0e
Showing 1 changed file with 25 additions and 42 deletions.
67 changes: 25 additions & 42 deletions tm_admin/users/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,66 +262,53 @@ async def mergeLicenses(self,
# # result = await pg.execute(sql)
timer.stop()

async def mergeTeam(self,
async def mergeTeams(self,
inpg: PostgresClient,
):
table = 'team_members'
# FIXME: this shouldn't be hardcoded!
log.info(f"Merging team members table...")
timer = Timer(text="merging team members table took {seconds:.0f}s")
timer.start()
sql = f"SELECT row_to_json({table}) as row FROM {table}"
sql = f"SELECT * FROM {table} ORDER BY user_id"
#print(sql)
result = await inpg.execute(sql)

pbar = tqdm(result)
pbar = tqdm.tqdm(result)
for record in pbar:
func = record[0]['function']
func = record['function']
tmfunc = Teammemberfunctions(func)
sql = f"UPDATE {self.table} SET team_members.team={record[0]['team_id']}, team_members.active={record[0]['active']}, team_members.function='{tmfunc.name}' WHERE id={record[0]['user_id']}"
print(f"{sql};")
result = await self.pg.execute(sql)
sql = f"UPDATE {self.table} SET team_members.team={record['team_id']}, team_members.active={record['active']}, team_members.function='{tmfunc.name}' WHERE id={record['user_id']}"
print(sql)
result = await inpg.execute(sql)

timer.stop()
return True

async def mergeFavorites(self):
async def mergeFavorites(self,
inpg: PostgresClient,
):
table = 'project_favorites'
log.info(f"Merging favorites table...")
# FIXME: this shouldn't be hardcoded!
timer = Timer(text="merging favorites table took {seconds:.0f}s")
timer.start()
pg = PostgresClient()
await pg.connect('localhost/tm4')
sql = f"SELECT u.user_id,ARRAY(SELECT ARRAY(SELECT c.project_id FROM {table} c WHERE c.user_id = u.user_id)) AS user_id FROM {table} u;"
sql = f"SELECT u.user_id,(SELECT ARRAY(SELECT c.project_id FROM {table} c WHERE c.user_id = u.user_id)) AS projects FROM {table} u;"
#sql = f"SELECT row_to_json({table}) as row FROM {table} ORDER BY user_id"
# print(sql)
try:
result = pg.execute(sql)
except:
log.error(f"Couldn't execute query! {sql}")
print(sql)
result = await inpg.execute(sql)

entries = len(result)
log.debug(f"There are {entries} entries in {table}")
chunk = round(entries / cores)
pbar = tqdm.tqdm(result)

tmpg = list()
for i in range(0, cores + 1):
# FIXME: this shouldn't be hardcoded
tmpg.append(PostgresClient('localhost/tm_admin'))

index = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
# futures = list()
block = 0
while block <= entries:
#log.debug(f"Dispatching Block %d:%d" % (block, block + chunk))
#favoritesThread(result, tmpg[0])
executor.submit(favoritesThread, result[block : block + chunk], tmpg[index])
# futures.append(result)
block += chunk
index += 1
executor.shutdown()
# This table has a small amount of data, so threading would be overkill.
for record in pbar:
uid = record.get('user_id')
array = record.get('projects')
sql = f" UPDATE users SET favorite_projects = ARRAY{array} WHERE id={uid}"
#print(sql)
result = await self.pg.execute(sql)

timer.stop()
return True
Expand Down Expand Up @@ -405,18 +392,14 @@ async def mergeAuxTables(self,
inpg = PostgresClient()
await inpg.connect(inuri)

# if self.mergeTeam():
# log.info("UserDB.mergeTeams worked!")
# await self.mergeTeams(inpg)
# log.info("UserDB.mergeTeams worked!")

# if self.mergeFavorites():
# log.info("UserDB.mergeFavorites worked!")
await self.mergeFavorites(inpg)

# These may take a long time to complete
# await self.mergeInterests(inpg)
# log.info("UserDB.mergeInterests worked!")
await self.mergeInterests(inpg)

result = await self.mergeLicenses(inpg)
log.info("UserDB.mergeLicenses worked!")

async def main():
"""This main function lets this class be run standalone by a bash script."""
Expand Down

0 comments on commit dc66f0e

Please sign in to comment.