Description
sqlalchemy.exc.InvalidRequestError: Table 'langchain_pg_collection' is already defined for this MetaData instance
This error occurs when two PGVector instances are created at the same time. The models (CollectionStore and EmbeddingStore) are created dynamically, so two threads can define them concurrently. Because SQLAlchemy registers every table in the MetaData instance, the second definition finds the table already registered and raises, which makes this a race condition. The check that raises is in SQLAlchemy's Table construction:
    must_exist = kw.pop("must_exist", kw.pop("mustexist", False))
    key = _get_table_key(name, schema)
    if key in metadata.tables:
        if not keep_existing and not extend_existing and bool(args):
            raise exc.InvalidRequestError(
                f"Table '{key}' is already defined for this MetaData "
                "instance. Specify 'extend_existing=True' "
                "to redefine "
                "options and columns on an "
                "existing Table object."
            )
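The check above can be triggered without any threads, which may help confirm that the failure is about the second definition of the table rather than about PGVector itself. The following is a minimal sketch using plain SQLAlchemy Core; `define_collection_table` is a hypothetical stand-in for the dynamically created CollectionStore model, not code from langchain_postgres.

```python
from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy.exc import InvalidRequestError

# One MetaData instance shared by both definitions, as in the error message.
metadata = MetaData()


def define_collection_table():
    # Hypothetical stand-in for the model that PGVector defines dynamically.
    return Table(
        "langchain_pg_collection",
        metadata,
        Column("id", Integer, primary_key=True),
    )


first = define_collection_table()

try:
    # Same table name, same MetaData, columns supplied again.
    define_collection_table()
    second_definition_failed = False
except InvalidRequestError as err:
    second_definition_failed = True
    error_message = str(err)
```

In the threaded case, both threads presumably reach the model definition before either has finished, so one of them plays the role of the second call here.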
I have a few suggestions. One is a simple mutex around the _get_embedding_collection_store method. Another is defining the models with extend_existing or keep_existing in their table args. A third is accepting table args as a parameter and passing them through to the models.
What do you think? I can create the PR, but I want to know what you prefer.
I would go with adding table args with "extend_existing" set to True.
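To make the difference between the two table options concrete, here is a small sketch using SQLAlchemy Core rather than the actual PGVector models. The helper `define_twice` and the extra `name` column are assumptions added only for illustration. With `keep_existing=True` the second definition is ignored and the existing Table is returned; with `extend_existing=True` the second definition is applied to the existing Table.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table


def define_twice(**table_kwargs):
    # Hypothetical helper: define the same table name twice on one MetaData.
    metadata = MetaData()
    first = Table(
        "langchain_pg_collection",
        metadata,
        Column("id", Integer, primary_key=True),
    )
    second = Table(
        "langchain_pg_collection",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("name", String),  # only present in the second definition
        **table_kwargs,
    )
    return first, second


kept_first, kept_second = define_twice(keep_existing=True)
ext_first, ext_second = define_twice(extend_existing=True)

# Both options return the already registered Table object instead of raising.
kept_has_name = "name" in kept_second.c  # second definition ignored
ext_has_name = "name" in ext_second.c    # second definition applied
```

Since both threads would define identical models here, either option should avoid the InvalidRequestError; the choice mainly matters if the definitions could ever differ.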
import pytest


@pytest.mark.parametrize("execution_number", range(10))
def test_race_condition(execution_number):
    from langchain_postgres.vectorstores import PGVector
    from langchain_openai import OpenAIEmbeddings
    from threading import Thread

    def store():
        PGVector(
            connection="postgresql+psycopg://postgres:postgres@localhost:5432/embeddings",
            embeddings=OpenAIEmbeddings(),
            collection_name="test_collection",
        )

    # Construct PGVector in a background thread and in the main thread at once.
    thread = Thread(target=store)
    thread.start()
    store()
    thread.join()
EDIT: There is also a race condition on the table creation, so a mutex makes sense there as well.
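The mutex idea could look roughly like the sketch below, which guards a one-time build step with a lock and a second check inside the lock. Everything here is hypothetical: `_models`, `_models_lock`, `_build_models`, and `get_models` are illustrative names, and the build step is a placeholder rather than the real model or table creation.

```python
import threading

_models = None
_models_lock = threading.Lock()
build_count = 0


def _build_models():
    # Placeholder for defining CollectionStore and EmbeddingStore.
    global build_count
    build_count += 1
    return ("EmbeddingStore", "CollectionStore")


def get_models():
    global _models
    if _models is None:  # fast path once the models exist
        with _models_lock:
            if _models is None:  # re-check: another thread may have built them
                _models = _build_models()
    return _models


# Call the accessor from several threads at once.
threads = [threading.Thread(target=get_models) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The same lock, or a second one, could presumably wrap the table creation step so that only one thread issues the CREATE statements.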