Skip to content

Commit

Permalink
filter connections
Browse files Browse the repository at this point in the history
  • Loading branch information
giampaolo committed Jan 6, 2024
1 parent 89eac06 commit 2f56ac1
Showing 1 changed file with 37 additions and 30 deletions.
67 changes: 37 additions & 30 deletions psutil/tests/test_connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from psutil import POSIX
from psutil import SUNOS
from psutil import WINDOWS
from psutil._common import debug
from psutil._common import supports_ipv6
from psutil._compat import PY3
from psutil.tests import AF_UNIX
Expand All @@ -44,26 +45,32 @@
from psutil.tests import wait_for_file


thisproc = psutil.Process()
SOCK_SEQPACKET = getattr(socket, "SOCK_SEQPACKET", object())


def this_proc_connections(kind):
cons = psutil.Process().connections(kind=kind)
if kind in ("all", "unix"):
new = []
for conn in cons:
if conn.family == socket.AF_UNIX:
if MACOS and "/syslog" in conn.raddr:
debug("skipping %r" % conn)
continue
new.append(conn)
return new
else:
return cons


@serialrun
class ConnectionTestCase(PsutilTestCase):
def setUp(self):
if NETBSD or FREEBSD or (MACOS and not PY3):
# Process opens a UNIX socket to /var/log/run.
return
cons = thisproc.connections(kind='all')
self.assertEqual(cons, [])
self.assertEqual(this_proc_connections(kind='all'), [])

def tearDown(self):
# Make sure we closed all resources.
# Some BSDs open a UNIX socket to /var/log/run.
if NETBSD or FREEBSD or (MACOS and not PY3):
return
cons = thisproc.connections(kind='all')
self.assertEqual(cons, [])
self.assertEqual(this_proc_connections(kind='all'), [])

