
🐛 Scalability issue with multiple simultaneous DIDExchange requests #3492

Open
ff137 opened this issue Feb 4, 2025 · 12 comments
Labels: bug (Something isn't working)

@ff137 (Contributor) commented Feb 4, 2025

When multiple tenants simultaneously request a DIDExchange connection with an issuer's public DID, several unhandled exceptions are raised, causing all requested connections to fail.

The handling logic and auto-complete flows associated with the DIDExchange request do not report any error to the clients that made the request, leaving their connection record in the request-sent state.

The issuer does not receive any request-received records as expected, not even one of the many requests.

Note: this is running the latest ACA-Py release, with Askar 0.4.3.

Steps to Reproduce

There are many steps required to reproduce this in ACA-Py alone, so the simplest way is to check out our acapy-cloud repo (previously aries-cloudapi-python), where a simple test script does all the setup and replicates the issue for you: https://github.com/didx-xyz/acapy-cloud

In summary - besides all the steps for onboarding an issuer and registering their public DID - here's how to replicate the issue:

  1. Create multiple tenants (reliably fails for me with 10)
  2. For each one, initiate a DIDExchange connection request (POST /didexchange/create-request) using use_public_did to set the issuer's public DID for the request.
  3. Observe the unhandled exceptions raised in the system logs.
  4. Check the state of the connection records for the tenants and the issuer.
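The concurrent kick-off in step 2 can be sketched with asyncio. This is only an illustration: `create_did_exchange_request` is a hypothetical stub standing in for the POST /didexchange/create-request admin API call, and the DID value is a placeholder.

```python
import asyncio

# Hypothetical stub for POST /didexchange/create-request; the real
# reproduction issues this call through each tenant's admin API client.
async def create_did_exchange_request(tenant_id: int, public_did: str) -> dict:
    await asyncio.sleep(0.01)  # placeholder for the HTTP round trip
    return {
        "tenant": tenant_id,
        "their_public_did": public_did,
        "state": "request-sent",
    }

async def main(num_tenants: int = 10) -> list:
    issuer_did = "did:sov:issuer-public-did"  # placeholder value
    # Every tenant fires its request at once, mirroring step 2 above.
    return await asyncio.gather(
        *(create_did_exchange_request(i, issuer_did) for i in range(num_tenants))
    )

records = asyncio.run(main())
```

In the real run, each of these records is then left stuck in the request-sent state on the tenant side.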

The above steps can be achieved as follows:

  1. Follow the quick start guide in acapy-cloud (clone the repo, install prerequisites, spin up the stack)
  2. Cherry pick the following commit, to get my test script:
    git cherry-pick 8153a47eb62bea6de75ef132d0600bec8e76cab6
    This will get you a test file which you can check out at app/tests/e2e/test_many_connections.py
  3. Spin up the stack: mise run tilt:up, and wait for services to be up and running (visit localhost:10350)
  4. Run the test: pytest app/tests/e2e/test_many_connections.py
  5. Click on the Multitenant-Agent tab in the Tilt UI (localhost:10350) to view logs

The test should fail with "Connection 0 failed with exception" and then "expected webhook not received".

Under Multitenant-Agent logs, you'll see many exceptions being raised, one for each request.

The stack trace suggests the failure is a timeout waiting to open an Askar session:

2025-02-04 11:14:49,084 acapy_agent.core.dispatcher ERROR Handler error: Dispatcher.handle_v1_message
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/asyncio/tasks.py", line 520, in wait_for
    return await fut
           ^^^^^^^^^
  File "/home/aries/.local/lib/python3.12/site-packages/aries_askar/store.py", line 773, in _open
    await bindings.session_start(self._store, self._profile, self._is_txn),
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aries/.local/lib/python3.12/site-packages/aries_askar/bindings/__init__.py", line 266, in session_start
    handle = await invoke_async(
             ^^^^^^^^^^^^^^^^^^^
  File "/home/aries/.local/lib/python3.12/site-packages/aries_askar/bindings/lib.py", line 393, in invoke_async
    return await self.loaded.invoke_async(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/futures.py", line 289, in __await__
    yield self  # This tells Task to wait for completion.
    ^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/tasks.py", line 385, in __wakeup
    future.result()
  File "/usr/local/lib/python3.12/asyncio/futures.py", line 197, in result
    raise self._make_cancelled_error()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/asyncio/tasks.py", line 314, in __step_run_and_handle_result
    result = coro.send(None)
             ^^^^^^^^^^^^^^^
  File "/home/aries/.local/lib/python3.12/site-packages/acapy_agent/core/dispatcher.py", line 257, in handle_v1_message
    await handler(context, responder)
  File "/home/aries/.local/lib/python3.12/site-packages/acapy_agent/protocols/didexchange/v1_0/handlers/request_handler.py", line 36, in handle
    conn_rec = await mgr.receive_request(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aries/.local/lib/python3.12/site-packages/acapy_agent/protocols/didexchange/v1_0/manager.py", line 558, in receive_request
    conn_rec = await self._receive_request_public_did(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aries/.local/lib/python3.12/site-packages/acapy_agent/protocols/didexchange/v1_0/manager.py", line 704, in _receive_request_public_did
    await self._extract_and_record_did_doc_info(request)
  File "/home/aries/.local/lib/python3.12/site-packages/acapy_agent/protocols/didexchange/v1_0/manager.py", line 725, in _extract_and_record_did_doc_info
    await self.store_did_document(conn_did_doc)
  File "/home/aries/.local/lib/python3.12/site-packages/acapy_agent/connections/base_manager.py", line 380, in store_did_document
    async with self._profile.session() as session:
               ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aries/.local/lib/python3.12/site-packages/acapy_agent/core/profile.py", line 197, in __aenter__
    await self._setup()
  File "/home/aries/.local/lib/python3.12/site-packages/acapy_agent/askar/profile.py", line 252, in _setup
    self._handle = await asyncio.wait_for(self._opener, 10)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/tasks.py", line 519, in wait_for
    async with timeouts.timeout(timeout):
               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/timeouts.py", line 115, in __aexit__
    raise TimeoutError from exc_val
TimeoutError
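The failing call in the trace is `asyncio.wait_for(self._opener, 10)` in AskarProfile._setup: session opens contend for a bounded connection pool, and waiters that can't acquire a connection within the timeout get cancelled. A minimal model of that contention (pool size, timeout, and hold durations are illustrative, not Askar's real values):

```python
import asyncio

POOL_SIZE = 2        # stand-in for Askar's max_connections (illustrative)
OPEN_TIMEOUT = 0.1   # stand-in for the 10s limit in AskarProfile._setup

async def open_session(pool: asyncio.Semaphore, hold_for: float) -> str:
    # Mirrors `await asyncio.wait_for(self._opener, 10)`: acquire a pooled
    # connection under a timeout, or give up with a TimeoutError.
    try:
        await asyncio.wait_for(pool.acquire(), OPEN_TIMEOUT)
    except asyncio.TimeoutError:
        return "timeout"
    try:
        await asyncio.sleep(hold_for)  # work done while the session is open
        return "ok"
    finally:
        pool.release()

async def main() -> list:
    pool = asyncio.Semaphore(POOL_SIZE)
    # Five session opens contend for two connections, each held longer
    # than the open timeout, so the waiters time out.
    return await asyncio.gather(*(open_session(pool, 0.3) for _ in range(5)))

results = asyncio.run(main())
```

With more concurrent opens than pooled connections, and each session held longer than the open timeout, every waiter beyond the pool size fails, which matches the barrage of TimeoutErrors above.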

PS: Log levels can be modified in helm/acapy-cloud/conf/local/multitenant-agent.yaml, e.g. set ACAPY_LOG_LEVEL to debug


Please let me know if the replication steps are successful or not, or whether you need help with the acapy-cloud mise setup.

@swcurran (Contributor) commented Feb 4, 2025

Ah…this is the issue you thought was resolved with the last Askar updates. I don’t think the follow up Askar changes would have impacted this, but who knows.

Just to clarify, if you do fewer than the 10 parallel tenants, things work fine. It's only once you get to a threshold of tenants that the problems occur, correct?

@ff137 (Contributor, Author) commented Feb 4, 2025

Ah…this is the issue you thought was resolved with the last Askar updates. I don’t think the follow up Askar changes would have impacted this, but who knows.

Indeed - I had this debug script ready when Askar 0.4.2 came out, and when I upgraded it in ACA-Py and reran these tests, I got 2 successful runs and thought it was fixed (after previously consistent test failures).

The stack traces show a timeout when opening an Askar session, which is why I thought the caching changes might be just the thing to fix it, but I haven't done enough testing on each Askar version to really verify the impact of all the changes.

It's probably sensitive to resource constraints. In our dev environment, it happens less often with a slightly beefier node.

Just to clarify, if you do fewer than the 10 parallel tenants, things work fine. It's only once you get to a threshold of tenants that the problems occur, correct?

Exactly - I tested 2, 3, 4, 5, 6 - all working. Wanted to find the exact boundary, but alas, impatience. With 10 consecutive requests, they usually all fail.

I'll have to add some more debug logs around the place to figure out what's causing the issue.

Quick inspection shows a lot of async with contexts opening and closing Askar sessions in quick succession, each doing one operation that could presumably be batched better. I think some light refactoring could improve that, but I'll have to investigate whether that makes the difference. I'll report back tomorrow!
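The open/close churn described here can be modeled with a counter. This sketch (all class names are hypothetical stand-ins, not ACA-Py's real API) contrasts one session per operation with one session held for the whole batch:

```python
import asyncio

class FakeSession:
    """Stand-in for AskarProfileSession; counts setup cycles."""
    opens = 0

    async def __aenter__(self):
        FakeSession.opens += 1   # placeholder for _setup()
        return self

    async def __aexit__(self, *exc):
        pass                     # placeholder for _teardown()

    async def add_record(self, record):
        await asyncio.sleep(0)   # placeholder for a storage write

class FakeProfile:
    def session(self) -> FakeSession:
        return FakeSession()     # new session per call

async def store_per_record(profile, records):
    # Current pattern: one open/close cycle per operation.
    for record in records:
        async with profile.session() as session:
            await session.add_record(record)

async def store_batched(profile, records):
    # Batched pattern: one session held open for the whole batch.
    async with profile.session() as session:
        for record in records:
            await session.add_record(record)

async def main() -> tuple:
    profile = FakeProfile()
    FakeSession.opens = 0
    await store_per_record(profile, range(10))
    per_record = FakeSession.opens   # one setup per record
    FakeSession.opens = 0
    await store_batched(profile, range(10))
    batched = FakeSession.opens      # one setup total
    return per_record, batched

counts = asyncio.run(main())
```

Ten records stored one-per-session trigger ten setup/teardown cycles; the batched version triggers one.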

PS: We've recently begun open-sourcing acapy-cloud (previously some hidden Helm charts were needed to deploy everything locally), so we're partially curious to hear whether others can succeed in setting up a local acapy-cloud environment. I think it should prove to be a very useful and powerful repo for simplifying work with ACA-Py and for debugging issues like this. It can definitely benefit from more users and contributors! So please, to all maintainers here, check it out and feel free to let us know if you need help 🚀

@ff137 (Contributor, Author) commented Feb 6, 2025

Note: in my latest runs, I see this Askar warning log pop up just before the exceptions occur:

2025-02-06 11:47:06,893 aries_askar.native.sqlx.pool.acquire WARNING 	/cargo/registry/src/index.crates.io-6f17d22bba15001f/sqlx-core-0.8.3/src/pool/inner.rs:308 | acquired connection, but time to acquire exceeded slow threshold aquired_after_secs=10.005785024 slow_acquire_threshold_secs=2.0

@ff137 (Contributor, Author) commented Feb 6, 2025

Ok. After adding debug logs to the opening and closing of Askar sessions, I can see that the issuer tenant's session gets absolutely spammed with open-close requests.

The AskarProfile _setup method gets called 80+ times for the 10 DIDExchange requests. Logs are truncated, so I can't see exactly how many.

Right before the Timeout gets raised, there is a barrage of setup and teardown requests called in quick succession.

An issue appears to be that the AskarProfileSessions are short-lived - they are repeatedly re-initialized.

I would expect there to be one long-lived ProfileSession object in a given request context. But this does not appear to be the case, since __init__ gets called many times for the same profile. It means the initialized ProfileSession is not held onto.


I also notice that AskarProfile(backend=askar, name=multitenant) gets re-initialized on every /status/live call.


The issue can probably be worked around by increasing the 10s timeout and reducing the number of times Askar sessions are opened and closed in short succession. But I presume this is just a bandaid that pushes the scalability issue to a higher threshold. Worth it as a short-term solution, but the reason for ProfileSessions being constantly re-initialized should be investigated further.

@ff137 ff137 self-assigned this Feb 6, 2025
@ff137 ff137 added the bug Something isn't working label Feb 6, 2025
@ff137 (Contributor, Author) commented Feb 6, 2025

Ah, I see now why sessions are recreated. I missed that in askar.profile, session returns a new instance every time:

class AskarProfile(Profile):
...
    def session(self, context: Optional[InjectionContext] = None) -> ProfileSession:
        """Start a new interactive session with no transaction support requested."""
        return AskarProfileSession(self, False, context=context)

Probably makes sense for that to be cached as part of the AskarProfile, but maybe there's a reason not to. Gonna test and see

@ff137 (Contributor, Author) commented Feb 6, 2025

Caching the session (when context = None) speeds up live/status checks by ~40ms 👀 since _setup and _teardown aren't called every time

@jamshale (Contributor) commented Feb 6, 2025

I wonder if there's any difference between single-wallet/db and multi-wallet/db scenarios?

I agree we should try to prevent unnecessary opening and closing of sessions. I don't think this was ever really considered a problem in the code base, and it's probably done excessively. I'd need to think about a caching solution, but it looks like you are already doing that.

@ff137 (Contributor, Author) commented Feb 6, 2025

It's proving quite tricky for me to refactor, because there's a (classic) trade-off between keeping too many connections open and opening/closing too frequently.

For this particular issue, it's mostly because one wallet is trying to store all these new connection records simultaneously. And something causes the DB connections to freeze / lock up.

My idea was to modify the session teardown logic, so that it's a delayed background task, which will get cancelled if it needs to open again in a short time window -- but that can have the tradeoff of too many connections staying open, and preventing new sessions.

A lot of this is new to me, so I'll have to explore a bit more. Hopefully there's some config available in Askar that can tweak the max concurrent connections, combined with some minor re-grouping of tasks so that they happen while one session is open, instead of attempting the teardown refactoring.
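The delayed-teardown idea mentioned above can be sketched as a background task that is cancelled if the session is re-entered within a grace period. This is a rough sketch with hypothetical names, not ACA-Py code:

```python
import asyncio
from typing import Optional

class DelayedCloseSession:
    """Sketch: defer teardown by a grace period; cancel it on re-entry."""

    def __init__(self, linger: float = 0.05):
        self.linger = linger
        self.open = False
        self.teardowns = 0
        self._closer: Optional[asyncio.Task] = None

    async def __aenter__(self):
        if self._closer is not None:
            self._closer.cancel()    # reopened in time: keep the session
            self._closer = None
        if not self.open:
            self.open = True         # placeholder for _setup()
        return self

    async def __aexit__(self, *exc):
        # Instead of tearing down immediately, schedule a delayed close.
        self._closer = asyncio.create_task(self._close_later())

    async def _close_later(self):
        await asyncio.sleep(self.linger)
        self.open = False            # placeholder for _teardown()
        self.teardowns += 1
        self._closer = None

async def demo() -> int:
    session = DelayedCloseSession(linger=0.05)
    for _ in range(3):               # rapid reopens cancel pending teardowns
        async with session:
            pass
    await asyncio.sleep(0.1)         # let the final teardown fire
    return session.teardowns

teardowns = asyncio.run(demo())
```

Three rapid open/close cycles result in a single real teardown, which is the intended saving; the trade-off, as noted, is that lingering sessions can keep pooled connections occupied.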

@ff137 (Contributor, Author) commented Feb 7, 2025

The answer is the max_connections config, e.g.:

ACAPY_WALLET_STORAGE_CONFIG: '{ "max_connections":10, "min_idle_count":10, "url":"cloudapi-postgresql:5432" }'

We were running with 10 as the max_connections, and that's why things start acting up around 10 concurrent requests.

When increasing this to 100, then it works.

I can then run my test script with 60 tenants and get just 1 connection that didn't succeed... which is obviously much better than 10/10 failing, as before. (With 90 requests, I got about ~15 errors.)

So, while there is a patch / workaround for our problem, I think it's definitely worth reviewing the didexchange flow + askar session logic, so that max_connections: 5 won't result in 5/5 concurrent requests all failing. Especially, failing without the end user knowing. It's fine if the auto-accept flow fails, but the tenant should know that their request-sent record is invalid.

I've got some in-progress work that makes some minor improvements, for which I'll open a draft PR next week. But I'll definitely need some more help with this one.

ff137 added a commit to didx-xyz/acapy-cloud that referenced this issue Feb 7, 2025
🐛 This is a bandaid for the didexchange scalability issues (see openwallet-foundation/acapy#3492)
@andrewwhitehead (Contributor) commented:

With 10 concurrent requests, Askar shouldn't be holding on to any sessions for nearly long enough to create contention issues. I would guess there's a cache for the multi-tenant sessions, or a session instance is being maintained just because an HTTP connection is still open. The connections are pooled, so opening and closing sessions is meant to be cheap. When caching on the ACA-Py side it is helpful to keep Profile instances around, but not Session instances.

@swcurran (Contributor) commented Feb 7, 2025

So it sounds like this might be an issue in ACA-Py and its handling of connections to Askar. Who wants to dive into that code? Presumably it's a relatively small section of code...

ff137 added a commit to didx-xyz/acapy-cloud that referenced this issue Feb 8, 2025
🐛 This is a bandaid for the didexchange scalability issues (see openwallet-foundation/acapy#3492)
@ff137 (Contributor, Author) commented Feb 8, 2025

One thing worth mentioning, which I noticed in the didexchange methods, is that there is an async with session block (opening one session) and then, within that block, more sessions are opened and closed (look for my # <-- comments):

class DIDXManager(BaseConnectionManager):

    async def _extract_and_record_did_doc_info(self, request: DIDXRequest):
        """Extract and record DID Document information from the DID Exchange request.

        Extracting this info enables us to correlate messages from these keys back to a
        connection when we later receive inbound messages.
        """
        if request.did_doc_attach and request.did_doc_attach.data:
            self._logger.debug("Received DID Doc attachment in request")
            async with self.profile.session() as session:  # <-- first opening of session
                wallet = session.inject(BaseWallet)
                conn_did_doc = await self.verify_diddoc(wallet, request.did_doc_attach)
                await self.store_did_document(conn_did_doc)  # <-- let's follow into this method



class BaseConnectionManager:

    async def store_did_document(self, value: Union[DIDDoc, dict]):
        """Store a DID document.

        Args:
            value: The `DIDDoc` instance to persist
        """
...
        self._logger.debug("Storing DID document for %s: %s", did, doc)

        try:
            stored_doc, record = await self.fetch_did_document(did)  # <-- opens a session
        except StorageNotFoundError:
            record = StorageRecord(self.RECORD_TYPE_DID_DOC, doc, {"did": did})
            async with self._profile.session() as session:  # <-- opens a session if above not found
                storage: BaseStorage = session.inject(BaseStorage)
                await storage.add_record(record)
        else:
            async with self._profile.session() as session:  # <-- opens a session again if it was found
                storage: BaseStorage = session.inject(BaseStorage)
                await storage.update_record(record, doc, {"did": did})

        await self.remove_keys_for_did(did)  # <-- opens a session
        await self.record_keys_for_resolvable_did(did)

It looks like the design of the ProfileSession had this possibility of nested opening in mind, by keeping track of _entered:

class ProfileSession(ABC):
    """An active connection to the profile management backend."""

    def __init__(
        self,
        profile: Profile,
        *,
        context: Optional[InjectionContext] = None,
        settings: Mapping[str, Any] = None,
    ):
        """Initialize a base profile session."""
        self._active = False
        self._awaited = False
        self._entered = 0  # <--
...
    async def __aenter__(self):  # <-- when stepping into async with contexts
        """Async context manager entry."""
        LOGGER.debug(  # my added debug lines
            "Profile __aenter__ called. self._active: %s for profile: %s",
            self._active,
            self._profile,
        )
        if not self._active:
            LOGGER.debug(
                "Setting up profile session in def __aenter__ for profile: %s.",
                self._profile,
            )
            await self._setup()
            self._active = True
            LOGGER.debug("Profile session active for profile: %s.", self._profile)
        self._entered += 1  # <--
        LOGGER.debug(
            "__aenter__ returning. self._entered: %s for profile: %s",
            self._entered,
            self._profile,
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        self._entered -= 1  # <--
        LOGGER.debug(
            "Profile __aexit__ called. self._entered: %s for profile: %s",
            self._entered,
            self._profile,
        )
        if not self._awaited and not self._entered:  # <--
            LOGGER.debug(
                "Tearing down profile session in def __aexit__ for profile: %s.",
                self._profile,
            )
            await self._teardown()  # <-- teardown only called if _entered is 0
            self._active = False
            LOGGER.debug("Profile session inactive for profile: %s.", self._profile)

When inspecting debug lines, I can see that _entered would always log with a maximum value of 1.

This is because a brand new ProfileSession is returned with each .session() call in the async with blocks:

class AskarProfile(Profile):

    def session(self, context: Optional[InjectionContext] = None) -> ProfileSession:
        """Start a new interactive session with no transaction support requested."""
        return AskarProfileSession(self, False, context=context)

So, because a new AskarProfileSession is returned each time, _entered does not persist and is reset across each new session, despite being for the same profile.

This is why I had the idea of caching and reusing the same session in the profile object somehow, if it's still active, but it gets a bit messy. My initial experimentation didn't solve the problem (it just seemed to create other issues), and it'll take some more clever thinking to get a proper solution.
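A toy model of the two behaviors (hypothetical class names, not ACA-Py's real API): when the profile returns a fresh session object per call, the reentrancy counter can never exceed 1 and setup runs for every nested block; a cached instance lets the counter accumulate, so nested blocks share one setup/teardown cycle:

```python
import asyncio

class Session:
    """Toy model of ProfileSession's _active/_entered bookkeeping."""

    def __init__(self):
        self._active = False
        self._entered = 0
        self.setups = 0

    async def __aenter__(self):
        if not self._active:
            self.setups += 1          # stands in for _setup()
            self._active = True
        self._entered += 1
        return self

    async def __aexit__(self, *exc):
        self._entered -= 1
        if not self._entered:
            self._active = False      # stands in for _teardown()

class FreshProfile:
    def session(self) -> Session:
        return Session()              # new instance per call (current behavior)

class CachedProfile:
    def __init__(self):
        self._session = Session()
    def session(self) -> Session:
        return self._session          # one shared instance (the caching idea)

async def total_setups(profile) -> int:
    sessions = []
    async with profile.session() as outer:
        sessions.append(outer)
        async with profile.session() as inner:   # nested open
            sessions.append(inner)
    return sum(s.setups for s in set(sessions))

async def main() -> tuple:
    return (await total_setups(FreshProfile()),
            await total_setups(CachedProfile()))

setup_counts = asyncio.run(main())
```

The fresh-per-call profile runs setup twice for the nested blocks, while the cached profile runs it once, which is what the _entered mechanism was evidently designed to allow.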

Just sharing these as some notes, for what I think would be relevant improvements to make to ACA-Py.

Besides that, there needs to be exception handling so that the client making the request knows that their request-sent record is actually abandoned, or at least gets some error message.
