Skip to content

Commit

Permalink
Add more protocol violation tests
Browse files Browse the repository at this point in the history
  • Loading branch information
elonen committed Feb 6, 2020
1 parent 1e1040b commit 7aef4e7
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 58 deletions.
9 changes: 5 additions & 4 deletions lanscatter/masternode.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,15 @@ async def ok(txt):
here = packaging.version.parse(Defaults.PROTOCOL_VERSION)
try:
there = packaging.version.parse(msg.get('protocol') or 'MISSING')
if not there.release:
raise packaging.version.InvalidVersion
if there.release[0] != here.release[0]:
await send_queue.put({'action': 'fatal', 'message':
f'Incompatible protocol. Server has {Defaults.PROTOCOL_VERSION}, '
f'yours is {msg.get("protocol")}'})
except InvalidVersion:
else:
await ok({'message': 'Version accepted.'})
except packaging.version.InvalidVersion:
return await error(f'Invalid protocol version {str(msg.get("protocol"))}.', fatal=True)
self.status_func(log_info=f'Client "{client_name}" uses protocol version {msg.get("protocol")}, '+
f'app version {msg.get("app") or "MISSING"}')
Expand Down Expand Up @@ -247,7 +251,6 @@ async def send_loop():
if msg and msg.get('action') == 'fatal':
self.status_func(log_info=f'Sent fatal error to client. Kicking them out: {str(msg)}"')
await ws.close()
break

send_task = asyncio.create_task(send_loop())

Expand Down Expand Up @@ -405,8 +408,6 @@ def scandir_blocking():
server.planner_loop(),
], return_when=asyncio.FIRST_COMPLETED)

status_func(log_info='Server finished.')

except (KeyboardInterrupt, concurrent.futures.CancelledError):
status_func(log_info='User exit.')
kb_exit = True
Expand Down
154 changes: 100 additions & 54 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,102 @@ def test_bad_dir(is_master):
_assert_node_basics(p, check_sync_results=False)


async def _bad_request_tests(master_port):

async with aiohttp.ClientSession() as session:

print("Request: test HTTP requests")
if True:
print(" - HTTP on websocet endpoint")
async with session.get(f'http://localhost:{master_port}/join') as resp:
assert resp.status != 200, "Websocket endpoint answered plain HTML request with 200."

print(" - request HTML status page")
async with session.get(f'http://localhost:{master_port}/') as resp:
assert resp.status == 200, "Server status page returned HTTP error: " + str(resp.status)
assert '<html' in (await resp.text()).lower(), "Status query didn't return HTML."

print(" - HTTP on noexisting peer blob")
async with session.get(f'http://localhost:{master_port}/blob/NONEXISTINGBLOB') as resp:
assert resp.status != 200, "Fileserver returned success on non-existing blob"

async def read_respose_msgs(ws, timeout=None):
recvd = []
async def do_iter():
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
assert 'traceback' not in msg.data.lower()
recvd.append(json.loads(msg.data))
with suppress(asyncio.TimeoutError):
await asyncio.wait_for(do_iter(), timeout=timeout)
return recvd

print("Request: test session terminating protocol violations")
msg_pairs = [
[{'action': 'BAD_COMMAND'}, 'Bad message action'],
[{'NOT_AN_ACTION': 123}, 'Message without action'],
[{'action': 'version', 'protocol': '0.0.0', 'app': '0.0.0'}, 'Bad protocol version'],
[{'action': 'version', 'protocol': 'ASDFG', 'app': 'ASDFG'}, 'Bad version format'],
['INVALID_JSON', 'Invalid JSON message']
]
for msg, desc in msg_pairs:
async with session.ws_connect(f'ws://localhost:{master_port}/join') as ws:
print(" - sending: " + str(msg))
await (ws.send_str(msg) if isinstance(msg, str) else ws.send_json(msg))
recvd = await read_respose_msgs(ws, 1.0)
assert recvd[-1]['action'] == 'fatal', desc

