Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions ingest/tiger_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,49 @@ class DatabaseManager:
def __init__(self, database_uri, embedding_model=None):
self.database_uri = database_uri
self.embedding_model = embedding_model
self.finalize_queries: list[SQL] = []

try:
self.connection = psycopg.connect(self.database_uri)
except Exception as e:
raise RuntimeError(f"Database connection failed: {e}")

def create_tmp_tables(self):
def initialize(self):
with self.connection.cursor() as cursor:
cursor.execute(SQL("DROP TABLE IF EXISTS {schema}.timescale_chunks_tmp").format(schema=Identifier(schema)))
cursor.execute(SQL("DROP TABLE IF EXISTS {schema}.timescale_pages_tmp").format(schema=Identifier(schema)))
cursor.execute(SQL("CREATE TABLE {schema}.timescale_pages_tmp (LIKE {schema}.timescale_pages INCLUDING ALL EXCLUDING CONSTRAINTS)").format(schema=Identifier(schema)))
cursor.execute(SQL("CREATE TABLE {schema}.timescale_chunks_tmp (LIKE {schema}.timescale_chunks INCLUDING ALL EXCLUDING CONSTRAINTS)").format(schema=Identifier(schema)))
cursor.execute(SQL("ALTER TABLE {schema}.timescale_chunks_tmp ADD FOREIGN KEY (page_id) REFERENCES {schema}.timescale_pages_tmp(id) ON DELETE CASCADE").format(schema=Identifier(schema)))

# The bm25 indexes have a bug that prevent inserting data into a table
# underneath non-public schemas that has them, so we need to make remove
# them from the tmp tables and recreate them after renaming.
cursor.execute(
"""
SELECT indexname, indexdef
FROM pg_indexes
WHERE schemaname = %s
AND tablename LIKE %s
AND indexdef LIKE %s
""",
["docs", "timescale%_tmp%", "%bm25%"],
)
rows = cursor.fetchall()
for row in rows:
index_name = row[0]
index_def = row[1]
tmp_index_def = index_def.replace("_tmp", "")
cursor.execute(
SQL("DROP INDEX IF EXISTS {schema}.{index_name}").format(
schema=Identifier(schema),
index_name=Identifier(index_name),
)
)
self.finalize_queries.append(SQL(tmp_index_def))
self.connection.commit()

def rename_objects(self):
def finalize(self):
"""Rename the temporary tables and their indexes to the final names, dropping the old tables if they exist"""
with self.connection.cursor() as cursor:
cursor.execute(SQL("DROP TABLE IF EXISTS {schema}.timescale_chunks").format(schema=Identifier(schema)))
Expand Down Expand Up @@ -135,6 +163,9 @@ def rename_objects(self):
)
)

for query in self.finalize_queries:
cursor.execute(query)

self.connection.commit()

def save_page(self, url, domain, filename, content_length, chunking_method='header'):
Expand Down Expand Up @@ -1035,7 +1066,7 @@ def get_text_embeddings(self, texts):

embedding_model = OpenAIEmbeddingWrapper(client)
db_manager = DatabaseManager(database_uri=args.database_uri, embedding_model=embedding_model)
db_manager.create_tmp_tables()
db_manager.initialize()
else:
file_manager = FileManager(args.output_dir)

Expand All @@ -1056,9 +1087,9 @@ def get_text_embeddings(self, texts):
# Create database indexes after scraping completes
if args.storage_type == 'database' and not args.skip_indexes and db_manager:
try:
print("Renaming temporary tables to final names...")
db_manager.rename_objects()
print("Database indexes created successfully!")
print("Finalizing database...")
db_manager.finalize()
print("Database finalized successfully.")
except Exception as e:
print(f"Failed to finish database: {e}")
raise SystemExit(1)
Expand Down
4 changes: 2 additions & 2 deletions src/apis/kewordSearchTigerDocs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ SELECT
id::int,
content,
metadata::text,
-(content <@> to_tpquery($1, 'docs.timescale_chunks_content_idx')) as score
-(content <@> to_bm25query($1, 'docs.timescale_chunks_content_idx')) as score
FROM ${schema}.timescale_chunks
ORDER BY content <@> to_tpquery($1, 'docs.timescale_chunks_content_idx')
ORDER BY content <@> to_bm25query($1, 'docs.timescale_chunks_content_idx')
LIMIT $2
`,
[keywords, limit || 10],
Expand Down