-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathtest_benchmarks_web_urldispatcher.py
133 lines (101 loc) · 3.74 KB
/
test_benchmarks_web_urldispatcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""codspeed benchmarks for the URL dispatcher."""
import asyncio
from typing import NoReturn
from unittest import mock
from multidict import CIMultiDict, CIMultiDictProxy
from pytest_codspeed import BenchmarkFixture
from yarl import URL
from aiohttp import web
from aiohttp.http import HttpVersion, RawRequestMessage
def _mock_request(method: str, path: str) -> web.Request:
message = RawRequestMessage(
method,
path,
HttpVersion(1, 1),
CIMultiDictProxy(CIMultiDict()),
(),
False,
None,
False,
False,
URL(path),
)
return web.Request(
message, mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock()
)
def test_resolve_root_route(
loop: asyncio.AbstractEventLoop,
benchmark: BenchmarkFixture,
) -> None:
"""Resolve top level PlainResources route 100 times."""
resolve_count = 100
async def handler(request: web.Request) -> NoReturn:
assert False
app = web.Application()
app.router.add_route("GET", "/", handler)
request = _mock_request(method="GET", path="/")
async def run_url_dispatcher_benchmark() -> None:
for _ in range(resolve_count):
await app._router.resolve(request)
@benchmark
def _run() -> None:
loop.run_until_complete(run_url_dispatcher_benchmark())
def test_resolve_single_fixed_url_with_many_routes(
loop: asyncio.AbstractEventLoop,
benchmark: BenchmarkFixture,
) -> None:
"""Resolve PlainResources route 100 times."""
resolve_count = 100
async def handler(request: web.Request) -> NoReturn:
assert False
app = web.Application()
for count in range(250):
app.router.add_route("GET", f"/api/server/dispatch/{count}/update", handler)
request = _mock_request(method="GET", path="/api/server/dispatch/1/update")
async def run_url_dispatcher_benchmark() -> None:
for _ in range(resolve_count):
await app._router.resolve(request)
@benchmark
def _run() -> None:
loop.run_until_complete(run_url_dispatcher_benchmark())
def test_resolve_multiple_fixed_url_with_many_routes(
loop: asyncio.AbstractEventLoop,
benchmark: BenchmarkFixture,
) -> None:
"""Resolve 250 different PlainResources routes."""
async def handler(request: web.Request) -> NoReturn:
assert False
app = web.Application()
for count in range(250):
app.router.add_route("GET", f"/api/server/dispatch/{count}/update", handler)
requests = [
_mock_request(method="GET", path=f"/api/server/dispatch/{count}/update")
for count in range(250)
]
async def run_url_dispatcher_benchmark() -> None:
for request in requests:
await app._router.resolve(request)
@benchmark
def _run() -> None:
loop.run_until_complete(run_url_dispatcher_benchmark())
def test_resolve_dynamic_resource_url_with_many_routes(
loop: asyncio.AbstractEventLoop,
benchmark: BenchmarkFixture,
) -> None:
"""Resolve different a DynamicResource when there are 250 PlainResources registered."""
async def handler(request: web.Request) -> NoReturn:
assert False
app = web.Application()
for count in range(250):
app.router.add_route("GET", f"/api/server/other/{count}/update", handler)
app.router.add_route("GET", "/api/server/dispatch/{customer}/update", handler)
requests = [
_mock_request(method="GET", path=f"/api/server/dispatch/{customer}/update")
for customer in range(250)
]
async def run_url_dispatcher_benchmark() -> None:
for request in requests:
await app._router.resolve(request)
@benchmark
def _run() -> None:
loop.run_until_complete(run_url_dispatcher_benchmark())