From 5f51a78b00483cd91f199d3d557fa5ff00735149 Mon Sep 17 00:00:00 2001 From: Evan-Adam <152897682+Evan-Adam@users.noreply.github.com> Date: Tue, 16 Jul 2024 17:02:03 +0200 Subject: [PATCH] MON-48766-community-pr-fix-file-descriptors-leak-in-gorgone-that-occur-when-pollers-are-disconnected (#1512) fix(gorgone): fd leak TODO : we need to change the gha to launch this test only on develop, and if possible execute it nighty Co-authored-by: garnier-quentin Co-authored-by: qgarnier --- gorgone/gorgone/class/clientzmq.pm | 72 +++++++++++-------- gorgone/gorgone/modules/core/proxy/class.pm | 8 ++- .../gorgone/modules/core/proxy/sshclient.pm | 2 + 3 files changed, 52 insertions(+), 30 deletions(-) diff --git a/gorgone/gorgone/class/clientzmq.pm b/gorgone/gorgone/class/clientzmq.pm index a293c8695c5..9c34f5bed97 100644 --- a/gorgone/gorgone/class/clientzmq.pm +++ b/gorgone/gorgone/class/clientzmq.pm @@ -109,11 +109,20 @@ sub init { $callbacks->{ $self->{identity} } = $options{callback} if (defined($options{callback})); } +sub cleanup { + my ($self, %options) = @_; + + delete $callbacks->{ $self->{identity} }; + delete $connectors->{ $self->{identity} }; + delete $sockets->{ $self->{identity} }; +} + sub close { my ($self, %options) = @_; + $sockets->{ $self->{identity} }->close() if (defined($sockets->{ $self->{identity} })); + $self->{core_watcher}->stop() if (defined($self->{core_watcher})); delete $self->{core_watcher}; - $sockets->{ $self->{identity} }->close(); } sub get_connect_identity { @@ -128,14 +137,19 @@ sub get_server_pubkey { $sockets->{ $self->{identity} }->send('[GETPUBKEY]', ZMQ_DONTWAIT); $self->event(identity => $self->{identity}); - my $w1 = $self->{connect_loop}->timer( + my $w1 = $self->{connect_loop}->io( + $sockets->{ $self->{identity} }->get_fd(), + EV::READ, + sub { + $self->event(identity => $self->{identity}); + } + ); + my $w2 = $self->{connect_loop}->timer( 10, 0, - sub { - $self->{connect_loop}->break(); - } + sub {} ); - $self->{connect_loop}->run(); + $self->{connect_loop}->run(EV::RUN_ONCE); } sub read_key_protocol { @@ -278,9 +292,9 @@ sub ping { time() - $self->{ping_timeout_time} > $self->{ping_timeout}) { $self->{logger}->writeLogError("[clientzmq] No ping response") if (defined($self->{logger})); $self->{ping_progress} = 0; - $sockets->{ $self->{identity} }->close(); - delete $self->{core_watcher}; - + $self->close(); + # new identity for a new handshake (for module pull) + $self->{extra_identity} = gorgone::standard::library::generate_token(length => 12); $self->init(); $status = 1; } @@ -305,25 +319,23 @@ sub event { $connectors->{ $options{identity} }->{ping_time} = time(); while ($sockets->{ $options{identity} }->has_pollin()) { + my ($rv, $message) = gorgone::standard::library::zmq_dealer_read_message(socket => $sockets->{ $options{identity} }); + next if ($connectors->{ $options{identity} }->{handshake} == -1); + next if ($rv); + # We have a response. So it's ok :) if ($connectors->{ $options{identity} }->{ping_progress} == 1) { $connectors->{ $options{identity} }->{ping_progress} = 0; } - my ($rv, $message) = gorgone::standard::library::zmq_dealer_read_message(socket => $sockets->{ $options{identity} }); - last if ($rv); - # in progress if ($connectors->{ $options{identity} }->{handshake} == 0) { - $self->{connect_loop}->break(); $connectors->{ $options{identity} }->{handshake} = 1; if ($connectors->{ $options{identity} }->check_server_pubkey(message => $message) == 0) { $connectors->{ $options{identity} }->{handshake} = -1; } } elsif ($connectors->{ $options{identity} }->{handshake} == 1) { - $self->{connect_loop}->break(); - $self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - client_get_secret recv [3]"); my ($status, $verbose, $symkey, $hostname) = $connectors->{ $options{identity} }->client_get_secret( message => $message @@ -332,7 +344,7 @@ sub event { $self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - client_get_secret $verbose [3]"); $connectors->{ $options{identity} }->{handshake} = -1; $connectors->{ $options{identity} }->{verbose_last_message} = $verbose; - return ; + next; } $connectors->{ $options{identity} }->{handshake} = 2; if (defined($connectors->{ $options{identity} }->{logger})) { @@ -348,7 +360,7 @@ sub event { if ($rv == -1 || $data !~ /^\[([a-zA-Z0-9:\-_]+?)\]\s+/) { $connectors->{ $options{identity} }->{handshake} = -1; $connectors->{ $options{identity} }->{verbose_last_message} = 'decrypt issue: ' . $data; - return ; + next; } if ($1 eq 'KEY') { @@ -389,14 +401,7 @@ sub send_message { my ($self, %options) = @_; if ($self->{handshake} == 0) { - $self->{connect_loop} = new EV::Loop(); - $self->{connect_watcher} = $self->{connect_loop}->io( - $sockets->{ $self->{identity} }->get_fd(), - EV::READ, - sub { - $self->event(identity => $self->{identity}); - } - ); + $self->{connect_loop} = EV::Loop->new(); if (!defined($self->{server_pubkey})) { $self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - get_server_pubkey sent [1]"); @@ -424,15 +429,24 @@ sub send_message { $sockets->{ $self->{identity} }->send($ciphertext, ZMQ_DONTWAIT); $self->event(identity => $self->{identity}); - my $w1 = $self->{connect_loop}->timer( + my $w1 = $self->{connect_loop}->io( + $sockets->{ $self->{identity} }->get_fd(), + EV::READ, + sub { + $self->event(identity => $self->{identity}); + } + ); + my $w2 = $self->{connect_loop}->timer( 10, 0, - sub { $self->{connect_loop}->break(); } + sub {} ); - $self->{connect_loop}->run(); + $self->{connect_loop}->run(EV::RUN_ONCE); } - undef $self->{connect_loop} if (defined($self->{connect_loop})); + if (defined($self->{connect_loop})) { + delete $self->{connect_loop}; + } if ($self->{handshake} < 2) { $self->{handshake} = 0; diff --git a/gorgone/gorgone/modules/core/proxy/class.pm b/gorgone/gorgone/modules/core/proxy/class.pm index de525cd98e5..3798142ec9a 100644 --- a/gorgone/gorgone/modules/core/proxy/class.pm +++ b/gorgone/gorgone/modules/core/proxy/class.pm @@ -230,6 +230,7 @@ sub action_proxyaddnode { }); $self->{clients}->{ $data->{id} }->{class}->close(); + $self->{clients}->{ $data->{id} }->{class}->cleanup(); } else { $self->{internal_channels}->{ $data->{id} } = gorgone::standard::library::connect_com( context => $self->{zmq_context}, @@ -282,6 +283,7 @@ sub action_proxycloseconnection { $self->{logger}->writeLogInfo("[proxy] Close connectionn for $data->{id}"); $self->{clients}->{ $data->{id} }->{class}->close(); + $self->{clients}->{ $data->{id} }->{class}->cleanup(); $self->{clients}->{ $data->{id} }->{delete} = 0; $self->{clients}->{ $data->{id} }->{class} = undef; } @@ -293,6 +295,7 @@ sub close_connections { if (defined($self->{clients}->{$_}->{class}) && $self->{clients}->{$_}->{type} eq 'push_zmq') { $self->{logger}->writeLogInfo("[proxy] Close connection for $_"); $self->{clients}->{$_}->{class}->close(); + $self->{clients}->{$_}->{class}->cleanup(); } } } @@ -497,7 +500,10 @@ sub periodic_exec { token => $connector->generate_token(), target => '' }); - $connector->{clients}->{$_}->{class}->close() if (defined($connector->{clients}->{$_}->{class})); + if (defined($connector->{clients}->{$_}->{class})) { + $connector->{clients}->{$_}->{class}->close(); + $connector->{clients}->{$_}->{class}->cleanup(); + } $connector->{clients}->{$_}->{class} = undef; $connector->{clients}->{$_}->{delete} = 0; $connector->{clients}->{$_}->{com_read_internal} = 0; diff --git a/gorgone/gorgone/modules/core/proxy/sshclient.pm b/gorgone/gorgone/modules/core/proxy/sshclient.pm index d0f40303e12..af81969bee0 100644 --- a/gorgone/gorgone/modules/core/proxy/sshclient.pm +++ b/gorgone/gorgone/modules/core/proxy/sshclient.pm @@ -552,4 +552,6 @@ sub close { $self->disconnect(); } +sub cleanup {} + 1;