def compare_procsys_connections(self, pid, proc_cons, kind='all'):
"""Given a process PID and its list of connections compare
Expand Down Expand Up @@ -95,11 +102,11 @@ def test_system(self):

def test_process(self):
with create_sockets():
for conn in psutil.Process().connections(kind='all'):
for conn in this_proc_connections(kind='all'):
check_connection_ntuple(conn)

def test_invalid_kind(self):
self.assertRaises(ValueError, thisproc.connections, kind='???')
self.assertRaises(ValueError, this_proc_connections, kind='???')
self.assertRaises(ValueError, psutil.net_connections, kind='???')


Expand All @@ -108,7 +115,7 @@ class TestUnconnectedSockets(ConnectionTestCase):
"""Tests sockets which are open but not connected to anything."""

def get_conn_from_sock(self, sock):
cons = thisproc.connections(kind='all')
cons = this_proc_connections(kind='all')
smap = dict([(c.fd, c) for c in cons])
if NETBSD or FREEBSD:
# NetBSD opens a UNIX socket to /var/log/run
Expand Down Expand Up @@ -148,7 +155,7 @@ def check_socket(self, sock):

# XXX Solaris can't retrieve system-wide UNIX sockets
if sock.family == AF_UNIX and HAS_CONNECTIONS_UNIX:
cons = thisproc.connections(kind='all')
cons = this_proc_connections(kind='all')
self.compare_procsys_connections(os.getpid(), cons, kind='all')
return conn

Expand Down Expand Up @@ -210,17 +217,17 @@ class TestConnectedSocket(ConnectionTestCase):
@unittest.skipIf(SUNOS, "unreliable on SUONS")
def test_tcp(self):
addr = ("127.0.0.1", 0)
self.assertEqual(thisproc.connections(kind='tcp4'), [])
self.assertEqual(this_proc_connections(kind='tcp4'), [])
server, client = tcp_socketpair(AF_INET, addr=addr)
try:
cons = thisproc.connections(kind='tcp4')
cons = this_proc_connections(kind='tcp4')
self.assertEqual(len(cons), 2)
self.assertEqual(cons[0].status, psutil.CONN_ESTABLISHED)
self.assertEqual(cons[1].status, psutil.CONN_ESTABLISHED)
# May not be fast enough to change state so it stays
# commenteed.
# client.close()
# cons = thisproc.connections(kind='all')
# cons = this_proc_connections(kind='all')
# self.assertEqual(len(cons), 1)
# self.assertEqual(cons[0].status, psutil.CONN_CLOSE_WAIT)
finally:
Expand All @@ -232,7 +239,7 @@ def test_unix(self):
testfn = self.get_testfn()
server, client = unix_socketpair(testfn)
try:
cons = thisproc.connections(kind='unix')
cons = this_proc_connections(kind='unix')
assert not (cons[0].laddr and cons[0].raddr), cons
assert not (cons[1].laddr and cons[1].raddr), cons
if NETBSD or FREEBSD:
Expand All @@ -258,7 +265,7 @@ def test_unix(self):
class TestFilters(ConnectionTestCase):
def test_filters(self):
def check(kind, families, types):
for conn in thisproc.connections(kind=kind):
for conn in this_proc_connections(kind=kind):
self.assertIn(conn.family, families)
self.assertIn(conn.type, types)
if not SKIP_SYSCONS:
Expand Down Expand Up @@ -373,7 +380,7 @@ def check_conn(proc, conn, family, type, laddr, raddr, status, kinds):
tcp6_addr = None
udp6_addr = None

for p in thisproc.children():
for p in psutil.Process().children():
cons = p.connections()
self.assertEqual(len(cons), 1)
for conn in cons:
Expand Down Expand Up @@ -429,56 +436,56 @@ def check_conn(proc, conn, family, type, laddr, raddr, status, kinds):
def test_count(self):
with create_sockets():
# tcp
cons = thisproc.connections(kind='tcp')
cons = this_proc_connections(kind='tcp')
self.assertEqual(len(cons), 2 if supports_ipv6() else 1)
for conn in cons:
self.assertIn(conn.family, (AF_INET, AF_INET6))
self.assertEqual(conn.type, SOCK_STREAM)
# tcp4
cons = thisproc.connections(kind='tcp4')
cons = this_proc_connections(kind='tcp4')
self.assertEqual(len(cons), 1)
self.assertEqual(cons[0].family, AF_INET)
self.assertEqual(cons[0].type, SOCK_STREAM)
# tcp6
if supports_ipv6():
cons = thisproc.connections(kind='tcp6')
cons = this_proc_connections(kind='tcp6')
self.assertEqual(len(cons), 1)
self.assertEqual(cons[0].family, AF_INET6)
self.assertEqual(cons[0].type, SOCK_STREAM)
# udp
cons = thisproc.connections(kind='udp')
cons = this_proc_connections(kind='udp')
self.assertEqual(len(cons), 2 if supports_ipv6() else 1)
for conn in cons:
self.assertIn(conn.family, (AF_INET, AF_INET6))
self.assertEqual(conn.type, SOCK_DGRAM)
# udp4
cons = thisproc.connections(kind='udp4')
cons = this_proc_connections(kind='udp4')
self.assertEqual(len(cons), 1)
self.assertEqual(cons[0].family, AF_INET)
self.assertEqual(cons[0].type, SOCK_DGRAM)
# udp6
if supports_ipv6():
cons = thisproc.connections(kind='udp6')
cons = this_proc_connections(kind='udp6')
self.assertEqual(len(cons), 1)
self.assertEqual(cons[0].family, AF_INET6)
self.assertEqual(cons[0].type, SOCK_DGRAM)
# inet
cons = thisproc.connections(kind='inet')
cons = this_proc_connections(kind='inet')
self.assertEqual(len(cons), 4 if supports_ipv6() else 2)
for conn in cons:
self.assertIn(conn.family, (AF_INET, AF_INET6))
self.assertIn(conn.type, (SOCK_STREAM, SOCK_DGRAM))
# inet6
if supports_ipv6():
cons = thisproc.connections(kind='inet6')
cons = this_proc_connections(kind='inet6')
self.assertEqual(len(cons), 2)
for conn in cons:
self.assertEqual(conn.family, AF_INET6)
self.assertIn(conn.type, (SOCK_STREAM, SOCK_DGRAM))
# Skipped on BSD becayse by default the Python process
# creates a UNIX socket to '/var/run/log'.
if HAS_CONNECTIONS_UNIX and not (FREEBSD or NETBSD):
cons = thisproc.connections(kind='unix')
cons = this_proc_connections(kind='unix')
self.assertEqual(len(cons), 3)
for conn in cons:
self.assertEqual(conn.family, AF_UNIX)
Expand Down

0 comments on commit 2f56ac1

Please sign in to comment.