@@ -30,6 +30,9 @@ def subscribe(self, channel):
3030 def unsubscribe (self , channel ):
3131 raise NotImplementedError ()
3232
33+ def get_subscriptions (self ):
34+ raise NotImplementedError ()
35+
3336 @classmethod
3437 def publish (cls , channel , payload = None ):
3538 raise NotImplementedError ()
@@ -57,35 +60,35 @@ def drain_non_blocking_fd(fd):
5760 os .read (fd , 256 )
5861
5962
60- PID = os .getpid ()
61-
62-
6363class PostgresPubSub (BasePubSubBackend ):
64+ PID = os .getpid ()
6465
6566 def __init__ (self ):
6667 self ._subscriptions = set ()
6768 self .message_buffer = []
68- self .cursor = connection .cursor ()
69+ # ensures a connection is initialized
70+ with connection .cursor () as cursor :
71+ cursor .execute ("select 1" )
6972 self .backend_pid = connection .connection .info .backend_pid
70- # logger.info(f"{connection.connection.info.backend_pid=}")
7173 self .sentinel_r , self .sentinel_w = os .pipe ()
7274 os .set_blocking (self .sentinel_r , False )
7375 os .set_blocking (self .sentinel_w , False )
7476 connection .connection .add_notify_handler (self ._store_messages )
7577
78+ @classmethod
79+ def _debug (cls , message ):
80+ logger .debug (f"[{ cls .PID } ] { message } " )
81+
7682 def _store_messages (self , notification ):
77- # logger.info(f"[{PID}] Received message: {notification}")
7883 self .message_buffer .append (
7984 PubsubMessage (channel = notification .channel , payload = notification .payload )
8085 )
8186 if notification .pid == self .backend_pid :
8287 os .write (self .sentinel_w , b"1" )
83-
84- def get_subscriptions (self ):
85- return self ._subscriptions .copy ()
88+ self ._debug (f"Received message: { notification } " )
8689
8790 @classmethod
88- def publish (cls , channel , payload = None ):
91+ def publish (cls , channel , payload = "" ):
8992 query = (
9093 (f"NOTIFY { channel } " ,)
9194 if not payload
@@ -94,6 +97,7 @@ def publish(cls, channel, payload=None):
9497
9598 with connection .cursor () as cursor :
9699 cursor .execute (* query )
100+ cls ._debug (f"Sent message: ({ channel } , { str (payload )} )" )
97101
98102 def subscribe (self , channel ):
99103 self ._subscriptions .add (channel )
@@ -108,7 +112,12 @@ def unsubscribe(self, channel):
108112 with connection .cursor () as cursor :
109113 cursor .execute (f"UNLISTEN { channel } " )
110114
115+ def get_subscriptions (self ):
116+ return self ._subscriptions .copy ()
117+
111118 def fileno (self ) -> int :
119+ # when pub/sub clients are the same, the notification callback may be called
120+ # asynchronously, making select on connection miss new notifications
112121 ready , _ , _ = select .select ([self .sentinel_r ], [], [], 0 )
113122 if self .sentinel_r in ready :
114123 return self .sentinel_r
@@ -120,7 +129,7 @@ def fetch(self) -> list[PubsubMessage]:
120129 result = self .message_buffer .copy ()
121130 self .message_buffer .clear ()
122131 drain_non_blocking_fd (self .sentinel_r )
123- # logger.info (f"[{PID}] Fetched messages: {result}")
132+ self . _debug (f"Fetched messages: { result } " )
124133 return result
125134
126135 def close (self ):
0 commit comments