Skip to content
This repository has been archived by the owner on Aug 5, 2024. It is now read-only.

Commit

Permalink
fix: Make importing data multi-threaded, it still takes a long time
Browse files Browse the repository at this point in the history
  • Loading branch information
rsavoye committed Dec 28, 2023
1 parent 67a5bd5 commit 91ab43b
Showing 1 changed file with 91 additions and 27 deletions.
118 changes: 91 additions & 27 deletions tm_admin/users/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,24 @@
from tm_admin.types_tm import Userrole, Mappinglevel, Teammemberfunctions
import concurrent.futures
from cpuinfo import get_cpu_info

from atpbar import atpbar
from tm_admin.dbsupport import DBSupport
from tm_admin.users.users_class import UsersTable
from osm_rawdata.postgres import uriParser, PostgresClient
from tm_admin.types_tm import Userrole
from alive_progress import alive_bar
from tqdm import tqdm
from codetiming import Timer
import threading

# Instantiate logger
log = logging.getLogger(__name__)

# The number of threads is twice the CPU core count (the work is I/O-bound)
info = get_cpu_info()
cores = info["count"]
cores = info["count"] * 2

def updateThread(
def licensesThread(
data: list,
db: PostgresClient,
):
Expand All @@ -53,8 +57,11 @@ def updateThread(
data (list): The list of records to import
db (PostgresClient): A database connection
"""
array = "licenses"
column = "license"

for record in data:
sql = f" UPDATE users SET licenses = ARRAY[{record[0]['license']}] WHERE id={record[0]['user']}"
# BUG(review): f-strings do not interpolate inside nested string
# literals, so record[0]['{column}'] looks up the literal key
# "{column}" and raises KeyError on every row; the bare except
# below swallows it and the thread silently returns False.
# This should be record[0][column] (no quotes around column).
sql = f" UPDATE users SET {array} = ARRAY[{record[0]['{column}']}] WHERE id={record[0]['user']}"
# print(sql)
try:
result = db.dbcursor.execute(f"{sql};")
Expand All @@ -63,6 +70,33 @@

return True

def interestsThread(
    interests: list,
    db: "PostgresClient",
):
    """Thread worker that imports user interests into TM Admin.

    Groups the interest IDs by user so each user gets a single UPDATE
    carrying the full array of interests, then runs one query per user.

    Args:
        interests (list): The records to import; each record is a
            one-element list holding a dict with 'user_id' and
            'interest_id' keys (row_to_json output)
        db (PostgresClient): A database connection private to this thread

    Returns:
        bool: True when every UPDATE executed, False on the first failure
    """
    # Group interest IDs by user so each user gets one UPDATE with the
    # complete array instead of one query per (user, interest) pair.
    data = dict()
    for record in interests:
        entry = record[0]  # there's only one item in the input data
        if entry['user_id'] not in data:
            data[entry['user_id']] = list()
        data[entry['user_id']].append(entry['interest_id'])

    for uid, value in data.items():
        sql = f" UPDATE users SET interests = ARRAY{str(value)} WHERE id={uid}"
        # print(sql)
        try:
            db.dbcursor.execute(f"{sql};")
        except Exception:
            # Give up on the first failed query; the caller only sees a
            # boolean, matching the other thread workers in this module.
            return False

    return True

class UsersDB(DBSupport):
def __init__(self,
dburi: str = "localhost/tm_admin",
Expand All @@ -83,10 +117,15 @@ def __init__(self,

def mergeInterests(self):
table = 'user_interests'
# FIXME: this shouldn't be hardcoded
log.info(f"Merging interests table...")
# One database connection per thread
tmpg = list()
for i in range(0, cores + 1):
# FIXME: this shouldn't be hardcoded
tmpg.append(PostgresClient('localhost/tm_admin'))
pg = PostgresClient('localhost/tm4')
sql = f"SELECT row_to_json({table}) as row FROM {table}"
print(sql)
# print(sql)
try:
result = pg.dbcursor.execute(sql)
except:
Expand All @@ -95,26 +134,37 @@ def mergeInterests(self):

result = pg.dbcursor.fetchall()

data = dict()
for record in result:
entry = record[0] # there's only one item in the input data
if entry['user_id'] not in data:
data[entry['user_id']] = list()
data[entry['user_id']].append(entry['interest_id'])
entries = len(result)
log.debug(f"There are {entries} entries in {table}")
chunk = round(entries / cores)

for uid, value in data.items():
sql = f" UPDATE users SET interests = ARRAY{str(value)} WHERE id={uid}"
print(sql)
try:
result = self.pg.dbcursor.execute(f"{sql};")
except:
return False
# if True:
# interestsThread(result, tmpg[0])

index = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
# futures = list()
block = 0
while block <= entries:
log.debug(f"Dispatching Block %d:%d" % (block, block + chunk))
executor.submit(interestsThread, result[block : block + chunk], tmpg[index])
# futures.append(result)
block += chunk
index += 1
# for future in tqdm(futures, desc=f"Dispatching Block {block}:{block + chunk}", total=chunk):
# future.result()
executor.shutdown()

timer.stop
return True

def mergeLicenses(self):
"""Merge data from the TM user_licenses table into TM Admin."""
table = 'user_licenses'
log.info(f"Merging licenses table...")
timer = Timer(text="merging liceneses table took {seconds:.0f}s")
timer.start()

sql = f"SELECT row_to_json({table}) as row FROM {table}"
# One database connection per thread
tmpg = list()
Expand All @@ -135,25 +185,33 @@ def mergeLicenses(self):
chunk = round(entries / cores)

# if True:
# importThread(data, tmpg[0])
# licensesThread(data, tmpg[0])
index = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
# futures = list()
block = 0
while block <= entries:
log.debug("Dispatching Block %d:%d" % (block, block + chunk))
result = executor.submit(updateThread, data[block : block + chunk], tmpg[index])
result = executor.submit(licensesThread, data[block : block + chunk], tmpg[index])
# futures.append(result)
block += chunk
index += 1
# for future in tqdm(futures, desc=f"Dispatching Block {block}:{block + chunk}", total=chunk):
# future.result()
executor.shutdown()

timer.stop
return True

def mergeTeam(self):
table = 'team_members'
# FIXME: this shouldn't be hardcoded!
log.info(f"Merging team members table...")
timer = Timer(text="merging team members table took {seconds:.0f}s")
timer.start()
pg = PostgresClient('localhost/tm4')
sql = f"SELECT row_to_json({table}) as row FROM {table}"
print(sql)
#print(sql)
try:
result = pg.dbcursor.execute(sql)
except:
Expand All @@ -165,7 +223,7 @@ def mergeTeam(self):
func = record[0]['function']
tmfunc = Teammemberfunctions(func)
sql = f"UPDATE {self.table} SET team_members.team={record[0]['team_id']}, team_members.active={record[0]['active']}, team_members.function='{tmfunc.name}' WHERE id={record[0]['user_id']}"
print(f"{sql};")
#print(f"{sql};")
try:
# FIXME: this fails to execute, but if I write the out to a file,
# it works just fine.
Expand All @@ -175,9 +233,15 @@ def mergeTeam(self):
log.error(f"Couldn't execute query! '{sql}'")
return False

timer.stop()
return True

def mergeFavorites(self):
table = 'project_favorites'
log.info(f"Merging favorites table...")
# FIXME: this shouldn't be hardcoded!
timer = Timer(text="merging favorites table took {seconds:.0f}s")
timer.start()
pg = PostgresClient('localhost/tm4')
sql = f"SELECT row_to_json({table}) as row FROM {table}"
# print(sql)
Expand All @@ -197,12 +261,12 @@ def mergeFavorites(self):

for uid, value in data.items():
sql = f" UPDATE users SET favorite_projects = ARRAY{str(value)} WHERE id={uid}"
print(sql)
# print(sql)
try:
result = self.pg.dbcursor.execute(f"{sql};")
except:
return False

timer.stop()
return True

# These are just convience wrappers to support the REST API.
Expand Down Expand Up @@ -295,8 +359,8 @@ def main():
if user.mergeInterests():
log.info("UserDB.mergeInterests worked!")

#if user.mergeLicenses():
# log.info("UserDB.mergeLicenses worked!")
if user.mergeLicenses():
log.info("UserDB.mergeLicenses worked!")

if user.mergeFavorites():
log.info("UserDB.mergeFavorites worked!")
Expand Down

0 comments on commit 91ab43b

Please sign in to comment.