Skip to content
This repository has been archived by the owner on Aug 5, 2024. It is now read-only.

Commit

Permalink
fix: Make importing the primary table multi-threaded to improve perfo…
Browse files Browse the repository at this point in the history
…rmance
  • Loading branch information
rsavoye committed Dec 29, 2023
1 parent 87b6524 commit 61c1b9d
Showing 1 changed file with 51 additions and 4 deletions.
55 changes: 51 additions & 4 deletions tm_admin/tmdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
from progress.bar import Bar, PixelBar
from tm_admin.types_tm import Userrole, Mappinglevel, Organizationtype, Taskcreationmode, Projectstatus, Permissions, Projectpriority, Projectdifficulty, Mappingtypes, Editors, Teamvisibility, Taskstatus
from tm_admin.yamlfile import YamlFile
import concurrent.futures
from cpuinfo import get_cpu_info
# from tm_admin.users.users import createSQLValues
# from tm_admin.organizations.organizations import createSQLValues

Expand All @@ -40,6 +42,25 @@
import tm_admin as tma
rootdir = tma.__path__[0]

# The number of threads is based on the CPU cores
info = get_cpu_info()
cores = info["count"]

def importThread(
data: list,
db: PostgresClient,
tm,
):
"""Thread to handle importing
Args:
data (list): The list of records to import
db (PostgresClient): A database connection
tm (TMImport): the input handle
"""
tm.writeAllData(data, tm.table)

return True

class TMImport(object):
def __init__(self,
Expand Down Expand Up @@ -71,6 +92,7 @@ def __init__(self,
self.admindb = PostgresClient(outuri)
self.columns = list()
self.data = list()
self.table = table

yaml = YamlFile(f"{rootdir}/{table}/{table}.yaml")
# yaml.dump()
Expand Down Expand Up @@ -155,6 +177,7 @@ def writeAllData(self,
data (list): The table data from TM
table str(): The table to get the columns for.
"""
#log.debug(f"Writing block {len(data)} to the database")
builtins = ['int32', 'int64', 'string', 'timestamp', 'bool']
#bar = Bar('Importing into TMAdmin', max=len(data))
# columns2 = self.getColumns(table)
Expand Down Expand Up @@ -328,13 +351,37 @@ def main():
)

doit = TMImport(args.inuri, args.outuri, args.table)
table = args.table
# You have to love subtle cultural spelling differences.
if table == 'organizations':
data = list
if args.table == 'organizations':
data = doit.getAllData('organisations')
else:
data = doit.getAllData(table)
doit.writeAllData(data, table)
data = doit.getAllData(args.table)

entries = len(data)
log.debug(f"There are {entries} entries in {args.table}")
chunk = round(entries / cores)

tmpg = list()
for i in range(0, cores + 1):
tmpg.append(PostgresClient(args.outuri))


if entries < 10000:
importThread(data, tmpg[0], doit)
quit()

index = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
block = 0
while block <= entries:
log.debug("Dispatching Block %d:%d" % (block, block + chunk))
result = executor.submit(importThread, data[block : block + chunk], tmpg[index], doit)
block += chunk
index += 1
# for future in tqdm(futures, desc=f"Dispatching Block {block}:{block + chunk}", total=chunk):
# future.result()
executor.shutdown()

if __name__ == "__main__":
"""This is just a hook so this file can be run standalone during development."""
Expand Down

0 comments on commit 61c1b9d

Please sign in to comment.