Skip to content
This repository has been archived by the owner on Aug 5, 2024. It is now read-only.

Commit

Permalink
fix: Refactor importing licenses (again), go back to threads
Browse files Browse the repository at this point in the history
  • Loading branch information
rsavoye committed Feb 13, 2024
1 parent cb84801 commit 6f1bf0c
Showing 1 changed file with 90 additions and 105 deletions.
195 changes: 90 additions & 105 deletions tm_admin/users/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,44 +48,17 @@
info = get_cpu_info()
cores = info["count"] * 2

# async def licensesThread(
# data: list,
# db: PostgresClient,
# ):
# """Thread to handle importing

# Args:
# data (list): The list of records to import
# db (PostgresClient): A database connection
# """
# array = "licenses"
# column = "license"

# # sql = f"CREATE SERVER IF NOT EXISTS pg_rep_db FOREIGN DATA WRAPPER dblink_fdw OPTIONS (dbname 'tm4');"
# # data = await db.pg.execute(sql)
# # sql = f"CREATE USER MAPPING IF NOT EXISTS FOR rob SERVER pg_rep_db OPTIONS ( user 'rob' ,password 'fu=br');"

# # sql = "SELECT users.*,ARRAY[user_licenses.license] AS license INTO tmp FROM users JOIN dblink('pg_rep_db','SELECT * FROM user_licenses') AS user_licenses(user_id bigint, license int) ON users.id=user_licenses.user_id;"
# # print(sql)
# # result = await db.pg.execute(sql)

# # #sql = f"ALTER TABLE users RENAME TO users_bak; ALTER TABLE tmp RENAME TO users;"
# # #result = await db.pg.execute(sql)

# pbar = tqdm.tqdm(data)
# for record in pbar:
# entry = eval(record[0])
# uid = entry['user']
# licenses = entry['license']
# # FIXME: current TM has this as an int, but it seems a user might agree to
# # more than one license. The database expects an array already, it'll just
# # have a single entry.
# sql = f" UPDATE users SET licenses = licenses||{licenses} WHERE id={uid}"
# sql = f"SELECT users.*,ARRAY[user_licenses.license] AS license INTO tmp FROM users JOIN user_licenses ON users.id=user_licenses.user;"
# print(sql)
# result = await db.pg.execute(f"{sql};")

# return True
async def licensesThread(
    licenses: list,
    db: PostgresClient,
):
    """Append imported license values to the users table, one UPDATE per record.

    Args:
        licenses (list): Records to import; index 0 of each record is used
            as the users.id to match and index 1 is the value appended to
            that row's licenses array
        db (PostgresClient): A database connection; its execute() is
            awaited once per record

    Returns:
        (bool): Always True once every record has been submitted
    """
    # tqdm only wraps the iterable to draw a progress bar
    for row in tqdm.tqdm(licenses):
        user_id = row[0]
        license_id = row[1]
        # The || operator appends to the existing array instead of replacing it
        statement = f" UPDATE users SET licenses = licenses||ARRAY[{license_id}] WHERE id={user_id}"
        await db.execute(statement)

    return True

