@@ -63,21 +63,38 @@ class DatabaseManager:
def __init__(self, database_uri, embedding_model=None):
    """Store the configuration and open the database connection.

    Args:
        database_uri: Connection string handed to ``psycopg.connect``.
        embedding_model: Optional embedding model object kept on the
            instance; it is not used during construction.

    Raises:
        RuntimeError: If the connection cannot be established. The
            underlying exception is attached as ``__cause__``.
    """
    self.database_uri = database_uri
    self.embedding_model = embedding_model
    # Statements gathered by initialize() that finalize() executes right
    # before its final commit.
    self.finalize_queries: list[SQL] = []

    try:
        self.connection = psycopg.connect(self.database_uri)
    except Exception as e:
        # Chain explicitly so the original driver error is preserved as the
        # direct cause instead of only as implicit context.
        raise RuntimeError(f"Database connection failed: {e}") from e
7072
71- def create_tmp_tables (self ):
def initialize(self):
    """Create empty temporary tables and set aside their bm25 indexes.

    Drops and recreates ``timescale_pages_tmp`` and ``timescale_chunks_tmp``
    as structural copies of the live tables (including the foreign key from
    chunks to pages). Every bm25 index copied onto the temporary tables is
    then dropped, and a ``CREATE INDEX`` statement targeting the final
    (non-``_tmp``) names is queued in ``self.finalize_queries`` so that
    ``finalize`` can recreate it.

    All work happens in one transaction that is committed at the end.
    """
    setup_statements = (
        "DROP TABLE IF EXISTS {schema}.timescale_chunks_tmp",
        "DROP TABLE IF EXISTS {schema}.timescale_pages_tmp",
        "CREATE TABLE {schema}.timescale_pages_tmp (LIKE {schema}.timescale_pages INCLUDING ALL EXCLUDING CONSTRAINTS)",
        "CREATE TABLE {schema}.timescale_chunks_tmp (LIKE {schema}.timescale_chunks INCLUDING ALL EXCLUDING CONSTRAINTS)",
        "ALTER TABLE {schema}.timescale_chunks_tmp ADD FOREIGN KEY (page_id) REFERENCES {schema}.timescale_pages_tmp(id) ON DELETE CASCADE",
    )

    # The tmp tables are rebuilt from scratch, so any statements queued by
    # an earlier call are stale and must not be replayed twice.
    self.finalize_queries = []

    with self.connection.cursor() as cursor:
        for statement in setup_statements:
            cursor.execute(SQL(statement).format(schema=Identifier(schema)))

        # The bm25 indexes have a bug that prevents inserting data into a
        # table that has them when the table lives in a non-public schema,
        # so we need to remove them from the tmp tables and recreate them
        # after renaming.
        #
        # The pattern matches table names ending in "_tmp"; the underscore
        # is escaped because a bare "_" is a single-character wildcard in
        # LIKE. The lookup uses the same schema as the statements above.
        cursor.execute(
            """
            SELECT indexname, indexdef
            FROM pg_indexes
            WHERE schemaname = %s
            AND tablename LIKE %s
            AND indexdef LIKE %s
            """,
            [schema, r'timescale%\_tmp', '%bm25%'],
        )
        for index_name, index_def in cursor.fetchall():
            # Rewrite the definition so it targets the final table/index
            # names. Assumes "_tmp" only occurs in those names within the
            # definition text.
            final_index_def = index_def.replace('_tmp', '')
            self.finalize_queries.append(SQL(final_index_def))
            cursor.execute(
                SQL("DROP INDEX IF EXISTS {schema}.{index}").format(
                    schema=Identifier(schema),
                    index=Identifier(index_name),
                )
            )
    self.connection.commit()
7996
80- def rename_objects (self ):
97+ def finalize (self ):
8198 """Rename the temporary tables and their indexes to the final names, dropping the old tables if they exist"""
8299 with self .connection .cursor () as cursor :
83100 cursor .execute (SQL ("DROP TABLE IF EXISTS {schema}.timescale_chunks" ).format (schema = Identifier (schema )))
@@ -135,6 +152,9 @@ def rename_objects(self):
135152 )
136153 )
137154
155+ for query in self .finalize_queries :
156+ cursor .execute (query )
157+
138158 self .connection .commit ()
139159
140160 def save_page (self , url , domain , filename , content_length , chunking_method = 'header' ):
@@ -1035,7 +1055,7 @@ def get_text_embeddings(self, texts):
10351055
10361056 embedding_model = OpenAIEmbeddingWrapper (client )
10371057 db_manager = DatabaseManager (database_uri = args .database_uri , embedding_model = embedding_model )
1038- db_manager .create_tmp_tables ()
1058+ db_manager .initialize ()
10391059 else :
10401060 file_manager = FileManager (args .output_dir )
10411061
@@ -1056,9 +1076,9 @@ def get_text_embeddings(self, texts):
10561076 # Create database indexes after scraping completes
10571077 if args .storage_type == 'database' and not args .skip_indexes and db_manager :
10581078 try :
1059- print ("Renaming temporary tables to final names ..." )
1060- db_manager .rename_objects ()
1061- print ("Database indexes created successfully! " )
1079+ print ("Finalizing database ..." )
1080+ db_manager .finalize ()
1081+ print ("Database finalized successfully. " )
10621082 except Exception as e :
10631083 print (f"Failed to finish database: { e } " )
10641084 raise SystemExit (1 )
0 commit comments