Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug]: Document docstore/data not being created on occassion #15888

Open
jcpaulco opened this issue Sep 6, 2024 · 20 comments
Open

[Bug]: Document docstore/data not being created on occassion #15888

jcpaulco opened this issue Sep 6, 2024 · 20 comments
Labels
bug Something isn't working triage Issue needs to be triaged/prioritized

Comments

@jcpaulco
Copy link

jcpaulco commented Sep 6, 2024

Bug Description

Occasionally the document docstore/data is not being created (KVDocumentStore) when add_documents is called
However docstore/metadata is consistently being created.

This is leading to an issue when aget_document is called and the document cannot be found because it does not exist for the /data collection

Version

0.10.67.post1

Steps to Reproduce

I've been unable to intentionally reproduce this. However we have several thousand occurrences of the docstore/data document not being created when add_documents is called.

99% of the time it works and I can't determine what the cause of failure to create this is.

Relevant Logs/Tracbacks

async def insert_faq(self: Self, params: FAQCreateParams) -> FAQ:
        if "uid" in params:
            document_exists = await self.docstore.adocument_exists(params["uid"])
            if document_exists:
                raise ValueError(f"[FAQAlreadyExists] uid: {params['uid']}")

        faq = FAQ.from_params(**params.model_dump())
        faq_document = faq.to_document()

        self.index.insert(faq_document)
        await self.docstore.async_add_documents([faq_document])

        return faq
@jcpaulco jcpaulco added bug Something isn't working triage Issue needs to be triaged/prioritized labels Sep 6, 2024
Copy link

dosubot bot commented Sep 6, 2024

Hey there, @jcpaulco! I'm here to help you with any bugs, questions, or contributions you have. Let's squash those bugs together!

The issue with the docstore/data document not being created occasionally when add_documents is called in version 0.10.67.post1 is due to the store_text parameter. The creation of entries in the docstore/data collection is conditional on the store_text parameter being True. If store_text is False, the node_kv_pair will be None, and no entries will be added to the docstore/data collection.

Here is the relevant code snippet:

def _get_kv_pairs_for_insert(
        self, node: BaseNode, ref_doc_info: Optional[RefDocInfo], store_text: bool
    ) -> Tuple[
        Optional[Tuple[str, dict]],
        Optional[Tuple[str, dict]],
        Optional[Tuple[str, dict]],
    ]:
        node_kv_pair = None
        metadata_kv_pair = None
        ref_doc_kv_pair = None

        node_key = node.node_id
        data = doc_to_json(node)
        if store_text:
            node_kv_pair = (node_key, data)

        # update doc_collection if needed
        metadata = {"doc_hash": node.hash}
        if ref_doc_info is not None and node.ref_doc_id:
            if node.node_id not in ref_doc_info.node_ids:
                ref_doc_info.node_ids.append(node.node_id)
            if not ref_doc_info.metadata:
                ref_doc_info.metadata = node.metadata or {}

            # update metadata with map
            metadata["ref_doc_id"] = node.ref_doc_id

            metadata_kv_pair = (node_key, metadata)
            ref_doc_kv_pair = (node.ref_doc_id, ref_doc_info.to_dict())
        else:
            metadata_kv_pair = (node_key, metadata)

        return node_kv_pair, metadata_kv_pair, ref_doc_kv_pair

In the add_documents method, the node_kv_pair is only appended to node_kv_pairs if it is not None:

def add_documents(
        self,
        nodes: Sequence[BaseNode],
        allow_update: bool = True,
        batch_size: Optional[int] = None,
        store_text: bool = True,
    ) -> None:
        ...
        for node in nodes:
            ...
            (
                node_kv_pair,
                metadata_kv_pair,
                ref_doc_kv_pair,
            ) = self._get_kv_pairs_for_insert(node, ref_doc_info, store_text)

            if node_kv_pair is not None:
                node_kv_pairs.append(node_kv_pair)
            if metadata_kv_pair is not None:
                metadata_kv_pairs.append(metadata_kv_pair)
            if ref_doc_kv_pair is not None:
                key = ref_doc_kv_pair[0]
                if key not in ref_doc_kv_pairs:
                    ref_doc_kv_pairs[key] = []
                ref_doc_kv_pairs[key].append(ref_doc_kv_pair)

        self._kvstore.put_all(
            node_kv_pairs,
            collection=self._node_collection,
            batch_size=batch_size,
        )
        self._kvstore.put_all(
            metadata_kv_pairs,
            collection=self._metadata_collection,
            batch_size=batch_size,
        )
        ...

