This repository has been archived by the owner on Aug 5, 2024. It is now read-only.

fix: Refactor to use postgres views rather than LIMIT and OFFSET
rsavoye committed Jan 7, 2024
1 parent 96d1921 commit fe5736e
Showing 1 changed file with 76 additions and 43 deletions.
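A note on the approach: `OFFSET n` forces PostgreSQL to read and discard n rows before returning a page, so paging deeper into a large table gets progressively slower. Filtering by a key range instead (here wrapped in a view) keeps every page an index range scan. A minimal sketch of the before and after query shapes, assuming a `tasks` table with an indexed integer `project_id` column; the connection string and values are illustrative:

    import psycopg2

    conn = psycopg2.connect("dbname=tm4")
    cur = conn.cursor()

    # Before: deep pages are slow, because all OFFSET rows are scanned and
    # thrown away on every query.
    cur.execute("SELECT * FROM tasks ORDER BY project_id LIMIT %s OFFSET %s",
                (1000, 50000))
    page = cur.fetchall()

    # After: page by key range instead. start/end must be trusted integers,
    # since the DDL is built by string formatting, as in the commit itself.
    start, end = 50, 60
    cur.execute("DROP VIEW IF EXISTS tasks_view")
    cur.execute(f"CREATE VIEW tasks_view AS SELECT * FROM tasks "
                f"WHERE project_id >= {start} AND project_id <= {end}")
    cur.execute("SELECT * FROM tasks_view ORDER BY project_id")
    page = cur.fetchall()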
tm_admin/tasks/tasks.py
@@ -38,6 +38,7 @@
 import threading
 from cpuinfo import get_cpu_info
 import psycopg2.extensions
+from dateutil.parser import parse


 # Instantiate logger
@@ -56,29 +57,30 @@ def historyThread(
     Args:
         data (list): The list of records to import
         db (PostgresClient): A database connection
-        tm (TMImport): the input handle
     """
     pbar = tqdm(data)
     for record in pbar:
-        action = record['action']
-        date = record['action_date']
+        entry = record[0] # there is only one entry
+        action = entry['action']
+        date = entry['action_date']
         # Remove embedded single quotes
         text = str()
-        if record['action_text'] is None:
+        if entry['action_text'] is None:
             text = "NULL"
         else:
-            text = record['action_text'].replace("'", "")
-        timestamp = "{:%Y-%m-%dT%H:%M:%S}".format(date)
-        record['action_date'] = "NULL" #timestamp
+            text = entry['action_text'].replace("'", "")
+        # timestamp = "{%Y-%m-%dT%H:%M:%S}".format(date)
+        timestamp = datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f")
+        entry['action_date'] = "NULL" # timestamp
         # FIXME: currently the schema has this as an int, it's actually an enum
         func = eval(f"Taskaction.{action}")
-        record['action'] = func.value
+        entry['action'] = func.value
         # columns = f"id, project_id, history.action, history.action_text, history.action_date, history.user_id"
         # nested = f"{record['id']}, {record['project_id']}, {func.value}, '{text}', '{timestamp}', {record['user_id']}"
-        sql = f"UPDATE tasks"
+        sql = f"UPDATE tasks "
         # sql += f" SET history.action='{func.value}', history.action_text='{text}', history.action_date='{timestamp}' WHERE id={record['task_id']} AND project_id={record['project_id']}"
-        sql += f" SET history = (SELECT ARRAY_APPEND(history,({func.value}, '{text}', '{timestamp}', {record['user_id']})::task_history)) "
-        sql += f"WHERE id={record['task_id']} AND project_id={record['project_id']}"
+        sql += f" SET history = (SELECT ARRAY_APPEND(history,({func.value}, '{text}', '{timestamp}', {entry['user_id']})::task_history)) "
+        sql += f"WHERE id={entry['task_id']} AND project_id={entry['project_id']}"
         # print(f"{sql};")
         try:
             result = db.dbcursor.execute(sql)
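The UPDATE assembled above appends a row value to an array column of a composite type. A minimal standalone sketch of that pattern, using bound parameters instead of f-string splicing; the `task_history` field layout (action, action_text, action_date, user_id) is inferred from the cast in the query, and all literal values are illustrative:

    from datetime import datetime
    import psycopg2

    conn = psycopg2.connect("dbname=tm_admin")  # illustrative connection
    cur = conn.cursor()
    # Each %s is adapted and escaped by psycopg2; the row literal is then
    # cast to the composite element type of the history array column.
    cur.execute(
        """UPDATE tasks
              SET history = ARRAY_APPEND(history, (%s, %s, %s, %s)::task_history)
            WHERE id = %s AND project_id = %s""",
        (4, "validated", datetime(2024, 1, 7, 12, 0), 42, 1, 2),
    )
    conn.commit()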
@@ -154,8 +156,8 @@ def __init__(self,
         super().__init__('tasks', dburi)

     def getPage(self,
-                offset: int,
-                count: int,
+                start: int,
+                end: int,
                 pg: PostgresClient,
                 table: str,
                 ):
@@ -171,27 +173,48 @@ def getPage(self,
         Returns:
             (list): The results of the query
         """
-        # It turns out to be much faster to use the columns specified in the
-        # SELECT statement, and construct our own dictionary than using row_to_json().
-        tmp = f"{table.capitalize()}Table()"
-        tt = eval(tmp)
-        columns = str(tt.data.keys())[11:-2].replace("'", "")
+        sql = f"DROP VIEW IF EXISTS {table}_view; CREATE VIEW {table}_view AS SELECT * FROM {table} WHERE project_id>={start} AND project_id<={end}"
+        # print(sql)
+        result = list()
+        try:
+            pg.dbcursor.execute(sql)
+        except:
+            log.error(f"Couldn't execute: {sql}")
+
+        # FIXME: now that we're using views, row_to_json() has acceptable performance.
+        # # It turns out to be much faster to use the columns specified in the
+        # # SELECT statement, and construct our own dictionary than using row_to_json().
+        # tmp = f"{table.capitalize()}Table()"
+        # tt = eval(tmp)
+        # columns = str(tt.data.keys())[11:-2].replace("'", "")
+
         # sql = f"SELECT row_to_json({self.table}) as row FROM {self.table} ORDER BY id LIMIT {count} OFFSET {offset}"
-        sql = f"SELECT {columns} FROM {table} ORDER BY project_id LIMIT {count} OFFSET {offset}"
+        sql = f"SELECT row_to_json({table}_view) as row FROM {table}_view ORDER BY project_id"
+        # sql = f"SELECT {columns} FROM {table}_view ORDER BY project_id"
         # print(sql)
         result = list()
         try:
             pg.dbcursor.execute(sql)
-            result = pg.dbcursor.fetchall()
         except:
             log.error(f"Couldn't execute: {sql}")

         data = list()
-        # Since we're not using row_to_json(), build a data structure
-        for record in result:
-            table = dict(zip(tt.data.keys(), record))
-            data.append(table)
+        # result = pg.dbcursor.fetchmany(end-start)
+        result = pg.dbcursor.fetchall()
+        return result
+        # for entry in pg.dbcursor.fetchone():
+        #     # print(entry)
+        #     try:
+        #         parse(entry['action_date'])
+        #     except:
+        #         log.error(f"{entry['action_date']} is not a valid datetime!")
+        #         continue
+        #     data.append(entry)
+
+        # # Since we're not using row_to_json(), build a data structure
+        # for record in result:
+        #     table = dict(zip(tt.data.keys(), record))
+        #     data.append(table)

         return data
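On the new query shape: `row_to_json()` has PostgreSQL serialize each row to a JSON object, and psycopg2 decodes `json` columns to Python dicts by default, which is why the old `dict(zip(...))` reconstruction can be retired. A short sketch, assuming the `tasks_view` created by `getPage()` exists:

    import psycopg2

    conn = psycopg2.connect("dbname=tm4")  # illustrative connection
    cur = conn.cursor()
    cur.execute("SELECT row_to_json(tasks_view) AS row FROM tasks_view "
                "ORDER BY project_id")
    # Each fetched row is a 1-tuple holding a dict keyed by column name.
    for (row,) in cur.fetchall():
        print(row["id"], row["project_id"])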
Expand All @@ -209,20 +232,31 @@ def mergeHistory(self):
# tmpg.append(PostgresClient('localhost/tm4'))

pg = PostgresClient('localhost/tm4')
sql = f"SELECT COUNT(id) FROM task_history"
sql = f"SELECT MIN(id),MAX(id) FROM task_history"
# sql = f"SELECT id, project_id, task_id, action, action_text, action_date, user_id FROM task_history"
# print(sql)
try:
result = pg.dbcursor.execute(sql)
pg.dbcursor.execute(sql)
except:
log.error(f"Couldn't execute query! {sql}")
return False
# data = pg.dbcursor.fetchall()
entries = pg.dbcursor.fetchone()[0]
log.debug(f"There are {entries} records in {table}")
result = pg.dbcursor.fetchone()
minid = result[0]
maxid = result[1]
log.debug(f"There are {maxid} records in {table}")

records = round(maxid/cores)
blocks = list()
previous = 0
for id in range(0, maxid, 1000):
if id == 0:
continue
blocks.append([previous, id + 1])
previous = id

# This input data is huge! Make smaller chunks
chunk = round((entries / cores))
# chunk = round((entries / cores))

# Some tables in the input database are huge, and can either core
# dump python, or have performance issues. Past a certain threshold
@@ -231,30 +265,29 @@
         with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
             # adminpg = PostgresClient('localhost/tm_admin'))
             # tmpg = PostgresClient('localhost/tm4'))
-            for block in range(0, entries, chunk):
+            for entry in blocks:
+            # for block in range(0, entries, chunk):
                 adminpg = PostgresClient('localhost/tm_admin')
                 index = 0
                 try:
-                    data = self.getPage(block, chunk, pg, table)
+                    data = self.getPage(entry[0], entry[1], pg, table)
                     if len(data) == 0:
                         log.error(f"getPage() returned no data for {block}:{chunk}")
-                        data = self.getPage(block, chunk, pg, table)
-                        if len(data) == 0:
-                            log.error(f"getPage() returned no data for {block}:{chunk}, second attempt")
-                            continue
+                        # data = self.getPage(block, chunk, pg, table)
+                        # if len(data) == 0:
+                        #     log.error(f"getPage() returned no data for {block}:{chunk}, second attempt")
+                        #     continue
                 except:
                     #tmpg[index].dbshell.close()
                     #tmpg[index] = PostgresClient('localhost/tm4')
                     log.error(f"Couldn't get a page of data!")
                     continue
                 try:
-                    log.error(f"Dispatching thread {index}...")
-                    result = historyThread(data, adminpg)
-                    # result = executor.submit(historyThread, data, adminpg)
-                    index += 1
+                    log.error(f"Dispatching thread...")
+                    # result = historyThread(data, adminpg)
+                    result = executor.submit(historyThread, data, adminpg)
                 except:
-                    log.error(f"FIXME: {index}")
-                    index = 0
+                    log.error(f"Couldn't dispatch thread for block {entry[0]}-{entry[1]}")
         executor.shutdown()

         # # cleanup the connections
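The loop above precomputes `[start, end]` id blocks and hands one page to each worker. A self-contained sketch of that dispatch pattern; block size and worker count are illustrative, and these blocks are built non-overlapping, whereas the loop above reuses each boundary id in two adjacent blocks:

    import concurrent.futures

    def make_blocks(maxid: int, size: int = 1000) -> list:
        # Contiguous, non-overlapping [start, end] ranges covering 0..maxid.
        return [[lo, min(lo + size - 1, maxid)] for lo in range(0, maxid + 1, size)]

    def process(block):
        start, end = block
        # stand-in for getPage() + historyThread() on this range
        return f"processed {start}-{end}"

    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(process, b) for b in make_blocks(12345)]
        for done in concurrent.futures.as_completed(futures):
            print(done.result())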
Expand Down Expand Up @@ -332,8 +365,8 @@ def main():

tasks = TasksDB(args.uri)
# tasks.mergeAxuTables()
# tasks.mergeHistory()
tasks.mergeInvalidations()
tasks.mergeHistory()
# tasks.mergeInvalidations()

# # user.resetSequence()
# all = task.getAll()
Expand Down
