Test some protocol violations. Speed up clean exit.
elonen committed Dec 28, 2019
1 parent a2a055e commit a452dd4
Showing 5 changed files with 112 additions and 47 deletions.
63 changes: 32 additions & 31 deletions lanscatter/fileio.py
@@ -179,37 +179,38 @@ async def download_chunk(self, chunk: FileChunk, url: str, http_session: ClientS
         :param http_session: AioHTTP session to use for GET.
         :param file_size: Size of complete file (optional). File will be truncated to this size.
         """
-        async with http_session.get(url) as resp:
-            if resp.status != 200:  # some error
-                raise IOError(f'Unknown/unsupported HTTP status: {resp.status}')
-            else:
-                async with self.open_and_seek(chunk.path, chunk.pos, for_write=True) as outf:
-                    #csum = HashFunc()
-                    buff_in, buff_out = None, b''
-
-                    async def read_http():
-                        nonlocal buff_in
-                        limited_n = int(await self.dl_limiter.acquire(
-                            Defaults.DOWNLOAD_BUFFER_MAX, Defaults.NETWORK_BUFFER_MIN))
-                        buff_in = await resp.content.read(limited_n)
-                        self.dl_limiter.unspend(limited_n - len(buff_in))
-                        buff_in = buff_in if buff_in else None
-
-                    async def write_and_csum():
-                        nonlocal buff_out
-                        #await asyncio.gather(outf.write(buff_out), csum.update_async(buff_out))
-                        await outf.write(buff_out)
-
-                    while buff_out is not None:
-                        await asyncio.gather(read_http(), write_and_csum())  # Read, write and hash concurrently
-                        buff_in, buff_out = buff_out, buff_in  # Swap buffers
-
-                    # Checksumming here is actually waste of CPU time since we'll rehash the sync dir when finished
-                    #if csum.result() != chunk.hash:
-                    #    raise IOError(f'Checksum error verifying {chunk.hash} from {url}')
-
-                    if file_size >= 0:
-                        outf.truncate(file_size)
+        with suppress(RuntimeError):  # Avoid dirty exit in aiofiles when Ctrl^C (RuntimeError('Event loop is closed'))
+            async with http_session.get(url) as resp:
+                if resp.status != 200:  # some error
+                    raise IOError(f'HTTP status {resp.status}')
+                else:
+                    async with self.open_and_seek(chunk.path, chunk.pos, for_write=True) as outf:
+                        #csum = HashFunc()
+                        buff_in, buff_out = None, b''
+
+                        async def read_http():
+                            nonlocal buff_in
+                            limited_n = int(await self.dl_limiter.acquire(
+                                Defaults.DOWNLOAD_BUFFER_MAX, Defaults.NETWORK_BUFFER_MIN))
+                            buff_in = await resp.content.read(limited_n)
+                            self.dl_limiter.unspend(limited_n - len(buff_in))
+                            buff_in = buff_in if buff_in else None
+
+                        async def write_and_csum():
+                            nonlocal buff_out
+                            #await asyncio.gather(outf.write(buff_out), csum.update_async(buff_out))
+                            await outf.write(buff_out)
+
+                        while buff_out is not None:
+                            await asyncio.gather(read_http(), write_and_csum())  # Read, write and hash concurrently
+                            buff_in, buff_out = buff_out, buff_in  # Swap buffers
+
+                        # Checksumming here is actually a waste of CPU since we'll rehash the sync dir anyway when finished
+                        #if csum.result() != chunk.hash:
+                        #    raise IOError(f'Checksum error verifying {chunk.hash} from {url}')
+
+                        if file_size >= 0:
+                            await outf.truncate(file_size)


     async def change_mtime(self, path, mtime):
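The new version wraps the whole transfer in suppress(RuntimeError) so aiofiles teardown after Ctrl^C no longer produces a dirty exit, and it awaits the truncate() coroutine that was previously fired without await. The loop itself pipelines network reads and disk writes through a pair of swapped buffers. Below is a minimal self-contained sketch of that buffer-swap pattern; read_chunk and write_chunk are illustrative stand-ins for resp.content.read and outf.write, not lanscatter API.

import asyncio

async def pipelined_copy(read_chunk, write_chunk, bufsize=64 * 1024):
    # read_chunk(n) -> bytes and write_chunk(data) are assumed to be async callables.
    buff_in, buff_out = None, b''          # b'' primes an empty first write

    async def fill():
        nonlocal buff_in
        data = await read_chunk(bufsize)
        buff_in = data if data else None   # None marks end of stream

    async def drain():
        await write_chunk(buff_out)

    while buff_out is not None:
        await asyncio.gather(fill(), drain())   # overlap next read with current write
        buff_in, buff_out = buff_out, buff_in   # swap roles for the next round

At end of stream read_chunk returns b'', fill() turns that into None, and the final swap lets the last buffer drain before the loop exits.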
30 changes: 21 additions & 9 deletions lanscatter/peernode.py
@@ -3,7 +3,7 @@
 from aiohttp import web, WSMsgType
 from pathlib import Path
 from contextlib import suppress
-from packaging import version
+import async_timeout
 import traceback, time, argparse, os, collections, json, platform
 from signal import SIGINT, SIGTERM
 import concurrent.futures
@@ -183,16 +183,24 @@ async def download_task(self, chunk_hash, url, http_session, timeout):
         target = self.remote_batch.first_chunk_with(chunk_hash)
         if not target:
             raise IOError(f'Bad download command from master, or old filelist? Chunk {chunk_hash} is unknown.')
+        dl_task = None
         try:
             self.incoming.add(chunk_hash)
             await self.send_transfer_report()
             self.status_func(log_info=f'Downloading from: {url}')
             self.status_func(cur_status=f'Downloading chunks...',
                              progress=len(self.local_batch.all_hashes()) / len(self.remote_batch.all_hashes()))
-            await asyncio.wait([self.file_io.download_chunk(chunk=target, url=url, http_session=http_session,
-                                                            file_size=self.remote_batch.files[target.path].size),
-                                self.exit_trigger.wait(),
-                                ], timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
+
+            async with async_timeout.timeout(timeout):
+                dl_task = asyncio.create_task(
+                    self.file_io.download_chunk(chunk=target, url=url, http_session=http_session,
+                                                file_size=self.remote_batch.files[target.path].size))
+                await asyncio.wait([dl_task, asyncio.create_task(self.exit_trigger.wait())],
+                                   return_when=asyncio.FIRST_COMPLETED)
+                if not dl_task.done():
+                    raise asyncio.CancelledError()
+
+            dl_task.result()  # raises exception if one happened inside the task

             self.local_batch.add(chunks=(target,))
             await self.server_send_queue.put({
@@ -204,18 +212,22 @@ async def download_task(self, chunk_hash, url, http_session, timeout):
                 self.status_func(log_info=f'All chunks apparently complete. Rescanning to make sure.')
                 self.full_rescan_trigger.set()

-        except asyncio.TimeoutError:
-            self.status_func(log_info=f'Download from {url} took over timeout ({timeout}s). Aborted.')
+        except asyncio.TimeoutError as e:
+            self.status_func(log_info=f'Timeout. GET {url} took over {float("%.2g" % timeout)}s.')
+        except asyncio.CancelledError as e:
+            self.status_func(log_debug=f'Program exiting. Download aborted from {url}.')
         except (IOError, OSError) as e:
-            self.status_func(log_error=f'Download from {url} failed: {str(e)}')
+            self.status_func(log_error=f'Failed download from {url}: {str(e)}')
         except aiohttp.client_exceptions.ClientError as e:
-            self.status_func(log_error=f'Download from {url} failed, aiohttp ClientError: {str(e)}')
+            self.status_func(log_error=f'Failed download from {url}, aiohttp ClientError: {str(e)}')
         except (KeyboardInterrupt, concurrent.futures.CancelledError) as e:
             pass
         except Exception as e:
             self.status_func(log_error=f'Exception in download_task: \n' + traceback.format_exc(), popup=True)
             raise e
         finally:
+            if dl_task:
+                dl_task.cancel()
             self.incoming.discard(chunk_hash)
             await self.send_transfer_report()

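download_task now creates the chunk download as an explicit task, races it against the exit trigger, and bounds the whole thing with async_timeout, cancelling the task in finally so neither a timeout nor a shutdown leaves an orphaned transfer. A condensed sketch of the pattern, assuming a do_download coroutine function and an asyncio.Event for shutdown (illustrative names, not lanscatter API):

import asyncio
import async_timeout

async def bounded_download(do_download, exit_trigger: asyncio.Event, timeout: float):
    task = asyncio.create_task(do_download())
    try:
        async with async_timeout.timeout(timeout):   # raises asyncio.TimeoutError on expiry
            await asyncio.wait([task, asyncio.create_task(exit_trigger.wait())],
                               return_when=asyncio.FIRST_COMPLETED)
            if not task.done():                      # exit was triggered first
                raise asyncio.CancelledError()
        return task.result()   # re-raises any exception from inside the task
    finally:
        task.cancel()          # cancelling an already-finished task is a no-op

Compared to the old asyncio.wait(..., timeout=...), which merely stops waiting, this guarantees the download coroutine is actually cancelled on every exit path.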
1 change: 1 addition & 0 deletions requirements.cli.txt
@@ -3,3 +3,4 @@ aiofiles
 pytest
 pytest-timeout
 packaging
+async-timeout
1 change: 1 addition & 0 deletions requirements.txt
@@ -6,6 +6,7 @@ aiofiles
 wxPython
 appdirs
 packaging
+async-timeout

 pytest
 pytest-timeout
64 changes: 57 additions & 7 deletions tests/test_integration.py
@@ -1,8 +1,9 @@
 from contextlib import suppress
 import multiprocessing as mp
-import pytest, shutil, time, os, io, random, traceback, sys, threading, filecmp
+import pytest, shutil, time, os, io, random, traceback, sys, threading, filecmp, json
 from pathlib import Path
 from types import SimpleNamespace
+import asyncio, aiohttp

 from lanscatter import common, masternode, peernode

@@ -15,10 +16,12 @@
 the middle of sync.
 """

+random.seed()
+
 TEST_DIR = './temp_test_dir'
 TEST_FILES_PER_DIR = 3
 CHUNK_SIZE = 1000
-PORT_BASE = 53741
+PORT_BASE = 53000 + int(2000*random.random())
 TEST_PEER_NAMES = {0: 'peer_empty', 1: 'peer_corrupt', 2: 'peer_non_empty'}

 # Make sure buffers don't cover whole chunks, for realistic testing
@@ -74,7 +77,9 @@ def setup_test_content(base: Path, sub_dir: str, keep_empty=False):
     create_file(p / 'dlbuf_size_almost.bin', common.Defaults.DOWNLOAD_BUFFER_MAX - 1)
     create_file(p / 'dlbuf_size_plus.bin', common.Defaults.DOWNLOAD_BUFFER_MAX + 2)
     create_file(p / 'many_chunks.bin', int(CHUNK_SIZE * 5.5))
+    create_file(p / 'to_be_corrupted.bin', int(CHUNK_SIZE * 5.5))
     create_file(p / 'zeroes.bin', int(CHUNK_SIZE * 3.1), pattern=b'\0' * CHUNK_SIZE)
+    create_file(p / 'zeroes_to_corrupt.bin', int(CHUNK_SIZE * 3.1), pattern=b'\0' * CHUNK_SIZE)
     create_file(p / 'less_zeroes.bin', int(CHUNK_SIZE * 1.1), pattern=b'\0' * CHUNK_SIZE)
     for x in range(5):
         create_file(p / rnd_name('rnd_file', '.bin'), int(random.random() * CHUNK_SIZE * 7))
@@ -147,11 +152,55 @@ def comm_thread(conn):
     # Alter files on one peer in the middle of a sync
     time.sleep(4)
     print(f"Corrupting some files on '{peers[1].name}'...")
-    shutil.rmtree(peers[1].dir + '/dir2')                    # Delete a dir
-    with open(peers[1].dir+'/many_chunks.bin', 'wb') as f:   # Rewrite a file
-        f.write(b'dummycontent')
-    with open(peers[1].dir+'/zeroes.bin', 'r+b') as f:       # Overwrite beginning
-        f.write(b'dummycontent')
+    try:
+        shutil.rmtree(peers[1].dir + '/dir2')                          # Delete a dir
+        with open(peers[1].dir+'/to_be_corrupted.bin', 'wb') as f:     # Rewrite a file
+            f.write(b'dummycontent')
+        with open(peers[1].dir+'/zeroes_to_corrupt.bin', 'r+b') as f:  # Overwrite beginning
+            f.write(b'dummycontent')
+    except PermissionError as e:
+        print("WARNING: File corruption during test failed, probably due to file locking: " + str(e))
+
+    # Make some non-protocol requests to the server
+    async def request_tests():
+
+        async def read_response_msgs(ws):
+            recvd = []
+            async for msg in ws:
+                if msg.type == aiohttp.WSMsgType.TEXT:
+                    assert 'traceback' not in msg.data.lower()
+                    recvd.append(json.loads(msg.data))
+            return recvd
+
+        async with aiohttp.ClientSession() as session:
+            print("Request: try bad message action")
+            async with session.ws_connect(f'ws://localhost:{PORT_BASE}/join') as ws:
+                await ws.send_json({'action': 'BAD_COMMAND'})
+                recvd = await read_response_msgs(ws)
+                assert not recvd or ('fatal' in recvd[-1]['action'])
+
+            print("Request: try bad json")
+            async with session.ws_connect(f'ws://localhost:{PORT_BASE}/join') as ws:
+                await ws.send_str('INVALID_JSON')
+                recvd = await read_response_msgs(ws)
+                assert not recvd or ('fatal' in recvd[-1]['action'])
+
+            print("Request: HTTP on websocket endpoint")
+            async with session.get(f'http://localhost:{PORT_BASE}/join') as resp:
+                assert resp.status != 200, "Websocket endpoint answered plain HTTP request with 200."
+
+            print("Request: fetch HTML status page")
+            async with session.get(f'http://localhost:{PORT_BASE}/') as resp:
+                assert resp.status == 200, "Server status page returned HTTP error: " + str(resp.status)
+                assert '<html' in (await resp.text()).lower(), "Status query didn't return HTML."
+
+    try:
+        print("Doing request tests")
+        asyncio.new_event_loop().run_until_complete(
+            asyncio.wait_for(request_tests(), timeout=20))
+    except asyncio.TimeoutError:
+        assert False, "Request tests timed out."
+

     # Wait
     for x in range(8):
@@ -171,6 +220,7 @@ def comm_thread(conn):
         assert 'Exception' not in p.out.getvalue(), f'Exception(s) on {p.name}'
         assert 'egmentation fault' not in p.out.getvalue()
         assert p.is_master or 'Up to date' in p.out.getvalue(), 'Node never reached up-to-date state.'
+        assert 'traceback' not in str(p.out.getvalue()).lower()
         assert not cmp.diff_files, f'Differing files between {p.name} and master: {str(cmp.diff_files)}'
         assert not cmp.funny_files, f'"Funny" files between {p.name} and master: {str(cmp.funny_files)}'
         assert not cmp.left_only, f'Files found only from {p.name}: {str(cmp.left_only)}'
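The new request tests pin down the master's observable contract rather than its implementation: malformed JSON or an unknown action must yield either a closed connection or a reply whose 'action' contains 'fatal', and a plain HTTP GET against the websocket endpoint must not return 200. For orientation only, here is a sketch of an aiohttp handler that would satisfy those assertions; lanscatter's actual handler lives in masternode.py, and the accepted action names below are hypothetical:

import json
from aiohttp import web, WSMsgType

async def join_handler(request: web.Request) -> web.WebSocketResponse:
    ws = web.WebSocketResponse()
    await ws.prepare(request)   # a plain (non-upgrade) GET fails here with an HTTP error
    async for msg in ws:
        if msg.type != WSMsgType.TEXT:
            continue
        try:
            action = json.loads(msg.data)['action']
        except (json.JSONDecodeError, KeyError, TypeError):
            await ws.send_json({'action': 'fatal', 'message': 'Invalid message'})
            break
        if action not in ('version', 'join_swarm'):   # hypothetical action names
            await ws.send_json({'action': 'fatal', 'message': f'Unknown action: {action}'})
            break
    return ws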
