From fe5736e1a25a28ff3f75d7a0d509304a850493e2 Mon Sep 17 00:00:00 2001
From: Rob Savoye
Date: Sun, 7 Jan 2024 11:12:30 -0700
Subject: [PATCH] fix: Refactor to use postgres views rather than LIMIT and
 OFFSET

---
 tm_admin/tasks/tasks.py | 119 +++++++++++++++++++++++++---------------
 1 file changed, 76 insertions(+), 43 deletions(-)

diff --git a/tm_admin/tasks/tasks.py b/tm_admin/tasks/tasks.py
index cc3188cb..e6f61514 100755
--- a/tm_admin/tasks/tasks.py
+++ b/tm_admin/tasks/tasks.py
@@ -38,6 +38,7 @@ import threading
 from cpuinfo import get_cpu_info
 import psycopg2.extensions
+from dateutil.parser import parse
 
 # Instantiate logger
@@ -56,29 +57,30 @@ def historyThread(
     Args:
         data (list): The list of records to import
        db (PostgresClient): A database connection
-        tm (TMImport): the input handle
     """
     pbar = tqdm(data)
     for record in pbar:
-        action = record['action']
-        date = record['action_date']
+        entry = record[0] # there is only one entry
+        action = entry['action']
+        date = entry['action_date']
         # Remove embedded single quotes
         text = str()
-        if record['action_text'] is None:
+        if entry['action_text'] is None:
             text = "NULL"
         else:
-            text = record['action_text'].replace("'", "")
-        timestamp = "{:%Y-%m-%dT%H:%M:%S}".format(date)
-        record['action_date'] = "NULL" #timestamp
+            text = entry['action_text'].replace("'", "")
+        # timestamp = "{%Y-%m-%dT%H:%M:%S}".format(date)
+        timestamp = datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f")
+        entry['action_date'] = "NULL" # timestamp
         # FIXME: currently the schema has this as an int, it's actully an enum
         func = eval(f"Taskaction.{action}")
-        record['action'] = func.value
+        entry['action'] = func.value
         # columns = f"id, project_id, history.action, history.action_text, history.action_date, history.user_id"
         # nested = f"{record['id']}, {record['project_id']}, {func.value}, '{text}', '{timestamp}', {record['user_id']}"
         sql = f"UPDATE tasks "
         # sql += f" SET history.action='{func.value}', history.action_text='{text}', history.action_date='{timestamp}' WHERE id={record['task_id']} AND project_id={record['project_id']}"
-        sql += f" SET history = (SELECT ARRAY_APPEND(history,({func.value}, '{text}', '{timestamp}', {record['user_id']})::task_history)) "
-        sql += f"WHERE id={record['task_id']} AND project_id={record['project_id']}"
+        sql += f" SET history = (SELECT ARRAY_APPEND(history,({func.value}, '{text}', '{timestamp}', {entry['user_id']})::task_history)) "
+        sql += f"WHERE id={entry['task_id']} AND project_id={entry['project_id']}"
         # print(f"{sql};")
         try:
             result = db.dbcursor.execute(sql)
@@ -154,8 +156,8 @@ def __init__(self,
         super().__init__('tasks', dburi)
 
     def getPage(self,
-                offset: int,
-                count: int,
+                start: int,
+                end: int,
                 pg: PostgresClient,
                 table: str,
                 ):
@@ -171,27 +173,48 @@ def getPage(self,
         Returns:
             (list): The results of the query
         """
-        # It turns out to be much faster to use the columns specified in the
-        # SELECT statement, and construct our own dictionary than using row_to_json().
-        tmp = f"{table.capitalize()}Table()"
-        tt = eval(tmp)
-        columns = str(tt.data.keys())[11:-2].replace("'", "")
+        sql = f"DROP VIEW IF EXISTS {table}_view; CREATE VIEW {table}_view AS SELECT * FROM {table} WHERE project_id>={start} AND project_id<={end}"
+        # print(sql)
+        result = list()
+        try:
+            pg.dbcursor.execute(sql)
+        except:
+            log.error(f"Couldn't execute: {sql}")
+
+        # FIXME: now that we're using views, row_to_json() has acceptable performance.
+        # # It turns out to be much faster to use the columns specified in the
+        # # SELECT statement, and construct our own dictionary than using row_to_json().
+        # tmp = f"{table.capitalize()}Table()"
+        # tt = eval(tmp)
+        # columns = str(tt.data.keys())[11:-2].replace("'", "")
         # sql = f"SELECT row_to_json({self.table}) as row FROM {self.table} ORDER BY id LIMIT {count} OFFSET {offset}"
-        sql = f"SELECT {columns} FROM {table} ORDER BY project_id LIMIT {count} OFFSET {offset}"
+        sql = f"SELECT row_to_json({table}_view) as row FROM {table}_view ORDER BY project_id"
+        # sql = f"SELECT {columns} FROM {table}_view ORDER BY project_id"
         # print(sql)
         result = list()
         try:
             pg.dbcursor.execute(sql)
-            result = pg.dbcursor.fetchall()
         except:
             log.error(f"Couldn't execute: {sql}")
 
         data = list()
-        # Since we're not using row_to_json(), build a data structure
-        for record in result:
-            table = dict(zip(tt.data.keys(), record))
-            data.append(table)
+        # result = pg.dbcursor.fetchmany(end-start)
+        result = pg.dbcursor.fetchall()
+        return result
+        # for entry in pg.dbcursor.fetchone():
+        #     # print(entry)
+        #     try:
+        #         parse(entry['action_date'])
+        #     except:
+        #         log.error(f"{entry['action_date']} is not a valid datetime!")
+        #         continue
+        #     data.append(entry)
+
+        # # Since we're not using row_to_json(), build a data structure
+        # for record in result:
+        #     table = dict(zip(tt.data.keys(), record))
+        #     data.append(table)
 
         return data
@@ -209,20 +232,31 @@ def mergeHistory(self):
         # tmpg.append(PostgresClient('localhost/tm4'))
         pg = PostgresClient('localhost/tm4')
 
-        sql = f"SELECT COUNT(id) FROM task_history"
+        sql = f"SELECT MIN(id),MAX(id) FROM task_history"
         # sql = f"SELECT id, project_id, task_id, action, action_text, action_date, user_id FROM task_history"
         # print(sql)
         try:
-            result = pg.dbcursor.execute(sql)
+            pg.dbcursor.execute(sql)
         except:
             log.error(f"Couldn't execute query! {sql}")
             return False
 
         # data = pg.dbcursor.fetchall()
-        entries = pg.dbcursor.fetchone()[0]
-        log.debug(f"There are {entries} records in {table}")
+        result = pg.dbcursor.fetchone()
+        minid = result[0]
+        maxid = result[1]
+        log.debug(f"There are {maxid} records in {table}")
+
+        records = round(maxid/cores)
+        blocks = list()
+        previous = 0
+        for id in range(0, maxid, 1000):
+            if id == 0:
+                continue
+            blocks.append([previous, id + 1])
+            previous = id
 
         # This input data is huge! Make smaller chunks
-        chunk = round((entries / cores))
+        # chunk = round((entries / cores))
         # Some tables in the input database are huge, and can either core
         # dump python, or have performance issues. Past a certain threshold
@@ -231,30 +265,29 @@ def mergeHistory(self):
         with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
             # adminpg = PostgresClient('localhost/tm_admin'))
             # tmpg = PostgresClient('localhost/tm4'))
-            for block in range(0, entries, chunk):
+            for entry in blocks:
+                # for block in range(0, entries, chunk):
                 adminpg = PostgresClient('localhost/tm_admin')
-                index = 0
                 try:
-                    data = self.getPage(block, chunk, pg, table)
+                    data = self.getPage(entry[0], entry[1], pg, table)
                     if len(data) == 0:
                         log.error(f"getPage() returned no data for {block}:{chunk}")
-                        data = self.getPage(block, chunk, pg, table)
-                        if len(data) == 0:
-                            log.error(f"getPage() returned no data for {block}:{chunk}, second attempt")
-                            continue
+                        # data = self.getPage(block, chunk, pg, table)
+                        # if len(data) == 0:
+                        #     log.error(f"getPage() returned no data for {block}:{chunk}, second attempt")
+                        #     continue
                 except:
                     #tmpg[index].dbshell.close()
                     #tmpg[index] = PostgresClient('localhost/tm4')
                     log.error(f"Couldn't get a page of data!")
                     continue
                 try:
-                    log.error(f"Dispatching thread {index}...")
-                    result = historyThread(data, adminpg)
-                    # result = executor.submit(historyThread, data, adminpg)
+                    log.error(f"Dispatching thread...")
+                    # result = historyThread(data, adminpg)
+                    result = executor.submit(historyThread, data, adminpg)
                     index += 1
                 except:
-                    log.error(f"FIXME: {index}")
-                    index = 0
+                    log.error(f"Couldn't dispatch thread for block {entry[0]}-{entry[1]}")
         executor.shutdown()
 
         # # cleanup the connections
@@ -332,8 +365,8 @@ def main():
     tasks = TasksDB(args.uri)
 
     # tasks.mergeAxuTables()
-    # tasks.mergeHistory()
-    tasks.mergeInvalidations()
+    tasks.mergeHistory()
+    # tasks.mergeInvalidations()
 
     # # user.resetSequence()
     # all = task.getAll()
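
Notes (appended for review, not part of the patch):

The core change is paging by key range through a view instead of LIMIT/OFFSET. With OFFSET the server has to scan and throw away every row in front of the requested window, so each successive page gets slower on a big table; a view pinned to a range of project_id values lets each pass read only its slice, and row_to_json() then returns whole rows keyed by column name, which is why the hand-built column list could be retired. A minimal standalone sketch of the pattern, assuming psycopg2 and a tasks table with a project_id column (fetch_block() and the connection string are illustrative, not the tm_admin API):

import psycopg2

def fetch_block(dbshell, table: str, start: int, end: int) -> list:
    """Fetch one block of rows through a throwaway view."""
    cursor = dbshell.cursor()
    # Recreate the view for this block of project IDs. Unlike OFFSET,
    # nothing before the window gets scanned and discarded.
    cursor.execute(f"DROP VIEW IF EXISTS {table}_view;"
                   f" CREATE VIEW {table}_view AS SELECT * FROM {table}"
                   f" WHERE project_id >= {start} AND project_id <= {end}")
    # row_to_json() returns each row as one JSON object keyed by column name.
    cursor.execute(f"SELECT row_to_json({table}_view) AS row"
                   f" FROM {table}_view ORDER BY project_id")
    return cursor.fetchall()

dbshell = psycopg2.connect("dbname=tm4")
dbshell.autocommit = True  # so the CREATE VIEW takes effect without an explicit commit
rows = fetch_block(dbshell, "tasks", 1, 1000)

One caveat: a plain view is a shared catalog object, so two sessions recreating tasks_view at the same time will collide; CREATE TEMPORARY VIEW (or simply putting the WHERE clause on the SELECT itself) keeps the object private to its session. The patch only creates views from the dispatch loop's single connection, so it sidesteps this today.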
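On the history update itself: task_history is a composite type, and PostgreSQL's ARRAY_APPEND(anyarray, anyelement) accepts a row constructor cast to the element type, so each record appends one typed tuple to the tasks.history array. A rendered statement from historyThread() looks roughly like this (the values here are illustrative):

UPDATE tasks SET history = (SELECT ARRAY_APPEND(history,(2, 'Locked for mapping', '2024-01-07 11:12:30', 4606673)::task_history)) WHERE id=82 AND project_id=150;

Because the values are spliced in with f-strings, embedded single quotes have to be stripped first, which is what the replace("'", "") guard is for; passing the values as parameters to psycopg2's cursor.execute() instead would let the driver handle the quoting.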
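mergeHistory() now sizes its work from MIN(id)/MAX(id) rather than COUNT(id), which is also cheaper when id is indexed, since both ends come straight off the index. One thing to watch in the committed loop: it emits ranges like [0, 1001], [1000, 2001], so neighbouring blocks share two boundary IDs (getPage() selects with >= and <=), and IDs above the last multiple of 1000 are never assigned to any block. A sketch of a gap-free, non-overlapping variant feeding the same executor pattern (worker() stands in for the getPage()/historyThread() pair, and the sizes are illustrative):

import concurrent.futures

def make_blocks(maxid: int, size: int = 1000) -> list:
    """Split [1, maxid] into non-overlapping inclusive [start, end] ranges."""
    return [[start, min(start + size - 1, maxid)]
            for start in range(1, maxid + 1, size)]

def worker(block: list) -> str:
    # Stand-in for getPage() + historyThread() on one block of IDs
    return f"processed ids {block[0]}-{block[1]}"

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(worker, block) for block in make_blocks(2500)]
    for future in concurrent.futures.as_completed(futures):
        print(future.result())

Since the with block joins the pool on exit, the trailing executor.shutdown() in the patch is harmless but redundant.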
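Finally, the timestamp handling reverses direction: the old code formatted an existing datetime with "{:%Y-%m-%dT%H:%M:%S}".format(), while the new code parses the incoming string. Note that strptime() with a %f field rejects strings that lack a fractional part, whereas dateutil's parse() (the new import, used by the commented-out validation in getPage()) accepts both forms, which makes it the more forgiving validity check:

from datetime import datetime
from dateutil.parser import parse

stamp = "2024-01-07T11:12:30.123456"
bare = "2024-01-07T11:12:30"

datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%S.%f")  # ok
parse(stamp)                                      # ok
parse(bare)                                       # ok
datetime.strptime(bare, "%Y-%m-%dT%H:%M:%S.%f")   # raises ValueError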