Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support custom async response to socket.io events. #61

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

tabouassaleh
Copy link

Current implementation of socket.io is synchronous in that the raw_message function in session.py wait for the result from the event handler, and send that result as an ACK response to the event message.

This pull request changes that behaviour to optionally allow the user of the library to use celery+redis to dispatch processing, and then send it back to the client at a later time without blocking Tornado during processing.

For instance, using Brukva, the script starting the tornado service could include:

class Command(BaseCommand):
    def handle(self, *args, **options):
        # ...
        router = TornadioRouter(BaseSocket, {
            # ...
        })
        self._server = router.urls[0][2]['server']
        # ...
        c = brukva.Client()
        c.connect()
        c.subscribe('event_result')
        c.subscribe('broadcast_user')
        c.listen(self._event_result_router)
        SocketServer(application, ssl_options = ssl_options)

    def _event_result_router(self, result):
        '''
        Brukva Redis router for dispatching socket.io event results.

        Message body is a list containing:
          `session_id`: The session ID of the socket connection that sent the event.
          `msg_id`: The message sequence number for the event.
          `data`: The result data to be sent back to the client, a list consisting of error and response.
        '''
        err, message = result
        if err:
            logging.error('Event result error: %r', err)
        elif message.channel != 'event_result':
            return
        else:
            session_id, msg_id, error, response = json.loads(message.body)
            session = self._server._sessions._items.get(session_id)

            # The session may have disconnected.
            if not session: return

            # We don't currently use msg_endpoint, so ignore it for now.
            msg_endpoint = None
            if msg_id:
                if msg_id.endswith('+'):
                    msg_id = msg_id[:-1]

            session.send_message(proto.ack(msg_endpoint, msg_id, (error, response)))

And the celery task could look something like this:

@task
def process_event(session_id, message_id, user):
    ''' Simple celery task that returns the user's email address.'''
    c = brukva.Client()
    c.connect()
    result = json.dumps([
        session_id,
        message_id,
        None, # No error.
        user.email
    ])
    c.publish('event_result', result)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant