Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add proper logger to consumer and handlers #78

Merged
merged 1 commit into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Changes
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ Revision history for Perl extension PGXN::Manager
output.
- Configured handlers for INT, TERM, and QUIT signals to log flagging
for shutdown in the next loop.
- Fixed invalid license example in the META spec.
- Added a logger to the Consumer and the Mastodon and Twitter handlers,
so that they now log debug and info messages about what's being sent.

0.31.1 2023-10-07T21:40:53Z
- Restored the writing of the pgxn_consumer PID file, accidentally
Expand Down
134 changes: 79 additions & 55 deletions lib/PGXN/Manager/Consumer.pm
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,34 @@ use Moose;
use JSON::XS;
use Try::Tiny;
use Encode qw(encode_utf8);
use Carp;
use Time::HiRes qw(sleep);
use PGXN::Manager;
use Proc::Daemon;
use IO::File;
use POSIX ();
use Fcntl qw(:flock);
use Cwd;
use namespace::autoclean;

our $VERSION = v0.31.2;
use constant CHANNELS => qw(release new_user new_mirror);

has verbose => (is => 'ro', isa => 'Int', required => 1, default => 0);
has interval => (is => 'ro', isa => 'Num', required => 1, default => 5);
has verbose => (is => 'ro', isa => 'Int', required => 1, default => 0);
has interval => (is => 'ro', isa => 'Num', required => 1, default => 5);
has continue => (is => 'rw', isa => 'Bool', required => 1, default => 1);
has pid_file => (is => 'rw', isa => 'Str');
has log_fh => (is => 'ro', isa => 'IO::Handle', required => 1, default => sub {
_log_fh()
});
has logger => (
is => 'ro',
isa => 'PGXN::Manager::Consumer::Log',
required => 1,
lazy => 1,
default => sub { PGXN::Manager::Consumer::Log->new(verbose => $_[0]->verbose) },
);

