From 2f56ac16d00793d2e5767d2a90ff88dcac45557e Mon Sep 17 00:00:00 2001 From: Giampaolo Rodola Date: Sun, 7 Jan 2024 00:27:39 +0100 Subject: [PATCH] filter connections --- psutil/tests/test_connections.py | 67 ++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 30 deletions(-) diff --git a/psutil/tests/test_connections.py b/psutil/tests/test_connections.py index 082bf981a9..9e7c6b955f 100755 --- a/psutil/tests/test_connections.py +++ b/psutil/tests/test_connections.py @@ -25,6 +25,7 @@ from psutil import POSIX from psutil import SUNOS from psutil import WINDOWS +from psutil._common import debug from psutil._common import supports_ipv6 from psutil._compat import PY3 from psutil.tests import AF_UNIX @@ -44,26 +45,32 @@ from psutil.tests import wait_for_file -thisproc = psutil.Process() SOCK_SEQPACKET = getattr(socket, "SOCK_SEQPACKET", object()) +def this_proc_connections(kind): + cons = psutil.Process().connections(kind=kind) + if kind in ("all", "unix"): + new = [] + for conn in cons: + if conn.family == socket.AF_UNIX: + if MACOS and "/syslog" in conn.raddr: + debug("skipping %r" % conn) + continue + new.append(conn) + return new + else: + return cons + + @serialrun class ConnectionTestCase(PsutilTestCase): def setUp(self): - if NETBSD or FREEBSD or (MACOS and not PY3): - # Process opens a UNIX socket to /var/log/run. - return - cons = thisproc.connections(kind='all') - self.assertEqual(cons, []) + self.assertEqual(this_proc_connections(kind='all'), []) def tearDown(self): # Make sure we closed all resources. - # Some BSDs open a UNIX socket to /var/log/run. - if NETBSD or FREEBSD or (MACOS and not PY3): - return - cons = thisproc.connections(kind='all') - self.assertEqual(cons, []) + self.assertEqual(this_proc_connections(kind='all'), []) def compare_procsys_connections(self, pid, proc_cons, kind='all'): """Given a process PID and its list of connections compare @@ -95,11 +102,11 @@ def test_system(self): def test_process(self): with create_sockets(): - for conn in psutil.Process().connections(kind='all'): + for conn in this_proc_connections(kind='all'): check_connection_ntuple(conn) def test_invalid_kind(self): - self.assertRaises(ValueError, thisproc.connections, kind='???') + self.assertRaises(ValueError, this_proc_connections, kind='???') self.assertRaises(ValueError, psutil.net_connections, kind='???') @@ -108,7 +115,7 @@ class TestUnconnectedSockets(ConnectionTestCase): """Tests sockets which are open but not connected to anything.""" def get_conn_from_sock(self, sock): - cons = thisproc.connections(kind='all') + cons = this_proc_connections(kind='all') smap = dict([(c.fd, c) for c in cons]) if NETBSD or FREEBSD: # NetBSD opens a UNIX socket to /var/log/run @@ -148,7 +155,7 @@ def check_socket(self, sock): # XXX Solaris can't retrieve system-wide UNIX sockets if sock.family == AF_UNIX and HAS_CONNECTIONS_UNIX: - cons = thisproc.connections(kind='all') + cons = this_proc_connections(kind='all') self.compare_procsys_connections(os.getpid(), cons, kind='all') return conn @@ -210,17 +217,17 @@ class TestConnectedSocket(ConnectionTestCase): @unittest.skipIf(SUNOS, "unreliable on SUONS") def test_tcp(self): addr = ("127.0.0.1", 0) - self.assertEqual(thisproc.connections(kind='tcp4'), []) + self.assertEqual(this_proc_connections(kind='tcp4'), []) server, client = tcp_socketpair(AF_INET, addr=addr) try: - cons = thisproc.connections(kind='tcp4') + cons = this_proc_connections(kind='tcp4') self.assertEqual(len(cons), 2) self.assertEqual(cons[0].status, psutil.CONN_ESTABLISHED) self.assertEqual(cons[1].status, psutil.CONN_ESTABLISHED) # May not be fast enough to change state so it stays # commenteed. # client.close() - # cons = thisproc.connections(kind='all') + # cons = this_proc_connections(kind='all') # self.assertEqual(len(cons), 1) # self.assertEqual(cons[0].status, psutil.CONN_CLOSE_WAIT) finally: @@ -232,7 +239,7 @@ def test_unix(self): testfn = self.get_testfn() server, client = unix_socketpair(testfn) try: - cons = thisproc.connections(kind='unix') + cons = this_proc_connections(kind='unix') assert not (cons[0].laddr and cons[0].raddr), cons assert not (cons[1].laddr and cons[1].raddr), cons if NETBSD or FREEBSD: @@ -258,7 +265,7 @@ def test_unix(self): class TestFilters(ConnectionTestCase): def test_filters(self): def check(kind, families, types): - for conn in thisproc.connections(kind=kind): + for conn in this_proc_connections(kind=kind): self.assertIn(conn.family, families) self.assertIn(conn.type, types) if not SKIP_SYSCONS: @@ -373,7 +380,7 @@ def check_conn(proc, conn, family, type, laddr, raddr, status, kinds): tcp6_addr = None udp6_addr = None - for p in thisproc.children(): + for p in psutil.Process().children(): cons = p.connections() self.assertEqual(len(cons), 1) for conn in cons: @@ -429,48 +436,48 @@ def check_conn(proc, conn, family, type, laddr, raddr, status, kinds): def test_count(self): with create_sockets(): # tcp - cons = thisproc.connections(kind='tcp') + cons = this_proc_connections(kind='tcp') self.assertEqual(len(cons), 2 if supports_ipv6() else 1) for conn in cons: self.assertIn(conn.family, (AF_INET, AF_INET6)) self.assertEqual(conn.type, SOCK_STREAM) # tcp4 - cons = thisproc.connections(kind='tcp4') + cons = this_proc_connections(kind='tcp4') self.assertEqual(len(cons), 1) self.assertEqual(cons[0].family, AF_INET) self.assertEqual(cons[0].type, SOCK_STREAM) # tcp6 if supports_ipv6(): - cons = thisproc.connections(kind='tcp6') + cons = this_proc_connections(kind='tcp6') self.assertEqual(len(cons), 1) self.assertEqual(cons[0].family, AF_INET6) self.assertEqual(cons[0].type, SOCK_STREAM) # udp - cons = thisproc.connections(kind='udp') + cons = this_proc_connections(kind='udp') self.assertEqual(len(cons), 2 if supports_ipv6() else 1) for conn in cons: self.assertIn(conn.family, (AF_INET, AF_INET6)) self.assertEqual(conn.type, SOCK_DGRAM) # udp4 - cons = thisproc.connections(kind='udp4') + cons = this_proc_connections(kind='udp4') self.assertEqual(len(cons), 1) self.assertEqual(cons[0].family, AF_INET) self.assertEqual(cons[0].type, SOCK_DGRAM) # udp6 if supports_ipv6(): - cons = thisproc.connections(kind='udp6') + cons = this_proc_connections(kind='udp6') self.assertEqual(len(cons), 1) self.assertEqual(cons[0].family, AF_INET6) self.assertEqual(cons[0].type, SOCK_DGRAM) # inet - cons = thisproc.connections(kind='inet') + cons = this_proc_connections(kind='inet') self.assertEqual(len(cons), 4 if supports_ipv6() else 2) for conn in cons: self.assertIn(conn.family, (AF_INET, AF_INET6)) self.assertIn(conn.type, (SOCK_STREAM, SOCK_DGRAM)) # inet6 if supports_ipv6(): - cons = thisproc.connections(kind='inet6') + cons = this_proc_connections(kind='inet6') self.assertEqual(len(cons), 2) for conn in cons: self.assertEqual(conn.family, AF_INET6) @@ -478,7 +485,7 @@ def test_count(self): # Skipped on BSD becayse by default the Python process # creates a UNIX socket to '/var/run/log'. if HAS_CONNECTIONS_UNIX and not (FREEBSD or NETBSD): - cons = thisproc.connections(kind='unix') + cons = this_proc_connections(kind='unix') self.assertEqual(len(cons), 3) for conn in cons: self.assertEqual(conn.family, AF_UNIX)