
improving stability and cleanup in case of disconnection / improving report in case of disconnection #131

Merged · 7 commits merged into master from improve_connection_broke on Oct 19, 2023

Conversation

@DanielePalaia (Collaborator) commented on Sep 22, 2023:

This will close #121

Improves the information reported by on_connection_closed when a disconnection happens.

  • The handler on_connection_closed now receives a DisconnectionErrorInfo dataclass, which also includes the name of the stream(s) carried by the connection and the exception raised. This is useful in the super-stream case, so we know which streams were lost with the connection.
  • producer.close() now skips delete_publisher and other calls if the connection is broken, so that the cleanup operations can still be finalized.
  • There was also a bug in super-stream handling during disconnection, described in Producer Needs Safely Closed #121: now the handler function is connected to the producer/consumer clients the super-stream is using.
  • New examples for super-streams have been created.

In the issue we also discussed the possibility of adding a method to reconnect to the streams with a new connection, but this will be done in a future development/PR.
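As a rough sketch (based only on the examples later in this thread; the exact types are illustrative), the new callback shape looks like this:

# Illustrative sketch of the new dataclass and handler signature; the field names
# (reason, streams) come from the examples in this PR, the exact types are assumed.
from dataclasses import dataclass
from typing import List


@dataclass
class DisconnectionErrorInfo:
    reason: BaseException  # the exception raised when the connection broke
    streams: List[str]     # stream(s) carried by the lost connection (several for a super-stream)


async def on_connection_closed(disconnection_info: DisconnectionErrorInfo) -> None:
    print(
        "connection has been closed from stream: "
        + str(disconnection_info.streams)
        + " for reason: "
        + str(disconnection_info.reason)
    )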

@DanielePalaia marked this pull request as draft on September 22, 2023
@DanielePalaia force-pushed the improve_connection_broke branch 4 times, most recently from 84464a5 to 4b3b48e on September 23, 2023
@DanielePalaia marked this pull request as ready for review on September 25, 2023
@DanielePalaia force-pushed the improve_connection_broke branch from 4b3b48e to e01000d on October 2, 2023
@DanielePalaia force-pushed the improve_connection_broke branch from e01000d to 70adead on October 11, 2023
@DanielePalaia force-pushed the improve_connection_broke branch from 70adead to 78f2b22 on October 11, 2023
@DanielePalaia marked this pull request as draft on October 11, 2023
@DanielePalaia force-pushed the improve_connection_broke branch from 01a2bcf to 77e517a on October 12, 2023
@DanielePalaia marked this pull request as ready for review on October 12, 2023
@DanielePalaia (Collaborator, Author) commented:

Hi @garvenlee, with this PR I tried to solve some of the points/issues you raised in #121:

  1. The connection_closed_handler callback is now also connected to the super-stream producers/consumers. This was clearly a bug.
  2. I extended the information returned by the callback, adding a DisconnectionErrorInfo which also contains the streams that have been disconnected.
  3. I added some further checks so that producer.close() and consumer.close() can be called and the state is cleaned up correctly.

What is still missing is the possibility to reconnect to a stream (useful in the super-stream case). I will work on this in another PR (on top of this one), tracked in issue #135.

What do you think? Do you have some time to test this branch?

rstream/client.py (review thread)
    else:
-       result = self._connection_closed_handler(e)
+       connection_error_info = DisconnectionErrorInfo(e, self._streams)
+       result = self._connection_closed_handler(connection_error_info)

@garvenlee commented:

I think it is unnecessary to trigger _connection_closed_handler here. If the RabbitMQ server goes down, the read_frame operation in the listener will catch it first.

So send_frame can be written like this:

async def send_frame(self, frame: schema.Frame) -> None:
    try:
        await self._conn.write_frame(frame)
    except ConnectionError as exc:
        print(f"write_frame: {exc}")
        self._is_not_closed = False  # in case
    except BaseException as exc:
        print(f"write_frame: {exc}")
        self._is_not_closed = False  # in case

@DanielePalaia (Collaborator, Author) — Oct 16, 2023:

Actually, I tested a Producer scenario like this, and if I don't trigger it there then the callback is not triggered.

I tried this scenario (closing the connection from the server side in the management UI):

# assumed imports and constants, not shown in the original comment:
import asyncio
import time

from rstream import AMQPMessage, DisconnectionErrorInfo, Producer

STREAM = "my-test-stream"   # example stream name
MESSAGES = 1_000_000        # number of messages to send
connection_is_closed = False


async def publish():

    async def on_connection_closed(disconnection_info: DisconnectionErrorInfo) -> None:
        print(
            "connection has been closed from stream: "
            + str(disconnection_info.streams)
            + " for reason: "
            + str(disconnection_info.reason)
        )

        # clean close or reconnect
        global connection_is_closed
        connection_is_closed = True
        await producer.close()



    async with Producer(
        "localhost", username="guest", password="guest", connection_closed_handler=on_connection_closed
    ) as producer:
        # create a stream if it doesn't already exist
        await producer.create_stream(STREAM, exists_ok=True)

        # sending a million messages in AMQP format
        start_time = time.perf_counter()

        for i in range(MESSAGES):
            amqp_message = AMQPMessage(
                body="hello: {}".format(i),
            )
            # send is asynchronous
            if connection_is_closed is False:
                await producer.send(stream=STREAM, message=amqp_message)
            else:
                break

        end_time = time.perf_counter()
        print(f"Sent {MESSAGES} messages in {end_time - start_time:0.4f} seconds")


asyncio.run(publish())

@garvenlee commented:

I tested the code in #121 with systemctl stop rabbitmq-server and got some exceptions. But with the modifications above, it works fine.

@DanielePalaia (Collaborator, Author) commented:

@garvenlee you mean the callback gets triggered even if you remove it from line 154? In my case it does not happen if I force the connection to close through the UI.

@garvenlee — Oct 16, 2023:

Yes, it gets triggered. But that is with the code in #121, and it closes successfully with systemctl stop rabbitmq-server, because read_frame in _listener detects that the connection is lost.

Now I am trying to test your code.

@garvenlee — Oct 16, 2023:

In your code, producer.close is called multiple times. When I removed the async with, it works fine with the modifications above.

Additionally, I got one extra RuntimeError when the Python garbage collector clears a StreamWriter. I think the reason is that conn.close is not called because of the is_not_closed check in Client.close. After I removed the is_not_closed check and dropped the reference to the StreamWriter in Connection.close, the RuntimeError is gone.

# Connection
async def close(self):
    …
    self._reader = self._writer = None

Another option is to add a __del__ method to Connection; setting self._conn = None will trigger it and then clear the internal StreamWriter. Like this:

def __del__(self):
    self._reader = self._writer = None
    # maybe clear more other instance variables here.

@DanielePalaia (Collaborator, Author) — Oct 18, 2023:

@garvenlee I'm not sure, but I'm having issues with my tests when removing this call here.

You can see the tests in this PR (I also updated the branch with the latest suggestions). They are working fine, but if I remove the call here, on_connection_closed sometimes does not get triggered (you can test the producer scenario). Sometimes it works, sometimes it doesn't.

@DanielePalaia (Collaborator, Author) commented:

It seems the issue happens mainly with the super_stream producer example.

@garvenlee — Oct 18, 2023:

@DanielePalaia can you evaluate this?

Here is my modification. I tested your code above and the code in #121; all is fine.

# Connection
async def close(self):
    …
    # add this line
    self._reader = self._writer = None


# BaseClient
async def _listener(self):
    while True:
        try:
            frame = await self._conn.read_frame()
        except (ConnectionClosed, socket.error) as exc:
            self._is_not_closed = False
            if self._connection_closed_handler is not None:
                err_info = DisconnectionErrorInfo(exc, self._streams)
                maybe_coro = self._connection_closed_handler(err_info)
                if maybe_coro is not None and inspect.isawaitable(maybe_coro):
                    asyncio.create_task(maybe_coro)
            else:
                logger.exception("TCP connection closed.")
            break
        except asyncio.CancelledError:
            logger.info("close is called.")
            break
        except BaseException as exc:  # frame error? KeyboardInterrupt or sth.
            logger.exception(f"unknown error: {exc}")
            break
        else:
            # keep the same
            ...


# BaseClient
async def send_frame(self, frame: schema.Frame) -> None:
    try:
        await self._conn.write_frame(frame)
    except ConnectionError as exc:
        print(f"write_frame: {exc}")
        # raised on `writer.drain()`, for the following reasons:
        # 1. ConnectionReset: connection was already lost
        # 2. BrokenPipe etc.
        self._is_not_closed = False  # in case
    except BaseException as exc:
        print(f"write_frame: {exc}")
        self._is_not_closed = False  # in case

# BaseClient
async def close(self):
    if self._is_not_closed and self.is_started:
        try:
            await self.sync_request(
                schema.Close(...),
                resp_schema=schema.CloseResponse,
                timeout=5,
            )
        except asyncio.TimeoutError:
            print("Close in sync_request: Timeout")
        except BaseException as exc:  # connection is lost
            print(f"Close in sync_request: {exc}")

    tasks = []
    for task in self._tasks.values():
        task.cancel()
        tasks.append(task)
    await asyncio.wait(tasks)

    await self._conn.close()
    self._conn = None
    self._tasks.clear()
    self._handlers.clear()
    self.server_properties = None
    self._is_not_closed = True

# BaseClient
# `stop_task` isn't used.
async def _heartbeat_sender(self):
    if self._heartbeat == 0:
        return
    while self._is_not_closed:
        await self.send_frame(...)
        await asyncio.sleep(delay)
        # if-statement kept the same
        # no suppress here, it will be cancelled in `close`, and `asyncio.wait` controls this.

# Client
# remove `close` function

About Producer:

async def close(self):
    if self._default_client is None:
        return

    self._close_called = True
    if self.task is not None:
        task, self.task = self.task, None
        await task  # wait for `_timer` to finish; better to add a double check in it

    for publisher in self._publishers.values():
        client = publisher.client
        if client.is_connection_alive():
            try:
                await client.delete_publisher(publisher.id)
            except asyncio.TimeoutError:
                print(...)
            except BaseException as exc:
                print(...)
        client.remove_handler()
        client.remove_handler()
    self._publishers.clear()
    await self._pool.close()
    self._clients.clear()
    self._waiting_for_confirm.clear()
    self._default_client = None
    self._close_called = False

rstream/client.py (review thread)
reason="OK",
),
resp_schema=schema.CloseResponse,
)

@garvenlee commented:

When the RabbitMQ server goes down, every _listener task will catch ConnectionClosed (or similar), and the corresponding _connection_closed_handler calls will be awaited in sequence. When a _connection_closed_handler completes, its _listener task is done.

But if producer.close() is called inside _connection_closed_handler, it is possible to find other clients still marked healthy (_is_not_closed = True) in pool.close(). So even though _is_not_closed = True, the connection is not necessarily alive.

So Client.close() should be like this:

async def close(self) -> None:
    if self._is_not_closed and self.is_started:
        try:
            await self.sync_request(schema.Close, timeout=5)
        except asyncio.TimeoutError:
            print("Close in sync_request: Timeout")
        except BaseException as exc:
            print(f"Close in sync_request: {exc}")

    tasks = []
    for name, task in self._tasks.items():
        if name != "listener":  # `close` may be called in `_listener`
            task.cancel()
            tasks.append(task)
    await asyncio.wait(tasks)

    # in this way, the `_listener` task may be missed.
    # Consider this scenario: the RabbitMQ server is not down, but user code calls `producer.close`.
    # To solve this, two options:
    # 1. split the `_listener` task out of `self._tasks`, and check it individually when `close` is called.
    # 2. create_task(self._connection_closed_handler(err))

    if self._is_not_closed:
        # the underlying stream design doesn't raise any exception, so it's safe here
        await self._conn.close()
        self._conn = None

    self._tasks.clear()
    self._handlers.clear()
    self.server_properties = None
    self._is_not_closed = True

@garvenlee commented:

Another thing about the subscriber task: if there are still frames in frames_queue, how should this be handled? Just cancel the task, or wait for those frames to be handled?

@DanielePalaia (Collaborator, Author) commented:

@garvenlee yes, it would probably be safer to put a timeout on the schema.Close request here.

The task can be cancelled in this case. In streams, messages are kept on the server and can be consumed again. (I think the other clients work the same way; adding @Gsantomaggio to confirm.)

@garvenlee — Oct 16, 2023:

In addition, the _listener task should not be stopped in Producer.close in the current implementation, because connection_closed_handler in _listener may call Producer.close internally, and if the _listener task is cancelled, _connection_closed_handler will be aborted.

I recommend the second option in the code above: make _connection_closed_handler a task; it's simpler. That way, when _connection_closed_handler is scheduled by the event loop, the _listener task is already done because of the break statement, so it can be cancelled safely. But if Producer.close is not called in _connection_closed_handler, the _listener task is healthy and still awaiting in read_frame at that time, so it is better to catch the CancelledError in _listener without triggering _connection_closed_handler.

@DanielePalaia (Collaborator, Author) — Oct 18, 2023:

Hi @garvenlee, I made a modification adding a timeout to the sync_request of schema.Close().

Regarding the second point, to avoid further complications we could document that producer.close() should not be called inside the callback but in the main flow.
For example, the callback could just notify that there was a connection issue, and producer.close() could then be called from the main flow (see the sketch below).
This PR is just the base for the next implementation, which will be to provide a method to reconnect to a given stream.
Anyway, I'm not noticing side effects at the moment; the tests I made seem to work fine.
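As a rough sketch of that pattern (the stream name and message count below are made up for illustration; the handler only records the disconnection, and the producer is closed by the async with block in the main flow):

# Sketch only: notify from the callback, close in the main flow.
# STREAM and the loop count are example values, not from the PR.
import asyncio

from rstream import AMQPMessage, DisconnectionErrorInfo, Producer

STREAM = "my-test-stream"


async def publish_with_clean_close():
    disconnected = asyncio.Event()

    async def on_connection_closed(info: DisconnectionErrorInfo) -> None:
        # only record the event here; producer.close() is not called inside the callback
        print("connection closed for streams " + str(info.streams) + " for reason: " + str(info.reason))
        disconnected.set()

    async with Producer(
        "localhost", username="guest", password="guest", connection_closed_handler=on_connection_closed
    ) as producer:
        await producer.create_stream(STREAM, exists_ok=True)
        for i in range(1000):
            if disconnected.is_set():
                # main flow reacts to the disconnection; the async with block closes the producer
                break
            await producer.send(stream=STREAM, message=AMQPMessage(body="hello: {}".format(i)))


asyncio.run(publish_with_clean_close())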

@garvenlee — Oct 18, 2023:

@DanielePalaia can you evaluate this?
point 1: you can add a timeout argument to sync_request and reuse wait_frame to apply the timeout (a sketch follows point 3 below).

point 2: I think it is worth wrapping connection_closed_handler as a task in the listener. It has little effect on other logic, but it helps solve the scenario above and makes Client.close more straightforward:

async def close(self):
    …

    tasks = []
    for task in self._tasks.values():
        task.cancel()
        tasks.append(task)
    await asyncio.wait(tasks)
    
    await self._conn.close()
    self._conn = None

point 3: in Client, you can directly use if self._is_not_closed: to check whether the connection is healthy; is_connection_alive is used by upper-level callers such as Producer and Consumer.
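For point 1, a possible shape might be something like the following. This is only a sketch: the real sync_request/wait_frame signatures in rstream may differ, and the wait_frame helper used here is an assumption.

# Sketch only, not the actual rstream implementation.
async def sync_request(self, frame, resp_schema, timeout=None):
    await self.send_frame(frame)
    # wait_frame is assumed to return an awaitable for the matching response frame;
    # asyncio.wait_for raises asyncio.TimeoutError, which the close() shown above catches.
    return await asyncio.wait_for(self.wait_frame(resp_schema), timeout)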

rstream/producer.py (outdated review thread)
@DanielePalaia DanielePalaia force-pushed the improve_connection_broke branch 4 times, most recently from fbc807f to a5c2a21 Compare October 18, 2023 08:01
@DanielePalaia DanielePalaia force-pushed the improve_connection_broke branch from a5c2a21 to e9171a7 Compare October 18, 2023 08:19
@DanielePalaia DanielePalaia changed the title improving disconnection Information report improving stability and cleanup in case of disconnection / improving report in case of disconnection Oct 18, 2023
Gsantomaggio and others added 3 commits October 19, 2023 10:29
Signed-off-by: Gabriele Santomaggio <[email protected]>
@DanielePalaia DanielePalaia force-pushed the improve_connection_broke branch from d06a27e to 76a44d2 Compare October 19, 2023 09:18
@Gsantomaggio (Member) commented:

@DanielePalaia @garvenlee Thank you for the effort in this PR.

@DanielePalaia and I tested it internally, and it works as expected. We want to merge it. If there is an edge case, we can analyse it and improve the handling.

@garvenlee, about #121: we could not run the code because of the dependencies. Now it should work, by the way. In case of problems, it would be great to provide a self-contained script. Thank you.

@DanielePalaia (Collaborator, Author) commented:

Yes, we are also planning to review a few things when we implement #135, as well as adding more complete test cases for it. For the moment, let's consider this a base PR for the bugs detected and for continuing with the implementation. Now that we have the list of disconnected streams, it should not be too difficult to implement a method to reconnect to a stream.

@DanielePalaia merged commit cd50fcb into master on Oct 19, 2023
1 check passed
@DanielePalaia deleted the improve_connection_broke branch on October 19, 2023
Successfully merging this pull request may close: Producer Needs Safely Closed (#121).