@@ -29,28 +29,58 @@ def django_connection_reset(request):
2929 django_db_blocker .block ()
3030
3131
32- def test_postgres_pubsub ():
33- """Testing postgres low-level implementation."""
34- from django .db import connection
35-
36- state = SimpleNamespace ()
37- state .got_message = False
38- with connection .cursor () as cursor :
39- assert connection .connection is cursor .connection
40- conn = cursor .connection
41- # Listen and Notify
42- conn .execute ("LISTEN abc" )
43- conn .add_notify_handler (lambda notification : setattr (state , "got_message" , True ))
44- cursor .execute ("NOTIFY abc, 'foo'" )
45- assert state .got_message is True
46- conn .execute ("SELECT 1" )
47- assert state .got_message is True
48-
49- # Reset and retry
32+ class TestPostgresSpecifics :
33+ def test_listen_notify_in_same_process (self ):
34+ """Testing postgres low-level implementation."""
35+ from django .db import connection
36+
37+ state = SimpleNamespace ()
5038 state .got_message = False
51- conn .execute ("UNLISTEN abc" )
52- cursor .execute ("NOTIFY abc, 'foo'" )
53- assert state .got_message is False
39+ with connection .cursor () as cursor :
40+ assert connection .connection is cursor .connection
41+ conn = cursor .connection
42+ # Listen and Notify
43+ conn .execute ("LISTEN abc" )
44+ conn .add_notify_handler (lambda notification : setattr (state , "got_message" , True ))
45+ cursor .execute ("NOTIFY abc, 'foo'" )
46+ assert state .got_message is True
47+ conn .execute ("SELECT 1" )
48+ assert state .got_message is True
49+
50+ # Reset and retry
51+ state .got_message = False
52+ conn .execute ("UNLISTEN abc" )
53+ cursor .execute ("NOTIFY abc, 'foo'" )
54+ assert state .got_message is False
55+
56+ def test_low_level_assumptions_on_multiprocess (self ):
57+ """Asserts that we are really testing two different connections.
58+
59+ From psycopg, the backend_id is:
60+ "The process ID (PID) of the backend process handling this connection."
61+ """
62+ from django .db import connection
63+
64+ def host_act (host_turn , log ):
65+ with host_turn (): # 1
66+ assert connection .connection is None
67+ with connection .cursor () as cursor :
68+ cursor .execute ("select 1" )
69+ assert connection .connection is not None
70+ log .put (connection .connection .info .backend_pid )
71+
72+ def child_act (child_turn , log ):
73+ with child_turn (): # 2
74+ assert connection .connection is None
75+ with connection .cursor () as cursor :
76+ cursor .execute ("select 1" )
77+ assert connection .connection is not None
78+ log .put (connection .connection .info .backend_pid )
79+
80+ log = IpcUtil .run (host_act , child_act )
81+ assert len (log ) == 2
82+ host_connection_pid , child_connection_pid = log
83+ assert host_connection_pid != child_connection_pid
5484
5585
5686M = pubsub .PubsubMessage
@@ -59,20 +89,34 @@ def test_postgres_pubsub():
5989]
6090
6191
92+ def unsubscribe_all (channels , subscriber ):
93+ for channel in channels :
94+ subscriber .unsubscribe (channel )
95+
96+
97+ def subscribe_all (channels , subscriber ):
98+ for channel in channels :
99+ subscriber .subscribe (channel )
100+
101+
102+ def publish_all (messages , publisher ):
103+ for channel , payload in messages :
104+ publisher .publish (channel , payload = payload )
105+
106+
62107@pytest .mark .parametrize ("pubsub_backend" , PUBSUB_BACKENDS )
108+ @pytest .mark .parametrize (
109+ "payload" ,
110+ (
111+ pytest .param (None , id = "none" ),
112+ pytest .param ("" , id = "empty-string" ),
113+ pytest .param ("payload" , id = "non-empty-string" ),
114+ pytest .param (123 , id = "int" ),
115+ pytest .param (datetime .now (), id = "datetime" ),
116+ pytest .param (True , id = "bool" ),
117+ ),
118+ )
63119class TestPublish :
64-
65- @pytest .mark .parametrize (
66- "payload" ,
67- (
68- pytest .param (None , id = "none" ),
69- pytest .param ("" , id = "empty-string" ),
70- pytest .param ("payload" , id = "non-empty-string" ),
71- pytest .param (123 , id = "int" ),
72- pytest .param (datetime .now (), id = "datetime" ),
73- pytest .param (True , id = "bool" ),
74- ),
75- )
76120 def test_with_payload_as (self , pubsub_backend : pubsub .BasePubSubBackend , payload ):
77121 pubsub_backend .publish ("channel" , payload = payload )
78122
@@ -90,29 +134,17 @@ def test_with_payload_as(self, pubsub_backend: pubsub.BasePubSubBackend, payload
90134 ),
91135)
92136class TestNoIpcSubscribeFetch :
93- def unsubscribe_all (self , channels , subscriber ):
94- for channel in channels :
95- subscriber .unsubscribe (channel )
96-
97- def subscribe_all (self , channels , subscriber ):
98- for channel in channels :
99- subscriber .subscribe (channel )
100-
101- def publish_all (self , messages , publisher ):
102- for channel , payload in messages :
103- publisher .publish (channel , payload = payload )
104-
105137 def test_with (
106138 self , pubsub_backend : pubsub .BasePubSubBackend , messages : list [pubsub .PubsubMessage ]
107139 ):
108140 channels = {m .channel for m in messages }
109141 publisher = pubsub_backend
110142 with pubsub_backend () as subscriber :
111- self . subscribe_all (channels , subscriber )
112- self . publish_all (messages , publisher )
143+ subscribe_all (channels , subscriber )
144+ publish_all (messages , publisher )
113145 assert subscriber .fetch () == messages
114146
115- self . unsubscribe_all (channels , subscriber )
147+ unsubscribe_all (channels , subscriber )
116148 assert subscriber .fetch () == []
117149
118150 def test_select_readiness_with (
@@ -122,14 +154,14 @@ def test_select_readiness_with(
122154 CHANNELS = {m .channel for m in messages }
123155 publisher = pubsub_backend
124156 with pubsub_backend () as subscriber :
125- self . subscribe_all (CHANNELS , subscriber )
157+ subscribe_all (CHANNELS , subscriber )
126158 assert subscriber .get_subscriptions () == CHANNELS
127159
128160 ready , _ , _ = select .select ([subscriber ], [], [], TIMEOUT )
129161 assert subscriber not in ready
130162 assert subscriber .fetch () == []
131163
132- self . publish_all (messages , publisher )
164+ publish_all (messages , publisher )
133165 ready , _ , _ = select .select ([subscriber ], [], [], TIMEOUT )
134166 assert subscriber in ready
135167
@@ -141,49 +173,19 @@ def test_select_readiness_with(
141173 assert subscriber not in ready
142174 assert subscriber .fetch () == []
143175
144- self . unsubscribe_all (CHANNELS , subscriber )
145- self . publish_all (messages , publisher )
176+ unsubscribe_all (CHANNELS , subscriber )
177+ publish_all (messages , publisher )
146178 ready , _ , _ = select .select ([subscriber ], [], [], TIMEOUT )
147179 assert subscriber not in ready
148180 assert subscriber .fetch () == []
149181
150182
151- def test_postgres_backend_ipc ():
152- """Asserts that we are really testing two different connections.
153-
154- From psycopg, the backend_id is:
155- "The process ID (PID) of the backend process handling this connection."
156- """
157- from django .db import connection
158-
159- def host_act (host_turn , log ):
160- with host_turn (): # 1
161- assert connection .connection is None
162- with connection .cursor () as cursor :
163- cursor .execute ("select 1" )
164- assert connection .connection is not None
165- log .put (connection .connection .info .backend_pid )
166-
167- def child_act (child_turn , log ):
168- with child_turn (): # 2
169- assert connection .connection is None
170- with connection .cursor () as cursor :
171- cursor .execute ("select 1" )
172- assert connection .connection is not None
173- log .put (connection .connection .info .backend_pid )
174-
175- log = IpcUtil .run (host_act , child_act )
176- assert len (log ) == 2
177- host_connection_pid , child_connection_pid = log
178- assert host_connection_pid != child_connection_pid
179-
180-
181183@pytest .mark .parametrize ("pubsub_backend" , PUBSUB_BACKENDS )
182184@pytest .mark .parametrize (
183185 "messages" ,
184186 (
185187 pytest .param ([M ("a" , "A1" )], id = "single-message" ),
186- pytest .param ([M ("a" , "A1" )], id = "test-leaking" ),
188+ pytest .param ([M ("a" , "A1" )], id = "test-if- leaking" ),
187189 pytest .param ([M ("b" , "B1" ), M ("b" , "B2" )], id = "two-messages-in-same-channel" ),
188190 pytest .param (
189191 [M ("c" , "C1" ), M ("c" , "C2" ), M ("d" , "D1" ), M ("d" , "D1" )],
@@ -212,8 +214,7 @@ def subscriber_act(subscriber_turn, log):
212214 with pubsub_backend () as subscriber :
213215 with subscriber_turn (): # 1
214216 log .put ("subscribe" )
215- for channel in CHANNELS :
216- subscriber .subscribe (channel )
217+ subscribe_all (CHANNELS , subscriber )
217218
218219 with subscriber_turn (): # 3
219220 log .put ("fetch" )
@@ -225,8 +226,7 @@ def subscriber_act(subscriber_turn, log):
225226 log .put ("fetch+unsubscribe" )
226227 assert subscriber .fetch () == messages
227228 assert subscriber .fetch () == []
228- for channel in CHANNELS :
229- subscriber .unsubscribe (channel )
229+ unsubscribe_all (CHANNELS , subscriber )
230230
231231 with subscriber_turn (done = True ): # 7
232232 log .put ("fetch-empty" )
@@ -235,20 +235,17 @@ def subscriber_act(subscriber_turn, log):
235235 # child
236236 def publisher_act (publisher_turn , log ):
237237 publisher = pubsub_backend
238- with publisher_turn ():
238+ with publisher_turn (): # 2
239239 log .put ("publish" )
240- for message in messages : # 2
241- publisher .publish (message .channel , payload = message .payload )
240+ publish_all (messages , publisher )
242241
243- with publisher_turn ():
242+ with publisher_turn (): # 4
244243 log .put ("publish" )
245- for message in messages : # 4
246- publisher .publish (message .channel , payload = message .payload )
244+ publish_all (messages , publisher )
247245
248- with publisher_turn ():
246+ with publisher_turn (): # 6
249247 log .put ("publish" )
250- for message in messages : # 6
251- publisher .publish (message .channel , payload = message .payload )
248+ publish_all (messages , publisher )
252249
253250 log = IpcUtil .run (subscriber_act , publisher_act )
254251 assert log == EXPECTED_LOG
@@ -270,8 +267,7 @@ def subscriber_act(subscriber_turn, log):
270267 with pubsub_backend () as subscriber :
271268 with subscriber_turn (): # 1
272269 log .put ("subscribe/select-empty" )
273- for channel in CHANNELS :
274- subscriber .subscribe (channel )
270+ subscribe_all (CHANNELS , subscriber )
275271 assert subscriber .get_subscriptions () == CHANNELS
276272 ready , _ , _ = select .select ([subscriber ], [], [], TIMEOUT )
277273 assert subscriber not in ready
@@ -286,8 +282,7 @@ def subscriber_act(subscriber_turn, log):
286282 assert subscriber in ready
287283 assert subscriber .fetch () == messages
288284 assert subscriber .fetch () == []
289- for channel in CHANNELS :
290- subscriber .unsubscribe (channel )
285+ unsubscribe_all (CHANNELS , subscriber )
291286
292287 with subscriber_turn (done = True ): # 5
293288 log .put ("fetch/select-empty" )
@@ -299,13 +294,11 @@ def publisher_act(publisher_turn, log):
299294 publisher = pubsub_backend
300295 with publisher_turn (): # 2
301296 log .put ("publish" )
302- for message in messages :
303- publisher .publish (message .channel , payload = message .payload )
297+ publish_all (messages , publisher )
304298
305299 with publisher_turn (): # 4
306300 log .put ("publish" )
307- for message in messages :
308- publisher .publish (message .channel , payload = message .payload )
301+ publish_all (messages , publisher )
309302
310303 log = IpcUtil .run (subscriber_act , publisher_act )
311304 assert log == EXPECTED_LOG
0 commit comments