From fdc54190556a0e8989acc57f0fa23bdfc60d9239 Mon Sep 17 00:00:00 2001 From: Simon Date: Fri, 9 Mar 2018 20:34:11 +0000 Subject: [PATCH 01/29] start looking at websocket testing --- test/transports/test_websockets.py | 25 +++++++++++++++++++ wampy/peers/clients.py | 6 ++--- wampy/testing/websocket_server.py | 31 ++++++++++++++++++++++++ wampy/transports/interface.py | 2 +- wampy/transports/websocket/connection.py | 14 ++++++----- 5 files changed, 68 insertions(+), 10 deletions(-) create mode 100644 test/transports/test_websockets.py create mode 100644 wampy/testing/websocket_server.py diff --git a/test/transports/test_websockets.py b/test/transports/test_websockets.py new file mode 100644 index 0000000..6c7a5bd --- /dev/null +++ b/test/transports/test_websockets.py @@ -0,0 +1,25 @@ +import socket +from threading import Thread + +import pytest +from wampy.testing.websocket_server import WebsocketServer + +from wampy.transports.websocket.connection import WebSocket + +HOST, PORT = "localhost", 9999 + +@pytest.fixture(scope='function') +def server(): + s = WebsocketServer(url="ws://localhost:9999") + server_thread = Thread(target=s.serve_forever) + server_thread.daemon = True + server_thread.start() + yield s + import pdb + pdb.set_trace() + s.server_close() + + +def test_websocket_connects_to_server(server): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + diff --git a/wampy/peers/clients.py b/wampy/peers/clients.py index da83ce4..c8742df 100644 --- a/wampy/peers/clients.py +++ b/wampy/peers/clients.py @@ -109,7 +109,7 @@ def __init__( ) # the transport is responsible for the connection. - self.transport.register_router(self.router) + self.transport.register_server(self.router) # generally ``name`` is used for debuggubg and logging only self.name = name or self.__class__.__name__ @@ -152,8 +152,8 @@ def publish(self): return PublishProxy(client=self) def start(self): - # establish the underlying connection. this will raise on error. - connection = self.transport.connect() + # establish the underlying connection and upgrade it to WAMP. + connection = self.transport.connect(upgrade=True) # create a Session repr between ourselves and the Router. # pass in the live connection over a transport that the Session diff --git a/wampy/testing/websocket_server.py b/wampy/testing/websocket_server.py new file mode 100644 index 0000000..c4d08f2 --- /dev/null +++ b/wampy/testing/websocket_server.py @@ -0,0 +1,31 @@ +import sys +if sys.version_info[0] < 3: + import SocketServer as socketserver +else: + import socketserver + +from wampy.mixins import ParseUrlMixin + + +class WebSocketHandler(socketserver.BaseRequestHandler): + # taken from Python docs + def handle(self): + # self.request is the TCP socket connected to the client + self.data = self.request.recv(1024).strip() + print("{} wrote:".format(self.client_address[0])) + print(self.data) + # just send back the same data, but upper-cased + self.request.sendall(self.data.upper()) + + +class WebsocketServer(socketserver.TCPServer, ParseUrlMixin): + + def __init__(self, url, ipv=4): + self.url = url + self.host = None + self.port = None + self.parse_url() + + super( + socketserver.TCPServer, self + ).__init__((self.host, self.port), WebSocketHandler) diff --git a/wampy/transports/interface.py b/wampy/transports/interface.py index 5fa169c..7344719 100644 --- a/wampy/transports/interface.py +++ b/wampy/transports/interface.py @@ -7,7 +7,7 @@ class Transport(object): @abc.abstractmethod - def register_router(self, router): + def register_server(self, router): pass @abc.abstractmethod diff --git a/wampy/transports/websocket/connection.py b/wampy/transports/websocket/connection.py index e3a499c..627c15a 100644 --- a/wampy/transports/websocket/connection.py +++ b/wampy/transports/websocket/connection.py @@ -25,7 +25,7 @@ class WebSocket(Transport, ParseUrlMixin): - def register_router(self, router): + def register_server(self, router): self.url = router.url self.host = None @@ -38,9 +38,10 @@ def register_router(self, router): self.key = encodestring(uuid.uuid4().bytes).decode('utf-8').strip() self.socket = None - def connect(self): + def connect(self, upgrade=False): self._connect() - self._upgrade() + if upgrade is True: + self._upgrade() return self def disconnect(self): @@ -91,7 +92,8 @@ def receive(self, bufsize=1): bufsize = exc.required_bytes logger.debug('now requesting the missing %s bytes', bufsize) else: - if frame.opcode == 9: + + if frame.opcode == frame.OPCODE_PING: # Opcode 0x9 marks a ping frame. It does not contain wamp # data, so the frame is not returned. # Still it must be handled or the server will close the @@ -246,8 +248,8 @@ def read_line(): class SecureWebSocket(WebSocket): - def register_router(self, router): - super(SecureWebSocket, self).register_router(router) + def register_server(self, router): + super(SecureWebSocket, self).register_server(router) self.ipv = router.ipv From b0b941721343564cfef143859f73f3882fb42bd4 Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 10 Mar 2018 14:46:49 +0000 Subject: [PATCH 02/29] pass server details into WebSocket object --- wampy/peers/clients.py | 7 ++----- wampy/transports/interface.py | 4 ---- wampy/transports/websocket/connection.py | 10 +++++----- 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/wampy/peers/clients.py b/wampy/peers/clients.py index c8742df..1602be4 100644 --- a/wampy/peers/clients.py +++ b/wampy/peers/clients.py @@ -100,17 +100,14 @@ def __init__( # as WebSocket messages by default (well, actually... that's because no # other transports are supported!) if self.router.scheme == "ws": - self.transport = WebSocket() + self.transport = WebSocket(server_url=self.router.url, ipv=self.router.ipv) elif self.router.scheme == "wss": - self.transport = SecureWebSocket() + self.transport = SecureWebSocket(server_url=self.router.url, ipv=self.router.ipv) else: raise WampyError( 'Network protocl must be "ws" or "wss"' ) - # the transport is responsible for the connection. - self.transport.register_server(self.router) - # generally ``name`` is used for debuggubg and logging only self.name = name or self.__class__.__name__ diff --git a/wampy/transports/interface.py b/wampy/transports/interface.py index 7344719..fbbe6db 100644 --- a/wampy/transports/interface.py +++ b/wampy/transports/interface.py @@ -6,10 +6,6 @@ @six.add_metaclass(abc.ABCMeta) class Transport(object): - @abc.abstractmethod - def register_server(self, router): - pass - @abc.abstractmethod def connect(self): """ should return ``self`` as the "connection" object """ diff --git a/wampy/transports/websocket/connection.py b/wampy/transports/websocket/connection.py index 627c15a..a448191 100644 --- a/wampy/transports/websocket/connection.py +++ b/wampy/transports/websocket/connection.py @@ -25,12 +25,12 @@ class WebSocket(Transport, ParseUrlMixin): - def register_server(self, router): - self.url = router.url + def __init__(self, server_url, ipv=4): + self.url = server_url + self.ipv = ipv self.host = None self.port = None - self.ipv = router.ipv self.resource = None self.parse_url() @@ -248,8 +248,8 @@ def read_line(): class SecureWebSocket(WebSocket): - def register_server(self, router): - super(SecureWebSocket, self).register_server(router) + def register_router(self, router): + super(SecureWebSocket, self).register_router(router) self.ipv = router.ipv From c715f5859eb50449174804544103e3fb29713dd5 Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 10 Mar 2018 14:57:21 +0000 Subject: [PATCH 03/29] test websocket connection --- setup.py | 1 + test/transports/test_websockets.py | 20 ++++++++++--------- wampy/peers/clients.py | 6 ++++-- wampy/testing/websocket_server.py | 31 ------------------------------ 4 files changed, 16 insertions(+), 42 deletions(-) delete mode 100644 wampy/testing/websocket_server.py diff --git a/setup.py b/setup.py index a293a72..1f6b863 100644 --- a/setup.py +++ b/setup.py @@ -54,6 +54,7 @@ "pytest-capturelog==0.7", "colorlog", "flake8==3.5.0", + "gevent-websocket==0.10.1", ], 'docs': [ "Sphinx==1.4.5", diff --git a/test/transports/test_websockets.py b/test/transports/test_websockets.py index 6c7a5bd..511dbdd 100644 --- a/test/transports/test_websockets.py +++ b/test/transports/test_websockets.py @@ -1,25 +1,27 @@ -import socket from threading import Thread import pytest -from wampy.testing.websocket_server import WebsocketServer +from geventwebsocket import WebSocketServer, Resource from wampy.transports.websocket.connection import WebSocket -HOST, PORT = "localhost", 9999 @pytest.fixture(scope='function') def server(): - s = WebsocketServer(url="ws://localhost:9999") + s = WebSocketServer( + ('0.0.0.0', 8000), + Resource([]), + debug=False, + ) + server_thread = Thread(target=s.serve_forever) server_thread.daemon = True server_thread.start() yield s - import pdb - pdb.set_trace() - s.server_close() + s.stop() def test_websocket_connects_to_server(server): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - + websocket = WebSocket(server_url='ws://0.0.0.0:8000') + websocket.connect() + websocket.disconnect() diff --git a/wampy/peers/clients.py b/wampy/peers/clients.py index 1602be4..86ae341 100644 --- a/wampy/peers/clients.py +++ b/wampy/peers/clients.py @@ -100,9 +100,11 @@ def __init__( # as WebSocket messages by default (well, actually... that's because no # other transports are supported!) if self.router.scheme == "ws": - self.transport = WebSocket(server_url=self.router.url, ipv=self.router.ipv) + self.transport = WebSocket( + server_url=self.router.url, ipv=self.router.ipv) elif self.router.scheme == "wss": - self.transport = SecureWebSocket(server_url=self.router.url, ipv=self.router.ipv) + self.transport = SecureWebSocket( + server_url=self.router.url, ipv=self.router.ipv) else: raise WampyError( 'Network protocl must be "ws" or "wss"' diff --git a/wampy/testing/websocket_server.py b/wampy/testing/websocket_server.py deleted file mode 100644 index c4d08f2..0000000 --- a/wampy/testing/websocket_server.py +++ /dev/null @@ -1,31 +0,0 @@ -import sys -if sys.version_info[0] < 3: - import SocketServer as socketserver -else: - import socketserver - -from wampy.mixins import ParseUrlMixin - - -class WebSocketHandler(socketserver.BaseRequestHandler): - # taken from Python docs - def handle(self): - # self.request is the TCP socket connected to the client - self.data = self.request.recv(1024).strip() - print("{} wrote:".format(self.client_address[0])) - print(self.data) - # just send back the same data, but upper-cased - self.request.sendall(self.data.upper()) - - -class WebsocketServer(socketserver.TCPServer, ParseUrlMixin): - - def __init__(self, url, ipv=4): - self.url = url - self.host = None - self.port = None - self.parse_url() - - super( - socketserver.TCPServer, self - ).__init__((self.host, self.port), WebSocketHandler) From aa80c152cc0b3e7b821f0edefd23a7d4ed34b0d7 Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 10 Mar 2018 15:49:01 +0000 Subject: [PATCH 04/29] use gevent thread to run server --- test/transports/test_websockets.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/test/transports/test_websockets.py b/test/transports/test_websockets.py index 511dbdd..6108446 100644 --- a/test/transports/test_websockets.py +++ b/test/transports/test_websockets.py @@ -1,27 +1,33 @@ -from threading import Thread - import pytest +import gevent +from gevent import Greenlet from geventwebsocket import WebSocketServer, Resource from wampy.transports.websocket.connection import WebSocket -@pytest.fixture(scope='function') +@pytest.fixture def server(): s = WebSocketServer( - ('0.0.0.0', 8000), + ('0.0.0.0', 8001), Resource([]), debug=False, ) - - server_thread = Thread(target=s.serve_forever) - server_thread.daemon = True - server_thread.start() + thread = Greenlet.spawn(s.serve_forever) yield s - s.stop() + thread.kill() def test_websocket_connects_to_server(server): - websocket = WebSocket(server_url='ws://0.0.0.0:8000') + websocket = WebSocket(server_url='ws://0.0.0.0:8001') websocket.connect() websocket.disconnect() + + +def __test_send_ping(server): + websocket = WebSocket(server_url='ws://0.0.0.0:8000') + websocket.connect() + + + + From 90aab62df6fcd432e1b3936a991fdaa8eee71840 Mon Sep 17 00:00:00 2001 From: Simon Date: Sun, 11 Mar 2018 20:32:11 +0000 Subject: [PATCH 05/29] lots of debugging --- test/transports/test_websockets.py | 59 +++++++++- wampy/peers/clients.py | 2 +- wampy/transports/websocket/connection.py | 40 ++++--- wampy/transports/websocket/frames.py | 133 ++++++++++++++--------- 4 files changed, 165 insertions(+), 69 deletions(-) diff --git a/test/transports/test_websockets.py b/test/transports/test_websockets.py index 6108446..8e5d4aa 100644 --- a/test/transports/test_websockets.py +++ b/test/transports/test_websockets.py @@ -1,33 +1,82 @@ +import json + import pytest import gevent from gevent import Greenlet from geventwebsocket import WebSocketServer, Resource +from mock import patch from wampy.transports.websocket.connection import WebSocket +from wampy.transports.websocket.frames import Ping + + +from geventwebsocket import WebSocketServer, WebSocketApplication, Resource +from collections import OrderedDict + +class TestApplication(WebSocketApplication): + pass @pytest.fixture def server(): s = WebSocketServer( ('0.0.0.0', 8001), - Resource([]), - debug=False, + Resource(OrderedDict([('/', TestApplication)])) ) + s.start() thread = Greenlet.spawn(s.serve_forever) yield s + s.stop() thread.kill() + + def test_websocket_connects_to_server(server): websocket = WebSocket(server_url='ws://0.0.0.0:8001') + + assert not hasattr(server, 'socket') + websocket.connect() + assert hasattr(server, 'socket') + assert server.clients + websocket.disconnect() -def __test_send_ping(server): - websocket = WebSocket(server_url='ws://0.0.0.0:8000') - websocket.connect() +def test_send_ping(server): + websocket = WebSocket(server_url='ws://0.0.0.0:8001') + connection = websocket.connect(upgrade=False) + + def connection_handler(): + while True: + try: + message = connection.receive() + + except Exception as exc: + print(type(exc), exc) + raise + + return message + + thread = gevent.spawn(connection_handler) + + with patch.object(websocket, 'handle_ping') as mock_handle: + + while not hasattr(server, 'socket'): + gevent.sleep(0.01) + + clients = server.clients + assert len(clients) == 1 + + client_handler = list(clients.values())[0] + socket = client_handler.ws + + frame = Ping() + payload = frame.payload + ping_masked_bytes = b'0x8a0x850x370xfa0x210x3d0x7f0x9f0x4d0x510x58' + socket.send(payload) diff --git a/wampy/peers/clients.py b/wampy/peers/clients.py index 86ae341..4dcc50d 100644 --- a/wampy/peers/clients.py +++ b/wampy/peers/clients.py @@ -110,7 +110,7 @@ def __init__( 'Network protocl must be "ws" or "wss"' ) - # generally ``name`` is used for debuggubg and logging only + # generally ``name`` is used for debugging and logging only self.name = name or self.__class__.__name__ self._session = None diff --git a/wampy/transports/websocket/connection.py b/wampy/transports/websocket/connection.py index 5fc524d..1a8de5c 100644 --- a/wampy/transports/websocket/connection.py +++ b/wampy/transports/websocket/connection.py @@ -39,9 +39,9 @@ def __init__(self, server_url, ipv=4): self.socket = None def connect(self, upgrade=False): + # TCP connection self._connect() - if upgrade is True: - self._upgrade() + self._handshake(upgrade=upgrade) return self def disconnect(self): @@ -64,10 +64,11 @@ def _send_raw(self, websocket_message): def receive(self, bufsize=1): frame = None received_bytes = bytearray() - while True: + print('receiving...') logger.debug("waiting for %s bytes", bufsize) - + import pdb + #pdb.set_trace() try: bytes = self.socket.recv(bufsize) except gevent.greenlet.GreenletExit as exc: @@ -75,6 +76,9 @@ def receive(self, bufsize=1): except socket.timeout as e: message = str(e) raise ConnectionError('timeout: "{}"'.format(message)) + except ConnectionResetError: + pdb.set_trace() + raise ConnectionError('the connection was reset by the peer') except Exception as exc: raise ConnectionError( 'unexpected error reading from socket: "{}"'.format(exc) @@ -92,13 +96,15 @@ def receive(self, bufsize=1): bufsize = exc.required_bytes logger.debug('now requesting the missing %s bytes', bufsize) else: - + print(frame.opcode) if frame.opcode == frame.OPCODE_PING: + import pdb + pdb.set_trace() # Opcode 0x9 marks a ping frame. It does not contain wamp # data, so the frame is not returned. # Still it must be handled or the server will close the # connection. - self._send_raw(PongFrame(frame.payload).payload) + self.handle_ping(ping_frame=frame) received_bytes = bytearray() continue break @@ -145,8 +151,8 @@ def _connect(self): self.socket = _socket logger.debug("socket connected") - def _upgrade(self): - handshake_headers = self._get_handshake_headers() + def _handshake(self, upgrade): + handshake_headers = self._get_handshake_headers(upgrade=upgrade) handshake = '\r\n'.join(handshake_headers) + "\r\n\r\n" self.socket.send(handshake.encode()) @@ -161,7 +167,7 @@ def _upgrade(self): logger.debug("connection upgraded") - def _get_handshake_headers(self): + def _get_handshake_headers(self, upgrade): """ Do an HTTP upgrade handshake with the server. Websockets upgrade from HTTP rather than TCP largely because it was @@ -186,8 +192,11 @@ def _get_handshake_headers(self): headers.append("Sec-WebSocket-Key: {}".format(self.key)) headers.append("Origin: ws://{}:{}".format(self.host, self.port)) headers.append("Sec-WebSocket-Version: {}".format(WEBSOCKET_VERSION)) - headers.append("Sec-WebSocket-Protocol: {}".format( - WEBSOCKET_SUBPROTOCOLS)) + + if upgrade: + headers.append("Sec-WebSocket-Protocol: {}".format( + WEBSOCKET_SUBPROTOCOLS) + ) logger.debug("connection headers: %s", headers) @@ -202,8 +211,8 @@ def _read_handshake_response(self): def read_line(): bytes_cache = [] received_bytes = None - while received_bytes != b'\r\n': - received_bytes = self.socket.recv(2) + while received_bytes not in [b'\r\n', b'\n', b'\n\r']: + received_bytes = self.socket.recv(1) bytes_cache.append(received_bytes) return b''.join(bytes_cache) @@ -215,6 +224,7 @@ def read_line(): received_bytes = read_line() if received_bytes == b'\r\n': # end of the response + print('end of response') break bytes_as_str = received_bytes.decode() @@ -241,11 +251,15 @@ def read_line(): key, value = kv headers[key.lower()] = value.strip().lower() + print(headers) logger.info("handshake complete: %s : %s", status, headers) return status, headers + def handle_ping(self, ping_frame): + self._send_raw(PongFrame(ping_frame.payload).payload) + class SecureWebSocket(WebSocket): def register_router(self, router): diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index bc60647..83e548a 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -83,6 +83,10 @@ class Frame(object): def __init__(self, bytes): self.body = bytes + self.fin_bit = 1 + self.rsv1_bit = 0 + self.rsv2_bit = 0 + self.rsv3_bit = 0 def __len__(self): # UTF-8 is an unicode encoding which uses more than one byte for @@ -100,54 +104,6 @@ def __len__(self): def __str__(self): return self.body - -class ClientFrame(Frame): - """ Represent outgoing Client -> Server messages - """ - - def __init__(self, bytes): - super(ClientFrame, self).__init__(bytes) - - self.fin_bit = 1 - self.rsv1_bit = 0 - self.rsv2_bit = 0 - self.rsv3_bit = 0 - self.opcode = self.OPCODE_TEXT - self.payload = self.generate_payload() - - def data_to_bytes(self, data): - return bytearray(data, 'utf-8') - - def generate_mask(self, mask_key, data): - """ Mask data. - - :Parameters: - mask_key: byte string - 4 byte string(byte), e.g. '\x10\xc6\xc4\x16' - data: str - data to mask - - """ - # Masking of WebSocket traffic from client to server is required - # because of the unlikely chance that malicious code could cause - # some broken proxies to do the wrong thing and use this as an - # attack of some kind. Nobody has proved that this could actually - # happen, but since the fact that it could happen was reason enough - # for browser vendors to get twitchy, masking was added to remove - # the possibility of it being used as an attack. - if data is None: - data = "" - - data_bytes = self.data_to_bytes(data) - - _m = array.array("B", mask_key) - _d = array.array("B", data_bytes) - - for i in range(len(_d)): - _d[i] ^= _m[i % 4] - - return _d.tostring() - def generate_payload(self): """ Format data to string (bytes) to send to server. """ @@ -204,10 +160,59 @@ def generate_payload(self): else: payload += pack('!B', (mask_bit | 127)) + pack('!Q', length) + # this is a bytes string being returned here + return payload + + +class ClientFrame(Frame): + """ Represent outgoing Client -> Server messages + """ + + def __init__(self, bytes): + super(ClientFrame, self).__init__(bytes) + + self.opcode = self.OPCODE_TEXT + self.payload = self.generate_payload() + + def data_to_bytes(self, data): + return bytearray(data, 'utf-8') + + def generate_mask(self, mask_key, data): + """ Mask data. + + :Parameters: + mask_key: byte string + 4 byte string(byte), e.g. '\x10\xc6\xc4\x16' + data: str + data to mask + + """ + # Masking of WebSocket traffic from client to server is required + # because of the unlikely chance that malicious code could cause + # some broken proxies to do the wrong thing and use this as an + # attack of some kind. Nobody has proved that this could actually + # happen, but since the fact that it could happen was reason enough + # for browser vendors to get twitchy, masking was added to remove + # the possibility of it being used as an attack. + if data is None: + data = "" + + data_bytes = self.data_to_bytes(data) + + _m = array.array("B", mask_key) + _d = array.array("B", data_bytes) + + for i in range(len(_d)): + _d[i] ^= _m[i % 4] + + return _d.tostring() + + def generate_payload(self): + payload = super(ClientFrame, self).generate_payload() + # we always mask frames from the client to server # use a string of n random bytes for the mask mask_key = os.urandom(4) - mask_data = self.generate_mask(mask_key=mask_key, data=self.body) mask = mask_key + mask_data payload += mask @@ -263,7 +268,7 @@ def __init__(self, bytes): self.opcode = bytes[0] & 0b1111 - if self.opcode != 9: + if self.opcode == 1: # Wamp data frames contain a json-encoded payload. # The other kind of frame we handle (opcode 0x9) is a ping and it # has a non-json payload @@ -324,3 +329,31 @@ def ensure_complete_frame(self, buffered_bytes): self.body = body_candidate self.payload_length_indicator = payload_length_indicator + + +class Ping(Frame): + + def __init__(self): + super(Ping, self).__init__(bytes='0x8a') + self.opcode = Frame.OPCODE_PING + self.payload = self.generate_payload() + + def data_to_bytes(self, data): + return data + + def generate_ping(key, timestamp=False): + data = ['0x89','0x8a'] # 0x89 = fin, ping 0x8a = masked,len=10 + data.extend(key) + if timestamp: + t = str(timestamp) + else: + t = str(int(time())) + for i in range(10): + masking_byte = int(key[i%4],16) + masked = ord(t[i]) + data.append(hex(masked ^ masking_byte)) + frame = '' + for i in range(len(data)): + frame += chr(int(data[i],16)) + return frame + From a1c978de17d8b72a71ee2239dbefcb1f830b25ed Mon Sep 17 00:00:00 2001 From: Simon Date: Sun, 11 Mar 2018 20:44:37 +0000 Subject: [PATCH 06/29] flakes --- test/transports/test_websockets.py | 14 +++++--------- wampy/session.py | 1 - wampy/transports/websocket/connection.py | 3 --- wampy/transports/websocket/frames.py | 17 ----------------- 4 files changed, 5 insertions(+), 30 deletions(-) diff --git a/test/transports/test_websockets.py b/test/transports/test_websockets.py index 8e5d4aa..b33c00a 100644 --- a/test/transports/test_websockets.py +++ b/test/transports/test_websockets.py @@ -1,18 +1,17 @@ -import json +from collections import OrderedDict import pytest import gevent from gevent import Greenlet -from geventwebsocket import WebSocketServer, Resource +from geventwebsocket import ( + WebSocketApplication, WebSocketServer, Resource, +) from mock import patch from wampy.transports.websocket.connection import WebSocket from wampy.transports.websocket.frames import Ping -from geventwebsocket import WebSocketServer, WebSocketApplication, Resource -from collections import OrderedDict - class TestApplication(WebSocketApplication): pass @@ -30,8 +29,6 @@ def server(): thread.kill() - - def test_websocket_connects_to_server(server): websocket = WebSocket(server_url='ws://0.0.0.0:8001') @@ -62,7 +59,6 @@ def connection_handler(): thread = gevent.spawn(connection_handler) with patch.object(websocket, 'handle_ping') as mock_handle: - while not hasattr(server, 'socket'): gevent.sleep(0.01) @@ -79,4 +75,4 @@ def connection_handler(): socket.send(payload) - + thread.kill() diff --git a/wampy/session.py b/wampy/session.py index de99ad5..9036101 100644 --- a/wampy/session.py +++ b/wampy/session.py @@ -3,7 +3,6 @@ # file, You can obtain one at http://mozilla.org/MPL/2.0/. import logging -from functools import partial import gevent import gevent.queue diff --git a/wampy/transports/websocket/connection.py b/wampy/transports/websocket/connection.py index 1a8de5c..0066604 100644 --- a/wampy/transports/websocket/connection.py +++ b/wampy/transports/websocket/connection.py @@ -65,10 +65,7 @@ def receive(self, bufsize=1): frame = None received_bytes = bytearray() while True: - print('receiving...') logger.debug("waiting for %s bytes", bufsize) - import pdb - #pdb.set_trace() try: bytes = self.socket.recv(bufsize) except gevent.greenlet.GreenletExit as exc: diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index 83e548a..97f3507 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -340,20 +340,3 @@ def __init__(self): def data_to_bytes(self, data): return data - - def generate_ping(key, timestamp=False): - data = ['0x89','0x8a'] # 0x89 = fin, ping 0x8a = masked,len=10 - data.extend(key) - if timestamp: - t = str(timestamp) - else: - t = str(int(time())) - for i in range(10): - masking_byte = int(key[i%4],16) - masked = ord(t[i]) - data.append(hex(masked ^ masking_byte)) - frame = '' - for i in range(len(data)): - frame += chr(int(data[i],16)) - return frame - From 36c5c8820e0a00982b048faf95241859cc540031 Mon Sep 17 00:00:00 2001 From: Simon Date: Wed, 14 Mar 2018 17:26:15 +0000 Subject: [PATCH 07/29] update websocket handling --- test/transports/test_websockets.py | 57 ++-- wampy/transports/websocket/connection.py | 17 +- wampy/transports/websocket/frames.py | 316 +++++++++++++---------- 3 files changed, 224 insertions(+), 166 deletions(-) diff --git a/test/transports/test_websockets.py b/test/transports/test_websockets.py index b33c00a..f580985 100644 --- a/test/transports/test_websockets.py +++ b/test/transports/test_websockets.py @@ -6,7 +6,8 @@ from geventwebsocket import ( WebSocketApplication, WebSocketServer, Resource, ) -from mock import patch +from mock import ANY +from mock import call, patch from wampy.transports.websocket.connection import WebSocket from wampy.transports.websocket.frames import Ping @@ -29,38 +30,37 @@ def server(): thread.kill() -def test_websocket_connects_to_server(server): +def __test_websocket_connects_to_server(server): websocket = WebSocket(server_url='ws://0.0.0.0:8001') - - assert not hasattr(server, 'socket') - websocket.connect() - assert hasattr(server, 'socket') - assert server.clients + + assert len(server.clients) == 1 websocket.disconnect() def test_send_ping(server): websocket = WebSocket(server_url='ws://0.0.0.0:8001') - connection = websocket.connect(upgrade=False) - - def connection_handler(): - while True: - try: - message = connection.receive() + with patch.object(websocket, 'handle_ping') as mock_handle: + assert websocket.connected is False - except Exception as exc: - print(type(exc), exc) - raise + websocket.connect(upgrade=False) - return message + def connection_handler(): + while True: + try: + websocket.receive() + except Exception as exc: + print(exc) + raise - thread = gevent.spawn(connection_handler) + assert websocket.connected is True - with patch.object(websocket, 'handle_ping') as mock_handle: - while not hasattr(server, 'socket'): - gevent.sleep(0.01) + # the first bytes sent down the connection are the response bytes + # to the TCP connection and upgrade. we receieve in this thread + # because it will block all execution + Greenlet.spawn(connection_handler) + gevent.sleep(0.01) # enough for the upgrade to happen clients = server.clients assert len(clients) == 1 @@ -69,10 +69,17 @@ def connection_handler(): socket = client_handler.ws frame = Ping() - payload = frame.payload + payload = frame.generate_frame() + socket.send(payload) - ping_masked_bytes = b'0x8a0x850x370xfa0x210x3d0x7f0x9f0x4d0x510x58' + websocket.receive() - socket.send(payload) + with gevent.Timeout(1): + while mock_handle.call_count != 1: + gevent.sleep(0.01) - thread.kill() + assert mock_handle.call_count == 1 + assert mock_handle.call_args == call(ping_frame=ANY) + + call_param = mock_handle.call_args[1]['ping_frame'] + assert isinstance(call_param, Ping) diff --git a/wampy/transports/websocket/connection.py b/wampy/transports/websocket/connection.py index 0066604..ae837d6 100644 --- a/wampy/transports/websocket/connection.py +++ b/wampy/transports/websocket/connection.py @@ -18,7 +18,7 @@ from wampy.transports.interface import Transport from wampy.serializers import json_serialize -from . frames import ClientFrame, ServerFrame, PongFrame +from . frames import ClientFrame, FrameFactory, PongFrame logger = logging.getLogger(__name__) @@ -37,6 +37,7 @@ def __init__(self, server_url, ipv=4): self.websocket_location = self.resource self.key = encodestring(uuid.uuid4().bytes).decode('utf-8').strip() self.socket = None + self.connected = False def connect(self, upgrade=False): # TCP connection @@ -64,8 +65,10 @@ def _send_raw(self, websocket_message): def receive(self, bufsize=1): frame = None received_bytes = bytearray() + while True: logger.debug("waiting for %s bytes", bufsize) + try: bytes = self.socket.recv(bufsize) except gevent.greenlet.GreenletExit as exc: @@ -74,7 +77,6 @@ def receive(self, bufsize=1): message = str(e) raise ConnectionError('timeout: "{}"'.format(message)) except ConnectionResetError: - pdb.set_trace() raise ConnectionError('the connection was reset by the peer') except Exception as exc: raise ConnectionError( @@ -88,15 +90,12 @@ def receive(self, bufsize=1): received_bytes.extend(bytes) try: - frame = ServerFrame(received_bytes) + frame = FrameFactory.from_bytes(received_bytes) except IncompleteFrameError as exc: bufsize = exc.required_bytes logger.debug('now requesting the missing %s bytes', bufsize) else: - print(frame.opcode) if frame.opcode == frame.OPCODE_PING: - import pdb - pdb.set_trace() # Opcode 0x9 marks a ping frame. It does not contain wamp # data, so the frame is not returned. # Still it must be handled or the server will close the @@ -155,7 +154,7 @@ def _handshake(self, upgrade): self.socket.send(handshake.encode()) try: - with gevent.Timeout(5): + with gevent.Timeout(50): self.status, self.headers = self._read_handshake_response() except gevent.Timeout: raise WampyError( @@ -251,11 +250,11 @@ def read_line(): print(headers) logger.info("handshake complete: %s : %s", status, headers) - + self.connected = True return status, headers def handle_ping(self, ping_frame): - self._send_raw(PongFrame(ping_frame.payload).payload) + self._send_raw(PongFrame().generate_frame()) class SecureWebSocket(WebSocket): diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index 97f3507..1235bf0 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -58,6 +58,8 @@ class Frame(object): OPCODE_PING, OPCODE_PONG, OPCODE_TEXT, ) + CONTROL_FRAMES = [OPCODE_PING, ] + # Frame Length # The WebSocket protocol has a frame-size limit of 2^63 octets, but @@ -81,8 +83,12 @@ class Frame(object): LENGTH_16 = 1 << 16 # 0x10000, 65536, 10000000000000000 MAX_LENGTH = 1 << 63 # 1 x 2**63 - def __init__(self, bytes): - self.body = bytes + def __init__(self, payload, opcode): + # this is just the payload, i.e. the bytes that the application cares + # about + self.payload = payload + self.opcode = opcode + self.fin_bit = 1 self.rsv1_bit = 0 self.rsv2_bit = 0 @@ -92,32 +98,76 @@ def __len__(self): # UTF-8 is an unicode encoding which uses more than one byte for # special characters. calculating the length needs consideration. try: - unicode_body = self.body.decode("utf-8") + unicode_body = self.payload.decode("utf-8") except UnicodeError: - unicode_body = self.body + unicode_body = self.payload except AttributeError: # already decoded, hence no "decode" attribute - unicode_body = self.body + unicode_body = self.payload + import pdb + pdb.set_trace() return len(unicode_body.encode('utf-8')) def __str__(self): - return self.body + return self.payload + + +class ClientFrame(Frame): + """ Represent outgoing Client -> Server messages + """ + + def __init__(self, payload, opcode=Frame.OPCODE_TEXT): + super(ClientFrame, self).__init__(payload, opcode) + + def data_to_bytes(self, data): + return bytearray(data, 'utf-8') + + def generate_mask(self, mask_key, data): + """ Mask data. + + :Parameters: + mask_key: byte string + 4 byte string(byte), e.g. '\x10\xc6\xc4\x16' + data: str + data to mask + + """ + # Masking of WebSocket traffic from client to server is required + # because of the unlikely chance that malicious code could cause + # some broken proxies to do the wrong thing and use this as an + # attack of some kind. Nobody has proved that this could actually + # happen, but since the fact that it could happen was reason enough + # for browser vendors to get twitchy, masking was added to remove + # the possibility of it being used as an attack. + if data is None: + data = "" - def generate_payload(self): + data_bytes = self.data_to_bytes(data) + + _m = array.array("B", mask_key) + _d = array.array("B", data_bytes) + + for i in range(len(_d)): + _d[i] ^= _m[i % 4] + + return _d.tostring() + + def generate_frame(self): """ Format data to string (bytes) to send to server. """ # the first byte contains the FIN bit, the 3 RSV bits and the # 4 opcode bits and for a client will *always* be 1000 0001 (or 129). # so we want the first byte to look like... # - # 1 0 0 0 0 0 0 1 + # 1 0 0 0 0 0 0 1 (1 is a text frame) # +-+-+-+-+-------+ # |F|R|R|R| opcode| - # |I|S|S|S| (4) | + # |I|S|S|S| | # |N|V|V|V| | # | |1|2|3| | # +-+-+-+-+-------+ + # note that because all RSV bits are zero, we can ignore them # this shifts each bit into position and bitwise ORs them together, # using the struct module to pack them as incoming network bytes @@ -128,8 +178,6 @@ def generate_payload(self): ) ) # which is '\x81' as a raw byte repr - # note that because all RSV bits are zero, we can ignore them - # the second byte - and maybe the 7 after this, we'll use to tell # the server how long our payload is. @@ -160,56 +208,6 @@ def generate_payload(self): else: payload += pack('!B', (mask_bit | 127)) + pack('!Q', length) - # this is a bytes string being returned here - return payload - - -class ClientFrame(Frame): - """ Represent outgoing Client -> Server messages - """ - - def __init__(self, bytes): - super(ClientFrame, self).__init__(bytes) - - self.opcode = self.OPCODE_TEXT - self.payload = self.generate_payload() - - def data_to_bytes(self, data): - return bytearray(data, 'utf-8') - - def generate_mask(self, mask_key, data): - """ Mask data. - - :Parameters: - mask_key: byte string - 4 byte string(byte), e.g. '\x10\xc6\xc4\x16' - data: str - data to mask - - """ - # Masking of WebSocket traffic from client to server is required - # because of the unlikely chance that malicious code could cause - # some broken proxies to do the wrong thing and use this as an - # attack of some kind. Nobody has proved that this could actually - # happen, but since the fact that it could happen was reason enough - # for browser vendors to get twitchy, masking was added to remove - # the possibility of it being used as an attack. - if data is None: - data = "" - - data_bytes = self.data_to_bytes(data) - - _m = array.array("B", mask_key) - _d = array.array("B", data_bytes) - - for i in range(len(_d)): - _d[i] ^= _m[i % 4] - - return _d.tostring() - - def generate_payload(self): - payload = super(ClientFrame, self).generate_payload() - # we always mask frames from the client to server # use a string of n random bytes for the mask mask_key = os.urandom(4) @@ -221,76 +219,27 @@ def generate_payload(self): return payload -class PongFrame(ClientFrame): - - def __init__(self, *args): - super(PongFrame, self).__init__(*args) - self.opcode = 0xa - self.payload = self.generate_payload() - - def data_to_bytes(self, data): - return data - - -class ServerFrame(Frame): - """ Represent incoming Server -> Client messages - """ - - def __init__(self, bytes): - super(ServerFrame, self).__init__(bytes) - - if not bytes: - return +class FrameFactory(object): + @classmethod + def from_bytes(cls, bytes): try: - self.payload_length_indicator = bytes[1] & 0b1111111 + payload_length_indicator = bytes[1] & 0b1111111 except Exception: raise IncompleteFrameError(required_bytes=1) - # if this doesn't raise, all the above will receive a value - self.ensure_complete_frame(bytes) - - # server must not mask the payload - mask = bytes[1] >> 7 - assert mask == 0 - - self.buffered_bytes = bytes - - self.len = 0 - # Parse the first two bytes of header. - self.fin = bytes[0] >> 7 - - if self.fin == 0: - logger.exception("Multiple Frames Returned: %s", bytes) - raise WampyError( - 'Multiple framed responses not yet supported: {}'.format(bytes) - ) + available_bytes_for_body = bytes[2:] + if not available_bytes_for_body: + opcode = bytes[0] & 0xf + if opcode == Frame.OPCODE_BINARY: + # binary - the handshake response? + return Frame(payload=None, opcode=Frame.OPCODE_BINARY) - self.opcode = bytes[0] & 0b1111 - - if self.opcode == 1: - # Wamp data frames contain a json-encoded payload. - # The other kind of frame we handle (opcode 0x9) is a ping and it - # has a non-json payload - try: - # decode required before loading JSON for python 2 only - self.payload = json.loads(self.body.decode('utf-8')) - except Exception: - raise WebsocktProtocolError( - 'Failed to load JSON object from: "%s"', self.body - ) - else: - self.payload = self.body + if opcode in Frame.CONTROL_FRAMES: + if opcode == Frame.OPCODE_PING: + return Ping() - def ensure_complete_frame(self, buffered_bytes): - # we need a minimum of 2 bytes to determine the payload length and - # hence whether this is a complete frame or not. - if len(buffered_bytes) < 2: - # there are more bytes to receive. - raise IncompleteFrameError(required_bytes=1) - - payload_length_indicator = buffered_bytes[1] & 0b1111111 - available_bytes_for_body = buffered_bytes[2:] + available_bytes_for_body = bytes[2:] try: available_bytes_for_body[1] @@ -327,16 +276,119 @@ def ensure_complete_frame(self, buffered_bytes): required_bytes=required_bytes ) - self.body = body_candidate - self.payload_length_indicator = payload_length_indicator + # server must not mask the payload + mask = bytes[1] >> 7 + assert mask == 0 + + # Parse the first two bytes of header. + fin = bytes[0] >> 7 + + if fin == 0: + logger.exception("Multiple Frames Returned: %s", bytes) + raise WampyError( + 'Multiple framed responses not yet supported: {}'.format(bytes) + ) + + try: + # decode required before loading JSON for python 2 only + payload = json.loads(body_candidate.decode('utf-8')) + except Exception: + raise WebsocktProtocolError( + 'Failed to load JSON object from: "%s"', body_candidate + ) + + return Frame(payload=payload, opcode=opcode) class Ping(Frame): - def __init__(self): - super(Ping, self).__init__(bytes='0x8a') - self.opcode = Frame.OPCODE_PING - self.payload = self.generate_payload() + def __init__(self, payload=None): + # PING is a Control Frame denoted by the opcode 9 + # this modelsa PING from the Server -> Client, and as such does + # not mask. + super(Ping, self).__init__(payload=payload, opcode=Frame.OPCODE_PING) + if self.payload is None: + self.payload = self.generate_frame() - def data_to_bytes(self, data): - return data + def generate_frame(self): + # This is long hand for documentation purposes + + # the first byte contains the FIN bit, the 3 RSV bits and the + # 4 opcode bits, so we are looking for + + # 1 0 0 0 0 1 0 1 Opcode 9 for a Ping Frame + # +-+-+-+-+-------+ + # |F|R|R|R| opcode| + # |I|S|S|S| | + # |N|V|V|V| | + # | |1|2|3| | + # +-+-+-+-+-------+ + + # this shifts each bit into position and bitwise ORs them together, + # using the struct module to pack them as incoming network bytes + b0 = pack( + '!B', ( + (self.fin_bit << 7) | + self.opcode + ) + ) # which is '\x81' as a raw byte repr + + # frame-masked ; 1 bit in length + # frame-payload-length ; either 7, 7+16, + # ; or 7+64 bits in length + + # second byte, payload len bytes and mask, so we are looking for + + # 0 0 0 0 0 0 0 0 + # +-+-+-+-+-------+ + # |M| payload len | + # |A| | + # |S| | + # |K| | + # +-+-+-+-+-------+ + + b1 = 0 + # as implemented here, the Client could not send this as it is not + # masking + mask_bit = 0 << 7 + # next we have to | this bit with the payload length, + # if not too long! + b1 = pack('!B', (mask_bit | 0)) + + bytes = b''.join([b0, b1]) + + first_byte = bytes[0] + assert self.opcode == first_byte & 0xf + + # we could append payload data such as a HELLO here... + # then we'd have to update the 2nd byte with the new length + + return bytes # this is b'\x89\x00' + + +class PongFrame(ClientFrame): + + def __init__(self, payload=''): + super(PongFrame, self).__init__( + payload=payload, opcode=Frame.OPCODE_PONG, + ) + if not self.payload: + self.payload = self.generate_frame() + + def generate_frame(self): + b0 = pack( + '!B', ( + (self.fin_bit << 7) | + self.opcode + ) + ) + + b1 = 0 + mask_bit = 1 << 7 # masked, as the Client is sending + b1 = pack('!B', (mask_bit | 0)) + bytes = b''.join([b0, b1]) + + first_byte = bytes[0] + assert self.opcode == first_byte & 0xf + + return bytes # this is b'\x8a\x00' From 1d3b49727f3f9b484760011ef5e95a702a0bf408 Mon Sep 17 00:00:00 2001 From: Simon Date: Wed, 14 Mar 2018 18:07:41 +0000 Subject: [PATCH 08/29] move stuff about --- wampy/transports/websocket/connection.py | 2 + wampy/transports/websocket/frames.py | 170 ++++++++++++----------- 2 files changed, 89 insertions(+), 83 deletions(-) diff --git a/wampy/transports/websocket/connection.py b/wampy/transports/websocket/connection.py index ae837d6..b1eb557 100644 --- a/wampy/transports/websocket/connection.py +++ b/wampy/transports/websocket/connection.py @@ -60,6 +60,8 @@ def send(self, message): self._send_raw(websocket_message) def _send_raw(self, websocket_message): + import pdb + pdb.set_trace() self.socket.sendall(websocket_message) def receive(self, bufsize=1): diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index 1235bf0..a03e129 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -113,6 +113,89 @@ def __str__(self): return self.payload +class FrameFactory(object): + + @classmethod + def from_bytes(cls, bytes): + try: + payload_length_indicator = bytes[1] & 0b1111111 + except Exception: + raise IncompleteFrameError(required_bytes=1) + + available_bytes_for_body = bytes[2:] + if not available_bytes_for_body: + opcode = bytes[0] & 0xf + if opcode == Frame.OPCODE_BINARY: + # binary - the handshake response? + return Frame(payload=None, opcode=Frame.OPCODE_BINARY) + + if opcode in Frame.CONTROL_FRAMES: + if opcode == Frame.OPCODE_PING: + return Ping() + + import pdb + pdb.set_trace() + available_bytes_for_body = bytes[2:] + + try: + available_bytes_for_body[1] + except IndexError: + raise IncompleteFrameError(required_bytes=payload_length_indicator) + + # unpack the buffered bytes into an integer + body_length = unpack_from(">h", available_bytes_for_body)[0] + + if payload_length_indicator < 126: + # then we have enough knowlege about the payload length as it's + # contained within the 2nd byte of the header - because the + # trailing 7 bits of the 2 bytes tells us exactly how long the + # payload is + body_candidate = available_bytes_for_body + # in this case body length is represented by the indicator + body_length = payload_length_indicator + + elif payload_length_indicator == 126: + # then we don't have enough knowledge yet. + # and actually the following two bytes indicate the payload length. + # get all buffered bytes beyond the header and the excluded 2 bytes + body_candidate = available_bytes_for_body[2:] # require >= 2 bytes + + else: + # actually, the following eight bytes indicate the payload length. + # so check we have at least as much as we need, else exit. + body_candidate = available_bytes_for_body[6:] # require >= 8 bytes + + if len(body_candidate) < body_length: + required_bytes = body_length - len(body_candidate) + logger.debug("missing %s bytes", required_bytes) + raise IncompleteFrameError( + required_bytes=required_bytes + ) + + # server must not mask the payload + mask = bytes[1] >> 7 + assert mask == 0 + + # Parse the first two bytes of header. + fin = bytes[0] >> 7 + + if fin == 0: + logger.exception("Multiple Frames Returned: %s", bytes) + raise WampyError( + 'Multiple framed responses not yet supported: {}'.format(bytes) + ) + + try: + # decode required before loading JSON for python 2 only + payload = json.loads(body_candidate.decode('utf-8')) + except Exception: + raise WebsocktProtocolError( + 'Failed to load JSON object from: "%s"', body_candidate + ) + + return Frame(payload=payload, opcode=opcode) + + class ClientFrame(Frame): """ Represent outgoing Client -> Server messages """ @@ -216,88 +299,9 @@ def generate_frame(self): payload += mask # this is a bytes string being returned here - return payload - - -class FrameFactory(object): - - @classmethod - def from_bytes(cls, bytes): - try: - payload_length_indicator = bytes[1] & 0b1111111 - except Exception: - raise IncompleteFrameError(required_bytes=1) - - available_bytes_for_body = bytes[2:] - if not available_bytes_for_body: - opcode = bytes[0] & 0xf - if opcode == Frame.OPCODE_BINARY: - # binary - the handshake response? - return Frame(payload=None, opcode=Frame.OPCODE_BINARY) - - if opcode in Frame.CONTROL_FRAMES: - if opcode == Frame.OPCODE_PING: - return Ping() - - available_bytes_for_body = bytes[2:] - - try: - available_bytes_for_body[1] - except IndexError: - raise IncompleteFrameError(required_bytes=payload_length_indicator) - - # unpack the buffered bytes into an integer - body_length = unpack_from(">h", available_bytes_for_body)[0] - - if payload_length_indicator < 126: - # then we have enough knowlege about the payload length as it's - # contained within the 2nd byte of the header - because the - # trailing 7 bits of the 2 bytes tells us exactly how long the - # payload is - body_candidate = available_bytes_for_body - # in this case body length is represented by the indicator - body_length = payload_length_indicator - - elif payload_length_indicator == 126: - # then we don't have enough knowledge yet. - # and actually the following two bytes indicate the payload length. - # get all buffered bytes beyond the header and the excluded 2 bytes - body_candidate = available_bytes_for_body[2:] # require >= 2 bytes - - else: - # actually, the following eight bytes indicate the payload length. - # so check we have at least as much as we need, else exit. - body_candidate = available_bytes_for_body[6:] # require >= 8 bytes - - if len(body_candidate) < body_length: - required_bytes = body_length - len(body_candidate) - logger.debug("missing %s bytes", required_bytes) - raise IncompleteFrameError( - required_bytes=required_bytes - ) - - # server must not mask the payload - mask = bytes[1] >> 7 - assert mask == 0 - - # Parse the first two bytes of header. - fin = bytes[0] >> 7 - - if fin == 0: - logger.exception("Multiple Frames Returned: %s", bytes) - raise WampyError( - 'Multiple framed responses not yet supported: {}'.format(bytes) - ) - - try: - # decode required before loading JSON for python 2 only - payload = json.loads(body_candidate.decode('utf-8')) - except Exception: - raise WebsocktProtocolError( - 'Failed to load JSON object from: "%s"', body_candidate - ) - - return Frame(payload=payload, opcode=opcode) + import pdb + pdb.set_trace() + return self.data_to_bytes(payload) class Ping(Frame): @@ -366,7 +370,7 @@ def generate_frame(self): return bytes # this is b'\x89\x00' -class PongFrame(ClientFrame): +class PongFrame(Frame): def __init__(self, payload=''): super(PongFrame, self).__init__( From c1943795dc13a4301ed5549e6c154889ad5fd19f Mon Sep 17 00:00:00 2001 From: Simon Date: Thu, 15 Mar 2018 17:59:28 +0000 Subject: [PATCH 09/29] remove super calls to simplify debugging of WAMP messaging --- wampy/message_handler.py | 5 + wampy/transports/websocket/connection.py | 7 +- wampy/transports/websocket/frames.py | 127 +++++++++++++---------- 3 files changed, 81 insertions(+), 58 deletions(-) diff --git a/wampy/message_handler.py b/wampy/message_handler.py index 9438228..16b06b4 100644 --- a/wampy/message_handler.py +++ b/wampy/message_handler.py @@ -33,6 +33,11 @@ class MessageHandler(object): """ def handle_message(self, message, client): + if message is None: + logger.warning('no message to handle') + import sys + sys.exit() + wamp_code = message[0] logger.debug( diff --git a/wampy/transports/websocket/connection.py b/wampy/transports/websocket/connection.py index b1eb557..011f44e 100644 --- a/wampy/transports/websocket/connection.py +++ b/wampy/transports/websocket/connection.py @@ -56,12 +56,11 @@ def disconnect(self): def send(self, message): serialized_message = json_serialize(message) frame = ClientFrame(serialized_message) - websocket_message = frame.payload + websocket_message = frame.frame self._send_raw(websocket_message) def _send_raw(self, websocket_message): - import pdb - pdb.set_trace() + logger.debug('send raw: %s', websocket_message) self.socket.sendall(websocket_message) def receive(self, bufsize=1): @@ -107,6 +106,8 @@ def receive(self, bufsize=1): continue break + logger.info(frame.frame) + if frame is None: raise WampProtocolError("No frame returned") diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index a03e129..7e76c47 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -83,58 +83,32 @@ class Frame(object): LENGTH_16 = 1 << 16 # 0x10000, 65536, 10000000000000000 MAX_LENGTH = 1 << 63 # 1 x 2**63 - def __init__(self, payload, opcode): + def __init__(self, bytes, payload=None): + self.bytes = bytes + self.opcode = bytes[0] & 0xf # this is just the payload, i.e. the bytes that the application cares # about - self.payload = payload - self.opcode = opcode - - self.fin_bit = 1 - self.rsv1_bit = 0 - self.rsv2_bit = 0 - self.rsv3_bit = 0 + self.payload = payload or Frame.generate_payload(bytes) def __len__(self): - # UTF-8 is an unicode encoding which uses more than one byte for - # special characters. calculating the length needs consideration. - try: - unicode_body = self.payload.decode("utf-8") - except UnicodeError: - unicode_body = self.payload - except AttributeError: - # already decoded, hence no "decode" attribute - unicode_body = self.payload - - import pdb - pdb.set_trace() - return len(unicode_body.encode('utf-8')) + # the lengrh of the ws frame (not just the payload) + return len(self.bytes) def __str__(self): return self.payload - -class FrameFactory(object): - @classmethod - def from_bytes(cls, bytes): + def generate_payload(cls, bytes): try: payload_length_indicator = bytes[1] & 0b1111111 except Exception: raise IncompleteFrameError(required_bytes=1) available_bytes_for_body = bytes[2:] - if not available_bytes_for_body: - opcode = bytes[0] & 0xf - if opcode == Frame.OPCODE_BINARY: - # binary - the handshake response? - return Frame(payload=None, opcode=Frame.OPCODE_BINARY) - if opcode in Frame.CONTROL_FRAMES: - if opcode == Frame.OPCODE_PING: - return Ping() + if not available_bytes_for_body: + return None - import pdb - pdb.set_trace() available_bytes_for_body = bytes[2:] try: @@ -193,15 +167,52 @@ def from_bytes(cls, bytes): 'Failed to load JSON object from: "%s"', body_candidate ) - return Frame(payload=payload, opcode=opcode) + return payload + + @property + def frame(self): + return self.bytes + + +class FrameFactory(object): + + @classmethod + def from_bytes(cls, bytes): + opcode = bytes[0] & 0xf + payload = Frame.generate_payload(bytes) + if payload is None: + if opcode == Frame.OPCODE_BINARY: + # binary - the handshake response? + return Frame(bytes=bytes, payload=None) + + if opcode in Frame.CONTROL_FRAMES: + if opcode == Frame.OPCODE_PING: + return Ping() + + raise IncompleteFrameError(required_bytes=1) + + return Frame(bytes=bytes, payload=payload) class ClientFrame(Frame): - """ Represent outgoing Client -> Server messages + """ Represent outgoing Client -> Server messages. + + Takes a paylod and wraps it in a WebSocket frame. + """ - def __init__(self, payload, opcode=Frame.OPCODE_TEXT): - super(ClientFrame, self).__init__(payload, opcode) + def __init__(self, message): + """ + + :Parameters: + message : str + The data to be sent to the server. Tbis will form the + "payload" segments of the WebSocket frame(s). + + """ + self.fin_bit = 1 + self.opcode = Frame.OPCODE_TEXT + self.bytes = self.generate_frame(message) def data_to_bytes(self, data): return bytearray(data, 'utf-8') @@ -236,7 +247,7 @@ def generate_mask(self, mask_key, data): return _d.tostring() - def generate_frame(self): + def generate_frame(self, message): """ Format data to string (bytes) to send to server. """ # the first byte contains the FIN bit, the 3 RSV bits and the @@ -251,12 +262,13 @@ def generate_frame(self): # | |1|2|3| | # +-+-+-+-+-------+ # note that because all RSV bits are zero, we can ignore them + fin_bit = 1 # this shifts each bit into position and bitwise ORs them together, # using the struct module to pack them as incoming network bytes payload = pack( '!B', ( - (self.fin_bit << 7) | + (fin_bit << 7) | self.opcode ) ) # which is '\x81' as a raw byte repr @@ -278,7 +290,7 @@ def generate_frame(self): # i.e. encoded mask_bit = 1 << 7 # next we have to | this bit with the payload length, if not too long! - length = len(self) + length = len(message) if length >= self.MAX_LENGTH: raise WebsocktProtocolError("data is too long") @@ -294,25 +306,32 @@ def generate_frame(self): # we always mask frames from the client to server # use a string of n random bytes for the mask mask_key = os.urandom(4) - mask_data = self.generate_mask(mask_key=mask_key, data=self.body) + mask_data = self.generate_mask(mask_key=mask_key, data=message) mask = mask_key + mask_data payload += mask # this is a bytes string being returned here - import pdb - pdb.set_trace() - return self.data_to_bytes(payload) + return payload class Ping(Frame): - def __init__(self, payload=None): + def __init__(self, message=''): + """ + + :Parameters: + message : str + An optional message to send along with the PING, + e.g. "hello". Forms the ``payload`` of the WebSocket + message. + + """ # PING is a Control Frame denoted by the opcode 9 # this modelsa PING from the Server -> Client, and as such does # not mask. - super(Ping, self).__init__(payload=payload, opcode=Frame.OPCODE_PING) - if self.payload is None: - self.payload = self.generate_frame() + self.fin_bit = 1 + self.opcode = Frame.OPCODE_PING + self.bytes = self.generate_frame() def generate_frame(self): # This is long hand for documentation purposes @@ -373,11 +392,9 @@ def generate_frame(self): class PongFrame(Frame): def __init__(self, payload=''): - super(PongFrame, self).__init__( - payload=payload, opcode=Frame.OPCODE_PONG, - ) - if not self.payload: - self.payload = self.generate_frame() + self.fin_bit = 1 + self.opcode = Frame.OPCODE_PONG + self.bytes = self.generate_frame() def generate_frame(self): b0 = pack( From dbc980100de7b4fd28d352b65668aab1f1029c54 Mon Sep 17 00:00:00 2001 From: Simon Date: Thu, 15 Mar 2018 18:08:56 +0000 Subject: [PATCH 10/29] debugging --- wampy/transports/websocket/frames.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index 7e76c47..fcc4a1f 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -91,8 +91,15 @@ def __init__(self, bytes, payload=None): self.payload = payload or Frame.generate_payload(bytes) def __len__(self): - # the lengrh of the ws frame (not just the payload) - return len(self.bytes) + try: + unicode_body = self.bytes.decode("utf-8") + except UnicodeError: + unicode_body = self.bytes + except AttributeError: + # already decoded, hence no "decode" attribute + unicode_body = self.bytes + + return len(unicode_body) def __str__(self): return self.payload @@ -178,9 +185,9 @@ class FrameFactory(object): @classmethod def from_bytes(cls, bytes): - opcode = bytes[0] & 0xf payload = Frame.generate_payload(bytes) if payload is None: + opcode = bytes[0] & 0xf if opcode == Frame.OPCODE_BINARY: # binary - the handshake response? return Frame(bytes=bytes, payload=None) From 0b989259c6d8b9ef23cd0837ab24b0f3ef884c64 Mon Sep 17 00:00:00 2001 From: Simon Date: Thu, 15 Mar 2018 20:14:51 +0000 Subject: [PATCH 11/29] len is probably payload len --- wampy/transports/websocket/frames.py | 35 ++++++++++++++-------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index fcc4a1f..e1199e1 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -92,12 +92,12 @@ def __init__(self, bytes, payload=None): def __len__(self): try: - unicode_body = self.bytes.decode("utf-8") + unicode_body = self.payload.decode("utf-8") except UnicodeError: - unicode_body = self.bytes + unicode_body = self.payload except AttributeError: # already decoded, hence no "decode" attribute - unicode_body = self.bytes + unicode_body = self.payload return len(unicode_body) @@ -106,6 +106,18 @@ def __str__(self): @classmethod def generate_payload(cls, bytes): + if not bytes: + raise IncompleteFrameError(required_bytes=1) + + # Parse the first two bytes of header. + fin = bytes[0] >> 7 + + if fin == 0: + logger.exception("Multiple Frames Returned: %s", bytes) + raise WampyError( + 'Multiple framed responses not yet supported: {}'.format(bytes) + ) + try: payload_length_indicator = bytes[1] & 0b1111111 except Exception: @@ -153,19 +165,6 @@ def generate_payload(cls, bytes): required_bytes=required_bytes ) - # server must not mask the payload - mask = bytes[1] >> 7 - assert mask == 0 - - # Parse the first two bytes of header. - fin = bytes[0] >> 7 - - if fin == 0: - logger.exception("Multiple Frames Returned: %s", bytes) - raise WampyError( - 'Multiple framed responses not yet supported: {}'.format(bytes) - ) - try: # decode required before loading JSON for python 2 only payload = json.loads(body_candidate.decode('utf-8')) @@ -186,6 +185,7 @@ class FrameFactory(object): @classmethod def from_bytes(cls, bytes): payload = Frame.generate_payload(bytes) + logger.info('got a frame: %s', bytes) if payload is None: opcode = bytes[0] & 0xf if opcode == Frame.OPCODE_BINARY: @@ -219,6 +219,7 @@ def __init__(self, message): """ self.fin_bit = 1 self.opcode = Frame.OPCODE_TEXT + self.payload = message self.bytes = self.generate_frame(message) def data_to_bytes(self, data): @@ -297,7 +298,7 @@ def generate_frame(self, message): # i.e. encoded mask_bit = 1 << 7 # next we have to | this bit with the payload length, if not too long! - length = len(message) + length = len(self) if length >= self.MAX_LENGTH: raise WebsocktProtocolError("data is too long") From 4549e3c42c71e25da71e1f6fe6dfb3b5f12637b4 Mon Sep 17 00:00:00 2001 From: Simon Date: Thu, 15 Mar 2018 20:52:26 +0000 Subject: [PATCH 12/29] debug logging and do the WAMP extra handshake --- wampy/transports/websocket/connection.py | 13 +++++++++---- wampy/transports/websocket/frames.py | 5 +++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/wampy/transports/websocket/connection.py b/wampy/transports/websocket/connection.py index 011f44e..0fb550d 100644 --- a/wampy/transports/websocket/connection.py +++ b/wampy/transports/websocket/connection.py @@ -39,7 +39,7 @@ def __init__(self, server_url, ipv=4): self.socket = None self.connected = False - def connect(self, upgrade=False): + def connect(self, upgrade=True): # TCP connection self._connect() self._handshake(upgrade=upgrade) @@ -68,7 +68,7 @@ def receive(self, bufsize=1): received_bytes = bytearray() while True: - logger.debug("waiting for %s bytes", bufsize) + logger.warning("waiting for %s bytes", bufsize) try: bytes = self.socket.recv(bufsize) @@ -87,12 +87,13 @@ def receive(self, bufsize=1): if not bytes: break - logger.debug("received %s bytes", bufsize) + logger.warning("received %s bytes", bufsize) received_bytes.extend(bytes) try: frame = FrameFactory.from_bytes(received_bytes) except IncompleteFrameError as exc: + logger.warning('not good enough: %s', received_bytes) bufsize = exc.required_bytes logger.debug('now requesting the missing %s bytes', bufsize) else: @@ -104,9 +105,13 @@ def receive(self, bufsize=1): self.handle_ping(ping_frame=frame) received_bytes = bytearray() continue + if frame.opcode == frame.OPCODE_BINARY: + import pdb + pdb.set_trace() break - logger.info(frame.frame) + logger.warning('happy with %s', frame.frame) + logger.warning(frame.frame) if frame is None: raise WampProtocolError("No frame returned") diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index e1199e1..9f406c8 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -106,7 +106,7 @@ def __str__(self): @classmethod def generate_payload(cls, bytes): - if not bytes: + if not bytes or len(bytes) < 2: raise IncompleteFrameError(required_bytes=1) # Parse the first two bytes of header. @@ -173,6 +173,7 @@ def generate_payload(cls, bytes): 'Failed to load JSON object from: "%s"', body_candidate ) + logger.info('generated payload: %s', payload) return payload @property @@ -204,7 +205,7 @@ def from_bytes(cls, bytes): class ClientFrame(Frame): """ Represent outgoing Client -> Server messages. - Takes a paylod and wraps it in a WebSocket frame. + Takes a payload and wraps it in a WebSocket frame. """ From 076f6eca0bf18eed03ac0a124d531da294564644 Mon Sep 17 00:00:00 2001 From: Simon Date: Fri, 16 Mar 2018 19:07:26 +0100 Subject: [PATCH 13/29] handling control and binary frames --- test/transports/test_websockets.py | 15 ++++++---- wampy/transports/websocket/connection.py | 14 ++------- wampy/transports/websocket/frames.py | 36 ++++++++++++++++-------- 3 files changed, 35 insertions(+), 30 deletions(-) diff --git a/test/transports/test_websockets.py b/test/transports/test_websockets.py index f580985..5b79f25 100644 --- a/test/transports/test_websockets.py +++ b/test/transports/test_websockets.py @@ -1,3 +1,4 @@ +import logging from collections import OrderedDict import pytest @@ -12,6 +13,8 @@ from wampy.transports.websocket.connection import WebSocket from wampy.transports.websocket.frames import Ping +logger = logging.getLogger(__name__) + class TestApplication(WebSocketApplication): pass @@ -49,10 +52,12 @@ def test_send_ping(server): def connection_handler(): while True: try: - websocket.receive() - except Exception as exc: - print(exc) + message = websocket.receive() + except Exception: + logger.execption('connection handler exploded') raise + if message: + logger.info('got message: %s', message) assert websocket.connected is True @@ -72,9 +77,7 @@ def connection_handler(): payload = frame.generate_frame() socket.send(payload) - websocket.receive() - - with gevent.Timeout(1): + with gevent.Timeout(5): while mock_handle.call_count != 1: gevent.sleep(0.01) diff --git a/wampy/transports/websocket/connection.py b/wampy/transports/websocket/connection.py index 0fb550d..844c5fd 100644 --- a/wampy/transports/websocket/connection.py +++ b/wampy/transports/websocket/connection.py @@ -68,8 +68,6 @@ def receive(self, bufsize=1): received_bytes = bytearray() while True: - logger.warning("waiting for %s bytes", bufsize) - try: bytes = self.socket.recv(bufsize) except gevent.greenlet.GreenletExit as exc: @@ -87,15 +85,12 @@ def receive(self, bufsize=1): if not bytes: break - logger.warning("received %s bytes", bufsize) received_bytes.extend(bytes) try: frame = FrameFactory.from_bytes(received_bytes) except IncompleteFrameError as exc: - logger.warning('not good enough: %s', received_bytes) bufsize = exc.required_bytes - logger.debug('now requesting the missing %s bytes', bufsize) else: if frame.opcode == frame.OPCODE_PING: # Opcode 0x9 marks a ping frame. It does not contain wamp @@ -106,12 +101,9 @@ def receive(self, bufsize=1): received_bytes = bytearray() continue if frame.opcode == frame.OPCODE_BINARY: - import pdb - pdb.set_trace() - break + break - logger.warning('happy with %s', frame.frame) - logger.warning(frame.frame) + break if frame is None: raise WampProtocolError("No frame returned") @@ -228,7 +220,6 @@ def read_line(): received_bytes = read_line() if received_bytes == b'\r\n': # end of the response - print('end of response') break bytes_as_str = received_bytes.decode() @@ -255,7 +246,6 @@ def read_line(): key, value = kv headers[key.lower()] = value.strip().lower() - print(headers) logger.info("handshake complete: %s : %s", status, headers) self.connected = True diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index 9f406c8..c77a386 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -58,6 +58,8 @@ class Frame(object): OPCODE_PING, OPCODE_PONG, OPCODE_TEXT, ) + # not intended to carr data for the application but instead for + # protocol-level signaling, CONTROL_FRAMES = [OPCODE_PING, ] # Frame Length @@ -99,10 +101,13 @@ def __len__(self): # already decoded, hence no "decode" attribute unicode_body = self.payload + if unicode_body is None: + return 0 + return len(unicode_body) def __str__(self): - return self.payload + return str(self.payload) @classmethod def generate_payload(cls, bytes): @@ -113,11 +118,20 @@ def generate_payload(cls, bytes): fin = bytes[0] >> 7 if fin == 0: - logger.exception("Multiple Frames Returned: %s", bytes) raise WampyError( 'Multiple framed responses not yet supported: {}'.format(bytes) ) + opcode = bytes[0] & 0xf + # binary data interpretation is left up to th application... + if opcode == Frame.OPCODE_BINARY: + # ..and wampy ignores them + return None + + if opcode in Frame.CONTROL_FRAMES: + # handled by wampy + return None + try: payload_length_indicator = bytes[1] & 0b1111111 except Exception: @@ -126,14 +140,9 @@ def generate_payload(cls, bytes): available_bytes_for_body = bytes[2:] if not available_bytes_for_body: - return None - - available_bytes_for_body = bytes[2:] - - try: - available_bytes_for_body[1] - except IndexError: - raise IncompleteFrameError(required_bytes=payload_length_indicator) + raise IncompleteFrameError( + required_bytes=payload_length_indicator + ) # unpack the buffered bytes into an integer body_length = unpack_from(">h", available_bytes_for_body)[0] @@ -165,6 +174,10 @@ def generate_payload(cls, bytes): required_bytes=required_bytes ) + opcode = bytes[0] & 0xf + if opcode == Frame.OPCODE_BINARY: + return body_candidate + try: # decode required before loading JSON for python 2 only payload = json.loads(body_candidate.decode('utf-8')) @@ -173,7 +186,7 @@ def generate_payload(cls, bytes): 'Failed to load JSON object from: "%s"', body_candidate ) - logger.info('generated payload: %s', payload) + logger.debug('generated payload: %s', payload) return payload @property @@ -186,7 +199,6 @@ class FrameFactory(object): @classmethod def from_bytes(cls, bytes): payload = Frame.generate_payload(bytes) - logger.info('got a frame: %s', bytes) if payload is None: opcode = bytes[0] & 0xf if opcode == Frame.OPCODE_BINARY: From 80059e8c88bd676c252cadc3f6b6f548ff30d7c0 Mon Sep 17 00:00:00 2001 From: Simon Date: Fri, 16 Mar 2018 20:20:49 +0100 Subject: [PATCH 14/29] add logging to JSON errors --- wampy/transports/websocket/frames.py | 1 + 1 file changed, 1 insertion(+) diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index c77a386..88b9428 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -182,6 +182,7 @@ def generate_payload(cls, bytes): # decode required before loading JSON for python 2 only payload = json.loads(body_candidate.decode('utf-8')) except Exception: + logger.error('invalid WAMP payload: %s', body_candidate) raise WebsocktProtocolError( 'Failed to load JSON object from: "%s"', body_candidate ) From e9c66cb734fdec7a315c9a2149e56b7d9583e555 Mon Sep 17 00:00:00 2001 From: Simon Date: Fri, 16 Mar 2018 20:32:02 +0100 Subject: [PATCH 15/29] typo and clarity --- wampy/transports/websocket/frames.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index 88b9428..c5a373b 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -227,14 +227,14 @@ def __init__(self, message): :Parameters: message : str - The data to be sent to the server. Tbis will form the + The data to be sent to the server. Tis will form the "payload" segments of the WebSocket frame(s). """ self.fin_bit = 1 self.opcode = Frame.OPCODE_TEXT self.payload = message - self.bytes = self.generate_frame(message) + self.bytes = self.generate_frame(self.payload) def data_to_bytes(self, data): return bytearray(data, 'utf-8') From 134ee4d085ce2b14d7c91b817c082b6c42c46336 Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 17 Mar 2018 11:22:41 +0100 Subject: [PATCH 16/29] debugging unicode --- wampy/messages/call.py | 5 +++++ wampy/serializers.py | 3 ++- wampy/transports/websocket/frames.py | 4 ++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/wampy/messages/call.py b/wampy/messages/call.py index 9c7ce48..c085961 100644 --- a/wampy/messages/call.py +++ b/wampy/messages/call.py @@ -38,6 +38,11 @@ def __init__(self, procedure, options=None, args=None, kwargs=None): @property def message(self): + kwargs = self.kwargs + #encoded_kwargs = { + # key: val.encode('utf8') for key, val in kwargs.items() + #} + return [ self.WAMP_CODE, self.request_id, self.options, self.procedure, self.args, self.kwargs diff --git a/wampy/serializers.py b/wampy/serializers.py index a208e0b..98fcab9 100644 --- a/wampy/serializers.py +++ b/wampy/serializers.py @@ -19,5 +19,6 @@ def json_serialize(message): message, str(exc) ) ) - + import pdb + pdb.set_trace() return data diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index c5a373b..05f93f4 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -123,6 +123,7 @@ def generate_payload(cls, bytes): ) opcode = bytes[0] & 0xf + # binary data interpretation is left up to th application... if opcode == Frame.OPCODE_BINARY: # ..and wampy ignores them @@ -284,13 +285,12 @@ def generate_frame(self, message): # | |1|2|3| | # +-+-+-+-+-------+ # note that because all RSV bits are zero, we can ignore them - fin_bit = 1 # this shifts each bit into position and bitwise ORs them together, # using the struct module to pack them as incoming network bytes payload = pack( '!B', ( - (fin_bit << 7) | + (self.fin_bit << 7) | self.opcode ) ) # which is '\x81' as a raw byte repr From 07b11d11edc470b482001ebc73ffc010b4b4fe97 Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 17 Mar 2018 11:24:57 +0100 Subject: [PATCH 17/29] debugging unicode --- wampy/transports/websocket/connection.py | 2 ++ wampy/transports/websocket/frames.py | 1 + 2 files changed, 3 insertions(+) diff --git a/wampy/transports/websocket/connection.py b/wampy/transports/websocket/connection.py index 844c5fd..394b4da 100644 --- a/wampy/transports/websocket/connection.py +++ b/wampy/transports/websocket/connection.py @@ -54,7 +54,9 @@ def disconnect(self): self.socket.close() def send(self, message): + logger.warning('message: %s', message) serialized_message = json_serialize(message) + logger.warning('serialized message: %s', serialized_message) frame = ClientFrame(serialized_message) websocket_message = frame.frame self._send_raw(websocket_message) diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index 05f93f4..1222f05 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -236,6 +236,7 @@ def __init__(self, message): self.opcode = Frame.OPCODE_TEXT self.payload = message self.bytes = self.generate_frame(self.payload) + logger.warning('client frame body: %s', self.bytes) def data_to_bytes(self, data): return bytearray(data, 'utf-8') From 64be62aa37468b26bb044a1dc5868150bee19f29 Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 17 Mar 2018 11:26:48 +0100 Subject: [PATCH 18/29] debugging unicode --- wampy/serializers.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/wampy/serializers.py b/wampy/serializers.py index 98fcab9..a208e0b 100644 --- a/wampy/serializers.py +++ b/wampy/serializers.py @@ -19,6 +19,5 @@ def json_serialize(message): message, str(exc) ) ) - import pdb - pdb.set_trace() + return data From d5f8ac36ca9ab6decf2758e12e9889212e21cfa3 Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 17 Mar 2018 11:35:36 +0100 Subject: [PATCH 19/29] debugging --- wampy/transports/websocket/frames.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index 1222f05..7ad30a0 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -295,7 +295,7 @@ def generate_frame(self, message): self.opcode ) ) # which is '\x81' as a raw byte repr - + logger.warning('initial byte: %s', payload) # the second byte - and maybe the 7 after this, we'll use to tell # the server how long our payload is. @@ -314,6 +314,7 @@ def generate_frame(self, message): mask_bit = 1 << 7 # next we have to | this bit with the payload length, if not too long! length = len(self) + logger.warning('length self: %s', length) if length >= self.MAX_LENGTH: raise WebsocktProtocolError("data is too long") From 08594388a27feb746b05ce9065e7f4f88f362d5f Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 17 Mar 2018 13:19:17 +0100 Subject: [PATCH 20/29] remove debug mess --- wampy/messages/call.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/wampy/messages/call.py b/wampy/messages/call.py index c085961..9c7ce48 100644 --- a/wampy/messages/call.py +++ b/wampy/messages/call.py @@ -38,11 +38,6 @@ def __init__(self, procedure, options=None, args=None, kwargs=None): @property def message(self): - kwargs = self.kwargs - #encoded_kwargs = { - # key: val.encode('utf8') for key, val in kwargs.items() - #} - return [ self.WAMP_CODE, self.request_id, self.options, self.procedure, self.args, self.kwargs From dad0e6fe0d40386ae0bcaea9a3c44f7d428c4c9d Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 17 Mar 2018 13:20:00 +0100 Subject: [PATCH 21/29] fix payload length bugs and tidy up old use of register_router --- wampy/transports/websocket/connection.py | 14 ++----- wampy/transports/websocket/frames.py | 52 +++++++++++++----------- 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/wampy/transports/websocket/connection.py b/wampy/transports/websocket/connection.py index 394b4da..456d4b9 100644 --- a/wampy/transports/websocket/connection.py +++ b/wampy/transports/websocket/connection.py @@ -16,7 +16,6 @@ IncompleteFrameError, ConnectionError, WampProtocolError, WampyError) from wampy.mixins import ParseUrlMixin from wampy.transports.interface import Transport -from wampy.serializers import json_serialize from . frames import ClientFrame, FrameFactory, PongFrame @@ -54,10 +53,7 @@ def disconnect(self): self.socket.close() def send(self, message): - logger.warning('message: %s', message) - serialized_message = json_serialize(message) - logger.warning('serialized message: %s', serialized_message) - frame = ClientFrame(serialized_message) + frame = ClientFrame(message) websocket_message = frame.frame self._send_raw(websocket_message) @@ -258,10 +254,8 @@ def handle_ping(self, ping_frame): class SecureWebSocket(WebSocket): - def register_router(self, router): - super(SecureWebSocket, self).register_router(router) - - self.ipv = router.ipv + def __init__(self, server_url, certificate_path, ipv=4): + super(SecureWebSocket, self).__init__(server_url=server_url, ipv=ipv) # PROTOCOL_TLSv1_1 and PROTOCOL_TLSv1_2 are only available if Python is # linked with OpenSSL 1.0.1 or later. @@ -270,7 +264,7 @@ def register_router(self, router): except AttributeError: raise WampyError("Your Python Environment does not support TLS") - self.certificate = router.certificate + self.certificate = certificate_path def _connect(self): _socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index 7ad30a0..e72e9aa 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -11,9 +11,10 @@ from wampy.errors import ( WampyError, WebsocktProtocolError, IncompleteFrameError ) +from wampy.serializers import json_serialize -logger = logging.getLogger('wampy.networking.frames') +logger = logging.getLogger(__name__) class Frame(object): @@ -92,20 +93,6 @@ def __init__(self, bytes, payload=None): # about self.payload = payload or Frame.generate_payload(bytes) - def __len__(self): - try: - unicode_body = self.payload.decode("utf-8") - except UnicodeError: - unicode_body = self.payload - except AttributeError: - # already decoded, hence no "decode" attribute - unicode_body = self.payload - - if unicode_body is None: - return 0 - - return len(unicode_body) - def __str__(self): return str(self.payload) @@ -228,15 +215,34 @@ def __init__(self, message): :Parameters: message : str - The data to be sent to the server. Tis will form the - "payload" segments of the WebSocket frame(s). + The WAMP message to be sent to the server. This will form + the "payload" segments of the WebSocket frame(s). """ self.fin_bit = 1 self.opcode = Frame.OPCODE_TEXT - self.payload = message + self.message = message self.bytes = self.generate_frame(self.payload) - logger.warning('client frame body: %s', self.bytes) + + @property + def payload(self): + return json_serialize(self.message) + + @property + def payload_length(self): + """ the length in bytes of the utf-8 serialized WAMP message """ + try: + unicode_body = self.payload.decode("utf-8") + except UnicodeError: + unicode_body = self.payload + except AttributeError: + # already decoded, hence no "decode" attribute + unicode_body = self.payload + + if unicode_body is None: + return 0 + + return len(self.payload.encode('utf-8')) def data_to_bytes(self, data): return bytearray(data, 'utf-8') @@ -295,7 +301,7 @@ def generate_frame(self, message): self.opcode ) ) # which is '\x81' as a raw byte repr - logger.warning('initial byte: %s', payload) + # the second byte - and maybe the 7 after this, we'll use to tell # the server how long our payload is. @@ -313,8 +319,8 @@ def generate_frame(self, message): # i.e. encoded mask_bit = 1 << 7 # next we have to | this bit with the payload length, if not too long! - length = len(self) - logger.warning('length self: %s', length) + length = self.payload_length + if length >= self.MAX_LENGTH: raise WebsocktProtocolError("data is too long") @@ -341,7 +347,7 @@ def generate_frame(self, message): class Ping(Frame): def __init__(self, message=''): - """ + """ Represent a PING Control Frame. :Parameters: message : str From e1981042a2fc628439a795fe2b9c631818f57007 Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 17 Mar 2018 13:20:24 +0100 Subject: [PATCH 22/29] add cetificate to init of secure websocket --- wampy/peers/clients.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/wampy/peers/clients.py b/wampy/peers/clients.py index 4dcc50d..94ef703 100644 --- a/wampy/peers/clients.py +++ b/wampy/peers/clients.py @@ -101,10 +101,13 @@ def __init__( # other transports are supported!) if self.router.scheme == "ws": self.transport = WebSocket( - server_url=self.router.url, ipv=self.router.ipv) + server_url=self.router.url, ipv=self.router.ipv, + ) elif self.router.scheme == "wss": self.transport = SecureWebSocket( - server_url=self.router.url, ipv=self.router.ipv) + server_url=self.router.url, ipv=self.router.ipv, + certificate_path=self.router.certificate, + ) else: raise WampyError( 'Network protocl must be "ws" or "wss"' From 0b7f6d3dd0326615abd62347d6ed496ca015b654 Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 17 Mar 2018 13:29:41 +0100 Subject: [PATCH 23/29] tidy up --- test/{ => transports}/test_transports.py | 0 test/transports/test_websockets.py | 2 +- wampy/message_handler.py | 5 ----- wampy/transports/websocket/connection.py | 2 +- 4 files changed, 2 insertions(+), 7 deletions(-) rename test/{ => transports}/test_transports.py (100%) diff --git a/test/test_transports.py b/test/transports/test_transports.py similarity index 100% rename from test/test_transports.py rename to test/transports/test_transports.py diff --git a/test/transports/test_websockets.py b/test/transports/test_websockets.py index 5b79f25..08b13ba 100644 --- a/test/transports/test_websockets.py +++ b/test/transports/test_websockets.py @@ -33,7 +33,7 @@ def server(): thread.kill() -def __test_websocket_connects_to_server(server): +def test_websocket_connects_to_server(server): websocket = WebSocket(server_url='ws://0.0.0.0:8001') websocket.connect() diff --git a/wampy/message_handler.py b/wampy/message_handler.py index 16b06b4..9438228 100644 --- a/wampy/message_handler.py +++ b/wampy/message_handler.py @@ -33,11 +33,6 @@ class MessageHandler(object): """ def handle_message(self, message, client): - if message is None: - logger.warning('no message to handle') - import sys - sys.exit() - wamp_code = message[0] logger.debug( diff --git a/wampy/transports/websocket/connection.py b/wampy/transports/websocket/connection.py index 456d4b9..f88c9ce 100644 --- a/wampy/transports/websocket/connection.py +++ b/wampy/transports/websocket/connection.py @@ -152,7 +152,7 @@ def _handshake(self, upgrade): self.socket.send(handshake.encode()) try: - with gevent.Timeout(50): + with gevent.Timeout(5): self.status, self.headers = self._read_handshake_response() except gevent.Timeout: raise WampyError( From f60f04ee73c2c58aba51ae05a19dba3f29648e11 Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 17 Mar 2018 13:38:26 +0100 Subject: [PATCH 24/29] remove unused param --- wampy/transports/websocket/frames.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index e72e9aa..d225bbd 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -59,7 +59,7 @@ class Frame(object): OPCODE_PING, OPCODE_PONG, OPCODE_TEXT, ) - # not intended to carr data for the application but instead for + # not intended to carry data for the application but instead for # protocol-level signaling, CONTROL_FRAMES = [OPCODE_PING, ] @@ -346,16 +346,7 @@ def generate_frame(self, message): class Ping(Frame): - def __init__(self, message=''): - """ Represent a PING Control Frame. - - :Parameters: - message : str - An optional message to send along with the PING, - e.g. "hello". Forms the ``payload`` of the WebSocket - message. - - """ + def __init__(self): # PING is a Control Frame denoted by the opcode 9 # this modelsa PING from the Server -> Client, and as such does # not mask. @@ -421,7 +412,7 @@ def generate_frame(self): class PongFrame(Frame): - def __init__(self, payload=''): + def __init__(self): self.fin_bit = 1 self.opcode = Frame.OPCODE_PONG self.bytes = self.generate_frame() From 34c5da93c70f473763c1078ec4be10e6af6c24d5 Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 17 Mar 2018 14:44:41 +0100 Subject: [PATCH 25/29] fix sending PONG messgae which must contain the payload from the PING --- wampy/transports/websocket/connection.py | 5 +- wampy/transports/websocket/frames.py | 89 ++++++++++++------------ 2 files changed, 48 insertions(+), 46 deletions(-) diff --git a/wampy/transports/websocket/connection.py b/wampy/transports/websocket/connection.py index f88c9ce..95bca9e 100644 --- a/wampy/transports/websocket/connection.py +++ b/wampy/transports/websocket/connection.py @@ -250,7 +250,10 @@ def read_line(): return status, headers def handle_ping(self, ping_frame): - self._send_raw(PongFrame().generate_frame()) + pong_frame = PongFrame(ping_frame=ping_frame) + bytes = pong_frame.frame + logger.info('sending pong: %s', bytes) + self._send_raw(bytes) class SecureWebSocket(WebSocket): diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index d225bbd..72bbd99 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -8,9 +8,7 @@ import os from struct import pack, unpack_from -from wampy.errors import ( - WampyError, WebsocktProtocolError, IncompleteFrameError -) +from wampy.errors import WebsocktProtocolError, IncompleteFrameError from wampy.serializers import json_serialize @@ -101,24 +99,17 @@ def generate_payload(cls, bytes): if not bytes or len(bytes) < 2: raise IncompleteFrameError(required_bytes=1) - # Parse the first two bytes of header. - fin = bytes[0] >> 7 - - if fin == 0: - raise WampyError( - 'Multiple framed responses not yet supported: {}'.format(bytes) - ) - opcode = bytes[0] & 0xf - # binary data interpretation is left up to th application... if opcode == Frame.OPCODE_BINARY: # ..and wampy ignores them return None - if opcode in Frame.CONTROL_FRAMES: - # handled by wampy - return None + # Parse the first two bytes of header. + fin = bytes[0] >> 7 + + if fin == 0: + logger.warning('fragmented frame received: %s', bytes) try: payload_length_indicator = bytes[1] & 0b1111111 @@ -126,7 +117,6 @@ def generate_payload(cls, bytes): raise IncompleteFrameError(required_bytes=1) available_bytes_for_body = bytes[2:] - if not available_bytes_for_body: raise IncompleteFrameError( required_bytes=payload_length_indicator @@ -162,9 +152,9 @@ def generate_payload(cls, bytes): required_bytes=required_bytes ) - opcode = bytes[0] & 0xf - if opcode == Frame.OPCODE_BINARY: - return body_candidate + if opcode in Frame.CONTROL_FRAMES: + payload = body_candidate + return payload try: # decode required before loading JSON for python 2 only @@ -188,17 +178,11 @@ class FrameFactory(object): @classmethod def from_bytes(cls, bytes): payload = Frame.generate_payload(bytes) - if payload is None: - opcode = bytes[0] & 0xf - if opcode == Frame.OPCODE_BINARY: - # binary - the handshake response? - return Frame(bytes=bytes, payload=None) - - if opcode in Frame.CONTROL_FRAMES: - if opcode == Frame.OPCODE_PING: - return Ping() - raise IncompleteFrameError(required_bytes=1) + opcode = bytes[0] & 0xf + if opcode in Frame.CONTROL_FRAMES: + if opcode == Frame.OPCODE_PING: + return Ping(bytes=bytes, payload=payload) return Frame(bytes=bytes, payload=payload) @@ -267,10 +251,11 @@ def generate_mask(self, mask_key, data): if data is None: data = "" - data_bytes = self.data_to_bytes(data) + if not isinstance(data, bytearray): + data = self.data_to_bytes(data) _m = array.array("B", mask_key) - _d = array.array("B", data_bytes) + _d = array.array("B", data) for i in range(len(_d)): _d[i] ^= _m[i % 4] @@ -346,13 +331,14 @@ def generate_frame(self, message): class Ping(Frame): - def __init__(self): + def __init__(self, bytes, payload=''): # PING is a Control Frame denoted by the opcode 9 # this modelsa PING from the Server -> Client, and as such does # not mask. self.fin_bit = 1 self.opcode = Frame.OPCODE_PING - self.bytes = self.generate_frame() + self.bytes = bytes or self.generate_frame() + self.payload = payload or Frame.generate_payload(bytes) def generate_frame(self): # This is long hand for documentation purposes @@ -410,27 +396,40 @@ def generate_frame(self): return bytes # this is b'\x89\x00' -class PongFrame(Frame): +class PongFrame(ClientFrame): - def __init__(self): + def __init__(self, ping_frame): self.fin_bit = 1 self.opcode = Frame.OPCODE_PONG - self.bytes = self.generate_frame() + self.bytes = self.generate_frame(message=ping_frame.payload) - def generate_frame(self): - b0 = pack( + def generate_frame(self, message): + payload = pack( '!B', ( (self.fin_bit << 7) | self.opcode ) ) - b1 = 0 - mask_bit = 1 << 7 # masked, as the Client is sending - b1 = pack('!B', (mask_bit | 0)) - bytes = b''.join([b0, b1]) + mask_bit = 1 << 7 + # next we have to | this bit with the payload length, if not too long! + length = len(message) - first_byte = bytes[0] - assert self.opcode == first_byte & 0xf + # the second byte contains the payload length and mask + if length < self.LENGTH_7: + # we can simply represent payload length with first 7 bits + payload += pack('!B', (mask_bit | length)) + elif length < self.LENGTH_16: + payload += pack('!B', (mask_bit | 126)) + pack('!H', length) + else: + payload += pack('!B', (mask_bit | 127)) + pack('!Q', length) + + # we always mask frames from the client to server + # use a string of n random bytes for the mask + mask_key = os.urandom(4) + mask_data = self.generate_mask(mask_key=mask_key, data=message) + mask = mask_key + mask_data + payload += mask - return bytes # this is b'\x8a\x00' + # this is a bytes string being returned here + return payload From 914f340db2d3be90045d9e389f0cda825992d4fa Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 17 Mar 2018 14:53:36 +0100 Subject: [PATCH 26/29] tidy PINGS and PONGS --- test/test_clients.py | 6 ++++-- test/transports/test_websockets.py | 2 +- wampy/transports/websocket/frames.py | 7 +++++-- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/test/test_clients.py b/test/test_clients.py index 629a621..55e32a9 100644 --- a/test/test_clients.py +++ b/test/test_clients.py @@ -3,8 +3,8 @@ # file, You can obtain one at http://mozilla.org/MPL/2.0/. import datetime -from time import sleep +import gevent import pytest from wampy.peers.clients import Client @@ -169,8 +169,10 @@ class MyClient(Client): client.start() wait_for_session(client) - sleep(5) + gevent.sleep(5) + # this is purely to demonstrate we can make calls while sending + # pongs client.publish(topic="test", message="test") client.stop() except Exception as e: diff --git a/test/transports/test_websockets.py b/test/transports/test_websockets.py index 08b13ba..5b79f25 100644 --- a/test/transports/test_websockets.py +++ b/test/transports/test_websockets.py @@ -33,7 +33,7 @@ def server(): thread.kill() -def test_websocket_connects_to_server(server): +def __test_websocket_connects_to_server(server): websocket = WebSocket(server_url='ws://0.0.0.0:8001') websocket.connect() diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index 72bbd99..32e0a9d 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -116,6 +116,9 @@ def generate_payload(cls, bytes): except Exception: raise IncompleteFrameError(required_bytes=1) + if payload_length_indicator == 0: + return None + available_bytes_for_body = bytes[2:] if not available_bytes_for_body: raise IncompleteFrameError( @@ -331,14 +334,14 @@ def generate_frame(self, message): class Ping(Frame): - def __init__(self, bytes, payload=''): + def __init__(self, bytes=None, payload=''): # PING is a Control Frame denoted by the opcode 9 # this modelsa PING from the Server -> Client, and as such does # not mask. self.fin_bit = 1 self.opcode = Frame.OPCODE_PING self.bytes = bytes or self.generate_frame() - self.payload = payload or Frame.generate_payload(bytes) + self.payload = payload or Frame.generate_payload(self.bytes) def generate_frame(self): # This is long hand for documentation purposes From 790c156865f79ca8c536d1c44c1db7f8d0a43b57 Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 17 Mar 2018 14:55:12 +0100 Subject: [PATCH 27/29] removed duped test --- test/transports/test_websockets.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/test/transports/test_websockets.py b/test/transports/test_websockets.py index 5b79f25..8dcbbca 100644 --- a/test/transports/test_websockets.py +++ b/test/transports/test_websockets.py @@ -33,15 +33,6 @@ def server(): thread.kill() -def __test_websocket_connects_to_server(server): - websocket = WebSocket(server_url='ws://0.0.0.0:8001') - websocket.connect() - - assert len(server.clients) == 1 - - websocket.disconnect() - - def test_send_ping(server): websocket = WebSocket(server_url='ws://0.0.0.0:8001') with patch.object(websocket, 'handle_ping') as mock_handle: From 460e12c853bae623e07ce2dfc20913cdf9bbc920 Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 17 Mar 2018 18:22:54 +0100 Subject: [PATCH 28/29] some python 2 compat --- test/transports/test_websockets.py | 5 +- wampy/transports/websocket/connection.py | 6 -- wampy/transports/websocket/frames.py | 108 +++++++++++------------ 3 files changed, 55 insertions(+), 64 deletions(-) diff --git a/test/transports/test_websockets.py b/test/transports/test_websockets.py index 8dcbbca..451c630 100644 --- a/test/transports/test_websockets.py +++ b/test/transports/test_websockets.py @@ -64,9 +64,8 @@ def connection_handler(): client_handler = list(clients.values())[0] socket = client_handler.ws - frame = Ping() - payload = frame.generate_frame() - socket.send(payload) + ping_frame = Ping() + socket.send(ping_frame.frame) with gevent.Timeout(5): while mock_handle.call_count != 1: diff --git a/wampy/transports/websocket/connection.py b/wampy/transports/websocket/connection.py index 95bca9e..b8ad359 100644 --- a/wampy/transports/websocket/connection.py +++ b/wampy/transports/websocket/connection.py @@ -73,12 +73,6 @@ def receive(self, bufsize=1): except socket.timeout as e: message = str(e) raise ConnectionError('timeout: "{}"'.format(message)) - except ConnectionResetError: - raise ConnectionError('the connection was reset by the peer') - except Exception as exc: - raise ConnectionError( - 'unexpected error reading from socket: "{}"'.format(exc) - ) if not bytes: break diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index 32e0a9d..3a9f5f5 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -84,55 +84,55 @@ class Frame(object): LENGTH_16 = 1 << 16 # 0x10000, 65536, 10000000000000000 MAX_LENGTH = 1 << 63 # 1 x 2**63 - def __init__(self, bytes, payload=None): - self.bytes = bytes - self.opcode = bytes[0] & 0xf + def __init__(self, buffered_bytes, payload=None): + self.buffered_bytes = buffered_bytes + self.opcode = buffered_bytes[0] & 0xf # this is just the payload, i.e. the bytes that the application cares # about - self.payload = payload or Frame.generate_payload(bytes) + self.payload = payload or Frame.generate_payload(buffered_bytes) def __str__(self): return str(self.payload) @classmethod - def generate_payload(cls, bytes): - if not bytes or len(bytes) < 2: + def generate_payload(cls, buffered_bytes): + if not buffered_bytes or len(buffered_bytes) < 2: raise IncompleteFrameError(required_bytes=1) - opcode = bytes[0] & 0xf + opcode = cls.opcode_from_bytes(buffered_bytes=buffered_bytes) # binary data interpretation is left up to th application... if opcode == Frame.OPCODE_BINARY: # ..and wampy ignores them return None - # Parse the first two bytes of header. - fin = bytes[0] >> 7 - + # Parse the first two buffered_bytes of header + fin = buffered_bytes[0] >> 7 if fin == 0: - logger.warning('fragmented frame received: %s', bytes) + logger.warning('fragmented frame received: %s', buffered_bytes) + # TODO get the correct error handling in here try: - payload_length_indicator = bytes[1] & 0b1111111 + payload_length_indicator = buffered_bytes[1] & 0b1111111 except Exception: raise IncompleteFrameError(required_bytes=1) if payload_length_indicator == 0: return None - available_bytes_for_body = bytes[2:] + available_bytes_for_body = buffered_bytes[2:] if not available_bytes_for_body: raise IncompleteFrameError( required_bytes=payload_length_indicator ) - # unpack the buffered bytes into an integer + # unpack the buffered buffered_bytes into an integer body_length = unpack_from(">h", available_bytes_for_body)[0] if payload_length_indicator < 126: # then we have enough knowlege about the payload length as it's # contained within the 2nd byte of the header - because the - # trailing 7 bits of the 2 bytes tells us exactly how long the - # payload is + # trailing 7 bits of the 2 buffered_bytes tells us exactly how long + # the payload is body_candidate = available_bytes_for_body # in this case body length is represented by the indicator body_length = payload_length_indicator @@ -150,7 +150,7 @@ def generate_payload(cls, bytes): if len(body_candidate) < body_length: required_bytes = body_length - len(body_candidate) - logger.debug("missing %s bytes", required_bytes) + logger.debug("missing %s buffered_bytes", required_bytes) raise IncompleteFrameError( required_bytes=required_bytes ) @@ -171,23 +171,27 @@ def generate_payload(cls, bytes): logger.debug('generated payload: %s', payload) return payload + @classmethod + def opcode_from_bytes(cls, buffered_bytes): + return buffered_bytes[0] & 0xf + @property def frame(self): - return self.bytes + return self.buffered_bytes class FrameFactory(object): @classmethod - def from_bytes(cls, bytes): - payload = Frame.generate_payload(bytes) + def from_bytes(cls, buffered_bytes): + payload = Frame.generate_payload(buffered_bytes) - opcode = bytes[0] & 0xf + opcode = buffered_bytes[0] & 0xf if opcode in Frame.CONTROL_FRAMES: if opcode == Frame.OPCODE_PING: - return Ping(bytes=bytes, payload=payload) + return Ping(buffered_bytes=buffered_bytes, payload=payload) - return Frame(bytes=bytes, payload=payload) + return Frame(buffered_bytes=buffered_bytes, payload=payload) class ClientFrame(Frame): @@ -209,7 +213,7 @@ def __init__(self, message): self.fin_bit = 1 self.opcode = Frame.OPCODE_TEXT self.message = message - self.bytes = self.generate_frame(self.payload) + self.buffered_bytes = self.generate_frame(self.payload) @property def payload(self): @@ -219,19 +223,14 @@ def payload(self): def payload_length(self): """ the length in bytes of the utf-8 serialized WAMP message """ try: - unicode_body = self.payload.decode("utf-8") - except UnicodeError: - unicode_body = self.payload - except AttributeError: - # already decoded, hence no "decode" attribute - unicode_body = self.payload + length = len(self.payload.encode('utf-8')) + except TypeError: + length = 0 - if unicode_body is None: - return 0 + logger.info('length of %s is "%s"', self.payload, length) + return length - return len(self.payload.encode('utf-8')) - - def data_to_bytes(self, data): + def data_to_buffered_bytes(self, data): return bytearray(data, 'utf-8') def generate_mask(self, mask_key, data): @@ -255,7 +254,7 @@ def generate_mask(self, mask_key, data): data = "" if not isinstance(data, bytearray): - data = self.data_to_bytes(data) + data = self.data_to_buffered_bytes(data) _m = array.array("B", mask_key) _d = array.array("B", data) @@ -266,7 +265,7 @@ def generate_mask(self, mask_key, data): return _d.tostring() def generate_frame(self, message): - """ Format data to string (bytes) to send to server. + """ Format data to string (buffered_bytes) to send to server. """ # the first byte contains the FIN bit, the 3 RSV bits and the # 4 opcode bits and for a client will *always* be 1000 0001 (or 129). @@ -322,31 +321,31 @@ def generate_frame(self, message): payload += pack('!B', (mask_bit | 127)) + pack('!Q', length) # we always mask frames from the client to server - # use a string of n random bytes for the mask + # use a string of n random buffered_bytes for the mask mask_key = os.urandom(4) mask_data = self.generate_mask(mask_key=mask_key, data=message) mask = mask_key + mask_data payload += mask - # this is a bytes string being returned here + # this is a buffered_bytes string being returned here return payload class Ping(Frame): - def __init__(self, bytes=None, payload=''): + def __init__(self, buffered_bytes=None, payload=''): # PING is a Control Frame denoted by the opcode 9 # this modelsa PING from the Server -> Client, and as such does # not mask. self.fin_bit = 1 self.opcode = Frame.OPCODE_PING - self.bytes = bytes or self.generate_frame() - self.payload = payload or Frame.generate_payload(self.bytes) + self.buffered_bytes = buffered_bytes or self.generate_bytes() + self.payload = payload or Frame.generate_payload(self.buffered_bytes) - def generate_frame(self): + def generate_bytes(self): # This is long hand for documentation purposes - # the first byte contains the FIN bit, the 3 RSV bits and the + # the first byte contains the FIN bit, the 3 RSV bits arend the # 4 opcode bits, so we are looking for # 1 0 0 0 0 1 0 1 Opcode 9 for a Ping Frame @@ -388,15 +387,14 @@ def generate_frame(self): # if not too long! b1 = pack('!B', (mask_bit | 0)) - bytes = b''.join([b0, b1]) - - first_byte = bytes[0] - assert self.opcode == first_byte & 0xf + frame_bytes = bytearray([b0, b1]) + assert self.opcode == self.opcode_from_bytes( + buffered_bytes=frame_bytes + ) # we could append payload data such as a HELLO here... # then we'd have to update the 2nd byte with the new length - - return bytes # this is b'\x89\x00' + return frame_bytes # this is b'\x89\x00' class PongFrame(ClientFrame): @@ -404,9 +402,9 @@ class PongFrame(ClientFrame): def __init__(self, ping_frame): self.fin_bit = 1 self.opcode = Frame.OPCODE_PONG - self.bytes = self.generate_frame(message=ping_frame.payload) + self.buffered_bytes = self.generate_bytes(message=ping_frame.payload) - def generate_frame(self, message): + def generate_bytes(self, message): payload = pack( '!B', ( (self.fin_bit << 7) | @@ -428,11 +426,11 @@ def generate_frame(self, message): payload += pack('!B', (mask_bit | 127)) + pack('!Q', length) # we always mask frames from the client to server - # use a string of n random bytes for the mask + # use a string of n random buffered_bytes for the mask mask_key = os.urandom(4) mask_data = self.generate_mask(mask_key=mask_key, data=message) mask = mask_key + mask_data payload += mask - # this is a bytes string being returned here - return payload + # this is a buffered_bytes string being returned here + return bytearray(payload) From 5a88fab3eff1d744939d7930eba66c3eae4ae9f8 Mon Sep 17 00:00:00 2001 From: Simon Date: Sat, 17 Mar 2018 18:31:23 +0100 Subject: [PATCH 29/29] fix python 3 compat issue --- wampy/transports/websocket/frames.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wampy/transports/websocket/frames.py b/wampy/transports/websocket/frames.py index 3a9f5f5..a043205 100644 --- a/wampy/transports/websocket/frames.py +++ b/wampy/transports/websocket/frames.py @@ -387,7 +387,7 @@ def generate_bytes(self): # if not too long! b1 = pack('!B', (mask_bit | 0)) - frame_bytes = bytearray([b0, b1]) + frame_bytes = bytearray(b''.join([b0, b1])) assert self.opcode == self.opcode_from_bytes( buffered_bytes=frame_bytes )