Therefore, if store_text is False, no entries will be added to the docstore/data collection, which explains why it is not being created occasionally [1].

To continue talking to Dosu, mention @dosu.

@jcpaulco
Copy link
Author

jcpaulco commented Sep 6, 2024

We're not passing any value for store_text and the value of store_text should be defaulting to True

@logan-markewich
Copy link
Collaborator

@jcpaulco looking at the source code for add_documents(), I don't really see how that could be possible 🤯

The only thing that comes to mind is, maybe for the specific docstore/kvstore backend you are using, put_all doesn't create the collection if it's empty? But even then, docstore/data is holding the actual nodes/documents themselves, so that doesn't really make sense

@logan-markewich
Copy link
Collaborator

Maybe put_all() is silently failing? Or there's some delay in your store, and the user queries before its actually fully inserted/created?

@jcpaulco
Copy link
Author

jcpaulco commented Sep 6, 2024

Or there's some delay in your store, and the user queries before its actually fully inserted/created?

we thought perhaps this but its never getting created - the issues were seeing data back to several months ago

@jcpaulco
Copy link
Author

jcpaulco commented Sep 6, 2024

it looks like aput_all() is being called - should it be calling put_all() instead?

@logan-markewich
Copy link
Collaborator

aput_all() is fine, I was just reading the sync-code

The async version is here (and is the exact same 😅)

@jcpaulco
Copy link
Author

jcpaulco commented Sep 6, 2024

ok good

yeah it looks like this should take care of it appropriately so it seems very odd that its not being created

await asyncio.gather(
            self._kvstore.aput_all(
                node_kv_pairs,
                collection=self._node_collection,
                batch_size=batch_size,
            ),
            self._kvstore.aput_all(
                metadata_kv_pairs,
                collection=self._metadata_collection,
                batch_size=batch_size,
            ),
            self._kvstore.aput_all(
                ref_doc_kv_pairs,
                collection=self._ref_doc_collection,
                batch_size=batch_size,
            ),
        )

@logan-markewich
Copy link
Collaborator

@jcpaulco which docstore/kvstore are you using, out of curiosity? Perhaps there's something about its specific implementation of aput_all()

@jcpaulco
Copy link
Author

jcpaulco commented Sep 6, 2024

here's the docstore we're creating. It looks like the kvstore is the default BaseKVStore i think

FAQs.docstore = docstore or PostgresDocumentStore.from_uri(
            uri=config.GTB_ASYNC_DATABASE_URL,
            table_name="faq_embeddings_docstore",
            use_jsonb=True,
        )

@logan-markewich
Copy link
Collaborator

ah ok, so postgres

I'm far from an expert on Postgres, but maybe you can spot something in this code that might cause this to happen?

@jcpaulco
Copy link
Author

jcpaulco commented Sep 6, 2024

thanks - i'm getting thrown into a new project here and have little to no idea what i'm digging through so i appreciate the patience

I'm no expert on postgres either but from my eyes it seems alright..?

@jcpaulco
Copy link
Author

jcpaulco commented Sep 6, 2024

in your opinion could the line before be an issue? self.index.insert(faq_document)

if not self._initial_setup_done:
            self._initial_setup(**kwargs)
            FAQs._initial_setup_done = True

        self.index = index or VectorStoreIndex(
            nodes=[],
            vector_store=self.vector_store,
            storage_context=self.storage_context,
            transformations=self.transformations,
            embed_model=self.embed_model,
            use_async=True,
        )

    def _initial_setup(
        self: Self,
        vector_store: PGVectorStore = None,
        docstore: PostgresDocumentStore = None,
        storage_context: StorageContext = None,
        dimensions: int = 1024,
    ) -> None:
        nest_asyncio.apply()
        ...

@jcpaulco
Copy link
Author

jcpaulco commented Sep 6, 2024