print("Request: test non-terminating protocol violations")
async with session.ws_connect(f'ws://localhost:{master_port}/join') as ws:
msg_pairs = [
[{'action': 'set_hashes', 'hashes': ['abba123']},
(lambda m: 'error' in m.pop(0)['action']), 'Set_hashes before join succeeded'],
[{'action': 'set_hashes'},
(lambda m: 'error' in m.pop(0)['action']), 'Set_hashes with bad parms succeeded'],
[{'action': 'version', 'protocol': common.Defaults.PROTOCOL_VERSION, 'app': common.Defaults.APP_VERSION},
(lambda m: 'ok' in m.pop(0)['action']), 'Proper version test failed'],
[{'action': 'error', 'ABABA': 'TEST_ERROR'}, (lambda m: True), '(always ok, test exceptions only)'],
[{'action': 'report_transfers'},
(lambda m: 'error' in m.pop(0)['action']), 'Report_transfer with missing args succeeded'],
[{'action': 'report_transfers', 'dls':0, 'uls':0, 'incoming':[], 'ul_times':[]},
(lambda m: 'error' in m.pop(0)['action']), 'Proper report_transfer without join succeeded'],
[{'action': 'join_swarm', 'hashes': ['abba123']},
(lambda m: 'error' in m.pop(0)['action']), 'Join with missing dl_url succeeded'],
[{'action': 'join_swarm', 'dl_url': 'http://localhost:12345/{hash}'},
(lambda m: 'error' in m.pop(0)['action']), 'Join with missing hashes succeeded'],
[{'action': 'join_swarm', 'hashes': ['abba123'], 'dl_url': 'http://localhost:12345/ABABABAB'},
(lambda m: 'error' in m.pop(0)['action']), 'Join with bad dl_url succeeded'],
[{'action': 'join_swarm', 'hashes': ['abba123'], 'dl_url': 'http://localhost:12345/{hash}'},
(lambda m: 'ok' in m.pop(0)['action']), 'Join with proper parms failed'],
[{'action': 'set_hashes', 'hashes': ['abba123']},
(lambda m: any([('unknown_hashes' in x) for x in m])), 'No unknown hashes msg received']
]
for (msg, test, desc) in msg_pairs:
print(" - sending: " + str(msg))
await ws.send_json(msg)
recvd = await read_respose_msgs(ws, 1.0)
assert recvd.pop(0)['action'] == 'initial_batch'
for (msg, test, desc) in msg_pairs:
print(f' * Testing against: "{desc}"')
assert test(recvd), desc


def test_bad_protocol(test_dir_factory):
master_dir = test_dir_factory('master')
master_port = PORT_BASE+30
p = _spawn_sync_process('master', True, master_dir, master_port, master_port)
time.sleep(0.5) # wait for start
try:
asyncio.new_event_loop().run_until_complete(
asyncio.wait_for(_bad_request_tests(master_port), timeout=30))
except Exception as e:
_print_process_stdout(p)
raise e
p.proc.terminate()
_print_process_stdout(p)
_assert_node_basics(p, check_sync_results=False)


def test_minimal_downloads(test_dir_factory):
"""Test that each chunk is downloaded only once."""
master = _spawn_sync_process(f'master', True, test_dir_factory('master'), 0, PORT_BASE, ['--no-compress', '--rescan-interval', '2'])
Expand Down Expand Up @@ -333,62 +429,12 @@ def test_swarm__corruption__bad_protocol__uptodate__errors(test_dir_factory):
f.write(b'abcdefgh')
f.flush()

# Make some non-protocol requests to the server
async def request_tests():

async def read_respose_msgs(ws):
recvd = []
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
assert 'traceback' not in msg.data.lower()
recvd.append(json.loads(msg.data))
return recvd

async with aiohttp.ClientSession() as session:
print("Request: try bad message action")
async with session.ws_connect(f'ws://localhost:{master_port}/join') as ws:
await ws.send_json({'action': 'BAD_COMMAND'})
recvd = await read_respose_msgs(ws)
assert not recvd or ('fatal' in recvd[-1]['action'])

print("Request: try bad message without 'action'")
async with session.ws_connect(f'ws://localhost:{master_port}/join') as ws:
await ws.send_json({'NOT_AN_ACTION': 123})
recvd = await read_respose_msgs(ws)
assert not recvd or ('fatal' in recvd[-1]['action'])

print("Request: try bad protocol version")
async with session.ws_connect(f'ws://localhost:{master_port}/join') as ws:
await ws.send_json({'action': 'version', 'protocol': '0.0.0', 'app': '0.0.0'})
recvd = await read_respose_msgs(ws)
assert not recvd or ('fatal' in recvd[-1]['action'])

print("Request: try bad json")
async with session.ws_connect(f'ws://localhost:{master_port}/join') as ws:
await ws.send_str('INVALID_JSON')
recvd = await read_respose_msgs(ws)
assert not recvd or ('fatal' in recvd[-1]['action'])

print("Request: HTTP on websocet endpoint")
async with session.get(f'http://localhost:{master_port}/join') as resp:
assert resp.status != 200, "Websocket endpoint answered plain HTML request with 200."

print("Request: request HTML status page")
async with session.get(f'http://localhost:{master_port}/') as resp:
assert resp.status == 200, "Server status page returned HTTP error: " + str(resp.status)
assert '<html' in (await resp.text()).lower(), "Status query didn't return HTML."

print("Request: HTTP on noexisting peer blob")
for port in (master_port, master_port+2):
async with session.get(f'http://localhost:{port}/blob/NONEXISTINGBLOB') as resp:
assert resp.status != 200, "Fileserver returned success on non-existing blob"

# Make bad protocol calls while serving the swarm
try:
print("Doing request tests")
asyncio.new_event_loop().run_until_complete(
asyncio.wait_for(request_tests(), timeout=30))
except asyncio.TimeoutError:
assert False, "Request tests timed out."
asyncio.wait_for(_bad_request_tests(master_port), timeout=30))
except Exception as e:
raise e

_wait_seconds(10)
_terminate_procs(master, *peers)
Expand Down

0 comments on commit 7aef4e7

Please sign in to comment.