has conn => (is => 'ro', isa => 'DBIx::Connector', lazy => 1, default => sub {
# Use our own connetion instead of $pgxn->conn in order to add the callback.
my $self = shift;
my $cb = $self->verbose ? sub {
$_[0]->do("LISTEN pgxn_$_") for CHANNELS;
$self->log("INFO: Listening on ", join ', ', map { s/^pgxn_//r } @{
$self->log(INFO => 'Listening on ', join ', ', map { s/^pgxn_//r } @{
$_[0]->selectcol_arrayref('SELECT * FROM pg_listening_channels()')
});
return;
Expand All @@ -48,15 +50,6 @@ has conn => (is => 'ro', isa => 'DBIx::Connector', lazy => 1, default => sub
});
});

sub _log_fh {
my $fn = shift;
my $fh = $fn ? IO::File->new($fn, '>>:utf8')
: IO::Handle->new_from_fd(fileno STDOUT, 'w');
binmode $fh, ":utf8";
$fh->autoflush(1);
$fh;
}

sub go {
my $class = shift;
my $cfg = $class->_config;
Expand All @@ -69,16 +62,19 @@ sub go {
);
if (my $pid = $daemon->Init) {
my $pid_file = $cfg->{'pid-file'} || 'STDOUT';
_log(
_log_fh($cfg->{'log-file'}),
"INFO: Forked PID $pid written to $pid_file",
);
PGXN::Manager::Consumer::Log->new(
verbose => $cfg->{verbose},
file => $cfg->{'log-file'},
)->log(INFO => "Forked PID $pid written to $pid_file");
return 0;
}
}

# In the child process. Set up log file handle and go.
$cfg->{log_fh} = _log_fh delete $cfg->{'log-file'};
# In the child process. Set up logger and go.
$cfg->{logger} = PGXN::Manager::Consumer::Log->new(
verbose => $cfg->{verbose},
file => delete $cfg->{'log-file'},
);
$cfg->{pid_file} = delete $cfg->{'pid-file'} if exists $cfg->{'pid-file'};
my $cmd = $class->new( $cfg );
$SIG{$_} = $cmd->_signal_handler($_) for qw(TERM INT QUIT);
Expand All @@ -88,7 +84,7 @@ sub go {
sub _signal_handler {
my ($self, $sig) = @_;
return sub {
$self->log("INFO: $sig signal caught; flagging shutdown for next loop");
$self->log(INFO => "$sig signal caught; flagging shutdown for next loop");
$self->continue(0);
};
}
Expand All @@ -102,10 +98,10 @@ sub DEMOLISH {

sub run {
my $self = shift;
$self->log(sprintf "INFO: Starting %s %s", __PACKAGE__, __PACKAGE__->VERSION);
$self->log(INFO => sprintf "Starting %s %s", __PACKAGE__, __PACKAGE__->VERSION);
my $pgxn = PGXN::Manager->instance;
my $cfg = $pgxn->config->{consumers} || do {
$self->log("WARN: No consumers configured; messages will be dropped");
$self->log(WARN => 'No consumers configured; messages will be dropped');
undef
};

Expand All @@ -118,7 +114,7 @@ sub run {
sleep($self->interval);
}

$self->log("INFO: Shutting down");
$self->log(INFO => 'Shutting down');
return 0;
}

Expand All @@ -129,17 +125,17 @@ sub load_consumers {
my $type = delete $cfg->{type}
or die "No type specified for event consumer\n";
my $pkg = __PACKAGE__ . "::$type";
$self->log("INFO: Loading $pkg") if $self->verbose > 1;
$self->log(DEBUG => "Loading $pkg");
eval "use $pkg";
die "Error loading $pkg: $@\n" if $@;
my $events = delete $cfg->{events};
my $consumer = $pkg->new(
verbose => $self->verbose,
config => $cfg,
logger => $self->logger,
config => $cfg,
);

for my $e (@{ $events }) {
$self->log("INFO: Configuring $pkg for $e") if $self->verbose > 1;
$self->log(DEBUG => "Configuring $pkg for $e");
push @{ $consumers{$e} ||= [] } => $consumer;
}
}
Expand All @@ -154,57 +150,45 @@ sub consume {
# Notify payload treated as UTF-8 text, so already decoded from UTF-8 bytes.
my $json = JSON::XS->new->utf8(0);
my $dbh = shift;
$self->log("INFO: Consuming") if $self->verbose > 2;
$self->log(DEBUG => 'Consuming');
while (my $notify = $dbh->pg_notifies) {
my ($name, $pid, $msg) = @{ $notify };
$self->log("INFO: Received “$name” event from PID $pid")
if $self->verbose;
$self->log(INFO => "Received “$name” event from PID $pid");
unless ($name =~ s/^pgxn_//) {
$self->log("WARN: Unknown channel “$name”; skipping");
$self->log(WARN => "Unknown channel “$name”; skipping");
next;
}
my $handlers = $consumers_for->{$name} || do {
$self->log(
"INFO: No handlers configured for ",
"pgxn_$name channel; skipping",
)if $self->verbose;
$self->log(INFO =>
"No handlers configured for pgxn_$name channel; skipping",
);
next;
};

# Decode the JSON payload;
my $meta = try {
$json->decode($msg);
} catch {
$self->log("ERORR: Cannot decode JSON: $_");
$self->log(ERORR => "Cannot decode JSON: $_");
undef;
};
next unless $meta;

# Run all the handlers.
for my $h (@{ $handlers }) {
$self->log("INFO: Sending to ", $h->name, " handler")
if $self->verbose;
$self->log(INFO => 'Sending to ', $h->name, " handler");
try { $h->handle($name, $meta) }
catch { $self->log("ERROR: $_") };
catch { $self->log(ERROR => $_) };
}
}
});
} catch {
$self->log("ERROR: $_");
$self->log(ERROR => $_);
};
return 1;
}

sub log {
_log(shift->log_fh, @_);
}

sub _log {
my $fh = shift;
flock $fh, LOCK_EX;
say {$fh} POSIX::strftime('%Y-%m-%dT%H:%M:%SZ - ', gmtime), join '', @_;
flock $fh, LOCK_UN;
}
sub log { shift->logger->log(@_) }

sub _config {
my $self = shift;
Expand Down Expand Up @@ -262,6 +246,46 @@ sub _pod2usage {
);
}

package
PGXN::Manager::Consumer::Log;

use 5.10.0;
use strict;
use warnings;
use utf8;
use Moose;
use Fcntl qw(:flock);

has verbose => (is => 'ro', isa => 'Int', required => 1, default => 0);
has log_fh => (is => 'ro', isa => 'FileHandle', required => 1, default => sub {
_log_fh()
});

around BUILDARGS => sub {
my ($orig, $class, %args) = @_;
my $fn = delete $args{file} or return $class->$orig(%args);
return $class->$orig(%args, log_fh => _log_fh($fn));
};

sub log {
my ($self, $level) = (shift, shift);
return if $level eq 'INFO' && $self->verbose < 1;
return if $level eq 'DEBUG' && $self->verbose < 2;
my $fh = $self->log_fh;
flock $fh, LOCK_EX;
say {$fh} POSIX::strftime("%Y-%m-%dT%H:%M:%SZ - $level: ", gmtime), join '', @_;
flock $fh, LOCK_UN;
}

sub _log_fh {
my $fn = shift;
my $fh = $fn ? IO::File->new($fn, '>>:utf8')
: IO::Handle->new_from_fd(fileno STDOUT, 'w');
binmode $fh, ":utf8";
$fh->autoflush(1);
$fh;
}

1;
__END__

Expand Down
18 changes: 14 additions & 4 deletions lib/PGXN/Manager/Consumer/mastodon.pm
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ our $VERSION = v0.31.2;
has server => (is => 'ro', required => 1, isa => 'Str');
has ua => (is => 'ro', required => 1, isa => 'LWP::UserAgent');
has delay => (is => 'ro', required => 0, isa => 'Int', default => 0);
has logger => (is => 'ro', required => 1, isa => 'PGXN::Manager::Consumer::Log');

around BUILDARGS => sub {
my ($orig, $class, %args) = @_;
Expand Down Expand Up @@ -50,7 +51,13 @@ my %EMOJI = (

sub handle {
my ($self, $type, $meta) = @_;
return unless $type eq 'release';
if ($type ne 'release') {
$self->logger->log(DEBUG => "Mastodon skiping $type notification");
return;
};

my $release = lc "$meta->{name}-$meta->{version}";
$self->logger->log(INFO => "Posting $release to Mastodon");

my $link = PGXN::Manager->instance->config->{release_permalink};
my $url = URI::Template->new($link)->process({
Expand All @@ -59,7 +66,7 @@ sub handle {
});

my %emo = map { $_ => $EMOJI{$_}[rand @{ $EMOJI{$_} }] } keys %EMOJI;
$self->toot(join("\n\n",
$self->toot($release, join("\n\n",
"$emo{send} Released: $meta->{name} $meta->{version}",
"$emo{info} $meta->{abstract}",
"$emo{user} By $meta->{user}",
Expand All @@ -68,7 +75,7 @@ sub handle {
}

sub toot {
my ($self, $body) = @_;
my ($self, $release, $body) = @_;

my $res = $self->ua->post(
$self->server . '/api/v1/statuses',
Expand All @@ -78,7 +85,10 @@ sub toot {
},
);
return 1 if $res->is_success;
die "Error posting to Mastodon: " . $res->decoded_content . "\n";

$self->logger->log(ERROR =>
"Error posting $release to Mastodon: " . $res->decoded_content
);
}

sub scheduled_at {
Expand Down
20 changes: 16 additions & 4 deletions lib/PGXN/Manager/Consumer/twitter.pm
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ our $VERSION = v0.31.2;

subtype MaybeTwitterAPI => as maybe_type class_type 'Net::Twitter::Lite';

has verbose => (is => 'ro', required => 1, isa => 'Int', default => 0);
has client => (is => 'ro', required => 1, isa => 'Net::Twitter::Lite');
has logger => (is => 'ro', required => 1, isa => 'PGXN::Manager::Consumer::Log');

around BUILDARGS => sub {
my ($orig, $class, %args) = @_;
Expand Down Expand Up @@ -45,11 +45,16 @@ around BUILDARGS => sub {
};

sub handle {
my ($self, $channel, $meta) = @_;
return unless $channel eq 'release';
my ($self, $type, $meta) = @_;
if ($type ne 'release') {
$self->logger->log(DEBUG => "Twitter skiping $type notification");
return;
};

my $client = $self->client or return;
my $pgxn = PGXN::Manager->instance;

$self->logger->log(DEBUG => "Fetching Twitter username for $meta->{user}");
my $nick = $pgxn->conn->run(sub {
shift->selectcol_arrayref(
'SELECT twitter FROM users WHERE nickname = ?',
Expand All @@ -59,11 +64,18 @@ sub handle {

$nick = $nick ? "\@$nick" : $meta->{user};

my $release = lc "$meta->{name}-$meta->{version}";
$self->logger->log(INFO => "Posting $release to Twitter");

my $url = URI::Template->new($pgxn->config->{release_permalink})->process({
dist => lc $meta->{name},
version => lc $meta->{version},
});
$client->update( "$meta->{name} $meta->{version} released by $nick: $url" );
try {
$client->update( "$meta->{name} $meta->{version} released by $nick: $url" );
} catch {
$self->logger->log(ERROR => "Error posting $release to Twitter: $_");
};
}

1;
Expand Down
3 changes: 1 addition & 2 deletions lib/PGXN/Manager/Maint.pm
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use File::Spec;
use File::Path qw(make_path remove_tree);
use File::Basename qw(dirname basename);
use Encode qw(encode_utf8);
use Carp;
use namespace::autoclean;

our $VERSION = v0.31.2;
Expand Down Expand Up @@ -50,7 +49,7 @@ sub run {
my ($self, $command) = (shift, shift);
$command =~ s/-/_/g;
my $meth = $self->can($command)
or croak qq{PGXN Maint: "$command" is not a command};
or die qq{PGXN Maint: "$command" is not a command};
require PGXN::Manager;
$self->$meth(@_);
}
Expand Down
Loading