it looks like this sync call insert calls insert_nodes() which calls add_documents()

perhaps that isn't finished adding the documents and then await self.docstore.async_add_documents([faq_document]) is called creating a race condition?

@jcpaulco
Copy link
Author

jcpaulco commented Sep 6, 2024

could this be related actually?
#14940

@logan-markewich
Copy link
Collaborator

Hmm, insert() is sync, and therefore blocking, so I don't see that being an issue (unless you are multithreading stuff)

I don't think that issue is related, that's more so talking about the lack of an async insert function on the index class

@jcpaulco
Copy link
Author

jcpaulco commented Sep 6, 2024

Nope - no threading here

@jcpaulco
Copy link
Author

jcpaulco commented Sep 9, 2024

@logan-markewich Update here - We found some logs that provide a good entry point to the issue.

It looks like we're getting a TimeoutError on this call await self.docstore.async_add_documents([faq_document]). Appears to get an issue with the ORM's connection to the DB so it seems the postgres KVDocstore is indeed the issue

File "/code/api/services/faqs/faqs.py", line 348, in insert_faq
    await self.docstore.async_add_documents([faq_document])

File "/code/.venv/lib/python3.11/site-packages/llama_index/core/storage/docstore/keyval_docstore.py", line 331, in async_add_documents
    await asyncio.gather(
  
File "/usr/local/lib/python3.11/asyncio/tasks.py", line 339, in __wakeup
    future.result()
  
File "/usr/local/lib/python3.11/asyncio/tasks.py", line 267, in __step
    result = coro.send(None)
             ^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/llama_index/storage/kvstore/postgres/base.py", line 326, in aput_all
    await session.execute(stmt, params)
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 461, in execute
    result = await greenlet_spawn(
             ^^^^^^^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 201, in greenlet_spawn
    result = context.throw(*sys.exc_info())
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2351, in execute
    return self._execute_internal(
           ^^^^^^^^^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2226, in _execute_internal
    conn = self._connection_for_bind(bind)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2095, in _connection_for_bind
    return trans._connection_for_bind(engine, execution_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  
File "<string>", line 2, in _connection_for_bind
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/orm/state_changes.py", line 139, in _go
    ret_value = fn(self, *arg, **kw)
                ^^^^^^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1189, in _connection_for_bind
    conn = bind.connect()
           ^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/opentelemetry/instrumentation/sqlalchemy/engine.py", line 98, in _wrap_connect_internal
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3276, in connect
    return self._connection_cls(self)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 146, in __init__
    self._dbapi_connection = engine.raw_connection()
                             ^^^^^^^^^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3300, in raw_connection
    return self.pool.connect()
           ^^^^^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 449, in connect
    return _ConnectionFairy._checkout(self)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 1263, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 712, in checkout
    rec = pool._do_get()
          ^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 179, in _do_get
    with util.safe_reraise():
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 146, in __exit__
    raise exc_value.with_traceback(exc_tb)
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 177, in _do_get
    return self._create_connection()
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 390, in _create_connection
    return _ConnectionRecord(self)
           ^^^^^^^^^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 674, in __init__
    self.__connect()
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 900, in __connect
    with util.safe_reraise():
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 146, in __exit__
    raise exc_value.with_traceback(exc_tb)
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 896, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 643, in connect
    return dialect.connect(*cargs, **cparams)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 620, in connect
    return self.loaded_dbapi.connect(*cargs, **cparams)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 932, in connect
    await_only(creator_fn(*arg, **kw)),
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
    value = await result
            ^^^^^^^^^^^^
  
File "/code/.venv/lib/python3.11/site-packages/asyncpg/connection.py", line 2328, in connect
    async with compat.timeout(timeout):
  
File "/usr/local/lib/python3.11/asyncio/timeouts.py", line 111, in __aexit__
    raise TimeoutError from exc_val

@jcpaulco
Copy link
Author

@logan-markewich what's your thoughts on this?

@jcpaulco
Copy link
Author

It feels like these kvstore.aput_all() calls should return either the success or failure of the doc creation and perhaps include a retry as well

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working triage Issue needs to be triaged/prioritized
Projects
None yet
Development

No branches or pull requests

2 participants