-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtests.py
executable file
·122 lines (101 loc) · 4.46 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python3
import os
import re
import sys
import unittest
from collections import deque
from unittest import mock
# import the real pymongo
import pymongo
# Expose the local ``watcher`` package as ``pymongo.watcher``:
#   1. build the path of the "pymongo" directory that sits next to this file,
#   2. put it first on ``sys.path`` so ``watcher`` is importable as a
#      top-level module,
#   3. import it and attach the resulting module to the real ``pymongo``
#      package as the attribute ``watcher``.
# The order of these statements matters: the import only works after the
# path has been inserted.
local_pymongo_path = os.path.join(os.path.dirname(__file__), "pymongo")
sys.path.insert(0, local_pymongo_path)
pymongo.watcher = __import__("watcher")
class TestWatchCursor(unittest.TestCase):
    """Tests for ``pymongo.watcher.cursor.WatchCursor`` logging.

    ``pymongo.cursor.Cursor.next`` is patched in every test, so no test
    fetches from a real server.
    """

    def setUp(self):
        """Create a cursor on an unreachable client and reset the logs."""
        # We use port=-1 (an invalid value for port number) to make
        # sure we will never connect to a real MongoDB by mistake
        self.test_collection = \
            pymongo.MongoClient(port=-1).test_db.test_collection
        self.query = {"value": 10}
        self.test_cursor = pymongo.watcher.cursor.WatchCursor(
            self.test_collection, filter=self.query)
        # clear the logs from the class attribute
        self.test_cursor.watch_clear_logs()

    def _expected_log(self, fetch_time=mock.ANY):
        """Return the log dict expected after fetching ten items.

        Args:
            fetch_time: expected value of the ``"fetch_time"`` entry;
                defaults to ``mock.ANY`` for tests that do not control
                the clock.
        """
        return {
            "create_time": mock.ANY,
            "last_fetched_time": mock.ANY,
            "query": self.query,
            "normalized_query": self.query,
            "collection": "test_collection",
            "fetched_count": 10,
            "tried_fetched_count": 10,
            "fetch_time": fetch_time,
        }

    def _fetch_from_new_cursors(self, cursors=3, fetches=10):
        """Create ``cursors`` cursors in turn and fetch ``fetches`` items each."""
        for _ in range(cursors):
            cursor = pymongo.watcher.cursor.WatchCursor(
                self.test_collection, filter=self.query)
            for _ in range(fetches):
                next(cursor)

    @mock.patch("pymongo.cursor.Cursor.next")
    def test_log(self, mock_next):
        """Check the log dict and the formatted log line of one cursor."""
        # Every call to time.time() returns the next integer, which makes
        # the measured fetch time deterministic.
        with mock.patch("time.time", side_effect=range(100)):
            for _ in range(10):
                next(self.test_cursor)
        self.assertEqual(mock_next.call_count, 10)
        self.assertEqual(
            self.test_cursor.watch_log_dict(),
            self._expected_log(fetch_time=10))

        # The query repr contains regex metacharacters ("{", "}"), so it
        # must be escaped before being embedded in the expected pattern.
        query_repr = re.escape(repr(self.query))
        timestamp = r'\d+ \w+ \d\d \d\d:\d\d:\d\d\.\d{3}'
        self.assertRegex(
            self.test_cursor.watch_log(
                "{create_time.strftime('%Y %b %d %X.%f')[:-3]} - "
                "{last_fetched_time.strftime('%Y %b %d %X.%f')[:-3]} : "
                "Collection={json.dumps(collection)} "
                "Query={query!r} "
                "NQuery={normalized_query!r} "
                "FetchTime={fetch_time:.6f} "
                "TriedFetchedCount={tried_fetched_count} "
                "FetchedCount={fetched_count}"),
            rf'{timestamp} - {timestamp} : '
            rf'Collection="test_collection" Query={query_repr} '
            rf'NQuery={query_repr} FetchTime=10\.000000 '
            r'TriedFetchedCount=10 FetchedCount=10')

    @mock.patch("pymongo.cursor.Cursor.next")
    def test_all_logs(self, mock_next):
        """Check that ``watch_all_logs`` yields one log per new cursor."""
        self._fetch_from_new_cursors()
        self.assertEqual(
            list(pymongo.watcher.cursor.WatchCursor.watch_all_logs(
                log_format=dict)),
            [self._expected_log()] * 3)

    @mock.patch("pymongo.cursor.Cursor.next")
    @mock.patch("threading.Condition.wait")
    @mock.patch("pymongo.watcher.cursor.deque")
    def test_follow_logs(self, mock_deque, mock_wait, mock_next):
        """Check that ``watch_follow_logs`` yields the queued logs."""
        # Make the watcher module hand out this very deque, so the logs
        # queued below are the ones the follower generator will read.
        queue = mock_deque.return_value = deque()
        try:
            pymongo.watcher.cursor.WatchCursor._watch_followers.append(queue)
            self._fetch_from_new_cursors()
        finally:
            # calling `watch_follow_logs` will add the deque again
            # into the followers list
            pymongo.watcher.cursor.WatchCursor._watch_followers.pop()
        mock_wait.side_effect = [True] * 3
        # create the generator
        follower = pymongo.watcher.cursor.WatchCursor.watch_follow_logs(
            log_format=dict, wait_time_ms=0)
        self.assertEqual(
            [next(follower) for _ in range(3)],
            [self._expected_log()] * 3)
# Script entry point: run this module's tests when executed directly.
if __name__ == "__main__":
    unittest.main()