Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(gorgone): community pr fix file descriptor leak when poller are disconnected #1621

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 43 additions & 29 deletions gorgone/gorgone/class/clientzmq.pm
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,20 @@ sub init {
$callbacks->{ $self->{identity} } = $options{callback} if (defined($options{callback}));
}

sub cleanup {
my ($self, %options) = @_;

delete $callbacks->{ $self->{identity} };
delete $connectors->{ $self->{identity} };
delete $sockets->{ $self->{identity} };
}

sub close {
my ($self, %options) = @_;

$sockets->{ $self->{identity} }->close() if (defined($sockets->{ $self->{identity} }));
$self->{core_watcher}->stop() if (defined($self->{core_watcher}));
delete $self->{core_watcher};
$sockets->{ $self->{identity} }->close();
}

sub get_connect_identity {
Expand All @@ -128,14 +137,19 @@ sub get_server_pubkey {
$sockets->{ $self->{identity} }->send('[GETPUBKEY]', ZMQ_DONTWAIT);
$self->event(identity => $self->{identity});

my $w1 = $self->{connect_loop}->timer(
my $w1 = $self->{connect_loop}->io(
$sockets->{ $self->{identity} }->get_fd(),
EV::READ,
sub {
$self->event(identity => $self->{identity});
}
);
my $w2 = $self->{connect_loop}->timer(
10,
0,
sub {
$self->{connect_loop}->break();
}
sub {}
);
$self->{connect_loop}->run();
$self->{connect_loop}->run(EV::RUN_ONCE);
}

sub read_key_protocol {
Expand Down Expand Up @@ -278,9 +292,9 @@ sub ping {
time() - $self->{ping_timeout_time} > $self->{ping_timeout}) {
$self->{logger}->writeLogError("[clientzmq] No ping response") if (defined($self->{logger}));
$self->{ping_progress} = 0;
$sockets->{ $self->{identity} }->close();
delete $self->{core_watcher};

$self->close();
# new identity for a new handshake (for module pull)
$self->{extra_identity} = gorgone::standard::library::generate_token(length => 12);
$self->init();
$status = 1;
}
Expand All @@ -305,25 +319,23 @@ sub event {

$connectors->{ $options{identity} }->{ping_time} = time();
while ($sockets->{ $options{identity} }->has_pollin()) {
my ($rv, $message) = gorgone::standard::library::zmq_dealer_read_message(socket => $sockets->{ $options{identity} });
next if ($connectors->{ $options{identity} }->{handshake} == -1);
next if ($rv);

# We have a response. So it's ok :)
if ($connectors->{ $options{identity} }->{ping_progress} == 1) {
$connectors->{ $options{identity} }->{ping_progress} = 0;
}

my ($rv, $message) = gorgone::standard::library::zmq_dealer_read_message(socket => $sockets->{ $options{identity} });
last if ($rv);

# in progress
if ($connectors->{ $options{identity} }->{handshake} == 0) {
$self->{connect_loop}->break();
$connectors->{ $options{identity} }->{handshake} = 1;
if ($connectors->{ $options{identity} }->check_server_pubkey(message => $message) == 0) {
$connectors->{ $options{identity} }->{handshake} = -1;

}
} elsif ($connectors->{ $options{identity} }->{handshake} == 1) {
$self->{connect_loop}->break();

$self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - client_get_secret recv [3]");
my ($status, $verbose, $symkey, $hostname) = $connectors->{ $options{identity} }->client_get_secret(
message => $message
Expand All @@ -332,7 +344,7 @@ sub event {
$self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - client_get_secret $verbose [3]");
$connectors->{ $options{identity} }->{handshake} = -1;
$connectors->{ $options{identity} }->{verbose_last_message} = $verbose;
return ;
next;
}
$connectors->{ $options{identity} }->{handshake} = 2;
if (defined($connectors->{ $options{identity} }->{logger})) {
Expand All @@ -348,7 +360,7 @@ sub event {
if ($rv == -1 || $data !~ /^\[([a-zA-Z0-9:\-_]+?)\]\s+/) {
$connectors->{ $options{identity} }->{handshake} = -1;
$connectors->{ $options{identity} }->{verbose_last_message} = 'decrypt issue: ' . $data;
return ;
next;
}

if ($1 eq 'KEY') {
Expand Down Expand Up @@ -389,14 +401,7 @@ sub send_message {
my ($self, %options) = @_;

if ($self->{handshake} == 0) {
$self->{connect_loop} = new EV::Loop();
$self->{connect_watcher} = $self->{connect_loop}->io(
$sockets->{ $self->{identity} }->get_fd(),
EV::READ,
sub {
$self->event(identity => $self->{identity});
}
);
$self->{connect_loop} = EV::Loop->new();

if (!defined($self->{server_pubkey})) {
$self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - get_server_pubkey sent [1]");
Expand Down Expand Up @@ -424,15 +429,24 @@ sub send_message {
$sockets->{ $self->{identity} }->send($ciphertext, ZMQ_DONTWAIT);
$self->event(identity => $self->{identity});

my $w1 = $self->{connect_loop}->timer(
my $w1 = $self->{connect_loop}->io(
$sockets->{ $self->{identity} }->get_fd(),
EV::READ,
sub {
$self->event(identity => $self->{identity});
}
);
my $w2 = $self->{connect_loop}->timer(
10,
0,
sub { $self->{connect_loop}->break(); }
sub {}
);
$self->{connect_loop}->run();
$self->{connect_loop}->run(EV::RUN_ONCE);
}

undef $self->{connect_loop} if (defined($self->{connect_loop}));
if (defined($self->{connect_loop})) {
delete $self->{connect_loop};
}

if ($self->{handshake} < 2) {
$self->{handshake} = 0;
Expand Down
8 changes: 7 additions & 1 deletion gorgone/gorgone/modules/core/proxy/class.pm
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ sub action_proxyaddnode {
});

$self->{clients}->{ $data->{id} }->{class}->close();
$self->{clients}->{ $data->{id} }->{class}->cleanup();
} else {
$self->{internal_channels}->{ $data->{id} } = gorgone::standard::library::connect_com(
context => $self->{zmq_context},
Expand Down Expand Up @@ -282,6 +283,7 @@ sub action_proxycloseconnection {
$self->{logger}->writeLogInfo("[proxy] Close connectionn for $data->{id}");

$self->{clients}->{ $data->{id} }->{class}->close();
$self->{clients}->{ $data->{id} }->{class}->cleanup();
$self->{clients}->{ $data->{id} }->{delete} = 0;
$self->{clients}->{ $data->{id} }->{class} = undef;
}
Expand All @@ -293,6 +295,7 @@ sub close_connections {
if (defined($self->{clients}->{$_}->{class}) && $self->{clients}->{$_}->{type} eq 'push_zmq') {
$self->{logger}->writeLogInfo("[proxy] Close connection for $_");
$self->{clients}->{$_}->{class}->close();
$self->{clients}->{$_}->{class}->cleanup();
}
}
}
Expand Down Expand Up @@ -497,7 +500,10 @@ sub periodic_exec {
token => $connector->generate_token(),
target => ''
});
$connector->{clients}->{$_}->{class}->close() if (defined($connector->{clients}->{$_}->{class}));
if (defined($connector->{clients}->{$_}->{class})) {
$connector->{clients}->{$_}->{class}->close();
$connector->{clients}->{$_}->{class}->cleanup();
}
$connector->{clients}->{$_}->{class} = undef;
$connector->{clients}->{$_}->{delete} = 0;
$connector->{clients}->{$_}->{com_read_internal} = 0;
Expand Down
2 changes: 2 additions & 0 deletions gorgone/gorgone/modules/core/proxy/sshclient.pm
Original file line number Diff line number Diff line change
Expand Up @@ -552,4 +552,6 @@ sub close {
$self->disconnect();
}

sub cleanup {}

1;
Loading