async def interestsThread(
interests: list,
Expand All @@ -97,7 +70,6 @@ async def interestsThread(
data (list): The list of records to import
db (PostgresClient): A database connection
"""
data = dict()
pbar = tqdm.tqdm(interests)
for record in pbar:
for id, array in record.items():
Expand Down Expand Up @@ -205,98 +177,110 @@ async def mergeInterests(self,

return True

async def mergeLicenses(self):
async def mergeLicenses(self,
inpg: PostgresClient,
):
"""
Merge data from the TM user_licenses table into TM Admin. The
fastest way to do a bulk update of a table is by copying the
remote database table into the local database, and then merging
into a new temporary table and then renaming it.
Args:
inturi (str): The input database
"""
table = 'user_licenses'
# log.info(f"Merging licenses table...")
timer = Timer(initial_text="Merging user_licenses table...",
text="merging user_liceneses table took {seconds:.0f}s",
logger=log.debug,
)
pg = PostgresClient()
await pg.connect('localhost/tm4')
self.columns = await pg.getColumns(table)
print(f"COLUMNS: {self.columns}")
await pg.pg.close()
await pg.connect('localhost/tm_admin')

# await self.copyTable(table, pg)
# log.warning(f"Merging tables can take considerable time...")

# cleanup old temporary tables
drop = ["DROP TABLE IF EXISTS users_bak",
"DROP TABLE IF EXISTS foo"]
# result = await pg.pg.executemany(drop)
sql = f"DROP TABLE IF EXISTS users_bak"
result = await pg.execute(sql)
sql = f"DROP TABLE IF EXISTS user_licenses"
result = await pg.execute(sql)
sql = f"DROP TABLE IF EXISTS new_users"
result = await pg.execute(sql)

# We need to use DBLINK
sql = f"CREATE EXTENSION IF NOT EXISTS dblink;"
data = await pg.execute(sql)

timer.start()
dbuser = pg.dburi["dbuser"]
dbpass = pg.dburi["dbpass"]
sql = f"CREATE SERVER IF NOT EXISTS pg_rep_db FOREIGN DATA WRAPPER dblink_fdw OPTIONS (dbname 'tm4');"
data = await pg.execute(sql)

sql = f"CREATE USER MAPPING IF NOT EXISTS FOR {dbuser} SERVER pg_rep_db OPTIONS ( user '{dbuser}', password '{dbpass}');"
result = await pg.execute(sql)

# Copy table from remote database so JOIN is faster when it's in the
# same database
log.warning(f"Copying a remote table is slow, but faster than remote access......")
sql = f"SELECT * INTO user_licenses FROM dblink('pg_rep_db','SELECT * FROM user_licenses') AS user_licenses(user_id bigint, license int)"
print(pg.dburi)
# print(sql)
result = await pg.execute(sql)
sql = "SELECT * FROM user_licenses ORDER BY user"
data = await inpg.execute(sql)

entries = len(data)
chunk = round(entries / cores)

start = 0
async with asyncio.TaskGroup() as tg:
for block in range(0, entries, chunk):
# for index in range(0, cores):
outpg = PostgresClient()
await outpg.connect('localhost/tm_admin')
log.debug(f"Dispatching thread {block}:{block + chunk}")
# await licensesThread(data, outpg)
task = tg.create_task(licensesThread(data[block:block + chunk], outpg))

# JOINing into a new table is much faster than doing an UPDATE
sql = f"SELECT users.*,ARRAY[user_licenses.license] INTO new_users FROM users JOIN user_licenses ON users.id=user_licenses.user_id"
result = await pg.execute(sql)
# self.columns = await inpg.getColumns(table)
# print(f"COLUMNS: {self.columns}")

# self.renameTable(table, pg)
# sql = f"ALTER TABLE users RENAME TO users_bak;"
# result = await pg.execute(sql)
# sql = f"ALTER TABLE new_users RENAME TO users;"
# result = await pg.execute(sql)
# await self.copyTable(table, self.pg)
# # log.warning(f"Merging tables can take considerable time...")

# # cleanup old temporary tables
# drop = ["DROP TABLE IF EXISTS users_bak",
# "DROP TABLE IF EXISTS foo"]
# # result = await pg.pg.executemany(drop)
# sql = f"DROP TABLE IF EXISTS users_bak"
# result = await self.pg.execute(sql)
# sql = f"DROP TABLE IF EXISTS user_licenses"
# result = await pg.execute(sql)
# result = await self.pg.execute(sql)
# sql = f"DROP TABLE IF EXISTS new_users"
# result = await self.pg.execute(sql)

# # We need to use DBLINK
# sql = f"CREATE EXTENSION IF NOT EXISTS dblink;"
# data = await self.pg.execute(sql)

# dbuser = self.pg.dburi["dbuser"]
# dbpass = self.pg.dburi["dbpass"]
# sql = f"CREATE SERVER IF NOT EXISTS pg_rep_db FOREIGN DATA WRAPPER dblink_fdw OPTIONS (dbname 'tm4');"
# data = await self.pg.execute(sql)

# sql = f"CREATE USER MAPPING IF NOT EXISTS FOR {dbuser} SERVER pg_rep_db OPTIONS ( user '{dbuser}', password '{dbpass}');"
# result = await self.pg.execute(sql)

# # Copy table from remote database so JOIN is faster when it's in the
# # same database
# log.warning(f"Copying a remote table is slow, but faster than remote access......")
# # sql = f"SELECT * INTO user_licenses FROM dblink('pg_rep_db','SELECT * FROM user_licenses') AS user_licenses(user_id bigint, license int)"
# # print(pg.dburi)
# # print(sql)
# # result = await pg.execute(sql)

# # JOINing into a new table is much faster than doing an UPDATE
# sql = f"SELECT users.*,ARRAY[user_licenses.license] INTO new_users FROM users JOIN user_licenses ON users.id=user_licenses.user_id"
# result = await self.pg.execute(sql)

# # self.renameTable(table, pg)
# # sql = f"ALTER TABLE users RENAME TO users_bak;"
# # result = await pg.execute(sql)
# # sql = f"ALTER TABLE new_users RENAME TO users;"
# # result = await pg.execute(sql)
# # sql = f"DROP TABLE IF EXISTS user_licenses"
# # result = await pg.execute(sql)
timer.stop()

async def mergeTeam(self):
async def mergeTeam(self,
inpg: PostgresClient,
):
table = 'team_members'
# FIXME: this shouldn't be hardcoded!
log.info(f"Merging team members table...")
timer = Timer(text="merging team members table took {seconds:.0f}s")
timer.start()
pg = PostgresClient('localhost/tm4')
sql = f"SELECT row_to_json({table}) as row FROM {table}"
#print(sql)
try:
result = await pg.execute(sql)
except:
log.error(f"Couldn't execute query! {sql}")
result = await inpg.execute(sql)

pbar = tqdm(result)
for record in pbar:
func = record[0]['function']
tmfunc = Teammemberfunctions(func)
sql = f"UPDATE {self.table} SET team_members.team={record[0]['team_id']}, team_members.active={record[0]['active']}, team_members.function='{tmfunc.name}' WHERE id={record[0]['user_id']}"
# print(f"{sql};")
try:
result = await self.pg.execute(sql)
except:
log.error(f"Couldn't execute query! '{sql}'")
print(f"{sql};")
result = await self.pg.execute(sql)

timer.stop()
return True
Expand Down Expand Up @@ -413,10 +397,11 @@ async def mergeAuxTables(self,
Merge more tables from TM into the unified users table.
Args:
inturi (str): The input database
inuri (str): The input database
outuri (str): The output database
"""
await self.connect(outuri)

inpg = PostgresClient()
await inpg.connect(inuri)

Expand All @@ -427,11 +412,11 @@ async def mergeAuxTables(self,
# log.info("UserDB.mergeFavorites worked!")

# These may take a long time to complete
await self.mergeInterests(inpg)
log.info("UserDB.mergeInterests worked!")
# await self.mergeInterests(inpg)
# log.info("UserDB.mergeInterests worked!")

# result = await self.mergeLicenses()
# log.info("UserDB.mergeLicenses worked!")
result = await self.mergeLicenses(inpg)
log.info("UserDB.mergeLicenses worked!")

async def main():
"""This main function lets this class be run standalone by a bash script."""
Expand Down

0 comments on commit 6f1bf0c

Please sign in to comment.